1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include <boost/scope_exit.hpp>
24 #include <boost/algorithm/string/predicate.hpp>
26 #include "json_spirit/json_spirit_reader.h"
27 #include "json_spirit/json_spirit_writer.h"
30 #include "common/version.h"
31 #include "common/blkdev.h"
32 #include "common/cmdparse.h"
33 #include "common/signal.h"
35 #include "osd/OSDMap.h"
37 #include "MonitorDBStore.h"
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MMonMap.h"
41 #include "messages/MMonGetMap.h"
42 #include "messages/MMonGetVersion.h"
43 #include "messages/MMonGetVersionReply.h"
44 #include "messages/MGenericMessage.h"
45 #include "messages/MMonCommand.h"
46 #include "messages/MMonCommandAck.h"
47 #include "messages/MMonSync.h"
48 #include "messages/MMonScrub.h"
49 #include "messages/MMonProbe.h"
50 #include "messages/MMonJoin.h"
51 #include "messages/MMonPaxos.h"
52 #include "messages/MRoute.h"
53 #include "messages/MForward.h"
55 #include "messages/MMonSubscribe.h"
56 #include "messages/MMonSubscribeAck.h"
58 #include "messages/MCommand.h"
59 #include "messages/MCommandReply.h"
61 #include "messages/MTimeCheck2.h"
62 #include "messages/MPing.h"
64 #include "common/strtol.h"
65 #include "common/ceph_argparse.h"
66 #include "common/Timer.h"
67 #include "common/Clock.h"
68 #include "common/errno.h"
69 #include "common/perf_counters.h"
70 #include "common/admin_socket.h"
71 #include "global/signal_handler.h"
72 #include "common/Formatter.h"
73 #include "include/stringify.h"
74 #include "include/color.h"
75 #include "include/ceph_fs.h"
76 #include "include/str_list.h"
78 #include "OSDMonitor.h"
79 #include "MDSMonitor.h"
80 #include "MonmapMonitor.h"
81 #include "LogMonitor.h"
82 #include "AuthMonitor.h"
83 #include "MgrMonitor.h"
84 #include "MgrStatMonitor.h"
85 #include "ConfigMonitor.h"
86 #include "KVMonitor.h"
87 #include "mon/HealthMonitor.h"
88 #include "common/config.h"
89 #include "common/cmdparse.h"
90 #include "include/ceph_assert.h"
91 #include "include/compat.h"
92 #include "perfglue/heap_profiler.h"
94 #include "auth/none/AuthNoneClientHandler.h"
96 #define dout_subsys ceph_subsys_mon
98 #define dout_prefix _prefix(_dout, this)
99 using namespace TOPNSPC::common
;
106 using std::make_pair
;
108 using std::ostringstream
;
113 using std::stringstream
;
114 using std::to_string
;
116 using std::unique_ptr
;
118 using ceph::bufferlist
;
121 using ceph::ErasureCodeInterfaceRef
;
122 using ceph::ErasureCodeProfile
;
123 using ceph::Formatter
;
124 using ceph::JSONFormatter
;
125 using ceph::make_message
;
126 using ceph::mono_clock
;
127 using ceph::mono_time
;
128 using ceph::timespan_str
;
131 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
132 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
133 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
136 const string
Monitor::MONITOR_NAME
= "monitor";
137 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
// Static table of monitor commands, built with helper macros:
//   FLAG(f)              -> MonCommand::FLAG_<f>
//   COMMAND(...)         -> one braced table entry whose flags are FLAG(NONE)
//   COMMAND_WITH_FLAG()  -> one braced table entry with caller-supplied flags
// Any previous definition of COMMAND_WITH_FLAG is dropped first.
142 #undef COMMAND_WITH_FLAG
143 #define FLAG(f) (MonCommand::FLAG_##f)
144 #define COMMAND(parsesig, helptext, modulename, req_perms) \
145 {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
146 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
147 {parsesig, helptext, modulename, req_perms, flags},
// The header is included inside the array initializer; presumably it is a
// list of COMMAND()/COMMAND_WITH_FLAG() invocations that expand into the
// array elements -- confirm against mon/MonCommands.h.
148 MonCommand mon_commands
[] = {
149 #include <mon/MonCommands.h>
152 #undef COMMAND_WITH_FLAG
154 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
155 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
161 con_self(m
? m
->get_loopback_connection() : NULL
),
163 finisher(cct_
, "mon_finisher", "fin"),
164 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads
),
165 has_ever_joined(false),
166 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
168 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
169 key_server(cct
, &keyring
),
170 auth_cluster_required(cct
,
171 cct
->_conf
->auth_supported
.empty() ?
172 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
173 auth_service_required(cct
,
174 cct
->_conf
->auth_supported
.empty() ?
175 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
176 mgr_messenger(mgr_m
),
177 mgr_client(cct_
, mgr_m
, monmap
),
178 gss_ktfile_client(cct
->_conf
.get_val
<std::string
>("gss_ktab_client_file")),
181 elector(this, map
->strategy
),
182 required_features(0),
184 quorum_con_features(0),
188 scrub_timeout_event(NULL
),
191 sync_provider_count(0),
194 sync_start_version(0),
195 sync_timeout_event(NULL
),
196 sync_last_committed_floor(0),
200 timecheck_rounds_since_clean(0),
201 timecheck_event(NULL
),
204 routed_request_tid(0),
205 op_tracker(cct
, g_conf().get_val
<bool>("mon_enable_op_tracker"), 1)
207 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
208 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
210 update_log_clients();
212 if (!gss_ktfile_client
.empty()) {
213 // Assert we can export environment variable
215 The default client keytab is used, if it is present and readable,
216 to automatically obtain initial credentials for GSSAPI client
217 applications. The principal name of the first entry in the client
218 keytab is used by default when obtaining initial credentials.
219 1. The KRB5_CLIENT_KTNAME environment variable.
220 2. The default_client_keytab_name profile variable in [libdefaults].
221 3. The hardcoded default, DEFCKTNAME.
223 const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
224 gss_ktfile_client
.c_str(), 1));
225 ceph_assert(set_result
== 0);
228 op_tracker
.set_complaint_and_threshold(
229 g_conf().get_val
<std::chrono::seconds
>("mon_op_complaint_time").count(),
230 g_conf().get_val
<int64_t>("mon_op_log_threshold"));
231 op_tracker
.set_history_size_and_duration(
232 g_conf().get_val
<uint64_t>("mon_op_history_size"),
233 g_conf().get_val
<std::chrono::seconds
>("mon_op_history_duration").count());
234 op_tracker
.set_history_slow_op_size_and_threshold(
235 g_conf().get_val
<uint64_t>("mon_op_history_slow_op_size"),
236 g_conf().get_val
<std::chrono::seconds
>("mon_op_history_slow_op_threshold").count());
238 paxos
= std::make_unique
<Paxos
>(*this, "paxos");
240 paxos_service
[PAXOS_MDSMAP
].reset(new MDSMonitor(*this, *paxos
, "mdsmap"));
241 paxos_service
[PAXOS_MONMAP
].reset(new MonmapMonitor(*this, *paxos
, "monmap"));
242 paxos_service
[PAXOS_OSDMAP
].reset(new OSDMonitor(cct
, *this, *paxos
, "osdmap"));
243 paxos_service
[PAXOS_LOG
].reset(new LogMonitor(*this, *paxos
, "logm"));
244 paxos_service
[PAXOS_AUTH
].reset(new AuthMonitor(*this, *paxos
, "auth"));
245 paxos_service
[PAXOS_MGR
].reset(new MgrMonitor(*this, *paxos
, "mgr"));
246 paxos_service
[PAXOS_MGRSTAT
].reset(new MgrStatMonitor(*this, *paxos
, "mgrstat"));
247 paxos_service
[PAXOS_HEALTH
].reset(new HealthMonitor(*this, *paxos
, "health"));
248 paxos_service
[PAXOS_CONFIG
].reset(new ConfigMonitor(*this, *paxos
, "config"));
249 paxos_service
[PAXOS_KV
].reset(new KVMonitor(*this, *paxos
, "kv"));
251 bool r
= mon_caps
.parse("allow *", NULL
);
254 exited_quorum
= ceph_clock_now();
256 // prepare local commands
257 local_mon_commands
.resize(std::size(mon_commands
));
258 for (unsigned i
= 0; i
< std::size(mon_commands
); ++i
) {
259 local_mon_commands
[i
] = mon_commands
[i
];
261 MonCommand::encode_vector(local_mon_commands
, local_mon_commands_bl
);
263 prenautilus_local_mon_commands
= local_mon_commands
;
264 for (auto& i
: prenautilus_local_mon_commands
) {
265 std::string n
= cmddesc_get_prenautilus_compat(i
.cmdstring
);
266 if (n
!= i
.cmdstring
) {
267 dout(20) << " pre-nautilus cmd " << i
.cmdstring
<< " -> " << n
<< dendl
;
271 MonCommand::encode_vector(prenautilus_local_mon_commands
, prenautilus_local_mon_commands_bl
);
273 // assume our commands until we have an election. this only means
274 // we won't reply with EINVAL before the election; any command that
275 // actually matters will wait until we have quorum etc and then
276 // retry (and revalidate).
277 leader_mon_commands
= local_mon_commands
;
282 op_tracker
.on_shutdown();
285 ceph_assert(session_map
.sessions
.empty());
289 class AdminHook
: public AdminSocketHook
{
292 explicit AdminHook(Monitor
*m
) : mon(m
) {}
293 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
296 bufferlist
& out
) override
{
298 int r
= mon
->do_admin_command(command
, cmdmap
, f
, errss
, outss
);
304 int Monitor::do_admin_command(
305 std::string_view command
,
306 const cmdmap_t
& cmdmap
,
311 std::lock_guard
l(lock
);
315 for (auto p
= cmdmap
.begin();
316 p
!= cmdmap
.end(); ++p
) {
317 if (p
->first
== "prefix")
321 args
+= cmd_vartype_stringify(p
->second
);
323 args
= "[" + args
+ "]";
325 bool read_only
= (command
== "mon_status" ||
326 command
== "mon metadata" ||
327 command
== "quorum_status" ||
329 command
== "sessions");
331 (read_only
? audit_clog
->debug() : audit_clog
->info())
332 << "from='admin socket' entity='admin socket' "
333 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
335 if (command
== "mon_status") {
337 } else if (command
== "quorum_status") {
338 _quorum_status(f
, out
);
339 } else if (command
== "sync_force") {
340 bool validate
= false;
341 if (!cmd_getval(cmdmap
, "yes_i_really_mean_it", validate
)) {
343 if (cmd_getval(cmdmap
, "validate", v
) &&
344 v
== "--yes-i-really-mean-it") {
349 err
<< "are you SURE? this will mean the monitor store will be erased "
350 "the next time the monitor is restarted. pass "
351 "'--yes-i-really-mean-it' if you really do.";
356 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
357 command
.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
358 if (!_add_bootstrap_peer_hint(command
, cmdmap
, out
))
360 } else if (command
== "quorum enter") {
361 elector
.start_participating();
363 out
<< "started responding to quorum, initiated new election";
364 } else if (command
== "quorum exit") {
366 elector
.stop_participating();
367 out
<< "stopped responding to quorum, initiated new election";
368 } else if (command
== "ops") {
369 (void)op_tracker
.dump_ops_in_flight(f
);
370 } else if (command
== "sessions") {
371 f
->open_array_section("sessions");
372 for (auto p
: session_map
.sessions
) {
373 f
->dump_object("session", *p
);
376 } else if (command
== "dump_historic_ops") {
377 if (!op_tracker
.dump_historic_ops(f
)) {
378 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
379 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
381 } else if (command
== "dump_historic_ops_by_duration" ) {
382 if (op_tracker
.dump_historic_ops(f
, true)) {
383 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
384 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
386 } else if (command
== "dump_historic_slow_ops") {
387 if (op_tracker
.dump_historic_slow_ops(f
, {})) {
388 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
389 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
391 } else if (command
== "quorum") {
393 cmd_getval(cmdmap
, "quorumcmd", quorumcmd
);
394 if (quorumcmd
== "exit") {
396 elector
.stop_participating();
397 out
<< "stopped responding to quorum, initiated new election" << std::endl
;
398 } else if (quorumcmd
== "enter") {
399 elector
.start_participating();
401 out
<< "started responding to quorum, initiated new election" << std::endl
;
403 err
<< "needs a valid 'quorum' command" << std::endl
;
405 } else if (command
== "connection scores dump") {
406 if (!get_quorum_mon_features().contains_all(
407 ceph::features::mon::FEATURE_PINGING
)) {
408 err
<< "Not all monitors support changing election strategies; \
409 please upgrade them first!";
411 elector
.dump_connection_scores(f
);
412 } else if (command
== "connection scores reset") {
413 if (!get_quorum_mon_features().contains_all(
414 ceph::features::mon::FEATURE_PINGING
)) {
415 err
<< "Not all monitors support changing election strategies; \
416 please upgrade them first!";
418 elector
.notify_clear_peer_state();
419 } else if (command
== "smart") {
421 cmd_getval(cmdmap
, "devid", want_devid
);
423 string devname
= store
->get_devname();
424 if (devname
.empty()) {
425 err
<< "could not determine device name for " << store
->get_path();
429 set
<string
> devnames
;
430 get_raw_devices(devname
, &devnames
);
431 json_spirit::mObject json_map
;
432 uint64_t smart_timeout
= cct
->_conf
.get_val
<uint64_t>(
433 "mon_smart_report_timeout");
434 for (auto& devname
: devnames
) {
436 string devid
= get_device_id(devname
, &err
);
437 if (want_devid
.size() && want_devid
!= devid
) {
438 derr
<< "get_device_id failed on " << devname
<< ": " << err
<< dendl
;
441 json_spirit::mValue smart_json
;
442 if (block_device_get_metrics(devname
, smart_timeout
,
444 dout(10) << "block_device_get_metrics failed for /dev/" << devname
448 json_map
[devid
] = smart_json
;
450 json_spirit::write(json_map
, out
, json_spirit::pretty_print
);
451 } else if (command
== "heap") {
452 if (!ceph_using_tcmalloc()) {
453 err
<< "could not issue heap profiler command -- not using tcmalloc!";
458 if (!cmd_getval(cmdmap
, "heapcmd", cmd
)) {
459 err
<< "unable to get value for command \"" << cmd
<< "\"";
463 std::vector
<std::string
> cmd_vec
;
464 get_str_vec(cmd
, cmd_vec
);
466 if (cmd_getval(cmdmap
, "value", val
)) {
467 cmd_vec
.push_back(val
);
469 ceph_heap_profiler_handle_command(cmd_vec
, out
);
470 } else if (command
== "compact") {
471 dout(1) << "triggering manual compaction" << dendl
;
472 auto start
= ceph::coarse_mono_clock::now();
473 store
->compact_async();
474 auto end
= ceph::coarse_mono_clock::now();
475 auto duration
= ceph::to_seconds
<double>(end
- start
);
476 dout(1) << "finished manual compaction in "
477 << duration
<< " seconds" << dendl
;
478 out
<< "compacted " << g_conf().get_val
<std::string
>("mon_keyvaluedb")
479 << " in " << duration
<< " seconds";
481 ceph_abort_msg("bad AdminSocket command binding");
483 (read_only
? audit_clog
->debug() : audit_clog
->info())
484 << "from='admin socket' "
485 << "entity='admin socket' "
486 << "cmd=" << command
<< " "
487 << "args=" << args
<< ": finished";
491 (read_only
? audit_clog
->debug() : audit_clog
->info())
492 << "from='admin socket' "
493 << "entity='admin socket' "
494 << "cmd=" << command
<< " "
495 << "args=" << args
<< ": aborted";
// React to a signal delivered to the monitor process.
//
// The signal name is always logged through derr. For SIGHUP the generic
// sighup_handler() is run and the log monitor is asked to reopen its logs.
// The assertion further down only admits SIGINT and SIGTERM.
// NOTE(review): the branch structure around the assertion and whatever
// follows it are not visible in this excerpt -- confirm how SIGINT/SIGTERM
// are acted upon.
499 void Monitor::handle_signal(int signum
)
501 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
502 if (signum
== SIGHUP
) {
503 sighup_handler(signum
);
504 logmon()->reopen_logs();
506 ceph_assert(signum
== SIGINT
|| signum
== SIGTERM
);
511 CompatSet
Monitor::get_initial_supported_features()
513 CompatSet::FeatureSet ceph_mon_feature_compat
;
514 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
515 CompatSet::FeatureSet ceph_mon_feature_incompat
;
516 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
517 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
518 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
519 ceph_mon_feature_incompat
);
// Assemble the feature set supported by this monitor.
//
// Starts from get_initial_supported_features() and then adds further
// incompat features on top: erasure codes, OSDMap encoding, erasure code
// plugins v2/v3, and the KRAKEN through QUINCY flags.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `compat` is returned -- confirm.
522 CompatSet
Monitor::get_supported_features()
524 CompatSet compat
= get_initial_supported_features();
525 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
526 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
527 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
528 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
529 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
530 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
531 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC
);
532 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
);
533 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
);
534 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_PACIFIC
);
535 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_QUINCY
);
539 CompatSet
Monitor::get_legacy_features()
541 CompatSet::FeatureSet ceph_mon_feature_compat
;
542 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
543 CompatSet::FeatureSet ceph_mon_feature_incompat
;
544 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
545 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
546 ceph_mon_feature_incompat
);
// Check that the feature set recorded in `store` is usable by this binary.
//
// `required` is what this monitor supports (get_supported_features());
// the on-disk set is loaded with read_features_off_disk(). When
// required.writeable(ondisk) is false, the unsupported features are
// computed and reported through generic_derr.
// NOTE(review): the declaration of `ondisk` and the return statements are
// not visible in this excerpt -- confirm the returned error codes.
549 int Monitor::check_features(MonitorDBStore
*store
)
551 CompatSet required
= get_supported_features();
554 read_features_off_disk(store
, &ondisk
);
556 if (!required
.writeable(ondisk
)) {
557 CompatSet diff
= required
.unsupported(ondisk
);
558 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
// Load the persisted CompatSet from `store` into `*features`.
//
// The encoded set lives under (MONITOR_NAME, COMPAT_SET_LOC). If nothing
// is stored there (zero-length buffer), a warning is logged, `*features`
// is set to get_legacy_features(), and that set is encoded and written
// back to the store in a fresh transaction. Stored data is decoded into
// `*features`.
// NOTE(review): the line separating the "missing" path from the decode is
// not visible in this excerpt; presumably the decode is the else branch --
// confirm.
565 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
567 bufferlist featuresbl
;
568 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
569 if (featuresbl
.length() == 0) {
570 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
571 << "Assuming it is old-style and introducing one." << dendl
;
572 //we only want the baseline ~v.18 features assumed to be on disk.
573 //If new features are introduced this code needs to disappear or
575 *features
= get_legacy_features();
577 features
->encode(featuresbl
);
578 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
579 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
580 store
->apply_transaction(t
);
582 auto it
= featuresbl
.cbegin();
583 features
->decode(it
);
587 void Monitor::read_features()
589 read_features_off_disk(store
, &features
);
590 dout(10) << "features " << features
<< dendl
;
592 calc_quorum_requirements();
593 dout(10) << "required_features " << required_features
<< dendl
;
// Queue a write of the buffer `bl` under (MONITOR_NAME, COMPAT_SET_LOC) on
// transaction `t` -- the same key read_features_off_disk() loads from.
// NOTE(review): the lines that declare and fill `bl` are not visible in
// this excerpt; presumably it holds the encoded feature set -- confirm.
596 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
600 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
603 const char** Monitor::get_tracked_conf_keys() const
605 static const char* KEYS
[] = {
606 "crushtool", // helpful for testing
607 "mon_election_timeout",
609 "mon_lease_renew_interval_factor",
610 "mon_lease_ack_timeout_factor",
611 "mon_accept_timeout_factor",
615 "clog_to_syslog_facility",
616 "clog_to_syslog_level",
618 "clog_to_graylog_host",
619 "clog_to_graylog_port",
620 "mon_cluster_log_to_file",
623 // periodic health to clog
624 "mon_health_to_clog",
625 "mon_health_to_clog_interval",
626 "mon_health_to_clog_tick_interval",
628 "mon_scrub_interval",
629 "mon_allow_pool_delete",
630 // osdmap pruning - observed, not handled.
631 "mon_osdmap_full_prune_enabled",
632 "mon_osdmap_full_prune_min",
633 "mon_osdmap_full_prune_interval",
634 "mon_osdmap_full_prune_txsize",
635 // debug options - observed, not handled
636 "mon_debug_extra_checks",
637 "mon_debug_block_osdmap_trim",
643 void Monitor::handle_conf_change(const ConfigProxy
& conf
,
644 const std::set
<std::string
> &changed
)
648 dout(10) << __func__
<< " " << changed
<< dendl
;
650 if (changed
.count("clog_to_monitors") ||
651 changed
.count("clog_to_syslog") ||
652 changed
.count("clog_to_syslog_level") ||
653 changed
.count("clog_to_syslog_facility") ||
654 changed
.count("clog_to_graylog") ||
655 changed
.count("clog_to_graylog_host") ||
656 changed
.count("clog_to_graylog_port") ||
657 changed
.count("host") ||
658 changed
.count("fsid")) {
659 update_log_clients();
662 if (changed
.count("mon_health_to_clog") ||
663 changed
.count("mon_health_to_clog_interval") ||
664 changed
.count("mon_health_to_clog_tick_interval")) {
665 finisher
.queue(new C_MonContext
{this, [this, changed
](int) {
666 std::lock_guard l
{lock
};
667 health_to_clog_update_conf(changed
);
671 if (changed
.count("mon_scrub_interval")) {
672 auto scrub_interval
=
673 conf
.get_val
<std::chrono::seconds
>("mon_scrub_interval");
674 finisher
.queue(new C_MonContext
{this, [this, scrub_interval
](int) {
675 std::lock_guard l
{lock
};
676 scrub_update_interval(scrub_interval
);
681 void Monitor::update_log_clients()
683 clog
->parse_client_options(g_ceph_context
);
684 audit_clog
->parse_client_options(g_ceph_context
);
// Validate lease-related configuration factors and report bad values to
// the cluster log.
//
// Two checks are visible: mon_lease_renew_interval_factor must be below
// 1.0, and mon_lease_ack_timeout_factor must be above 1.0. Each violation
// is logged via clog->error().
// NOTE(review): the result variable and return statements are not visible
// in this excerpt -- confirm what is returned on failure.
687 int Monitor::sanitize_options()
691 // mon_lease must be greater than mon_lease_renewal; otherwise
692 // leases may expire before they are renewed.
693 if (g_conf()->mon_lease_renew_interval_factor
>= 1.0) {
694 clog
->error() << "mon_lease_renew_interval_factor ("
695 << g_conf()->mon_lease_renew_interval_factor
696 << ") must be less than 1.0";
700 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
701 // got time to renew the lease and get an ack for it. Having both options
702 // with the same value, for a given small value, could mean timing out if
703 // the monitors happened to be overloaded -- or even under normal load for
704 // a small enough value.
705 if (g_conf()->mon_lease_ack_timeout_factor
<= 1.0) {
706 clog
->error() << "mon_lease_ack_timeout_factor ("
707 << g_conf()->mon_lease_ack_timeout_factor
708 << ") must be greater than 1.0";
715 int Monitor::preinit()
717 std::unique_lock
l(lock
);
719 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
721 int r
= sanitize_options();
723 derr
<< "option sanitization failed!" << dendl
;
727 ceph_assert(!logger
);
729 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
730 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess",
731 PerfCountersBuilder::PRIO_USEFUL
);
732 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions",
733 "sadd", PerfCountersBuilder::PRIO_INTERESTING
);
734 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions",
735 "srm", PerfCountersBuilder::PRIO_INTERESTING
);
736 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions",
737 "strm", PerfCountersBuilder::PRIO_USEFUL
);
738 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in",
739 "ecnt", PerfCountersBuilder::PRIO_USEFUL
);
740 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started",
741 "estt", PerfCountersBuilder::PRIO_INTERESTING
);
742 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won",
743 "ewon", PerfCountersBuilder::PRIO_INTERESTING
);
744 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost",
745 "elst", PerfCountersBuilder::PRIO_INTERESTING
);
746 logger
= pcb
.create_perf_counters();
747 cct
->get_perfcounters_collection()->add(logger
);
750 ceph_assert(!cluster_logger
);
752 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
753 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
754 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
755 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
756 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
757 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
758 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
759 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster", NULL
, 0, unit_t(UNIT_BYTES
));
760 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space", NULL
, 0, unit_t(UNIT_BYTES
));
761 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space", NULL
, 0, unit_t(UNIT_BYTES
));
762 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
763 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
764 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
765 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
766 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
767 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
768 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
769 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
770 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
771 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects", NULL
, 0, unit_t(UNIT_BYTES
));
772 cluster_logger
= pcb
.create_perf_counters();
775 paxos
->init_logger();
777 // verify cluster_uuid
779 int r
= check_fsid();
790 // have we ever joined a quorum?
791 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
792 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
794 if (!has_ever_joined
) {
795 // impose initial quorum restrictions?
796 list
<string
> initial_members
;
797 get_str_list(g_conf()->mon_initial_members
, initial_members
);
799 if (!initial_members
.empty()) {
800 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
802 monmap
->set_initial_members(
803 g_ceph_context
, initial_members
, name
, messenger
->get_myaddrs(),
806 dout(10) << " monmap is " << *monmap
<< dendl
;
807 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
809 } else if (!monmap
->contains(name
)) {
810 derr
<< "not in monmap and have been in a quorum before; "
811 << "must have been removed" << dendl
;
812 if (g_conf()->mon_force_quorum_join
) {
813 dout(0) << "we should have died but "
814 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
816 derr
<< "commit suicide!" << dendl
;
822 // We have a potentially inconsistent store state on our hands. Get rid of it
824 bool clear_store
= false;
825 if (store
->exists("mon_sync", "in_sync")) {
826 dout(1) << __func__
<< " clean up potentially inconsistent store state"
831 if (store
->get("mon_sync", "force_sync") > 0) {
832 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
837 set
<string
> sync_prefixes
= get_sync_targets_names();
838 store
->clear(sync_prefixes
);
842 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
843 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
847 if (is_keyring_required()) {
848 // we need to bootstrap authentication keys so we can form an
850 if (authmon()->get_last_committed() == 0) {
851 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
853 int err
= store
->get("mkfs", "keyring", bl
);
854 if (err
== 0 && bl
.length() > 0) {
855 // Attempt to decode and extract keyring only if it is found.
857 auto p
= bl
.cbegin();
859 extract_save_mon_key(keyring
);
863 string keyring_loc
= g_conf()->mon_data
+ "/keyring";
865 r
= keyring
.load(cct
, keyring_loc
);
868 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
870 if (key_server
.get_auth(mon_name
, mon_key
)) {
871 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
872 keyring
.add(mon_name
, mon_key
);
874 keyring
.encode_plaintext(bl
);
875 write_default_keyring(bl
);
877 derr
<< "unable to load initial keyring " << g_conf()->keyring
<< dendl
;
883 admin_hook
= new AdminHook(this);
884 AdminSocket
* admin_socket
= cct
->get_admin_socket();
886 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
888 // register tell/asock commands
889 for (const auto& command
: local_mon_commands
) {
890 if (!command
.is_tell()) {
893 const auto prefix
= cmddesc_get_prefix(command
.cmdstring
);
894 if (prefix
== "injectargs" ||
895 prefix
== "version" ||
897 // not registered by me
900 r
= admin_socket
->register_command(command
.cmdstring
, admin_hook
,
906 // add ourselves as a conf observer
907 g_conf().add_observer(this);
909 messenger
->set_auth_client(this);
910 messenger
->set_auth_server(this);
911 mgr_messenger
->set_auth_client(this);
913 auth_registry
.refresh_config();
920 dout(2) << "init" << dendl
;
921 std::lock_guard
l(lock
);
932 messenger
->add_dispatcher_tail(this);
934 // kickstart pet mgrclient
936 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
937 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
938 mgrmon()->prime_mgr_client();
940 // generate list of filestore OSDs
941 osdmon()->get_filestore_osd_list();
943 state
= STATE_PROBING
;
945 // add features of myself into feature_map
946 session_map
.feature_map
.add_mon(con_self
->get_features());
// Paxos start-up step: logs entry, walks every entry of paxos_service,
// and finishes with refresh_from_paxos(NULL), i.e. without asking for a
// need_bootstrap result.
// NOTE(review): the statements before the loop and the loop body are not
// visible in this excerpt -- confirm what is done for each service.
950 void Monitor::init_paxos()
952 dout(10) << __func__
<< dendl
;
956 for (auto& svc
: paxos_service
) {
960 refresh_from_paxos(NULL
);
963 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
965 dout(10) << __func__
<< dendl
;
968 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
971 auto p
= bl
.cbegin();
972 decode(fingerprint
, p
);
974 catch (ceph::buffer::error
& e
) {
975 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
978 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
981 for (auto& svc
: paxos_service
) {
982 svc
->refresh(need_bootstrap
);
984 for (auto& svc
: paxos_service
) {
990 void Monitor::register_cluster_logger()
992 if (!cluster_logger_registered
) {
993 dout(10) << "register_cluster_logger" << dendl
;
994 cluster_logger_registered
= true;
995 cct
->get_perfcounters_collection()->add(cluster_logger
);
997 dout(10) << "register_cluster_logger - already registered" << dendl
;
1001 void Monitor::unregister_cluster_logger()
1003 if (cluster_logger_registered
) {
1004 dout(10) << "unregister_cluster_logger" << dendl
;
1005 cluster_logger_registered
= false;
1006 cct
->get_perfcounters_collection()->remove(cluster_logger
);
1008 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
1012 void Monitor::update_logger()
1014 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
1015 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
1018 void Monitor::shutdown()
1020 dout(1) << "shutdown" << dendl
;
1024 wait_for_paxos_write();
1027 std::lock_guard
l(auth_lock
);
1028 authmon()->_set_mon_num_rank(0, 0);
1031 state
= STATE_SHUTDOWN
;
1034 g_conf().remove_observer(this);
1038 cct
->get_admin_socket()->unregister_commands(admin_hook
);
1045 mgr_client
.shutdown();
1048 finisher
.wait_for_empty();
1054 for (auto& svc
: paxos_service
) {
1058 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
1059 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
1065 remove_all_sessions();
1067 log_client
.shutdown();
1069 // unlock before msgr shutdown...
1072 // shutdown messenger before removing logger from perfcounter collection,
1073 // otherwise _ms_dispatch() will try to update deleted logger
1074 messenger
->shutdown();
1075 mgr_messenger
->shutdown();
1078 cct
->get_perfcounters_collection()->remove(logger
);
1080 if (cluster_logger
) {
1081 if (cluster_logger_registered
)
1082 cct
->get_perfcounters_collection()->remove(cluster_logger
);
1083 delete cluster_logger
;
1084 cluster_logger
= NULL
;
// If paxos reports a write in progress (is_writing() or
// is_writing_previous()), log "flushing pending write" before and
// "flushed pending write" after dealing with it.
// NOTE(review): the statements between the two log lines are not visible
// in this excerpt -- confirm how the pending write is actually flushed.
1088 void Monitor::wait_for_paxos_write()
1090 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
1091 dout(10) << __func__
<< " flushing pending write" << dendl
;
1095 dout(10) << __func__
<< " flushed pending write" << dendl
;
// Re-exec this process with its original argument vector.  The argv is
// copied from orig_argv (NULL-terminated), the executable path is taken from
// PROCPREFIX "/proc/self/exe" when readlink() on it succeeds and otherwise
// falls back to orig_argv[0], all signals are unblocked, and execv() is
// called.  Reaching the code after execv() means the exec failed; that is
// logged together with the errno text.
1099 void Monitor::respawn()
1101 // --- WARNING TO FUTURE COPY/PASTERS ---
1102 // You must also add a call like
1104 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1106 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1107 // instead of "(exe)", so that killall (and log rotation) will work.
1109 dout(0) << __func__
<< dendl
;
// Build a NULL-terminated copy of the original argv, logging each entry.
1111 char *new_argv
[orig_argc
+1];
1112 dout(1) << " e: '" << orig_argv
[0] << "'" << dendl
;
1113 for (int i
=0; i
<orig_argc
; i
++) {
1114 new_argv
[i
] = (char *)orig_argv
[i
];
1115 dout(1) << " " << i
<< ": '" << orig_argv
[i
] << "'" << dendl
;
1117 new_argv
[orig_argc
] = NULL
;
1119 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1120 * This allows us to exec the same executable even if it has since been
1123 char exe_path
[PATH_MAX
] = "";
1125 if (readlink(PROCPREFIX
"/proc/self/exe", exe_path
, PATH_MAX
-1) != -1) {
1126 dout(1) << "respawning with exe " << exe_path
<< dendl
;
1127 strcpy(exe_path
, PROCPREFIX
"/proc/self/exe");
1132 /* Print CWD for the user's interest */
1134 char *cwd
= getcwd(buf
, sizeof(buf
));
1136 dout(1) << " cwd " << cwd
<< dendl
;
1138 /* Fall back to a best-effort: just running in our CWD */
// exe_path was zero-initialised and at most PATH_MAX-1 bytes are copied, so
// the result stays NUL-terminated.
1139 strncpy(exe_path
, orig_argv
[0], PATH_MAX
-1);
1142 dout(1) << " exe_path " << exe_path
<< dendl
;
1144 unblock_all_signals(NULL
);
1145 execv(exe_path
, new_argv
);
// Only reached if execv() returned, i.e. failed.
1147 dout(0) << "respawn execv " << orig_argv
[0]
1148 << " failed with " << cpp_strerror(errno
) << dendl
;
1150 // We have to assert out here, because suicide() returns, and callers
1151 // to respawn expect it never to return.
// (Re)enter the probing phase.  Visible steps: flush any pending paxos write,
// reset sync-requester state, unregister the cluster logger and cancel the
// probe timeout; revert to legacy ranks for an epoch-0 seed monmap; check
// that we can upgrade from the monmap's min_mon_release; recompute our rank
// from the monmap (handling removal from the map and changed addresses);
// switch to STATE_PROBING; optionally compact the store; win immediately if
// we are the only monitor; otherwise arm the probe timeout and build
// MMonProbe::OP_PROBE messages for monmap entries and extra_probe_peers.
// NOTE(review): several statements (e.g. the send inside the first probe
// loop) are not visible in this listing.
1155 void Monitor::bootstrap()
1157 dout(10) << "bootstrap" << dendl
;
1158 wait_for_paxos_write();
1160 sync_reset_requester();
1161 unregister_cluster_logger();
1162 cancel_probe_timeout();
1164 if (monmap
->get_epoch() == 0) {
1165 dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl
;
1166 monmap
->calc_legacy_ranks();
1168 dout(10) << "monmap " << *monmap
<< dendl
;
// Refuse to proceed if the monmap's min_mon_release is not upgradable from.
1170 auto from_release
= monmap
->min_mon_release
;
1172 if (!can_upgrade_from(from_release
, "min_mon_release", err
)) {
1173 derr
<< "current monmap has " << err
.str() << " stopping." << dendl
;
// Look up our rank by our own messenger addresses.
1178 int newrank
= monmap
->get_rank(messenger
->get_myaddrs());
// We had a rank before but are no longer in the monmap.
1179 if (newrank
< 0 && rank
>= 0) {
1180 // was i ever part of the quorum?
1181 if (has_ever_joined
) {
1182 dout(0) << " removed from monmap, suicide." << dendl
;
1185 elector
.notify_clear_peer_state();
// The monmap lists different addresses for our rank than the messenger has.
1188 monmap
->get_addrs(newrank
) != messenger
->get_myaddrs()) {
1189 dout(0) << " monmap addrs for rank " << newrank
<< " changed, i am "
1190 << messenger
->get_myaddrs()
1191 << ", monmap is " << monmap
->get_addrs(newrank
) << ", respawning"
1194 if (monmap
->get_epoch()) {
1195 // store this map in temp mon_sync location so that we use it on
1197 derr
<< " stashing newest monmap " << monmap
->get_epoch()
1198 << " for next startup" << dendl
;
1200 monmap
->encode(bl
, -1);
1201 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1202 t
->put("mon_sync", "temp_newer_monmap", bl
);
1203 store
->apply_transaction(t
);
1208 if (newrank
!= rank
) {
1209 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
1210 messenger
->set_myname(entity_name_t::MON(newrank
));
1212 elector
.notify_rank_changed(rank
);
1214 // reset all connections, or else our peers will think we are someone else.
1215 messenger
->mark_down_all();
1219 state
= STATE_PROBING
;
1224 if (g_conf()->mon_compact_on_bootstrap
) {
1225 dout(10) << "bootstrap -- triggering compaction" << dendl
;
1227 dout(10) << "bootstrap -- finished compaction" << dendl
;
1230 // stretch mode bits
1231 set_elector_disallowed_leaders(false);
1233 // singleton monitor?
1234 if (monmap
->size() == 1 && rank
== 0) {
1235 win_standalone_election();
1239 reset_probe_timeout();
1241 // i'm outside the quorum
1242 if (monmap
->contains(name
))
1243 outside_quorum
.insert(name
);
1246 dout(10) << "probing other monitors" << dendl
;
1247 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1250 new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
,
// Also probe hinted peers, skipping our own address vector.
1254 for (auto& av
: extra_probe_peers
) {
1255 if (av
!= messenger
->get_myaddrs()) {
1256 messenger
->send_to_mon(
1257 new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
,
// Handle an "add bootstrap peer hint" command: parse either an "addr" or an
// "addrv" argument from cmdmap into an address vector and add it to
// extra_probe_peers.  Human-readable status/error text is written to `ss`.
// The hint is ignored (with a message) if this monitor is already a leader
// or peon.
// NOTE(review): the return statements and the remaining parameters are not
// visible in this listing.
1264 bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd
,
1265 const cmdmap_t
& cmdmap
,
1268 if (is_leader() || is_peon()) {
1269 ss
<< "mon already active; ignoring bootstrap hint";
1273 entity_addrvec_t addrs
;
// Single-address form.
1275 if (cmd_getval(cmdmap
, "addr", addrstr
)) {
1276 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' addr '"
1277 << addrstr
<< "'" << dendl
;
1280 if (!addr
.parse(addrstr
, entity_addr_t::TYPE_ANY
)) {
1281 ss
<< "failed to parse addrs '" << addrstr
1282 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1286 addrs
.v
.push_back(addr
);
// No port given: expand to two entries, msgr2 on the IANA mon port and
// legacy on the legacy mon port.
1287 if (addr
.get_port() == 0) {
1288 addrs
.v
[0].set_type(entity_addr_t::TYPE_MSGR2
);
1289 addrs
.v
[0].set_port(CEPH_MON_PORT_IANA
);
1290 addrs
.v
.push_back(addr
);
1291 addrs
.v
[1].set_type(entity_addr_t::TYPE_LEGACY
);
1292 addrs
.v
[1].set_port(CEPH_MON_PORT_LEGACY
);
// Port given but no type: the legacy port selects TYPE_LEGACY; the
// TYPE_MSGR2 assignment below is presumably the else branch -- confirm.
1293 } else if (addr
.get_type() == entity_addr_t::TYPE_ANY
) {
1294 if (addr
.get_port() == CEPH_MON_PORT_LEGACY
) {
1295 addrs
.v
[0].set_type(entity_addr_t::TYPE_LEGACY
);
1297 addrs
.v
[0].set_type(entity_addr_t::TYPE_MSGR2
);
// Address-vector form.
1300 } else if (cmd_getval(cmdmap
, "addrv", addrstr
)) {
1301 dout(10) << "_add_bootstrap_peer_hintv '" << cmd
<< "' addrv '"
1302 << addrstr
<< "'" << dendl
;
1303 const char *end
= 0;
1304 if (!addrs
.parse(addrstr
.c_str(), &end
)) {
1305 ss
<< "failed to parse addrs '" << addrstr
1306 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1310 ss
<< "no addr or addrv provided";
1314 extra_probe_peers
.insert(addrs
);
1315 ss
<< "adding peer " << addrs
<< " to list: " << extra_probe_peers
;
1319 // called by bootstrap(), or on leader|peon -> electing
// Reset per-quorum state: clear authmon's mon-num/rank, cancel the probe
// timeout, clean up health events and health-check log times, cancel the
// scrub event, clear leader_since, stamp exited_quorum if we were in a
// quorum, and clear outside_quorum and quorum_feature_map.
// NOTE(review): the body of the final per-service loop is not visible here.
1320 void Monitor::_reset()
1322 dout(10) << __func__
<< dendl
;
1324 // disable authentication
1326 std::lock_guard
l(auth_lock
);
1327 authmon()->_set_mon_num_rank(0, 0);
1330 cancel_probe_timeout();
1332 health_events_cleanup();
1333 health_check_log_times
.clear();
1334 scrub_event_cancel();
1336 leader_since
= utime_t();
// Only record an exit time if there actually was a quorum.
1338 if (!quorum
.empty()) {
1339 exited_quorum
= ceph_clock_now();
1342 outside_quorum
.clear();
1343 quorum_feature_map
.clear();
1349 for (auto& svc
: paxos_service
) {
1355 // -----------------------------------------------------------
// Collect the set of store prefixes involved in a sync: the paxos name plus
// whatever prefixes each paxos service adds via get_store_prefixes().
1358 set
<string
> Monitor::get_sync_targets_names()
1360 set
<string
> targets
;
1361 targets
.insert(paxos
->get_name());
1362 for (auto& svc
: paxos_service
) {
1363 svc
->get_store_prefixes(targets
);
// Sync timeout handler.  Logs and asserts that we are in
// STATE_SYNCHRONIZING.
// NOTE(review): any follow-up action is not visible in this listing.
1369 void Monitor::sync_timeout()
1371 dout(10) << __func__
<< dendl
;
1372 ceph_assert(state
== STATE_SYNCHRONIZING
);
// Encode into `bl` the newest monmap we know of, chosen by epoch among:
//   1. the map held by the MonmapMonitor,
//   2. the backup stored under ("mon_sync", "latest_monmap"), if present,
//   3. the in-memory `monmap`.
// The result is encoded with CEPH_FEATURES_ALL.  Store read errors visible
// here lead to ceph_abort_msg("error reading the store"); for the
// MonmapMonitor read, -ENOENT is excluded from that.
1376 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1378 dout(1) << __func__
<< dendl
;
1380 MonMap latest_monmap
;
1382 // Grab latest monmap from MonmapMonitor
1383 bufferlist monmon_bl
;
1384 int err
= monmon()->get_monmap(monmon_bl
);
1386 if (err
!= -ENOENT
) {
1388 << " something wrong happened while reading the store: "
1389 << cpp_strerror(err
) << dendl
;
1390 ceph_abort_msg("error reading the store");
1393 latest_monmap
.decode(monmon_bl
);
1396 // Grab last backed up monmap (if any) and compare epochs
1397 if (store
->exists("mon_sync", "latest_monmap")) {
1398 bufferlist backup_bl
;
1399 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1402 << " something wrong happened while reading the store: "
1403 << cpp_strerror(err
) << dendl
;
1404 ceph_abort_msg("error reading the store");
1406 ceph_assert(backup_bl
.length() > 0);
1408 MonMap backup_monmap
;
1409 backup_monmap
.decode(backup_bl
);
// Prefer the backup only if it is strictly newer.
1411 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1412 latest_monmap
= backup_monmap
;
1415 // Check if our current monmap's epoch is greater than the one we've
1417 if (monmap
->epoch
> latest_monmap
.epoch
)
1418 latest_monmap
= *monmap
;
1420 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1422 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
// Reset the requester (client) side of a sync: cancel the pending sync
// timeout event if any, forget the current sync provider, and reset
// sync_start_version to 0.
1425 void Monitor::sync_reset_requester()
1427 dout(10) << __func__
<< dendl
;
1429 if (sync_timeout_event
) {
1430 timer
.cancel_event(sync_timeout_event
);
1431 sync_timeout_event
= NULL
;
1434 sync_provider
= entity_addrvec_t();
1437 sync_start_version
= 0;
// Reset the provider side of sync by dropping every entry in sync_providers.
1440 void Monitor::sync_reset_provider()
1442 dout(10) << __func__
<< dendl
;
1443 sync_providers
.clear();
// Begin synchronizing our store from the monitor at `addrs`.
//   addrs - address vector of the monitor to sync from
//   full  - logged as " full" vs " recent"
// Must be called while probing or already synchronizing.  Visible steps:
// switch to STATE_SYNCHRONIZING, stop providing sync to others, persist the
// critical state plus the "in_sync" and "last_committed_floor" markers,
// clear the sync target prefixes from the store, remember the provider, arm
// the sync timeout, and send a GET_COOKIE request (full or recent, selected
// by sync_full) carrying our paxos version.
// The mon_sync_requester_kill_at asserts deliberately abort at a configured
// step.
// NOTE(review): the statement that sets sync_full is not visible in this
// listing; presumably it is assigned from `full` -- confirm.
1446 void Monitor::sync_start(entity_addrvec_t
&addrs
, bool full
)
1448 dout(10) << __func__
<< " " << addrs
<< (full
? " full" : " recent") << dendl
;
1450 ceph_assert(state
== STATE_PROBING
||
1451 state
== STATE_SYNCHRONIZING
);
1452 state
= STATE_SYNCHRONIZING
;
1454 // make sure we are not a provider for anyone!
1455 sync_reset_provider();
1460 // stash key state, and mark that we are syncing
1461 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1462 sync_stash_critical_state(t
);
1463 t
->put("mon_sync", "in_sync", 1);
// The floor never decreases: keep the larger of the stored floor and our
// current paxos version.
1465 sync_last_committed_floor
= std::max(sync_last_committed_floor
, paxos
->get_version());
1466 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1467 << sync_last_committed_floor
<< dendl
;
1468 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1470 store
->apply_transaction(t
);
1472 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 1);
1474 // clear the underlying store
1475 set
<string
> targets
= get_sync_targets_names();
1476 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1477 store
->clear(targets
);
1479 // make sure paxos knows it has been reset. this prevents a
1480 // bootstrap and then different probe reply order from possibly
1481 // deciding a partial or no sync is needed.
1484 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 2);
1487 // assume 'other' as the leader. We will update the leader once we receive
1488 // a reply to the sync start.
1489 sync_provider
= addrs
;
1491 sync_reset_timeout();
1493 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1495 m
->last_committed
= paxos
->get_version();
1496 messenger
->send_to_mon(m
, sync_provider
);
// Add the latest known monmap (as produced by sync_obtain_latest_monmap())
// to transaction `t` under ("mon_sync", "latest_monmap").  Asserts that the
// encoded monmap is non-empty.
1499 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1501 dout(10) << __func__
<< dendl
;
1502 bufferlist backup_monmap
;
1503 sync_obtain_latest_monmap(backup_monmap
);
1504 ceph_assert(backup_monmap
.length() > 0);
1505 t
->put("mon_sync", "latest_monmap", backup_monmap
);
// (Re)arm the sync timeout: cancel any pending sync_timeout_event and
// schedule a new one mon_sync_timeout from now.
// NOTE(review): the callback body is not visible in this listing.
1508 void Monitor::sync_reset_timeout()
1510 dout(10) << __func__
<< dendl
;
1511 if (sync_timeout_event
)
1512 timer
.cancel_event(sync_timeout_event
);
1513 sync_timeout_event
= timer
.add_event_after(
1514 g_conf()->mon_sync_timeout
,
1515 new C_MonContext
{this, [this](int) {
// Complete a sync as the requester.
//   last_committed - paxos version to record as our last_committed
// Visible steps: build a transaction from the paxos commits read starting at
// sync_start_version, write "last_committed" under the paxos prefix, apply
// it, then erase the "in_sync", "force_sync" and "last_committed_floor"
// markers from the "mon_sync" prefix.  The mon_sync_requester_kill_at
// asserts deliberately abort at a configured step.
// NOTE(review): the statements after the last two kill points are not
// visible in this listing.
1520 void Monitor::sync_finish(version_t last_committed
)
1522 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
1524 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 7);
1527 // finalize the paxos commits
1528 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1529 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1531 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
1533 dout(30) << __func__
<< " final tx dump:\n";
1534 JSONFormatter
f(true);
1539 store
->apply_transaction(tx
);
1542 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 8);
// Drop the sync-in-progress markers.
1544 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1545 t
->erase("mon_sync", "in_sync");
1546 t
->erase("mon_sync", "force_sync");
1547 t
->erase("mon_sync", "last_committed_floor");
1548 store
->apply_transaction(t
);
1550 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 9);
1554 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 10);
// Dispatch an incoming MMonSync message to the matching handler.  Provider
// ops: GET_COOKIE_FULL/RECENT and GET_CHUNK.  Client ops: COOKIE,
// CHUNK/LAST_CHUNK and NO_COOKIE.  An unrecognised op is logged and aborts
// with "unknown op".
1559 void Monitor::handle_sync(MonOpRequestRef op
)
1561 auto m
= op
->get_req
<MMonSync
>();
1562 dout(10) << __func__
<< " " << *m
<< dendl
;
1565 // provider ---------
1567 case MMonSync::OP_GET_COOKIE_FULL
:
1568 case MMonSync::OP_GET_COOKIE_RECENT
:
1569 handle_sync_get_cookie(op
);
1571 case MMonSync::OP_GET_CHUNK
:
1572 handle_sync_get_chunk(op
);
1575 // client -----------
1577 case MMonSync::OP_COOKIE
:
1578 handle_sync_cookie(op
);
1581 case MMonSync::OP_CHUNK
:
1582 case MMonSync::OP_LAST_CHUNK
:
1583 handle_sync_chunk(op
);
1585 case MMonSync::OP_NO_COOKIE
:
1586 handle_sync_no_cookie(op
);
1590 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1591 ceph_abort_msg("unknown op");
// Answer a sync request with OP_NO_COOKIE, echoing the request's cookie,
// on the connection the request arrived on.
1597 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1599 auto m
= op
->get_req
<MMonSync
>();
1600 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1601 m
->get_connection()->send_message(reply
);
// Provider side: handle a GET_COOKIE_FULL / GET_COOKIE_RECENT request.
// If we are synchronizing ourselves, reply NO_COOKIE.  Peers whose
// connection lacks some of our required_features are logged as ignored.
// Otherwise allocate a cookie, register a SyncProvider for it (with a
// timeout of 2 * mon_sync_timeout), set up either a full sync (store
// synchronizer over the sync target prefixes) or a paxos-only catch-up from
// the requester's last_committed, and reply with OP_COOKIE.
// NOTE(review): some SyncProvider field assignments (e.g. sp.cookie) are not
// visible in this listing.
1604 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1606 auto m
= op
->get_req
<MMonSync
>();
1607 if (is_synchronizing()) {
1608 _sync_reply_no_cookie(op
);
1612 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 1);
1614 // make sure they can understand us.
// Non-zero when a required feature bit is missing on the peer's connection.
1615 if ((required_features
^ m
->get_connection()->get_features()) &
1616 required_features
) {
1617 dout(5) << " ignoring peer mon." << m
->get_source().num()
1618 << " has features " << std::hex
1619 << m
->get_connection()->get_features()
1620 << " but we require " << required_features
<< std::dec
<< dendl
;
1624 // make up a unique cookie. include election epoch (which persists
1625 // across restarts for the whole cluster) and a counter for this
1626 // process instance. there is no need to be unique *across*
1627 // monitors, though.
1628 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1629 ceph_assert(sync_providers
.count(cookie
) == 0);
1631 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
// operator[] creates the provider entry for this cookie.
1633 SyncProvider
& sp
= sync_providers
[cookie
];
1635 sp
.addrs
= m
->get_source_addrs();
1636 sp
.reset_timeout(g_ceph_context
, g_conf()->mon_sync_timeout
* 2);
1638 set
<string
> sync_targets
;
1639 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1641 sync_targets
= get_sync_targets_names();
1642 sp
.last_committed
= paxos
->get_version();
1643 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1645 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1647 // just catch up paxos
1648 sp
.last_committed
= m
->last_committed
;
1650 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1652 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1653 reply
->last_committed
= sp
.last_committed
;
1654 m
->get_connection()->send_message(reply
);
// Provider side: handle a GET_CHUNK request.
// Replies NO_COOKIE if the cookie is unknown, or if the requester's
// last_committed has fallen behind our paxos first_committed (in which case
// the provider entry is also erased).  Otherwise refreshes the provider
// timeout and builds a chunk transaction: paxos versions after
// sp.last_committed are added while budget remains, then (for a full sync
// with budget left) store keys from the synchronizer.  The reply is OP_CHUNK
// if more data remains, otherwise OP_LAST_CHUNK and the provider entry is
// erased.  The encoded transaction is sent back in reply->chunk_bl.
// NOTE(review): the remaining loop conditions and the keys_left accounting
// are not visible in this listing.
1657 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1659 auto m
= op
->get_req
<MMonSync
>();
1660 dout(10) << __func__
<< " " << *m
<< dendl
;
1662 if (sync_providers
.count(m
->cookie
) == 0) {
1663 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1664 _sync_reply_no_cookie(op
);
1668 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 2);
1670 SyncProvider
& sp
= sync_providers
[m
->cookie
];
1671 sp
.reset_timeout(g_ceph_context
, g_conf()->mon_sync_timeout
* 2);
// The requester needs versions we no longer have.
1673 if (sp
.last_committed
< paxos
->get_first_committed() &&
1674 paxos
->get_first_committed() > 1) {
1675 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1676 << " < our fc " << paxos
->get_first_committed() << dendl
;
1677 sync_providers
.erase(m
->cookie
);
1678 _sync_reply_no_cookie(op
);
1682 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1683 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
// Per-chunk budget taken from configuration.
1685 int bytes_left
= g_conf()->mon_sync_max_payload_size
;
1686 int keys_left
= g_conf()->mon_sync_max_payload_keys
;
1687 while (sp
.last_committed
< paxos
->get_version() &&
1691 sp
.last_committed
++;
1693 int err
= store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1694 ceph_assert(err
== 0);
1696 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1697 bytes_left
-= bl
.length();
1699 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1702 reply
->last_committed
= sp
.last_committed
;
// Full sync: spend any remaining budget on store keys.
1704 if (sp
.full
&& bytes_left
> 0 && keys_left
> 0) {
1705 sp
.synchronizer
->get_chunk_tx(tx
, bytes_left
, keys_left
);
1706 sp
.last_key
= sp
.synchronizer
->get_last_key();
1707 reply
->last_key
= sp
.last_key
;
// More to send if the synchronizer has another chunk or paxos has newer
// versions than we have included so far.
1710 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1711 sp
.last_committed
< paxos
->get_version()) {
1712 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1713 << " key " << sp
.last_key
<< dendl
;
1715 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1716 << " key " << sp
.last_key
<< dendl
;
1717 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1719 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 3);
1721 // clean up our local state
1722 sync_providers
.erase(sp
.cookie
);
1725 encode(*tx
, reply
->chunk_bl
);
1727 m
->get_connection()->send_message(reply
);
// Requester side: handle the provider's OP_COOKIE reply.  Messages are
// discarded when we already hold a cookie or when the sender is not our
// current sync_provider.  Otherwise record the cookie and the starting paxos
// version, re-arm the sync timeout and request the first chunk.
// NOTE(review): the "already have a cookie" condition itself is not visible
// in this listing.
1732 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1734 auto m
= op
->get_req
<MMonSync
>();
1735 dout(10) << __func__
<< " " << *m
<< dendl
;
1737 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
1740 if (m
->get_source_addrs() != sync_provider
) {
1741 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1744 sync_cookie
= m
->cookie
;
1745 sync_start_version
= m
->last_committed
;
1747 sync_reset_timeout();
1748 sync_get_next_chunk();
1750 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 3);
// Requester side: ask the sync provider for the next chunk by sending
// OP_GET_CHUNK with our cookie.  If mon_inject_sync_get_chunk_delay is
// positive, sleep for that many seconds first.
1753 void Monitor::sync_get_next_chunk()
1755 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1756 if (g_conf()->mon_inject_sync_get_chunk_delay
> 0) {
1757 dout(20) << __func__
<< " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay
<< dendl
;
// The config value is in seconds; usleep() takes microseconds.
1758 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay
* 1000000.0));
1760 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1761 messenger
->send_to_mon(r
, sync_provider
);
1763 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 4);
// Requester side: handle an OP_CHUNK / OP_LAST_CHUNK message.  Chunks with a
// mismatched cookie or from a sender other than sync_provider are discarded.
// Otherwise the encoded transaction in m->chunk_bl is applied to the store.
// A second transaction then applies recent paxos transactions starting at
// paxos version + 1 and records m->last_committed, after which paxos is
// re-initialised.  Finally either the next chunk is requested (OP_CHUNK) or
// the sync is finished (OP_LAST_CHUNK).
// NOTE(review): the condition guarding the "applying recent paxos
// transactions" section is not visible in this listing.
1766 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1768 auto m
= op
->get_req
<MMonSync
>();
1769 dout(10) << __func__
<< " " << *m
<< dendl
;
1771 if (m
->cookie
!= sync_cookie
) {
1772 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1775 if (m
->get_source_addrs() != sync_provider
) {
1776 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1780 ceph_assert(state
== STATE_SYNCHRONIZING
);
1781 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 5);
// Apply the chunk's transaction as received.
1783 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1784 tx
->append_from_encoded(m
->chunk_bl
);
1786 dout(30) << __func__
<< " tx dump:\n";
1787 JSONFormatter
f(true);
1792 store
->apply_transaction(tx
);
1794 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 6);
1797 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
1798 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1799 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1801 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1803 dout(30) << __func__
<< " tx dump:\n";
1804 JSONFormatter
f(true);
1809 store
->apply_transaction(tx
);
1810 paxos
->init(); // to refresh what we just wrote
1813 if (m
->op
== MMonSync::OP_CHUNK
) {
1814 sync_reset_timeout();
1815 sync_get_next_chunk();
1816 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1817 sync_finish(m
->last_committed
);
// Requester side: handle an OP_NO_COOKIE reply from the provider.
// NOTE(review): only the log statement is visible in this listing; any
// follow-up action is not shown.
1821 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1823 dout(10) << __func__
<< dendl
;
// Expire sync provider entries whose timeout has passed, logging each
// cookie and requester address as it is removed.
1827 void Monitor::sync_trim_providers()
1829 dout(20) << __func__
<< dendl
;
1831 utime_t now
= ceph_clock_now();
1832 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1833 while (p
!= sync_providers
.end()) {
1834 if (now
> p
->second
.timeout
) {
1835 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
1836 << " for " << p
->second
.addrs
<< dendl
;
// Post-increment hands erase() the current iterator while p has already
// advanced, so p stays valid.
1837 sync_providers
.erase(p
++);
1844 // ---------------------------------------------------
// Cancel the pending probe timeout event, if one is scheduled, and clear
// the pointer; otherwise just log that none was scheduled.
1847 void Monitor::cancel_probe_timeout()
1849 if (probe_timeout_event
) {
1850 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1851 timer
.cancel_event(probe_timeout_event
);
1852 probe_timeout_event
= NULL
;
1854 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
// (Re)arm the probe timeout: cancel any existing event, create a new
// callback context and schedule it mon_probe_timeout seconds from now.  If
// the timer does not accept the event, the pointer is reset to nullptr.
// NOTE(review): the callback body is not visible in this listing.
1858 void Monitor::reset_probe_timeout()
1860 cancel_probe_timeout();
1861 probe_timeout_event
= new C_MonContext
{this, [this](int r
) {
1864 double t
= g_conf()->mon_probe_timeout
;
1865 if (timer
.add_event_after(t
, probe_timeout_event
)) {
1866 dout(10) << "reset_probe_timeout " << probe_timeout_event
1867 << " after " << t
<< " seconds" << dendl
;
1869 probe_timeout_event
= nullptr;
// Probe timeout callback.  Asserts that we are probing or synchronizing and
// that an event was pending, then clears the event pointer.
// NOTE(review): the action taken after clearing the event is not visible in
// this listing.
1873 void Monitor::probe_timeout(int r
)
1875 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1876 ceph_assert(is_probing() || is_synchronizing());
1877 ceph_assert(probe_timeout_event
);
1878 probe_timeout_event
= NULL
;
// Entry point for MMonProbe messages.  Probes carrying a different fsid
// than our monmap are logged and ignored.  OP_PROBE and OP_REPLY go to their
// handlers; OP_MISSING_FEATURES logs an error describing the required
// release and the missing feature bits.
// NOTE(review): what follows the OP_MISSING_FEATURES log line is not visible
// in this listing.
1882 void Monitor::handle_probe(MonOpRequestRef op
)
1884 auto m
= op
->get_req
<MMonProbe
>();
1885 dout(10) << "handle_probe " << *m
<< dendl
;
1887 if (m
->fsid
!= monmap
->fsid
) {
1888 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1893 case MMonProbe::OP_PROBE
:
1894 handle_probe_probe(op
);
1897 case MMonProbe::OP_REPLY
:
1898 handle_probe_reply(op
);
1901 case MMonProbe::OP_MISSING_FEATURES
:
1902 derr
<< __func__
<< " require release " << (int)m
->mon_release
<< " > "
1903 << (int)ceph_release()
1904 << ", or missing features (have " << CEPH_FEATURES_ALL
1905 << ", required " << m
->required_features
1906 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
) << ")"
// Handle an incoming OP_PROBE from another monitor.
// If the peer's release is known and older than the monmap's
// min_mon_release (or another condition not visible here holds), reply with
// OP_MISSING_FEATURES.  If we are neither probing nor synchronizing and the
// peer's first paxos version is ahead of ours, log that we are
// re-bootstrapping.  Otherwise reply with OP_REPLY carrying our monmap and
// paxos version range.  A peer not in our monmap is remembered in
// extra_probe_peers; otherwise the elector starts pinging its rank.
// NOTE(review): several statements (returns, the rest of the first
// condition, the declaration of `r` for the reply) are not visible in this
// listing.
1912 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1914 auto m
= op
->get_req
<MMonProbe
>();
1916 dout(10) << "handle_probe_probe " << m
->get_source_inst() << " " << *m
1917 << " features " << m
->get_connection()->get_features() << dendl
;
// Required feature bits the peer's connection does not have.
1918 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1919 if ((m
->mon_release
!= ceph_release_t::unknown
&&
1920 m
->mon_release
< monmap
->min_mon_release
) ||
1922 dout(1) << " peer " << m
->get_source_addr()
1923 << " release " << m
->mon_release
1924 << " < min_mon_release " << monmap
->min_mon_release
1925 << ", or missing features " << missing
<< dendl
;
1926 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1927 name
, has_ever_joined
, monmap
->min_mon_release
);
// NOTE(review): this sets required_features on the incoming message `m`,
// while the message sent on the next statement is the reply `r`.  It looks
// like the assignment may have been intended for `r` -- confirm.
1928 m
->required_features
= required_features
;
1929 m
->get_connection()->send_message(r
);
1933 if (!is_probing() && !is_synchronizing()) {
1934 // If the probing mon is way ahead of us, we need to re-bootstrap.
1935 // Normally we capture this case when we initially bootstrap, but
1936 // it is possible we pass those checks (we overlap with
1937 // quorum-to-be) but fail to join a quorum before it moves past
1938 // us. We need to be kicked back to bootstrap so we can
1939 // synchronize, not keep calling elections.
1940 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1941 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1942 << "ahead of us, re-bootstrapping" << dendl
;
1950 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
,
// The monmap is encoded with the peer connection's feature set.
1955 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1956 r
->paxos_first_version
= paxos
->get_first_committed();
1957 r
->paxos_last_version
= paxos
->get_version();
1958 m
->get_connection()->send_message(r
);
1960 // did we discover a peer here?
1961 if (!monmap
->contains(m
->get_source_addr())) {
1962 dout(1) << " adding peer " << m
->get_source_addrs()
1963 << " to list of hints" << dendl
;
1964 extra_probe_peers
.insert(m
->get_source_addrs());
1966 elector
.begin_peer_ping(monmap
->get_rank(m
->get_source_addr()));
// Handle an OP_REPLY to one of our probes.  Visible phases:
//   1. Discovery (only while probing or electing): adopt the peer's monmap
//      if it differs from ours and is newer/committed, rename "noname-"
//      placeholders in an epoch-0 monmap, and learn addresses for initial
//      monitors whose address is blank.
//   2. Sync decision (only while probing and not already synchronizing):
//      ignore peers below sync_last_committed_floor; start a full sync if
//      the peer's first version is beyond ours; start a recent sync if the
//      peer is more than paxos_max_join_drift versions ahead.
//   3. Upgrade sanity checks against the osdmap (luminous completion).
//   4. Joining: if the peer reports a quorum, either proceed as a member or
//      send an MMonJoin; otherwise track the peer in outside_quorum and,
//      once enough monitors are outside a quorum (min_quorum_size) including
//      us, log that an election is being called.
// NOTE(review): many statements (returns, bootstrap/election calls, some
// conditions) are not visible in this listing.
1973 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1975 auto m
= op
->get_req
<MMonProbe
>();
1976 dout(10) << "handle_probe_reply " << m
->get_source_inst()
1977 << " " << *m
<< dendl
;
1978 dout(10) << " monmap is " << *monmap
<< dendl
;
1980 // discover name and addrs during probing or electing states.
1981 if (!is_probing() && !is_electing()) {
1985 // newer map, or they've joined a quorum and we haven't?
1987 monmap
->encode(mybl
, m
->get_connection()->get_features());
1988 // make sure it's actually different; the checks below err toward
1989 // taking the other guy's map, which could cause us to loop.
1990 if (!mybl
.contents_equal(m
->monmap_bl
)) {
1991 MonMap
*newmap
= new MonMap
;
1992 newmap
->decode(m
->monmap_bl
);
1993 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
1994 !has_ever_joined
)) {
1995 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
1996 << ", mine was " << monmap
->get_epoch() << dendl
;
1998 monmap
->decode(m
->monmap_bl
);
1999 notify_new_monmap(false);
// Look the peer up by address; an epoch-0 map may still hold a "noname-"
// placeholder for it.
2008 string peer_name
= monmap
->get_name(m
->get_source_addr());
2009 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
2010 dout(10) << " renaming peer " << m
->get_source_addr() << " "
2011 << peer_name
<< " -> " << m
->name
<< " in my monmap"
2013 monmap
->rename(peer_name
, m
->name
);
2015 if (is_electing()) {
2019 } else if (peer_name
.size()) {
2020 dout(10) << " peer name is " << peer_name
<< dendl
;
2022 dout(10) << " peer " << m
->get_source_addr() << " not in map" << dendl
;
2025 // new initial peer?
2026 if (monmap
->get_epoch() == 0 &&
2027 monmap
->contains(m
->name
) &&
2028 monmap
->get_addrs(m
->name
).front().is_blank_ip()) {
2029 dout(1) << " learned initial mon " << m
->name
2030 << " addrs " << m
->get_source_addrs() << dendl
;
2031 monmap
->set_addrvec(m
->name
, m
->get_source_addrs());
2037 // end discover phase
2038 if (!is_probing()) {
2042 ceph_assert(paxos
!= NULL
);
2044 if (is_synchronizing()) {
2045 dout(10) << " currently syncing" << dendl
;
2049 entity_addrvec_t other
= m
->get_source_addrs();
// Peers older than our recorded sync floor are not usable.
2051 if (m
->paxos_last_version
< sync_last_committed_floor
) {
2052 dout(10) << " peer paxos versions [" << m
->paxos_first_version
2053 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
2054 << sync_last_committed_floor
<< ", ignoring"
// Peer's oldest version is newer than our latest: full sync.
2057 if (paxos
->get_version() < m
->paxos_first_version
&&
2058 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
2059 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
2060 << "," << m
->paxos_last_version
<< "]"
2061 << " vs my version " << paxos
->get_version()
2062 << " (too far ahead)"
2064 cancel_probe_timeout();
2065 sync_start(other
, true);
// Peer is ahead by more than paxos_max_join_drift: recent (non-full) sync.
2068 if (paxos
->get_version() + g_conf()->paxos_max_join_drift
< m
->paxos_last_version
) {
2069 dout(10) << " peer paxos last version " << m
->paxos_last_version
2070 << " vs my version " << paxos
->get_version()
2071 << " (too far ahead)"
2073 cancel_probe_timeout();
2074 sync_start(other
, false);
2079 // did the existing cluster complete upgrade to luminous?
2080 if (osdmon()->osdmap
.get_epoch()) {
2081 if (osdmon()->osdmap
.require_osd_release
< ceph_release_t::luminous
) {
2082 derr
<< __func__
<< " existing cluster has not completed upgrade to"
2083 << " luminous; 'ceph osd require_osd_release luminous' before"
2084 << " upgrading" << dendl
;
2087 if (!osdmon()->osdmap
.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS
) ||
2088 !osdmon()->osdmap
.test_flag(CEPH_OSDMAP_RECOVERY_DELETES
)) {
2089 derr
<< __func__
<< " existing cluster has not completed a full luminous"
2090 << " scrub to purge legacy snapdir objects; please scrub before"
2091 << " upgrading beyond luminous." << dendl
;
2096 // is there an existing quorum?
2097 if (m
->quorum
.size()) {
2098 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
2100 dout(10) << " peer paxos version " << m
->paxos_last_version
2101 << " vs my version " << paxos
->get_version()
// Determine whether we are already in the monmap and, if so, pick up the
// crush location recorded for us there.
2104 bool in_map
= false;
2105 const auto my_info
= monmap
->mon_info
.find(name
);
2106 const map
<string
,string
> *map_crush_loc
{nullptr};
2107 if (my_info
!= monmap
->mon_info
.end()) {
2109 map_crush_loc
= &my_info
->second
.crush_loc
;
2112 !monmap
->get_addrs(name
).front().is_blank_ip() &&
2113 (!need_set_crush_loc
|| (*map_crush_loc
== crush_loc
))) {
2114 // i'm part of the cluster; just initiate a new election
2117 dout(10) << " ready to join, but i'm not in the monmap/"
2118 "my addr is blank/location is wrong, trying to join" << dendl
;
2119 send_mon_message(new MMonJoin(monmap
->fsid
, name
,
2120 messenger
->get_myaddrs(), crush_loc
,
2121 need_set_crush_loc
),
// No quorum reported by the peer: track who is outside a quorum.
2125 if (monmap
->contains(m
->name
)) {
2126 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
2127 outside_quorum
.insert(m
->name
);
2129 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
2133 unsigned need
= monmap
->min_quorum_size();
2134 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
2135 if (outside_quorum
.size() >= need
) {
2136 if (outside_quorum
.count(name
)) {
2137 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
2140 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
2143 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
// Enter STATE_ELECTING after flushing any pending paxos write, and bump the
// l_mon_num_elections counter.
// NOTE(review): the statements between these steps are not visible in this
// listing.
2148 void Monitor::join_election()
2150 dout(10) << __func__
<< dendl
;
2151 wait_for_paxos_write();
2153 state
= STATE_ELECTING
;
2155 logger
->inc(l_mon_num_elections
);
// Call a new election ourselves: flush any pending paxos write, enter
// STATE_ELECTING, bump the election counters (total and "called by us"),
// announce it in the cluster log and ask the elector to call the election.
// NOTE(review): the statements between these steps are not visible in this
// listing.
2158 void Monitor::start_election()
2160 dout(10) << "start_election" << dendl
;
2161 wait_for_paxos_write();
2163 state
= STATE_ELECTING
;
2165 logger
->inc(l_mon_num_elections
);
2166 logger
->inc(l_mon_election_call
);
2168 clog
->info() << "mon." << name
<< " calling monitor election";
2169 elector
.call_election();
// Declare ourselves the winner when we are the only monitor: have the
// elector declare a standalone victory, assert our rank is 0, collect our
// own metadata as rank 0, and call win_election() with the resulting epoch.
// NOTE(review): the construction of the quorum set `q` and some
// win_election() arguments are not visible in this listing.
2172 void Monitor::win_standalone_election()
2174 dout(1) << "win_standalone_election" << dendl
;
2176 // bump election epoch, in case the previous epoch included other
2177 // monitors; we need to be able to make the distinction.
2178 elector
.declare_standalone_victory();
2180 rank
= monmap
->get_rank(name
);
2181 ceph_assert(rank
== 0);
2185 map
<int,Metadata
> metadata
;
2186 collect_metadata(&metadata
[0]);
2188 win_election(elector
.get_epoch(), q
,
2190 ceph::features::mon::get_supported(),
// Return the time at which this monitor became leader.  Asserts that the
// monitor is currently in STATE_LEADER.
2195 const utime_t
& Monitor::get_leader_since() const
2197 ceph_assert(state
== STATE_LEADER
);
2198 return leader_since
;
// Return the elector's current election epoch.
2201 epoch_t
Monitor::get_epoch()
2203 return elector
.get_epoch();
// Notify every paxos service that the election has finished.  Must be
// called as leader or peon.  When we are the leader, the monmap service gets
// special handling because its election_finished() was already called.
// NOTE(review): the statement under the leader/monmon() check is not visible
// in this listing; presumably it skips that service -- confirm.
2206 void Monitor::_finish_svc_election()
2208 ceph_assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
2210 for (auto& svc
: paxos_service
) {
2211 // we already called election_finished() on monmon(); avoid calling twice
2212 if (state
== STATE_LEADER
&& svc
.get() == monmon())
2214 svc
->election_finished();
// Become leader after winning an election.
//   epoch           - election epoch that was won
//   active          - ranks of the monitors in the new quorum
//   features        - connection features of the quorum
//   mon_features    - monitor features of the quorum
//   min_mon_release - minimum monitor release of the quorum
//   metadata        - per-rank metadata collected from quorum members
// Must be called while electing.  Visible steps: record leader/quorum
// timestamps and quorum features, clear outside_quorum, announce leadership
// in the cluster log, install the leader command set, initialise paxos as
// leader, finish the election on the services (monmap service first), queue
// merged metadata into the pending paxos transaction, and start health
// ticking, health-to-clog and scrub events.
// NOTE(review): several statements (e.g. the quorum assignment, the metadata
// encoding into `bl`) are not visible in this listing.
2218 void Monitor::win_election(epoch_t epoch
, const set
<int>& active
, uint64_t features
,
2219 const mon_feature_t
& mon_features
,
2220 ceph_release_t min_mon_release
,
2221 const map
<int,Metadata
>& metadata
)
2223 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
2224 << " features " << features
2225 << " mon_features " << mon_features
2226 << " min_mon_release " << min_mon_release
2228 ceph_assert(is_electing());
2229 state
= STATE_LEADER
;
2230 leader_since
= ceph_clock_now();
2231 quorum_since
= mono_clock::now();
2234 quorum_con_features
= features
;
2235 quorum_mon_features
= mon_features
;
2236 quorum_min_mon_release
= min_mon_release
;
2237 pending_metadata
= metadata
;
2238 outside_quorum
.clear();
2240 clog
->info() << "mon." << name
<< " is new leader, mons " << get_quorum_names()
2241 << " in quorum (ranks " << quorum
<< ")";
2243 set_leader_commands(get_local_commands(mon_features
));
2245 paxos
->leader_init();
2246 // NOTE: tell monmap monitor first. This is important for the
2247 // bootstrap case to ensure that the very first paxos proposal
2248 // codifies the monmap. Otherwise any manner of chaos can ensue
2249 // when monitors are calling elections or participating in a paxos
2250 // round without agreeing on who the participants are.
2251 monmon()->election_finished();
2252 _finish_svc_election();
2254 logger
->inc(l_mon_election_win
);
2256 // inject new metadata in first transaction.
2258 // include previous metadata for missing mons (that aren't part of
2259 // the current quorum).
2260 map
<int,Metadata
> m
= metadata
;
2261 for (unsigned rank
= 0; rank
< monmap
->size(); ++rank
) {
2262 if (m
.count(rank
) == 0 &&
2263 mon_metadata
.count(rank
)) {
2264 m
[rank
] = mon_metadata
[rank
];
2268 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2269 // a new transaction immediately after the election finishes. We should
2270 // do that anyway for other reasons, though.
2271 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
2274 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
2278 if (monmap
->size() > 1 &&
2279 monmap
->get_epoch() > 0) {
2281 health_tick_start();
2283 // Freshen the health status before doing health_to_clog in case
2284 // our just-completed election changed the health
2285 healthmon()->wait_for_active_ctx(new LambdaContext([this](int r
){
2286 dout(20) << "healthmon now active" << dendl
;
2287 healthmon()->tick();
// If the tick triggered a proposal, defer health-to-clog until it finishes.
2288 if (healthmon()->is_proposing()) {
2289 dout(20) << __func__
<< " healthmon proposing, waiting" << dendl
;
2290 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext
{this,
2292 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
2293 do_health_to_clog_interval();
2297 do_health_to_clog_interval();
2301 scrub_event_start();
// Handles the outcome of an election this monitor did not win.
// Visible steps: reset leader_since, restamp quorum_since, clear
// outside_quorum, record the quorum's connection/monitor features and
// min_mon_release, log the result, finish the service election and bump the
// l_mon_election_lose counter.
// NOTE(review): `features`, `leader` and `quorum` are used below but their
// declaration/assignment is not visible in this listing (the embedded
// numbering skips 2306, 2309-2310, 2313-2314); `q` and `l` are presumably the
// quorum set and the leader rank -- confirm against the full file.
2305 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
2307 const mon_feature_t
& mon_features
,
2308 ceph_release_t min_mon_release
)
// a default-constructed utime_t is stored: we are not leading
2311 leader_since
= utime_t();
2312 quorum_since
= mono_clock::now();
2315 outside_quorum
.clear();
2316 quorum_con_features
= features
;
2317 quorum_mon_features
= mon_features
;
2318 quorum_min_mon_release
= min_mon_release
;
2319 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
2320 << " quorum is " << quorum
<< " features are " << quorum_con_features
2321 << " mon_features are " << quorum_mon_features
2322 << " min_mon_release " << min_mon_release
2326 _finish_svc_election();
2328 logger
->inc(l_mon_election_lose
);
// Returns a std::string derived from Compressor::compression_algorithms.
// NOTE(review): only the loop header is visible in this listing; how the
// names are joined and what `printed` controls must be confirmed against the
// full file.
2334 std::string
collect_compression_algorithms()
2337 bool printed
= false;
2338 for (auto [name
, key
] : Compressor::compression_algorithms
) {
// Fills *m with metadata describing this monitor:
//   - system information via collect_sys_info()
//   - "addrs": the messenger's own addresses, stringified
//   - "compression_algorithms": result of collect_compression_algorithms()
//   - device metadata for the raw devices behind the store's device name
// Errors reported by get_device_metadata() are only logged (debug level 1).
2351 void Monitor::collect_metadata(Metadata
*m
)
2353 collect_sys_info(m
, g_ceph_context
);
2354 (*m
)["addrs"] = stringify(messenger
->get_myaddrs());
2355 (*m
)["compression_algorithms"] = collect_compression_algorithms();
2357 // infer storage device
2358 string devname
= store
->get_devname();
2359 set
<string
> devnames
;
2360 get_raw_devices(devname
, &devnames
);
2361 map
<string
,string
> errs
;
2362 get_device_metadata(devnames
, m
, &errs
);
// errs maps a key (i.first) to an error description (i.second)
2363 for (auto& i
: errs
) {
2364 dout(1) << __func__
<< " " << i
.first
<< ": " << i
.second
<< dendl
;
// Post-election work. Visible steps:
//   - apply quorum and monmap features to the compatset
//   - clear exited_quorum and complete the contexts queued in waitfor_quorum
//     and maybe_wait_for_quorum
//   - resend routed requests and register the cluster logger
//   - under auth_lock, give authmon the monmap size and our rank
//   - if the name the monmap holds for our addresses differs from `name`, or
//     a crush location must be set and differs, send an MMonJoin
//   - do_stretch_mode_election_work()
2368 void Monitor::finish_election()
2370 apply_quorum_to_compatset_features();
2371 apply_monmap_to_compatset_features();
2373 exited_quorum
= utime_t();
2374 finish_contexts(g_ceph_context
, waitfor_quorum
);
2375 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
2376 resend_routed_requests();
2378 register_cluster_logger();
2380 // enable authentication
2382 std::lock_guard
l(auth_lock
);
2383 authmon()->_set_mon_num_rank(monmap
->size(), rank
);
2386 // am i named and located properly?
2387 string cur_name
= monmap
->get_name(messenger
->get_myaddrs());
2388 const auto my_infop
= monmap
->mon_info
.find(cur_name
);
// NOTE(review): my_infop is dereferenced on the next line without a visible
// comparison against mon_info.end() -- confirm our addresses are always
// present in the monmap at this point.
2389 const map
<string
,string
>& map_crush_loc
= my_infop
->second
.crush_loc
;
2391 if (cur_name
!= name
||
2392 (need_set_crush_loc
&& map_crush_loc
!= crush_loc
)) {
2393 dout(10) << " renaming/moving myself from " << cur_name
<< "/"
2394 << map_crush_loc
<<" -> " << name
<< "/" << crush_loc
<< dendl
;
2395 send_mon_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddrs(),
2396 crush_loc
, need_set_crush_loc
),
2400 do_stretch_mode_election_work();
// Adopts new_features as the monitor's compatset when it compares unequal to
// the current `features`: logs the features reported by
// features.unsupported(new_features), stores new_features in `features`,
// applies a store transaction and calls calc_quorum_requirements().
// NOTE(review): the line that fills transaction `t` (embedded number 2411) is
// not visible here -- presumably it writes the feature set; confirm.
2403 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2405 if (new_features
.compare(features
) != 0) {
2406 CompatSet diff
= features
.unsupported(new_features
);
2407 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2408 features
= new_features
;
2410 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2412 store
->apply_transaction(t
);
2414 calc_quorum_requirements();
// Starts from a copy of the current compatset and adds incompat features
// derived from the quorum, then hands the result to
// _apply_compatset_features().
// OSDMAP_ENC is added only when quorum_con_features carries
// CEPH_FEATURE_OSDMAP_ENC; the erasure-code entries appear to be added
// unconditionally (closing braces are not visible in this listing).
2418 void Monitor::apply_quorum_to_compatset_features()
2420 CompatSet
new_features(features
);
2421 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2422 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2423 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2425 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2426 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2427 dout(5) << __func__
<< dendl
;
2428 _apply_compatset_features(new_features
);
// Starts from a copy of the current compatset and, for each release feature
// (KRAKEN through QUINCY) that the monmap's required features contain, adds
// the matching CEPH_MON_FEATURE_INCOMPAT_* entry. For every such feature the
// code asserts that it is part of the persistent monitor feature set and that
// quorum_con_features has the corresponding SERVER_* feature. The result is
// handed to _apply_compatset_features().
2431 void Monitor::apply_monmap_to_compatset_features()
2433 CompatSet
new_features(features
);
2434 mon_feature_t monmap_features
= monmap
->get_required_features();
2436 /* persistent monmap features may go into the compatset.
2437 * optional monmap features may not - why?
2438 * because optional monmap features may be set/unset by the admin,
2439 * and possibly by other means that haven't yet been thought out,
2440 * so we can't make the monitor enforce them on start - because they
2442 * this, of course, does not invalidate setting a compatset feature
2443 * for an optional feature - as long as you make sure to clean it up
2444 * once you unset it.
2446 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2447 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2448 ceph::features::mon::FEATURE_KRAKEN
));
2449 // this feature should only ever be set if the quorum supports it.
2450 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2451 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2453 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
2454 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2455 ceph::features::mon::FEATURE_LUMINOUS
));
2456 // this feature should only ever be set if the quorum supports it.
2457 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
));
2458 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
2460 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_MIMIC
)) {
2461 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2462 ceph::features::mon::FEATURE_MIMIC
));
2463 // this feature should only ever be set if the quorum supports it.
2464 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_MIMIC
));
2465 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC
);
2467 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_NAUTILUS
)) {
2468 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2469 ceph::features::mon::FEATURE_NAUTILUS
));
2470 // this feature should only ever be set if the quorum supports it.
2471 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_NAUTILUS
));
2472 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
);
2474 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_OCTOPUS
)) {
2475 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2476 ceph::features::mon::FEATURE_OCTOPUS
));
2477 // this feature should only ever be set if the quorum supports it.
2478 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_OCTOPUS
));
2479 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
);
2481 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_PACIFIC
)) {
2482 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2483 ceph::features::mon::FEATURE_PACIFIC
));
2484 // this feature should only ever be set if the quorum supports it.
2485 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_PACIFIC
));
2486 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_PACIFIC
);
2488 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_QUINCY
)) {
2489 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2490 ceph::features::mon::FEATURE_QUINCY
));
2491 // this feature should only ever be set if the quorum supports it.
2492 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_QUINCY
));
2493 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_QUINCY
);
2496 dout(5) << __func__
<< dendl
;
2497 _apply_compatset_features(new_features
);
// Recomputes required_features from scratch.
// It is reset to 0, then feature masks are OR-ed in from two sources:
//   1. incompat entries present in the compatset `features`
//      (OSDMAP_ENC, KRAKEN ... QUINCY; NAUTILUS also adds CEPHX_V2)
//   2. release features required by the monmap (KRAKEN ... NAUTILUS)
// The final value is logged at debug level 10.
// NOTE(review): in the visible code the monmap-derived section stops at
// NAUTILUS while the compatset section goes up to QUINCY -- confirm whether
// that asymmetry is intentional.
2500 void Monitor::calc_quorum_requirements()
2502 required_features
= 0;
2505 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2506 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2508 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2509 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2511 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
)) {
2512 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2514 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC
)) {
2515 required_features
|= CEPH_FEATUREMASK_SERVER_MIMIC
;
2517 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
)) {
2518 required_features
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
|
2519 CEPH_FEATUREMASK_CEPHX_V2
;
2521 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
)) {
2522 required_features
|= CEPH_FEATUREMASK_SERVER_OCTOPUS
;
2524 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_PACIFIC
)) {
2525 required_features
|= CEPH_FEATUREMASK_SERVER_PACIFIC
;
2527 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_QUINCY
)) {
2528 required_features
|= CEPH_FEATUREMASK_SERVER_QUINCY
;
// second source: release features required by the monmap
2532 if (monmap
->get_required_features().contains_all(
2533 ceph::features::mon::FEATURE_KRAKEN
)) {
2534 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2536 if (monmap
->get_required_features().contains_all(
2537 ceph::features::mon::FEATURE_LUMINOUS
)) {
2538 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2540 if (monmap
->get_required_features().contains_all(
2541 ceph::features::mon::FEATURE_MIMIC
)) {
2542 required_features
|= CEPH_FEATUREMASK_SERVER_MIMIC
;
2544 if (monmap
->get_required_features().contains_all(
2545 ceph::features::mon::FEATURE_NAUTILUS
)) {
2546 required_features
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
|
2547 CEPH_FEATUREMASK_CEPHX_V2
;
2549 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
// Accumulates feature maps into *fm: first this monitor's
// session_map.feature_map, then quorum_feature_map entries for ranks taken
// from `quorum`.
// NOTE(review): the embedded numbering skips 2556 inside the loop, so a
// condition on `id` may be missing from this listing -- confirm before
// relying on "every quorum member is added".
2552 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2554 *fm
+= session_map
.feature_map
;
2555 for (auto id
: quorum
) {
2557 *fm
+= quorum_feature_map
[id
];
// Flags the store so that a sync is forced on the next monitor start.
// In one transaction it stashes critical sync state and writes
// mon_sync/force_sync = 1, applies it, and then reports the outcome through
// formatter f as a "sync_force" object with "ret" and "msg" fields.
2562 void Monitor::sync_force(Formatter
*f
)
2564 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2565 sync_stash_critical_state(tx
);
2566 tx
->put("mon_sync", "force_sync", 1);
2567 store
->apply_transaction(tx
);
2569 f
->open_object_section("sync_force");
2570 f
->dump_int("ret", 0);
2571 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2572 f
->close_section(); // sync_force
// Dumps the quorum status into formatter f: election epoch, quorum ranks,
// quorum names, leader name, feature sets and the monmap.
// NOTE(review): the visible code allocates a JSONFormatter and sets
// free_formatter; the guard around that, the flush into `ss` and the matching
// delete are not visible in this listing -- confirm ownership handling
// against the full file.
2575 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
2577 bool free_formatter
= false;
2580 // lousy/lazy hack: default to json if no formatter has been defined
2581 f
= new JSONFormatter();
2582 free_formatter
= true;
2584 f
->open_object_section("quorum_status");
2585 f
->dump_int("election_epoch", get_epoch());
2587 f
->open_array_section("quorum");
2588 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2589 f
->dump_int("mon", *p
);
2590 f
->close_section(); // quorum
2592 list
<string
> quorum_names
= get_quorum_names();
2593 f
->open_array_section("quorum_names");
2594 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2595 f
->dump_string("mon", *p
);
2596 f
->close_section(); // quorum_names
// an empty quorum yields an empty leader name
2598 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(leader
));
2600 if (!quorum
.empty()) {
2606 f
->open_object_section("features");
2607 f
->dump_stream("quorum_con") << quorum_con_features
;
2608 quorum_mon_features
.dump(f
, "quorum_mon");
2611 f
->open_object_section("monmap");
2613 f
->close_section(); // monmap
2615 f
->close_section(); // quorum_status
// Dumps this monitor's status into formatter f as a "mon_status" object:
// name, rank, state, election epoch, quorum ranks, required and quorum
// feature sets, monitors outside the quorum, extra probe peers, active sync
// providers, sync progress while synchronizing, debug kill-at settings, the
// monmap section, the session feature map and the stretch-mode flag.
// NOTE(review): several lines are missing from this listing (loop increments,
// the body of the `!quorum.empty()` branch, the monmap dump), so only the
// fields named above are confirmed.
2621 void Monitor::get_mon_status(Formatter
*f
)
2623 f
->open_object_section("mon_status");
2624 f
->dump_string("name", name
);
2625 f
->dump_int("rank", rank
);
2626 f
->dump_string("state", get_state_name());
2627 f
->dump_int("election_epoch", get_epoch());
2629 f
->open_array_section("quorum");
2630 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2631 f
->dump_int("mon", *p
);
2633 f
->close_section(); // quorum
2635 if (!quorum
.empty()) {
2641 f
->open_object_section("features");
2642 f
->dump_stream("required_con") << required_features
;
2643 mon_feature_t req_mon_features
= get_required_mon_features();
2644 req_mon_features
.dump(f
, "required_mon");
2645 f
->dump_stream("quorum_con") << quorum_con_features
;
2646 quorum_mon_features
.dump(f
, "quorum_mon");
2647 f
->close_section(); // features
2649 f
->open_array_section("outside_quorum");
2650 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2651 f
->dump_string("mon", *p
);
2652 f
->close_section(); // outside_quorum
2654 f
->open_array_section("extra_probe_peers");
2655 for (set
<entity_addrvec_t
>::iterator p
= extra_probe_peers
.begin();
2656 p
!= extra_probe_peers
.end();
2658 f
->dump_object("peer", *p
);
2660 f
->close_section(); // extra_probe_peers
2662 f
->open_array_section("sync_provider");
2663 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2664 p
!= sync_providers
.end();
2666 f
->dump_unsigned("cookie", p
->second
.cookie
);
2667 f
->dump_object("addrs", p
->second
.addrs
);
2668 f
->dump_stream("timeout") << p
->second
.timeout
;
2669 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2670 f
->dump_stream("last_key") << p
->second
.last_key
;
// sync progress is only reported while this monitor is synchronizing
2674 if (is_synchronizing()) {
2675 f
->open_object_section("sync");
2676 f
->dump_stream("sync_provider") << sync_provider
;
2677 f
->dump_unsigned("sync_cookie", sync_cookie
);
2678 f
->dump_unsigned("sync_start_version", sync_start_version
);
// the kill-at options are only dumped when set to a positive value
2682 if (g_conf()->mon_sync_provider_kill_at
> 0)
2683 f
->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at
);
2684 if (g_conf()->mon_sync_requester_kill_at
> 0)
2685 f
->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at
);
2687 f
->open_object_section("monmap");
2691 f
->dump_object("feature_map", session_map
.feature_map
);
2692 f
->dump_bool("stretch_mode", stretch_mode_engaged
);
2693 f
->close_section(); // mon_status
2697 // health status to clog
// Schedules the periodic health tick. Checks that mon_health_to_clog is
// enabled and mon_health_to_clog_tick_interval is positive, then registers a
// timer event after that interval; the visible part of the callback calls
// health_tick_start() again, which re-arms the timer.
// NOTE(review): the early return for the disabled case and the rest of the
// callback body are not visible in this listing.
2699 void Monitor::health_tick_start()
2701 if (!cct
->_conf
->mon_health_to_clog
||
2702 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2705 dout(15) << __func__
<< dendl
;
2708 health_tick_event
= timer
.add_event_after(
2709 cct
->_conf
->mon_health_to_clog_tick_interval
,
2710 new C_MonContext
{this, [this](int r
) {
2713 health_tick_start();
// Cancels the pending health tick event, if there is one, and clears
// health_tick_event.
2717 void Monitor::health_tick_stop()
2719 dout(15) << __func__
<< dendl
;
2721 if (health_tick_event
) {
2722 timer
.cancel_event(health_tick_event
);
2723 health_tick_event
= NULL
;
// Computes the next time at which the periodic health summary should be
// logged: the current time truncated to whole seconds, advanced to the next
// multiple of mon_health_to_clog_interval. When `now` is already on a
// multiple, the result is one full interval later (adjustment == interval).
// The modulo below requires a non-zero interval; health_interval_start()
// checks for <= 0 before calling this.
2727 ceph::real_clock::time_point
Monitor::health_interval_calc_next_update()
2729 auto now
= ceph::real_clock::now();
2731 auto secs
= std::chrono::duration_cast
<std::chrono::seconds
>(now
.time_since_epoch());
// seconds elapsed since the last multiple of the interval
2732 int remainder
= secs
.count() % cct
->_conf
->mon_health_to_clog_interval
;
2733 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2734 auto next
= secs
+ std::chrono::seconds(adjustment
);
2736 dout(20) << __func__
2737 << " now: " << now
<< ","
2738 << " next: " << next
<< ","
2739 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
2742 return ceph::real_clock::time_point
{next
};
// (Re)schedules the interval-based health logging event. Checks that
// mon_health_to_clog is enabled and the interval is positive, stops any
// existing event, and registers a new C_MonContext at the time returned by
// health_interval_calc_next_update(). The visible part of the callback runs
// do_health_to_clog_interval(). If the timer refuses the event,
// health_interval_event is reset to nullptr.
// NOTE(review): the body of the disabled-config branch is not visible here.
2745 void Monitor::health_interval_start()
2747 dout(15) << __func__
<< dendl
;
2749 if (!cct
->_conf
->mon_health_to_clog
||
2750 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2754 health_interval_stop();
2755 auto next
= health_interval_calc_next_update();
2756 health_interval_event
= new C_MonContext
{this, [this](int r
) {
2759 do_health_to_clog_interval();
2761 if (!timer
.add_event_at(next
, health_interval_event
)) {
2762 health_interval_event
= nullptr;
// Cancels the pending interval-based health logging event, if any, and
// clears health_interval_event.
2766 void Monitor::health_interval_stop()
2768 dout(15) << __func__
<< dendl
;
2769 if (health_interval_event
) {
2770 timer
.cancel_event(health_interval_event
);
2772 health_interval_event
= NULL
;
// Tears down health-to-clog state: stops the interval event and resets the
// cached health status.
// NOTE(review): the embedded numbering skips 2777, so one more call
// (presumably stopping the tick event) may be missing from this listing.
2775 void Monitor::health_events_cleanup()
2778 health_interval_stop();
2779 health_status_cache
.reset();
// Reacts to configuration changes affecting health-to-clog.
//   changed - names of the config options that changed
// Handled keys (as visible here):
//   "mon_health_to_clog"               - clean up when disabled, otherwise
//                                        start tick/interval events that are
//                                        not already scheduled
//   "mon_health_to_clog_interval"      - stop the interval event when <= 0,
//                                        otherwise (re)start it
//   "mon_health_to_clog_tick_interval" - (re)start the tick event; the
//                                        <= 0 branch body is not visible
// NOTE(review): the else/closing lines are missing from this listing, so the
// exact branch structure should be confirmed against the full file.
2782 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2784 dout(20) << __func__
<< dendl
;
2786 if (changed
.count("mon_health_to_clog")) {
2787 if (!cct
->_conf
->mon_health_to_clog
) {
2788 health_events_cleanup();
2791 if (!health_tick_event
) {
2792 health_tick_start();
2794 if (!health_interval_event
) {
2795 health_interval_start();
2800 if (changed
.count("mon_health_to_clog_interval")) {
2801 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2802 health_interval_stop();
2804 health_interval_start();
2808 if (changed
.count("mon_health_to_clog_tick_interval")) {
2809 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2812 health_tick_start();
// Interval callback: forces a health summary into the cluster log via
// do_health_to_clog(true) and re-arms the interval event.
// The config is re-checked first because it may have changed since the event
// was scheduled (the early return itself is not visible in this listing).
2817 void Monitor::do_health_to_clog_interval()
2819 // outputting to clog may have been disabled in the conf
2820 // since we were scheduled.
2821 if (!cct
->_conf
->mon_health_to_clog
||
2822 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2825 dout(10) << __func__
<< dendl
;
2827 // do we have a cached value for next_clog_update? if not,
2828 // do we know when the last update was?
2830 do_health_to_clog(true);
2831 health_interval_start();
// Writes the overall health summary to the cluster log and caches it.
//   force - logged as " (force)"; presumably bypasses the "unchanged since
//           last time" check, whose first line is not visible here -- confirm
// When mon_health_detail_to_clog is set, the summary changed and the level is
// not HEALTH_OK, the detailed health text is logged as well. Afterwards the
// summary and level are stored in health_status_cache.
// NOTE(review): declarations of `summary`/`details` and the early returns are
// missing from this listing.
2834 void Monitor::do_health_to_clog(bool force
)
2836 // outputting to clog may have been disabled in the conf
2837 // since we were scheduled.
2838 if (!cct
->_conf
->mon_health_to_clog
||
2839 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2842 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
2845 health_status_t level
= healthmon()->get_health_status(false, nullptr, &summary
);
2847 summary
== health_status_cache
.summary
&&
2848 level
== health_status_cache
.overall
)
2851 if (g_conf()->mon_health_detail_to_clog
&&
2852 summary
!= health_status_cache
.summary
&&
2853 level
!= HEALTH_OK
) {
2855 level
= healthmon()->get_health_status(true, nullptr, &details
);
2856 clog
->health(level
) << "Health detail: " << details
;
2858 clog
->health(level
) << "overall " << summary
;
2860 health_status_cache
.summary
= summary
;
2861 health_status_cache
.overall
= level
;
// Logs health check transitions to the cluster log.
//   updated  - the new set of health checks
//   previous - the set being replaced
//   t        - transaction; not used in the visible code (see FIXME below)
// Does nothing useful when mon_health_to_clog is off (the early return is not
// visible in this listing). Otherwise:
//   - a check present in `updated` but not in `previous` is logged as
//     "Health check failed"
//   - a check whose summary or severity changed is logged as
//     "Health check update", unless the severity is unchanged and it was
//     already logged within mon_health_log_update_period
//   - health_check_log_times is updated with the severity/summary/time
//   - a check present only in `previous` is logged as cleared (with special
//     wording for DEGRADED_OBJECTS and OSD_FLAGS) and dropped from
//     health_check_log_times
//   - when the last checks of this map are cleared, the other paxos services
//     are scanned and "Cluster is now healthy" may be logged
// NOTE(review): many lines (ostringstream declarations, `logged` updates,
// else branches, the final any_checks test) are missing from this listing.
2864 void Monitor::log_health(
2865 const health_check_map_t
& updated
,
2866 const health_check_map_t
& previous
,
2867 MonitorDBStore::TransactionRef t
)
2869 if (!g_conf()->mon_health_to_clog
) {
2873 const utime_t now
= ceph_clock_now();
2875 // FIXME: log atomically as part of @t instead of using clog.
2876 dout(10) << __func__
<< " updated " << updated
.checks
.size()
2877 << " previous " << previous
.checks
.size()
2879 const auto min_log_period
= g_conf().get_val
<int64_t>(
2880 "mon_health_log_update_period");
2881 for (auto& p
: updated
.checks
) {
2882 auto q
= previous
.checks
.find(p
.first
);
2883 bool logged
= false;
// not present before: this is a newly failed check
2884 if (q
== previous
.checks
.end()) {
2887 ss
<< "Health check failed: " << p
.second
.summary
<< " ("
2889 clog
->health(p
.second
.severity
) << ss
.str();
2893 if (p
.second
.summary
!= q
->second
.summary
||
2894 p
.second
.severity
!= q
->second
.severity
) {
2896 auto status_iter
= health_check_log_times
.find(p
.first
);
2897 if (status_iter
!= health_check_log_times
.end()) {
2898 if (p
.second
.severity
== q
->second
.severity
&&
2899 now
- status_iter
->second
.updated_at
< min_log_period
) {
2900 // We already logged this recently and the severity is unchanged,
2901 // so skip emitting an update of the summary string.
2902 // We'll get an update out of tick() later if the check
2903 // is still failing.
2908 // summary or severity changed (ignore detail changes at this level)
2910 ss
<< "Health check update: " << p
.second
.summary
<< " (" << p
.first
<< ")";
2911 clog
->health(p
.second
.severity
) << ss
.str();
2916 // Record the time at which we last logged, so that we can check this
2917 // when considering whether/when to print update messages.
2919 auto iter
= health_check_log_times
.find(p
.first
);
2920 if (iter
== health_check_log_times
.end()) {
2921 health_check_log_times
.emplace(p
.first
, HealthCheckLogStatus(
2922 p
.second
.severity
, p
.second
.summary
, now
));
2924 iter
->second
= HealthCheckLogStatus(
2925 p
.second
.severity
, p
.second
.summary
, now
);
// second pass: checks that existed before but are gone now
2929 for (auto& p
: previous
.checks
) {
2930 if (!updated
.checks
.count(p
.first
)) {
2933 if (p
.first
== "DEGRADED_OBJECTS") {
2934 clog
->info() << "All degraded objects recovered";
2935 } else if (p
.first
== "OSD_FLAGS") {
2936 clog
->info() << "OSD flags cleared";
2938 clog
->info() << "Health check cleared: " << p
.first
<< " (was: "
2939 << p
.second
.summary
<< ")";
2942 if (health_check_log_times
.count(p
.first
)) {
2943 health_check_log_times
.erase(p
.first
);
2948 if (previous
.checks
.size() && updated
.checks
.size() == 0) {
2949 // We might be going into a fully healthy state, check
2951 bool any_checks
= false;
2952 for (auto& svc
: paxos_service
) {
// the service that owns `previous` is identified by address comparison
2953 if (&(svc
->get_health_checks()) == &(previous
)) {
2954 // Ignore the ones we're clearing right now
2958 if (svc
->get_health_checks().checks
.size() > 0) {
2964 clog
->info() << "Cluster is now healthy";
// Collects fresh metadata for this monitor and, when the recorded
// "ceph_version_short" for our rank compares lower than the freshly
// collected one, pushes the new metadata to the mgr via
// mgr_client.update_daemon_metadata().
// NOTE(review): the comparison is std::string::compare on the first
// version_size characters, i.e. lexicographic, not a numeric version
// comparison -- confirm that is sufficient for the version strings in use.
// The declaration of `metadata` is not visible in this listing.
2969 void Monitor::update_pending_metadata()
2972 collect_metadata(&metadata
);
2973 size_t version_size
= mon_metadata
[rank
]["ceph_version_short"].size();
2974 const std::string current_version
= mon_metadata
[rank
]["ceph_version_short"];
2975 const std::string pending_version
= metadata
["ceph_version_short"];
2977 if (current_version
.compare(0, version_size
, pending_version
) < 0) {
2978 mgr_client
.update_daemon_metadata("mon", name
, metadata
);
// Produces the cluster status report.
//   ss      - receives the plain-text report
//   f       - formatter receiving the structured report
//   session - the requesting session; its allowed filesystem names are used
//             to filter the FSMap copy before it is summarized
// The structured form dumps fsid, health, election epoch, quorum ranks and
// names, monmap, osdmap, pgmap, fsmap, mgrmap, servicemap and progress
// events. The text form prints "cluster", "services", optional
// "task status", "data" and "progress" sections.
// NOTE(review): the branch that selects between the formatter path and the
// text path, plus several declarations (`health`, `maxlen`) and section
// closers, are missing from this listing; presumably the f-> calls and the
// ss << calls sit in separate branches -- confirm against the full file.
2982 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
,
2983 MonSession
*session
)
2986 f
->open_object_section("status");
2988 const auto&& fs_names
= session
->get_allowed_fs_names();
2991 f
->dump_stream("fsid") << monmap
->get_fsid();
2992 healthmon()->get_health_status(false, f
, nullptr);
2993 f
->dump_unsigned("election_epoch", get_epoch());
2995 f
->open_array_section("quorum");
2996 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2997 f
->dump_int("rank", *p
);
2999 f
->open_array_section("quorum_names");
3000 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
3001 f
->dump_string("id", monmap
->get_name(*p
));
3007 f
->open_object_section("monmap");
3008 monmap
->dump_summary(f
);
3010 f
->open_object_section("osdmap");
3011 osdmon()->osdmap
.print_summary(f
, cout
, string(12, ' '));
3013 f
->open_object_section("pgmap");
3014 mgrstatmon()->print_summary(f
, NULL
);
3016 f
->open_object_section("fsmap");
// work on a copy so the filter does not touch the MDS monitor's map
3018 FSMap fsmap_copy
= mdsmon()->get_fsmap();
3019 if (!fs_names
.empty()) {
3020 fsmap_copy
.filter(fs_names
);
3022 const FSMap
*fsmapp
= &fsmap_copy
;
3024 fsmapp
->print_summary(f
, NULL
);
3026 f
->open_object_section("mgrmap");
3027 mgrmon()->get_map().print_summary(f
, nullptr);
3030 f
->dump_object("servicemap", mgrstatmon()->get_service_map());
3032 f
->open_object_section("progress_events");
3033 for (auto& i
: mgrstatmon()->get_progress_events()) {
3034 f
->dump_object(i
.first
.c_str(), i
.second
);
// plain-text report starts here
3040 ss
<< " cluster:\n";
3041 ss
<< " id: " << monmap
->get_fsid() << "\n";
3044 healthmon()->get_health_status(false, nullptr, &health
,
3046 ss
<< " health: " << health
<< "\n";
3048 ss
<< "\n \n services:\n";
3051 auto& service_map
= mgrstatmon()->get_service_map();
// maxlen tracks the longest service name so the columns line up
3052 for (auto& p
: service_map
.services
) {
3053 maxlen
= std::max(maxlen
, p
.first
.size());
3055 string
spacing(maxlen
- 3, ' ');
3056 const auto quorum_names
= get_quorum_names();
3057 const auto mon_count
= monmap
->mon_info
.size();
3058 auto mnow
= ceph::mono_clock::now();
3059 ss
<< " mon: " << spacing
<< mon_count
<< " daemons, quorum "
3060 << quorum_names
<< " (age " << timespan_str(mnow
- quorum_since
) << ")";
// list the monitors whose rank is not in the quorum
3061 if (quorum_names
.size() != mon_count
) {
3062 std::list
<std::string
> out_of_q
;
3063 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
3064 if (quorum
.count(i
) == 0) {
3065 out_of_q
.push_back(monmap
->ranks
[i
]);
3068 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
3069 out_of_q
.end(), std::string(", "));
3072 if (mgrmon()->in_use()) {
3073 ss
<< " mgr: " << spacing
;
3074 mgrmon()->get_map().print_summary(nullptr, &ss
);
3078 FSMap fsmap_copy
= mdsmon()->get_fsmap();
3079 if (!fs_names
.empty()) {
3080 fsmap_copy
.filter(fs_names
);
3082 const FSMap
*fsmapp
= &fsmap_copy
;
3084 if (fsmapp
->filesystem_count() > 0 and mdsmon()->should_print_status()){
3085 ss
<< " mds: " << spacing
;
3086 fsmapp
->print_daemon_summary(ss
);
3090 ss
<< " osd: " << spacing
;
3091 osdmon()->osdmap
.print_summary(NULL
, ss
, string(maxlen
+ 6, ' '));
3093 for (auto& p
: service_map
.services
) {
3094 const std::string
&service
= p
.first
;
3095 // filter out normal ceph entity types
3096 if (ServiceMap::is_normal_ceph_entity(service
)) {
3099 ss
<< " " << p
.first
<< ": " << string(maxlen
- p
.first
.size(), ' ')
3100 << p
.second
.get_summary() << "\n";
// the task status section is only printed when some service has running tasks
3104 if (auto& service_map
= mgrstatmon()->get_service_map();
3105 std::any_of(service_map
.services
.begin(),
3106 service_map
.services
.end(),
3108 return service
.second
.has_running_tasks();
3110 ss
<< "\n \n task status:\n";
3111 for (auto& [name
, service
] : service_map
.services
) {
3112 ss
<< service
.get_task_summary(name
);
3116 ss
<< "\n \n data:\n";
3117 mdsmon()->print_fs_summary(ss
);
3118 mgrstatmon()->print_summary(NULL
, &ss
);
3120 auto& pem
= mgrstatmon()->get_progress_events();
3122 ss
<< "\n \n progress:\n";
// only events flagged add_to_ceph_s are shown
3123 for (auto& i
: pem
) {
3124 if (i
.second
.add_to_ceph_s
){
3125 ss
<< " " << i
.second
.message
<< "\n";
// Flattens a parsed command map into a string->string map.
//   cmdmap        - parsed command arguments
//   param_str_map - output; receives one entry per argument
// "prefix" is handled separately from the other keys (the guarded statement is
// not visible in this listing, presumably a `continue`). For "caps", when the
// value is a list with an even number of entries, each (name, value) pair is
// stored under the key "caps_<name>". Other arguments are stored under their
// own name, stringified with cmd_vartype_stringify().
// The second parameter is spelled `&param_str_map`; the listing had it
// mis-encoded as a pilcrow sequence, which did not match the name used in the
// body.
3133 void Monitor::_generate_command_map(cmdmap_t
& cmdmap
,
3134 map
<string
,string
> &param_str_map
)
3136 for (auto p
= cmdmap
.begin(); p
!= cmdmap
.end(); ++p
) {
3137 if (p
->first
== "prefix")
3139 if (p
->first
== "caps") {
3141 if (cmd_getval(cmdmap
, "caps", cv
) &&
3142 cv
.size() % 2 == 0) {
// cv alternates name, value, name, value, ...
3143 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
3144 string k
= string("caps_") + cv
[i
];
3145 param_str_map
[k
] = cv
[i
+ 1];
3150 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
// Searches cmds for a command whose cmdstring starts with cmd_prefix
// (the first cmd_prefix.size() characters are compared).
// NOTE(review): the return statements are not visible in this listing;
// presumably a pointer to the first match is returned and nullptr when
// nothing matches -- confirm against the full file.
3154 const MonCommand
*Monitor::_get_moncommand(
3155 const string
&cmd_prefix
,
3156 const vector
<MonCommand
>& cmds
)
3158 for (auto& c
: cmds
) {
3159 if (c
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
// Checks whether session s may run the given command.
//   s             - requesting session; its caps and peer address are used
//   module        - module the command belongs to
//   prefix        - command prefix
//   cmdmap        - parsed arguments (not used in the visible code)
//   param_str_map - stringified arguments passed to the caps check
//   this_cmd      - command descriptor supplying the required r/w/x perms
// The result of s->caps.is_capable() is logged at debug level 10.
// NOTE(review): the first arguments of is_capable() and the return statement
// are not visible in this listing; presumably `capable` is returned.
3166 bool Monitor::_allowed_command(MonSession
*s
, const string
&module
,
3167 const string
&prefix
, const cmdmap_t
& cmdmap
,
3168 const map
<string
,string
>& param_str_map
,
3169 const MonCommand
*this_cmd
) {
3171 bool cmd_r
= this_cmd
->requires_perm('r');
3172 bool cmd_w
= this_cmd
->requires_perm('w');
3173 bool cmd_x
= this_cmd
->requires_perm('x');
3175 bool capable
= s
->caps
.is_capable(
3178 module
, prefix
, param_str_map
,
3179 cmd_r
, cmd_w
, cmd_x
,
3180 s
->get_peer_socket_addr());
3182 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
// Dumps the given command descriptors through formatter f as a
// "command_descriptions" object, one entry per command. Each entry is named
// "cmd" followed by cmdnum zero-padded to three digits and is written by
// dump_cmddesc_to_json() from the command's cmdstring, helpstring, module,
// req_perms and flags.
// NOTE(review): the remaining parameters (f, features, the output buffer),
// the declaration/increment of cmdnum and the final flush are not visible in
// this listing.
3186 void Monitor::format_command_descriptions(const std::vector
<MonCommand
> &commands
,
3192 f
->open_object_section("command_descriptions");
3193 for (const auto &cmd
: commands
) {
3194 unsigned flags
= cmd
.flags
;
3195 ostringstream secname
;
3196 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
3197 dump_cmddesc_to_json(f
, features
, secname
.str(),
3198 cmd
.cmdstring
, cmd
.helpstring
, cmd
.module
,
3199 cmd
.req_perms
, flags
);
3202 f
->close_section(); // command_descriptions
// Returns true when either the cluster or the service auth requirement
// supports CEPHX or GSS.
3207 bool Monitor::is_keyring_required()
3209 return auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
) ||
3210 auth_service_required
.is_supported_auth(CEPH_AUTH_CEPHX
) ||
3211 auth_cluster_required
.is_supported_auth(CEPH_AUTH_GSS
) ||
3212 auth_service_required
.is_supported_auth(CEPH_AUTH_GSS
);
// Completion context for a command proxied to the mgr.
// Constructed with the monitor, the originating op and a byte count `s`.
// On finish it takes mon->lock, subtracts `size` from mon->mgr_proxy_bytes
// and replies to the op with the result code r plus `outs`/`outbl`.
// NOTE(review): the member declarations (mon, op, size, outs, outbl) are not
// visible in this listing.
3215 struct C_MgrProxyCommand
: public Context
{
3221 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
3222 : mon(mon
), op(op
), size(s
) { }
3223 void finish(int r
) {
3224 std::lock_guard
l(mon
->lock
);
3225 mon
->mgr_proxy_bytes
-= size
;
3226 mon
->reply_command(op
, r
, outs
, outbl
, 0);
// Handles an MCommand ("tell") request addressed to this monitor.
// Visible checks, each answered through reply_tell_command() on failure:
//   - fsid must match the monmap's fsid            (-EACCES, "wrong fsid")
//   - the command JSON must parse                  (-EINVAL, parser message)
//   - a "prefix" argument must be present          (-EINVAL, "no prefix")
//   - the command must not be obsolete, or deprecated while
//     mon_debug_deprecated_as_obsolete is set      (-ENOTSUP)
//   - the session's caps must allow the command    (-EACCES)
// Accepted commands are queued on the admin socket via queue_tell_command().
// NOTE(review): the session null check, the declarations of cmdmap/prefix and
// part of the is_capable() argument list are missing from this listing.
3230 void Monitor::handle_tell_command(MonOpRequestRef op
)
3232 ceph_assert(op
->is_type_command());
3233 MCommand
*m
= static_cast<MCommand
*>(op
->get_req());
3234 if (m
->fsid
!= monmap
->fsid
) {
3235 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
3236 return reply_tell_command(op
, -EACCES
, "wrong fsid");
3238 MonSession
*session
= op
->get_session();
3240 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
3244 if (stringstream ss
; !cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
3245 return reply_tell_command(op
, -EINVAL
, ss
.str());
3247 map
<string
,string
> param_str_map
;
3248 _generate_command_map(cmdmap
, param_str_map
);
3250 if (!cmd_getval(cmdmap
, "prefix", prefix
)) {
3251 return reply_tell_command(op
, -EINVAL
, "no prefix");
// look the command up among the locally supported ones
3253 if (auto cmd
= _get_moncommand(prefix
,
3254 get_local_commands(quorum_mon_features
));
3256 if (cmd
->is_obsolete() ||
3257 (cct
->_conf
->mon_debug_deprecated_as_obsolete
&&
3258 cmd
->is_deprecated())) {
3259 return reply_tell_command(op
, -ENOTSUP
,
3260 "command is obsolete; "
3261 "please check usage and/or man page");
3264 // see if command is allowed
3265 if (!session
->caps
.is_capable(
3267 session
->entity_name
,
3268 "mon", prefix
, param_str_map
,
3270 session
->get_peer_socket_addr())) {
3271 return reply_tell_command(op
, -EACCES
, "insufficient caps");
3274 cct
->get_admin_socket()->queue_tell_command(m
);
// Monitor::handle_command -- entry point for an MMonCommand request.
//
// NOTE(review): this listing is an extraction of the original file; the
// leading numbers are the original line numbers and the gaps between them
// are lines that were dropped (braces, returns, declarations).  The code
// tokens below are left untouched; only comments were added.
//
// Visible flow:
//  - assert the op is a command and fetch the MMonCommand;
//  - reject a foreign fsid (-EPERM) and an empty command (-EINVAL);
//  - parse the JSON command into cmdmap and pull out "prefix";
//  - answer "get_command_descriptions" directly from the mgr + leader tables;
//  - look the prefix up in the leader/mgr command tables and in the local
//    table, then forward to the leader or fail when the command is not
//    locally supported / not compatible and forwarding is not allowed;
//  - refuse obsolete commands (-ENOTSUP) and forwarded noforward commands;
//  - check the session's permissions via _allowed_command() and write an
//    audit log entry (skipped for "config set" / "config-key set");
//  - pass legacy tell-style commands to the admin socket, proxy mgr
//    commands to the mgr, hand service-owned commands to the matching
//    service monitor, and handle the remaining prefixes inline;
//  - send the final reply unless the source is another monitor.
3277 void Monitor::handle_command(MonOpRequestRef op
)
3279 ceph_assert(op
->is_type_command());
3280 auto m
= op
->get_req
<MMonCommand
>();
3281 if (m
->fsid
!= monmap
->fsid
) {
3282 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
3284 reply_command(op
, -EPERM
, "wrong fsid", 0);
3288 MonSession
*session
= op
->get_session();
3290 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
3294 if (m
->cmd
.empty()) {
3295 reply_command(op
, -EINVAL
, "no command specified", 0);
3300 vector
<string
> fullcmd
;
3302 stringstream ss
, ds
;
3306 rs
= "unrecognized command";
3308 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
3309 // ss has reason for failure
3312 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3313 reply_command(op
, r
, rs
, 0);
3317 // check return value. If no prefix parameter provided,
3318 // return value will be false, then return error info.
3319 if (!cmd_getval(cmdmap
, "prefix", prefix
)) {
3320 reply_command(op
, -EINVAL
, "command prefix not found", 0);
3324 // check prefix is empty
3325 if (prefix
.empty()) {
3326 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
3330 if (prefix
== "get_command_descriptions") {
3332 Formatter
*f
= Formatter::create("json");
3334 std::vector
<MonCommand
> commands
= static_cast<MgrMonitor
*>(
3335 paxos_service
[PAXOS_MGR
].get())->get_command_descs();
3337 for (auto& c
: leader_mon_commands
) {
3338 commands
.push_back(c
);
3341 auto features
= m
->get_connection()->get_features();
3342 format_command_descriptions(commands
, f
, features
, &rdata
);
3344 reply_command(op
, 0, "", rdata
, 0);
3348 dout(0) << "handle_command " << *m
<< dendl
;
3350 string format
= cmd_getval_or
<string
>(cmdmap
, "format", "plain");
3351 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
3353 get_str_vec(prefix
, fullcmd
);
3355 // make sure fullcmd is not empty.
3356 // invalid prefix will cause empty vector fullcmd.
3357 // such as, prefix=";,,;"
3358 if (fullcmd
.empty()) {
3359 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
3363 std::string_view module
= fullcmd
[0];
3365 // validate command is in leader map
3367 const MonCommand
*leader_cmd
;
3368 const auto& mgr_cmds
= mgrmon()->get_command_descs();
3369 const MonCommand
*mgr_cmd
= nullptr;
3370 if (!mgr_cmds
.empty()) {
3371 mgr_cmd
= _get_moncommand(prefix
, mgr_cmds
);
3373 leader_cmd
= _get_moncommand(prefix
, leader_mon_commands
);
3375 leader_cmd
= mgr_cmd
;
3377 reply_command(op
, -EINVAL
, "command not known", 0);
3381 // validate command is in our map & matches, or forward if it is allowed
3382 const MonCommand
*mon_cmd
= _get_moncommand(
3384 get_local_commands(quorum_mon_features
));
3390 if (leader_cmd
->is_noforward()) {
3391 reply_command(op
, -EINVAL
,
3392 "command not locally supported and not allowed to forward",
3396 dout(10) << "Command not locally supported, forwarding request "
3398 forward_request_leader(op
);
3400 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
3401 if (mon_cmd
->is_noforward()) {
3402 reply_command(op
, -EINVAL
,
3403 "command not compatible with leader and not allowed to forward",
3407 dout(10) << "Command not compatible with leader, forwarding request "
3409 forward_request_leader(op
);
3414 if (mon_cmd
->is_obsolete() ||
3415 (cct
->_conf
->mon_debug_deprecated_as_obsolete
3416 && mon_cmd
->is_deprecated())) {
3417 reply_command(op
, -ENOTSUP
,
3418 "command is obsolete; please check usage and/or man page",
3423 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
3424 dout(10) << "Got forward for noforward command " << m
<< dendl
;
3425 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
3429 /* what we perceive as being the service the command falls under */
3430 string
service(mon_cmd
->module
);
3432 dout(25) << __func__
<< " prefix='" << prefix
3433 << "' module='" << module
3434 << "' service='" << service
<< "'" << dendl
;
3437 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
3439 // validate user's permissions for requested command
3440 map
<string
,string
> param_str_map
;
3442 // Catch bad_cmd_get exception if _generate_command_map() throws it
3444 _generate_command_map(cmdmap
, param_str_map
);
3446 catch(bad_cmd_get
& e
) {
3447 reply_command(op
, -EINVAL
, e
.what(), 0);
3450 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
3451 param_str_map
, mon_cmd
)) {
3452 dout(1) << __func__
<< " access denied" << dendl
;
3453 if (prefix
!= "config set" && prefix
!= "config-key set")
3454 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3455 << "from='" << session
->name
<< " " << session
->addrs
<< "' "
3456 << "entity='" << session
->entity_name
<< "' "
3457 << "cmd=" << m
->cmd
<< ": access denied";
3458 reply_command(op
, -EACCES
, "access denied", 0);
3462 if (prefix
!= "config set" && prefix
!= "config-key set")
3463 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3464 << "from='" << session
->name
<< " " << session
->addrs
<< "' "
3465 << "entity='" << session
->entity_name
<< "' "
3466 << "cmd=" << m
->cmd
<< ": dispatch";
3468 // compat kludge for legacy clients trying to tell commands that are
3469 // new. see bottom of MonCommands.h. we need to handle both (1)
3470 // pre-octopus clients and (2) octopus clients with a mix of pre-octopus
3471 // and octopus mons.
3472 if ((!HAVE_FEATURE(m
->get_connection()->get_features(), SERVER_OCTOPUS
) ||
3473 monmap
->min_mon_release
< ceph_release_t::octopus
) &&
3474 (prefix
== "injectargs" ||
3475 prefix
== "smart" ||
3476 prefix
== "mon_status" ||
3477 prefix
== "heap")) {
3478 if (m
->get_connection()->get_messenger() == 0) {
3479 // Prior to octopus, monitors might forward these messages
3480 // around. that was broken at baseline, and if we try to process
3481 // this message now, it will assert out when we try to send a
3482 // message in reply from the asok/tell worker (see
3483 // AnonConnection). Just reply with an error.
3484 dout(5) << __func__
<< " failing forwarded command from a (presumably) "
3485 << "pre-octopus peer" << dendl
;
3488 "failing forwarded tell command in mixed-version mon cluster", 0);
3491 dout(5) << __func__
<< " passing command to tell/asok" << dendl
;
3492 cct
->get_admin_socket()->queue_tell_command(m
);
// mgr commands are proxied to the mgr; the bytes in flight are capped at
// mon_client_bytes * mon_mgr_proxy_client_bytes_ratio (-EAGAIN when over).
3496 if (mon_cmd
->is_mgr()) {
3497 const auto& hdr
= m
->get_header();
3498 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
3499 uint64_t max
= g_conf().get_val
<Option::size_t>("mon_client_bytes")
3500 * g_conf().get_val
<double>("mon_mgr_proxy_client_bytes_ratio");
3501 if (mgr_proxy_bytes
+ size
> max
) {
3502 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
3503 << " + " << size
<< " > max " << max
<< dendl
;
3504 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
3507 mgr_proxy_bytes
+= size
;
3508 dout(10) << __func__
<< " proxying mgr command (+" << size
3509 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
3510 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
3511 mgr_client
.start_command(m
->cmd
,
3515 new C_OnFinisher(fin
, &finisher
));
// commands owned by a service monitor are dispatched to it, selected by the
// first word of the prefix (module) or by specific prefixes.
3519 if ((module
== "mds" || module
== "fs") &&
3520 prefix
!= "fs authorize") {
3521 mdsmon()->dispatch(op
);
3524 if ((module
== "osd" ||
3525 prefix
== "pg map" ||
3526 prefix
== "pg repeer") &&
3527 prefix
!= "osd last-stat-seq") {
3528 osdmon()->dispatch(op
);
3531 if (module
== "config") {
3532 configmon()->dispatch(op
);
3536 if (module
== "mon" &&
3537 /* Let the Monitor class handle the following commands:
3540 prefix
!= "mon scrub" &&
3541 prefix
!= "mon metadata" &&
3542 prefix
!= "mon versions" &&
3543 prefix
!= "mon count-metadata" &&
3544 prefix
!= "mon ok-to-stop" &&
3545 prefix
!= "mon ok-to-add-offline" &&
3546 prefix
!= "mon ok-to-rm") {
3547 monmon()->dispatch(op
);
3550 if (module
== "health" && prefix
!= "health") {
3551 healthmon()->dispatch(op
);
3554 if (module
== "auth" || prefix
== "fs authorize") {
3555 authmon()->dispatch(op
);
3558 if (module
== "log") {
3559 logmon()->dispatch(op
);
3563 if (module
== "config-key") {
3564 kvmon()->dispatch(op
);
3568 if (module
== "mgr") {
3569 mgrmon()->dispatch(op
);
// everything below is handled inline by the Monitor itself.
3573 if (prefix
== "fsid") {
3575 f
->open_object_section("fsid");
3576 f
->dump_stream("fsid") << monmap
->fsid
;
3583 reply_command(op
, 0, "", rdata
, 0);
3587 if (prefix
== "mon scrub") {
3588 wait_for_paxos_write();
3590 int r
= scrub_start();
3591 reply_command(op
, r
, "", rdata
, 0);
3592 } else if (is_peon()) {
3593 forward_request_leader(op
);
3595 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
3600 if (prefix
== "time-sync-status") {
3602 f
.reset(Formatter::create("json-pretty"));
3603 f
->open_object_section("time_sync");
3604 if (!timecheck_skews
.empty()) {
3605 f
->open_object_section("time_skew_status");
3606 for (auto& i
: timecheck_skews
) {
3607 double skew
= i
.second
;
3608 double latency
= timecheck_latencies
[i
.first
];
3609 string name
= monmap
->get_name(i
.first
);
3611 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
3612 f
->open_object_section(name
.c_str());
3613 f
->dump_float("skew", skew
);
3614 f
->dump_float("latency", latency
);
3615 f
->dump_stream("health") << tcstatus
;
3616 if (tcstatus
!= HEALTH_OK
) {
3617 f
->dump_stream("details") << tcss
.str();
3623 f
->open_object_section("timechecks");
3624 f
->dump_unsigned("epoch", get_epoch());
3625 f
->dump_int("round", timecheck_round
);
3626 f
->dump_stream("round_status") << ((timecheck_round
%2) ?
3627 "on-going" : "finished");
3633 } else if (prefix
== "status" ||
3634 prefix
== "health" ||
3637 cmd_getval(cmdmap
, "detail", detail
);
3639 if (prefix
== "status") {
3640 // get_cluster_status handles f == NULL
3641 get_cluster_status(ds
, f
.get(), session
);
3648 } else if (prefix
== "health") {
3650 healthmon()->get_health_status(detail
== "detail", f
.get(), f
? nullptr : &plain
);
3655 rdata
.append(plain
);
3657 } else if (prefix
== "df") {
3658 bool verbose
= (detail
== "detail");
3660 f
->open_object_section("stats");
3662 mgrstatmon()->dump_cluster_stats(&ds
, f
.get(), verbose
);
3666 mgrstatmon()->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3674 ceph_abort_msg("We should never get here!");
3680 } else if (prefix
== "report") {
3682 // this must be formatted, in its current form
3684 f
.reset(Formatter::create("json-pretty"));
3685 f
->open_object_section("report");
3686 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3687 f
->dump_string("version", ceph_version_to_str());
3688 f
->dump_string("commit", git_version_to_str());
3689 f
->dump_stream("timestamp") << ceph_clock_now();
3691 vector
<string
> tagsvec
;
3692 cmd_getval(cmdmap
, "tags", tagsvec
);
3693 string tagstr
= str_join(tagsvec
, " ");
3694 if (!tagstr
.empty())
3695 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3696 f
->dump_string("tag", tagstr
);
3698 healthmon()->get_health_status(true, f
.get(), nullptr);
3700 monmon()->dump_info(f
.get());
3701 osdmon()->dump_info(f
.get());
3702 mdsmon()->dump_info(f
.get());
3703 authmon()->dump_info(f
.get());
3704 mgrstatmon()->dump_info(f
.get());
3706 paxos
->dump_info(f
.get());
3712 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT_LEGACY
);
3715 } else if (prefix
== "osd last-stat-seq") {
3717 cmd_getval(cmdmap
, "id", osd
);
3718 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3720 f
->dump_unsigned("seq", seq
);
3728 } else if (prefix
== "node ls") {
3729 string
node_type("all");
3730 cmd_getval(cmdmap
, "type", node_type
);
3732 f
.reset(Formatter::create("json-pretty"));
3733 if (node_type
== "all") {
3734 f
->open_object_section("nodes");
3735 print_nodes(f
.get(), ds
);
3736 osdmon()->print_nodes(f
.get());
3737 mdsmon()->print_nodes(f
.get());
3738 mgrmon()->print_nodes(f
.get());
3740 } else if (node_type
== "mon") {
3741 print_nodes(f
.get(), ds
);
3742 } else if (node_type
== "osd") {
3743 osdmon()->print_nodes(f
.get());
3744 } else if (node_type
== "mds") {
3745 mdsmon()->print_nodes(f
.get());
3746 } else if (node_type
== "mgr") {
3747 mgrmon()->print_nodes(f
.get());
3753 } else if (prefix
== "features") {
3754 if (!is_leader() && !is_peon()) {
3755 dout(10) << " waiting for quorum" << dendl
;
3756 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3760 forward_request_leader(op
);
3764 f
.reset(Formatter::create("json-pretty"));
3766 get_combined_feature_map(&fm
);
3767 f
->dump_object("features", fm
);
3771 } else if (prefix
== "mon metadata") {
3773 f
.reset(Formatter::create("json-pretty"));
3776 bool all
= !cmd_getval(cmdmap
, "id", name
);
3778 // Dump a single mon's metadata
3779 int mon
= monmap
->get_rank(name
);
3781 rs
= "requested mon not found";
3785 f
->open_object_section("mon_metadata");
3786 r
= get_mon_metadata(mon
, f
.get(), ds
);
3789 // Dump all mons' metadata
3791 f
->open_array_section("mon_metadata");
3792 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3793 std::ostringstream get_err
;
3794 f
->open_object_section("mon");
3795 f
->dump_string("name", monmap
->get_name(rank
));
3796 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3798 if (r
== -ENOENT
|| r
== -EINVAL
) {
3799 dout(1) << get_err
.str() << dendl
;
3800 // Drop error, list what metadata we do have
3802 } else if (r
!= 0) {
3803 derr
<< "Unexpected error from get_mon_metadata: "
3804 << cpp_strerror(r
) << dendl
;
3805 ds
<< get_err
.str();
3815 } else if (prefix
== "mon versions") {
3817 f
.reset(Formatter::create("json-pretty"));
3818 count_metadata("ceph_version", f
.get());
3823 } else if (prefix
== "mon count-metadata") {
3825 f
.reset(Formatter::create("json-pretty"));
3827 cmd_getval(cmdmap
, "property", field
);
3828 count_metadata(field
, f
.get());
3833 } else if (prefix
== "quorum_status") {
3834 // make sure our map is readable and up to date
3835 if (!is_leader() && !is_peon()) {
3836 dout(10) << " waiting for quorum" << dendl
;
3837 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3840 _quorum_status(f
.get(), ds
);
3844 } else if (prefix
== "mon ok-to-stop") {
3846 if (!cmd_getval(cmdmap
, "ids", ids
)) {
3850 set
<string
> wouldbe
;
3851 for (auto rank
: quorum
) {
3852 wouldbe
.insert(monmap
->get_name(rank
));
3854 for (auto& n
: ids
) {
3855 if (monmap
->contains(n
)) {
3859 if (wouldbe
.size() < monmap
->min_quorum_size()) {
3861 rs
= "not enough monitors would be available (" + stringify(wouldbe
) +
3862 ") after stopping mons " + stringify(ids
);
3866 rs
= "quorum should be preserved (" + stringify(wouldbe
) +
3867 ") after stopping " + stringify(ids
);
3868 } else if (prefix
== "mon ok-to-add-offline") {
3869 if (quorum
.size() < monmap
->min_quorum_size(monmap
->size() + 1)) {
3870 rs
= "adding a monitor may break quorum (until that monitor starts)";
3874 rs
= "adding another mon that is not yet online will not break quorum";
3876 } else if (prefix
== "mon ok-to-rm") {
3878 if (!cmd_getval(cmdmap
, "id", id
)) {
3880 rs
= "must specify a monitor id";
3883 if (!monmap
->contains(id
)) {
3885 rs
= "mon." + id
+ " does not exist";
3888 int rank
= monmap
->get_rank(id
);
3889 if (quorum
.count(rank
) &&
3890 quorum
.size() - 1 < monmap
->min_quorum_size(monmap
->size() - 1)) {
3892 rs
= "removing mon." + id
+ " would break quorum";
3896 rs
= "safe to remove mon." + id
;
3897 } else if (prefix
== "version") {
3899 f
->open_object_section("version");
3900 f
->dump_string("version", pretty_version_to_str());
3904 ds
<< pretty_version_to_str();
3909 } else if (prefix
== "versions") {
3911 f
.reset(Formatter::create("json-pretty"));
3912 map
<string
,int> overall
;
3913 f
->open_object_section("version");
3914 map
<string
,int> mon
, mgr
, osd
, mds
;
3916 count_metadata("ceph_version", &mon
);
3917 f
->open_object_section("mon");
3918 for (auto& p
: mon
) {
3919 f
->dump_int(p
.first
.c_str(), p
.second
);
3920 overall
[p
.first
] += p
.second
;
3924 mgrmon()->count_metadata("ceph_version", &mgr
);
3925 f
->open_object_section("mgr");
3926 for (auto& p
: mgr
) {
3927 f
->dump_int(p
.first
.c_str(), p
.second
);
3928 overall
[p
.first
] += p
.second
;
3932 osdmon()->count_metadata("ceph_version", &osd
);
3933 f
->open_object_section("osd");
3934 for (auto& p
: osd
) {
3935 f
->dump_int(p
.first
.c_str(), p
.second
);
3936 overall
[p
.first
] += p
.second
;
3940 mdsmon()->count_metadata("ceph_version", &mds
);
3941 f
->open_object_section("mds");
3942 for (auto& p
: mds
) {
3943 f
->dump_int(p
.first
.c_str(), p
.second
);
3944 overall
[p
.first
] += p
.second
;
3948 for (auto& p
: mgrstatmon()->get_service_map().services
) {
3949 auto &service
= p
.first
;
3950 if (ServiceMap::is_normal_ceph_entity(service
)) {
3953 f
->open_object_section(service
.c_str());
3955 p
.second
.count_metadata("ceph_version", &m
);
3957 f
->dump_int(q
.first
.c_str(), q
.second
);
3958 overall
[q
.first
] += q
.second
;
3963 f
->open_object_section("overall");
3964 for (auto& p
: overall
) {
3965 f
->dump_int(p
.first
.c_str(), p
.second
);
// final reply for the inline-handled commands, carrying r, rs and rdata.
3975 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3976 reply_command(op
, r
, rs
, rdata
, 0);
3979 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
3982 reply_command(op
, rc
, rs
, rdata
, version
);
3985 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
3986 bufferlist
& rdata
, version_t version
)
3988 auto m
= op
->get_req
<MMonCommand
>();
3989 ceph_assert(m
->get_type() == MSG_MON_COMMAND
);
3990 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
3991 reply
->set_tid(m
->get_tid());
3992 reply
->set_data(rdata
);
3993 send_reply(op
, reply
);
3996 void Monitor::reply_tell_command(
3997 MonOpRequestRef op
, int rc
, const string
&rs
)
3999 MCommand
*m
= static_cast<MCommand
*>(op
->get_req());
4000 ceph_assert(m
->get_type() == MSG_COMMAND
);
4001 MCommandReply
*reply
= new MCommandReply(rc
, rs
);
4002 reply
->set_tid(m
->get_tid());
4003 m
->get_connection()->send_message(reply
);
// ------------------------
// request/reply routing
//
// a client/mds/osd will connect to a random monitor.  we need to forward any
// messages requiring state updates to the leader, and then route any replies
// back via the correct monitor and back to them.  (the monitor will not
// initiate any connections.)
4015 void Monitor::forward_request_leader(MonOpRequestRef op
)
4017 op
->mark_event(__func__
);
4019 int mon
= get_leader();
4020 MonSession
*session
= op
->get_session();
4021 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
4023 if (req
->get_source().is_mon() && req
->get_source_addrs() != messenger
->get_myaddrs()) {
4024 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
4025 } else if (session
->proxy_con
) {
4026 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
4027 } else if (!session
->closed
) {
4028 RoutedRequest
*rr
= new RoutedRequest
;
4029 rr
->tid
= ++routed_request_tid
;
4030 rr
->con
= req
->get_connection();
4031 rr
->con_features
= rr
->con
->get_features();
4032 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
4033 rr
->session
= static_cast<MonSession
*>(session
->get());
4035 routed_requests
[rr
->tid
] = rr
;
4036 session
->routed_request_tids
.insert(rr
->tid
);
4038 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
4039 << " features " << rr
->con_features
<< dendl
;
4041 MForward
*forward
= new MForward(rr
->tid
,
4045 forward
->set_priority(req
->get_priority());
4046 if (session
->auth_handler
) {
4047 forward
->entity_name
= session
->entity_name
;
4048 } else if (req
->get_source().is_mon()) {
4049 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
4051 send_mon_message(forward
, mon
);
4052 op
->mark_forwarded();
4053 ceph_assert(op
->get_req()->get_type() != 0);
4055 dout(10) << "forward_request no session for request " << *req
<< dendl
;
4059 // fake connection attached to forwarded messages
4060 struct AnonConnection
: public Connection
{
4061 entity_addr_t socket_addr
;
4063 int send_message(Message
*m
) override
{
4064 ceph_assert(!"send_message on anonymous connection");
4066 void send_keepalive() override
{
4067 ceph_assert(!"send_keepalive on anonymous connection");
4069 void mark_down() override
{
4072 void mark_disposable() override
{
4075 bool is_connected() override
{ return false; }
4076 entity_addr_t
get_peer_socket_addr() const override
{
4081 FRIEND_MAKE_REF(AnonConnection
);
4082 explicit AnonConnection(CephContext
*cct
, const entity_addr_t
& sa
)
4083 : Connection(cct
, nullptr),
4087 //extract the original message and put it into the regular dispatch function
4088 void Monitor::handle_forward(MonOpRequestRef op
)
4090 auto m
= op
->get_req
<MForward
>();
4091 dout(10) << "received forwarded message from "
4092 << ceph_entity_type_name(m
->client_type
)
4093 << " " << m
->client_addrs
4094 << " via " << m
->get_source_inst() << dendl
;
4095 MonSession
*session
= op
->get_session();
4096 ceph_assert(session
);
4098 if (!session
->is_capable("mon", MON_CAP_X
)) {
4099 dout(0) << "forward from entity with insufficient caps! "
4100 << session
->caps
<< dendl
;
4102 // see PaxosService::dispatch(); we rely on this being anon
4103 // (c->msgr == NULL)
4104 PaxosServiceMessage
*req
= m
->claim_message();
4105 ceph_assert(req
!= NULL
);
4107 auto c
= ceph::make_ref
<AnonConnection
>(cct
, m
->client_socket_addr
);
4108 MonSession
*s
= new MonSession(static_cast<Connection
*>(c
.get()));
4109 s
->_ident(req
->get_source(),
4110 req
->get_source_addrs());
4111 c
->set_priv(RefCountedPtr
{s
, false});
4112 c
->set_peer_addrs(m
->client_addrs
);
4113 c
->set_peer_type(m
->client_type
);
4114 c
->set_features(m
->con_features
);
4116 s
->authenticated
= true;
4117 s
->caps
= m
->client_caps
;
4118 dout(10) << " caps are " << s
->caps
<< dendl
;
4119 s
->entity_name
= m
->entity_name
;
4120 dout(10) << " entity name '" << s
->entity_name
<< "' type "
4121 << s
->entity_name
.get_type() << dendl
;
4122 s
->proxy_con
= m
->get_connection();
4123 s
->proxy_tid
= m
->tid
;
4125 req
->set_connection(c
);
4127 // not super accurate, but better than nothing.
4128 req
->set_recv_stamp(m
->get_recv_stamp());
4131 * note which election epoch this is; we will drop the message if
4132 * there is a future election since our peers will resend routed
4133 * requests in that case.
4135 req
->rx_election_epoch
= get_epoch();
4137 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
4140 // break the session <-> con ref loop by removing the con->session
4141 // reference, which is no longer needed once the MonOpRequest is
4147 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
4149 op
->mark_event(__func__
);
4151 MonSession
*session
= op
->get_session();
4152 ceph_assert(session
);
4153 Message
*req
= op
->get_req();
4154 ConnectionRef con
= op
->get_connection();
4156 reply
->set_cct(g_ceph_context
);
4157 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
4160 dout(2) << "send_reply no connection, dropping reply " << *reply
4161 << " to " << req
<< " " << *req
<< dendl
;
4163 op
->mark_event("reply: no connection");
4167 if (!session
->con
&& !session
->proxy_con
) {
4168 dout(2) << "send_reply no connection, dropping reply " << *reply
4169 << " to " << req
<< " " << *req
<< dendl
;
4171 op
->mark_event("reply: no connection");
4175 if (session
->proxy_con
) {
4176 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
4177 << " via " << session
->proxy_con
->get_peer_addr()
4178 << " for request " << *req
<< dendl
;
4179 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
4180 op
->mark_event("reply: send routed request");
4182 session
->con
->send_message(reply
);
4183 op
->mark_event("reply: send");
4187 void Monitor::no_reply(MonOpRequestRef op
)
4189 MonSession
*session
= op
->get_session();
4190 Message
*req
= op
->get_req();
4192 if (session
->proxy_con
) {
4193 dout(10) << "no_reply to " << req
->get_source_inst()
4194 << " via " << session
->proxy_con
->get_peer_addr()
4195 << " for request " << *req
<< dendl
;
4196 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
4197 op
->mark_event("no_reply: send routed request");
4199 dout(10) << "no_reply to " << req
->get_source_inst()
4200 << " " << *req
<< dendl
;
4201 op
->mark_event("no_reply");
4205 void Monitor::handle_route(MonOpRequestRef op
)
4207 auto m
= op
->get_req
<MRoute
>();
4208 MonSession
*session
= op
->get_session();
4210 if (!session
->is_capable("mon", MON_CAP_X
)) {
4211 dout(0) << "MRoute received from entity without appropriate perms! "
4216 dout(10) << "handle_route tid " << m
->session_mon_tid
<< " " << *m
->msg
4219 dout(10) << "handle_route tid " << m
->session_mon_tid
<< " null" << dendl
;
4222 if (!m
->session_mon_tid
) {
4223 dout(10) << " not a routed request, ignoring" << dendl
;
4226 auto found
= routed_requests
.find(m
->session_mon_tid
);
4227 if (found
== routed_requests
.end()) {
4228 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
4231 std::unique_ptr
<RoutedRequest
> rr
{found
->second
};
4232 // reset payload, in case encoding is dependent on target features
4234 m
->msg
->clear_payload();
4235 rr
->con
->send_message(m
->msg
);
4238 if (m
->send_osdmap_first
) {
4239 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
4240 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
4241 true, MonOpRequestRef());
4243 ceph_assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
4244 routed_requests
.erase(found
);
4245 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
4248 void Monitor::resend_routed_requests()
4250 dout(10) << "resend_routed_requests" << dendl
;
4251 int mon
= get_leader();
4252 list
<Context
*> retry
;
4253 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
4254 p
!= routed_requests
.end();
4256 RoutedRequest
*rr
= p
->second
;
4259 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
4260 rr
->op
->mark_event("retry routed request");
4261 retry
.push_back(new C_RetryMessage(this, rr
->op
));
4263 ceph_assert(rr
->session
->routed_request_tids
.count(p
->first
));
4264 rr
->session
->routed_request_tids
.erase(p
->first
);
4268 auto q
= rr
->request_bl
.cbegin();
4269 PaxosServiceMessage
*req
=
4270 (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
4271 rr
->op
->mark_event("resend forwarded message to leader");
4272 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
4274 MForward
*forward
= new MForward(rr
->tid
,
4278 req
->put(); // forward takes its own ref; drop ours.
4279 forward
->client_type
= rr
->con
->get_peer_type();
4280 forward
->client_addrs
= rr
->con
->get_peer_addrs();
4281 forward
->client_socket_addr
= rr
->con
->get_peer_socket_addr();
4282 forward
->set_priority(req
->get_priority());
4283 send_mon_message(forward
, mon
);
4287 routed_requests
.clear();
4288 finish_contexts(g_ceph_context
, retry
);
4292 void Monitor::remove_session(MonSession
*s
)
4294 dout(10) << "remove_session " << s
<< " " << s
->name
<< " " << s
->addrs
4295 << " features 0x" << std::hex
<< s
->con_features
<< std::dec
<< dendl
;
4296 ceph_assert(s
->con
);
4297 ceph_assert(!s
->closed
);
4298 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
4299 p
!= s
->routed_request_tids
.end();
4301 ceph_assert(routed_requests
.count(*p
));
4302 RoutedRequest
*rr
= routed_requests
[*p
];
4303 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
4305 routed_requests
.erase(*p
);
4307 s
->routed_request_tids
.clear();
4308 s
->con
->set_priv(nullptr);
4309 session_map
.remove_session(s
);
4310 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4311 logger
->inc(l_mon_session_rm
);
4314 void Monitor::remove_all_sessions()
4316 std::lock_guard
l(session_map_lock
);
4317 while (!session_map
.sessions
.empty()) {
4318 MonSession
*s
= session_map
.sessions
.front();
4320 logger
->inc(l_mon_session_rm
);
4323 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4326 void Monitor::send_mon_message(Message
*m
, int rank
)
4328 messenger
->send_to_mon(m
, monmap
->get_addrs(rank
));
4331 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
4334 * Wait list the new session until we're in the quorum, assuming it's
4336 * tick() will periodically send them back through so we can send
4337 * the client elsewhere if we don't think we're getting back in.
4339 * But we allow a few sorts of messages:
4340 * 1) Monitors can talk to us at any time, of course.
4341 * 2) auth messages. It's unlikely to go through much faster, but
4342 * it's possible we've just lost our quorum status and we want to take...
4343 * 3) command messages. We want to accept these under all possible
4346 Message
*m
= op
->get_req();
4347 MonSession
*s
= op
->get_session();
4348 ConnectionRef con
= op
->get_connection();
4349 utime_t too_old
= ceph_clock_now();
4350 too_old
-= g_ceph_context
->_conf
->mon_lease
;
4351 if (m
->get_recv_stamp() > too_old
&&
4352 con
->is_connected()) {
4353 dout(5) << "waitlisting message " << *m
<< dendl
;
4354 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
4355 op
->mark_wait_for_quorum();
4357 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
4359 // proxied sessions aren't registered and don't have a con; don't remove
4361 if (!s
->proxy_con
) {
4362 std::lock_guard
l(session_map_lock
);
4369 void Monitor::_ms_dispatch(Message
*m
)
4371 if (is_shutdown()) {
4376 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
4377 bool src_is_mon
= op
->is_src_mon();
4378 op
->mark_event("mon:_ms_dispatch");
4379 MonSession
*s
= op
->get_session();
4380 if (s
&& s
->closed
) {
4384 if (src_is_mon
&& s
) {
4385 ConnectionRef con
= m
->get_connection();
4386 if (con
->get_messenger() && con
->get_features() != s
->con_features
) {
4387 // only update features if this is a non-anonymous connection
4388 dout(10) << __func__
<< " feature change for " << m
->get_source_inst()
4389 << " (was " << s
->con_features
4390 << ", now " << con
->get_features() << ")" << dendl
;
4391 // connection features changed - recreate session.
4392 if (s
->con
&& s
->con
!= con
) {
4393 dout(10) << __func__
<< " connection for " << m
->get_source_inst()
4394 << " changed from session; mark down and replace" << dendl
;
4395 s
->con
->mark_down();
4397 if (s
->item
.is_on_list()) {
4398 // forwarded messages' sessions are not in the sessions map and
4399 // exist only while the op is being handled.
4400 std::lock_guard
l(session_map_lock
);
4408 // if the sender is not a monitor, make sure their first message for a
4409 // session is an MAuth. If it is not, assume it's a stray message,
4410 // and considering that we are creating a new session it is safe to
4411 // assume that the sender hasn't authenticated yet, so we have no way
4412 // of assessing whether we should handle it or not.
4413 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
4414 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
4415 m
->get_type() != CEPH_MSG_PING
)) {
4416 dout(1) << __func__
<< " dropping stray message " << *m
4417 << " from " << m
->get_source_inst() << dendl
;
4421 ConnectionRef con
= m
->get_connection();
4423 std::lock_guard
l(session_map_lock
);
4424 s
= session_map
.new_session(m
->get_source(),
4425 m
->get_source_addrs(),
4429 con
->set_priv(RefCountedPtr
{s
, false});
4430 dout(10) << __func__
<< " new session " << s
<< " " << *s
4431 << " features 0x" << std::hex
4432 << s
->con_features
<< std::dec
<< dendl
;
4435 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4436 logger
->inc(l_mon_session_add
);
4439 // give it monitor caps; the peer type has been authenticated
4440 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
4441 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
4443 s
->authenticated
= true;
4446 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->name
4452 s
->session_timeout
= ceph_clock_now();
4453 s
->session_timeout
+= g_conf()->mon_session_timeout
;
4455 if (s
->auth_handler
) {
4456 s
->entity_name
= s
->auth_handler
->get_entity_name();
4457 s
->global_id
= s
->auth_handler
->get_global_id();
4458 s
->global_id_status
= s
->auth_handler
->get_global_id_status();
4460 dout(20) << " entity_name " << s
->entity_name
4461 << " global_id " << s
->global_id
4462 << " (" << s
->global_id_status
4463 << ") caps " << s
->caps
.get_str() << dendl
;
4465 if (!session_stretch_allowed(s
, op
)) {
4468 if ((is_synchronizing() ||
4469 (!s
->authenticated
&& !exited_quorum
.is_zero())) &&
4471 m
->get_type() != CEPH_MSG_PING
) {
4472 waitlist_or_zap_client(op
);
// Route a single op to its handler.  The op is tagged with a type along the
// way (monitor, service, command, paxos, election_or_ping) and is handed to
// a Monitor handler, the elector, paxos, or one of the paxos services,
// selected by the type of the request message.  Authentication and
// capability checks are interleaved with the routing.
4479 void Monitor::dispatch_op(MonOpRequestRef op
)
4481 op
->mark_event("mon:dispatch_op");
// session attached to this op; reused by the global_id checks further down
4482 MonSession
*s
= op
->get_session();
// logged when the op is dropped because its session has been closed
4485 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
4489 /* we will consider the default type as being 'monitor' until proven wrong */
4490 op
->set_type_monitor();
4491 /* deal with all messages that do not necessarily need caps */
4492 switch (op
->get_req()->get_type()) {
4494 case MSG_MON_GLOBAL_ID
:
4496 op
->set_type_service();
4497 /* no need to check caps here */
4498 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
// tell commands are tagged as commands and handled by handle_tell_command
4505 op
->set_type_command();
4506 handle_tell_command(op
);
// sessions that are not authenticated are reported as dropped here
4510 if (!op
->get_session()->authenticated
) {
4511 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4512 << " is not authenticated, dropping " << *(op
->get_req())
4517 // global_id_status == NONE: all sessions for auth_none and krb,
4518 // mon <-> mon sessions (including proxied sessions) for cephx
4519 ceph_assert(s
->global_id_status
== global_id_status_t::NONE
||
4520 s
->global_id_status
== global_id_status_t::NEW_OK
||
4521 s
->global_id_status
== global_id_status_t::NEW_NOT_EXPOSED
||
4522 s
->global_id_status
== global_id_status_t::RECLAIM_OK
||
4523 s
->global_id_status
== global_id_status_t::RECLAIM_INSECURE
);
4525 // let mon_getmap through for "ping" (which doesn't reconnect)
4526 // and "tell" (which reconnects but doesn't attempt to preserve
4527 // its global_id and stays in NEW_NOT_EXPOSED, retrying until
4528 // ->send_attempts reaches 0)
4529 if (cct
->_conf
->auth_expose_insecure_global_id_reclaim
&&
4530 s
->global_id_status
== global_id_status_t::NEW_NOT_EXPOSED
&&
4531 op
->get_req()->get_type() != CEPH_MSG_MON_GET_MAP
) {
4532 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4533 << " may omit old_ticket on reconnects, discarding "
4534 << *op
->get_req() << " and forcing reconnect" << dendl
;
// the session must own a direct connection (not a proxied one) so that it
// can be marked down to force the reconnect
4535 ceph_assert(s
->con
&& !s
->proxy_con
);
4536 s
->con
->mark_down();
4538 std::lock_guard
l(session_map_lock
);
// messages served by the monitor itself: monmap, config and subscriptions
4545 switch (op
->get_req()->get_type()) {
4546 case CEPH_MSG_MON_GET_MAP
:
4547 handle_mon_get_map(op
);
4550 case MSG_GET_CONFIG
:
4551 configmon()->handle_get_config(op
);
4554 case CEPH_MSG_MON_SUBSCRIBE
:
4555 /* FIXME: check what's being subscribed, filter accordingly */
4556 handle_subscribe(op
);
4560 /* well, maybe the op belongs to a service... */
4561 op
->set_type_service();
4562 /* deal with all messages which caps should be checked somewhere else */
4563 switch (op
->get_req()->get_type()) {
4566 case CEPH_MSG_MON_GET_OSDMAP
:
4567 case CEPH_MSG_POOLOP
:
4568 case MSG_OSD_BEACON
:
4569 case MSG_OSD_MARK_ME_DOWN
:
4570 case MSG_OSD_MARK_ME_DEAD
:
4572 case MSG_OSD_FAILURE
:
4575 case MSG_OSD_PGTEMP
:
4576 case MSG_OSD_PG_CREATED
:
4577 case MSG_REMOVE_SNAPS
:
4578 case MSG_MON_GET_PURGED_SNAPS
:
4579 case MSG_OSD_PG_READY_TO_MERGE
:
4580 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
4584 case MSG_MDS_BEACON
:
4585 case MSG_MDS_OFFLOAD_TARGETS
:
4586 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
4590 case MSG_MGR_BEACON
:
4591 paxos_service
[PAXOS_MGR
]->dispatch(op
);
4595 case MSG_MON_MGR_REPORT
:
4596 case CEPH_MSG_STATFS
:
4597 case MSG_GETPOOLSTATS
:
4598 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
4603 paxos_service
[PAXOS_LOG
]->dispatch(op
);
4606 // handle_command() does its own caps checking
4607 case MSG_MON_COMMAND
:
4608 op
->set_type_command();
4613 /* nop, looks like it's not a service message; revert back to monitor */
4614 op
->set_type_monitor();
4616 /* messages we, the Monitor class, need to deal with
4617 * but may be sent by clients. */
// these messages need at least read capability on the mon service
4619 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
4620 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4621 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
4626 switch (op
->get_req()->get_type()) {
4628 case CEPH_MSG_MON_GET_VERSION
:
4629 handle_get_version(op
);
// anything left is monitor-to-monitor traffic; other sources are reported
// as dropped
4633 if (!op
->is_src_mon()) {
4634 dout(1) << __func__
<< " unexpected monitor message from"
4635 << " non-monitor entity " << op
->get_req()->get_source_inst()
4636 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
4640 /* messages that should only be sent by another monitor */
4641 switch (op
->get_req()->get_type()) {
4651 // Sync (i.e., the new slurp, but on steroids)
4659 /* log acks are sent from a monitor we sent the MLog to, and are
4660 never sent by clients to us. */
4662 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
4667 op
->set_type_service();
4668 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
// paxos traffic: checked for execute capability on the mon service and
// ignored while this monitor is synchronizing
4674 op
->set_type_paxos();
4675 auto pm
= op
->get_req
<MMonPaxos
>();
4676 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4681 if (state
== STATE_SYNCHRONIZING
) {
4682 // we are synchronizing. These messages would do us no
4683 // good, thus just drop them and ignore them.
4684 dout(10) << __func__
<< " ignore paxos msg from "
4685 << pm
->get_source_inst() << dendl
;
// the epoch carried by the paxos message is compared against our own epoch
// before the message reaches paxos
4690 if (pm
->epoch
> get_epoch()) {
4694 if (pm
->epoch
!= get_epoch()) {
4698 paxos
->dispatch(op
);
4703 case MSG_MON_ELECTION
:
4704 op
->set_type_election_or_ping();
4705 //check privileges here for simplicity
4706 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4707 dout(0) << "MMonElection received from entity without enough caps!"
4708 << op
->get_session()->caps
<< dendl
;
// election messages only reach the elector when we are neither probing nor
// synchronizing
4711 if (!is_probing() && !is_synchronizing()) {
4712 elector
.dispatch(op
);
4717 op
->set_type_election_or_ping();
4718 elector
.dispatch(op
);
4726 dout(5) << __func__
<< " ignoring " << op
<< dendl
;
4728 case MSG_TIMECHECK2
:
4729 handle_timecheck(op
);
4732 case MSG_MON_HEALTH
:
4733 dout(5) << __func__
<< " dropping deprecated message: "
4734 << *op
->get_req() << dendl
;
4736 case MSG_MON_HEALTH_CHECKS
:
4737 op
->set_type_service();
4738 paxos_service
[PAXOS_HEALTH
]->dispatch(op
);
4741 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
// Answer an MPing request.  A new MPing is allocated as the reply, a JSON
// formatter collects the health status and the monitor status inside an
// object section named pong, and the encoded text becomes the reply payload,
// which is sent back on the connection the request arrived on.
4745 void Monitor::handle_ping(MonOpRequestRef op
)
4747 auto m
= op
->get_req
<MPing
>();
4748 dout(10) << __func__
<< " " << *m
<< dendl
;
4749 MPing
*reply
= new MPing
;
// JSONFormatter constructed with true; presumably pretty-printed output
// -- TODO confirm against the Formatter API
4751 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4752 f
->open_object_section("pong");
4754 healthmon()->get_health_status(false, f
.get(), nullptr);
4755 get_mon_status(f
.get());
// the string held by ss is encoded into the payload buffer
4760 encode(ss
.str(), payload
);
4761 reply
->set_payload(payload
);
4762 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
4763 m
->get_connection()->send_message(reply
);
// Reset all timecheck state and, when every quorum member advertises the
// NAUTILUS mon feature, start a timecheck round.
4766 void Monitor::timecheck_start()
4768 dout(10) << __func__
<< dendl
;
4769 timecheck_cleanup();
4770 if (get_quorum_mon_features().contains_all(
4771 ceph::features::mon::FEATURE_NAUTILUS
)) {
4772 timecheck_start_round();
// Stop timechecks: logs the call and resets all timecheck state through
// timecheck_cleanup().
4776 void Monitor::timecheck_finish()
4778 dout(10) << __func__
<< dendl
;
4779 timecheck_cleanup();
// Leader only: begin a new timecheck round.  An odd timecheck_round value
// means a round is still in flight; that round is kept while it is younger
// than three times mon_timecheck_interval and cancelled otherwise.  The
// next timecheck event is scheduled at the end.
4782 void Monitor::timecheck_start_round()
4784 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4785 ceph_assert(is_leader());
// a single-monitor map has no peer to compare clocks with
4787 if (monmap
->size() == 1) {
4788 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
4792 if (timecheck_round
% 2) {
4793 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4794 utime_t curr_time
= ceph_clock_now();
// upper bound on the age of an in-flight round
4795 double max
= g_conf()->mon_timecheck_interval
*3;
4796 if (curr_time
- timecheck_round_start
< max
) {
4797 dout(10) << __func__
<< " keep current round going" << dendl
;
4800 dout(10) << __func__
4801 << " finish current timecheck and start new" << dendl
;
4802 timecheck_cancel_round();
// at this point no round may be in flight
4806 ceph_assert(timecheck_round
% 2 == 0);
4809 timecheck_round_start
= ceph_clock_now();
4810 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4814 dout(10) << __func__
<< " setting up next event" << dendl
;
4815 timecheck_reset_event();
// Close the round currently in flight (timecheck_round must be odd).
// @param success  whether the round completed normally.
// NOTE(review): the branch on success is not visible here; presumably the
// asserts and the skew check run on success and the still-waiting peers are
// logged otherwise -- confirm.
4818 void Monitor::timecheck_finish_round(bool success
)
4820 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4821 ceph_assert(timecheck_round
% 2);
// a default-constructed utime_t marks that no round is running
4823 timecheck_round_start
= utime_t();
// a completed round has no outstanding peers and one ack per quorum member
4826 ceph_assert(timecheck_waiting
.empty());
4827 ceph_assert(timecheck_acks
== quorum
.size());
4829 timecheck_check_skews();
// log every peer that has not answered yet
4833 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4834 << " peers still waiting:";
4835 for (auto& p
: timecheck_waiting
) {
4836 *_dout
<< " mon." << p
.first
;
4839 timecheck_waiting
.clear();
4841 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
// Abort the round in flight by finishing it with success set to false.
4844 void Monitor::timecheck_cancel_round()
4846 timecheck_finish_round(false);
// Reset every piece of timecheck state: round counter, round start time,
// the pending timer event, the waiting/skew/latency maps and the count of
// rounds since the clocks were last clean.
4849 void Monitor::timecheck_cleanup()
4851 timecheck_round
= 0;
4853 timecheck_round_start
= utime_t();
// cancel the scheduled timecheck event, if any, and forget the handle
4855 if (timecheck_event
) {
4856 timer
.cancel_event(timecheck_event
);
4857 timecheck_event
= NULL
;
4859 timecheck_waiting
.clear();
4860 timecheck_skews
.clear();
4861 timecheck_latencies
.clear();
4863 timecheck_rounds_since_clean
= 0;
// (Re)schedule the timer event that starts the next timecheck round.  Any
// pending event is cancelled first.  The delay is derived from
// mon_timecheck_skew_interval scaled by timecheck_rounds_since_clean and
// falls back to mon_timecheck_interval when it is not positive or exceeds
// that interval.
4866 void Monitor::timecheck_reset_event()
4868 if (timecheck_event
) {
4869 timer
.cancel_event(timecheck_event
);
4870 timecheck_event
= NULL
;
// presumably the initial value of delay -- the assignment target is not
// visible here, TODO confirm
4874 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
4876 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4877 delay
= cct
->_conf
->mon_timecheck_interval
;
4880 dout(10) << __func__
<< " delay " << delay
4881 << " rounds_since_clean " << timecheck_rounds_since_clean
// the callback starts a new round when the timer fires
4884 timecheck_event
= timer
.add_event_after(
4886 new C_MonContext
{this, [this](int) {
4887 timecheck_start_round();
// Leader only, between rounds (timecheck_round even): inspect the recorded
// skews and adapt the timecheck schedule.  The counter
// timecheck_rounds_since_clean is incremented and the event rescheduled in
// one branch; when no skews remain after earlier skewed rounds, one more
// check is scheduled and the counter is reset to zero.
4891 void Monitor::timecheck_check_skews()
4893 dout(10) << __func__
<< dendl
;
4894 ceph_assert(is_leader());
4895 ceph_assert((timecheck_round
% 2) == 0);
4896 if (monmap
->size() == 1) {
4897 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
// every peer with a skew entry must also have a latency entry
4900 ceph_assert(timecheck_latencies
.size() == timecheck_skews
.size());
4902 bool found_skew
= false;
4903 for (auto& p
: timecheck_skews
) {
// timecheck_has_skew reports the absolute skew through abs_skew
4905 if (timecheck_has_skew(p
.second
, &abs_skew
)) {
4906 dout(10) << __func__
4907 << " " << p
.first
<< " skew " << abs_skew
<< dendl
;
4913 ++timecheck_rounds_since_clean
;
4914 timecheck_reset_event();
4915 } else if (timecheck_rounds_since_clean
> 0) {
4917 << " no clock skews found after " << timecheck_rounds_since_clean
4918 << " rounds" << dendl
;
4919 // make sure the skews are really gone and not just a transient success
4920 // this will run just once if not in the presence of skews again.
// the counter is set to 1 only for the duration of the reschedule call
4921 timecheck_rounds_since_clean
= 1;
4922 timecheck_reset_event();
4923 timecheck_rounds_since_clean
= 0;
// Leader only, between rounds: send an MTimeCheck2 OP_REPORT message that
// carries the current epoch, round, skews and latencies to quorum members.
4928 void Monitor::timecheck_report()
4930 dout(10) << __func__
<< dendl
;
4931 ceph_assert(is_leader());
4932 ceph_assert((timecheck_round
% 2) == 0);
4933 if (monmap
->size() == 1) {
4934 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4938 ceph_assert(timecheck_latencies
.size() == timecheck_skews
.size());
4939 bool do_output
= true; // only output report once
4940 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
// compares the quorum member name with our own; presumably skips ourselves
// -- the guarded statement is not visible, TODO confirm
4941 if (monmap
->get_name(*q
) == name
)
4944 MTimeCheck2
*m
= new MTimeCheck2(MTimeCheck2::OP_REPORT
);
4945 m
->epoch
= get_epoch();
4946 m
->round
= timecheck_round
;
// copy every recorded skew and the matching latency into the message
4948 for (auto& it
: timecheck_skews
) {
4949 double skew
= it
.second
;
4950 double latency
= timecheck_latencies
[it
.first
];
4952 m
->skews
[it
.first
] = skew
;
4953 m
->latencies
[it
.first
] = latency
;
4956 dout(25) << __func__
<< " mon." << it
.first
4957 << " latency " << latency
4958 << " skew " << skew
<< dendl
;
4962 dout(10) << __func__
<< " send report to mon." << *q
<< dendl
;
4963 send_mon_message(m
, *q
);
// Leader only, during a round (timecheck_round odd): record ourselves as
// the zero-skew, zero-latency reference and send an MTimeCheck2 OP_PING to
// quorum members, remembering the send time of each ping in
// timecheck_waiting.
4967 void Monitor::timecheck()
4969 dout(10) << __func__
<< dendl
;
4970 ceph_assert(is_leader());
4971 if (monmap
->size() == 1) {
4972 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4975 ceph_assert(timecheck_round
% 2 != 0);
4977 timecheck_acks
= 1; // we ack ourselves
4979 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
4980 << " round " << timecheck_round
<< dendl
;
4982 // we are at the eye of the storm; the point of reference
4983 timecheck_skews
[rank
] = 0.0;
4984 timecheck_latencies
[rank
] = 0.0;
4986 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
// compares the quorum member name with our own; presumably skips ourselves
// -- the guarded statement is not visible, TODO confirm
4987 if (monmap
->get_name(*it
) == name
)
// the send time is what handle_timecheck_leader reads back to derive latency
4990 utime_t curr_time
= ceph_clock_now();
4991 timecheck_waiting
[*it
] = curr_time
;
4992 MTimeCheck2
*m
= new MTimeCheck2(MTimeCheck2::OP_PING
);
4993 m
->epoch
= get_epoch();
4994 m
->round
= timecheck_round
;
4995 dout(10) << __func__
<< " send " << *m
<< " to mon." << *it
<< dendl
;
4996 send_mon_message(m
, *it
);
// Classify a skew bound as a health status.
// @param ss          receives a clock skew description when a skew is found
// @param skew_bound  skew value handed to timecheck_has_skew
// @param latency     must be >= 0 (asserted)
// The status starts as HEALTH_OK and becomes HEALTH_WARN when
// timecheck_has_skew reports a skew.
5000 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
5001 const double skew_bound
,
5002 const double latency
)
5004 health_status_t status
= HEALTH_OK
;
5005 ceph_assert(latency
>= 0);
5008 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
5009 status
= HEALTH_WARN
;
// the message compares the absolute skew with mon_clock_drift_allowed
5010 ss
<< "clock skew " << abs_skew
<< "s"
5011 << " > max " << g_conf()->mon_clock_drift_allowed
<< "s";
// Leader side of the timecheck exchange: process an OP_PONG reply.  Replies
// from an older epoch or an older round are logged as discarded.  The time
// the matching ping was sent is taken from timecheck_waiting to derive the
// latency, the skew bound is computed from the timestamp in the reply, and
// the round is finished once the ack count equals the quorum size.
5017 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
5019 auto m
= op
->get_req
<MTimeCheck2
>();
5020 dout(10) << __func__
<< " " << *m
<< dendl
;
5021 /* handles PONG's */
5022 ceph_assert(m
->op
== MTimeCheck2::OP_PONG
);
// numeric id of the sender; used as the key into the timecheck maps
5024 int other
= m
->get_source().num();
5025 if (m
->epoch
< get_epoch()) {
5026 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
5027 << " from " << other
5028 << " curr " << get_epoch()
5029 << " -- severely lagged? discard" << dendl
;
5032 ceph_assert(m
->epoch
== get_epoch());
5034 if (m
->round
< timecheck_round
) {
5035 dout(1) << __func__
<< " got old round " << m
->round
5036 << " from " << other
5037 << " curr " << timecheck_round
<< " -- discard" << dendl
;
5041 utime_t curr_time
= ceph_clock_now();
// a ping must have been recorded for this peer; its send time is consumed
5043 ceph_assert(timecheck_waiting
.count(other
) > 0);
5044 utime_t timecheck_sent
= timecheck_waiting
[other
];
5045 timecheck_waiting
.erase(other
);
5046 if (curr_time
< timecheck_sent
) {
5047 // our clock was readjusted -- drop everything until it all makes sense.
5048 dout(1) << __func__
<< " our clock was readjusted --"
5049 << " bump round and drop current check"
5051 timecheck_cancel_round();
5055 /* update peer latencies */
5056 double latency
= (double)(curr_time
- timecheck_sent
);
// first sample is stored as is; later samples are blended with weights 0.8
// for the stored value and 0.2 for the new one
5058 if (timecheck_latencies
.count(other
) == 0)
5059 timecheck_latencies
[other
] = latency
;
5061 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
5062 timecheck_latencies
[other
] = avg_latency
;
5068 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
5069 * and 'a' happens to be lower than 'b'; so we use double instead.
5071 * latency is always expected to be >= 0.
5073 * delta, the difference between theirs timestamp and ours, may either be
5074 * lower or higher than 0; will hardly ever be 0.
5076 * The absolute skew is the absolute delta minus the latency, which is
5077 * taken as a whole instead of an rtt given that there is some queueing
5078 * and dispatch times involved and it's hard to assess how long exactly
5079 * it took for the message to travel to the other side and be handled. So
5080 * we call it a bounded skew, the worst case scenario.
5084 * Given that the latency is always positive, we can establish that the
5085 * bounded skew will be:
5087 * 1. positive if the absolute delta is higher than the latency and
5089 * 2. negative if the absolute delta is higher than the latency and
5090 * delta is negative.
5091 * 3. zero if the absolute delta is lower than the latency.
5093 * On 3. we make a judgement call and treat the skew as non-existent.
5094 * This is because that, if the absolute delta is lower than the
5095 * latency, then the apparently existing skew is nothing more than a
5096 * side-effect of the high latency at work.
5098 * This may not be entirely true though, as a severely skewed clock
5099 * may be masked by an even higher latency, but with high latencies
5100 * we probably have worse issues to deal with than just skewed clocks.
5102 ceph_assert(latency
>= 0);
5104 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
5105 double abs_delta
= (delta
> 0 ? delta
: -delta
);
5106 double skew_bound
= abs_delta
- latency
;
5110 skew_bound
= -skew_bound
;
5113 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
5114 if (status
!= HEALTH_OK
) {
5115 clog
->health(status
) << other
<< " " << ss
.str();
5118 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
5119 << " delta " << delta
<< " skew_bound " << skew_bound
5120 << " latency " << latency
<< dendl
;
5122 timecheck_skews
[other
] = skew_bound
;
5125 if (timecheck_acks
== quorum
.size()) {
5126 dout(10) << __func__
<< " got pongs from everybody ("
5127 << timecheck_acks
<< " total)" << dendl
;
5128 ceph_assert(timecheck_skews
.size() == timecheck_acks
);
5129 ceph_assert(timecheck_waiting
.empty());
5130 // everyone has acked, so bump the round to finish it.
5131 timecheck_finish_round();
// Peon side of the timecheck exchange: handles OP_PING and OP_REPORT.
// Messages with a different epoch or an older round are logged as
// discarded.  A report replaces the local latency and skew maps with the
// ones carried by the message; a ping is answered with an OP_PONG that
// carries our current clock and echoes the epoch and round.
5135 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
5137 auto m
= op
->get_req
<MTimeCheck2
>();
5138 dout(10) << __func__
<< " " << *m
<< dendl
;
5140 ceph_assert(is_peon());
5141 ceph_assert(m
->op
== MTimeCheck2::OP_PING
|| m
->op
== MTimeCheck2::OP_REPORT
);
5143 if (m
->epoch
!= get_epoch()) {
5144 dout(1) << __func__
<< " got wrong epoch "
5145 << "(ours " << get_epoch()
5146 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
5150 if (m
->round
< timecheck_round
) {
5151 dout(1) << __func__
<< " got old round " << m
->round
5152 << " current " << timecheck_round
5153 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
// adopt the round number carried by the message
5157 timecheck_round
= m
->round
;
5159 if (m
->op
== MTimeCheck2::OP_REPORT
) {
// reports are expected between rounds (even round number)
5160 ceph_assert((timecheck_round
% 2) == 0);
5161 timecheck_latencies
.swap(m
->latencies
);
5162 timecheck_skews
.swap(m
->skews
);
// pings are expected during a round (odd round number)
5166 ceph_assert((timecheck_round
% 2) != 0);
5167 MTimeCheck2
*reply
= new MTimeCheck2(MTimeCheck2::OP_PONG
);
5168 utime_t curr_time
= ceph_clock_now();
5169 reply
->timestamp
= curr_time
;
5170 reply
->epoch
= m
->epoch
;
5171 reply
->round
= m
->round
;
// NOTE(review): this log line streams the received message (*m) although
// the text says send and the message being sent is reply -- confirm whether
// *reply was intended
5172 dout(10) << __func__
<< " send " << *m
5173 << " to " << m
->get_source_inst() << dendl
;
5174 m
->get_connection()->send_message(reply
);
// Entry point for MTimeCheck2 messages.  Pongs go to
// handle_timecheck_leader; on a peon, pings and reports go to
// handle_timecheck_peon.  Any other combination is logged as an unexpected
// message.
5177 void Monitor::handle_timecheck(MonOpRequestRef op
)
5179 auto m
= op
->get_req
<MTimeCheck2
>();
5180 dout(10) << __func__
<< " " << *m
<< dendl
;
// presumably the leader branch -- the enclosing role check is not visible,
// TODO confirm
5183 if (m
->op
!= MTimeCheck2::OP_PONG
) {
5184 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
5186 handle_timecheck_leader(op
);
5188 } else if (is_peon()) {
5189 if (m
->op
!= MTimeCheck2::OP_PING
&& m
->op
!= MTimeCheck2::OP_REPORT
) {
5190 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
5192 handle_timecheck_peon(op
);
5195 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
// Process an MMonSubscribe request.  For every entry in m->what the session
// capabilities are checked (monmap and config need none), conflicting log
// subscriptions are removed, the subscription is added or updated in the
// session map, and the owning service is asked to check the subscription.
// A MMonSubscribeAck is sent only to connections that lack the
// MON_STATEFUL_SUB feature.
5199 void Monitor::handle_subscribe(MonOpRequestRef op
)
5201 auto m
= op
->get_req
<MMonSubscribe
>();
5202 dout(10) << "handle_subscribe " << *m
<< dendl
;
5206 MonSession
*s
= op
->get_session();
// remember the hostname reported by the client, when one is given
5209 if (m
->hostname
.size()) {
5210 s
->remote_host
= m
->hostname
;
5213 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
5216 if (p
->first
== "monmap" || p
->first
== "config") {
5217 // these require no caps
5218 } else if (!s
->is_capable("mon", MON_CAP_R
)) {
5219 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
5220 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
5225 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5226 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
5229 // remove conflicting subscribes
// only names known to the log monitor (sub_name_to_id >= 0) can conflict;
// other log subscriptions of this session are removed under
// session_map_lock
5230 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
5231 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
5232 it
!= s
->sub_map
.end(); ) {
5233 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
5234 std::lock_guard
l(session_map_lock
);
5235 session_map
.remove_sub((it
++)->second
);
// register or refresh the subscription; arguments are the start value, the
// onetime flag bit and whether the connection has the INCSUBOSDMAP feature
5243 std::lock_guard
l(session_map_lock
);
5244 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
5245 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
5246 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
// hand the subscription to the service matching its name; names starting
// with mdsmap or fsmap go to the MDS monitor
5249 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
5250 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
5251 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
5252 Subscription
*sub
= s
->sub_map
[p
->first
];
5253 ceph_assert(sub
!= nullptr);
5254 mdsmon()->check_sub(sub
);
5256 } else if (p
->first
== "osdmap") {
5257 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
5258 if (s
->osd_epoch
> p
->second
.start
) {
5259 // client needs earlier osdmaps on purpose, so reset the sent epoch
5262 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
5264 } else if (p
->first
== "osd_pg_creates") {
5265 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
5266 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
5268 } else if (p
->first
== "monmap") {
5269 monmon()->check_sub(s
->sub_map
[p
->first
]);
5270 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
5271 logmon()->check_sub(s
->sub_map
[p
->first
]);
5272 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
5273 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
5274 } else if (p
->first
== "servicemap") {
5275 mgrstatmon()->check_sub(s
->sub_map
[p
->first
]);
5276 } else if (p
->first
== "config") {
5277 configmon()->check_sub(s
);
5278 } else if (p
->first
.find("kv:") == 0) {
5279 kvmon()->check_sub(s
->sub_map
[p
->first
]);
5284 // we only need to reply if the client is old enough to think it
5285 // has to send renewals.
5286 ConnectionRef con
= m
->get_connection();
5287 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
5288 m
->get_connection()->send_message(new MMonSubscribeAck(
5289 monmap
->get_fsid(), (int)g_conf()->mon_subscribe_interval
));
// Answer an MMonGetVersion request with the last and first committed
// versions of the paxos service named by m->what (mdsmap, fsmap, osdmap or
// monmap).  The request is queued for retry while we are out of quorum or
// while the service is not readable.
5294 void Monitor::handle_get_version(MonOpRequestRef op
)
5296 auto m
= op
->get_req
<MMonGetVersion
>();
5297 dout(10) << "handle_get_version " << *m
<< dendl
;
5298 PaxosService
*svc
= NULL
;
5300 MonSession
*s
= op
->get_session();
// neither leader nor peon: retry once a quorum exists
5303 if (!is_leader() && !is_peon()) {
5304 dout(10) << " waiting for quorum" << dendl
;
5305 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
// select the service by map name; the assignments to svc are not visible
// here
5309 if (m
->what
== "mdsmap") {
5311 } else if (m
->what
== "fsmap") {
5313 } else if (m
->what
== "osdmap") {
5315 } else if (m
->what
== "monmap") {
5318 derr
<< "invalid map type " << m
->what
<< dendl
;
5322 if (!svc
->is_readable()) {
5323 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
// the reply echoes the handle and the transaction id of the request
5327 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
5328 reply
->handle
= m
->handle
;
5329 reply
->version
= svc
->get_last_committed();
5330 reply
->oldest_version
= svc
->get_first_committed();
5331 reply
->set_tid(m
->get_tid());
5333 m
->get_connection()->send_message(reply
);
// Messenger callback for a reset connection.  Connections whose peer is a
// monitor are singled out first.  For other peers the MonSession stored as
// the connection private data is detached from the connection and, while
// holding the monitor lock, looked at for removal from the session map.
5339 bool Monitor::ms_handle_reset(Connection
*con
)
5341 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
5343 // ignore lossless monitor sessions
5344 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
// priv holds the connection private data; s views it as a MonSession
5347 auto priv
= con
->get_priv();
5348 auto s
= static_cast<MonSession
*>(priv
.get());
5352 // break any con <-> session ref cycle
5353 s
->con
->set_priv(nullptr);
5358 std::lock_guard
l(lock
);
5360 dout(10) << "reset/close on session " << s
->name
<< " " << s
->addrs
<< dendl
;
// only sessions that are still open and still linked on a list qualify
5361 if (!s
->closed
&& s
->item
.is_on_list()) {
5362 std::lock_guard
l(session_map_lock
);
// Messenger callback for a refused connection; the visible code only logs
// the connection and its peer address.
5368 bool Monitor::ms_handle_refused(Connection
*con
)
5370 // just log for now...
5371 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
// Encode the current monmap with the feature set of the given connection
// and send it on that connection wrapped in an MMonMap message.
5377 void Monitor::send_latest_monmap(Connection
*con
)
5380 monmap
->encode(bl
, con
->get_features());
5381 con
->send_message(new MMonMap(bl
));
// Handle an MMonGetMap request by sending the latest monmap back on the
// connection the request arrived on.
5384 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
5386 auto m
= op
->get_req
<MMonGetMap
>();
5387 dout(10) << "handle_mon_get_map" << dendl
;
5388 send_latest_monmap(m
->get_connection().get());
// Read the last_metadata key under the monitor store prefix, decode it into
// mon_metadata and seed pending_metadata with a copy.
// Returns an int; NOTE(review): the handling of the store result r and the
// return statements are not visible here.
5391 int Monitor::load_metadata()
5394 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5397 auto it
= bl
.cbegin();
5398 decode(mon_metadata
, it
);
5400 pending_metadata
= mon_metadata
;
// Dump the metadata key/value pairs recorded for one monitor.
// @param mon  key into mon_metadata
// @param f    formatter receiving one dump_string call per metadata entry
// @param err  receives a not found message when mon has no metadata entry
5404 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
5407 if (!mon_metadata
.count(mon
)) {
5408 err
<< "mon." << mon
<< " not found";
// the entry is known to exist here, so operator[] does not insert
5411 const Metadata
& m
= mon_metadata
[mon
];
5412 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
5413 f
->dump_string(p
->first
.c_str(), p
->second
);
// Tally, across all entries of mon_metadata, the values found for one
// metadata field.
// @param field  metadata key to look up in every entry
// @param out    counter map incremented per value; entries lacking the
//               field are counted under the key unknown
5418 void Monitor::count_metadata(const string
& field
, map
<string
,int> *out
)
5420 for (auto& p
: mon_metadata
) {
5421 auto q
= p
.second
.find(field
);
5422 if (q
== p
.second
.end()) {
5423 (*out
)["unknown"]++;
5425 (*out
)[q
->second
]++;
// Formatter flavour of count_metadata: computes the per-value tally for
// field and dumps it as integers inside an object section named after the
// field.
5430 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
5432 map
<string
,int> by_val
;
5433 count_metadata(field
, &by_val
);
5434 f
->open_object_section(field
.c_str());
5435 for (auto& p
: by_val
) {
5436 f
->dump_int(p
.first
.c_str(), p
.second
);
// Collect version information into versions from this monitor
// (get_versions) and from the OSD, manager and MDS monitors, then log the
// combined map.
5441 void Monitor::get_all_versions(std::map
<string
, list
<string
> > &versions
)
5444 get_versions(versions
);
5446 osdmon()->get_versions(versions
);
5448 mgrmon()->get_versions(versions
);
5450 mdsmon()->get_versions(versions
);
5451 dout(20) << __func__
<< " all versions=" << versions
<< dendl
;
// For every entry of mon_metadata that has a ceph_version_short value,
// append the monitor name, prefixed with mon., to the list stored in
// versions under that version string.
5454 void Monitor::get_versions(std::map
<string
, list
<string
> > &versions
)
5456 for (auto& [rank
, metadata
] : mon_metadata
) {
5457 auto q
= metadata
.find("ceph_version_short");
// entries without the field take a separate path that is not visible here
5458 if (q
== metadata
.end()) {
5462 versions
[q
->second
].push_back(string("mon.") + monmap
->get_name(rank
));
// Group monitor names by the hostname found in their metadata and pass the
// result to dump_services with the type label mon.
// @param f    formatter handed to dump_services
// @param err  error stream; it is not written to by the visible code
5466 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
5468 map
<string
, list
<string
> > mons
; // hostname => mon
5469 for (map
<int, Metadata
>::iterator it
= mon_metadata
.begin();
5470 it
!= mon_metadata
.end(); ++it
) {
5471 const Metadata
& m
= it
->second
;
5472 Metadata::const_iterator hostname
= m
.find("hostname");
5473 if (hostname
== m
.end()) {
5474 // not likely though
5477 mons
[hostname
->second
].push_back(monmap
->get_name(it
->first
));
5480 dump_services(f
, mons
, "mon");
5484 // ----------------------------------------------
// Leader only: begin a new scrub.  A non-empty scrub_result is reported as
// a scrub already in progress.  Otherwise the scheduled scrub event is
// cancelled, old results are cleared and a fresh ScrubState is allocated.
5487 int Monitor::scrub_start()
5489 dout(10) << __func__
<< dendl
;
5490 ceph_assert(is_leader());
5492 if (!scrub_result
.empty()) {
5493 clog
->info() << "scrub already in progress";
5497 scrub_event_cancel();
5498 scrub_result
.clear();
5499 scrub_state
.reset(new ScrubState
);
// Leader only: run one scrub step.  The scrub version is pinned to the
// current paxos version, an MMonScrub OP_SCRUB request starting at
// scrub_state->last_key is sent to quorum members, and the local store is
// scrubbed through _scrub.  The key budget is unlimited (-1) when we are
// the only quorum member and mon_scrub_max_keys otherwise.
5505 int Monitor::scrub()
5507 ceph_assert(is_leader());
5508 ceph_assert(scrub_state
);
5510 scrub_cancel_timeout();
5511 wait_for_paxos_write();
5512 scrub_version
= paxos
->get_version();
5515 // scrub all keys if we're the only monitor in the quorum
5517 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
5519 for (set
<int>::iterator p
= quorum
.begin();
5524 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
5526 r
->key
= scrub_state
->last_key
;
5527 send_mon_message(r
, *p
);
// our own result is stored under our rank; _scrub returns whether more keys
// remain, so a false result means this scrub is finished
5531 bool r
= _scrub(&scrub_result
[rank
],
5532 &scrub_state
->last_key
,
5535 scrub_state
->finished
= !r
;
5537 // only after we got our scrub results do we really care whether the
5538 // other monitors are late on their results. Also, this way we avoid
5539 // triggering the timeout if we end up getting stuck in _scrub() for
5540 // longer than the duration of the timeout.
5541 scrub_reset_timeout();
5543 if (quorum
.size() == 1) {
5544 ceph_assert(scrub_state
->finished
== true);
// Handle an MMonScrub message.  OP_SCRUB: scrub the local store from the
// key carried by the request and send the result back as OP_RESULT.
// OP_RESULT: record the result of the sender and, once one result per
// quorum member has arrived, compare them through scrub_check_results.
// Both operations compare the version carried by the message first.
5550 void Monitor::handle_scrub(MonOpRequestRef op
)
5552 auto m
= op
->get_req
<MMonScrub
>();
5553 dout(10) << __func__
<< " " << *m
<< dendl
;
5555 case MMonScrub::OP_SCRUB
:
5560 wait_for_paxos_write();
// the request version is compared with our current paxos version
5562 if (m
->version
!= paxos
->get_version())
5565 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
// reply->key is both the start position and, after _scrub, the last key
5569 reply
->key
= m
->key
;
5570 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
5571 m
->get_connection()->send_message(reply
);
5575 case MMonScrub::OP_RESULT
:
// the result version is compared with the version of the running scrub
5579 if (m
->version
!= scrub_version
)
5581 // reset the timeout each time we get a result
5582 scrub_reset_timeout();
// each peer may report only once per step
5584 int from
= m
->get_source().num();
5585 ceph_assert(scrub_result
.count(from
) == 0);
5586 scrub_result
[from
] = m
->result
;
5588 if (scrub_result
.size() == quorum
.size()) {
5589 scrub_check_results();
5590 scrub_result
.clear();
5591 if (scrub_state
->finished
)
// Scrub a chunk of the store, walking keys from *start across all sync
// target prefixes except paxos.  For each key the per-prefix key count and
// the running per-prefix crc32c in r are updated.
// @param r         result accumulator (prefix_keys, prefix_crc)
// @param start     position to resume from
// @param num_keys  in: key budget, honoured only when > 0; out: number of
//                  keys actually scrubbed
// @return whether the store iterator still has keys left
// The mon_scrub_inject_* options deliberately skip keys or corrupt the crc
// with the configured probability.
5601 bool Monitor::_scrub(ScrubResult
*r
,
5602 pair
<string
,string
> *start
,
5605 ceph_assert(r
!= NULL
);
5606 ceph_assert(start
!= NULL
);
5607 ceph_assert(num_keys
!= NULL
);
5609 set
<string
> prefixes
= get_sync_targets_names();
5610 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5612 dout(10) << __func__
<< " start (" << *start
<< ")"
5613 << " num_keys " << *num_keys
<< dendl
;
5615 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
5617 int scrubbed_keys
= 0;
5618 pair
<string
,string
> last_key
;
5620 while (it
->has_next_chunk()) {
// budget check: only enforced for a positive budget
5622 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
5625 pair
<string
,string
> k
= it
->get_next_key();
// keys are (prefix, key) pairs; the prefix is checked against the scrubbed
// set
5626 if (prefixes
.count(k
.first
) == 0)
5629 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
5630 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
5631 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
// every key returned by the iterator must be readable
5637 int err
= store
->get(k
.first
, k
.second
, bl
);
5638 ceph_assert(err
== 0);
5640 uint32_t key_crc
= bl
.crc32c(0);
5641 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
5642 << " crc " << key_crc
<< dendl
;
5643 r
->prefix_keys
[k
.first
]++;
5644 if (r
->prefix_crc
.count(k
.first
) == 0) {
5645 r
->prefix_crc
[k
.first
] = 0;
// chain the crc: the previous per-prefix value seeds the crc of this value
5647 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
5649 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
5650 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
5651 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
5652 r
->prefix_crc
[k
.first
] += 1;
5659 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
5660 << " scrubbed_keys " << scrubbed_keys
5661 << " has_next " << it
->has_next_chunk() << dendl
;
5664 *num_keys
= scrubbed_keys
;
5666 return it
->has_next_chunk();
// Compare the scrub result of every peer with our own (stored under our
// rank).  Each mismatch is written to the cluster log as an error together
// with both results; a scrub ok debug entry names the quorum and our
// result.
5669 void Monitor::scrub_check_results()
5671 dout(10) << __func__
<< dendl
;
5675 ScrubResult
& mine
= scrub_result
[rank
];
5676 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
5677 p
!= scrub_result
.end();
// our own entry is the reference and is singled out here
5679 if (p
->first
== rank
)
5681 if (p
->second
!= mine
) {
5683 clog
->error() << "scrub mismatch";
5684 clog
->error() << " mon." << rank
<< " " << mine
;
5685 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5689 clog
->debug() << "scrub ok on " << quorum
<< ": " << mine
;
// Scrub timeout handler; the visible code logs that the scrub is being
// restarted.
5692 inline void Monitor::scrub_timeout()
5694 dout(1) << __func__
<< " restarting scrub" << dendl
;
// Finish the current scrub: logs the call and schedules the next scrub
// through scrub_event_start().
5699 void Monitor::scrub_finish()
5701 dout(10) << __func__
<< dendl
;
5703 scrub_event_start();
// Drop all state of the scrub in progress: cancel the timeout event, clear
// the collected results and release the ScrubState.
5706 void Monitor::scrub_reset()
5708 dout(10) << __func__
<< dendl
;
5709 scrub_cancel_timeout();
5711 scrub_result
.clear();
5712 scrub_state
.reset();
// React to a change of the scrub interval.
// @param interval  the new interval; it is only logged by the visible code
// When no scrub is in progress the scheduled scrub event is cancelled and
// started again.
5715 inline void Monitor::scrub_update_interval(ceph::timespan interval
)
5717 // we don't care about changes if we are not the leader.
5718 // changes will be visible if we become the leader.
5722 dout(1) << __func__
<< " new interval = " << interval
<< dendl
;
5724 // if scrub already in progress, all changes will already be visible during
5725 // the next round. Nothing to do.
5726 if (scrub_state
!= NULL
)
5729 scrub_event_cancel();
5730 scrub_event_start();
// Schedule the periodic scrub event.  Any previously scheduled event is
// cancelled first.  A mon_scrub_interval of zero seconds is logged as the
// scrub event being disabled.
5733 void Monitor::scrub_event_start()
5735 dout(10) << __func__
<< dendl
;
5738 scrub_event_cancel();
// the interval is read from configuration as whole seconds
5740 auto scrub_interval
=
5741 cct
->_conf
.get_val
<std::chrono::seconds
>("mon_scrub_interval");
5742 if (scrub_interval
== std::chrono::seconds::zero()) {
5743 dout(1) << __func__
<< " scrub event is disabled"
5744 << " (mon_scrub_interval = " << scrub_interval
5749 scrub_event
= timer
.add_event_after(
5751 new C_MonContext
{this, [this](int) {
// Cancel the scheduled scrub event on the monitor timer.
5756 void Monitor::scrub_event_cancel()
5758 dout(10) << __func__
<< dendl
;
5760 timer
.cancel_event(scrub_event
);
// Cancel the pending scrub timeout event, if one is set, and clear the
// handle.
5765 inline void Monitor::scrub_cancel_timeout()
5767 if (scrub_timeout_event
) {
5768 timer
.cancel_event(scrub_timeout_event
);
5769 scrub_timeout_event
= NULL
;
// Restart the scrub timeout: cancel the pending timeout event and schedule
// a new one that fires after mon_scrub_timeout.
5773 void Monitor::scrub_reset_timeout()
5775 dout(15) << __func__
<< " reset timeout event" << dendl
;
5776 scrub_cancel_timeout();
5777 scrub_timeout_event
= timer
.add_event_after(
5778 g_conf()->mon_scrub_timeout
,
5779 new C_MonContext
{this, [this](int) {
5784 /************ TICK ***************/
// Schedule a timer event that fires after mon_tick_interval; the body of
// the callback is not visible here.
5785 void Monitor::new_tick()
5787 timer
.add_event_after(g_conf()->mon_tick_interval
, new C_MonContext
{this, [this](int) {
// Periodic housekeeping. The visible work is, in order:
//   1. emit delayed "Health check update" cluster-log messages,
//   2. walk the paxos services a second time,
//   3. trim client sessions that timed out or that we hold while having been
//      out of quorum for too long,
//   4. trim sync providers and finish contexts waiting for quorum,
//   5. as active leader, propose a cluster fingerprint if none exists,
//   6. push this daemon's health metrics to the mgr client.
5792 void Monitor::tick()
5795 dout(11) << "tick" << dendl
;
5796 const utime_t now
= ceph_clock_now();
5798 // Check if we need to emit any delayed health check updated messages
5800 const auto min_period
= g_conf().get_val
<int64_t>(
5801 "mon_health_log_update_period");
5802 for (auto& svc
: paxos_service
) {
5803 auto health
= svc
->get_health_checks();
5805 for (const auto &i
: health
.checks
) {
5806 const std::string
&code
= i
.first
;
5807 const std::string
&summary
= i
.second
.summary
;
5808 const health_status_t severity
= i
.second
.severity
;
// health_check_log_times is keyed by health check code.
5810 auto status_iter
= health_check_log_times
.find(code
);
5811 if (status_iter
== health_check_log_times
.end()) {
5815 auto &log_status
= status_iter
->second
;
// A check counts as changed when its summary text or its severity differs
// from what was last logged.
5816 bool const changed
= log_status
.last_message
!= summary
5817 || log_status
.severity
!= severity
;
// Rate limit: only log again once more than min_period has passed since the
// last logged update for this code.
5819 if (changed
&& now
- log_status
.updated_at
> min_period
) {
5820 log_status
.last_message
= summary
;
5821 log_status
.updated_at
= now
;
5822 log_status
.severity
= severity
;
5825 ss
<< "Health check update: " << summary
<< " (" << code
<< ")";
5826 clog
->health(severity
) << ss
.str();
5833 for (auto& svc
: paxos_service
) {
// Session trimming runs with session_map_lock held.
5840 std::lock_guard
l(session_map_lock
);
5841 auto p
= session_map
.sessions
.begin();
// True once we left quorum more than two lease intervals ago.
5843 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5844 now
> (exited_quorum
+ 2*g_conf()->mon_lease
));
5850 // don't trim monitors
5851 if (s
->name
.is_mon())
5854 if (s
->session_timeout
< now
&& s
->con
) {
5855 // check keepalive, too
// The deadline is refreshed from the connection's last keepalive plus
// mon_session_timeout before deciding whether the session really expired.
5856 s
->session_timeout
= s
->con
->get_last_keepalive();
5857 s
->session_timeout
+= g_conf()->mon_session_timeout
;
5859 if (s
->session_timeout
< now
) {
5860 dout(10) << " trimming session " << s
->con
<< " " << s
->name
5862 << " (timeout " << s
->session_timeout
5863 << " < now " << now
<< ")" << dendl
;
5864 } else if (out_for_too_long
) {
5865 // boot the client Session because we've taken too long getting back in
5866 dout(10) << " trimming session " << s
->con
<< " " << s
->name
5867 << " because we've been out of quorum too long" << dendl
;
5872 s
->con
->mark_down();
5874 logger
->inc(l_mon_session_trim
);
5877 sync_trim_providers();
5879 if (!maybe_wait_for_quorum
.empty()) {
5880 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5883 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5884 // this is only necessary on upgraded clusters.
5885 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5886 prepare_new_fingerprint(t
);
5887 paxos
->trigger_propose();
5890 mgr_client
.update_daemon_health(get_health_metrics());
// Build the daemon health metrics reported to the mgr.
//
// @return a vector holding one SLOW_OPS metric: either (slow, oldest_secs)
//         when visit_ops_in_flight() succeeds, or (0, 0) otherwise.
//
// Ops initiated before `too_old` (derived from "mon_op_complaint_time") are
// counted as slow; the oldest of them is remembered for the log message.
5894 vector
<DaemonHealthMetric
> Monitor::get_health_metrics()
5896 vector
<DaemonHealthMetric
> metrics
;
5898 utime_t oldest_secs
;
5899 const utime_t now
= ceph_clock_now();
5901 too_old
-= g_conf().get_val
<std::chrono::seconds
>("mon_op_complaint_time").count();
5903 TrackedOpRef oldest_op
;
// Visitor passed to the op tracker; captures the locals above by reference.
5904 auto count_slow_ops
= [&](TrackedOp
& op
) {
5905 if (op
.get_initiated() < too_old
) {
5907 if (!oldest_op
|| op
.get_initiated() < oldest_op
->get_initiated()) {
5915 if (op_tracker
.visit_ops_in_flight(&oldest_secs
, count_slow_ops
)) {
// NOTE(review): oldest_op is dereferenced for the log line; presumably this
// is guarded by a check that slow ops were found -- TODO confirm.
5917 derr
<< __func__
<< " reporting " << slow
<< " slow ops, oldest is "
5918 << oldest_op
->get_desc() << dendl
;
5920 metrics
.emplace_back(daemon_metric::SLOW_OPS
, slow
, oldest_secs
);
5922 metrics
.emplace_back(daemon_metric::SLOW_OPS
, 0, 0);
// Generate a random cluster fingerprint and stage it in transaction `t`.
//
// @param t  transaction that receives the put of key "cluster_fingerprint"
//           under the MONITOR_NAME prefix. Committing it is up to the caller.
5927 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5930 nf
.generate_random();
5931 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5935 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
// Compare the "cluster_uuid" stored on disk with the fsid in our monmap.
//
// @return an int status; the values returned on the individual paths are not
//         visible here (mkfs() treats a negative value other than -ENOENT
//         specially).
//
// The stored value is read from the store under MONITOR_NAME, cut at the
// first newline, parsed as a uuid and compared with monmap->get_fsid().
// Parse failures and mismatches are logged via derr.
5938 int Monitor::check_fsid()
5941 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
// Any error reaching this point is treated as fatal.
5944 ceph_assert(r
== 0);
5946 string
es(ebl
.c_str(), ebl
.length());
5948 // only keep the first line
5949 size_t pos
= es
.find_first_of('\n');
5950 if (pos
!= string::npos
)
5953 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
5955 if (!ondisk
.parse(es
.c_str())) {
5956 derr
<< "error: unable to parse uuid" << dendl
;
5960 if (monmap
->get_fsid() != ondisk
) {
5961 derr
<< "error: cluster_uuid file exists with value " << ondisk
5962 << ", != our uuid " << monmap
->get_fsid() << dendl
;
// Convenience overload: creates a fresh MonitorDBStore::Transaction and
// applies it to the store.
//
// @return presumably the result of store->apply_transaction() -- the return
//         statement is not shown here, TODO confirm.
5969 int Monitor::write_fsid()
5971 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5973 int r
= store
->apply_transaction(t
);
// Stage the cluster fsid in transaction `t`.
//
// @param t  transaction that receives the put of key "cluster_uuid" under
//           the MONITOR_NAME prefix.
//
// The value is monmap->get_fsid() followed by a newline, which matches the
// "only keep the first line" handling in check_fsid().
5977 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
5980 ss
<< monmap
->get_fsid() << "\n";
5981 string us
= ss
.str();
5986 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
5991 * this is the closest thing to a traditional 'mkfs' for ceph.
5992 * initialize the monitor state machines to their initial values.
// Seed a new monitor store.
//
// @param osdmapbl  optional encoded OSDMap; when non-empty it is decoded to
//                  validate it and then stored under ("mkfs", "osdmap").
// @return an int status; the concrete values are not visible here.
//
// Everything is collected in one transaction: the on-disk magic, the monmap
// ("mkfs"/"monmap"), the optional osdmap and, when a keyring is required, the
// initial keyring ("mkfs"/"keyring"). The transaction is applied at the end.
5994 int Monitor::mkfs(bufferlist
& osdmapbl
)
5996 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5998 // verify cluster fsid
5999 int r
= check_fsid();
// -ENOENT is tolerated here, unlike other negative results.
6000 if (r
< 0 && r
!= -ENOENT
)
6004 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
6005 magicbl
.append("\n");
6006 t
->put(MONITOR_NAME
, "magic", magicbl
);
6009 features
= get_initial_supported_features();
6012 // save monmap, osdmap, keyring.
6013 bufferlist monmapbl
;
// The monmap is encoded before its in-memory epoch is forced to 0.
6014 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
6015 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
6016 t
->put("mkfs", "monmap", monmapbl
);
6018 if (osdmapbl
.length()) {
6019 // make sure it's a valid osdmap
6022 om
.decode(osdmapbl
);
6024 catch (ceph::buffer::error
& e
) {
6025 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
6028 t
->put("mkfs", "osdmap", osdmapbl
);
6031 if (is_keyring_required()) {
6033 string keyring_filename
;
// Keyring source: first search the configured `keyring` paths; there is also
// a path that builds a "[mon.]" keyring in memory from the configured `key`
// when that option is non-empty.
6035 r
= ceph_resolve_file_search(g_conf()->keyring
, keyring_filename
);
6037 if (g_conf()->key
!= "") {
6038 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf()->key
+
6039 "\n\tcaps mon = \"allow *\"\n";
6041 bl
.append(keyring_plaintext
);
6043 auto i
= bl
.cbegin();
6046 catch (const ceph::buffer::error
& e
) {
// NOTE(review): this message includes keyring_plaintext, which contains the
// configured key.
6047 derr
<< "error decoding keyring " << keyring_plaintext
6048 << ": " << e
.what() << dendl
;
6052 derr
<< "unable to find a keyring on " << g_conf()->keyring
6053 << ": " << cpp_strerror(r
) << dendl
;
6057 r
= keyring
.load(g_ceph_context
, keyring_filename
);
6059 derr
<< "unable to load initial keyring " << g_conf()->keyring
<< dendl
;
6064 // put mon. key in external keyring; seed with everything else.
6065 extract_save_mon_key(keyring
);
6067 bufferlist keyringbl
;
6068 keyring
.encode_plaintext(keyringbl
);
6069 t
->put("mkfs", "keyring", keyringbl
);
// NOTE(review): the result of apply_transaction() is not captured on this
// line.
6072 store
->apply_transaction(t
);
// Write `bl` to "<mon_data>/keyring".
//
// @param bl  the data to write.
// @return an int status; failures to open are logged at level 0 with
//         cpp_strerror(err).
//
// The file is opened write-only, created if missing, with mode 0600 and
// close-on-exec set.
6077 int Monitor::write_default_keyring(bufferlist
& bl
)
6080 os
<< g_conf()->mon_data
<< "/keyring";
// NOTE(review): O_TRUNC is not among the flags, so if the file already exists
// and is longer than `bl`, the old tail would remain -- confirm this is
// intended.
6083 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
|O_CLOEXEC
, 0600);
6086 dout(0) << __func__
<< " failed to open " << os
.str()
6087 << ": " << cpp_strerror(err
) << dendl
;
6091 err
= bl
.write_fd(fd
);
// close() is retried on EINTR-style failures and its result is discarded.
6094 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// Move the "mon." key out of `keyring` into the monitor's own keyring file.
//
// @param keyring  keyring to inspect; the mon entry is removed from it when
//                 found.
//
// When `keyring` has auth for the mon entity, that entry is copied into a
// separate keyring, encoded as plaintext and written through
// write_default_keyring().
6099 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
6101 EntityName mon_name
;
6102 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
6104 if (keyring
.get_auth(mon_name
, mon_key
)) {
6105 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
6107 pkey
.add(mon_name
, mon_key
);
6109 pkey
.encode_plaintext(bl
);
// NOTE(review): the return value of write_default_keyring() is ignored, and
// the entry is removed from `keyring` regardless of whether the write worked.
6110 write_default_keyring(bl
);
6111 keyring
.remove(mon_name
);
6115 // AuthClient methods -- for mon <-> mon communication
// AuthClient hook: prepare the auth request for an outgoing connection.
//
// Runs under auth_lock. Peers that are neither MON nor MGR are handled by an
// early branch. Otherwise an authorizer is built with get_authorizer(),
// stored in auth_meta->authorizer (which takes ownership via reset()), the
// supported modes for the peer type are looked up in auth_registry, and
// *method is set to the authorizer's protocol.
//
// @param auth_meta        per-connection auth state; receives the authorizer.
// @param preferred_modes  output parameter (filled by the registry lookup,
//                         presumably -- the call's remaining arguments are
//                         not shown here).
// @return an int status; the concrete values are not visible here.
6116 int Monitor::get_auth_request(
6118 AuthConnectionMeta
*auth_meta
,
6120 vector
<uint32_t> *preferred_modes
,
6123 std::scoped_lock
l(auth_lock
);
6124 if (con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
&&
6125 con
->get_peer_type() != CEPH_ENTITY_TYPE_MGR
) {
6128 AuthAuthorizer
*auth
;
6129 if (!get_authorizer(con
->get_peer_type(), &auth
)) {
6132 auth_meta
->authorizer
.reset(auth
);
6133 auth_registry
.get_supported_modes(con
->get_peer_type(),
6136 *method
= auth
->protocol
;
// AuthClient hook: the peer asked for another round of the auth exchange.
//
// Runs under auth_lock. Requires that auth_meta already has an authorizer
// (its absence is logged via derr). The challenge in `bl` is added to the
// authorizer and the authorizer's buffer is copied into *reply.
//
// @param auth_meta  per-connection auth state holding the authorizer.
// @param bl         challenge payload received from the peer.
// @return an int status; the concrete values are not visible here.
6141 int Monitor::handle_auth_reply_more(
6143 AuthConnectionMeta
*auth_meta
,
6144 const bufferlist
& bl
,
6147 std::scoped_lock
l(auth_lock
);
6148 if (!auth_meta
->authorizer
) {
6149 derr
<< __func__
<< " no authorizer?" << dendl
;
6152 auth_meta
->authorizer
->add_challenge(cct
, bl
);
6153 *reply
= auth_meta
->authorizer
->bl
;
// AuthClient hook: the peer reported that authentication is complete.
//
// Runs under auth_lock. The authorizer's reply in `bl` is verified; on
// failure a level-0 message is logged. On the visible success path the
// authorizer's session key is copied into auth_meta->session_key.
//
// @param auth_meta          per-connection auth state.
// @param bl                 authorizer reply from the peer.
// @param session_key        output parameter (its use is not visible here).
// @param connection_secret  passed to verify_reply().
// @return an int status; the concrete values are not visible here.
6157 int Monitor::handle_auth_done(
6159 AuthConnectionMeta
*auth_meta
,
6162 const bufferlist
& bl
,
6163 CryptoKey
*session_key
,
6164 std::string
*connection_secret
)
6166 std::scoped_lock
l(auth_lock
);
6167 // verify authorizer reply
6168 auto p
= bl
.begin();
// NOTE(review): unlike handle_auth_reply_more(), no null check of
// auth_meta->authorizer is visible before this dereference.
6169 if (!auth_meta
->authorizer
->verify_reply(p
, connection_secret
)) {
6170 dout(0) << __func__
<< " failed verifying authorizer reply" << dendl
;
6173 auth_meta
->session_key
= auth_meta
->authorizer
->session_key
;
// AuthClient hook: the peer rejected the auth method we offered.
//
// The visible body only logs the rejected method and the error via derr.
//
// @param auth_meta        per-connection auth state.
// @param old_auth_method  the method the peer rejected.
// @param allowed_methods  methods the peer would accept.
// @param allowed_modes    modes the peer would accept.
// @return an int status; the concrete value is not visible here.
6177 int Monitor::handle_auth_bad_method(
6179 AuthConnectionMeta
*auth_meta
,
6180 uint32_t old_auth_method
,
6182 const std::vector
<uint32_t>& allowed_methods
,
6183 const std::vector
<uint32_t>& allowed_modes
)
6185 derr
<< __func__
<< " hmm, they didn't like " << old_auth_method
6186 << " result " << cpp_strerror(result
) << dendl
;
// Build an authorizer for an outgoing connection to a MON or MGR.
//
// @param service_id  CEPH_ENTITY_TYPE_* of the peer; only MON and MGR are
//                    handled, anything else takes the early branch below.
// @param authorizer  out: receives the newly built authorizer. get_auth_request()
//                    hands the pointer to auth_meta->authorizer.reset().
// @return bool; get_auth_request() treats false as failure.
//
// Without cephx in auth_cluster_required an auth_none authorizer is built.
// With cephx a service ticket is assembled locally: for MON from the private
// "mon." secret, for MGR from the key server, and a CephXTicketHandler turns
// it into the authorizer.
6190 bool Monitor::get_authorizer(int service_id
, AuthAuthorizer
**authorizer
)
6192 dout(10) << "get_authorizer for " << ceph_entity_type_name(service_id
)
6198 // we only connect to other monitors and mgr; every else connects to us.
6199 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
6200 service_id
!= CEPH_ENTITY_TYPE_MGR
)
6203 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
6205 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
6206 AuthNoneClientHandler handler
{g_ceph_context
};
6207 handler
.set_global_id(0);
6208 *authorizer
= handler
.build_authorizer(service_id
);
6212 CephXServiceTicketInfo auth_ticket_info
;
6213 CephXSessionAuthInfo info
;
// The ticket is issued to the generic mon entity with global_id 0.
6217 name
.set_type(CEPH_ENTITY_TYPE_MON
);
6218 auth_ticket_info
.ticket
.name
= name
;
6219 auth_ticket_info
.ticket
.global_id
= 0;
6221 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
6222 // mon to mon authentication uses the private monitor shared key and not the
// The secret is looked up in the local keyring first, then the key server.
6225 if (!keyring
.get_secret(name
, secret
) &&
6226 !key_server
.get_secret(name
, secret
)) {
6227 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
6229 stringstream ss
, ds
;
// For diagnostics, the installed auth entries are listed at level 0.
6230 int err
= key_server
.list_secrets(ds
);
6232 ss
<< "no installed auth entries!";
6234 ss
<< "installed auth entries:";
6235 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
// (uint64_t)-1 is passed as the secret id for the mon shared key.
6239 ret
= key_server
.build_session_auth_info(
6240 service_id
, auth_ticket_info
.ticket
, secret
, (uint64_t)-1, info
);
6242 dout(0) << __func__
<< " failed to build mon session_auth_info "
6243 << cpp_strerror(ret
) << dendl
;
6246 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
6248 ret
= key_server
.build_session_auth_info(
6249 service_id
, auth_ticket_info
.ticket
, info
);
6251 derr
<< __func__
<< " failed to build mgr service session_auth_info "
6252 << cpp_strerror(ret
) << dendl
;
6256 ceph_abort(); // see check at top of fn
6259 CephXTicketBlob blob
;
6260 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
6261 dout(0) << "get_authorizer failed to build service ticket" << dendl
;
// Round-trip the blob through a bufferlist to load it into the handler.
6264 bufferlist ticket_data
;
6265 encode(blob
, ticket_data
);
6267 auto iter
= ticket_data
.cbegin();
6268 CephXTicketHandler
handler(g_ceph_context
, service_id
);
6269 decode(handler
.ticket
, iter
);
6271 handler
.session_key
= info
.session_key
;
6273 *authorizer
= handler
.build_authorizer(0);
// AuthServer hook: handle an authentication request on an incoming
// connection.
//
// Runs under auth_lock. The first payload byte selects the auth mode:
//   * AUTH_MODE_AUTHORIZER..AUTH_MODE_AUTHORIZER_MAX: verify the supplied
//     authorizer with the AuthAuthorizeHandler for the peer type.
//   * AUTH_MODE_MON..AUTH_MODE_MON_MAX: run the monitor auth exchange through
//     an AuthServiceHandler attached to a (partial) MonSession.
//   * anything else is reported as an unrecognized auth mode.
//
// @param auth_meta    per-connection auth state (mode, keys, challenge).
// @param auth_method  auth method requested by the peer.
// @param payload      request payload.
// @return an int status; -EACCES is visible on one path, the other values
//         are not shown here.
6278 int Monitor::handle_auth_request(
6280 AuthConnectionMeta
*auth_meta
,
6282 uint32_t auth_method
,
6283 const bufferlist
&payload
,
6286 std::scoped_lock
l(auth_lock
);
6288 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6289 // e.g., peer_features, peer_addrs, and others are still unknown.
6291 dout(10) << __func__
<< " con " << con
<< (more
? " (more)":" (start)")
6292 << " method " << auth_method
6293 << " payload " << payload
.length()
6295 if (!payload
.length()) {
6296 if (!con
->is_msgr2() &&
6297 con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
) {
6298 // for v1 connections, we tolerate no authorizer (from
6299 // non-monitors), because authentication happens via MAuth
6306 auth_meta
->auth_mode
= payload
[0];
6309 if (auth_meta
->auth_mode
>= AUTH_MODE_AUTHORIZER
&&
6310 auth_meta
->auth_mode
<= AUTH_MODE_AUTHORIZER_MAX
) {
6311 AuthAuthorizeHandler
*ah
= get_auth_authorize_handler(con
->get_peer_type(),
6314 lderr(cct
) << __func__
<< " no AuthAuthorizeHandler found for auth method "
6315 << auth_method
<< dendl
;
// Remember whether a challenge already existed so that a challenge created by
// this verify call can be told apart below.
6318 bool was_challenge
= (bool)auth_meta
->authorizer_challenge
;
6319 bool isvalid
= ah
->verify_authorizer(
6323 auth_meta
->get_connection_secret_length(),
6326 &con
->peer_global_id
,
6327 &con
->peer_caps_info
,
6328 &auth_meta
->session_key
,
6329 &auth_meta
->connection_secret
,
6330 &auth_meta
->authorizer_challenge
);
6332 ms_handle_authentication(con
);
6335 if (!more
&& !was_challenge
&& auth_meta
->authorizer_challenge
) {
6338 dout(10) << __func__
<< " bad authorizer on " << con
<< dendl
;
6340 } else if (auth_meta
->auth_mode
< AUTH_MODE_MON
||
6341 auth_meta
->auth_mode
> AUTH_MODE_MON_MAX
) {
6342 derr
<< __func__
<< " unrecognized auth mode " << auth_meta
->auth_mode
6347 // wait until we've formed an initial quorum on mkfs so that we have
6348 // the initial keys (e.g., client.admin).
6349 if (authmon()->get_last_committed() == 0) {
6350 dout(10) << __func__
<< " haven't formed initial quorum, EBUSY" << dendl
;
6357 auto p
= payload
.begin();
// A connection that already carries a priv object at this point is refused.
6359 if (con
->get_priv()) {
6360 return -EACCES
; // wtf
6364 unique_ptr
<AuthServiceHandler
> auth_handler
{get_auth_service_handler(
6365 auth_method
, g_ceph_context
, &key_server
)};
6366 if (!auth_handler
) {
6367 dout(1) << __func__
<< " auth_method " << auth_method
<< " not supported"
6373 EntityName entity_name
;
6377 if (mode
< AUTH_MODE_MON
||
6378 mode
> AUTH_MODE_MON_MAX
) {
6379 dout(1) << __func__
<< " invalid mode " << (int)mode
<< dendl
;
// NOTE(review): plain assert() is used here while the rest of this file uses
// ceph_assert(); it also repeats the range check made just above.
6382 assert(mode
>= AUTH_MODE_MON
&& mode
<= AUTH_MODE_MON_MAX
);
6383 decode(entity_name
, p
);
6384 decode(con
->peer_global_id
, p
);
6385 } catch (ceph::buffer::error
& e
) {
6386 dout(1) << __func__
<< " failed to decode, " << e
.what() << dendl
;
6390 // supported method?
// Cluster daemons (mon/osd/mds/mgr) are checked against
// auth_cluster_required; the other branch checks auth_service_required.
6391 if (entity_name
.get_type() == CEPH_ENTITY_TYPE_MON
||
6392 entity_name
.get_type() == CEPH_ENTITY_TYPE_OSD
||
6393 entity_name
.get_type() == CEPH_ENTITY_TYPE_MDS
||
6394 entity_name
.get_type() == CEPH_ENTITY_TYPE_MGR
) {
6395 if (!auth_cluster_required
.is_supported_auth(auth_method
)) {
6396 dout(10) << __func__
<< " entity " << entity_name
<< " method "
6397 << auth_method
<< " not among supported "
6398 << auth_cluster_required
.get_supported_set() << dendl
;
// NOTE(review): this branch tests auth_service_required but the message below
// prints auth_cluster_required.get_supported_set() -- looks like a copy/paste
// slip; the logged set is not the one that was checked.
6402 if (!auth_service_required
.is_supported_auth(auth_method
)) {
6403 dout(10) << __func__
<< " entity " << entity_name
<< " method "
6404 << auth_method
<< " not among supported "
6405 << auth_cluster_required
.get_supported_set() << dendl
;
6410 // for msgr1 we would do some weirdness here to ensure signatures
6411 // are supported by the client if we require it. for msgr2 that
6412 // is not necessary.
// A peer that did not present a global_id gets a freshly assigned one.
6414 bool is_new_global_id
= false;
6415 if (!con
->peer_global_id
) {
6416 con
->peer_global_id
= authmon()->_assign_global_id();
6417 if (!con
->peer_global_id
) {
6418 dout(1) << __func__
<< " failed to assign global_id" << dendl
;
6421 is_new_global_id
= true;
6424 // set up partial session
// Ownership of the handler moves from the unique_ptr to the session, and the
// session is attached to the connection as its priv object.
6425 s
= new MonSession(con
);
6426 s
->auth_handler
= auth_handler
.release();
6427 con
->set_priv(RefCountedPtr
{s
, false});
6429 r
= s
->auth_handler
->start_session(
6431 con
->peer_global_id
,
6434 &con
->peer_caps_info
);
// Continuation of an exchange: the session is fetched back from the
// connection's priv object.
6436 priv
= con
->get_priv();
6438 // this can happen if the async ms_handle_reset event races with
6439 // the unlocked call into handle_auth_request
6442 s
= static_cast<MonSession
*>(priv
.get());
6443 r
= s
->auth_handler
->handle_request(
6445 auth_meta
->get_connection_secret_length(),
6447 &con
->peer_caps_info
,
6448 &auth_meta
->session_key
,
6449 &auth_meta
->connection_secret
);
6452 !s
->authenticated
) {
6453 ms_handle_authentication(con
);
6456 dout(30) << " r " << r
<< " reply:\n";
6457 reply
->hexdump(*_dout
);
// Messenger hook for a newly accepted connection.
//
// @param con  the accepted connection.
//
// The MonSession is taken from the connection's priv object. A connection
// without one is logged as "no session"; a session that is already on the
// session list is logged and left alone. Otherwise, under session_map_lock
// and unless the monitor is shutting down, the session is identified with the
// peer's type, id and addresses and added to session_map.
6462 void Monitor::ms_handle_accept(Connection
*con
)
6464 auto priv
= con
->get_priv();
6465 MonSession
*s
= static_cast<MonSession
*>(priv
.get());
6467 // legacy protocol v1?
6468 dout(10) << __func__
<< " con " << con
<< " no session" << dendl
;
6472 if (s
->item
.is_on_list()) {
6473 dout(10) << __func__
<< " con " << con
<< " session " << s
6474 << " already on list" << dendl
;
6476 std::lock_guard
l(session_map_lock
);
6477 if (state
== STATE_SHUTDOWN
) {
6478 dout(10) << __func__
<< " ignoring new con " << con
<< " (shutdown)" << dendl
;
6482 dout(10) << __func__
<< " con " << con
<< " session " << s
6483 << " registering session for "
6484 << con
->get_peer_addrs() << dendl
;
6485 s
->_ident(entity_name_t(con
->get_peer_type(), con
->get_peer_id()),
6486 con
->get_peer_addrs());
6487 session_map
.add_session(s
);
// Messenger hook run once a connection has authenticated: set up the
// MonSession and load the peer's capabilities into it.
//
// @param con  the authenticated connection.
// @return an int status; the concrete values are not visible here.
//
// Connections from other monitors get no session. For other peers the
// session is taken from the connection's priv object or created through
// session_map.new_session(). The session is marked authenticated when the
// peer has allow_all, or when its encoded caps decode and parse; corrupt or
// unparseable caps are logged via derr.
6491 int Monitor::ms_handle_authentication(Connection
*con
)
6493 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
6494 // mon <-> mon connections need no Session, and setting one up
6495 // creates an awkward ref cycle between Session and Connection.
6499 auto priv
= con
->get_priv();
6500 MonSession
*s
= static_cast<MonSession
*>(priv
.get());
6502 // must be msgr2, otherwise dispatch would have set up the session.
6503 s
= session_map
.new_session(
6504 entity_name_t(con
->get_peer_type(), -1), // we don't know yet
6505 con
->get_peer_addrs(),
6508 dout(10) << __func__
<< " adding session " << s
<< " to con " << con
6511 logger
->set(l_mon_num_sessions
, session_map
.get_size());
6512 logger
->inc(l_mon_session_add
);
6514 dout(10) << __func__
<< " session " << s
<< " con " << con
6515 << " addr " << s
->con
->get_peer_addr()
6516 << " " << *s
<< dendl
;
6518 AuthCapsInfo
&caps_info
= con
->get_peer_caps_info();
6520 if (caps_info
.allow_all
) {
6521 s
->caps
.set_allow_all();
6522 s
->authenticated
= true;
6524 } else if (caps_info
.caps
.length()) {
6525 bufferlist::const_iterator p
= caps_info
.caps
.cbegin();
6529 } catch (const ceph::buffer::error
&err
) {
6530 derr
<< __func__
<< " corrupt cap data for " << con
->get_peer_entity_name()
6531 << " in auth db" << dendl
;
6536 if (s
->caps
.parse(str
, NULL
)) {
6537 s
->authenticated
= true;
6540 derr
<< __func__
<< " unparseable caps '" << str
<< "' for "
6541 << con
->get_peer_entity_name() << dendl
;
// Record the CRUSH location this monitor should advertise.
//
// @param loc  location string; it is parsed as a single-element list by
//             CrushWrapper::parse_loc_map() into crush_loc.
//
// Sets need_set_crush_loc, which notify_new_monmap() clears once the monmap
// carries the same location for this monitor.
6550 void Monitor::set_mon_crush_location(const string
& loc
)
6555 vector
<string
> loc_vec
;
6556 loc_vec
.push_back(loc
);
// NOTE(review): the result of parse_loc_map() is not checked on this line.
6557 CrushWrapper::parse_loc_map(loc_vec
, &crush_loc
);
6558 need_set_crush_loc
= true;
6561 void Monitor::notify_new_monmap(bool can_change_external_state
)
6563 if (need_set_crush_loc
) {
6564 auto my_info_i
= monmap
->mon_info
.find(name
);
6565 if (my_info_i
!= monmap
->mon_info
.end() &&
6566 my_info_i
->second
.crush_loc
== crush_loc
) {
6567 need_set_crush_loc
= false;
6570 elector
.notify_strategy_maybe_changed(monmap
->strategy
);
6571 dout(30) << __func__
<< "we have " << monmap
->removed_ranks
.size() << " removed ranks" << dendl
;
6572 for (auto i
= monmap
->removed_ranks
.rbegin();
6573 i
!= monmap
->removed_ranks
.rend(); ++i
) {
6575 dout(10) << __func__
<< "removing rank " << rank
<< dendl
;
6576 elector
.notify_rank_removed(rank
);
6579 if (monmap
->stretch_mode_enabled
) {
6580 try_engage_stretch_mode();
6583 if (is_stretch_mode()) {
6584 if (!monmap
->stretch_marked_down_mons
.empty()) {
6585 set_degraded_stretch_mode();
6588 set_elector_disallowed_leaders(can_change_external_state
);
6591 void Monitor::set_elector_disallowed_leaders(bool allow_election
)
6594 for (auto name
: monmap
->disallowed_leaders
) {
6595 dl
.insert(monmap
->get_rank(name
));
6597 if (is_stretch_mode()) {
6598 for (auto name
: monmap
->stretch_marked_down_mons
) {
6599 dl
.insert(monmap
->get_rank(name
));
6601 dl
.insert(monmap
->get_rank(monmap
->tiebreaker_mon
));
6604 bool disallowed_changed
= elector
.set_disallowed_leaders(dl
);
6605 if (disallowed_changed
&& allow_election
) {
6606 elector
.call_election();
6610 struct CMonEnableStretchMode
: public Context
{
6612 CMonEnableStretchMode(Monitor
*mon
) : m(mon
) {}
6613 void finish(int r
) {
6614 m
->try_engage_stretch_mode();
6617 void Monitor::try_engage_stretch_mode()
6619 dout(20) << __func__
<< dendl
;
6620 if (stretch_mode_engaged
) return;
6621 if (!osdmon()->is_readable()) {
6622 osdmon()->wait_for_readable_ctx(new CMonEnableStretchMode(this));
6624 if (osdmon()->osdmap
.stretch_mode_enabled
&&
6625 monmap
->stretch_mode_enabled
) {
6626 dout(10) << "Engaging stretch mode!" << dendl
;
6627 stretch_mode_engaged
= true;
6628 int32_t stretch_divider_id
= osdmon()->osdmap
.stretch_mode_bucket
;
6629 stretch_bucket_divider
= osdmon()->osdmap
.
6630 crush
->get_type_name(stretch_divider_id
);
6631 disconnect_disallowed_stretch_sessions();
// After an election, work out which CRUSH buckets (at the
// stretch_bucket_divider level) have lost their monitors.
//
// Only does anything on the leader while in stretch mode. For every monitor
// in the monmap its bucket is looked up in the monitor's crush_loc; buckets
// of monitors in quorum go to up_mon_buckets, monitors are also collected per
// bucket in down_mon_buckets. A bucket with no monitor up is recorded in
// dead_mon_buckets. When that map changed and did not shrink,
// maybe_go_degraded_stretch_mode() is invoked.
6635 void Monitor::do_stretch_mode_election_work()
6637 dout(20) << __func__
<< dendl
;
6638 if (!is_stretch_mode() ||
6639 !is_leader()) return;
6640 dout(20) << "checking for degraded stretch mode" << dendl
;
// Keep the previous result for comparison; swap() leaves dead_mon_buckets
// holding old_dead_buckets' former (empty) content.
6641 map
<string
, set
<string
>> old_dead_buckets
;
6642 old_dead_buckets
.swap(dead_mon_buckets
);
6643 up_mon_buckets
.clear();
6644 // identify if we've lost a CRUSH bucket, request OSDMonitor check for death
6645 map
<string
,set
<string
>> down_mon_buckets
;
6646 for (unsigned i
= 0; i
< monmap
->size(); ++i
) {
6647 const auto &mi
= monmap
->mon_info
[monmap
->get_name(i
)];
6648 auto ci
= mi
.crush_loc
.find(stretch_bucket_divider
);
// Every monitor must have a location at the divider level.
6649 ceph_assert(ci
!= mi
.crush_loc
.end());
6650 if (quorum
.count(i
)) {
6651 up_mon_buckets
.insert(ci
->second
);
6653 down_mon_buckets
[ci
->second
].insert(mi
.name
);
6656 dout(20) << "prior dead_mon_buckets: " << old_dead_buckets
6657 << "; down_mon_buckets: " << down_mon_buckets
6658 << "; up_mon_buckets: " << up_mon_buckets
<< dendl
;
6659 for (const auto& di
: down_mon_buckets
) {
6660 if (!up_mon_buckets
.count(di
.first
)) {
6661 dead_mon_buckets
[di
.first
] = di
.second
;
6664 dout(20) << "new dead_mon_buckets " << dead_mon_buckets
<< dendl
;
6666 if (dead_mon_buckets
!= old_dead_buckets
&&
6667 dead_mon_buckets
.size() >= old_dead_buckets
.size()) {
6668 maybe_go_degraded_stretch_mode();
6672 struct CMonGoDegraded
: public Context
{
6674 CMonGoDegraded(Monitor
*mon
) : m(mon
) {}
6675 void finish(int r
) {
6676 m
->maybe_go_degraded_stretch_mode();
6680 struct CMonGoRecovery
: public Context
{
6682 CMonGoRecovery(Monitor
*mon
) : m(mon
) {}
6683 void finish(int r
) {
6684 m
->go_recovery_stretch_mode();
// Try to move from degraded stretch mode into its recovery phase.
//
// Only acts on the leader, while degraded and not already recovering. Having
// dead monitor buckets at this point is asserted to be impossible. When the
// OSDMonitor is not readable or not writeable a CMonGoRecovery context is
// queued to retry; the recovery itself is triggered through the OSDMonitor.
6687 void Monitor::go_recovery_stretch_mode()
6689 dout(20) << __func__
<< dendl
;
6690 if (!is_leader()) return;
6691 if (!is_degraded_stretch_mode()) return;
6692 if (is_recovering_stretch_mode()) return;
6694 if (dead_mon_buckets
.size()) {
6695 ceph_assert( 0 == "how did we try and do stretch recovery while we have dead monitor buckets?");
6696 // we can't recover if we are missing monitors in a zone!
6700 if (!osdmon()->is_readable()) {
6701 osdmon()->wait_for_readable_ctx(new CMonGoRecovery(this));
6705 if (!osdmon()->is_writeable()) {
6706 osdmon()->wait_for_writeable_ctx(new CMonGoRecovery(this));
6708 osdmon()->trigger_recovery_stretch_mode();
6711 void Monitor::set_recovery_stretch_mode()
6713 degraded_stretch_mode
= true;
6714 recovering_stretch_mode
= true;
6715 osdmon()->set_recovery_stretch_mode();
// Decide whether the cluster should enter degraded stretch mode.
//
// Only acts on the leader, when not already degraded and when
// dead_mon_buckets is non-empty. The tiebreaker monitor's bucket is removed
// from a copy of dead_mon_buckets, and the OSDMonitor is asked whether the
// remaining buckets are also dead from the OSDs' point of view. Contexts of
// type CMonGoDegraded are queued to retry when a required service is not
// readable/writeable. The transition itself is done by
// trigger_degraded_stretch_mode().
6718 void Monitor::maybe_go_degraded_stretch_mode()
6720 dout(20) << __func__
<< dendl
;
6721 if (is_degraded_stretch_mode()) return;
6722 if (!is_leader()) return;
6723 if (dead_mon_buckets
.empty()) return;
6724 if (!osdmon()->is_readable()) {
6725 osdmon()->wait_for_readable_ctx(new CMonGoDegraded(this));
6728 ceph_assert(monmap
->contains(monmap
->tiebreaker_mon
));
6729 // filter out the tiebreaker zone and check if remaining sites are down by OSDs too
6730 const auto &mi
= monmap
->mon_info
[monmap
->tiebreaker_mon
];
// NOTE(review): unlike do_stretch_mode_election_work(), `ci` is not asserted
// to differ from crush_loc.end() before it is dereferenced below.
6731 auto ci
= mi
.crush_loc
.find(stretch_bucket_divider
);
6732 map
<string
, set
<string
>> filtered_dead_buckets
= dead_mon_buckets
;
6733 filtered_dead_buckets
.erase(ci
->second
);
6735 set
<int> matched_down_buckets
;
6736 set
<string
> matched_down_mons
;
6737 bool dead
= osdmon()->check_for_dead_crush_zones(filtered_dead_buckets
,
6738 &matched_down_buckets
,
6739 &matched_down_mons
);
// NOTE(review): trigger_degraded_stretch_mode() asserts that both services
// are writeable; confirm that control cannot reach it right after one of the
// waits below was queued.
6741 if (!osdmon()->is_writeable()) {
6742 osdmon()->wait_for_writeable_ctx(new CMonGoDegraded(this));
6744 if (!monmon()->is_writeable()) {
6745 monmon()->wait_for_writeable_ctx(new CMonGoDegraded(this));
6747 trigger_degraded_stretch_mode(matched_down_mons
, matched_down_buckets
);
6751 void Monitor::trigger_degraded_stretch_mode(const set
<string
>& dead_mons
,
6752 const set
<int>& dead_buckets
)
6754 dout(20) << __func__
<< dendl
;
6755 ceph_assert(osdmon()->is_writeable());
6756 ceph_assert(monmon()->is_writeable());
6758 // figure out which OSD zone(s) remains alive by removing
6759 // tiebreaker mon from up_mon_buckets
6760 set
<string
> live_zones
= up_mon_buckets
;
6761 ceph_assert(monmap
->contains(monmap
->tiebreaker_mon
));
6762 const auto &mi
= monmap
->mon_info
[monmap
->tiebreaker_mon
];
6763 auto ci
= mi
.crush_loc
.find(stretch_bucket_divider
);
6764 live_zones
.erase(ci
->second
);
6765 ceph_assert(live_zones
.size() == 1); // only support 2 zones right now
6767 osdmon()->trigger_degraded_stretch_mode(dead_buckets
, live_zones
);
6768 monmon()->trigger_degraded_stretch_mode(dead_mons
);
6769 set_degraded_stretch_mode();
6772 void Monitor::set_degraded_stretch_mode()
6774 degraded_stretch_mode
= true;
6775 recovering_stretch_mode
= false;
6776 osdmon()->set_degraded_stretch_mode();
6779 struct CMonGoHealthy
: public Context
{
6781 CMonGoHealthy(Monitor
*mon
) : m(mon
) {}
6782 void finish(int r
) {
6783 m
->trigger_healthy_stretch_mode();
6788 void Monitor::trigger_healthy_stretch_mode()
6790 dout(20) << __func__
<< dendl
;
6791 if (!is_degraded_stretch_mode()) return;
6792 if (!is_leader()) return;
6793 if (!osdmon()->is_writeable()) {
6794 osdmon()->wait_for_writeable_ctx(new CMonGoHealthy(this));
6796 if (!monmon()->is_writeable()) {
6797 monmon()->wait_for_writeable_ctx(new CMonGoHealthy(this));
6800 ceph_assert(osdmon()->osdmap
.recovering_stretch_mode
);
6801 osdmon()->trigger_healthy_stretch_mode();
6802 monmon()->trigger_healthy_stretch_mode();
6805 void Monitor::set_healthy_stretch_mode()
6807 degraded_stretch_mode
= false;
6808 recovering_stretch_mode
= false;
6809 osdmon()->set_healthy_stretch_mode();
// Decide whether session `s` may talk to this monitor in stretch mode.
//
// @param s   the session to check.
// @param op  the request being processed (its use is not visible here).
// @return true when the session is allowed. Sessions are allowed outright
//         when stretch mode is off, when they are proxied, when they were
//         validated before, or when they have no connection.
//
// For OSD peers the OSD's parent bucket at the stretch_bucket_divider level
// is compared with this monitor's own bucket at that level. On a mismatch the
// connection is marked down so the OSD reconnects to a monitor in its own
// zone. Sessions that pass are flagged validated_stretch_connection so the
// check is not repeated.
6812 bool Monitor::session_stretch_allowed(MonSession
*s
, MonOpRequestRef
& op
)
6814 if (!is_stretch_mode()) return true;
6815 if (s
->proxy_con
) return true;
6816 if (s
->validated_stretch_connection
) return true;
6817 if (!s
->con
) return true;
6818 if (s
->con
->peer_is_osd()) {
// NOTE(review): the message has no space after __func__ nor before the
// session pointer, and prints the pointer `s` where the message further down
// prints `*s`.
6819 dout(20) << __func__
<< "checking OSD session" << s
<< dendl
;
6820 // okay, check the crush location
// The CRUSH type id of the divider is resolved in an immediately-invoked
// lambda; the type must exist.
6821 int barrier_id
= [&] {
6822 auto type_id
= osdmon()->osdmap
.crush
->get_validated_type_id(
6823 stretch_bucket_divider
);
6824 ceph_assert(type_id
.has_value());
6827 int osd_bucket_id
= osdmon()->osdmap
.crush
->get_parent_of_type(s
->con
->peer_id
,
6829 const auto &mi
= monmap
->mon_info
.find(name
);
6830 ceph_assert(mi
!= monmap
->mon_info
.end());
6831 auto ci
= mi
->second
.crush_loc
.find(stretch_bucket_divider
);
6832 ceph_assert(ci
!= mi
->second
.crush_loc
.end());
6833 int mon_bucket_id
= osdmon()->osdmap
.crush
->get_item_id(ci
->second
);
6835 if (osd_bucket_id
!= mon_bucket_id
) {
6836 dout(5) << "discarding session " << *s
6837 << " and sending OSD to matched zone" << dendl
;
6838 s
->con
->mark_down();
6839 std::lock_guard
l(session_map_lock
);
6848 s
->validated_stretch_connection
= true;
6852 void Monitor::disconnect_disallowed_stretch_sessions()
6854 dout(20) << __func__
<< dendl
;
6855 MonOpRequestRef blank
;
6856 auto i
= session_map
.sessions
.begin();
6857 while (i
!= session_map
.sessions
.end()) {
6860 session_stretch_allowed(*j
, blank
);