1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
23 #include <boost/scope_exit.hpp>
24 #include <boost/algorithm/string/predicate.hpp>
26 #include "json_spirit/json_spirit_reader.h"
27 #include "json_spirit/json_spirit_writer.h"
30 #include "common/version.h"
31 #include "common/blkdev.h"
32 #include "common/cmdparse.h"
33 #include "common/signal.h"
35 #include "osd/OSDMap.h"
37 #include "MonitorDBStore.h"
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MMonMap.h"
41 #include "messages/MMonGetMap.h"
42 #include "messages/MMonGetVersion.h"
43 #include "messages/MMonGetVersionReply.h"
44 #include "messages/MGenericMessage.h"
45 #include "messages/MMonCommand.h"
46 #include "messages/MMonCommandAck.h"
47 #include "messages/MMonSync.h"
48 #include "messages/MMonScrub.h"
49 #include "messages/MMonProbe.h"
50 #include "messages/MMonJoin.h"
51 #include "messages/MMonPaxos.h"
52 #include "messages/MRoute.h"
53 #include "messages/MForward.h"
55 #include "messages/MMonSubscribe.h"
56 #include "messages/MMonSubscribeAck.h"
58 #include "messages/MCommand.h"
59 #include "messages/MCommandReply.h"
61 #include "messages/MTimeCheck2.h"
62 #include "messages/MPing.h"
64 #include "common/strtol.h"
65 #include "common/ceph_argparse.h"
66 #include "common/Timer.h"
67 #include "common/Clock.h"
68 #include "common/errno.h"
69 #include "common/perf_counters.h"
70 #include "common/admin_socket.h"
71 #include "global/signal_handler.h"
72 #include "common/Formatter.h"
73 #include "include/stringify.h"
74 #include "include/color.h"
75 #include "include/ceph_fs.h"
76 #include "include/str_list.h"
78 #include "OSDMonitor.h"
79 #include "MDSMonitor.h"
80 #include "MonmapMonitor.h"
81 #include "LogMonitor.h"
82 #include "AuthMonitor.h"
83 #include "MgrMonitor.h"
84 #include "MgrStatMonitor.h"
85 #include "ConfigMonitor.h"
86 #include "KVMonitor.h"
87 #include "mon/HealthMonitor.h"
88 #include "common/config.h"
89 #include "common/cmdparse.h"
90 #include "include/ceph_assert.h"
91 #include "include/compat.h"
92 #include "perfglue/heap_profiler.h"
94 #include "auth/none/AuthNoneClientHandler.h"
96 #define dout_subsys ceph_subsys_mon
98 #define dout_prefix _prefix(_dout, this)
99 using namespace TOPNSPC::common
;
106 using std::make_pair
;
108 using std::ostringstream
;
113 using std::stringstream
;
114 using std::to_string
;
116 using std::unique_ptr
;
118 using ceph::bufferlist
;
121 using ceph::ErasureCodeInterfaceRef
;
122 using ceph::ErasureCodeProfile
;
123 using ceph::Formatter
;
124 using ceph::JSONFormatter
;
125 using ceph::make_message
;
126 using ceph::mono_clock
;
127 using ceph::mono_time
;
128 using ceph::timespan_str
;
131 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
132 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
133 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
136 const string
Monitor::MONITOR_NAME
= "monitor";
137 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
142 #undef COMMAND_WITH_FLAG
143 #define FLAG(f) (MonCommand::FLAG_##f)
144 #define COMMAND(parsesig, helptext, modulename, req_perms) \
145 {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
146 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
147 {parsesig, helptext, modulename, req_perms, flags},
148 MonCommand mon_commands
[] = {
149 #include <mon/MonCommands.h>
152 #undef COMMAND_WITH_FLAG
154 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
155 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
161 con_self(m
? m
->get_loopback_connection() : NULL
),
163 finisher(cct_
, "mon_finisher", "fin"),
164 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads
),
165 has_ever_joined(false),
166 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
168 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
169 key_server(cct
, &keyring
),
170 auth_cluster_required(cct
,
171 cct
->_conf
->auth_supported
.empty() ?
172 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
173 auth_service_required(cct
,
174 cct
->_conf
->auth_supported
.empty() ?
175 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
176 mgr_messenger(mgr_m
),
177 mgr_client(cct_
, mgr_m
, monmap
),
178 gss_ktfile_client(cct
->_conf
.get_val
<std::string
>("gss_ktab_client_file")),
181 elector(this, map
->strategy
),
182 required_features(0),
184 quorum_con_features(0),
188 scrub_timeout_event(NULL
),
191 sync_provider_count(0),
194 sync_start_version(0),
195 sync_timeout_event(NULL
),
196 sync_last_committed_floor(0),
200 timecheck_rounds_since_clean(0),
201 timecheck_event(NULL
),
204 routed_request_tid(0),
205 op_tracker(cct
, g_conf().get_val
<bool>("mon_enable_op_tracker"), 1)
207 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
208 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
210 update_log_clients();
212 if (!gss_ktfile_client
.empty()) {
213 // Assert we can export environment variable
    /*
        The default client keytab is used, if it is present and readable,
        to automatically obtain initial credentials for GSSAPI client
        applications. The principal name of the first entry in the client
        keytab is used by default when obtaining initial credentials.
        The keytab name is looked up in the following order:
        1. The KRB5_CLIENT_KTNAME environment variable.
        2. The default_client_keytab_name profile variable in [libdefaults].
        3. The hardcoded default, DEFCKTNAME.
    */
223 const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
224 gss_ktfile_client
.c_str(), 1));
225 ceph_assert(set_result
== 0);
228 op_tracker
.set_complaint_and_threshold(
229 g_conf().get_val
<std::chrono::seconds
>("mon_op_complaint_time").count(),
230 g_conf().get_val
<int64_t>("mon_op_log_threshold"));
231 op_tracker
.set_history_size_and_duration(
232 g_conf().get_val
<uint64_t>("mon_op_history_size"),
233 g_conf().get_val
<std::chrono::seconds
>("mon_op_history_duration").count());
234 op_tracker
.set_history_slow_op_size_and_threshold(
235 g_conf().get_val
<uint64_t>("mon_op_history_slow_op_size"),
236 g_conf().get_val
<std::chrono::seconds
>("mon_op_history_slow_op_threshold").count());
238 paxos
= std::make_unique
<Paxos
>(*this, "paxos");
240 paxos_service
[PAXOS_MDSMAP
].reset(new MDSMonitor(*this, *paxos
, "mdsmap"));
241 paxos_service
[PAXOS_MONMAP
].reset(new MonmapMonitor(*this, *paxos
, "monmap"));
242 paxos_service
[PAXOS_OSDMAP
].reset(new OSDMonitor(cct
, *this, *paxos
, "osdmap"));
243 paxos_service
[PAXOS_LOG
].reset(new LogMonitor(*this, *paxos
, "logm"));
244 paxos_service
[PAXOS_AUTH
].reset(new AuthMonitor(*this, *paxos
, "auth"));
245 paxos_service
[PAXOS_MGR
].reset(new MgrMonitor(*this, *paxos
, "mgr"));
246 paxos_service
[PAXOS_MGRSTAT
].reset(new MgrStatMonitor(*this, *paxos
, "mgrstat"));
247 paxos_service
[PAXOS_HEALTH
].reset(new HealthMonitor(*this, *paxos
, "health"));
248 paxos_service
[PAXOS_CONFIG
].reset(new ConfigMonitor(*this, *paxos
, "config"));
249 paxos_service
[PAXOS_KV
].reset(new KVMonitor(*this, *paxos
, "kv"));
251 bool r
= mon_caps
.parse("allow *", NULL
);
254 exited_quorum
= ceph_clock_now();
256 // prepare local commands
257 local_mon_commands
.resize(std::size(mon_commands
));
258 for (unsigned i
= 0; i
< std::size(mon_commands
); ++i
) {
259 local_mon_commands
[i
] = mon_commands
[i
];
261 MonCommand::encode_vector(local_mon_commands
, local_mon_commands_bl
);
263 prenautilus_local_mon_commands
= local_mon_commands
;
264 for (auto& i
: prenautilus_local_mon_commands
) {
265 std::string n
= cmddesc_get_prenautilus_compat(i
.cmdstring
);
266 if (n
!= i
.cmdstring
) {
267 dout(20) << " pre-nautilus cmd " << i
.cmdstring
<< " -> " << n
<< dendl
;
271 MonCommand::encode_vector(prenautilus_local_mon_commands
, prenautilus_local_mon_commands_bl
);
273 // assume our commands until we have an election. this only means
274 // we won't reply with EINVAL before the election; any command that
275 // actually matters will wait until we have quorum etc and then
276 // retry (and revalidate).
277 leader_mon_commands
= local_mon_commands
;
282 op_tracker
.on_shutdown();
285 ceph_assert(session_map
.sessions
.empty());
289 class AdminHook
: public AdminSocketHook
{
292 explicit AdminHook(Monitor
*m
) : mon(m
) {}
293 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
297 bufferlist
& out
) override
{
299 int r
= mon
->do_admin_command(command
, cmdmap
, f
, errss
, outss
);
305 int Monitor::do_admin_command(
306 std::string_view command
,
307 const cmdmap_t
& cmdmap
,
312 std::lock_guard
l(lock
);
316 for (auto p
= cmdmap
.begin();
317 p
!= cmdmap
.end(); ++p
) {
318 if (p
->first
== "prefix")
322 args
+= cmd_vartype_stringify(p
->second
);
324 args
= "[" + args
+ "]";
326 bool read_only
= (command
== "mon_status" ||
327 command
== "mon metadata" ||
328 command
== "quorum_status" ||
330 command
== "sessions");
332 (read_only
? audit_clog
->debug() : audit_clog
->info())
333 << "from='admin socket' entity='admin socket' "
334 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
336 if (command
== "mon_status") {
338 } else if (command
== "quorum_status") {
339 _quorum_status(f
, out
);
340 } else if (command
== "sync_force") {
341 bool validate
= false;
342 if (!cmd_getval(cmdmap
, "yes_i_really_mean_it", validate
)) {
344 if (cmd_getval(cmdmap
, "validate", v
) &&
345 v
== "--yes-i-really-mean-it") {
350 err
<< "are you SURE? this will mean the monitor store will be erased "
351 "the next time the monitor is restarted. pass "
352 "'--yes-i-really-mean-it' if you really do.";
357 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
358 command
.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
359 if (!_add_bootstrap_peer_hint(command
, cmdmap
, out
))
361 } else if (command
== "quorum enter") {
362 elector
.start_participating();
364 out
<< "started responding to quorum, initiated new election";
365 } else if (command
== "quorum exit") {
367 elector
.stop_participating();
368 out
<< "stopped responding to quorum, initiated new election";
369 } else if (command
== "ops") {
370 (void)op_tracker
.dump_ops_in_flight(f
);
371 } else if (command
== "sessions") {
372 f
->open_array_section("sessions");
373 for (auto p
: session_map
.sessions
) {
374 f
->dump_object("session", *p
);
377 } else if (command
== "dump_historic_ops") {
378 if (!op_tracker
.dump_historic_ops(f
)) {
379 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
380 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
382 } else if (command
== "dump_historic_ops_by_duration" ) {
383 if (op_tracker
.dump_historic_ops(f
, true)) {
384 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
385 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
387 } else if (command
== "dump_historic_slow_ops") {
388 if (op_tracker
.dump_historic_slow_ops(f
, {})) {
389 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
390 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
392 } else if (command
== "quorum") {
394 cmd_getval(cmdmap
, "quorumcmd", quorumcmd
);
395 if (quorumcmd
== "exit") {
397 elector
.stop_participating();
398 out
<< "stopped responding to quorum, initiated new election" << std::endl
;
399 } else if (quorumcmd
== "enter") {
400 elector
.start_participating();
402 out
<< "started responding to quorum, initiated new election" << std::endl
;
404 err
<< "needs a valid 'quorum' command" << std::endl
;
406 } else if (command
== "connection scores dump") {
407 if (!get_quorum_mon_features().contains_all(
408 ceph::features::mon::FEATURE_PINGING
)) {
409 err
<< "Not all monitors support changing election strategies; \
410 please upgrade them first!";
412 elector
.dump_connection_scores(f
);
413 } else if (command
== "connection scores reset") {
414 if (!get_quorum_mon_features().contains_all(
415 ceph::features::mon::FEATURE_PINGING
)) {
416 err
<< "Not all monitors support changing election strategies; \
417 please upgrade them first!";
419 elector
.notify_clear_peer_state();
420 } else if (command
== "smart") {
422 cmd_getval(cmdmap
, "devid", want_devid
);
424 string devname
= store
->get_devname();
425 if (devname
.empty()) {
426 err
<< "could not determine device name for " << store
->get_path();
430 set
<string
> devnames
;
431 get_raw_devices(devname
, &devnames
);
432 json_spirit::mObject json_map
;
433 uint64_t smart_timeout
= cct
->_conf
.get_val
<uint64_t>(
434 "mon_smart_report_timeout");
435 for (auto& devname
: devnames
) {
437 string devid
= get_device_id(devname
, &err
);
438 if (want_devid
.size() && want_devid
!= devid
) {
439 derr
<< "get_device_id failed on " << devname
<< ": " << err
<< dendl
;
442 json_spirit::mValue smart_json
;
443 if (block_device_get_metrics(devname
, smart_timeout
,
445 dout(10) << "block_device_get_metrics failed for /dev/" << devname
449 json_map
[devid
] = smart_json
;
451 json_spirit::write(json_map
, out
, json_spirit::pretty_print
);
452 } else if (command
== "heap") {
453 if (!ceph_using_tcmalloc()) {
454 err
<< "could not issue heap profiler command -- not using tcmalloc!";
459 if (!cmd_getval(cmdmap
, "heapcmd", cmd
)) {
460 err
<< "unable to get value for command \"" << cmd
<< "\"";
464 std::vector
<std::string
> cmd_vec
;
465 get_str_vec(cmd
, cmd_vec
);
467 if (cmd_getval(cmdmap
, "value", val
)) {
468 cmd_vec
.push_back(val
);
470 ceph_heap_profiler_handle_command(cmd_vec
, out
);
471 } else if (command
== "compact") {
472 dout(1) << "triggering manual compaction" << dendl
;
473 auto start
= ceph::coarse_mono_clock::now();
474 store
->compact_async();
475 auto end
= ceph::coarse_mono_clock::now();
476 auto duration
= ceph::to_seconds
<double>(end
- start
);
477 dout(1) << "finished manual compaction in "
478 << duration
<< " seconds" << dendl
;
479 out
<< "compacted " << g_conf().get_val
<std::string
>("mon_keyvaluedb")
480 << " in " << duration
<< " seconds";
482 ceph_abort_msg("bad AdminSocket command binding");
484 (read_only
? audit_clog
->debug() : audit_clog
->info())
485 << "from='admin socket' "
486 << "entity='admin socket' "
487 << "cmd=" << command
<< " "
488 << "args=" << args
<< ": finished";
492 (read_only
? audit_clog
->debug() : audit_clog
->info())
493 << "from='admin socket' "
494 << "entity='admin socket' "
495 << "cmd=" << command
<< " "
496 << "args=" << args
<< ": aborted";
500 void Monitor::handle_signal(int signum
)
502 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
503 if (signum
== SIGHUP
) {
504 sighup_handler(signum
);
505 logmon()->reopen_logs();
507 ceph_assert(signum
== SIGINT
|| signum
== SIGTERM
);
512 CompatSet
Monitor::get_initial_supported_features()
514 CompatSet::FeatureSet ceph_mon_feature_compat
;
515 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
516 CompatSet::FeatureSet ceph_mon_feature_incompat
;
517 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
518 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
519 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
520 ceph_mon_feature_incompat
);
523 CompatSet
Monitor::get_supported_features()
525 CompatSet compat
= get_initial_supported_features();
526 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
527 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
528 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
529 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
530 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
531 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
532 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC
);
533 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
);
534 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
);
535 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_PACIFIC
);
536 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_QUINCY
);
537 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_REEF
);
541 CompatSet
Monitor::get_legacy_features()
543 CompatSet::FeatureSet ceph_mon_feature_compat
;
544 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
545 CompatSet::FeatureSet ceph_mon_feature_incompat
;
546 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
547 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
548 ceph_mon_feature_incompat
);
551 int Monitor::check_features(MonitorDBStore
*store
)
553 CompatSet required
= get_supported_features();
556 read_features_off_disk(store
, &ondisk
);
558 if (!required
.writeable(ondisk
)) {
559 CompatSet diff
= required
.unsupported(ondisk
);
560 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
567 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
569 bufferlist featuresbl
;
570 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
571 if (featuresbl
.length() == 0) {
572 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
573 << "Assuming it is old-style and introducing one." << dendl
;
574 //we only want the baseline ~v.18 features assumed to be on disk.
575 //If new features are introduced this code needs to disappear or
577 *features
= get_legacy_features();
579 features
->encode(featuresbl
);
580 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
581 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
582 store
->apply_transaction(t
);
584 auto it
= featuresbl
.cbegin();
585 features
->decode(it
);
589 void Monitor::read_features()
591 read_features_off_disk(store
, &features
);
592 dout(10) << "features " << features
<< dendl
;
594 calc_quorum_requirements();
595 dout(10) << "required_features " << required_features
<< dendl
;
598 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
602 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
605 const char** Monitor::get_tracked_conf_keys() const
607 static const char* KEYS
[] = {
608 "crushtool", // helpful for testing
609 "mon_election_timeout",
611 "mon_lease_renew_interval_factor",
612 "mon_lease_ack_timeout_factor",
613 "mon_accept_timeout_factor",
617 "clog_to_syslog_facility",
618 "clog_to_syslog_level",
620 "clog_to_graylog_host",
621 "clog_to_graylog_port",
622 "mon_cluster_log_to_file",
625 // periodic health to clog
626 "mon_health_to_clog",
627 "mon_health_to_clog_interval",
628 "mon_health_to_clog_tick_interval",
630 "mon_scrub_interval",
631 "mon_allow_pool_delete",
632 // osdmap pruning - observed, not handled.
633 "mon_osdmap_full_prune_enabled",
634 "mon_osdmap_full_prune_min",
635 "mon_osdmap_full_prune_interval",
636 "mon_osdmap_full_prune_txsize",
637 // debug options - observed, not handled
638 "mon_debug_extra_checks",
639 "mon_debug_block_osdmap_trim",
645 void Monitor::handle_conf_change(const ConfigProxy
& conf
,
646 const std::set
<std::string
> &changed
)
650 dout(10) << __func__
<< " " << changed
<< dendl
;
652 if (changed
.count("clog_to_monitors") ||
653 changed
.count("clog_to_syslog") ||
654 changed
.count("clog_to_syslog_level") ||
655 changed
.count("clog_to_syslog_facility") ||
656 changed
.count("clog_to_graylog") ||
657 changed
.count("clog_to_graylog_host") ||
658 changed
.count("clog_to_graylog_port") ||
659 changed
.count("host") ||
660 changed
.count("fsid")) {
661 update_log_clients();
664 if (changed
.count("mon_health_to_clog") ||
665 changed
.count("mon_health_to_clog_interval") ||
666 changed
.count("mon_health_to_clog_tick_interval")) {
667 finisher
.queue(new C_MonContext
{this, [this, changed
](int) {
668 std::lock_guard l
{lock
};
669 health_to_clog_update_conf(changed
);
673 if (changed
.count("mon_scrub_interval")) {
674 auto scrub_interval
=
675 conf
.get_val
<std::chrono::seconds
>("mon_scrub_interval");
676 finisher
.queue(new C_MonContext
{this, [this, scrub_interval
](int) {
677 std::lock_guard l
{lock
};
678 scrub_update_interval(scrub_interval
);
683 void Monitor::update_log_clients()
685 clog
->parse_client_options(g_ceph_context
);
686 audit_clog
->parse_client_options(g_ceph_context
);
689 int Monitor::sanitize_options()
693 // mon_lease must be greater than mon_lease_renewal; otherwise we
694 // may incur in leases expiring before they are renewed.
695 if (g_conf()->mon_lease_renew_interval_factor
>= 1.0) {
696 clog
->error() << "mon_lease_renew_interval_factor ("
697 << g_conf()->mon_lease_renew_interval_factor
698 << ") must be less than 1.0";
702 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
703 // got time to renew the lease and get an ack for it. Having both options
  // with the same value, for a given small value, could mean timing out if
705 // the monitors happened to be overloaded -- or even under normal load for
706 // a small enough value.
707 if (g_conf()->mon_lease_ack_timeout_factor
<= 1.0) {
708 clog
->error() << "mon_lease_ack_timeout_factor ("
709 << g_conf()->mon_lease_ack_timeout_factor
710 << ") must be greater than 1.0";
717 int Monitor::preinit()
719 std::unique_lock
l(lock
);
721 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
723 int r
= sanitize_options();
725 derr
<< "option sanitization failed!" << dendl
;
729 ceph_assert(!logger
);
731 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
732 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess",
733 PerfCountersBuilder::PRIO_USEFUL
);
734 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions",
735 "sadd", PerfCountersBuilder::PRIO_INTERESTING
);
736 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions",
737 "srm", PerfCountersBuilder::PRIO_INTERESTING
);
738 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions",
739 "strm", PerfCountersBuilder::PRIO_USEFUL
);
740 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in",
741 "ecnt", PerfCountersBuilder::PRIO_USEFUL
);
742 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started",
743 "estt", PerfCountersBuilder::PRIO_INTERESTING
);
744 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won",
745 "ewon", PerfCountersBuilder::PRIO_INTERESTING
);
746 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost",
747 "elst", PerfCountersBuilder::PRIO_INTERESTING
);
748 logger
= pcb
.create_perf_counters();
749 cct
->get_perfcounters_collection()->add(logger
);
752 ceph_assert(!cluster_logger
);
754 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
755 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
756 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
757 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
758 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
759 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
760 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
761 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster", NULL
, 0, unit_t(UNIT_BYTES
));
762 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space", NULL
, 0, unit_t(UNIT_BYTES
));
763 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space", NULL
, 0, unit_t(UNIT_BYTES
));
764 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
765 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
766 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
767 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
768 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
769 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
770 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
771 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
772 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
773 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects", NULL
, 0, unit_t(UNIT_BYTES
));
774 cluster_logger
= pcb
.create_perf_counters();
777 paxos
->init_logger();
779 // verify cluster_uuid
781 int r
= check_fsid();
792 // have we ever joined a quorum?
793 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
794 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
796 if (!has_ever_joined
) {
797 // impose initial quorum restrictions?
798 list
<string
> initial_members
;
799 get_str_list(g_conf()->mon_initial_members
, initial_members
);
801 if (!initial_members
.empty()) {
802 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
804 monmap
->set_initial_members(
805 g_ceph_context
, initial_members
, name
, messenger
->get_myaddrs(),
808 dout(10) << " monmap is " << *monmap
<< dendl
;
809 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
811 } else if (!monmap
->contains(name
)) {
812 derr
<< "not in monmap and have been in a quorum before; "
813 << "must have been removed" << dendl
;
814 if (g_conf()->mon_force_quorum_join
) {
815 dout(0) << "we should have died but "
816 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
818 derr
<< "commit suicide!" << dendl
;
824 // We have a potentially inconsistent store state in hands. Get rid of it
826 bool clear_store
= false;
827 if (store
->exists("mon_sync", "in_sync")) {
828 dout(1) << __func__
<< " clean up potentially inconsistent store state"
833 if (store
->get("mon_sync", "force_sync") > 0) {
834 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
839 set
<string
> sync_prefixes
= get_sync_targets_names();
840 store
->clear(sync_prefixes
);
844 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
845 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
849 if (is_keyring_required()) {
850 // we need to bootstrap authentication keys so we can form an
852 if (authmon()->get_last_committed() == 0) {
853 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
855 int err
= store
->get("mkfs", "keyring", bl
);
856 if (err
== 0 && bl
.length() > 0) {
857 // Attempt to decode and extract keyring only if it is found.
859 auto p
= bl
.cbegin();
861 extract_save_mon_key(keyring
);
865 string keyring_loc
= g_conf()->mon_data
+ "/keyring";
867 r
= keyring
.load(cct
, keyring_loc
);
870 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
872 if (key_server
.get_auth(mon_name
, mon_key
)) {
873 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
874 keyring
.add(mon_name
, mon_key
);
876 keyring
.encode_plaintext(bl
);
877 write_default_keyring(bl
);
879 derr
<< "unable to load initial keyring " << g_conf()->keyring
<< dendl
;
885 admin_hook
= new AdminHook(this);
886 AdminSocket
* admin_socket
= cct
->get_admin_socket();
888 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
890 // register tell/asock commands
891 for (const auto& command
: local_mon_commands
) {
892 if (!command
.is_tell()) {
895 const auto prefix
= cmddesc_get_prefix(command
.cmdstring
);
896 if (prefix
== "injectargs" ||
897 prefix
== "version" ||
      // not registered by me
902 r
= admin_socket
->register_command(command
.cmdstring
, admin_hook
,
908 // add ourselves as a conf observer
909 g_conf().add_observer(this);
911 messenger
->set_auth_client(this);
912 messenger
->set_auth_server(this);
913 mgr_messenger
->set_auth_client(this);
915 auth_registry
.refresh_config();
922 dout(2) << "init" << dendl
;
923 std::lock_guard
l(lock
);
934 messenger
->add_dispatcher_tail(this);
936 // kickstart pet mgrclient
938 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
939 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
940 mgrmon()->prime_mgr_client();
942 // generate list of filestore OSDs
943 osdmon()->get_filestore_osd_list();
945 state
= STATE_PROBING
;
949 if (!elector
.peer_tracker_is_clean()){
950 dout(10) << "peer_tracker looks inconsistent"
951 << " previous bad logic, clearing ..." << dendl
;
952 elector
.notify_clear_peer_state();
955 // add features of myself into feature_map
956 session_map
.feature_map
.add_mon(con_self
->get_features());
960 void Monitor::init_paxos()
962 dout(10) << __func__
<< dendl
;
966 for (auto& svc
: paxos_service
) {
970 refresh_from_paxos(NULL
);
973 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
975 dout(10) << __func__
<< dendl
;
978 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
981 auto p
= bl
.cbegin();
982 decode(fingerprint
, p
);
984 catch (ceph::buffer::error
& e
) {
985 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
988 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
991 for (auto& svc
: paxos_service
) {
992 svc
->refresh(need_bootstrap
);
994 for (auto& svc
: paxos_service
) {
1000 void Monitor::register_cluster_logger()
1002 if (!cluster_logger_registered
) {
1003 dout(10) << "register_cluster_logger" << dendl
;
1004 cluster_logger_registered
= true;
1005 cct
->get_perfcounters_collection()->add(cluster_logger
);
1007 dout(10) << "register_cluster_logger - already registered" << dendl
;
1011 void Monitor::unregister_cluster_logger()
1013 if (cluster_logger_registered
) {
1014 dout(10) << "unregister_cluster_logger" << dendl
;
1015 cluster_logger_registered
= false;
1016 cct
->get_perfcounters_collection()->remove(cluster_logger
);
1018 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
1022 void Monitor::update_logger()
1024 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
1025 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
1028 void Monitor::shutdown()
1030 dout(1) << "shutdown" << dendl
;
1034 wait_for_paxos_write();
1037 std::lock_guard
l(auth_lock
);
1038 authmon()->_set_mon_num_rank(0, 0);
1041 state
= STATE_SHUTDOWN
;
1044 g_conf().remove_observer(this);
1048 cct
->get_admin_socket()->unregister_commands(admin_hook
);
1055 mgr_client
.shutdown();
1058 finisher
.wait_for_empty();
1064 for (auto& svc
: paxos_service
) {
1068 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
1069 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
1075 remove_all_sessions();
1077 log_client
.shutdown();
1079 // unlock before msgr shutdown...
1082 // shutdown messenger before removing logger from perfcounter collection,
1083 // otherwise _ms_dispatch() will try to update deleted logger
1084 messenger
->shutdown();
1085 mgr_messenger
->shutdown();
1088 cct
->get_perfcounters_collection()->remove(logger
);
1090 if (cluster_logger
) {
1091 if (cluster_logger_registered
)
1092 cct
->get_perfcounters_collection()->remove(cluster_logger
);
1093 delete cluster_logger
;
1094 cluster_logger
= NULL
;
1098 void Monitor::wait_for_paxos_write()
1100 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
1101 dout(10) << __func__
<< " flushing pending write" << dendl
;
1105 dout(10) << __func__
<< " flushed pending write" << dendl
;
// Re-exec the monitor with its original command line.
//
// Builds a NULL-terminated copy of orig_argv (logging every argument),
// picks the executable path, unblocks all signals and calls execv().
// Path selection as visible here: when readlink() on
// PROCPREFIX "/proc/self/exe" succeeds, the resolved target is only
// logged and exe_path is then set to the /proc/self/exe link itself;
// otherwise the cwd is logged and exe_path falls back to orig_argv[0].
// If execv() returns, the failure is logged with the errno text.
// NOTE(review): some lines are missing from this listing (the `else`
// between the two path strategies, the declaration of `buf`, the end of
// the "Determine the path" block comment, and whatever follows the final
// comment); the branch structure above is inferred from the comments.
// NOTE(review): new_argv is sized by the runtime value orig_argc, i.e. a
// variable-length array, which is a compiler extension in C++.
1109 void Monitor::respawn()
1111 // --- WARNING TO FUTURE COPY/PASTERS ---
1112 // You must also add a call like
1114 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1116 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1117 // instead of "(exe)", so that killall (and log rotation) will work.
1119 dout(0) << __func__
<< dendl
;
1121 char *new_argv
[orig_argc
+1];
1122 dout(1) << " e: '" << orig_argv
[0] << "'" << dendl
;
1123 for (int i
=0; i
<orig_argc
; i
++) {
1124 new_argv
[i
] = (char *)orig_argv
[i
];
1125 dout(1) << " " << i
<< ": '" << orig_argv
[i
] << "'" << dendl
;
1127 new_argv
[orig_argc
] = NULL
;
1129 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1130 * This allows us to exec the same executable even if it has since been
1133 char exe_path
[PATH_MAX
] = "";
1135 if (readlink(PROCPREFIX
"/proc/self/exe", exe_path
, PATH_MAX
-1) != -1) {
1136 dout(1) << "respawning with exe " << exe_path
<< dendl
;
1137 strcpy(exe_path
, PROCPREFIX
"/proc/self/exe");
1142 /* Print CWD for the user's interest */
1144 char *cwd
= getcwd(buf
, sizeof(buf
));
1146 dout(1) << " cwd " << cwd
<< dendl
;
1148 /* Fall back to a best-effort: just running in our CWD */
1149 strncpy(exe_path
, orig_argv
[0], PATH_MAX
-1);
1152 dout(1) << " exe_path " << exe_path
<< dendl
;
1154 unblock_all_signals(NULL
);
1155 execv(exe_path
, new_argv
);
// Only reached when execv() failed.
1157 dout(0) << "respawn execv " << orig_argv
[0]
1158 << " failed with " << cpp_strerror(errno
) << dendl
;
1160 // We have to assert out here, because suicide() returns, and callers
1161 // to respawn expect it never to return.
// (Re)enter the probing phase.
//
// Visible steps, in order:
//  1. wait for a pending paxos write, reset requester-side sync state,
//     unregister the cluster logger and cancel the probe timeout;
//  2. for an epoch-0 (seed) monmap, recompute legacy ranks;
//  3. check can_upgrade_from() against the monmap's min_mon_release and
//     log an error if that fails;
//  4. look up our rank in the monmap by our messenger addresses and react
//     to: no longer being in the map while we previously had a rank, the
//     map holding different addresses for our rank (the newest monmap is
//     stashed under mon_sync/temp_newer_monmap when its epoch is
//     non-zero; the log line says we respawn), or a changed rank (rename
//     the messenger, notify the elector, mark all connections down);
//  5. set state to STATE_PROBING and optionally compact the store;
//  6. a single-monitor map with rank 0 wins a standalone election;
//     otherwise arm the probe timeout, add ourselves to outside_quorum if
//     we are in the map, and build MMonProbe::OP_PROBE messages for the
//     monmap entries and for extra_probe_peers.
// NOTE(review): many lines of this body are not visible in this listing
// (returns/exits, some conditions, the compaction call, the send call for
// monmap peers); confirm the exact control flow against the full file.
1165 void Monitor::bootstrap()
1167 dout(10) << "bootstrap" << dendl
;
1168 wait_for_paxos_write();
1170 sync_reset_requester();
1171 unregister_cluster_logger();
1172 cancel_probe_timeout();
1174 if (monmap
->get_epoch() == 0) {
1175 dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl
;
1176 monmap
->calc_legacy_ranks();
1178 dout(10) << "monmap " << *monmap
<< dendl
;
1180 auto from_release
= monmap
->min_mon_release
;
1182 if (!can_upgrade_from(from_release
, "min_mon_release", err
)) {
1183 derr
<< "current monmap has " << err
.str() << " stopping." << dendl
;
1188 int newrank
= monmap
->get_rank(messenger
->get_myaddrs());
1189 if (newrank
< 0 && rank
>= 0) {
1190 // was i ever part of the quorum?
1191 if (has_ever_joined
) {
1192 dout(0) << " removed from monmap, suicide." << dendl
;
1195 elector
.notify_clear_peer_state();
1198 monmap
->get_addrs(newrank
) != messenger
->get_myaddrs()) {
1199 dout(0) << " monmap addrs for rank " << newrank
<< " changed, i am "
1200 << messenger
->get_myaddrs()
1201 << ", monmap is " << monmap
->get_addrs(newrank
) << ", respawning"
1204 if (monmap
->get_epoch()) {
1205 // store this map in temp mon_sync location so that we use it on
1207 derr
<< " stashing newest monmap " << monmap
->get_epoch()
1208 << " for next startup" << dendl
;
1210 monmap
->encode(bl
, -1);
1211 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1212 t
->put("mon_sync", "temp_newer_monmap", bl
);
1213 store
->apply_transaction(t
);
1218 if (newrank
!= rank
) {
1219 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
1220 messenger
->set_myname(entity_name_t::MON(newrank
));
1222 elector
.notify_rank_changed(rank
);
1224 // reset all connections, or else our peers will think we are someone else.
1225 messenger
->mark_down_all();
1229 state
= STATE_PROBING
;
1234 if (g_conf()->mon_compact_on_bootstrap
) {
1235 dout(10) << "bootstrap -- triggering compaction" << dendl
;
1237 dout(10) << "bootstrap -- finished compaction" << dendl
;
1240 // stretch mode bits
1241 set_elector_disallowed_leaders(false);
1243 // singleton monitor?
1244 if (monmap
->size() == 1 && rank
== 0) {
1245 win_standalone_election();
1249 reset_probe_timeout();
1251 // i'm outside the quorum
1252 if (monmap
->contains(name
))
1253 outside_quorum
.insert(name
);
1256 dout(10) << "probing other monitors" << dendl
;
1257 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1260 new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
,
// Also probe peers learned from hints, skipping our own address vector.
1264 for (auto& av
: extra_probe_peers
) {
1265 if (av
!= messenger
->get_myaddrs()) {
1266 messenger
->send_to_mon(
1267 new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
,
// Handle the add_bootstrap_peer_hint / add_bootstrap_peer_hintv commands:
// parse a single address ("addr") or an address vector ("addrv") out of
// cmdmap and add it to extra_probe_peers.  Human-readable status and
// error text is written to ss.
//
//  - The hint is ignored when this monitor is already leader or peon.
//  - "addr" with port 0 expands to two entries: TYPE_MSGR2 on
//    CEPH_MON_PORT_IANA and TYPE_LEGACY on CEPH_MON_PORT_LEGACY.
//  - "addr" with a port and TYPE_ANY gets TYPE_LEGACY when the port is
//    CEPH_MON_PORT_LEGACY, TYPE_MSGR2 otherwise.
//  - "addrv" is parsed as given.
// NOTE(review): the remaining parameter line(s) and all return
// statements are not visible in this listing; presumably the error paths
// return false and success returns true -- confirm against the full file.
1274 bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd
,
1275 const cmdmap_t
& cmdmap
,
1278 if (is_leader() || is_peon()) {
1279 ss
<< "mon already active; ignoring bootstrap hint";
1283 entity_addrvec_t addrs
;
1285 if (cmd_getval(cmdmap
, "addr", addrstr
)) {
1286 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' addr '"
1287 << addrstr
<< "'" << dendl
;
1290 if (!addr
.parse(addrstr
, entity_addr_t::TYPE_ANY
)) {
1291 ss
<< "failed to parse addrs '" << addrstr
1292 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1296 addrs
.v
.push_back(addr
);
1297 if (addr
.get_port() == 0) {
1298 addrs
.v
[0].set_type(entity_addr_t::TYPE_MSGR2
);
1299 addrs
.v
[0].set_port(CEPH_MON_PORT_IANA
);
1300 addrs
.v
.push_back(addr
);
1301 addrs
.v
[1].set_type(entity_addr_t::TYPE_LEGACY
);
1302 addrs
.v
[1].set_port(CEPH_MON_PORT_LEGACY
);
1303 } else if (addr
.get_type() == entity_addr_t::TYPE_ANY
) {
1304 if (addr
.get_port() == CEPH_MON_PORT_LEGACY
) {
1305 addrs
.v
[0].set_type(entity_addr_t::TYPE_LEGACY
);
1307 addrs
.v
[0].set_type(entity_addr_t::TYPE_MSGR2
);
1310 } else if (cmd_getval(cmdmap
, "addrv", addrstr
)) {
1311 dout(10) << "_add_bootstrap_peer_hintv '" << cmd
<< "' addrv '"
1312 << addrstr
<< "'" << dendl
;
1313 const char *end
= 0;
1314 if (!addrs
.parse(addrstr
.c_str(), &end
)) {
1315 ss
<< "failed to parse addrs '" << addrstr
1316 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1320 ss
<< "no addr or addrv provided";
1324 extra_probe_peers
.insert(addrs
);
1325 ss
<< "adding peer " << addrs
<< " to list: " << extra_probe_peers
;
// Drop all state tied to the current quorum membership.
//
// Visible effects: calls authmon()->_set_mon_num_rank(0, 0) under
// auth_lock ("disable authentication"), cancels the probe timeout,
// cleans up health events and clears health_check_log_times, cancels the
// scrub event, resets leader_since, stamps exited_quorum with the current
// time if we had a non-empty quorum, clears outside_quorum and
// quorum_feature_map, and finally iterates over the paxos services.
// NOTE(review): some statements (between the quorum check and the
// clears, and the body of the final loop) are not visible in this listing.
1329 // called by bootstrap(), or on leader|peon -> electing
1330 void Monitor::_reset()
1332 dout(10) << __func__
<< dendl
;
1334 // disable authentication
1336 std::lock_guard
l(auth_lock
);
1337 authmon()->_set_mon_num_rank(0, 0);
1340 cancel_probe_timeout();
1342 health_events_cleanup();
1343 health_check_log_times
.clear();
1344 scrub_event_cancel();
1346 leader_since
= utime_t();
1348 if (!quorum
.empty()) {
1349 exited_quorum
= ceph_clock_now();
1352 outside_quorum
.clear();
1353 quorum_feature_map
.clear();
1359 for (auto& svc
: paxos_service
) {
1365 // -----------------------------------------------------------
1368 set
<string
> Monitor::get_sync_targets_names()
1370 set
<string
> targets
;
1371 targets
.insert(paxos
->get_name());
1372 for (auto& svc
: paxos_service
) {
1373 svc
->get_store_prefixes(targets
);
// Invoked when the sync timeout fires; only valid while the monitor is
// in STATE_SYNCHRONIZING (asserted).
// NOTE(review): the action taken after the assertion is not visible in
// this listing.
1379 void Monitor::sync_timeout()
1381 dout(10) << __func__
<< dendl
;
1382 ceph_assert(state
== STATE_SYNCHRONIZING
);
// Encode into `bl` (with CEPH_FEATURES_ALL) the newest monmap we know of.
//
// Three candidates are compared by epoch:
//  1. the map returned by monmon()->get_monmap();
//  2. the backup stored under mon_sync/latest_monmap, if that key exists;
//  3. the in-memory *monmap.
// A store read error other than -ENOENT on (1), or a read error on (2),
// aborts the process ("error reading the store").
// NOTE(review): the enclosing error checks (`if (err < 0)`-style lines)
// and the matching `derr` prefixes are not visible in this listing.
1386 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1388 dout(1) << __func__
<< dendl
;
1390 MonMap latest_monmap
;
1392 // Grab latest monmap from MonmapMonitor
1393 bufferlist monmon_bl
;
1394 int err
= monmon()->get_monmap(monmon_bl
);
1396 if (err
!= -ENOENT
) {
1398 << " something wrong happened while reading the store: "
1399 << cpp_strerror(err
) << dendl
;
1400 ceph_abort_msg("error reading the store");
1403 latest_monmap
.decode(monmon_bl
);
1406 // Grab last backed up monmap (if any) and compare epochs
1407 if (store
->exists("mon_sync", "latest_monmap")) {
1408 bufferlist backup_bl
;
1409 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1412 << " something wrong happened while reading the store: "
1413 << cpp_strerror(err
) << dendl
;
1414 ceph_abort_msg("error reading the store");
1416 ceph_assert(backup_bl
.length() > 0);
1418 MonMap backup_monmap
;
1419 backup_monmap
.decode(backup_bl
);
1421 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1422 latest_monmap
= backup_monmap
;
1425 // Check if our current monmap's epoch is greater than the one we've
1427 if (monmap
->epoch
> latest_monmap
.epoch
)
1428 latest_monmap
= *monmap
;
1430 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1432 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
// Drop requester-side sync state: cancel the pending sync timeout event
// (if any), reset sync_provider to an empty address vector and
// sync_start_version to 0.
// NOTE(review): two lines between those resets are not visible in this
// listing; they presumably clear further requester fields.
1435 void Monitor::sync_reset_requester()
1437 dout(10) << __func__
<< dendl
;
1439 if (sync_timeout_event
) {
1440 timer
.cancel_event(sync_timeout_event
);
1441 sync_timeout_event
= NULL
;
1444 sync_provider
= entity_addrvec_t();
1447 sync_start_version
= 0;
1450 void Monitor::sync_reset_provider()
1452 dout(10) << __func__
<< dendl
;
1453 sync_providers
.clear();
// Begin synchronizing our store from the monitor(s) at `addrs`.
//
// @param addrs  address vector of the monitor to sync from; stored as
//               sync_provider
// @param full   logged as "full" vs "recent"
//
// Must be called while probing or already synchronizing (asserted).
// Visible steps: switch to STATE_SYNCHRONIZING; stop acting as a provider;
// in one transaction stash the latest monmap, set mon_sync/in_sync and
// persist sync_last_committed_floor (raised to at least the current paxos
// version); clear all sync target prefixes from the store; remember the
// provider, arm the sync timeout and send an OP_GET_COOKIE_FULL or
// OP_GET_COOKIE_RECENT request (chosen by sync_full) carrying our current
// paxos version.
// The mon_sync_requester_kill_at asserts are fault-injection points.
// NOTE(review): some statements are not visible in this listing
// (presumably the assignment of sync_full from `full` and the paxos
// re-init announced by the "make sure paxos knows" comment).
1456 void Monitor::sync_start(entity_addrvec_t
&addrs
, bool full
)
1458 dout(10) << __func__
<< " " << addrs
<< (full
? " full" : " recent") << dendl
;
1460 ceph_assert(state
== STATE_PROBING
||
1461 state
== STATE_SYNCHRONIZING
);
1462 state
= STATE_SYNCHRONIZING
;
1464 // make sure we are not a provider for anyone!
1465 sync_reset_provider();
1470 // stash key state, and mark that we are syncing
1471 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1472 sync_stash_critical_state(t
);
1473 t
->put("mon_sync", "in_sync", 1);
1475 sync_last_committed_floor
= std::max(sync_last_committed_floor
, paxos
->get_version());
1476 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1477 << sync_last_committed_floor
<< dendl
;
1478 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1480 store
->apply_transaction(t
);
1482 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 1);
1484 // clear the underlying store
1485 set
<string
> targets
= get_sync_targets_names();
1486 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1487 store
->clear(targets
);
1489 // make sure paxos knows it has been reset. this prevents a
1490 // bootstrap and then different probe reply order from possibly
1491 // deciding a partial or no sync is needed.
1494 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 2);
1497 // assume 'other' as the leader. We will update the leader once we receive
1498 // a reply to the sync start.
1499 sync_provider
= addrs
;
1501 sync_reset_timeout();
1503 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1505 m
->last_committed
= paxos
->get_version();
1506 messenger
->send_to_mon(m
, sync_provider
);
1509 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1511 dout(10) << __func__
<< dendl
;
1512 bufferlist backup_monmap
;
1513 sync_obtain_latest_monmap(backup_monmap
);
1514 ceph_assert(backup_monmap
.length() > 0);
1515 t
->put("mon_sync", "latest_monmap", backup_monmap
);
// Re-arm the sync timeout: cancel the current event, if any, and schedule
// a new one g_conf()->mon_sync_timeout from now.
// NOTE(review): the body of the callback is not visible in this listing;
// presumably it invokes sync_timeout().
1518 void Monitor::sync_reset_timeout()
1520 dout(10) << __func__
<< dendl
;
1521 if (sync_timeout_event
)
1522 timer
.cancel_event(sync_timeout_event
);
1523 sync_timeout_event
= timer
.add_event_after(
1524 g_conf()->mon_sync_timeout
,
1525 new C_MonContext
{this, [this](int) {
// Complete a store sync.
//
// @param last_committed  paxos version to record as last_committed
//
// Visible steps: prepare the paxos transactions starting at
// sync_start_version into one transaction together with the new
// last_committed value and apply it; then, in a second transaction, erase
// the mon_sync markers in_sync, force_sync and last_committed_floor.
// The mon_sync_requester_kill_at asserts are fault-injection points.
// NOTE(review): several lines are not visible in this listing (the
// condition guarding the first transaction, the remaining arguments of
// read_and_prepare_transactions, the tx dump body, and what happens after
// the markers are erased).
1530 void Monitor::sync_finish(version_t last_committed
)
1532 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
1534 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 7);
1537 // finalize the paxos commits
1538 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1539 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1541 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
1543 dout(30) << __func__
<< " final tx dump:\n";
1544 JSONFormatter
f(true);
1549 store
->apply_transaction(tx
);
1552 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 8);
1554 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1555 t
->erase("mon_sync", "in_sync");
1556 t
->erase("mon_sync", "force_sync");
1557 t
->erase("mon_sync", "last_committed_floor");
1558 store
->apply_transaction(t
);
1560 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 9);
1564 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 10);
// Dispatch an incoming MMonSync message by its op.
//
// Provider-side ops: OP_GET_COOKIE_FULL / OP_GET_COOKIE_RECENT and
// OP_GET_CHUNK.  Requester-side ("client") ops: OP_COOKIE, OP_CHUNK /
// OP_LAST_CHUNK and OP_NO_COOKIE.  Any other op is logged and aborts.
// NOTE(review): the `switch` line, the `break`s and the `default:` label
// are not visible in this listing.
1569 void Monitor::handle_sync(MonOpRequestRef op
)
1571 auto m
= op
->get_req
<MMonSync
>();
1572 dout(10) << __func__
<< " " << *m
<< dendl
;
1575 // provider ---------
1577 case MMonSync::OP_GET_COOKIE_FULL
:
1578 case MMonSync::OP_GET_COOKIE_RECENT
:
1579 handle_sync_get_cookie(op
);
1581 case MMonSync::OP_GET_CHUNK
:
1582 handle_sync_get_chunk(op
);
1585 // client -----------
1587 case MMonSync::OP_COOKIE
:
1588 handle_sync_cookie(op
);
1591 case MMonSync::OP_CHUNK
:
1592 case MMonSync::OP_LAST_CHUNK
:
1593 handle_sync_chunk(op
);
1595 case MMonSync::OP_NO_COOKIE
:
1596 handle_sync_no_cookie(op
);
1600 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1601 ceph_abort_msg("unknown op");
1607 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1609 auto m
= op
->get_req
<MMonSync
>();
1610 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1611 m
->get_connection()->send_message(reply
);
// Provider side: open a new sync session for a requesting monitor and
// hand back its cookie.
//
// Visible behaviour:
//  - while we are synchronizing ourselves, reply OP_NO_COOKIE;
//  - peers whose connection lacks any of our required_features are only
//    logged ("ignoring peer");
//  - the cookie is (election epoch << 24) + a per-process counter and
//    must not already be present in sync_providers (asserted);
//  - the new SyncProvider records the requester's addresses and gets a
//    timeout of twice mon_sync_timeout;
//  - for OP_GET_COOKIE_FULL it snapshots the current paxos version and
//    obtains a store synchronizer over all sync target prefixes;
//    otherwise it starts from the requester's last_committed;
//  - the reply is OP_COOKIE carrying sp.cookie and sp.last_committed.
// The mon_sync_provider_kill_at assert is a fault-injection point.
// NOTE(review): some lines are not visible in this listing (early
// returns, and presumably the assignments of sp.cookie and sp.full).
1614 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1616 auto m
= op
->get_req
<MMonSync
>();
1617 if (is_synchronizing()) {
1618 _sync_reply_no_cookie(op
);
1622 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 1);
1624 // make sure they can understand us.
1625 if ((required_features
^ m
->get_connection()->get_features()) &
1626 required_features
) {
1627 dout(5) << " ignoring peer mon." << m
->get_source().num()
1628 << " has features " << std::hex
1629 << m
->get_connection()->get_features()
1630 << " but we require " << required_features
<< std::dec
<< dendl
;
1634 // make up a unique cookie. include election epoch (which persists
1635 // across restarts for the whole cluster) and a counter for this
1636 // process instance. there is no need to be unique *across*
1637 // monitors, though.
1638 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1639 ceph_assert(sync_providers
.count(cookie
) == 0);
1641 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
1643 SyncProvider
& sp
= sync_providers
[cookie
];
1645 sp
.addrs
= m
->get_source_addrs();
1646 sp
.reset_timeout(g_ceph_context
, g_conf()->mon_sync_timeout
* 2);
1648 set
<string
> sync_targets
;
1649 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1651 sync_targets
= get_sync_targets_names();
1652 sp
.last_committed
= paxos
->get_version();
1653 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1655 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1657 // just catch up paxos
1658 sp
.last_committed
= m
->last_committed
;
1660 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1662 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1663 reply
->last_committed
= sp
.last_committed
;
1664 m
->get_connection()->send_message(reply
);
// Provider side: serve the next chunk of the sync session identified by
// m->cookie.
//
// Visible behaviour:
//  - an unknown cookie is answered with OP_NO_COOKIE;
//  - if the requester's last_committed has fallen behind our paxos
//    first_committed (and that is > 1), the session is erased and
//    OP_NO_COOKIE is sent;
//  - otherwise the session timeout is refreshed and a transaction is
//    filled with paxos versions following sp.last_committed, decrementing
//    a byte budget (mon_sync_max_payload_size); a key budget
//    (mon_sync_max_payload_keys) is tracked as well;
//  - for full syncs with budget left, store keys are appended through the
//    synchronizer and the last key is recorded in the reply;
//  - when neither more store keys nor more paxos versions remain, the
//    reply becomes OP_LAST_CHUNK and the session is erased;
//  - the transaction is encoded into reply->chunk_bl and sent back.
// The mon_sync_provider_kill_at asserts are fault-injection points.
// NOTE(review): some lines are not visible in this listing (early
// returns, the remaining loop conditions, the `else` before the
// last-chunk branch).
// NOTE(review): `sp` refers into sync_providers; it must not be used
// after the erase() calls below.
1667 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1669 auto m
= op
->get_req
<MMonSync
>();
1670 dout(10) << __func__
<< " " << *m
<< dendl
;
1672 if (sync_providers
.count(m
->cookie
) == 0) {
1673 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1674 _sync_reply_no_cookie(op
);
1678 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 2);
1680 SyncProvider
& sp
= sync_providers
[m
->cookie
];
1681 sp
.reset_timeout(g_ceph_context
, g_conf()->mon_sync_timeout
* 2);
1683 if (sp
.last_committed
< paxos
->get_first_committed() &&
1684 paxos
->get_first_committed() > 1) {
1685 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1686 << " < our fc " << paxos
->get_first_committed() << dendl
;
1687 sync_providers
.erase(m
->cookie
);
1688 _sync_reply_no_cookie(op
);
1692 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1693 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1695 int bytes_left
= g_conf()->mon_sync_max_payload_size
;
1696 int keys_left
= g_conf()->mon_sync_max_payload_keys
;
1697 while (sp
.last_committed
< paxos
->get_version() &&
1701 sp
.last_committed
++;
1703 int err
= store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1704 ceph_assert(err
== 0);
1706 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1707 bytes_left
-= bl
.length();
1709 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1712 reply
->last_committed
= sp
.last_committed
;
1714 if (sp
.full
&& bytes_left
> 0 && keys_left
> 0) {
1715 sp
.synchronizer
->get_chunk_tx(tx
, bytes_left
, keys_left
);
1716 sp
.last_key
= sp
.synchronizer
->get_last_key();
1717 reply
->last_key
= sp
.last_key
;
1720 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1721 sp
.last_committed
< paxos
->get_version()) {
1722 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1723 << " key " << sp
.last_key
<< dendl
;
1725 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1726 << " key " << sp
.last_key
<< dendl
;
1727 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1729 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 3);
1731 // clean up our local state
1732 sync_providers
.erase(sp
.cookie
);
1735 encode(*tx
, reply
->chunk_bl
);
1737 m
->get_connection()->send_message(reply
);
// Requester side: accept the provider's OP_COOKIE reply.
//
// Records the cookie and the provider's last_committed (as
// sync_start_version), re-arms the sync timeout and requests the first
// chunk.  Replies are discarded when their source differs from
// sync_provider; per the log text they are also ignored when we already
// hold a cookie.
// The mon_sync_requester_kill_at assert is a fault-injection point.
// NOTE(review): the condition guarding the "already have a cookie"
// message and the early returns are not visible in this listing.
1742 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1744 auto m
= op
->get_req
<MMonSync
>();
1745 dout(10) << __func__
<< " " << *m
<< dendl
;
1747 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
1750 if (m
->get_source_addrs() != sync_provider
) {
1751 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1754 sync_cookie
= m
->cookie
;
1755 sync_start_version
= m
->last_committed
;
1757 sync_reset_timeout();
1758 sync_get_next_chunk();
1760 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 3);
1763 void Monitor::sync_get_next_chunk()
1765 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1766 if (g_conf()->mon_inject_sync_get_chunk_delay
> 0) {
1767 dout(20) << __func__
<< " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay
<< dendl
;
1768 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay
* 1000000.0));
1770 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1771 messenger
->send_to_mon(r
, sync_provider
);
1773 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 4);
// Requester side: apply a chunk received from the sync provider.
//
// Chunks whose cookie or source address does not match the current
// session are discarded.  Otherwise (state must be STATE_SYNCHRONIZING)
// the encoded transaction in m->chunk_bl is applied to the store.  In the
// "applying recent paxos transactions as we go" branch, paxos
// transactions from get_version() + 1 onward are prepared, last_committed
// is set to the provider's value, the result is applied, and paxos is
// re-initialised to pick up what was just written.  OP_CHUNK then asks
// for the next chunk (re-arming the timeout); OP_LAST_CHUNK finishes the
// sync.
// The mon_sync_requester_kill_at asserts are fault-injection points.
// NOTE(review): some lines are not visible in this listing (early
// returns, the tx dump bodies, the condition guarding the second
// transaction and the remaining arguments of
// read_and_prepare_transactions).
1776 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1778 auto m
= op
->get_req
<MMonSync
>();
1779 dout(10) << __func__
<< " " << *m
<< dendl
;
1781 if (m
->cookie
!= sync_cookie
) {
1782 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1785 if (m
->get_source_addrs() != sync_provider
) {
1786 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1790 ceph_assert(state
== STATE_SYNCHRONIZING
);
1791 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 5);
1793 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1794 tx
->append_from_encoded(m
->chunk_bl
);
1796 dout(30) << __func__
<< " tx dump:\n";
1797 JSONFormatter
f(true);
1802 store
->apply_transaction(tx
);
1804 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 6);
1807 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
1808 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1809 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1811 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1813 dout(30) << __func__
<< " tx dump:\n";
1814 JSONFormatter
f(true);
1819 store
->apply_transaction(tx
);
1820 paxos
->init(); // to refresh what we just wrote
1823 if (m
->op
== MMonSync::OP_CHUNK
) {
1824 sync_reset_timeout();
1825 sync_get_next_chunk();
1826 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1827 sync_finish(m
->last_committed
);
// Requester side: the provider answered OP_NO_COOKIE.
// NOTE(review): only the log line is visible in this listing; the
// reaction to the refusal is on a line that is not shown.
1831 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1833 dout(10) << __func__
<< dendl
;
// Expire provider-side sync sessions whose timeout lies in the past,
// logging the cookie and requester addresses of each one removed.
// NOTE(review): the branch that advances the iterator for sessions that
// have not expired is not visible in this listing.
1837 void Monitor::sync_trim_providers()
1839 dout(20) << __func__
<< dendl
;
1841 utime_t now
= ceph_clock_now();
1842 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1843 while (p
!= sync_providers
.end()) {
1844 if (now
> p
->second
.timeout
) {
1845 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
1846 << " for " << p
->second
.addrs
<< dendl
;
// post-increment hands erase() the current element while p already
// points at the next one
1847 sync_providers
.erase(p
++);
1854 // ---------------------------------------------------
1857 void Monitor::cancel_probe_timeout()
1859 if (probe_timeout_event
) {
1860 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1861 timer
.cancel_event(probe_timeout_event
);
1862 probe_timeout_event
= NULL
;
1864 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
// Re-arm the probe timeout: cancel any existing event, create a new
// callback context and schedule it g_conf()->mon_probe_timeout seconds
// from now.  The success of add_event_after() is checked and logged.
// NOTE(review): the callback body and the `else` line are not visible in
// this listing; presumably the callback calls probe_timeout(r) and the
// pointer is reset to nullptr only when scheduling failed.
1868 void Monitor::reset_probe_timeout()
1870 cancel_probe_timeout();
1871 probe_timeout_event
= new C_MonContext
{this, [this](int r
) {
1874 double t
= g_conf()->mon_probe_timeout
;
1875 if (timer
.add_event_after(t
, probe_timeout_event
)) {
1876 dout(10) << "reset_probe_timeout " << probe_timeout_event
1877 << " after " << t
<< " seconds" << dendl
;
1879 probe_timeout_event
= nullptr;
// Invoked when the probe timeout fires.  Requires that we are probing or
// synchronizing and that an event was actually scheduled (both asserted),
// then forgets the event pointer.
// NOTE(review): `r` is unused in the visible lines, and the follow-up
// action after clearing the pointer is not visible in this listing.
1883 void Monitor::probe_timeout(int r
)
1885 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1886 ceph_assert(is_probing() || is_synchronizing());
1887 ceph_assert(probe_timeout_event
);
1888 probe_timeout_event
= NULL
;
// Entry point for MMonProbe messages.
//
// Messages whose fsid differs from our monmap's are logged and ignored.
// OP_PROBE and OP_REPLY are forwarded to their handlers.
// OP_MISSING_FEATURES logs an error describing the release the peer
// requires versus ours, and the required/missing feature bits taken from
// the message.
// NOTE(review): the `switch` line, the `break`s/returns and the end of
// the OP_MISSING_FEATURES branch are not visible in this listing.
1892 void Monitor::handle_probe(MonOpRequestRef op
)
1894 auto m
= op
->get_req
<MMonProbe
>();
1895 dout(10) << "handle_probe " << *m
<< dendl
;
1897 if (m
->fsid
!= monmap
->fsid
) {
1898 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1903 case MMonProbe::OP_PROBE
:
1904 handle_probe_probe(op
);
1907 case MMonProbe::OP_REPLY
:
1908 handle_probe_reply(op
);
1911 case MMonProbe::OP_MISSING_FEATURES
:
1912 derr
<< __func__
<< " require release " << (int)m
->mon_release
<< " > "
1913 << (int)ceph_release()
1914 << ", or missing features (have " << CEPH_FEATURES_ALL
1915 << ", required " << m
->required_features
1916 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
) << ")"
// Answer an OP_PROBE from another monitor.
//
// Visible behaviour:
//  - if the peer's release is known and older than our monmap's
//    min_mon_release, or its connection lacks any of our
//    required_features, reply OP_MISSING_FEATURES carrying our
//    min_mon_release and required_features;
//  - if we are neither probing nor synchronizing and the peer's first
//    paxos version is more than one past our version, log that we
//    re-bootstrap;
//  - otherwise reply OP_REPLY with our monmap (encoded for the peer's
//    features) and our paxos first/last versions;
//  - a peer whose address is not in our monmap is added to
//    extra_probe_peers; a known peer gets elector.begin_peer_ping().
// NOTE(review): some lines are not visible in this listing (early
// exits, the bootstrap call, the declaration of the OP_REPLY message and
// a few of its fields, the `else` before begin_peer_ping).
1922 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1924 auto m
= op
->get_req
<MMonProbe
>();
1926 dout(10) << "handle_probe_probe " << m
->get_source_inst() << " " << *m
1927 << " features " << m
->get_connection()->get_features() << dendl
;
1928 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1929 if ((m
->mon_release
!= ceph_release_t::unknown
&&
1930 m
->mon_release
< monmap
->min_mon_release
) ||
1932 dout(1) << " peer " << m
->get_source_addr()
1933 << " release " << m
->mon_release
1934 << " < min_mon_release " << monmap
->min_mon_release
1935 << ", or missing features " << missing
<< dendl
;
1936 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1937 name
, has_ever_joined
, monmap
->min_mon_release
);
// The feature mask must travel on the reply `r`, not on the request `m`
// we are answering; otherwise the peer sees required_features unset and
// cannot report which features it is missing.
1938 r
->required_features
= required_features
;
1939 m
->get_connection()->send_message(r
);
1943 if (!is_probing() && !is_synchronizing()) {
1944 // If the probing mon is way ahead of us, we need to re-bootstrap.
1945 // Normally we capture this case when we initially bootstrap, but
1946 // it is possible we pass those checks (we overlap with
1947 // quorum-to-be) but fail to join a quorum before it moves past
1948 // us. We need to be kicked back to bootstrap so we can
1949 // synchronize, not keep calling elections.
1950 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1951 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1952 << "ahead of us, re-bootstrapping" << dendl
;
1960 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
,
1965 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1966 r
->paxos_first_version
= paxos
->get_first_committed();
1967 r
->paxos_last_version
= paxos
->get_version();
1968 m
->get_connection()->send_message(r
);
1970 // did we discover a peer here?
1971 if (!monmap
->contains(m
->get_source_addr())) {
1972 dout(1) << " adding peer " << m
->get_source_addrs()
1973 << " to list of hints" << dendl
;
1974 extra_probe_peers
.insert(m
->get_source_addrs());
1976 elector
.begin_peer_ping(monmap
->get_rank(m
->get_source_addr()));
// Process an OP_REPLY to one of our probes.
//
// Phases visible in this listing:
//  1. Discovery (only while probing or electing): if the peer's monmap
//     differs from ours and the peer has joined a quorum while its map is
//     newer or we never joined, adopt the peer's map and notify about the
//     new monmap.  In an epoch-0 map, rename a "noname-" placeholder to
//     the peer's real name and learn blank addresses of initial monitors.
//  2. Nothing further happens unless we are probing; while synchronizing
//     the reply is only logged.
//  3. Sync decision: peers whose last paxos version is below
//     sync_last_committed_floor are ignored; a peer whose first version
//     is beyond ours (and > 1) triggers a full sync; a peer more than
//     paxos_max_join_drift versions ahead triggers a recent sync.
//  4. Upgrade gates: with an existing osdmap, complain if
//     require_osd_release is below luminous or the PURGED_SNAPDIRS /
//     RECOVERY_DELETES flags are missing.
//  5. Joining: if the peer reports a quorum, either we are already a
//     proper member of the map (election) or we send MMonJoin; otherwise
//     the peer is added to outside_quorum and, once that set reaches
//     monmap->min_quorum_size() and includes us, an election is called
//     (per the log text).
// NOTE(review): many lines are not visible in this listing (returns,
// `else` lines, the deletion of `newmap`, the election calls, several
// conditions); confirm the exact control flow against the full file.
1983 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1985 auto m
= op
->get_req
<MMonProbe
>();
1986 dout(10) << "handle_probe_reply " << m
->get_source_inst()
1987 << " " << *m
<< dendl
;
1988 dout(10) << " monmap is " << *monmap
<< dendl
;
1990 // discover name and addrs during probing or electing states.
1991 if (!is_probing() && !is_electing()) {
1995 // newer map, or they've joined a quorum and we haven't?
1997 monmap
->encode(mybl
, m
->get_connection()->get_features());
1998 // make sure it's actually different; the checks below err toward
1999 // taking the other guy's map, which could cause us to loop.
2000 if (!mybl
.contents_equal(m
->monmap_bl
)) {
2001 MonMap
*newmap
= new MonMap
;
2002 newmap
->decode(m
->monmap_bl
);
2003 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
2004 !has_ever_joined
)) {
2005 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
2006 << ", mine was " << monmap
->get_epoch() << dendl
;
2007 int epoch_diff
= newmap
->get_epoch() - monmap
->get_epoch();
2009 monmap
->decode(m
->monmap_bl
);
2010 dout(20) << "has_ever_joined: " << has_ever_joined
<< dendl
;
2011 if (epoch_diff
== 1 && has_ever_joined
) {
2012 notify_new_monmap(false);
2014 notify_new_monmap(false, false);
2015 elector
.notify_clear_peer_state();
2024 string peer_name
= monmap
->get_name(m
->get_source_addr());
2025 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
2026 dout(10) << " renaming peer " << m
->get_source_addr() << " "
2027 << peer_name
<< " -> " << m
->name
<< " in my monmap"
2029 monmap
->rename(peer_name
, m
->name
);
2031 if (is_electing()) {
2035 } else if (peer_name
.size()) {
2036 dout(10) << " peer name is " << peer_name
<< dendl
;
2038 dout(10) << " peer " << m
->get_source_addr() << " not in map" << dendl
;
2041 // new initial peer?
2042 if (monmap
->get_epoch() == 0 &&
2043 monmap
->contains(m
->name
) &&
2044 monmap
->get_addrs(m
->name
).front().is_blank_ip()) {
2045 dout(1) << " learned initial mon " << m
->name
2046 << " addrs " << m
->get_source_addrs() << dendl
;
2047 monmap
->set_addrvec(m
->name
, m
->get_source_addrs());
2053 // end discover phase
2054 if (!is_probing()) {
2058 ceph_assert(paxos
!= NULL
);
2060 if (is_synchronizing()) {
2061 dout(10) << " currently syncing" << dendl
;
2065 entity_addrvec_t other
= m
->get_source_addrs();
2067 if (m
->paxos_last_version
< sync_last_committed_floor
) {
2068 dout(10) << " peer paxos versions [" << m
->paxos_first_version
2069 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
2070 << sync_last_committed_floor
<< ", ignoring"
2073 if (paxos
->get_version() < m
->paxos_first_version
&&
2074 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
2075 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
2076 << "," << m
->paxos_last_version
<< "]"
2077 << " vs my version " << paxos
->get_version()
2078 << " (too far ahead)"
2080 cancel_probe_timeout();
2081 sync_start(other
, true);
2084 if (paxos
->get_version() + g_conf()->paxos_max_join_drift
< m
->paxos_last_version
) {
2085 dout(10) << " peer paxos last version " << m
->paxos_last_version
2086 << " vs my version " << paxos
->get_version()
2087 << " (too far ahead)"
2089 cancel_probe_timeout();
2090 sync_start(other
, false);
2095 // did the existing cluster complete upgrade to luminous?
2096 if (osdmon()->osdmap
.get_epoch()) {
2097 if (osdmon()->osdmap
.require_osd_release
< ceph_release_t::luminous
) {
2098 derr
<< __func__
<< " existing cluster has not completed upgrade to"
2099 << " luminous; 'ceph osd require_osd_release luminous' before"
2100 << " upgrading" << dendl
;
2103 if (!osdmon()->osdmap
.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS
) ||
2104 !osdmon()->osdmap
.test_flag(CEPH_OSDMAP_RECOVERY_DELETES
)) {
2105 derr
<< __func__
<< " existing cluster has not completed a full luminous"
2106 << " scrub to purge legacy snapdir objects; please scrub before"
2107 << " upgrading beyond luminous." << dendl
;
2112 // is there an existing quorum?
2113 if (m
->quorum
.size()) {
2114 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
2116 dout(10) << " peer paxos version " << m
->paxos_last_version
2117 << " vs my version " << paxos
->get_version()
2120 bool in_map
= false;
2121 const auto my_info
= monmap
->mon_info
.find(name
);
2122 const map
<string
,string
> *map_crush_loc
{nullptr};
2123 if (my_info
!= monmap
->mon_info
.end()) {
2125 map_crush_loc
= &my_info
->second
.crush_loc
;
2128 !monmap
->get_addrs(name
).front().is_blank_ip() &&
2129 (!need_set_crush_loc
|| (*map_crush_loc
== crush_loc
))) {
2130 // i'm part of the cluster; just initiate a new election
2133 dout(10) << " ready to join, but i'm not in the monmap/"
2134 "my addr is blank/location is wrong, trying to join" << dendl
;
2135 send_mon_message(new MMonJoin(monmap
->fsid
, name
,
2136 messenger
->get_myaddrs(), crush_loc
,
2137 need_set_crush_loc
),
2141 if (monmap
->contains(m
->name
)) {
2142 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
2143 outside_quorum
.insert(m
->name
);
2145 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
2149 unsigned need
= monmap
->min_quorum_size();
2150 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
2151 if (outside_quorum
.size() >= need
) {
2152 if (outside_quorum
.count(name
)) {
2153 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
2156 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
2159 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
// Take part in an election started by someone else: wait for a pending
// paxos write, switch to STATE_ELECTING and bump the
// l_mon_num_elections counter.
// NOTE(review): one statement between the paxos wait and the state
// change is not visible in this listing.
2164 void Monitor::join_election()
2166 dout(10) << __func__
<< dendl
;
2167 wait_for_paxos_write();
2169 state
= STATE_ELECTING
;
2171 logger
->inc(l_mon_num_elections
);
// Call a new election ourselves: wait for a pending paxos write, switch
// to STATE_ELECTING, bump the l_mon_num_elections and l_mon_election_call
// counters, announce it in the cluster log and ask the elector to call
// the election.
// NOTE(review): one statement between the paxos wait and the state
// change is not visible in this listing.
2174 void Monitor::start_election()
2176 dout(10) << "start_election" << dendl
;
2177 wait_for_paxos_write();
2179 state
= STATE_ELECTING
;
2181 logger
->inc(l_mon_num_elections
);
2182 logger
->inc(l_mon_election_call
);
2184 clog
->info() << "mon." << name
<< " calling monitor election";
2185 elector
.call_election();
// Declare ourselves the winner of a single-monitor election.
//
// Has the elector declare a standalone victory (bumping the election
// epoch, per the comment below), re-derives our rank from the monmap
// (must be 0, asserted), collects our own metadata as rank 0 and hands
// everything to win_election().
// NOTE(review): the construction of the quorum set `q` and some of the
// arguments passed to win_election() are not visible in this listing.
2188 void Monitor::win_standalone_election()
2190 dout(1) << "win_standalone_election" << dendl
;
2192 // bump election epoch, in case the previous epoch included other
2193 // monitors; we need to be able to make the distinction.
2194 elector
.declare_standalone_victory();
2196 rank
= monmap
->get_rank(name
);
2197 ceph_assert(rank
== 0);
2201 map
<int,Metadata
> metadata
;
2202 collect_metadata(&metadata
[0]);
2204 win_election(elector
.get_epoch(), q
,
2206 ceph::features::mon::get_supported(),
2211 const utime_t
& Monitor::get_leader_since() const
2213 ceph_assert(state
== STATE_LEADER
);
2214 return leader_since
;
2217 epoch_t
Monitor::get_epoch()
2219 return elector
.get_epoch();
// Tell every paxos service that the election has finished.  Only valid
// once we are leader or peon (asserted).  On the leader, the monmap
// service is singled out by the check below because it was already
// notified.
// NOTE(review): the statement executed when that check matches is not
// visible in this listing; presumably it skips the service.
2222 void Monitor::_finish_svc_election()
2224 ceph_assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
2226 for (auto& svc
: paxos_service
) {
2227 // we already called election_finished() on monmon(); avoid calling twice
2228 if (state
== STATE_LEADER
&& svc
.get() == monmon())
2230 svc
->election_finished();
// Move this monitor into the leader role after it has won an election.
//
// epoch            election epoch (only logged in the visible lines)
// active           set of ranks forming the quorum (logged here)
// features         connection feature bits stored as quorum_con_features
// mon_features     monitor features stored as quorum_mon_features
// min_mon_release  stored as quorum_min_mon_release
// metadata         per-rank metadata; stored as pending_metadata and merged
//                  with previously known metadata before being staged
//
// Requires that we are currently electing (asserted).
// NOTE(review): several original lines are absent from this listing (for
// example where `quorum` and the `bl` buffer are set up); the comments below
// describe only what the visible lines show.
2234 void Monitor::win_election(epoch_t epoch
, const set
<int>& active
, uint64_t features
,
2235 const mon_feature_t
& mon_features
,
2236 ceph_release_t min_mon_release
,
2237 const map
<int,Metadata
>& metadata
)
2239 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
2240 << " features " << features
2241 << " mon_features " << mon_features
2242 << " min_mon_release " << min_mon_release
2244 ceph_assert(is_electing());
2245 state
= STATE_LEADER
;
// leader_since is wall-clock time, quorum_since is monotonic time
2246 leader_since
= ceph_clock_now();
2247 quorum_since
= mono_clock::now();
2250 quorum_con_features
= features
;
2251 quorum_mon_features
= mon_features
;
2252 quorum_min_mon_release
= min_mon_release
;
2253 pending_metadata
= metadata
;
2254 outside_quorum
.clear();
2256 clog
->info() << "mon." << name
<< " is new leader, mons " << get_quorum_names()
2257 << " in quorum (ranks " << quorum
<< ")";
2259 set_leader_commands(get_local_commands(mon_features
));
2261 paxos
->leader_init();
2262 // NOTE: tell monmap monitor first. This is important for the
2263 // bootstrap case to ensure that the very first paxos proposal
2264 // codifies the monmap. Otherwise any manner of chaos can ensue
2265 // when monitors are calling elections or participating in a paxos
2266 // round without agreeing on who the participants are.
2267 monmon()->election_finished();
2268 _finish_svc_election();
2270 logger
->inc(l_mon_election_win
);
2272 // inject new metadata in first transaction.
2274 // include previous metadata for missing mons (that aren't part of
2275 // the current quorum).
2276 map
<int,Metadata
> m
= metadata
;
2277 for (unsigned rank
= 0; rank
< monmap
->size(); ++rank
) {
2278 if (m
.count(rank
) == 0 &&
2279 mon_metadata
.count(rank
)) {
2280 m
[rank
] = mon_metadata
[rank
];
2284 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2285 // a new transaction immediately after the election finishes. We should
2286 // do that anyway for other reasons, though.
2287 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
// stage the merged metadata under the "last_metadata" key
2290 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
// the following applies when there is more than one monitor and the monmap
// has a non-zero epoch
2294 if (monmap
->size() > 1 &&
2295 monmap
->get_epoch() > 0) {
2297 health_tick_start();
2299 // Freshen the health status before doing health_to_clog in case
2300 // our just-completed election changed the health
2301 healthmon()->wait_for_active_ctx(new LambdaContext([this](int r
){
2302 dout(20) << "healthmon now active" << dendl
;
2303 healthmon()->tick();
2304 if (healthmon()->is_proposing()) {
2305 dout(20) << __func__
<< " healthmon proposing, waiting" << dendl
;
2306 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext
{this,
2308 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
2309 do_health_to_clog_interval();
2313 do_health_to_clog_interval();
2317 scrub_event_start();
// Record the outcome of an election this monitor did not win.
//
// Clears leader_since, restarts quorum_since, stores the quorum's connection
// features, monitor features and minimum release, logs the result, notifies
// the Paxos services and bumps the l_mon_election_lose counter.
// NOTE(review): `features`, `leader` and `quorum` are read below but the lines
// that declare/assign them (presumably from a `features` parameter, `l` and
// `q`) are absent from this listing; confirm.
2321 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
2323 const mon_feature_t
& mon_features
,
2324 ceph_release_t min_mon_release
)
// an empty utime_t marks that we are not the leader
2327 leader_since
= utime_t();
2328 quorum_since
= mono_clock::now();
2331 outside_quorum
.clear();
2332 quorum_con_features
= features
;
2333 quorum_mon_features
= mon_features
;
2334 quorum_min_mon_release
= min_mon_release
;
2335 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
2336 << " quorum is " << quorum
<< " features are " << quorum_con_features
2337 << " mon_features are " << quorum_mon_features
2338 << " min_mon_release " << min_mon_release
2342 _finish_svc_election();
2344 logger
->inc(l_mon_election_lose
);
// Build a std::string from the entries of Compressor::compression_algorithms.
// NOTE(review): only the start of this function is visible in this listing;
// the loop body and return statement are absent, so the exact output format
// cannot be confirmed from here.
2350 std::string
collect_compression_algorithms()
// `printed` presumably tracks whether a separator is needed; confirm
2353 bool printed
= false;
2354 for (auto [name
, key
] : Compressor::compression_algorithms
) {
2367 void Monitor::collect_metadata(Metadata
*m
)
2369 collect_sys_info(m
, g_ceph_context
);
2370 (*m
)["addrs"] = stringify(messenger
->get_myaddrs());
2371 (*m
)["compression_algorithms"] = collect_compression_algorithms();
2373 // infer storage device
2374 string devname
= store
->get_devname();
2375 set
<string
> devnames
;
2376 get_raw_devices(devname
, &devnames
);
2377 map
<string
,string
> errs
;
2378 get_device_metadata(devnames
, m
, &errs
);
2379 for (auto& i
: errs
) {
2380 dout(1) << __func__
<< " " << i
.first
<< ": " << i
.second
<< dendl
;
// Common post-election bookkeeping.
//
// Refreshes the compatset features from the quorum and the monmap, clears
// exited_quorum, completes the contexts waiting for a quorum, resends routed
// requests, registers the cluster logger, updates authmon with the monitor
// count and our rank (under auth_lock), and finally checks whether the monmap
// still lists us under the expected name / crush location, sending an
// MMonJoin if not.
// NOTE(review): some original lines are absent from this listing (e.g. the
// tail of the send_mon_message() call), so control flow after the MMonJoin
// cannot be confirmed here.
2384 void Monitor::finish_election()
2386 apply_quorum_to_compatset_features();
2387 apply_monmap_to_compatset_features();
2389 exited_quorum
= utime_t();
2390 finish_contexts(g_ceph_context
, waitfor_quorum
);
2391 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
2392 resend_routed_requests();
2394 register_cluster_logger();
2396 // enable authentication
2398 std::lock_guard
l(auth_lock
);
2399 authmon()->_set_mon_num_rank(monmap
->size(), rank
);
2402 // am i named and located properly?
2403 string cur_name
= monmap
->get_name(messenger
->get_myaddrs());
// NOTE(review): the iterator returned by find() is dereferenced below without
// a visible check against mon_info.end(); confirm cur_name is always present.
2404 const auto my_infop
= monmap
->mon_info
.find(cur_name
);
2405 const map
<string
,string
>& map_crush_loc
= my_infop
->second
.crush_loc
;
// rejoin when the name differs, or when we must set a crush location and the
// monmap's copy differs from ours
2407 if (cur_name
!= name
||
2408 (need_set_crush_loc
&& map_crush_loc
!= crush_loc
)) {
2409 dout(10) << " renaming/moving myself from " << cur_name
<< "/"
2410 << map_crush_loc
<<" -> " << name
<< "/" << crush_loc
<< dendl
;
2411 send_mon_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddrs(),
2412 crush_loc
, need_set_crush_loc
),
2416 do_stretch_mode_election_work();
// Adopt new_features as this monitor's CompatSet when it differs from the
// current `features`: logs the features being enabled, replaces `features`,
// applies a store transaction and recomputes the quorum requirements.
// NOTE(review): the line that populates transaction `t` is absent from this
// listing, as are the closing braces, so what is written and the exact
// nesting cannot be confirmed here.
2419 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2421 if (new_features
.compare(features
) != 0) {
// diff = what new_features has that the current set does not support
2422 CompatSet diff
= features
.unsupported(new_features
);
2423 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2424 features
= new_features
;
2426 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2428 store
->apply_transaction(t
);
2430 calc_quorum_requirements();
2434 void Monitor::apply_quorum_to_compatset_features()
2436 CompatSet
new_features(features
);
2437 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2438 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2439 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2441 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2442 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2443 dout(5) << __func__
<< dendl
;
2444 _apply_compatset_features(new_features
);
2447 void Monitor::apply_monmap_to_compatset_features()
2449 CompatSet
new_features(features
);
2450 mon_feature_t monmap_features
= monmap
->get_required_features();
2452 /* persistent monmap features may go into the compatset.
2453 * optional monmap features may not - why?
2454 * because optional monmap features may be set/unset by the admin,
2455 * and possibly by other means that haven't yet been thought out,
2456 * so we can't make the monitor enforce them on start - because they
2458 * this, of course, does not invalidate setting a compatset feature
2459 * for an optional feature - as long as you make sure to clean it up
2460 * once you unset it.
2462 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2463 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2464 ceph::features::mon::FEATURE_KRAKEN
));
2465 // this feature should only ever be set if the quorum supports it.
2466 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2467 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2469 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
2470 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2471 ceph::features::mon::FEATURE_LUMINOUS
));
2472 // this feature should only ever be set if the quorum supports it.
2473 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
));
2474 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
2476 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_MIMIC
)) {
2477 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2478 ceph::features::mon::FEATURE_MIMIC
));
2479 // this feature should only ever be set if the quorum supports it.
2480 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_MIMIC
));
2481 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC
);
2483 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_NAUTILUS
)) {
2484 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2485 ceph::features::mon::FEATURE_NAUTILUS
));
2486 // this feature should only ever be set if the quorum supports it.
2487 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_NAUTILUS
));
2488 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
);
2490 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_OCTOPUS
)) {
2491 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2492 ceph::features::mon::FEATURE_OCTOPUS
));
2493 // this feature should only ever be set if the quorum supports it.
2494 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_OCTOPUS
));
2495 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
);
2497 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_PACIFIC
)) {
2498 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2499 ceph::features::mon::FEATURE_PACIFIC
));
2500 // this feature should only ever be set if the quorum supports it.
2501 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_PACIFIC
));
2502 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_PACIFIC
);
2504 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_QUINCY
)) {
2505 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2506 ceph::features::mon::FEATURE_QUINCY
));
2507 // this feature should only ever be set if the quorum supports it.
2508 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_QUINCY
));
2509 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_QUINCY
);
2511 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_REEF
)) {
2512 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2513 ceph::features::mon::FEATURE_REEF
));
2514 // this feature should only ever be set if the quorum supports it.
2515 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_REEF
));
2516 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_REEF
);
2519 dout(5) << __func__
<< dendl
;
2520 _apply_compatset_features(new_features
);
2523 void Monitor::calc_quorum_requirements()
2525 required_features
= 0;
2528 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2529 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2531 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2532 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2534 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
)) {
2535 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2537 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC
)) {
2538 required_features
|= CEPH_FEATUREMASK_SERVER_MIMIC
;
2540 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
)) {
2541 required_features
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
|
2542 CEPH_FEATUREMASK_CEPHX_V2
;
2544 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
)) {
2545 required_features
|= CEPH_FEATUREMASK_SERVER_OCTOPUS
;
2547 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_PACIFIC
)) {
2548 required_features
|= CEPH_FEATUREMASK_SERVER_PACIFIC
;
2550 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_QUINCY
)) {
2551 required_features
|= CEPH_FEATUREMASK_SERVER_QUINCY
;
2553 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_REEF
)) {
2554 required_features
|= CEPH_FEATUREMASK_SERVER_REEF
;
2558 if (monmap
->get_required_features().contains_all(
2559 ceph::features::mon::FEATURE_KRAKEN
)) {
2560 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2562 if (monmap
->get_required_features().contains_all(
2563 ceph::features::mon::FEATURE_LUMINOUS
)) {
2564 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2566 if (monmap
->get_required_features().contains_all(
2567 ceph::features::mon::FEATURE_MIMIC
)) {
2568 required_features
|= CEPH_FEATUREMASK_SERVER_MIMIC
;
2570 if (monmap
->get_required_features().contains_all(
2571 ceph::features::mon::FEATURE_NAUTILUS
)) {
2572 required_features
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
|
2573 CEPH_FEATUREMASK_CEPHX_V2
;
2575 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
// Accumulate feature maps into *fm: first our own session feature map, then
// entries of quorum_feature_map indexed by quorum member id.
// NOTE(review): a line between the loop header and the accumulation is absent
// from this listing (possibly a guard on `id`); confirm against the full file.
2578 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2580 *fm
+= session_map
.feature_map
;
2581 for (auto id
: quorum
) {
2583 *fm
+= quorum_feature_map
[id
];
2588 void Monitor::sync_force(Formatter
*f
)
2590 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2591 sync_stash_critical_state(tx
);
2592 tx
->put("mon_sync", "force_sync", 1);
2593 store
->apply_transaction(tx
);
2595 f
->open_object_section("sync_force");
2596 f
->dump_int("ret", 0);
2597 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2598 f
->close_section(); // sync_force
// Dump the quorum status ("quorum_status" section) through formatter f:
// election epoch, quorum ranks, quorum member names, leader name, quorum
// features and the monmap section.
// NOTE(review): several original lines are absent from this listing (the
// condition guarding the JSONFormatter fallback, the body of the
// `!quorum.empty()` branch, the monmap dump, and whatever writes to `ss` /
// releases the fallback formatter); confirm against the full file.
2601 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
// set when we allocate our own formatter below
2603 bool free_formatter
= false;
2606 // lousy/lazy hack: default to json if no formatter has been defined
2607 f
= new JSONFormatter();
2608 free_formatter
= true;
2610 f
->open_object_section("quorum_status");
2611 f
->dump_int("election_epoch", get_epoch());
2613 f
->open_array_section("quorum");
2614 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2615 f
->dump_int("mon", *p
);
2616 f
->close_section(); // quorum
2618 list
<string
> quorum_names
= get_quorum_names();
2619 f
->open_array_section("quorum_names");
2620 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2621 f
->dump_string("mon", *p
);
2622 f
->close_section(); // quorum_names
// empty string when there is no quorum
2624 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(leader
));
2626 if (!quorum
.empty()) {
2632 f
->open_object_section("features");
2633 f
->dump_stream("quorum_con") << quorum_con_features
;
2634 quorum_mon_features
.dump(f
, "quorum_mon");
2637 f
->open_object_section("monmap");
2639 f
->close_section(); // monmap
2641 f
->close_section(); // quorum_status
// Dump this monitor's status ("mon_status" section) through formatter f:
// name, rank, state, election epoch, quorum ranks, required and quorum
// features, monitors outside the quorum, extra probe peers, sync providers,
// sync progress when synchronizing, debug kill-at settings, the monmap
// section, the session feature map and the stretch-mode flag.
// NOTE(review): some original lines are absent from this listing (e.g. the
// body of the `!quorum.empty()` branch, loop increments and section closers).
2647 void Monitor::get_mon_status(Formatter
*f
)
2649 f
->open_object_section("mon_status");
2650 f
->dump_string("name", name
);
2651 f
->dump_int("rank", rank
);
2652 f
->dump_string("state", get_state_name());
2653 f
->dump_int("election_epoch", get_epoch());
2655 f
->open_array_section("quorum");
2656 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2657 f
->dump_int("mon", *p
);
2659 f
->close_section(); // quorum
2661 if (!quorum
.empty()) {
2667 f
->open_object_section("features");
2668 f
->dump_stream("required_con") << required_features
;
2669 mon_feature_t req_mon_features
= get_required_mon_features();
2670 req_mon_features
.dump(f
, "required_mon");
2671 f
->dump_stream("quorum_con") << quorum_con_features
;
2672 quorum_mon_features
.dump(f
, "quorum_mon");
2673 f
->close_section(); // features
2675 f
->open_array_section("outside_quorum");
2676 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2677 f
->dump_string("mon", *p
);
2678 f
->close_section(); // outside_quorum
2680 f
->open_array_section("extra_probe_peers");
2681 for (set
<entity_addrvec_t
>::iterator p
= extra_probe_peers
.begin();
2682 p
!= extra_probe_peers
.end();
2684 f
->dump_object("peer", *p
);
2686 f
->close_section(); // extra_probe_peers
// one entry per active sync provider session
2688 f
->open_array_section("sync_provider");
2689 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2690 p
!= sync_providers
.end();
2692 f
->dump_unsigned("cookie", p
->second
.cookie
);
2693 f
->dump_object("addrs", p
->second
.addrs
);
2694 f
->dump_stream("timeout") << p
->second
.timeout
;
2695 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2696 f
->dump_stream("last_key") << p
->second
.last_key
;
2700 if (is_synchronizing()) {
2701 f
->open_object_section("sync");
2702 f
->dump_stream("sync_provider") << sync_provider
;
2703 f
->dump_unsigned("sync_cookie", sync_cookie
);
2704 f
->dump_unsigned("sync_start_version", sync_start_version
);
// the kill-at settings are only reported when set to a positive value
2708 if (g_conf()->mon_sync_provider_kill_at
> 0)
2709 f
->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at
);
2710 if (g_conf()->mon_sync_requester_kill_at
> 0)
2711 f
->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at
);
2713 f
->open_object_section("monmap");
2717 f
->dump_object("feature_map", session_map
.feature_map
);
2718 f
->dump_bool("stretch_mode", stretch_mode_engaged
);
2719 f
->close_section(); // mon_status
2723 // health status to clog
// Arm the periodic health tick: schedules health_tick_event on the timer to
// fire after mon_health_to_clog_tick_interval; the callback calls
// health_tick_start() again, which re-arms the tick.
// NOTE(review): the action taken when health-to-clog is disabled or the tick
// interval is <= 0 (presumably an early return) and the first statements of
// the callback are absent from this listing; confirm.
2725 void Monitor::health_tick_start()
2727 if (!cct
->_conf
->mon_health_to_clog
||
2728 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2731 dout(15) << __func__
<< dendl
;
2734 health_tick_event
= timer
.add_event_after(
2735 cct
->_conf
->mon_health_to_clog_tick_interval
,
2736 new C_MonContext
{this, [this](int r
) {
2739 health_tick_start();
2743 void Monitor::health_tick_stop()
2745 dout(15) << __func__
<< dendl
;
2747 if (health_tick_event
) {
2748 timer
.cancel_event(health_tick_event
);
2749 health_tick_event
= NULL
;
2753 ceph::real_clock::time_point
Monitor::health_interval_calc_next_update()
2755 auto now
= ceph::real_clock::now();
2757 auto secs
= std::chrono::duration_cast
<std::chrono::seconds
>(now
.time_since_epoch());
2758 int remainder
= secs
.count() % cct
->_conf
->mon_health_to_clog_interval
;
2759 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2760 auto next
= secs
+ std::chrono::seconds(adjustment
);
2762 dout(20) << __func__
2763 << " now: " << now
<< ","
2764 << " next: " << next
<< ","
2765 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
2768 return ceph::real_clock::time_point
{next
};
// (Re)schedule the periodic health-to-clog event: stops any existing interval
// event, computes the next update time and registers a C_MonContext whose
// callback runs do_health_to_clog_interval(). If the timer refuses the event,
// health_interval_event is reset to nullptr.
// NOTE(review): the body of the disabled/non-positive-interval guard
// (presumably an early return) and the first statements of the callback are
// absent from this listing; confirm.
2771 void Monitor::health_interval_start()
2773 dout(15) << __func__
<< dendl
;
2775 if (!cct
->_conf
->mon_health_to_clog
||
2776 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
// drop any previously scheduled event before arming a new one
2780 health_interval_stop();
2781 auto next
= health_interval_calc_next_update();
2782 health_interval_event
= new C_MonContext
{this, [this](int r
) {
2785 do_health_to_clog_interval();
2787 if (!timer
.add_event_at(next
, health_interval_event
)) {
2788 health_interval_event
= nullptr;
2792 void Monitor::health_interval_stop()
2794 dout(15) << __func__
<< dendl
;
2795 if (health_interval_event
) {
2796 timer
.cancel_event(health_interval_event
);
2798 health_interval_event
= NULL
;
// Tear down health-to-clog state: stops the interval event and resets the
// cached health status.
// NOTE(review): one original line before health_interval_stop() is absent
// from this listing (possibly stopping the tick event as well); confirm.
2801 void Monitor::health_events_cleanup()
2804 health_interval_stop();
2805 health_status_cache
.reset();
// React to configuration changes affecting health-to-clog.
//
// changed: names of the options that changed. Handles
// "mon_health_to_clog", "mon_health_to_clog_interval" and
// "mon_health_to_clog_tick_interval", starting or stopping the tick and
// interval events as appropriate.
// NOTE(review): the `else` lines and some branch bodies are absent from this
// listing, so the exact pairing of conditions and actions below should be
// confirmed against the full file.
2808 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2810 dout(20) << __func__
<< dendl
;
2812 if (changed
.count("mon_health_to_clog")) {
2813 if (!cct
->_conf
->mon_health_to_clog
) {
2814 health_events_cleanup();
// only arm events that are not already scheduled
2817 if (!health_tick_event
) {
2818 health_tick_start();
2820 if (!health_interval_event
) {
2821 health_interval_start();
2826 if (changed
.count("mon_health_to_clog_interval")) {
2827 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2828 health_interval_stop();
2830 health_interval_start();
2834 if (changed
.count("mon_health_to_clog_tick_interval")) {
2835 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2838 health_tick_start();
2843 void Monitor::do_health_to_clog_interval()
2845 // outputting to clog may have been disabled in the conf
2846 // since we were scheduled.
2847 if (!cct
->_conf
->mon_health_to_clog
||
2848 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2851 dout(10) << __func__
<< dendl
;
2853 // do we have a cached value for next_clog_update? if not,
2854 // do we know when the last update was?
2856 do_health_to_clog(true);
2857 health_interval_start();
// Send the current overall health summary to the cluster log and update
// health_status_cache with the summary and level just reported.
//
// force: logged as " (force)".
// When mon_health_detail_to_clog is set, the summary changed and the level is
// not HEALTH_OK, a "Health detail" message is logged as well.
// NOTE(review): several original lines are absent from this listing (the
// declarations of `summary` / `details`, the early returns, and the start of
// the condition comparing against the cache — presumably skipped when `force`
// is true); confirm against the full file.
2860 void Monitor::do_health_to_clog(bool force
)
2862 // outputting to clog may have been disabled in the conf
2863 // since we were scheduled.
2864 if (!cct
->_conf
->mon_health_to_clog
||
2865 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2868 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
2871 health_status_t level
= healthmon()->get_health_status(false, nullptr, &summary
);
// unchanged since the last report?
2873 summary
== health_status_cache
.summary
&&
2874 level
== health_status_cache
.overall
)
2877 if (g_conf()->mon_health_detail_to_clog
&&
2878 summary
!= health_status_cache
.summary
&&
2879 level
!= HEALTH_OK
) {
2881 level
= healthmon()->get_health_status(true, nullptr, &details
);
2882 clog
->health(level
) << "Health detail: " << details
;
2884 clog
->health(level
) << "overall " << summary
;
// remember what we reported for the next comparison
2886 health_status_cache
.summary
= summary
;
2887 health_status_cache
.overall
= level
;
// Write cluster-log messages describing how a set of health checks changed.
//
// updated:  the new set of health checks
// previous: the set of health checks being replaced
// t:        transaction reference; not used in the visible lines (see the
//           FIXME below)
//
// Visible behaviour: checks present in `updated` but not in `previous` are
// logged as "Health check failed"; checks whose summary or severity changed
// are logged as "Health check update" unless the severity is unchanged and
// the check was logged less than mon_health_log_update_period ago; checks
// that disappeared are logged at info level; and "Cluster is now healthy" is
// logged in the final branch.
// NOTE(review): many original lines are absent from this listing (stream
// declarations, `logged` bookkeeping, loop control and braces), so exact
// nesting should be confirmed against the full file.
2890 void Monitor::log_health(
2891 const health_check_map_t
& updated
,
2892 const health_check_map_t
& previous
,
2893 MonitorDBStore::TransactionRef t
)
2895 if (!g_conf()->mon_health_to_clog
) {
2899 const utime_t now
= ceph_clock_now();
2901 // FIXME: log atomically as part of @t instead of using clog.
2902 dout(10) << __func__
<< " updated " << updated
.checks
.size()
2903 << " previous " << previous
.checks
.size()
2905 const auto min_log_period
= g_conf().get_val
<int64_t>(
2906 "mon_health_log_update_period");
2907 for (auto& p
: updated
.checks
) {
2908 auto q
= previous
.checks
.find(p
.first
);
2909 bool logged
= false;
// check did not exist before
2910 if (q
== previous
.checks
.end()) {
2913 ss
<< "Health check failed: " << p
.second
.summary
<< " ("
2915 clog
->health(p
.second
.severity
) << ss
.str();
2919 if (p
.second
.summary
!= q
->second
.summary
||
2920 p
.second
.severity
!= q
->second
.severity
) {
2922 auto status_iter
= health_check_log_times
.find(p
.first
);
2923 if (status_iter
!= health_check_log_times
.end()) {
2924 if (p
.second
.severity
== q
->second
.severity
&&
2925 now
- status_iter
->second
.updated_at
< min_log_period
) {
2926 // We already logged this recently and the severity is unchanged,
2927 // so skip emitting an update of the summary string.
2928 // We'll get an update out of tick() later if the check
2929 // is still failing.
2934 // summary or severity changed (ignore detail changes at this level)
2936 ss
<< "Health check update: " << p
.second
.summary
<< " (" << p
.first
<< ")";
2937 clog
->health(p
.second
.severity
) << ss
.str();
2942 // Record the time at which we last logged, so that we can check this
2943 // when considering whether/when to print update messages.
2945 auto iter
= health_check_log_times
.find(p
.first
);
2946 if (iter
== health_check_log_times
.end()) {
2947 health_check_log_times
.emplace(p
.first
, HealthCheckLogStatus(
2948 p
.second
.severity
, p
.second
.summary
, now
));
2950 iter
->second
= HealthCheckLogStatus(
2951 p
.second
.severity
, p
.second
.summary
, now
);
// checks that were present before but are gone now
2955 for (auto& p
: previous
.checks
) {
2956 if (!updated
.checks
.count(p
.first
)) {
2959 if (p
.first
== "DEGRADED_OBJECTS") {
2960 clog
->info() << "All degraded objects recovered";
2961 } else if (p
.first
== "OSD_FLAGS") {
2962 clog
->info() << "OSD flags cleared";
2964 clog
->info() << "Health check cleared: " << p
.first
<< " (was: "
2965 << p
.second
.summary
<< ")";
2968 if (health_check_log_times
.count(p
.first
)) {
2969 health_check_log_times
.erase(p
.first
);
2974 if (previous
.checks
.size() && updated
.checks
.size() == 0) {
2975 // We might be going into a fully healthy state, check
2977 bool any_checks
= false;
2978 for (auto& svc
: paxos_service
) {
// compares addresses: is this service's check map the very object `previous`?
2979 if (&(svc
->get_health_checks()) == &(previous
)) {
2980 // Ignore the ones we're clearing right now
2984 if (svc
->get_health_checks().checks
.size() > 0) {
2990 clog
->info() << "Cluster is now healthy";
// Collect fresh metadata for this monitor and, when its
// "ceph_version_short" differs from the one recorded in mon_metadata for our
// rank, push the new metadata to the mgr client.
// NOTE(review): the declaration of `metadata` and anything after the
// update_daemon_metadata() call are absent from this listing; confirm.
2995 void Monitor::update_pending_metadata()
2998 collect_metadata(&metadata
);
2999 size_t version_size
= mon_metadata
[rank
]["ceph_version_short"].size();
3000 const std::string current_version
= mon_metadata
[rank
]["ceph_version_short"];
3001 const std::string pending_version
= metadata
["ceph_version_short"];
// compares the whole of current_version (version_size is its length) with
// pending_version
3003 if (current_version
.compare(0, version_size
, pending_version
) != 0) {
3004 mgr_client
.update_daemon_metadata("mon", name
, metadata
);
// Produce the cluster status summary.
//
// ss:      stream receiving the plain-text rendering
// f:       formatter receiving the structured rendering
// session: session whose allowed filesystem names are used to filter the
//          FSMap before it is reported
//
// The visible lines contain two renderings: a structured one written through
// `f` (section "status": fsid, health, election epoch, quorum, monmap,
// osdmap, pgmap, fsmap, mgrmap, servicemap, progress events) and a text one
// written to `ss` (cluster, services, optional task status, data, progress).
// NOTE(review): the condition selecting between the two renderings
// (presumably whether `f` is non-null), several declarations (`health`,
// `maxlen`) and most braces/section closers are absent from this listing;
// confirm against the full file.
3008 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
,
3009 MonSession
*session
)
3012 f
->open_object_section("status");
3014 const auto&& fs_names
= session
->get_allowed_fs_names();
3017 f
->dump_stream("fsid") << monmap
->get_fsid();
3018 healthmon()->get_health_status(false, f
, nullptr);
3019 f
->dump_unsigned("election_epoch", get_epoch());
3021 f
->open_array_section("quorum");
3022 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
3023 f
->dump_int("rank", *p
);
3025 f
->open_array_section("quorum_names");
3026 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
3027 f
->dump_string("id", monmap
->get_name(*p
));
3033 f
->open_object_section("monmap");
3034 monmap
->dump_summary(f
);
3036 f
->open_object_section("osdmap");
3037 osdmon()->osdmap
.print_summary(f
, cout
, string(12, ' '));
3039 f
->open_object_section("pgmap");
3040 mgrstatmon()->print_summary(f
, NULL
);
3042 f
->open_object_section("fsmap");
// work on a copy so filtering does not affect the monitor's own FSMap
3044 FSMap fsmap_copy
= mdsmon()->get_fsmap();
3045 if (!fs_names
.empty()) {
3046 fsmap_copy
.filter(fs_names
);
3048 const FSMap
*fsmapp
= &fsmap_copy
;
3050 fsmapp
->print_summary(f
, NULL
);
3052 f
->open_object_section("mgrmap");
3053 mgrmon()->get_map().print_summary(f
, nullptr);
3056 f
->dump_object("servicemap", mgrstatmon()->get_service_map());
3058 f
->open_object_section("progress_events");
3059 for (auto& i
: mgrstatmon()->get_progress_events()) {
3060 f
->dump_object(i
.first
.c_str(), i
.second
);
// plain-text rendering starts here
3066 ss
<< " cluster:\n";
3067 ss
<< " id: " << monmap
->get_fsid() << "\n";
3070 healthmon()->get_health_status(false, nullptr, &health
,
3072 ss
<< " health: " << health
<< "\n";
3074 ss
<< "\n \n services:\n";
// maxlen tracks the longest service name and drives column alignment below
3077 auto& service_map
= mgrstatmon()->get_service_map();
3078 for (auto& p
: service_map
.services
) {
3079 maxlen
= std::max(maxlen
, p
.first
.size());
3081 string
spacing(maxlen
- 3, ' ');
3082 const auto quorum_names
= get_quorum_names();
3083 const auto mon_count
= monmap
->mon_info
.size();
3084 auto mnow
= ceph::mono_clock::now();
3085 ss
<< " mon: " << spacing
<< mon_count
<< " daemons, quorum "
3086 << quorum_names
<< " (age " << timespan_str(mnow
- quorum_since
) << ")";
// list the monitors that are in the monmap but not in the quorum
3087 if (quorum_names
.size() != mon_count
) {
3088 std::list
<std::string
> out_of_q
;
3089 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
3090 if (quorum
.count(i
) == 0) {
3091 out_of_q
.push_back(monmap
->ranks
[i
]);
3094 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
3095 out_of_q
.end(), std::string(", "));
3098 if (mgrmon()->in_use()) {
3099 ss
<< " mgr: " << spacing
;
3100 mgrmon()->get_map().print_summary(nullptr, &ss
);
3104 FSMap fsmap_copy
= mdsmon()->get_fsmap();
3105 if (!fs_names
.empty()) {
3106 fsmap_copy
.filter(fs_names
);
3108 const FSMap
*fsmapp
= &fsmap_copy
;
3110 if (fsmapp
->filesystem_count() > 0 and mdsmon()->should_print_status()){
3111 ss
<< " mds: " << spacing
;
3112 fsmapp
->print_daemon_summary(ss
);
3116 ss
<< " osd: " << spacing
;
3117 osdmon()->osdmap
.print_summary(NULL
, ss
, string(maxlen
+ 6, ' '));
3119 for (auto& p
: service_map
.services
) {
3120 const std::string
&service
= p
.first
;
3121 // filter out normal ceph entity types
3122 if (ServiceMap::is_normal_ceph_entity(service
)) {
3125 ss
<< " " << p
.first
<< ": " << string(maxlen
- p
.first
.size(), ' ')
3126 << p
.second
.get_summary() << "\n";
// the task status section is only emitted when some service has running tasks
3130 if (auto& service_map
= mgrstatmon()->get_service_map();
3131 std::any_of(service_map
.services
.begin(),
3132 service_map
.services
.end(),
3134 return service
.second
.has_running_tasks();
3136 ss
<< "\n \n task status:\n";
3137 for (auto& [name
, service
] : service_map
.services
) {
3138 ss
<< service
.get_task_summary(name
);
3142 ss
<< "\n \n data:\n";
3143 mdsmon()->print_fs_summary(ss
);
3144 mgrstatmon()->print_summary(NULL
, &ss
);
3146 auto& pem
= mgrstatmon()->get_progress_events();
3148 ss
<< "\n \n progress:\n";
3149 for (auto& i
: pem
) {
// only events flagged add_to_ceph_s are shown
3150 if (i
.second
.add_to_ceph_s
){
3151 ss
<< " " << i
.second
.message
<< "\n";
3159 void Monitor::_generate_command_map(cmdmap_t
& cmdmap
,
3160 map
<string
,string
> ¶m_str_map
)
3162 for (auto p
= cmdmap
.begin(); p
!= cmdmap
.end(); ++p
) {
3163 if (p
->first
== "prefix")
3165 if (p
->first
== "caps") {
3167 if (cmd_getval(cmdmap
, "caps", cv
) &&
3168 cv
.size() % 2 == 0) {
3169 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
3170 string k
= string("caps_") + cv
[i
];
3171 param_str_map
[k
] = cv
[i
+ 1];
3176 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
3180 const MonCommand
*Monitor::_get_moncommand(
3181 const string
&cmd_prefix
,
3182 const vector
<MonCommand
>& cmds
)
3184 for (auto& c
: cmds
) {
3185 if (c
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
// Check whether session s holds the capabilities required to run this_cmd.
//
// The r/w/x permissions demanded by this_cmd are passed, together with the
// module, prefix, flattened parameters and the peer's socket address, to the
// session's caps; the verdict is logged at level 10.
// NOTE(review): the first arguments of is_capable() and the return statement
// are absent from this listing (presumably `return capable;`); `cmdmap` is
// not used in the visible lines.
3192 bool Monitor::_allowed_command(MonSession
*s
, const string
&module
,
3193 const string
&prefix
, const cmdmap_t
& cmdmap
,
3194 const map
<string
,string
>& param_str_map
,
3195 const MonCommand
*this_cmd
) {
3197 bool cmd_r
= this_cmd
->requires_perm('r');
3198 bool cmd_w
= this_cmd
->requires_perm('w');
3199 bool cmd_x
= this_cmd
->requires_perm('x');
3201 bool capable
= s
->caps
.is_capable(
3204 module
, prefix
, param_str_map
,
3205 cmd_r
, cmd_w
, cmd_x
,
3206 s
->get_peer_socket_addr());
3208 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
// Dump the descriptions of `commands` into a "command_descriptions" section
// of a formatter, one "cmdNNN" entry (zero-padded to three digits) per
// command, via dump_cmddesc_to_json().
// NOTE(review): the remaining parameters (f, features, ...) and the handling
// of `cmdnum` and any per-command flag adjustments are absent from this
// listing; confirm against the full file.
3212 void Monitor::format_command_descriptions(const std::vector
<MonCommand
> &commands
,
3218 f
->open_object_section("command_descriptions");
3219 for (const auto &cmd
: commands
) {
3220 unsigned flags
= cmd
.flags
;
3221 ostringstream secname
;
3222 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
3223 dump_cmddesc_to_json(f
, features
, secname
.str(),
3224 cmd
.cmdstring
, cmd
.helpstring
, cmd
.module
,
3225 cmd
.req_perms
, flags
);
3228 f
->close_section(); // command_descriptions
3233 bool Monitor::is_keyring_required()
3235 return auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
) ||
3236 auth_service_required
.is_supported_auth(CEPH_AUTH_CEPHX
) ||
3237 auth_cluster_required
.is_supported_auth(CEPH_AUTH_GSS
) ||
3238 auth_service_required
.is_supported_auth(CEPH_AUTH_GSS
);
// Completion context for a command proxied to the mgr: on finish it takes the
// monitor lock, subtracts this command's size from mon->mgr_proxy_bytes and
// replies to the original request with the result code and output.
// NOTE(review): the member declarations (mon, op, size, outs, outbl) are
// absent from this listing; confirm their types against the full file.
3241 struct C_MgrProxyCommand
: public Context
{
3247 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
3248 : mon(mon
), op(op
), size(s
) { }
3249 void finish(int r
) {
3250 std::lock_guard
l(mon
->lock
);
// release the bytes accounted for this in-flight proxied command
3251 mon
->mgr_proxy_bytes
-= size
;
3252 mon
->reply_command(op
, r
, outs
, outbl
, 0);
// Handle an MCommand ("tell") request.
//
// Visible behaviour: rejects the request with -EACCES on an fsid mismatch,
// -EINVAL when the command JSON cannot be parsed or has no prefix, -ENOTSUP
// when the matched command is obsolete (or deprecated while
// mon_debug_deprecated_as_obsolete is set), and -EACCES when the session's
// caps do not allow it; otherwise queues the message on the admin socket.
// NOTE(review): several original lines are absent from this listing (the
// session null check, declarations of `cmdmap` / `prefix`, part of the
// is_capable() argument list, and braces); confirm against the full file.
3256 void Monitor::handle_tell_command(MonOpRequestRef op
)
3258 ceph_assert(op
->is_type_command());
3259 MCommand
*m
= static_cast<MCommand
*>(op
->get_req());
3260 if (m
->fsid
!= monmap
->fsid
) {
3261 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
3262 return reply_tell_command(op
, -EACCES
, "wrong fsid");
3264 MonSession
*session
= op
->get_session();
3266 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
// ss collects the parser's explanation when the JSON is malformed
3270 if (stringstream ss
; !cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
3271 return reply_tell_command(op
, -EINVAL
, ss
.str());
3273 map
<string
,string
> param_str_map
;
3274 _generate_command_map(cmdmap
, param_str_map
);
3276 if (!cmd_getval(cmdmap
, "prefix", prefix
)) {
3277 return reply_tell_command(op
, -EINVAL
, "no prefix");
3279 if (auto cmd
= _get_moncommand(prefix
,
3280 get_local_commands(quorum_mon_features
));
3282 if (cmd
->is_obsolete() ||
3283 (cct
->_conf
->mon_debug_deprecated_as_obsolete
&&
3284 cmd
->is_deprecated())) {
3285 return reply_tell_command(op
, -ENOTSUP
,
3286 "command is obsolete; "
3287 "please check usage and/or man page");
3290 // see if command is allowed
3291 if (!session
->caps
.is_capable(
3293 session
->entity_name
,
3294 "mon", prefix
, param_str_map
,
3296 session
->get_peer_socket_addr())) {
3297 return reply_tell_command(op
, -EACCES
, "insufficient caps");
// hand the command over to the admin socket for execution
3300 cct
->get_admin_socket()->queue_tell_command(m
);
// Handle an MMonCommand.
//
// The message is validated (cluster fsid, session, non-empty command, JSON
// parse into cmdmap, non-empty "prefix"), matched against the leader's and
// this monitor's command tables, and checked against the session's caps via
// _allowed_command().  Depending on the outcome the command is forwarded to
// the leader, passed to the admin socket "tell" path, proxied to the mgr,
// dispatched to the owning PaxosService, or answered right here for the
// prefixes the Monitor implements itself.  Errors are reported to the sender
// through reply_command().
3303 void Monitor::handle_command(MonOpRequestRef op
)
3305 ceph_assert(op
->is_type_command());
3306 auto m
= op
->get_req
<MMonCommand
>();
// Commands addressed to a different cluster fsid are rejected with -EPERM.
3307 if (m
->fsid
!= monmap
->fsid
) {
3308 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
3310 reply_command(op
, -EPERM
, "wrong fsid", 0);
3314 MonSession
*session
= op
->get_session();
3316 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
3320 if (m
->cmd
.empty()) {
3321 reply_command(op
, -EINVAL
, "no command specified", 0);
3326 vector
<string
> fullcmd
;
3328 stringstream ss
, ds
;
3332 rs
= "unrecognized command";
3334 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
3335 // ss has reason for failure
3338 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3339 reply_command(op
, r
, rs
, 0);
3343 // check return value. If no prefix parameter provided,
3344 // return value will be false, then return error info.
3345 if (!cmd_getval(cmdmap
, "prefix", prefix
)) {
3346 reply_command(op
, -EINVAL
, "command prefix not found", 0);
3350 // check prefix is empty
3351 if (prefix
.empty()) {
3352 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
// "get_command_descriptions" is answered directly: the mgr's command
// descriptions plus leader_mon_commands are formatted as JSON according to
// the requesting connection's feature bits and returned in rdata.
3356 if (prefix
== "get_command_descriptions") {
3358 Formatter
*f
= Formatter::create("json");
3360 std::vector
<MonCommand
> commands
= static_cast<MgrMonitor
*>(
3361 paxos_service
[PAXOS_MGR
].get())->get_command_descs();
3363 for (auto& c
: leader_mon_commands
) {
3364 commands
.push_back(c
);
3367 auto features
= m
->get_connection()->get_features();
3368 format_command_descriptions(commands
, f
, features
, &rdata
);
3370 reply_command(op
, 0, "", rdata
, 0);
3374 dout(0) << "handle_command " << *m
<< dendl
;
3376 string format
= cmd_getval_or
<string
>(cmdmap
, "format", "plain");
3377 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
3379 get_str_vec(prefix
, fullcmd
);
3381 // make sure fullcmd is not empty.
3382 // invalid prefix will cause empty vector fullcmd.
3383 // such as, prefix=";,,;"
3384 if (fullcmd
.empty()) {
3385 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
3389 std::string_view module
= fullcmd
[0];
3391 // validate command is in leader map
3393 const MonCommand
*leader_cmd
;
3394 const auto& mgr_cmds
= mgrmon()->get_command_descs();
3395 const MonCommand
*mgr_cmd
= nullptr;
3396 if (!mgr_cmds
.empty()) {
3397 mgr_cmd
= _get_moncommand(prefix
, mgr_cmds
);
3399 leader_cmd
= _get_moncommand(prefix
, leader_mon_commands
);
3401 leader_cmd
= mgr_cmd
;
3403 reply_command(op
, -EINVAL
, "command not known", 0);
3407 // validate command is in our map & matches, or forward if it is allowed
3408 const MonCommand
*mon_cmd
= _get_moncommand(
3410 get_local_commands(quorum_mon_features
));
3416 if (leader_cmd
->is_noforward()) {
3417 reply_command(op
, -EINVAL
,
3418 "command not locally supported and not allowed to forward",
3422 dout(10) << "Command not locally supported, forwarding request "
3424 forward_request_leader(op
);
3426 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
3427 if (mon_cmd
->is_noforward()) {
3428 reply_command(op
, -EINVAL
,
3429 "command not compatible with leader and not allowed to forward",
3433 dout(10) << "Command not compatible with leader, forwarding request "
3435 forward_request_leader(op
);
3440 if (mon_cmd
->is_obsolete() ||
3441 (cct
->_conf
->mon_debug_deprecated_as_obsolete
3442 && mon_cmd
->is_deprecated())) {
3443 reply_command(op
, -ENOTSUP
,
3444 "command is obsolete; please check usage and/or man page",
3449 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
3450 dout(10) << "Got forward for noforward command " << m
<< dendl
;
3451 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
3455 /* what we perceive as being the service the command falls under */
3456 string
service(mon_cmd
->module
);
3458 dout(25) << __func__
<< " prefix='" << prefix
3459 << "' module='" << module
3460 << "' service='" << service
<< "'" << dendl
;
3463 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
3465 // validate user's permissions for requested command
3466 map
<string
,string
> param_str_map
;
3468 // Catch bad_cmd_get exception if _generate_command_map() throws it
3470 _generate_command_map(cmdmap
, param_str_map
);
3471 } catch (const bad_cmd_get
& e
) {
3472 reply_command(op
, -EINVAL
, e
.what(), 0);
3476 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
3477 param_str_map
, mon_cmd
)) {
3478 dout(1) << __func__
<< " access denied" << dendl
;
3479 if (prefix
!= "config set" && prefix
!= "config-key set")
3480 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3481 << "from='" << session
->name
<< " " << session
->addrs
<< "' "
3482 << "entity='" << session
->entity_name
<< "' "
3483 << "cmd=" << m
->cmd
<< ": access denied";
3484 reply_command(op
, -EACCES
, "access denied", 0);
3488 if (prefix
!= "config set" && prefix
!= "config-key set")
3489 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3490 << "from='" << session
->name
<< " " << session
->addrs
<< "' "
3491 << "entity='" << session
->entity_name
<< "' "
3492 << "cmd=" << m
->cmd
<< ": dispatch";
3494 // compat kludge for legacy clients trying to tell commands that are
3495 // new. see bottom of MonCommands.h. we need to handle both (1)
3496 // pre-octopus clients and (2) octopus clients with a mix of pre-octopus
3497 // and octopus mons.
3498 if ((!HAVE_FEATURE(m
->get_connection()->get_features(), SERVER_OCTOPUS
) ||
3499 monmap
->min_mon_release
< ceph_release_t::octopus
) &&
3500 (prefix
== "injectargs" ||
3501 prefix
== "smart" ||
3502 prefix
== "mon_status" ||
3503 prefix
== "heap")) {
3504 if (m
->get_connection()->get_messenger() == 0) {
3505 // Prior to octopus, monitors might forward these messages
3506 // around. that was broken at baseline, and if we try to process
3507 // this message now, it will assert out when we try to send a
3508 // message in reply from the asok/tell worker (see
3509 // AnonConnection). Just reply with an error.
3510 dout(5) << __func__
<< " failing forwarded command from a (presumably) "
3511 << "pre-octopus peer" << dendl
;
3514 "failing forwarded tell command in mixed-version mon cluster", 0);
3517 dout(5) << __func__
<< " passing command to tell/asok" << dendl
;
3518 cct
->get_admin_socket()->queue_tell_command(m
);
// mgr commands are proxied through mgr_client.  The summed message size of
// proxied commands (mgr_proxy_bytes) is capped at
// mon_client_bytes * mon_mgr_proxy_client_bytes_ratio; a command that would
// exceed the cap is answered with -EAGAIN instead.
3522 if (mon_cmd
->is_mgr()) {
3523 const auto& hdr
= m
->get_header();
3524 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
3525 uint64_t max
= g_conf().get_val
<Option::size_t>("mon_client_bytes")
3526 * g_conf().get_val
<double>("mon_mgr_proxy_client_bytes_ratio");
3527 if (mgr_proxy_bytes
+ size
> max
) {
3528 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
3529 << " + " << size
<< " > max " << max
<< dendl
;
3530 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
3533 mgr_proxy_bytes
+= size
;
3534 dout(10) << __func__
<< " proxying mgr command (+" << size
3535 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
3536 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
3537 mgr_client
.start_command(m
->cmd
,
3541 new C_OnFinisher(fin
, &finisher
));
// Commands owned by a PaxosService are dispatched to that service, keyed on
// module (the first element of the split prefix) with a few per-prefix
// exceptions spelled out in the conditions below.
3545 if ((module
== "mds" || module
== "fs") &&
3546 prefix
!= "fs authorize") {
3547 mdsmon()->dispatch(op
);
3550 if ((module
== "osd" ||
3551 prefix
== "pg map" ||
3552 prefix
== "pg repeer") &&
3553 prefix
!= "osd last-stat-seq") {
3554 osdmon()->dispatch(op
);
3557 if (module
== "config") {
3558 configmon()->dispatch(op
);
3562 if (module
== "mon" &&
3563 /* Let the Monitor class handle the following commands:
3566 prefix
!= "mon scrub" &&
3567 prefix
!= "mon metadata" &&
3568 prefix
!= "mon versions" &&
3569 prefix
!= "mon count-metadata" &&
3570 prefix
!= "mon ok-to-stop" &&
3571 prefix
!= "mon ok-to-add-offline" &&
3572 prefix
!= "mon ok-to-rm") {
3573 monmon()->dispatch(op
);
3576 if (module
== "health" && prefix
!= "health") {
3577 healthmon()->dispatch(op
);
3580 if (module
== "auth" || prefix
== "fs authorize") {
3581 authmon()->dispatch(op
);
3584 if (module
== "log") {
3585 logmon()->dispatch(op
);
3589 if (module
== "config-key") {
3590 kvmon()->dispatch(op
);
3594 if (module
== "mgr") {
3595 mgrmon()->dispatch(op
);
// The remaining prefixes are implemented here by the Monitor itself.
3599 if (prefix
== "fsid") {
3601 f
->open_object_section("fsid");
3602 f
->dump_stream("fsid") << monmap
->fsid
;
3609 reply_command(op
, 0, "", rdata
, 0);
3613 if (prefix
== "mon scrub") {
3614 wait_for_paxos_write();
3616 int r
= scrub_start();
3617 reply_command(op
, r
, "", rdata
, 0);
3618 } else if (is_peon()) {
3619 forward_request_leader(op
);
3621 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
3626 if (prefix
== "time-sync-status") {
3628 f
.reset(Formatter::create("json-pretty"));
3629 f
->open_object_section("time_sync");
3630 if (!timecheck_skews
.empty()) {
3631 f
->open_object_section("time_skew_status");
3632 for (auto& i
: timecheck_skews
) {
3633 double skew
= i
.second
;
3634 double latency
= timecheck_latencies
[i
.first
];
3635 string name
= monmap
->get_name(i
.first
);
3637 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
3638 f
->open_object_section(name
.c_str());
3639 f
->dump_float("skew", skew
);
3640 f
->dump_float("latency", latency
);
3641 f
->dump_stream("health") << tcstatus
;
3642 if (tcstatus
!= HEALTH_OK
) {
3643 f
->dump_stream("details") << tcss
.str();
3649 f
->open_object_section("timechecks");
3650 f
->dump_unsigned("epoch", get_epoch());
3651 f
->dump_int("round", timecheck_round
);
3652 f
->dump_stream("round_status") << ((timecheck_round
%2) ?
3653 "on-going" : "finished");
3659 } else if (prefix
== "status" ||
3660 prefix
== "health" ||
3663 cmd_getval(cmdmap
, "detail", detail
);
3665 if (prefix
== "status") {
3666 // get_cluster_status handles f == NULL
3667 get_cluster_status(ds
, f
.get(), session
);
3674 } else if (prefix
== "health") {
3676 healthmon()->get_health_status(detail
== "detail", f
.get(), f
? nullptr : &plain
);
3681 rdata
.append(plain
);
3683 } else if (prefix
== "df") {
3684 bool verbose
= (detail
== "detail");
3686 f
->open_object_section("stats");
3688 mgrstatmon()->dump_cluster_stats(&ds
, f
.get(), verbose
);
3692 mgrstatmon()->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3700 ceph_abort_msg("We should never get here!");
3706 } else if (prefix
== "report") {
3707 // some of the report data is only known by leader, e.g. osdmap_clean_epochs
3708 if (!is_leader() && !is_peon()) {
3709 dout(10) << " waiting for quorum" << dendl
;
3710 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3714 forward_request_leader(op
);
3717 // this must be formatted, in its current form
3719 f
.reset(Formatter::create("json-pretty"));
3720 f
->open_object_section("report");
3721 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3722 f
->dump_string("version", ceph_version_to_str());
3723 f
->dump_string("commit", git_version_to_str());
3724 f
->dump_stream("timestamp") << ceph_clock_now();
3726 vector
<string
> tagsvec
;
3727 cmd_getval(cmdmap
, "tags", tagsvec
);
3728 string tagstr
= str_join(tagsvec
, " ");
3729 if (!tagstr
.empty())
3730 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3731 f
->dump_string("tag", tagstr
);
3733 healthmon()->get_health_status(true, f
.get(), nullptr);
3735 monmon()->dump_info(f
.get());
3736 osdmon()->dump_info(f
.get());
3737 mdsmon()->dump_info(f
.get());
3738 authmon()->dump_info(f
.get());
3739 mgrstatmon()->dump_info(f
.get());
3740 logmon()->dump_info(f
.get());
3742 paxos
->dump_info(f
.get());
3748 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT_LEGACY
);
3751 } else if (prefix
== "osd last-stat-seq") {
3753 cmd_getval(cmdmap
, "id", osd
);
3754 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3756 f
->dump_unsigned("seq", seq
);
3764 } else if (prefix
== "node ls") {
3765 string
node_type("all");
3766 cmd_getval(cmdmap
, "type", node_type
);
3768 f
.reset(Formatter::create("json-pretty"));
3769 if (node_type
== "all") {
3770 f
->open_object_section("nodes");
3771 print_nodes(f
.get(), ds
);
3772 osdmon()->print_nodes(f
.get());
3773 mdsmon()->print_nodes(f
.get());
3774 mgrmon()->print_nodes(f
.get());
3776 } else if (node_type
== "mon") {
3777 print_nodes(f
.get(), ds
);
3778 } else if (node_type
== "osd") {
3779 osdmon()->print_nodes(f
.get());
3780 } else if (node_type
== "mds") {
3781 mdsmon()->print_nodes(f
.get());
3782 } else if (node_type
== "mgr") {
3783 mgrmon()->print_nodes(f
.get());
3789 } else if (prefix
== "features") {
3790 if (!is_leader() && !is_peon()) {
3791 dout(10) << " waiting for quorum" << dendl
;
3792 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3796 forward_request_leader(op
);
3800 f
.reset(Formatter::create("json-pretty"));
3802 get_combined_feature_map(&fm
);
3803 f
->dump_object("features", fm
);
3807 } else if (prefix
== "mon metadata") {
3809 f
.reset(Formatter::create("json-pretty"));
3812 bool all
= !cmd_getval(cmdmap
, "id", name
);
3814 // Dump a single mon's metadata
3815 int mon
= monmap
->get_rank(name
);
3817 rs
= "requested mon not found";
3821 f
->open_object_section("mon_metadata");
3822 r
= get_mon_metadata(mon
, f
.get(), ds
);
3825 // Dump all mons' metadata
3827 f
->open_array_section("mon_metadata");
3828 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3829 std::ostringstream get_err
;
3830 f
->open_object_section("mon");
3831 f
->dump_string("name", monmap
->get_name(rank
));
3832 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3834 if (r
== -ENOENT
|| r
== -EINVAL
) {
3835 dout(1) << get_err
.str() << dendl
;
3836 // Drop error, list what metadata we do have
3838 } else if (r
!= 0) {
3839 derr
<< "Unexpected error from get_mon_metadata: "
3840 << cpp_strerror(r
) << dendl
;
3841 ds
<< get_err
.str();
3851 } else if (prefix
== "mon versions") {
3853 f
.reset(Formatter::create("json-pretty"));
3854 count_metadata("ceph_version", f
.get());
3859 } else if (prefix
== "mon count-metadata") {
3861 f
.reset(Formatter::create("json-pretty"));
3863 cmd_getval(cmdmap
, "property", field
);
3864 count_metadata(field
, f
.get());
3869 } else if (prefix
== "quorum_status") {
3870 // make sure our map is readable and up to date
3871 if (!is_leader() && !is_peon()) {
3872 dout(10) << " waiting for quorum" << dendl
;
3873 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3876 _quorum_status(f
.get(), ds
);
3880 } else if (prefix
== "mon ok-to-stop") {
3881 vector
<string
> ids
, invalid_ids
;
3882 if (!cmd_getval(cmdmap
, "ids", ids
)) {
3886 set
<string
> wouldbe
;
3887 for (auto rank
: quorum
) {
3888 wouldbe
.insert(monmap
->get_name(rank
));
3890 for (auto& n
: ids
) {
3891 if (monmap
->contains(n
)) {
3894 invalid_ids
.push_back(n
);
3897 if (!invalid_ids
.empty()) {
3899 rs
= "invalid mon(s) specified: " + stringify(invalid_ids
);
3903 if (wouldbe
.size() < monmap
->min_quorum_size()) {
3905 rs
= "not enough monitors would be available (" + stringify(wouldbe
) +
3906 ") after stopping mons " + stringify(ids
);
3910 rs
= "quorum should be preserved (" + stringify(wouldbe
) +
3911 ") after stopping " + stringify(ids
);
3912 } else if (prefix
== "mon ok-to-add-offline") {
3913 if (quorum
.size() < monmap
->min_quorum_size(monmap
->size() + 1)) {
3914 rs
= "adding a monitor may break quorum (until that monitor starts)";
3918 rs
= "adding another mon that is not yet online will not break quorum";
3920 } else if (prefix
== "mon ok-to-rm") {
3922 if (!cmd_getval(cmdmap
, "id", id
)) {
3924 rs
= "must specify a monitor id";
3927 if (!monmap
->contains(id
)) {
3929 rs
= "mon." + id
+ " does not exist";
3932 int rank
= monmap
->get_rank(id
);
3933 if (quorum
.count(rank
) &&
3934 quorum
.size() - 1 < monmap
->min_quorum_size(monmap
->size() - 1)) {
3936 rs
= "removing mon." + id
+ " would break quorum";
3940 rs
= "safe to remove mon." + id
;
3941 } else if (prefix
== "version") {
3943 f
->open_object_section("version");
3944 f
->dump_string("version", pretty_version_to_str());
3948 ds
<< pretty_version_to_str();
3953 } else if (prefix
== "versions") {
3955 f
.reset(Formatter::create("json-pretty"));
3956 map
<string
,int> overall
;
3957 f
->open_object_section("version");
3958 map
<string
,int> mon
, mgr
, osd
, mds
;
3960 count_metadata("ceph_version", &mon
);
3961 f
->open_object_section("mon");
3962 for (auto& p
: mon
) {
3963 f
->dump_int(p
.first
.c_str(), p
.second
);
3964 overall
[p
.first
] += p
.second
;
3968 mgrmon()->count_metadata("ceph_version", &mgr
);
3970 f
->open_object_section("mgr");
3971 for (auto& p
: mgr
) {
3972 f
->dump_int(p
.first
.c_str(), p
.second
);
3973 overall
[p
.first
] += p
.second
;
3978 osdmon()->count_metadata("ceph_version", &osd
);
3980 f
->open_object_section("osd");
3981 for (auto& p
: osd
) {
3982 f
->dump_int(p
.first
.c_str(), p
.second
);
3983 overall
[p
.first
] += p
.second
;
3988 mdsmon()->count_metadata("ceph_version", &mds
);
3990 f
->open_object_section("mds");
3991 for (auto& p
: mds
) {
3992 f
->dump_int(p
.first
.c_str(), p
.second
);
3993 overall
[p
.first
] += p
.second
;
3998 for (auto& p
: mgrstatmon()->get_service_map().services
) {
3999 auto &service
= p
.first
;
4000 if (ServiceMap::is_normal_ceph_entity(service
)) {
4003 f
->open_object_section(service
.c_str());
4005 p
.second
.count_metadata("ceph_version", &m
);
4007 f
->dump_int(q
.first
.c_str(), q
.second
);
4008 overall
[q
.first
] += q
.second
;
4013 f
->open_object_section("overall");
4014 for (auto& p
: overall
) {
4015 f
->dump_int(p
.first
.c_str(), p
.second
);
4025 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
4026 reply_command(op
, r
, rs
, rdata
, 0);
4029 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
4032 reply_command(op
, rc
, rs
, rdata
, version
);
4035 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
4036 bufferlist
& rdata
, version_t version
)
4038 auto m
= op
->get_req
<MMonCommand
>();
4039 ceph_assert(m
->get_type() == MSG_MON_COMMAND
);
4040 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
4041 reply
->set_tid(m
->get_tid());
4042 reply
->set_data(rdata
);
4043 send_reply(op
, reply
);
4046 void Monitor::reply_tell_command(
4047 MonOpRequestRef op
, int rc
, const string
&rs
)
4049 MCommand
*m
= static_cast<MCommand
*>(op
->get_req());
4050 ceph_assert(m
->get_type() == MSG_COMMAND
);
4051 MCommandReply
*reply
= new MCommandReply(rc
, rs
);
4052 reply
->set_tid(m
->get_tid());
4053 m
->get_connection()->send_message(reply
);
4057 // ------------------------
4058 // request/reply routing
4060 // a client/mds/osd will connect to a random monitor. we need to forward any
4061 // messages requiring state updates to the leader, and then route any replies
4062 // back via the correct monitor and back to them. (the monitor will not
4063 // initiate any connections.)
// Forward the request in op to the current leader (get_leader()).
//
// For a forwardable request a RoutedRequest is recorded under a fresh tid,
// remembering the originating connection, its feature bits, the encoded
// request and the session; the tid is also noted on the session.  The
// request is then wrapped in an MForward and sent to the leader, and the op
// is marked as forwarded.
//
// Not forwarded: requests from a monitor whose source addresses differ from
// ours, and requests whose session already has a proxy_con (i.e. they were
// forwarded to us).  The last branch logs "no session for request".
4065 void Monitor::forward_request_leader(MonOpRequestRef op
)
4067 op
->mark_event(__func__
);
4069 int mon
= get_leader();
4070 MonSession
*session
= op
->get_session();
4071 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
4073 if (req
->get_source().is_mon() && req
->get_source_addrs() != messenger
->get_myaddrs()) {
4074 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
4075 } else if (session
->proxy_con
) {
4076 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
4077 } else if (!session
->closed
) {
4078 RoutedRequest
*rr
= new RoutedRequest
;
4079 rr
->tid
= ++routed_request_tid
;
4080 rr
->con
= req
->get_connection();
4081 rr
->con_features
= rr
->con
->get_features();
4082 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
4083 rr
->session
= static_cast<MonSession
*>(session
->get());
// index the request by tid, both globally and on the owning session
4085 routed_requests
[rr
->tid
] = rr
;
4086 session
->routed_request_tids
.insert(rr
->tid
);
4088 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
4089 << " features " << rr
->con_features
<< dendl
;
4091 MForward
*forward
= new MForward(rr
->tid
,
4095 forward
->set_priority(req
->get_priority());
// an authenticated session supplies the entity name; an unauthenticated
// request from a monitor is only tagged with the mon entity type
4096 if (session
->auth_handler
) {
4097 forward
->entity_name
= session
->entity_name
;
4098 } else if (req
->get_source().is_mon()) {
4099 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
4101 send_mon_message(forward
, mon
);
4102 op
->mark_forwarded();
4103 ceph_assert(op
->get_req()->get_type() != 0);
4105 dout(10) << "forward_request no session for request " << *req
<< dendl
;
4109 // fake connection attached to forwarded messages
// It is built with a null messenger (see the Connection(cct, nullptr)
// initializer) and keeps the client's socket address in socket_addr.
// Sending on it is treated as a programming error: send_message() and
// send_keepalive() both assert.  It always reports itself as not connected.
4110 struct AnonConnection
: public Connection
{
4111 entity_addr_t socket_addr
;
4113 int send_message(Message
*m
) override
{
4114 ceph_assert(!"send_message on anonymous connection");
4116 void send_keepalive() override
{
4117 ceph_assert(!"send_keepalive on anonymous connection");
4119 void mark_down() override
{
4122 void mark_disposable() override
{
4125 bool is_connected() override
{ return false; }
4126 entity_addr_t
get_peer_socket_addr() const override
{
// construction goes through ceph::make_ref (see FRIEND_MAKE_REF below)
4131 FRIEND_MAKE_REF(AnonConnection
);
4132 explicit AnonConnection(CephContext
*cct
, const entity_addr_t
& sa
)
4133 : Connection(cct
, nullptr),
4137 //extract the original message and put it into the regular dispatch function
//
// Handles an MForward.  The sender's session must be capable of MON_CAP_X on
// "mon"; otherwise the forward is logged as coming from an entity with
// insufficient caps.  For an accepted forward, the wrapped client request is
// claimed from the MForward and attached to an AnonConnection plus a
// freshly created MonSession that carry the client's addresses, type,
// features, caps and entity name.  The forwarding peer's connection and tid
// are stored as proxy_con/proxy_tid on that session.
4138 void Monitor::handle_forward(MonOpRequestRef op
)
4140 auto m
= op
->get_req
<MForward
>();
4141 dout(10) << "received forwarded message from "
4142 << ceph_entity_type_name(m
->client_type
)
4143 << " " << m
->client_addrs
4144 << " via " << m
->get_source_inst() << dendl
;
4145 MonSession
*session
= op
->get_session();
4146 ceph_assert(session
);
4148 if (!session
->is_capable("mon", MON_CAP_X
)) {
4149 dout(0) << "forward from entity with insufficient caps! "
4150 << session
->caps
<< dendl
;
4152 // see PaxosService::dispatch(); we rely on this being anon
4153 // (c->msgr == NULL)
4154 PaxosServiceMessage
*req
= m
->claim_message();
4155 ceph_assert(req
!= NULL
);
// stand-in connection and session representing the original client
4157 auto c
= ceph::make_ref
<AnonConnection
>(cct
, m
->client_socket_addr
);
4158 MonSession
*s
= new MonSession(static_cast<Connection
*>(c
.get()));
4159 s
->_ident(req
->get_source(),
4160 req
->get_source_addrs());
4161 c
->set_priv(RefCountedPtr
{s
, false});
4162 c
->set_peer_addrs(m
->client_addrs
);
4163 c
->set_peer_type(m
->client_type
);
4164 c
->set_features(m
->con_features
);
// the session is marked authenticated and takes the caps and entity name
// carried in the MForward
4166 s
->authenticated
= true;
4167 s
->caps
= m
->client_caps
;
4168 dout(10) << " caps are " << s
->caps
<< dendl
;
4169 s
->entity_name
= m
->entity_name
;
4170 dout(10) << " entity name '" << s
->entity_name
<< "' type "
4171 << s
->entity_name
.get_type() << dendl
;
// remember how to route the reply back through the forwarding monitor
4172 s
->proxy_con
= m
->get_connection();
4173 s
->proxy_tid
= m
->tid
;
4175 req
->set_connection(c
);
4177 // not super accurate, but better than nothing.
4178 req
->set_recv_stamp(m
->get_recv_stamp());
4181 * note which election epoch this is; we will drop the message if
4182 * there is a future election since our peers will resend routed
4183 * requests in that case.
4185 req
->rx_election_epoch
= get_epoch();
4187 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
4190 // break the session <-> con ref loop by removing the con->session
4191 // reference, which is no longer needed once the MonOpRequest is
// Deliver reply for the request held by op.
//
// The op must have a session.  If that session was proxied through another
// monitor (session->proxy_con is set) the reply is wrapped in an MRoute
// tagged with session->proxy_tid and sent over the proxy connection;
// otherwise it is sent directly on session->con.  When no connection is
// available the reply is dropped, which is logged and recorded on the op as
// "reply: no connection".
4197 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
4199 op
->mark_event(__func__
);
4201 MonSession
*session
= op
->get_session();
4202 ceph_assert(session
);
4203 Message
*req
= op
->get_req();
4204 ConnectionRef con
= op
->get_connection();
4206 reply
->set_cct(g_ceph_context
);
4207 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
4210 dout(2) << "send_reply no connection, dropping reply " << *reply
4211 << " to " << req
<< " " << *req
<< dendl
;
4213 op
->mark_event("reply: no connection");
// neither a direct nor a proxy connection on the session: drop as well
4217 if (!session
->con
&& !session
->proxy_con
) {
4218 dout(2) << "send_reply no connection, dropping reply " << *reply
4219 << " to " << req
<< " " << *req
<< dendl
;
4221 op
->mark_event("reply: no connection");
// proxied request: route the reply back via the forwarding monitor
4225 if (session
->proxy_con
) {
4226 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
4227 << " via " << session
->proxy_con
->get_peer_addr()
4228 << " for request " << *req
<< dendl
;
4229 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
4230 op
->mark_event("reply: send routed request");
4232 session
->con
->send_message(reply
);
4233 op
->mark_event("reply: send");
4237 void Monitor::no_reply(MonOpRequestRef op
)
4239 MonSession
*session
= op
->get_session();
4240 Message
*req
= op
->get_req();
4242 if (session
->proxy_con
) {
4243 dout(10) << "no_reply to " << req
->get_source_inst()
4244 << " via " << session
->proxy_con
->get_peer_addr()
4245 << " for request " << *req
<< dendl
;
4246 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
4247 op
->mark_event("no_reply: send routed request");
4249 dout(10) << "no_reply to " << req
->get_source_inst()
4250 << " " << *req
<< dendl
;
4251 op
->mark_event("no_reply");
// Handle an MRoute message.
//
// The sender's session must be capable of MON_CAP_X on "mon".  The routed
// request is looked up by m->session_mon_tid; a zero tid or an unknown tid
// is only logged.  For a known tid the enclosed message is relayed on the
// connection recorded in the RoutedRequest, osdmap incrementals starting at
// m->send_osdmap_first are sent when that field is non-zero, and the tid is
// removed from both routed_requests and the owning session.
4255 void Monitor::handle_route(MonOpRequestRef op
)
4257 auto m
= op
->get_req
<MRoute
>();
4258 MonSession
*session
= op
->get_session();
4260 if (!session
->is_capable("mon", MON_CAP_X
)) {
4261 dout(0) << "MRoute received from entity without appropriate perms! "
4266 dout(10) << "handle_route tid " << m
->session_mon_tid
<< " " << *m
->msg
4269 dout(10) << "handle_route tid " << m
->session_mon_tid
<< " null" << dendl
;
4272 if (!m
->session_mon_tid
) {
4273 dout(10) << " not a routed request, ignoring" << dendl
;
4276 auto found
= routed_requests
.find(m
->session_mon_tid
);
4277 if (found
== routed_requests
.end()) {
4278 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
// the unique_ptr takes ownership of the RoutedRequest found in the map
4281 std::unique_ptr
<RoutedRequest
> rr
{found
->second
};
4282 // reset payload, in case encoding is dependent on target features
4284 m
->msg
->clear_payload();
4285 rr
->con
->send_message(m
->msg
);
4288 if (m
->send_osdmap_first
) {
4289 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
4290 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
4291 true, MonOpRequestRef());
// retire the tid from the global map and from the session's tid set
4293 ceph_assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
4294 routed_requests
.erase(found
);
4295 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
// Walk every outstanding routed request and re-drive it towards the current
// leader (get_leader()).
//
// Two outcomes are visible per request: it is requeued locally ("requeue
// for self") by collecting a C_RetryMessage for its op and dropping its tid
// from the session, or the saved request bytes are decoded again, wrapped in
// a new MForward that carries the original client's type and addresses, and
// sent to the leader.  Afterwards routed_requests is cleared and the
// collected retry contexts are finished.
4298 void Monitor::resend_routed_requests()
4300 dout(10) << "resend_routed_requests" << dendl
;
4301 int mon
= get_leader();
4302 list
<Context
*> retry
;
4303 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
4304 p
!= routed_requests
.end();
4306 RoutedRequest
*rr
= p
->second
;
4309 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
4310 rr
->op
->mark_event("retry routed request");
4311 retry
.push_back(new C_RetryMessage(this, rr
->op
));
4313 ceph_assert(rr
->session
->routed_request_tids
.count(p
->first
));
4314 rr
->session
->routed_request_tids
.erase(p
->first
);
// rebuild the request from the bytes saved when it was first forwarded
4318 auto q
= rr
->request_bl
.cbegin();
4319 PaxosServiceMessage
*req
=
4320 (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
4321 rr
->op
->mark_event("resend forwarded message to leader");
4322 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
4324 MForward
*forward
= new MForward(rr
->tid
,
4328 req
->put(); // forward takes its own ref; drop ours.
4329 forward
->client_type
= rr
->con
->get_peer_type();
4330 forward
->client_addrs
= rr
->con
->get_peer_addrs();
4331 forward
->client_socket_addr
= rr
->con
->get_peer_socket_addr();
4332 forward
->set_priority(req
->get_priority());
4333 send_mon_message(forward
, mon
);
4337 routed_requests
.clear();
4338 finish_contexts(g_ceph_context
, retry
);
// Tear down session s.
//
// Preconditions (asserted): s has a connection and is not already closed.
// Every routed request still registered under the session is dropped from
// routed_requests, the session's tid set is cleared, the connection's priv
// pointer is reset, and the session is removed from session_map.  The
// l_mon_num_sessions and l_mon_session_rm perf counters are updated.
4342 void Monitor::remove_session(MonSession
*s
)
4344 dout(10) << "remove_session " << s
<< " " << s
->name
<< " " << s
->addrs
4345 << " features 0x" << std::hex
<< s
->con_features
<< std::dec
<< dendl
;
4346 ceph_assert(s
->con
);
4347 ceph_assert(!s
->closed
);
4348 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
4349 p
!= s
->routed_request_tids
.end();
4351 ceph_assert(routed_requests
.count(*p
));
4352 RoutedRequest
*rr
= routed_requests
[*p
];
4353 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
4355 routed_requests
.erase(*p
);
4357 s
->routed_request_tids
.clear();
// detach the session from its connection before unregistering it
4358 s
->con
->set_priv(nullptr);
4359 session_map
.remove_session(s
);
4360 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4361 logger
->inc(l_mon_session_rm
);
// Drain session_map while holding session_map_lock: the loop takes the
// front session until the list is empty, bumping l_mon_session_rm each
// time, and finally publishes the resulting session count in
// l_mon_num_sessions.
4364 void Monitor::remove_all_sessions()
4366 std::lock_guard
l(session_map_lock
);
4367 while (!session_map
.sessions
.empty()) {
4368 MonSession
*s
= session_map
.sessions
.front();
4370 logger
->inc(l_mon_session_rm
);
4373 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4376 void Monitor::send_mon_message(Message
*m
, int rank
)
4378 messenger
->send_to_mon(m
, monmap
->get_addrs(rank
));
// Decide what to do with an op that cannot be served right now.
//
// If the message was received less than one mon_lease ago and its
// connection is still connected, the op is parked on maybe_wait_for_quorum
// (as a C_RetryMessage) and marked as waiting for quorum.  Otherwise the
// message is discarded so that the client goes elsewhere; sessions without
// a proxy_con are handled under session_map_lock in that path.
4381 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
4384 * Wait list the new session until we're in the quorum, assuming it's
4386 * tick() will periodically send them back through so we can send
4387 * the client elsewhere if we don't think we're getting back in.
4389 * But we allow a few sorts of messages:
4390 * 1) Monitors can talk to us at any time, of course.
4391 * 2) auth messages. It's unlikely to go through much faster, but
4392 * it's possible we've just lost our quorum status and we want to take...
4393 * 3) command messages. We want to accept these under all possible
4396 Message
*m
= op
->get_req();
4397 MonSession
*s
= op
->get_session();
4398 ConnectionRef con
= op
->get_connection();
// too_old = now - mon_lease; anything received before that is not waitlisted
4399 utime_t too_old
= ceph_clock_now();
4400 too_old
-= g_ceph_context
->_conf
->mon_lease
;
4401 if (m
->get_recv_stamp() > too_old
&&
4402 con
->is_connected()) {
4403 dout(5) << "waitlisting message " << *m
<< dendl
;
4404 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
4405 op
->mark_wait_for_quorum();
4407 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
4409 // proxied sessions aren't registered and don't have a con; don't remove
4411 if (!s
->proxy_con
) {
4412 std::lock_guard
l(session_map_lock
);
// Entry point for an incoming message m.
//
// The message is wrapped in a MonOpRequest and the sender's MonSession is
// looked up from the op.  For monitor peers whose connection features
// differ from those recorded on the session, the session is recreated.
// When a new session is needed, non-monitor senders are only admitted if
// their first message is CEPH_MSG_AUTH, CEPH_MSG_MON_GET_MAP or
// CEPH_MSG_PING; anything else is dropped as a stray message.  The session
// timeout is then pushed out by mon_session_timeout and, when an auth
// handler is present, the entity name and global id fields are refreshed
// from it.  Ops that arrive while the monitor is synchronizing, or from
// unauthenticated sessions after quorum was exited, go through
// waitlist_or_zap_client().
4419 void Monitor::_ms_dispatch(Message
*m
)
4421 if (is_shutdown()) {
4426 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
4427 bool src_is_mon
= op
->is_src_mon();
4428 op
->mark_event("mon:_ms_dispatch");
4429 MonSession
*s
= op
->get_session();
4430 if (s
&& s
->closed
) {
4434 if (src_is_mon
&& s
) {
4435 ConnectionRef con
= m
->get_connection();
4436 if (con
->get_messenger() && con
->get_features() != s
->con_features
) {
4437 // only update features if this is a non-anonymous connection
4438 dout(10) << __func__
<< " feature change for " << m
->get_source_inst()
4439 << " (was " << s
->con_features
4440 << ", now " << con
->get_features() << ")" << dendl
;
4441 // connection features changed - recreate session.
4442 if (s
->con
&& s
->con
!= con
) {
4443 dout(10) << __func__
<< " connection for " << m
->get_source_inst()
4444 << " changed from session; mark down and replace" << dendl
;
4445 s
->con
->mark_down();
4447 if (s
->item
.is_on_list()) {
4448 // forwarded messages' sessions are not in the sessions map and
4449 // exist only while the op is being handled.
4450 std::lock_guard
l(session_map_lock
);
4458 // if the sender is not a monitor, make sure their first message for a
4459 // session is an MAuth. If it is not, assume it's a stray message,
4460 // and considering that we are creating a new session it is safe to
4461 // assume that the sender hasn't authenticated yet, so we have no way
4462 // of assessing whether we should handle it or not.
4463 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
4464 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
4465 m
->get_type() != CEPH_MSG_PING
)) {
4466 dout(1) << __func__
<< " dropping stray message " << *m
4467 << " from " << m
->get_source_inst() << dendl
;
4471 ConnectionRef con
= m
->get_connection();
// register the new session under session_map_lock and hang it off the
// connection's priv pointer
4473 std::lock_guard
l(session_map_lock
);
4474 s
= session_map
.new_session(m
->get_source(),
4475 m
->get_source_addrs(),
4479 con
->set_priv(RefCountedPtr
{s
, false});
4480 dout(10) << __func__
<< " new session " << s
<< " " << *s
4481 << " features 0x" << std::hex
4482 << s
->con_features
<< std::dec
<< dendl
;
4485 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4486 logger
->inc(l_mon_session_add
);
4489 // give it monitor caps; the peer type has been authenticated
4490 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
4491 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
4493 s
->authenticated
= true;
4496 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->name
// push the session timeout out by mon_session_timeout from now
4502 s
->session_timeout
= ceph_clock_now();
4503 s
->session_timeout
+= g_conf()->mon_session_timeout
;
4505 if (s
->auth_handler
) {
4506 s
->entity_name
= s
->auth_handler
->get_entity_name();
4507 s
->global_id
= s
->auth_handler
->get_global_id();
4508 s
->global_id_status
= s
->auth_handler
->get_global_id_status();
4510 dout(20) << " entity_name " << s
->entity_name
4511 << " global_id " << s
->global_id
4512 << " (" << s
->global_id_status
4513 << ") caps " << s
->caps
.get_str() << dendl
;
4515 if (!session_stretch_allowed(s
, op
)) {
4518 if ((is_synchronizing() ||
4519 (!s
->authenticated
&& !exited_quorum
.is_zero())) &&
4521 m
->get_type() != CEPH_MSG_PING
) {
4522 waitlist_or_zap_client(op
);
// Route an op to whichever component handles its message type.
//
// Visible stages, in order:
//   1. messages that need no caps: auth-related types are dispatched to
//      paxos_service[PAXOS_AUTH]; tell commands go to handle_tell_command()
//   2. a check that the session is authenticated
//   3. a sanity assert on global_id_status, plus a forced reconnect for
//      NEW_NOT_EXPOSED sessions (everything except CEPH_MSG_MON_GET_MAP)
//   4. get-map / get-config / subscribe requests
//   5. service messages, whose caps are checked by the service itself
//   6. messages needing MON_CAP_R on mon (e.g. CEPH_MSG_MON_GET_VERSION)
//   7. messages that only another monitor may send (paxos, election,
//      timecheck, health checks, ...)
//
// NOTE(review): several case labels and the return/break statements between
// the stages are not visible in this excerpt -- confirm against the full file
// before relying on the exact fall-through behaviour.
4529 void Monitor::dispatch_op(MonOpRequestRef op
)
4531 op
->mark_event("mon:dispatch_op");
4532 MonSession
*s
= op
->get_session();
4535 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
4539 /* we will consider the default type as being 'monitor' until proven wrong */
4540 op
->set_type_monitor();
4541 /* deal with all messages that do not necessarily need caps */
4542 switch (op
->get_req()->get_type()) {
4544 case MSG_MON_GLOBAL_ID
:
4545 case MSG_MON_USED_PENDING_KEYS
:
4547 op
->set_type_service();
4548 /* no need to check caps here */
4549 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
4556 op
->set_type_command();
4557 handle_tell_command(op
);
// from here on the sender must have an authenticated session
4561 if (!op
->get_session()->authenticated
) {
4562 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4563 << " is not authenticated, dropping " << *(op
->get_req())
4568 // global_id_status == NONE: all sessions for auth_none and krb,
4569 // mon <-> mon sessions (including proxied sessions) for cephx
4570 ceph_assert(s
->global_id_status
== global_id_status_t::NONE
||
4571 s
->global_id_status
== global_id_status_t::NEW_OK
||
4572 s
->global_id_status
== global_id_status_t::NEW_NOT_EXPOSED
||
4573 s
->global_id_status
== global_id_status_t::RECLAIM_OK
||
4574 s
->global_id_status
== global_id_status_t::RECLAIM_INSECURE
);
4576 // let mon_getmap through for "ping" (which doesn't reconnect)
4577 // and "tell" (which reconnects but doesn't attempt to preserve
4578 // its global_id and stays in NEW_NOT_EXPOSED, retrying until
4579 // ->send_attempts reaches 0)
4580 if (cct
->_conf
->auth_expose_insecure_global_id_reclaim
&&
4581 s
->global_id_status
== global_id_status_t::NEW_NOT_EXPOSED
&&
4582 op
->get_req()->get_type() != CEPH_MSG_MON_GET_MAP
) {
4583 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4584 << " may omit old_ticket on reconnects, discarding "
4585 << *op
->get_req() << " and forcing reconnect" << dendl
;
// only a direct (non-proxied) connection can be marked down here
4586 ceph_assert(s
->con
&& !s
->proxy_con
);
4587 s
->con
->mark_down();
4589 std::lock_guard
l(session_map_lock
);
4596 switch (op
->get_req()->get_type()) {
4597 case CEPH_MSG_MON_GET_MAP
:
4598 handle_mon_get_map(op
);
4601 case MSG_GET_CONFIG
:
4602 configmon()->handle_get_config(op
);
4605 case CEPH_MSG_MON_SUBSCRIBE
:
4606 /* FIXME: check what's being subscribed, filter accordingly */
4607 handle_subscribe(op
);
4611 /* well, maybe the op belongs to a service... */
4612 op
->set_type_service();
4613 /* deal with all messages which caps should be checked somewhere else */
4614 switch (op
->get_req()->get_type()) {
4617 case CEPH_MSG_MON_GET_OSDMAP
:
4618 case CEPH_MSG_POOLOP
:
4619 case MSG_OSD_BEACON
:
4620 case MSG_OSD_MARK_ME_DOWN
:
4621 case MSG_OSD_MARK_ME_DEAD
:
4623 case MSG_OSD_FAILURE
:
4626 case MSG_OSD_PGTEMP
:
4627 case MSG_OSD_PG_CREATED
:
4628 case MSG_REMOVE_SNAPS
:
4629 case MSG_MON_GET_PURGED_SNAPS
:
4630 case MSG_OSD_PG_READY_TO_MERGE
:
4631 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
4635 case MSG_MDS_BEACON
:
4636 case MSG_MDS_OFFLOAD_TARGETS
:
4637 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
4641 case MSG_MGR_BEACON
:
4642 paxos_service
[PAXOS_MGR
]->dispatch(op
);
4646 case MSG_MON_MGR_REPORT
:
4647 case CEPH_MSG_STATFS
:
4648 case MSG_GETPOOLSTATS
:
4649 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
4654 paxos_service
[PAXOS_LOG
]->dispatch(op
);
4657 // handle_command() does its own caps checking
4658 case MSG_MON_COMMAND
:
4659 op
->set_type_command();
4664 /* nop, looks like it's not a service message; revert back to monitor */
4665 op
->set_type_monitor();
4667 /* messages we, the Monitor class, need to deal with
4668 * but may be sent by clients. */
// these need at least read caps on mon
4670 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
4671 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4672 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
4677 switch (op
->get_req()->get_type()) {
4679 case CEPH_MSG_MON_GET_VERSION
:
4680 handle_get_version(op
);
// anything left must originate from another monitor
4684 if (!op
->is_src_mon()) {
4685 dout(1) << __func__
<< " unexpected monitor message from"
4686 << " non-monitor entity " << op
->get_req()->get_source_inst()
4687 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
4691 /* messages that should only be sent by another monitor */
4692 switch (op
->get_req()->get_type()) {
4702 // Sync (i.e., the new slurp, but on steroids)
4710 /* log acks are sent from a monitor we sent the MLog to, and are
4711 never sent by clients to us. */
4713 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
4718 op
->set_type_service();
4719 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
4725 op
->set_type_paxos();
4726 auto pm
= op
->get_req
<MMonPaxos
>();
// paxos traffic requires MON_CAP_X on mon
4727 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4732 if (state
== STATE_SYNCHRONIZING
) {
4733 // we are synchronizing. These messages would do us no
4734 // good, thus just drop them and ignore them.
4735 dout(10) << __func__
<< " ignore paxos msg from "
4736 << pm
->get_source_inst() << dendl
;
// NOTE(review): the bodies of the two epoch checks below are not visible here
4741 if (pm
->epoch
> get_epoch()) {
4745 if (pm
->epoch
!= get_epoch()) {
4749 paxos
->dispatch(op
);
4754 case MSG_MON_ELECTION
:
4755 op
->set_type_election_or_ping();
4756 //check privileges here for simplicity
4757 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4758 dout(0) << "MMonElection received from entity without enough caps!"
4759 << op
->get_session()->caps
<< dendl
;
// election messages reach the elector only when not probing or synchronizing
4762 if (!is_probing() && !is_synchronizing()) {
4763 elector
.dispatch(op
);
4768 op
->set_type_election_or_ping();
4769 elector
.dispatch(op
);
4777 dout(5) << __func__
<< " ignoring " << op
<< dendl
;
4779 case MSG_TIMECHECK2
:
4780 handle_timecheck(op
);
4783 case MSG_MON_HEALTH
:
4784 dout(5) << __func__
<< " dropping deprecated message: "
4785 << *op
->get_req() << dendl
;
4787 case MSG_MON_HEALTH_CHECKS
:
4788 op
->set_type_service();
4789 paxos_service
[PAXOS_HEALTH
]->dispatch(op
);
4792 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
// Answer an MPing with a freshly allocated MPing reply.
//
// A JSON formatter collects a pong section holding the health status and the
// monitor status; the resulting text (ss) is encoded into the reply payload
// and the reply is sent on the connection the ping arrived on.
// NOTE(review): the lines that close the section and flush the formatter into
// ss are not visible in this excerpt.
4796 void Monitor::handle_ping(MonOpRequestRef op
)
4798 auto m
= op
->get_req
<MPing
>();
4799 dout(10) << __func__
<< " " << *m
<< dendl
;
4800 MPing
*reply
= new MPing
;
4802 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4803 f
->open_object_section("pong");
4805 healthmon()->get_health_status(false, f
.get(), nullptr);
4806 get_mon_status(f
.get());
4811 encode(ss
.str(), payload
);
4812 reply
->set_payload(payload
);
4813 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
4814 m
->get_connection()->send_message(reply
);
// Reset all timecheck state, then start a new round if the quorum mon
// features contain FEATURE_NAUTILUS.
4817 void Monitor::timecheck_start()
4819 dout(10) << __func__
<< dendl
;
4820 timecheck_cleanup();
4821 if (get_quorum_mon_features().contains_all(
4822 ceph::features::mon::FEATURE_NAUTILUS
)) {
4823 timecheck_start_round();
// Stop timechecking: log the call and reset all state via timecheck_cleanup().
4827 void Monitor::timecheck_finish()
4829 dout(10) << __func__
<< dendl
;
4830 timecheck_cleanup();
// Begin a new timecheck round. Must be called on the leader (asserted).
//
// Aborts if the monmap holds a single monitor. An odd timecheck_round means a
// round is already running: it is left alone while it is younger than
// 3 * mon_timecheck_interval, otherwise it is cancelled first. After that the
// round counter is asserted to be even, the round start time is recorded and
// the next event is scheduled.
// NOTE(review): the statements that bump timecheck_round and kick off the
// actual check are not visible in this excerpt.
4833 void Monitor::timecheck_start_round()
4835 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4836 ceph_assert(is_leader());
4838 if (monmap
->size() == 1) {
4839 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
// odd round number: a round is currently in progress
4843 if (timecheck_round
% 2) {
4844 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4845 utime_t curr_time
= ceph_clock_now();
4846 double max
= g_conf()->mon_timecheck_interval
*3;
4847 if (curr_time
- timecheck_round_start
< max
) {
4848 dout(10) << __func__
<< " keep current round going" << dendl
;
4851 dout(10) << __func__
4852 << " finish current timecheck and start new" << dendl
;
4853 timecheck_cancel_round();
4857 ceph_assert(timecheck_round
% 2 == 0);
4860 timecheck_round_start
= ceph_clock_now();
4861 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4865 dout(10) << __func__
<< " setting up next event" << dendl
;
4866 timecheck_reset_event();
// Close the round currently in progress (timecheck_round must be odd).
//
// Clears the round start time. One path asserts that no peer is still waiting
// and that every quorum member acked, then evaluates the skews; the other
// path logs the peers that never answered. The waiting map is cleared at the
// end.
// NOTE(review): the branch on `success` is not visible in this excerpt;
// presumably the asserting path is the success == true case -- confirm.
4869 void Monitor::timecheck_finish_round(bool success
)
4871 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4872 ceph_assert(timecheck_round
% 2);
4874 timecheck_round_start
= utime_t();
4877 ceph_assert(timecheck_waiting
.empty());
4878 ceph_assert(timecheck_acks
== quorum
.size());
4880 timecheck_check_skews();
4884 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4885 << " peers still waiting:";
4886 for (auto& p
: timecheck_waiting
) {
4887 *_dout
<< " mon." << p
.first
;
4890 timecheck_waiting
.clear();
4892 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
// Abandon the current round: finish it with success == false.
4895 void Monitor::timecheck_cancel_round()
4897 timecheck_finish_round(false);
// Reset every piece of timecheck state: round counter, round start time, the
// pending timer event (cancelled if set), the waiting / skews / latencies
// maps, and the rounds-since-clean counter.
4900 void Monitor::timecheck_cleanup()
4902 timecheck_round
= 0;
4904 timecheck_round_start
= utime_t();
4906 if (timecheck_event
) {
4907 timer
.cancel_event(timecheck_event
);
4908 timecheck_event
= NULL
;
4910 timecheck_waiting
.clear();
4911 timecheck_skews
.clear();
4912 timecheck_latencies
.clear();
4914 timecheck_rounds_since_clean
= 0;
// Cancel any pending timecheck event and schedule timecheck_start_round()
// to run after `delay`.
//
// delay is computed from mon_timecheck_skew_interval multiplied by
// timecheck_rounds_since_clean; when that is <= 0 or larger than
// mon_timecheck_interval, mon_timecheck_interval is used instead.
4917 void Monitor::timecheck_reset_event()
4919 if (timecheck_event
) {
4920 timer
.cancel_event(timecheck_event
);
4921 timecheck_event
= NULL
;
4925 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
4927 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4928 delay
= cct
->_conf
->mon_timecheck_interval
;
4931 dout(10) << __func__
<< " delay " << delay
4932 << " rounds_since_clean " << timecheck_rounds_since_clean
4935 timecheck_event
= timer
.add_event_after(
4937 new C_MonContext
{this, [this](int) {
4938 timecheck_start_round();
// Evaluate the skews gathered in the round that just ended.
//
// Leader only, and only between rounds (timecheck_round even); aborts when
// the monmap holds a single monitor. Each entry of timecheck_skews is tested
// with timecheck_has_skew(). Two outcomes are visible: the rounds-since-clean
// counter is bumped and the event rescheduled, or -- when the counter is
// still positive -- the event is rescheduled with the counter temporarily set
// to 1 and the counter is then reset to 0.
// NOTE(review): the condition selecting the first outcome is not visible
// here; presumably it is found_skew -- confirm.
4942 void Monitor::timecheck_check_skews()
4944 dout(10) << __func__
<< dendl
;
4945 ceph_assert(is_leader());
4946 ceph_assert((timecheck_round
% 2) == 0);
4947 if (monmap
->size() == 1) {
4948 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4951 ceph_assert(timecheck_latencies
.size() == timecheck_skews
.size());
4953 bool found_skew
= false;
4954 for (auto& p
: timecheck_skews
) {
4956 if (timecheck_has_skew(p
.second
, &abs_skew
)) {
4957 dout(10) << __func__
4958 << " " << p
.first
<< " skew " << abs_skew
<< dendl
;
4964 ++timecheck_rounds_since_clean
;
4965 timecheck_reset_event();
4966 } else if (timecheck_rounds_since_clean
> 0) {
4968 << " no clock skews found after " << timecheck_rounds_since_clean
4969 << " rounds" << dendl
;
4970 // make sure the skews are really gone and not just a transient success
4971 // this will run just once if not in the presence of skews again.
4972 timecheck_rounds_since_clean
= 1;
4973 timecheck_reset_event();
4974 timecheck_rounds_since_clean
= 0;
// Send the collected skews and latencies to the quorum as OP_REPORT messages.
//
// Leader only, between rounds (timecheck_round even); aborts when the monmap
// holds a single monitor. For each quorum rank a new MTimeCheck2 is filled
// with the current epoch, round, and a copy of every skew/latency pair, then
// sent with send_mon_message().
// NOTE(review): the statement guarded by the name comparison is not visible;
// presumably our own rank is skipped -- confirm.
4979 void Monitor::timecheck_report()
4981 dout(10) << __func__
<< dendl
;
4982 ceph_assert(is_leader());
4983 ceph_assert((timecheck_round
% 2) == 0);
4984 if (monmap
->size() == 1) {
4985 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4989 ceph_assert(timecheck_latencies
.size() == timecheck_skews
.size());
4990 bool do_output
= true; // only output report once
4991 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
4992 if (monmap
->get_name(*q
) == name
)
4995 MTimeCheck2
*m
= new MTimeCheck2(MTimeCheck2::OP_REPORT
);
4996 m
->epoch
= get_epoch();
4997 m
->round
= timecheck_round
;
4999 for (auto& it
: timecheck_skews
) {
5000 double skew
= it
.second
;
5001 double latency
= timecheck_latencies
[it
.first
];
5003 m
->skews
[it
.first
] = skew
;
5004 m
->latencies
[it
.first
] = latency
;
5007 dout(25) << __func__
<< " mon." << it
.first
5008 << " latency " << latency
5009 << " skew " << skew
<< dendl
;
5013 dout(10) << __func__
<< " send report to mon." << *q
<< dendl
;
5014 send_mon_message(m
, *q
);
// Run the ping phase of a timecheck round.
//
// Leader only, during a round (timecheck_round odd); aborts when the monmap
// holds a single monitor. The leader counts itself as the first ack and
// records skew and latency 0.0 for its own rank. For each quorum rank the
// send time is stored in timecheck_waiting and an OP_PING carrying the
// current epoch and round is sent.
// NOTE(review): the statement guarded by the name comparison is not visible;
// presumably our own rank is skipped -- confirm.
5018 void Monitor::timecheck()
5020 dout(10) << __func__
<< dendl
;
5021 ceph_assert(is_leader());
5022 if (monmap
->size() == 1) {
5023 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
5026 ceph_assert(timecheck_round
% 2 != 0);
5028 timecheck_acks
= 1; // we ack ourselves
5030 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
5031 << " round " << timecheck_round
<< dendl
;
5033 // we are at the eye of the storm; the point of reference
5034 timecheck_skews
[rank
] = 0.0;
5035 timecheck_latencies
[rank
] = 0.0;
5037 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
5038 if (monmap
->get_name(*it
) == name
)
5041 utime_t curr_time
= ceph_clock_now();
5042 timecheck_waiting
[*it
] = curr_time
;
5043 MTimeCheck2
*m
= new MTimeCheck2(MTimeCheck2::OP_PING
);
5044 m
->epoch
= get_epoch();
5045 m
->round
= timecheck_round
;
5046 dout(10) << __func__
<< " send " << *m
<< " to mon." << *it
<< dendl
;
5047 send_mon_message(m
, *it
);
// Classify a skew bound as a health status.
//
// @param ss          receives a clock skew description when a skew is found
// @param skew_bound  bounded skew to test with timecheck_has_skew()
// @param latency     measured latency; asserted to be >= 0
//
// status starts as HEALTH_OK and becomes HEALTH_WARN when timecheck_has_skew()
// reports a skew; the message compares the absolute skew against
// mon_clock_drift_allowed.
// NOTE(review): the return statement is not visible; presumably status is
// returned -- confirm.
5051 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
5052 const double skew_bound
,
5053 const double latency
)
5055 health_status_t status
= HEALTH_OK
;
5056 ceph_assert(latency
>= 0);
5059 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
5060 status
= HEALTH_WARN
;
5061 ss
<< "clock skew " << abs_skew
<< "s"
5062 << " > max " << g_conf()->mon_clock_drift_allowed
<< "s";
// Leader-side handling of an OP_PONG timecheck reply (op type is asserted).
//
// Visible steps:
//   - replies from an older epoch or an older round are logged and discarded
//   - the send time stored in timecheck_waiting for the peer is looked up and
//     erased; if the current time is earlier than the send time the local
//     clock was readjusted and the round is cancelled
//   - latency is the elapsed time since the ping was sent; for a known peer it
//     is blended into the stored value as 0.8 * old + 0.2 * new
//   - delta is the peer timestamp minus the current time, and the bounded
//     skew is abs(delta) - latency (see the long comment in the body)
//   - timecheck_status() classifies the skew and anything other than
//     HEALTH_OK is reported through clog
//   - the skew is stored, and once timecheck_acks equals quorum.size() the
//     round is finished
// NOTE(review): several statements (returns, the ack counter bump, the sign
// and zero handling of skew_bound) are not visible in this excerpt.
5068 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
5070 auto m
= op
->get_req
<MTimeCheck2
>();
5071 dout(10) << __func__
<< " " << *m
<< dendl
;
5072 /* handles PONG's */
5073 ceph_assert(m
->op
== MTimeCheck2::OP_PONG
);
5075 int other
= m
->get_source().num();
5076 if (m
->epoch
< get_epoch()) {
5077 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
5078 << " from " << other
5079 << " curr " << get_epoch()
5080 << " -- severely lagged? discard" << dendl
;
5083 ceph_assert(m
->epoch
== get_epoch());
5085 if (m
->round
< timecheck_round
) {
5086 dout(1) << __func__
<< " got old round " << m
->round
5087 << " from " << other
5088 << " curr " << timecheck_round
<< " -- discard" << dendl
;
5092 utime_t curr_time
= ceph_clock_now();
5094 ceph_assert(timecheck_waiting
.count(other
) > 0);
5095 utime_t timecheck_sent
= timecheck_waiting
[other
];
5096 timecheck_waiting
.erase(other
);
5097 if (curr_time
< timecheck_sent
) {
5098 // our clock was readjusted -- drop everything until it all makes sense.
5099 dout(1) << __func__
<< " our clock was readjusted --"
5100 << " bump round and drop current check"
5102 timecheck_cancel_round();
5106 /* update peer latencies */
5107 double latency
= (double)(curr_time
- timecheck_sent
);
5109 if (timecheck_latencies
.count(other
) == 0)
5110 timecheck_latencies
[other
] = latency
;
5112 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
5113 timecheck_latencies
[other
] = avg_latency
;
5119 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
5120 * and 'a' happens to be lower than 'b'; so we use double instead.
5122 * latency is always expected to be >= 0.
5124 * delta, the difference between theirs timestamp and ours, may either be
5125 * lower or higher than 0; will hardly ever be 0.
5127 * The absolute skew is the absolute delta minus the latency, which is
5128 * taken as a whole instead of an rtt given that there is some queueing
5129 * and dispatch times involved and it's hard to assess how long exactly
5130 * it took for the message to travel to the other side and be handled. So
5131 * we call it a bounded skew, the worst case scenario.
5135 * Given that the latency is always positive, we can establish that the
5136 * bounded skew will be:
5138 * 1. positive if the absolute delta is higher than the latency and
5140 * 2. negative if the absolute delta is higher than the latency and
5141 * delta is negative.
5142 * 3. zero if the absolute delta is lower than the latency.
5144 * On 3. we make a judgement call and treat the skew as non-existent.
5145 * This is because that, if the absolute delta is lower than the
5146 * latency, then the apparently existing skew is nothing more than a
5147 * side-effect of the high latency at work.
5149 * This may not be entirely true though, as a severely skewed clock
5150 * may be masked by an even higher latency, but with high latencies
5151 * we probably have worse issues to deal with than just skewed clocks.
5153 ceph_assert(latency
>= 0);
5155 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
5156 double abs_delta
= (delta
> 0 ? delta
: -delta
);
5157 double skew_bound
= abs_delta
- latency
;
5161 skew_bound
= -skew_bound
;
5164 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
5165 if (status
!= HEALTH_OK
) {
5166 clog
->health(status
) << other
<< " " << ss
.str();
5169 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
5170 << " delta " << delta
<< " skew_bound " << skew_bound
5171 << " latency " << latency
<< dendl
;
5173 timecheck_skews
[other
] = skew_bound
;
5176 if (timecheck_acks
== quorum
.size()) {
5177 dout(10) << __func__
<< " got pongs from everybody ("
5178 << timecheck_acks
<< " total)" << dendl
;
5179 ceph_assert(timecheck_skews
.size() == timecheck_acks
);
5180 ceph_assert(timecheck_waiting
.empty());
5181 // everyone has acked, so bump the round to finish it.
5182 timecheck_finish_round();
5186 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
5188 auto m
= op
->get_req
<MTimeCheck2
>();
5189 dout(10) << __func__
<< " " << *m
<< dendl
;
5191 ceph_assert(is_peon());
5192 ceph_assert(m
->op
== MTimeCheck2::OP_PING
|| m
->op
== MTimeCheck2::OP_REPORT
);
5194 if (m
->epoch
!= get_epoch()) {
5195 dout(1) << __func__
<< " got wrong epoch "
5196 << "(ours " << get_epoch()
5197 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
5201 if (m
->round
< timecheck_round
) {
5202 dout(1) << __func__
<< " got old round " << m
->round
5203 << " current " << timecheck_round
5204 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
5208 timecheck_round
= m
->round
;
5210 if (m
->op
== MTimeCheck2::OP_REPORT
) {
5211 ceph_assert((timecheck_round
% 2) == 0);
5212 timecheck_latencies
.swap(m
->latencies
);
5213 timecheck_skews
.swap(m
->skews
);
5217 ceph_assert((timecheck_round
% 2) != 0);
5218 MTimeCheck2
*reply
= new MTimeCheck2(MTimeCheck2::OP_PONG
);
5219 utime_t curr_time
= ceph_clock_now();
5220 reply
->timestamp
= curr_time
;
5221 reply
->epoch
= m
->epoch
;
5222 reply
->round
= m
->round
;
5223 dout(10) << __func__
<< " send " << *m
5224 << " to " << m
->get_source_inst() << dendl
;
5225 m
->get_connection()->send_message(reply
);
// Entry point for MTimeCheck2 messages; dispatches by role.
//
// Visible behaviour: a message that is not OP_PONG is dropped on the path
// that leads to handle_timecheck_leader(); on a peon, anything other than
// OP_PING or OP_REPORT is dropped before handle_timecheck_peon(); any other
// state drops the message.
// NOTE(review): the first role test is not visible; presumably it is
// is_leader() -- confirm.
5228 void Monitor::handle_timecheck(MonOpRequestRef op
)
5230 auto m
= op
->get_req
<MTimeCheck2
>();
5231 dout(10) << __func__
<< " " << *m
<< dendl
;
5234 if (m
->op
!= MTimeCheck2::OP_PONG
) {
5235 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
5237 handle_timecheck_leader(op
);
5239 } else if (is_peon()) {
5240 if (m
->op
!= MTimeCheck2::OP_PING
&& m
->op
!= MTimeCheck2::OP_REPORT
) {
5241 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
5243 handle_timecheck_peon(op
);
5246 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
// Process an MMonSubscribe request.
//
// Visible steps:
//   - a non-empty hostname in the message is stored as the session remote_host
//   - for each requested item: monmap and config need no caps, every other
//     item needs MON_CAP_R on mon (otherwise the request is logged as dropped)
//   - when the item is a log subscription, other log subscriptions already in
//     the session sub_map are removed under session_map_lock
//   - the subscription is added or updated through session_map.add_update_sub()
//   - the owning service is asked to check the new subscription; mds and osd
//     items are additionally gated on the matching service caps
//   - an MMonSubscribeAck is sent only to peers without
//     CEPH_FEATURE_MON_STATEFUL_SUB
// NOTE(review): loop increments, the reply flag and the early returns are not
// visible in this excerpt.
5250 void Monitor::handle_subscribe(MonOpRequestRef op
)
5252 auto m
= op
->get_req
<MMonSubscribe
>();
5253 dout(10) << "handle_subscribe " << *m
<< dendl
;
5257 MonSession
*s
= op
->get_session();
5260 if (m
->hostname
.size()) {
5261 s
->remote_host
= m
->hostname
;
5264 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
5267 if (p
->first
== "monmap" || p
->first
== "config") {
5268 // these require no caps
5269 } else if (!s
->is_capable("mon", MON_CAP_R
)) {
5270 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
5271 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
5276 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5277 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
5280 // remove conflicting subscribes
5281 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
5282 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
5283 it
!= s
->sub_map
.end(); ) {
5284 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
5285 std::lock_guard
l(session_map_lock
);
// post-increment keeps the loop iterator valid across the removal
5286 session_map
.remove_sub((it
++)->second
);
5294 std::lock_guard
l(session_map_lock
);
5295 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
5296 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
5297 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
// prefix match: compares only the leading characters of the item name
5300 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
5301 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
5302 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
5303 Subscription
*sub
= s
->sub_map
[p
->first
];
5304 ceph_assert(sub
!= nullptr);
5305 mdsmon()->check_sub(sub
);
5307 } else if (p
->first
== "osdmap") {
5308 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
5309 if (s
->osd_epoch
> p
->second
.start
) {
5310 // client needs earlier osdmaps on purpose, so reset the sent epoch
5313 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
5315 } else if (p
->first
== "osd_pg_creates") {
5316 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
5317 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
5319 } else if (p
->first
== "monmap") {
5320 monmon()->check_sub(s
->sub_map
[p
->first
]);
5321 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
5322 logmon()->check_sub(s
->sub_map
[p
->first
]);
5323 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
5324 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
5325 } else if (p
->first
== "servicemap") {
5326 mgrstatmon()->check_sub(s
->sub_map
[p
->first
]);
5327 } else if (p
->first
== "config") {
5328 configmon()->check_sub(s
);
5329 } else if (p
->first
.find("kv:") == 0) {
5330 kvmon()->check_sub(s
->sub_map
[p
->first
]);
5335 // we only need to reply if the client is old enough to think it
5336 // has to send renewals.
5337 ConnectionRef con
= m
->get_connection();
5338 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
5339 m
->get_connection()->send_message(new MMonSubscribeAck(
5340 monmap
->get_fsid(), (int)g_conf()->mon_subscribe_interval
));
// Answer an MMonGetVersion with the committed version range of one service.
//
// If this monitor is neither leader nor peon the op is queued on
// waitfor_quorum for a retry. m->what selects the service (mdsmap, fsmap,
// osdmap or monmap); any other value is logged as an invalid map type. If the
// service is not readable the op is queued for a retry once it is. The reply
// echoes the request handle and tid and carries the last and first committed
// versions.
// NOTE(review): the assignments to svc and the early returns are not visible
// in this excerpt.
5345 void Monitor::handle_get_version(MonOpRequestRef op
)
5347 auto m
= op
->get_req
<MMonGetVersion
>();
5348 dout(10) << "handle_get_version " << *m
<< dendl
;
5349 PaxosService
*svc
= NULL
;
5351 MonSession
*s
= op
->get_session();
5354 if (!is_leader() && !is_peon()) {
5355 dout(10) << " waiting for quorum" << dendl
;
5356 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
5360 if (m
->what
== "mdsmap") {
5362 } else if (m
->what
== "fsmap") {
5364 } else if (m
->what
== "osdmap") {
5366 } else if (m
->what
== "monmap") {
5369 derr
<< "invalid map type " << m
->what
<< dendl
;
5373 if (!svc
->is_readable()) {
5374 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
5378 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
5379 reply
->handle
= m
->handle
;
5380 reply
->version
= svc
->get_last_committed();
5381 reply
->oldest_version
= svc
->get_first_committed();
5382 reply
->set_tid(m
->get_tid());
5384 m
->get_connection()->send_message(reply
);
// Messenger callback for a reset connection.
//
// Connections whose peer type is CEPH_ENTITY_TYPE_MON are ignored. Otherwise
// the MonSession stored as the connection priv is fetched, the priv pointer on
// the session connection is cleared to break the reference cycle, and -- under
// the monitor lock -- a session that is not closed and is still on the session
// list is handled while holding session_map_lock.
// NOTE(review): the return values and the statement executed under
// session_map_lock are not visible in this excerpt.
5390 bool Monitor::ms_handle_reset(Connection
*con
)
5392 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
5394 // ignore lossless monitor sessions
5395 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
5398 auto priv
= con
->get_priv();
5399 auto s
= static_cast<MonSession
*>(priv
.get());
5403 // break any con <-> session ref cycle
5404 s
->con
->set_priv(nullptr);
5409 std::lock_guard
l(lock
);
5411 dout(10) << "reset/close on session " << s
->name
<< " " << s
->addrs
<< dendl
;
5412 if (!s
->closed
&& s
->item
.is_on_list()) {
5413 std::lock_guard
l(session_map_lock
);
// Messenger callback for a refused connection; the visible body only logs the
// connection and its peer address.
// NOTE(review): the return statement is not visible in this excerpt.
5419 bool Monitor::ms_handle_refused(Connection
*con
)
5421 // just log for now...
5422 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
// Encode the current monmap using the features of `con` and send it on that
// connection as an MMonMap.
5428 void Monitor::send_latest_monmap(Connection
*con
)
5431 monmap
->encode(bl
, con
->get_features());
5432 con
->send_message(new MMonMap(bl
));
// Handle MMonGetMap: reply with the latest monmap on the connection the
// request arrived on.
5435 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
5437 auto m
= op
->get_req
<MMonGetMap
>();
5438 dout(10) << "handle_mon_get_map" << dendl
;
5439 send_latest_monmap(m
->get_connection().get());
// Load monitor metadata from the store.
//
// Reads the last_metadata key under MONITOR_STORE_PREFIX, decodes it into
// mon_metadata and copies the result into pending_metadata.
// NOTE(review): the handling of the store return code and the function return
// value are not visible in this excerpt.
5442 int Monitor::load_metadata()
5445 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5448 auto it
= bl
.cbegin();
5449 decode(mon_metadata
, it
);
5451 pending_metadata
= mon_metadata
;
// Dump every metadata key/value pair of one monitor into a formatter.
//
// @param mon  rank used as the key into mon_metadata
// @param f    formatter receiving one string entry per metadata key
// @param err  receives a not-found message when mon has no metadata entry
// NOTE(review): the return values are not visible in this excerpt.
5455 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
5458 if (!mon_metadata
.count(mon
)) {
5459 err
<< "mon." << mon
<< " not found";
5462 const Metadata
& m
= mon_metadata
[mon
];
5463 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
5464 f
->dump_string(p
->first
.c_str(), p
->second
);
// Tally monitors by the value of one metadata field.
//
// @param field  metadata key to look up for every monitor
// @param out    counter map, incremented in place; monitors that lack the
//               field are counted under the key unknown
5469 void Monitor::count_metadata(const string
& field
, map
<string
,int> *out
)
5471 for (auto& p
: mon_metadata
) {
5472 auto q
= p
.second
.find(field
);
5473 if (q
== p
.second
.end()) {
5474 (*out
)["unknown"]++;
5476 (*out
)[q
->second
]++;
// Formatter flavour of count_metadata: computes the per-value tally for
// `field` and emits it as an object section named after the field, with one
// integer entry per distinct value.
// NOTE(review): the call closing the section is not visible in this excerpt.
5481 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
5483 map
<string
,int> by_val
;
5484 count_metadata(field
, &by_val
);
5485 f
->open_object_section(field
.c_str());
5486 for (auto& p
: by_val
) {
5487 f
->dump_int(p
.first
.c_str(), p
.second
);
// Collect version information into `versions` from this monitor class and
// from the osd, mgr and mds monitors, then log the combined map.
5492 void Monitor::get_all_versions(std::map
<string
, list
<string
> > &versions
)
5495 get_versions(versions
);
5497 osdmon()->get_versions(versions
);
5499 mgrmon()->get_versions(versions
);
5501 mdsmon()->get_versions(versions
);
5502 dout(20) << __func__
<< " all versions=" << versions
<< dendl
;
// Group monitors by their ceph_version_short metadata value.
//
// For each entry of mon_metadata that has the field, the string mon.<name>
// (name resolved through the monmap) is appended to the list stored under
// that version.
// NOTE(review): the handling of entries without the field is not visible;
// presumably they are skipped -- confirm.
5505 void Monitor::get_versions(std::map
<string
, list
<string
> > &versions
)
5507 for (auto& [rank
, metadata
] : mon_metadata
) {
5508 auto q
= metadata
.find("ceph_version_short");
5509 if (q
== metadata
.end()) {
5513 versions
[q
->second
].push_back(string("mon.") + monmap
->get_name(rank
));
// Dump monitors grouped by host.
//
// Builds a hostname -> monitor names map from the hostname field of each
// metadata entry and hands it to dump_services() under the type mon.
// NOTE(review): the handling of entries without a hostname and the return
// value are not visible in this excerpt.
5517 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
5519 map
<string
, list
<string
> > mons
; // hostname => mon
5520 for (map
<int, Metadata
>::iterator it
= mon_metadata
.begin();
5521 it
!= mon_metadata
.end(); ++it
) {
5522 const Metadata
& m
= it
->second
;
5523 Metadata::const_iterator hostname
= m
.find("hostname");
5524 if (hostname
== m
.end()) {
5525 // not likely though
5528 mons
[hostname
->second
].push_back(monmap
->get_name(it
->first
));
5531 dump_services(f
, mons
, "mon");
5535 // ----------------------------------------------
// Start a new scrub. Leader only (asserted).
//
// A non-empty scrub_result means a scrub is already running, which is
// reported through clog. Otherwise the periodic scrub event is cancelled, old
// results are cleared and a fresh ScrubState is allocated.
// NOTE(review): the return values and the call that performs the first scrub
// step are not visible in this excerpt.
5538 int Monitor::scrub_start()
5540 dout(10) << __func__
<< dendl
;
5541 ceph_assert(is_leader());
5543 if (!scrub_result
.empty()) {
5544 clog
->info() << "scrub already in progress";
5548 scrub_event_cancel();
5549 scrub_result
.clear();
5550 scrub_state
.reset(new ScrubState
);
// Run one scrub step on the leader. Requires a live scrub_state (asserted).
//
// Cancels the scrub timeout, waits for pending paxos writes and records the
// paxos version being scrubbed. An OP_SCRUB carrying that version and the last
// scrubbed key is sent to quorum members, then the local store is scrubbed via
// _scrub(); scrub_state->finished is set when _scrub() reports no more
// chunks. The timeout is re-armed only after the local scrub completes. With
// a quorum of one, all keys are scrubbed in a single step (key limit -1) and
// the state is asserted to be finished.
// NOTE(review): the loop header details, the self-skip and the return value
// are not visible in this excerpt.
5556 int Monitor::scrub()
5558 ceph_assert(is_leader());
5559 ceph_assert(scrub_state
);
5561 scrub_cancel_timeout();
5562 wait_for_paxos_write();
5563 scrub_version
= paxos
->get_version();
5566 // scrub all keys if we're the only monitor in the quorum
5568 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
5570 for (set
<int>::iterator p
= quorum
.begin();
5575 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
5577 r
->key
= scrub_state
->last_key
;
5578 send_mon_message(r
, *p
);
5582 bool r
= _scrub(&scrub_result
[rank
],
5583 &scrub_state
->last_key
,
5586 scrub_state
->finished
= !r
;
5588 // only after we got our scrub results do we really care whether the
5589 // other monitors are late on their results. Also, this way we avoid
5590 // triggering the timeout if we end up getting stuck in _scrub() for
5591 // longer than the duration of the timeout.
5592 scrub_reset_timeout();
5594 if (quorum
.size() == 1) {
5595 ceph_assert(scrub_state
->finished
== true);
// Handle an MMonScrub message.
//
// OP_SCRUB: after waiting for pending paxos writes and comparing the
// requested version with the local paxos version, the local store is scrubbed
// starting at the requested key and an OP_RESULT is sent back.
// OP_RESULT: results for a version other than scrub_version are filtered, the
// timeout is re-armed, the result is stored under the sender rank (which must
// not have reported yet), and once every quorum member has reported the
// results are compared and cleared.
// NOTE(review): role checks, the actions taken on version mismatch and the
// follow-up after a completed comparison are not visible in this excerpt.
5601 void Monitor::handle_scrub(MonOpRequestRef op
)
5603 auto m
= op
->get_req
<MMonScrub
>();
5604 dout(10) << __func__
<< " " << *m
<< dendl
;
5606 case MMonScrub::OP_SCRUB
:
5611 wait_for_paxos_write();
5613 if (m
->version
!= paxos
->get_version())
5616 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
5620 reply
->key
= m
->key
;
5621 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
5622 m
->get_connection()->send_message(reply
);
5626 case MMonScrub::OP_RESULT
:
5630 if (m
->version
!= scrub_version
)
5632 // reset the timeout each time we get a result
5633 scrub_reset_timeout();
5635 int from
= m
->get_source().num();
5636 ceph_assert(scrub_result
.count(from
) == 0);
5637 scrub_result
[from
] = m
->result
;
5639 if (scrub_result
.size() == quorum
.size()) {
5640 scrub_check_results();
5641 scrub_result
.clear();
5642 if (scrub_state
->finished
)
// Scrub one chunk of the local store.
//
// @param r         result accumulator: per-prefix key counts and CRCs
// @param start     key to start from (must not be NULL)
// @param num_keys  in: key limit, honoured only when > 0;
//                  out: number of keys actually scrubbed
// @return whether the store iterator still has chunks left
//
// Keys are walked over the sync target prefixes, with paxos excluded. Each
// value is read from the store and its CRC is chained into the running CRC of
// its prefix. Two config options inject faults for testing: one skips keys at
// random, the other corrupts the accumulated CRC.
// NOTE(review): the loop exits, the counter increment and the update of
// last_key / *start are not visible in this excerpt.
5652 bool Monitor::_scrub(ScrubResult
*r
,
5653 pair
<string
,string
> *start
,
5656 ceph_assert(r
!= NULL
);
5657 ceph_assert(start
!= NULL
);
5658 ceph_assert(num_keys
!= NULL
);
5660 set
<string
> prefixes
= get_sync_targets_names();
5661 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5663 dout(10) << __func__
<< " start (" << *start
<< ")"
5664 << " num_keys " << *num_keys
<< dendl
;
5666 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
5668 int scrubbed_keys
= 0;
5669 pair
<string
,string
> last_key
;
5671 while (it
->has_next_chunk()) {
5673 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
5676 pair
<string
,string
> k
= it
->get_next_key();
5677 if (prefixes
.count(k
.first
) == 0)
// test hook: randomly pretend a key is missing
5680 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
5681 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
5682 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
5688 int err
= store
->get(k
.first
, k
.second
, bl
);
5689 ceph_assert(err
== 0);
5691 uint32_t key_crc
= bl
.crc32c(0);
5692 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
5693 << " crc " << key_crc
<< dendl
;
5694 r
->prefix_keys
[k
.first
]++;
5695 if (r
->prefix_crc
.count(k
.first
) == 0) {
5696 r
->prefix_crc
[k
.first
] = 0;
// chain this value into the running CRC of its prefix
5698 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
// test hook: randomly corrupt the accumulated CRC
5700 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
5701 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
5702 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
5703 r
->prefix_crc
[k
.first
] += 1;
5710 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
5711 << " scrubbed_keys " << scrubbed_keys
5712 << " has_next " << it
->has_next_chunk() << dendl
;
5715 *num_keys
= scrubbed_keys
;
5717 return it
->has_next_chunk();
// Compare the scrub result of every monitor against our own.
//
// Each mismatching peer produces three clog error lines (a header, our
// result, the peer result). A cluster-log debug line reports a clean scrub
// across the quorum.
// NOTE(review): the mismatch counter and the condition guarding the final
// debug line are not visible in this excerpt.
5720 void Monitor::scrub_check_results()
5722 dout(10) << __func__
<< dendl
;
5726 ScrubResult
& mine
= scrub_result
[rank
];
5727 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
5728 p
!= scrub_result
.end();
5730 if (p
->first
== rank
)
5732 if (p
->second
!= mine
) {
5734 clog
->error() << "scrub mismatch";
5735 clog
->error() << " mon." << rank
<< " " << mine
;
5736 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5740 clog
->debug() << "scrub ok on " << quorum
<< ": " << mine
;
// Scrub timeout handler; the visible body logs that the scrub is restarting.
// NOTE(review): the statements performing the restart are not visible in this
// excerpt.
5743 inline void Monitor::scrub_timeout()
5745 dout(1) << __func__
<< " restarting scrub" << dendl
;
// Conclude a scrub and schedule the next periodic scrub event.
// NOTE(review): one statement between the log line and scrub_event_start() is
// not visible in this excerpt.
5750 void Monitor::scrub_finish()
5752 dout(10) << __func__
<< dendl
;
5754 scrub_event_start();
// Drop all in-flight scrub state: cancel the timeout, clear collected results
// and release the ScrubState.
5757 void Monitor::scrub_reset()
5759 dout(10) << __func__
<< dendl
;
5760 scrub_cancel_timeout();
5762 scrub_result
.clear();
5763 scrub_state
.reset();
// React to a change of the scrub interval.
//
// When a scrub is in progress (scrub_state set) nothing needs to be done;
// otherwise the periodic scrub event is cancelled and started again.
// NOTE(review): the leader check announced by the first comment and the early
// returns are not visible in this excerpt.
5766 inline void Monitor::scrub_update_interval(ceph::timespan interval
)
5768 // we don't care about changes if we are not the leader.
5769 // changes will be visible if we become the leader.
5773 dout(1) << __func__
<< " new interval = " << interval
<< dendl
;
5775 // if scrub already in progress, all changes will already be visible during
5776 // the next round. Nothing to do.
5777 if (scrub_state
!= NULL
)
5780 scrub_event_cancel();
5781 scrub_event_start();
5784 void Monitor::scrub_event_start()
5786 dout(10) << __func__
<< dendl
;
5789 scrub_event_cancel();
5791 auto scrub_interval
=
5792 cct
->_conf
.get_val
<std::chrono::seconds
>("mon_scrub_interval");
5793 if (scrub_interval
== std::chrono::seconds::zero()) {
5794 dout(1) << __func__
<< " scrub event is disabled"
5795 << " (mon_scrub_interval = " << scrub_interval
5800 scrub_event
= timer
.add_event_after(
5802 new C_MonContext
{this, [this](int) {
5807 void Monitor::scrub_event_cancel()
5809 dout(10) << __func__
<< dendl
;
5811 timer
.cancel_event(scrub_event
);
5816 inline void Monitor::scrub_cancel_timeout()
5818 if (scrub_timeout_event
) {
5819 timer
.cancel_event(scrub_timeout_event
);
5820 scrub_timeout_event
= NULL
;
5824 void Monitor::scrub_reset_timeout()
5826 dout(15) << __func__
<< " reset timeout event" << dendl
;
5827 scrub_cancel_timeout();
5828 scrub_timeout_event
= timer
.add_event_after(
5829 g_conf()->mon_scrub_timeout
,
5830 new C_MonContext
{this, [this](int) {
5835 /************ TICK ***************/
5836 void Monitor::new_tick()
5838 timer
.add_event_after(g_conf()->mon_tick_interval
, new C_MonContext
{this, [this](int) {
5843 void Monitor::tick()
5846 dout(11) << "tick" << dendl
;
5847 const utime_t now
= ceph_clock_now();
5849 // Check if we need to emit any delayed health check updated messages
5851 const auto min_period
= g_conf().get_val
<int64_t>(
5852 "mon_health_log_update_period");
5853 for (auto& svc
: paxos_service
) {
5854 auto health
= svc
->get_health_checks();
5856 for (const auto &i
: health
.checks
) {
5857 const std::string
&code
= i
.first
;
5858 const std::string
&summary
= i
.second
.summary
;
5859 const health_status_t severity
= i
.second
.severity
;
5861 auto status_iter
= health_check_log_times
.find(code
);
5862 if (status_iter
== health_check_log_times
.end()) {
5866 auto &log_status
= status_iter
->second
;
5867 bool const changed
= log_status
.last_message
!= summary
5868 || log_status
.severity
!= severity
;
5870 if (changed
&& now
- log_status
.updated_at
> min_period
) {
5871 log_status
.last_message
= summary
;
5872 log_status
.updated_at
= now
;
5873 log_status
.severity
= severity
;
5876 ss
<< "Health check update: " << summary
<< " (" << code
<< ")";
5877 clog
->health(severity
) << ss
.str();
5884 for (auto& svc
: paxos_service
) {
5891 std::lock_guard
l(session_map_lock
);
5892 auto p
= session_map
.sessions
.begin();
5894 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5895 now
> (exited_quorum
+ 2*g_conf()->mon_lease
));
5901 // don't trim monitors
5902 if (s
->name
.is_mon())
5905 if (s
->session_timeout
< now
&& s
->con
) {
5906 // check keepalive, too
5907 s
->session_timeout
= s
->con
->get_last_keepalive();
5908 s
->session_timeout
+= g_conf()->mon_session_timeout
;
5910 if (s
->session_timeout
< now
) {
5911 dout(10) << " trimming session " << s
->con
<< " " << s
->name
5913 << " (timeout " << s
->session_timeout
5914 << " < now " << now
<< ")" << dendl
;
5915 } else if (out_for_too_long
) {
5916 // boot the client Session because we've taken too long getting back in
5917 dout(10) << " trimming session " << s
->con
<< " " << s
->name
5918 << " because we've been out of quorum too long" << dendl
;
5923 s
->con
->mark_down();
5925 logger
->inc(l_mon_session_trim
);
5928 sync_trim_providers();
5930 if (!maybe_wait_for_quorum
.empty()) {
5931 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5934 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5935 // this is only necessary on upgraded clusters.
5936 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5937 prepare_new_fingerprint(t
);
5938 paxos
->trigger_propose();
5941 mgr_client
.update_daemon_health(get_health_metrics());
5945 vector
<DaemonHealthMetric
> Monitor::get_health_metrics()
5947 vector
<DaemonHealthMetric
> metrics
;
5949 utime_t oldest_secs
;
5950 const utime_t now
= ceph_clock_now();
5952 too_old
-= g_conf().get_val
<std::chrono::seconds
>("mon_op_complaint_time").count();
5954 TrackedOpRef oldest_op
;
5955 auto count_slow_ops
= [&](TrackedOp
& op
) {
5956 if (op
.get_initiated() < too_old
) {
5958 if (!oldest_op
|| op
.get_initiated() < oldest_op
->get_initiated()) {
5966 if (op_tracker
.visit_ops_in_flight(&oldest_secs
, count_slow_ops
)) {
5968 derr
<< __func__
<< " reporting " << slow
<< " slow ops, oldest is "
5969 << oldest_op
->get_desc() << dendl
;
5971 metrics
.emplace_back(daemon_metric::SLOW_OPS
, slow
, oldest_secs
);
5973 metrics
.emplace_back(daemon_metric::SLOW_OPS
, 0, 0);
5978 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5981 nf
.generate_random();
5982 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5986 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
5989 int Monitor::check_fsid()
5992 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
5995 ceph_assert(r
== 0);
5997 string
es(ebl
.c_str(), ebl
.length());
5999 // only keep the first line
6000 size_t pos
= es
.find_first_of('\n');
6001 if (pos
!= string::npos
)
6004 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
6006 if (!ondisk
.parse(es
.c_str())) {
6007 derr
<< "error: unable to parse uuid" << dendl
;
6011 if (monmap
->get_fsid() != ondisk
) {
6012 derr
<< "error: cluster_uuid file exists with value " << ondisk
6013 << ", != our uuid " << monmap
->get_fsid() << dendl
;
6020 int Monitor::write_fsid()
6022 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
6024 int r
= store
->apply_transaction(t
);
6028 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
6031 ss
<< monmap
->get_fsid() << "\n";
6032 string us
= ss
.str();
6037 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
6042 * this is the closest thing to a traditional 'mkfs' for ceph.
6043 * initialize the monitor state machines to their initial values.
6045 int Monitor::mkfs(bufferlist
& osdmapbl
)
6047 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
6049 // verify cluster fsid
6050 int r
= check_fsid();
6051 if (r
< 0 && r
!= -ENOENT
)
6055 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
6056 magicbl
.append("\n");
6057 t
->put(MONITOR_NAME
, "magic", magicbl
);
6060 features
= get_initial_supported_features();
6063 // save monmap, osdmap, keyring.
6064 bufferlist monmapbl
;
6065 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
6066 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
6067 t
->put("mkfs", "monmap", monmapbl
);
6069 if (osdmapbl
.length()) {
6070 // make sure it's a valid osdmap
6073 om
.decode(osdmapbl
);
6075 catch (ceph::buffer::error
& e
) {
6076 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
6079 t
->put("mkfs", "osdmap", osdmapbl
);
6082 if (is_keyring_required()) {
6084 string keyring_filename
;
6086 r
= ceph_resolve_file_search(g_conf()->keyring
, keyring_filename
);
6088 if (g_conf()->key
!= "") {
6089 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf()->key
+
6090 "\n\tcaps mon = \"allow *\"\n";
6092 bl
.append(keyring_plaintext
);
6094 auto i
= bl
.cbegin();
6097 catch (const ceph::buffer::error
& e
) {
6098 derr
<< "error decoding keyring " << keyring_plaintext
6099 << ": " << e
.what() << dendl
;
6103 derr
<< "unable to find a keyring on " << g_conf()->keyring
6104 << ": " << cpp_strerror(r
) << dendl
;
6108 r
= keyring
.load(g_ceph_context
, keyring_filename
);
6110 derr
<< "unable to load initial keyring " << g_conf()->keyring
<< dendl
;
6115 // put mon. key in external keyring; seed with everything else.
6116 extract_save_mon_key(keyring
);
6118 bufferlist keyringbl
;
6119 keyring
.encode_plaintext(keyringbl
);
6120 t
->put("mkfs", "keyring", keyringbl
);
6123 store
->apply_transaction(t
);
6128 int Monitor::write_default_keyring(bufferlist
& bl
)
6131 os
<< g_conf()->mon_data
<< "/keyring";
6134 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
|O_CLOEXEC
, 0600);
6137 dout(0) << __func__
<< " failed to open " << os
.str()
6138 << ": " << cpp_strerror(err
) << dendl
;
6142 err
= bl
.write_fd(fd
);
6145 VOID_TEMP_FAILURE_RETRY(::close(fd
));
6150 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
6152 EntityName mon_name
;
6153 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
6155 if (keyring
.get_auth(mon_name
, mon_key
)) {
6156 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
6158 pkey
.add(mon_name
, mon_key
);
6160 pkey
.encode_plaintext(bl
);
6161 write_default_keyring(bl
);
6162 keyring
.remove(mon_name
);
6166 // AuthClient methods -- for mon <-> mon communication
6167 int Monitor::get_auth_request(
6169 AuthConnectionMeta
*auth_meta
,
6171 vector
<uint32_t> *preferred_modes
,
6174 std::scoped_lock
l(auth_lock
);
6175 if (con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
&&
6176 con
->get_peer_type() != CEPH_ENTITY_TYPE_MGR
) {
6179 AuthAuthorizer
*auth
;
6180 if (!get_authorizer(con
->get_peer_type(), &auth
)) {
6183 auth_meta
->authorizer
.reset(auth
);
6184 auth_registry
.get_supported_modes(con
->get_peer_type(),
6187 *method
= auth
->protocol
;
6192 int Monitor::handle_auth_reply_more(
6194 AuthConnectionMeta
*auth_meta
,
6195 const bufferlist
& bl
,
6198 std::scoped_lock
l(auth_lock
);
6199 if (!auth_meta
->authorizer
) {
6200 derr
<< __func__
<< " no authorizer?" << dendl
;
6203 auth_meta
->authorizer
->add_challenge(cct
, bl
);
6204 *reply
= auth_meta
->authorizer
->bl
;
6208 int Monitor::handle_auth_done(
6210 AuthConnectionMeta
*auth_meta
,
6213 const bufferlist
& bl
,
6214 CryptoKey
*session_key
,
6215 std::string
*connection_secret
)
6217 std::scoped_lock
l(auth_lock
);
6218 // verify authorizer reply
6219 auto p
= bl
.begin();
6220 if (!auth_meta
->authorizer
->verify_reply(p
, connection_secret
)) {
6221 dout(0) << __func__
<< " failed verifying authorizer reply" << dendl
;
6224 auth_meta
->session_key
= auth_meta
->authorizer
->session_key
;
6228 int Monitor::handle_auth_bad_method(
6230 AuthConnectionMeta
*auth_meta
,
6231 uint32_t old_auth_method
,
6233 const std::vector
<uint32_t>& allowed_methods
,
6234 const std::vector
<uint32_t>& allowed_modes
)
6236 derr
<< __func__
<< " hmm, they didn't like " << old_auth_method
6237 << " result " << cpp_strerror(result
) << dendl
;
6241 bool Monitor::get_authorizer(int service_id
, AuthAuthorizer
**authorizer
)
6243 dout(10) << "get_authorizer for " << ceph_entity_type_name(service_id
)
6249 // we only connect to other monitors and mgr; every else connects to us.
6250 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
6251 service_id
!= CEPH_ENTITY_TYPE_MGR
)
6254 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
6256 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
6257 AuthNoneClientHandler handler
{g_ceph_context
};
6258 handler
.set_global_id(0);
6259 *authorizer
= handler
.build_authorizer(service_id
);
6263 CephXServiceTicketInfo auth_ticket_info
;
6264 CephXSessionAuthInfo info
;
6268 name
.set_type(CEPH_ENTITY_TYPE_MON
);
6269 auth_ticket_info
.ticket
.name
= name
;
6270 auth_ticket_info
.ticket
.global_id
= 0;
6272 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
6273 // mon to mon authentication uses the private monitor shared key and not the
6276 if (!keyring
.get_secret(name
, secret
) &&
6277 !key_server
.get_secret(name
, secret
)) {
6278 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
6280 stringstream ss
, ds
;
6281 int err
= key_server
.list_secrets(ds
);
6283 ss
<< "no installed auth entries!";
6285 ss
<< "installed auth entries:";
6286 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
6290 ret
= key_server
.build_session_auth_info(
6291 service_id
, auth_ticket_info
.ticket
, secret
, (uint64_t)-1, info
);
6293 dout(0) << __func__
<< " failed to build mon session_auth_info "
6294 << cpp_strerror(ret
) << dendl
;
6297 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
6299 ret
= key_server
.build_session_auth_info(
6300 service_id
, auth_ticket_info
.ticket
, info
);
6302 derr
<< __func__
<< " failed to build mgr service session_auth_info "
6303 << cpp_strerror(ret
) << dendl
;
6307 ceph_abort(); // see check at top of fn
6310 CephXTicketBlob blob
;
6311 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
6312 dout(0) << "get_authorizer failed to build service ticket" << dendl
;
6315 bufferlist ticket_data
;
6316 encode(blob
, ticket_data
);
6318 auto iter
= ticket_data
.cbegin();
6319 CephXTicketHandler
handler(g_ceph_context
, service_id
);
6320 decode(handler
.ticket
, iter
);
6322 handler
.session_key
= info
.session_key
;
6324 *authorizer
= handler
.build_authorizer(0);
6329 int Monitor::handle_auth_request(
6331 AuthConnectionMeta
*auth_meta
,
6333 uint32_t auth_method
,
6334 const bufferlist
&payload
,
6337 std::scoped_lock
l(auth_lock
);
6339 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6340 // e.g., peer_features, peer_addrs, and others are still unknown.
6342 dout(10) << __func__
<< " con " << con
<< (more
? " (more)":" (start)")
6343 << " method " << auth_method
6344 << " payload " << payload
.length()
6346 if (!payload
.length()) {
6347 if (!con
->is_msgr2() &&
6348 con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
) {
6349 // for v1 connections, we tolerate no authorizer (from
6350 // non-monitors), because authentication happens via MAuth
6357 auth_meta
->auth_mode
= payload
[0];
6360 if (auth_meta
->auth_mode
>= AUTH_MODE_AUTHORIZER
&&
6361 auth_meta
->auth_mode
<= AUTH_MODE_AUTHORIZER_MAX
) {
6362 AuthAuthorizeHandler
*ah
= get_auth_authorize_handler(con
->get_peer_type(),
6365 lderr(cct
) << __func__
<< " no AuthAuthorizeHandler found for auth method "
6366 << auth_method
<< dendl
;
6369 bool was_challenge
= (bool)auth_meta
->authorizer_challenge
;
6370 bool isvalid
= ah
->verify_authorizer(
6374 auth_meta
->get_connection_secret_length(),
6377 &con
->peer_global_id
,
6378 &con
->peer_caps_info
,
6379 &auth_meta
->session_key
,
6380 &auth_meta
->connection_secret
,
6381 &auth_meta
->authorizer_challenge
);
6383 ms_handle_authentication(con
);
6386 if (!more
&& !was_challenge
&& auth_meta
->authorizer_challenge
) {
6389 dout(10) << __func__
<< " bad authorizer on " << con
<< dendl
;
6391 } else if (auth_meta
->auth_mode
< AUTH_MODE_MON
||
6392 auth_meta
->auth_mode
> AUTH_MODE_MON_MAX
) {
6393 derr
<< __func__
<< " unrecognized auth mode " << auth_meta
->auth_mode
6398 // wait until we've formed an initial quorum on mkfs so that we have
6399 // the initial keys (e.g., client.admin).
6400 if (authmon()->get_last_committed() == 0) {
6401 dout(10) << __func__
<< " haven't formed initial quorum, EBUSY" << dendl
;
6408 auto p
= payload
.begin();
6410 if (con
->get_priv()) {
6411 return -EACCES
; // wtf
6415 unique_ptr
<AuthServiceHandler
> auth_handler
{get_auth_service_handler(
6416 auth_method
, g_ceph_context
, &key_server
)};
6417 if (!auth_handler
) {
6418 dout(1) << __func__
<< " auth_method " << auth_method
<< " not supported"
6424 EntityName entity_name
;
6428 if (mode
< AUTH_MODE_MON
||
6429 mode
> AUTH_MODE_MON_MAX
) {
6430 dout(1) << __func__
<< " invalid mode " << (int)mode
<< dendl
;
6433 assert(mode
>= AUTH_MODE_MON
&& mode
<= AUTH_MODE_MON_MAX
);
6434 decode(entity_name
, p
);
6435 decode(con
->peer_global_id
, p
);
6436 } catch (ceph::buffer::error
& e
) {
6437 dout(1) << __func__
<< " failed to decode, " << e
.what() << dendl
;
6441 // supported method?
6442 if (entity_name
.get_type() == CEPH_ENTITY_TYPE_MON
||
6443 entity_name
.get_type() == CEPH_ENTITY_TYPE_OSD
||
6444 entity_name
.get_type() == CEPH_ENTITY_TYPE_MDS
||
6445 entity_name
.get_type() == CEPH_ENTITY_TYPE_MGR
) {
6446 if (!auth_cluster_required
.is_supported_auth(auth_method
)) {
6447 dout(10) << __func__
<< " entity " << entity_name
<< " method "
6448 << auth_method
<< " not among supported "
6449 << auth_cluster_required
.get_supported_set() << dendl
;
6453 if (!auth_service_required
.is_supported_auth(auth_method
)) {
6454 dout(10) << __func__
<< " entity " << entity_name
<< " method "
6455 << auth_method
<< " not among supported "
6456 << auth_cluster_required
.get_supported_set() << dendl
;
6461 // for msgr1 we would do some weirdness here to ensure signatures
6462 // are supported by the client if we require it. for msgr2 that
6463 // is not necessary.
6465 bool is_new_global_id
= false;
6466 if (!con
->peer_global_id
) {
6467 con
->peer_global_id
= authmon()->_assign_global_id();
6468 if (!con
->peer_global_id
) {
6469 dout(1) << __func__
<< " failed to assign global_id" << dendl
;
6472 is_new_global_id
= true;
6475 // set up partial session
6476 s
= new MonSession(con
);
6477 s
->auth_handler
= auth_handler
.release();
6478 con
->set_priv(RefCountedPtr
{s
, false});
6480 r
= s
->auth_handler
->start_session(
6482 con
->peer_global_id
,
6485 &con
->peer_caps_info
);
6487 priv
= con
->get_priv();
6489 // this can happen if the async ms_handle_reset event races with
6490 // the unlocked call into handle_auth_request
6493 s
= static_cast<MonSession
*>(priv
.get());
6494 r
= s
->auth_handler
->handle_request(
6496 auth_meta
->get_connection_secret_length(),
6498 &con
->peer_caps_info
,
6499 &auth_meta
->session_key
,
6500 &auth_meta
->connection_secret
);
6503 !s
->authenticated
) {
6504 ms_handle_authentication(con
);
6507 dout(30) << " r " << r
<< " reply:\n";
6508 reply
->hexdump(*_dout
);
6513 void Monitor::ms_handle_accept(Connection
*con
)
6515 auto priv
= con
->get_priv();
6516 MonSession
*s
= static_cast<MonSession
*>(priv
.get());
6518 // legacy protocol v1?
6519 dout(10) << __func__
<< " con " << con
<< " no session" << dendl
;
6523 if (s
->item
.is_on_list()) {
6524 dout(10) << __func__
<< " con " << con
<< " session " << s
6525 << " already on list" << dendl
;
6527 std::lock_guard
l(session_map_lock
);
6528 if (state
== STATE_SHUTDOWN
) {
6529 dout(10) << __func__
<< " ignoring new con " << con
<< " (shutdown)" << dendl
;
6533 dout(10) << __func__
<< " con " << con
<< " session " << s
6534 << " registering session for "
6535 << con
->get_peer_addrs() << dendl
;
6536 s
->_ident(entity_name_t(con
->get_peer_type(), con
->get_peer_id()),
6537 con
->get_peer_addrs());
6538 session_map
.add_session(s
);
6542 int Monitor::ms_handle_authentication(Connection
*con
)
6544 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
6545 // mon <-> mon connections need no Session, and setting one up
6546 // creates an awkward ref cycle between Session and Connection.
6550 auto priv
= con
->get_priv();
6551 MonSession
*s
= static_cast<MonSession
*>(priv
.get());
6553 // must be msgr2, otherwise dispatch would have set up the session.
6554 s
= session_map
.new_session(
6555 entity_name_t(con
->get_peer_type(), -1), // we don't know yet
6556 con
->get_peer_addrs(),
6559 dout(10) << __func__
<< " adding session " << s
<< " to con " << con
6562 logger
->set(l_mon_num_sessions
, session_map
.get_size());
6563 logger
->inc(l_mon_session_add
);
6565 dout(10) << __func__
<< " session " << s
<< " con " << con
6566 << " addr " << s
->con
->get_peer_addr()
6567 << " " << *s
<< dendl
;
6569 AuthCapsInfo
&caps_info
= con
->get_peer_caps_info();
6571 if (caps_info
.allow_all
) {
6572 s
->caps
.set_allow_all();
6573 s
->authenticated
= true;
6575 } else if (caps_info
.caps
.length()) {
6576 bufferlist::const_iterator p
= caps_info
.caps
.cbegin();
6580 } catch (const ceph::buffer::error
&err
) {
6581 derr
<< __func__
<< " corrupt cap data for " << con
->get_peer_entity_name()
6582 << " in auth db" << dendl
;
6587 if (s
->caps
.parse(str
, NULL
)) {
6588 s
->authenticated
= true;
6591 derr
<< __func__
<< " unparseable caps '" << str
<< "' for "
6592 << con
->get_peer_entity_name() << dendl
;
6601 void Monitor::set_mon_crush_location(const string
& loc
)
6606 vector
<string
> loc_vec
;
6607 loc_vec
.push_back(loc
);
6608 CrushWrapper::parse_loc_map(loc_vec
, &crush_loc
);
6609 need_set_crush_loc
= true;
6612 void Monitor::notify_new_monmap(bool can_change_external_state
, bool remove_rank_elector
)
6614 if (need_set_crush_loc
) {
6615 auto my_info_i
= monmap
->mon_info
.find(name
);
6616 if (my_info_i
!= monmap
->mon_info
.end() &&
6617 my_info_i
->second
.crush_loc
== crush_loc
) {
6618 need_set_crush_loc
= false;
6621 elector
.notify_strategy_maybe_changed(monmap
->strategy
);
6622 if (remove_rank_elector
){
6623 dout(10) << __func__
<< " we have " << monmap
->ranks
.size()<< " ranks" << dendl
;
6624 dout(10) << __func__
<< " we have " << monmap
->removed_ranks
.size() << " removed ranks" << dendl
;
6625 for (auto i
= monmap
->removed_ranks
.rbegin();
6626 i
!= monmap
->removed_ranks
.rend(); ++i
) {
6627 int remove_rank
= *i
;
6628 dout(10) << __func__
<< " removing rank " << remove_rank
<< dendl
;
6629 if (rank
== remove_rank
) {
6630 dout(5) << "We are removing our own rank, probably we"
6631 << " are removed from monmap before we shutdown ... dropping." << dendl
;
6634 int new_rank
= monmap
->get_rank(messenger
->get_myaddrs());
6635 if (new_rank
== -1) {
6636 dout(5) << "We no longer exists in the monmap! ... dropping." << dendl
;
6639 elector
.notify_rank_removed(remove_rank
, new_rank
);
6643 if (monmap
->stretch_mode_enabled
) {
6644 try_engage_stretch_mode();
6647 if (is_stretch_mode()) {
6648 if (!monmap
->stretch_marked_down_mons
.empty()) {
6649 dout(20) << __func__
<< " stretch_marked_down_mons: " << monmap
->stretch_marked_down_mons
<< dendl
;
6650 set_degraded_stretch_mode();
6653 set_elector_disallowed_leaders(can_change_external_state
);
6656 void Monitor::set_elector_disallowed_leaders(bool allow_election
)
6659 for (auto name
: monmap
->disallowed_leaders
) {
6660 dl
.insert(monmap
->get_rank(name
));
6662 if (is_stretch_mode()) {
6663 for (auto name
: monmap
->stretch_marked_down_mons
) {
6664 dl
.insert(monmap
->get_rank(name
));
6666 dl
.insert(monmap
->get_rank(monmap
->tiebreaker_mon
));
6669 bool disallowed_changed
= elector
.set_disallowed_leaders(dl
);
6670 if (disallowed_changed
&& allow_election
) {
6671 elector
.call_election();
6675 struct CMonEnableStretchMode
: public Context
{
6677 CMonEnableStretchMode(Monitor
*mon
) : m(mon
) {}
6678 void finish(int r
) {
6679 m
->try_engage_stretch_mode();
6682 void Monitor::try_engage_stretch_mode()
6684 dout(20) << __func__
<< dendl
;
6685 if (stretch_mode_engaged
) return;
6686 if (!osdmon()->is_readable()) {
6687 dout(20) << "osdmon is not readable" << dendl
;
6688 osdmon()->wait_for_readable_ctx(new CMonEnableStretchMode(this));
6691 if (osdmon()->osdmap
.stretch_mode_enabled
&&
6692 monmap
->stretch_mode_enabled
) {
6693 dout(10) << "Engaging stretch mode!" << dendl
;
6694 stretch_mode_engaged
= true;
6695 int32_t stretch_divider_id
= osdmon()->osdmap
.stretch_mode_bucket
;
6696 stretch_bucket_divider
= osdmon()->osdmap
.
6697 crush
->get_type_name(stretch_divider_id
);
6698 disconnect_disallowed_stretch_sessions();
6702 void Monitor::do_stretch_mode_election_work()
6704 dout(20) << __func__
<< dendl
;
6705 if (!is_stretch_mode() ||
6706 !is_leader()) return;
6707 dout(20) << "checking for degraded stretch mode" << dendl
;
6708 map
<string
, set
<string
>> old_dead_buckets
;
6709 old_dead_buckets
.swap(dead_mon_buckets
);
6710 up_mon_buckets
.clear();
6711 // identify if we've lost a CRUSH bucket, request OSDMonitor check for death
6712 map
<string
,set
<string
>> down_mon_buckets
;
6713 for (unsigned i
= 0; i
< monmap
->size(); ++i
) {
6714 const auto &mi
= monmap
->mon_info
[monmap
->get_name(i
)];
6715 auto ci
= mi
.crush_loc
.find(stretch_bucket_divider
);
6716 ceph_assert(ci
!= mi
.crush_loc
.end());
6717 if (quorum
.count(i
)) {
6718 up_mon_buckets
.insert(ci
->second
);
6720 down_mon_buckets
[ci
->second
].insert(mi
.name
);
6723 dout(20) << "prior dead_mon_buckets: " << old_dead_buckets
6724 << "; down_mon_buckets: " << down_mon_buckets
6725 << "; up_mon_buckets: " << up_mon_buckets
<< dendl
;
6726 for (const auto& di
: down_mon_buckets
) {
6727 if (!up_mon_buckets
.count(di
.first
)) {
6728 dead_mon_buckets
[di
.first
] = di
.second
;
6731 dout(20) << "new dead_mon_buckets " << dead_mon_buckets
<< dendl
;
6733 if (dead_mon_buckets
!= old_dead_buckets
&&
6734 dead_mon_buckets
.size() >= old_dead_buckets
.size()) {
6735 maybe_go_degraded_stretch_mode();
6739 struct CMonGoDegraded
: public Context
{
6741 CMonGoDegraded(Monitor
*mon
) : m(mon
) {}
6742 void finish(int r
) {
6743 m
->maybe_go_degraded_stretch_mode();
6747 struct CMonGoRecovery
: public Context
{
6749 CMonGoRecovery(Monitor
*mon
) : m(mon
) {}
6750 void finish(int r
) {
6751 m
->go_recovery_stretch_mode();
6754 void Monitor::go_recovery_stretch_mode()
6756 dout(20) << __func__
<< dendl
;
6757 dout(20) << "is_leader(): " << is_leader() << dendl
;
6758 if (!is_leader()) return;
6759 dout(20) << "is_degraded_stretch_mode(): " << is_degraded_stretch_mode() << dendl
;
6760 if (!is_degraded_stretch_mode()) return;
6761 dout(20) << "is_recovering_stretch_mode(): " << is_recovering_stretch_mode() << dendl
;
6762 if (is_recovering_stretch_mode()) return;
6763 dout(20) << "dead_mon_buckets.size(): " << dead_mon_buckets
.size() << dendl
;
6764 dout(20) << "dead_mon_buckets: " << dead_mon_buckets
<< dendl
;
6765 if (dead_mon_buckets
.size()) {
6766 ceph_assert( 0 == "how did we try and do stretch recovery while we have dead monitor buckets?");
6767 // we can't recover if we are missing monitors in a zone!
6771 if (!osdmon()->is_readable()) {
6772 dout(20) << "osdmon is not readable" << dendl
;
6773 osdmon()->wait_for_readable_ctx(new CMonGoRecovery(this));
6777 if (!osdmon()->is_writeable()) {
6778 dout(20) << "osdmon is not writeable" << dendl
;
6779 osdmon()->wait_for_writeable_ctx(new CMonGoRecovery(this));
6782 osdmon()->trigger_recovery_stretch_mode();
6785 void Monitor::set_recovery_stretch_mode()
6787 degraded_stretch_mode
= true;
6788 recovering_stretch_mode
= true;
6789 osdmon()->set_recovery_stretch_mode();
6792 void Monitor::maybe_go_degraded_stretch_mode()
6794 dout(20) << __func__
<< dendl
;
6795 if (is_degraded_stretch_mode()) return;
6796 if (!is_leader()) return;
6797 if (dead_mon_buckets
.empty()) return;
6798 if (!osdmon()->is_readable()) {
6799 osdmon()->wait_for_readable_ctx(new CMonGoDegraded(this));
6802 ceph_assert(monmap
->contains(monmap
->tiebreaker_mon
));
6803 // filter out the tiebreaker zone and check if remaining sites are down by OSDs too
6804 const auto &mi
= monmap
->mon_info
[monmap
->tiebreaker_mon
];
6805 auto ci
= mi
.crush_loc
.find(stretch_bucket_divider
);
6806 map
<string
, set
<string
>> filtered_dead_buckets
= dead_mon_buckets
;
6807 filtered_dead_buckets
.erase(ci
->second
);
6809 set
<int> matched_down_buckets
;
6810 set
<string
> matched_down_mons
;
6811 bool dead
= osdmon()->check_for_dead_crush_zones(filtered_dead_buckets
,
6812 &matched_down_buckets
,
6813 &matched_down_mons
);
6815 if (!osdmon()->is_writeable()) {
6816 dout(20) << "osdmon is not writeable" << dendl
;
6817 osdmon()->wait_for_writeable_ctx(new CMonGoDegraded(this));
6820 if (!monmon()->is_writeable()) {
6821 dout(20) << "monmon is not writeable" << dendl
;
6822 monmon()->wait_for_writeable_ctx(new CMonGoDegraded(this));
6825 trigger_degraded_stretch_mode(matched_down_mons
, matched_down_buckets
);
6829 void Monitor::trigger_degraded_stretch_mode(const set
<string
>& dead_mons
,
6830 const set
<int>& dead_buckets
)
6832 dout(20) << __func__
<< dendl
;
6833 ceph_assert(osdmon()->is_writeable());
6834 ceph_assert(monmon()->is_writeable());
6836 // figure out which OSD zone(s) remains alive by removing
6837 // tiebreaker mon from up_mon_buckets
6838 set
<string
> live_zones
= up_mon_buckets
;
6839 ceph_assert(monmap
->contains(monmap
->tiebreaker_mon
));
6840 const auto &mi
= monmap
->mon_info
[monmap
->tiebreaker_mon
];
6841 auto ci
= mi
.crush_loc
.find(stretch_bucket_divider
);
6842 live_zones
.erase(ci
->second
);
6843 ceph_assert(live_zones
.size() == 1); // only support 2 zones right now
6845 osdmon()->trigger_degraded_stretch_mode(dead_buckets
, live_zones
);
6846 monmon()->trigger_degraded_stretch_mode(dead_mons
);
6847 set_degraded_stretch_mode();
6850 void Monitor::set_degraded_stretch_mode()
6852 dout(20) << __func__
<< dendl
;
6853 degraded_stretch_mode
= true;
6854 recovering_stretch_mode
= false;
6855 osdmon()->set_degraded_stretch_mode();
6858 struct CMonGoHealthy
: public Context
{
6860 CMonGoHealthy(Monitor
*mon
) : m(mon
) {}
6861 void finish(int r
) {
6862 m
->trigger_healthy_stretch_mode();
6867 void Monitor::trigger_healthy_stretch_mode()
6869 dout(20) << __func__
<< dendl
;
6870 if (!is_degraded_stretch_mode()) return;
6871 if (!is_leader()) return;
6872 if (!osdmon()->is_writeable()) {
6873 dout(20) << "osdmon is not writeable" << dendl
;
6874 osdmon()->wait_for_writeable_ctx(new CMonGoHealthy(this));
6877 if (!monmon()->is_writeable()) {
6878 dout(20) << "monmon is not writeable" << dendl
;
6879 monmon()->wait_for_writeable_ctx(new CMonGoHealthy(this));
6883 ceph_assert(osdmon()->osdmap
.recovering_stretch_mode
);
6884 osdmon()->trigger_healthy_stretch_mode();
6885 monmon()->trigger_healthy_stretch_mode();
6888 void Monitor::set_healthy_stretch_mode()
6890 degraded_stretch_mode
= false;
6891 recovering_stretch_mode
= false;
6892 osdmon()->set_healthy_stretch_mode();
6895 bool Monitor::session_stretch_allowed(MonSession
*s
, MonOpRequestRef
& op
)
6897 if (!is_stretch_mode()) return true;
6898 if (s
->proxy_con
) return true;
6899 if (s
->validated_stretch_connection
) return true;
6900 if (!s
->con
) return true;
6901 if (s
->con
->peer_is_osd()) {
6902 dout(20) << __func__
<< "checking OSD session" << s
<< dendl
;
6903 // okay, check the crush location
6904 int barrier_id
= [&] {
6905 auto type_id
= osdmon()->osdmap
.crush
->get_validated_type_id(
6906 stretch_bucket_divider
);
6907 ceph_assert(type_id
.has_value());
6910 int osd_bucket_id
= osdmon()->osdmap
.crush
->get_parent_of_type(s
->con
->peer_id
,
6912 const auto &mi
= monmap
->mon_info
.find(name
);
6913 ceph_assert(mi
!= monmap
->mon_info
.end());
6914 auto ci
= mi
->second
.crush_loc
.find(stretch_bucket_divider
);
6915 ceph_assert(ci
!= mi
->second
.crush_loc
.end());
6916 int mon_bucket_id
= osdmon()->osdmap
.crush
->get_item_id(ci
->second
);
6918 if (osd_bucket_id
!= mon_bucket_id
) {
6919 dout(5) << "discarding session " << *s
6920 << " and sending OSD to matched zone" << dendl
;
6921 s
->con
->mark_down();
6922 std::lock_guard
l(session_map_lock
);
6931 s
->validated_stretch_connection
= true;
6935 void Monitor::disconnect_disallowed_stretch_sessions()
6937 dout(20) << __func__
<< dendl
;
6938 MonOpRequestRef blank
;
6939 auto i
= session_map
.sessions
.begin();
6940 while (i
!= session_map
.sessions
.end()) {
6943 session_stretch_allowed(*j
, blank
);