1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include <boost/scope_exit.hpp>
24 #include <boost/algorithm/string/predicate.hpp>
26 #include "json_spirit/json_spirit_reader.h"
27 #include "json_spirit/json_spirit_writer.h"
30 #include "common/version.h"
31 #include "common/blkdev.h"
32 #include "common/cmdparse.h"
33 #include "common/signal.h"
35 #include "osd/OSDMap.h"
37 #include "MonitorDBStore.h"
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MMonMap.h"
41 #include "messages/MMonGetMap.h"
42 #include "messages/MMonGetVersion.h"
43 #include "messages/MMonGetVersionReply.h"
44 #include "messages/MGenericMessage.h"
45 #include "messages/MMonCommand.h"
46 #include "messages/MMonCommandAck.h"
47 #include "messages/MMonMetadata.h"
48 #include "messages/MMonSync.h"
49 #include "messages/MMonScrub.h"
50 #include "messages/MMonProbe.h"
51 #include "messages/MMonJoin.h"
52 #include "messages/MMonPaxos.h"
53 #include "messages/MRoute.h"
54 #include "messages/MForward.h"
56 #include "messages/MMonSubscribe.h"
57 #include "messages/MMonSubscribeAck.h"
59 #include "messages/MCommand.h"
60 #include "messages/MCommandReply.h"
62 #include "messages/MAuthReply.h"
64 #include "messages/MTimeCheck2.h"
65 #include "messages/MPing.h"
67 #include "common/strtol.h"
68 #include "common/ceph_argparse.h"
69 #include "common/Timer.h"
70 #include "common/Clock.h"
71 #include "common/errno.h"
72 #include "common/perf_counters.h"
73 #include "common/admin_socket.h"
74 #include "global/signal_handler.h"
75 #include "common/Formatter.h"
76 #include "include/stringify.h"
77 #include "include/color.h"
78 #include "include/ceph_fs.h"
79 #include "include/str_list.h"
81 #include "OSDMonitor.h"
82 #include "MDSMonitor.h"
83 #include "MonmapMonitor.h"
84 #include "LogMonitor.h"
85 #include "AuthMonitor.h"
86 #include "MgrMonitor.h"
87 #include "MgrStatMonitor.h"
88 #include "ConfigMonitor.h"
89 #include "mon/QuorumService.h"
90 #include "mon/HealthMonitor.h"
91 #include "mon/ConfigKeyService.h"
92 #include "common/config.h"
93 #include "common/cmdparse.h"
94 #include "include/ceph_assert.h"
95 #include "include/compat.h"
96 #include "perfglue/heap_profiler.h"
98 #include "auth/none/AuthNoneClientHandler.h"
100 #define dout_subsys ceph_subsys_mon
102 #define dout_prefix _prefix(_dout, this)
103 using namespace TOPNSPC::common
;
// Stream prefix for this file's dout output (installed through the
// dout_prefix macro): writes "mon.<name>@<rank>(<state name>) e<monmap epoch> "
// to *_dout and returns the stream.
104 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
105 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
106 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
// String constants owned by Monitor. MONITOR_NAME is used in this file as the
// store prefix for monitor-level keys (the compat feature set, "joined",
// "cluster_fingerprint").
109 const string
Monitor::MONITOR_NAME
= "monitor";
110 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
// Static command table: COMMAND / COMMAND_WITH_FLAG are defined so that each
// entry in mon/MonCommands.h expands to one MonCommand initializer.  COMMAND
// entries get FLAG(NONE); COMMAND_WITH_FLAG entries carry their own flags.
115 #undef COMMAND_WITH_FLAG
116 #define FLAG(f) (MonCommand::FLAG_##f)
117 #define COMMAND(parsesig, helptext, modulename, req_perms) \
118 {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
119 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
120 {parsesig, helptext, modulename, req_perms, flags},
121 MonCommand mon_commands
[] = {
122 #include <mon/MonCommands.h>
125 #undef COMMAND_WITH_FLAG
// Construct a Monitor.  The visible part initializes members (loopback
// connection, finisher, cpu thread pool, log client, auth settings, mgr
// client, op tracker), creates the cluster/audit log channels, creates Paxos
// and one PaxosService per PAXOS_* slot, and builds the local command tables
// (current and pre-nautilus encodings).
127 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
128 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
// the loopback connection is only taken when a messenger was supplied
134 con_self(m
? m
->get_loopback_connection() : NULL
),
136 finisher(cct_
, "mon_finisher", "fin"),
137 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads
),
138 has_ever_joined(false),
139 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
141 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
142 key_server(cct
, &keyring
),
// auth_supported, when non-empty, overrides the per-scope auth_*_required value
143 auth_cluster_required(cct
,
144 cct
->_conf
->auth_supported
.empty() ?
145 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
146 auth_service_required(cct
,
147 cct
->_conf
->auth_supported
.empty() ?
148 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
149 mgr_messenger(mgr_m
),
150 mgr_client(cct_
, mgr_m
, monmap
),
151 gss_ktfile_client(cct
->_conf
.get_val
<std::string
>("gss_ktab_client_file")),
155 required_features(0),
157 quorum_con_features(0),
161 scrub_timeout_event(NULL
),
164 sync_provider_count(0),
167 sync_start_version(0),
168 sync_timeout_event(NULL
),
169 sync_last_committed_floor(0),
173 timecheck_rounds_since_clean(0),
174 timecheck_event(NULL
),
176 paxos_service(PAXOS_NUM
),
178 routed_request_tid(0),
179 op_tracker(cct
, g_conf().get_val
<bool>("mon_enable_op_tracker"), 1)
// cluster and audit log channels, then apply the current log configuration
181 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
182 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
184 update_log_clients();
// export the configured GSS client keytab through KRB5_CLIENT_KTNAME
186 if (!gss_ktfile_client
.empty()) {
187 // Assert we can export environment variable
189 The default client keytab is used, if it is present and readable,
190 to automatically obtain initial credentials for GSSAPI client
191 applications. The principal name of the first entry in the client
192 keytab is used by default when obtaining initial credentials.
193 1. The KRB5_CLIENT_KTNAME environment variable.
194 2. The default_client_keytab_name profile variable in [libdefaults].
195 3. The hardcoded default, DEFCKTNAME.
197 const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
198 gss_ktfile_client
.c_str(), 1));
199 ceph_assert(set_result
== 0);
// op tracker thresholds and history limits come from the mon_op_* options
202 op_tracker
.set_complaint_and_threshold(
203 g_conf().get_val
<std::chrono::seconds
>("mon_op_complaint_time").count(),
204 g_conf().get_val
<int64_t>("mon_op_log_threshold"));
205 op_tracker
.set_history_size_and_duration(
206 g_conf().get_val
<uint64_t>("mon_op_history_size"),
207 g_conf().get_val
<std::chrono::seconds
>("mon_op_history_duration").count());
208 op_tracker
.set_history_slow_op_size_and_threshold(
209 g_conf().get_val
<uint64_t>("mon_op_history_slow_op_size"),
210 g_conf().get_val
<std::chrono::seconds
>("mon_op_history_slow_op_threshold").count());
// a single Paxos instance is handed to every service created below
212 paxos
= new Paxos(this, "paxos");
214 paxos_service
[PAXOS_MDSMAP
].reset(new MDSMonitor(this, paxos
, "mdsmap"));
215 paxos_service
[PAXOS_MONMAP
].reset(new MonmapMonitor(this, paxos
, "monmap"));
216 paxos_service
[PAXOS_OSDMAP
].reset(new OSDMonitor(cct
, this, paxos
, "osdmap"));
217 paxos_service
[PAXOS_LOG
].reset(new LogMonitor(this, paxos
, "logm"));
218 paxos_service
[PAXOS_AUTH
].reset(new AuthMonitor(this, paxos
, "auth"));
219 paxos_service
[PAXOS_MGR
].reset(new MgrMonitor(this, paxos
, "mgr"));
220 paxos_service
[PAXOS_MGRSTAT
].reset(new MgrStatMonitor(this, paxos
, "mgrstat"));
221 paxos_service
[PAXOS_HEALTH
].reset(new HealthMonitor(this, paxos
, "health"));
222 paxos_service
[PAXOS_CONFIG
].reset(new ConfigMonitor(this, paxos
, "config"));
224 config_key_service
= new ConfigKeyService(this, paxos
);
// parse an allow-all capability string into mon_caps
226 bool r
= mon_caps
.parse("allow *", NULL
);
229 exited_quorum
= ceph_clock_now();
231 // prepare local commands
232 local_mon_commands
.resize(std::size(mon_commands
));
233 for (unsigned i
= 0; i
< std::size(mon_commands
); ++i
) {
234 local_mon_commands
[i
] = mon_commands
[i
];
236 MonCommand::encode_vector(local_mon_commands
, local_mon_commands_bl
);
// second copy of the table, run through the pre-nautilus compat conversion
238 prenautilus_local_mon_commands
= local_mon_commands
;
239 for (auto& i
: prenautilus_local_mon_commands
) {
240 std::string n
= cmddesc_get_prenautilus_compat(i
.cmdstring
);
241 if (n
!= i
.cmdstring
) {
242 dout(20) << " pre-nautilus cmd " << i
.cmdstring
<< " -> " << n
<< dendl
;
246 MonCommand::encode_vector(prenautilus_local_mon_commands
, prenautilus_local_mon_commands_bl
);
248 // assume our commands until we have an election. this only means
249 // we won't reply with EINVAL before the election; any command that
250 // actually matters will wait until we have quorum etc and then
251 // retry (and revalidate).
252 leader_mon_commands
= local_mon_commands
;
257 op_tracker
.on_shutdown();
259 paxos_service
.clear();
260 delete config_key_service
;
262 ceph_assert(session_map
.sessions
.empty());
// Admin-socket hook bound to a Monitor: call() forwards the command and its
// argument map to Monitor::do_admin_command().
266 class AdminHook
: public AdminSocketHook
{
269 explicit AdminHook(Monitor
*m
) : mon(m
) {}
270 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
273 bufferlist
& out
) override
{
// errss/outss are presumably local string streams declared on lines not
// visible here -- confirm against the full file
275 int r
= mon
->do_admin_command(command
, cmdmap
, f
, errss
, outss
);
// Execute one admin-socket command while holding the monitor lock.
//
//   command  command prefix used to pick the branch below
//   cmdmap   parsed arguments; every entry except "prefix" is stringified
//            into the audit log line
//
// The body also uses a formatter `f` and two output streams `err` / `out`;
// their declarations are on lines not visible here.  Each invocation is
// recorded in the audit log as "dispatch" and then "finished" or "aborted".
// An unknown command aborts the process ("bad AdminSocket command binding").
281 int Monitor::do_admin_command(
282 std::string_view command
,
283 const cmdmap_t
& cmdmap
,
288 std::lock_guard
l(lock
);
// build a "[a, b, ...]"-style argument string for the audit log
292 for (auto p
= cmdmap
.begin();
293 p
!= cmdmap
.end(); ++p
) {
294 if (p
->first
== "prefix")
298 args
+= cmd_vartype_stringify(p
->second
);
300 args
= "[" + args
+ "]";
// read-only commands are audited at debug level, all others at info level
302 bool read_only
= (command
== "mon_status" ||
303 command
== "mon metadata" ||
304 command
== "quorum_status" ||
306 command
== "sessions");
308 (read_only
? audit_clog
->debug() : audit_clog
->info())
309 << "from='admin socket' entity='admin socket' "
310 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
312 if (command
== "mon_status") {
314 } else if (command
== "quorum_status") {
315 _quorum_status(f
, out
);
316 } else if (command
== "sync_force") {
// destructive: requires the explicit confirmation argument
318 if ((!cmd_getval(cmdmap
, "validate", validate
)) ||
319 (validate
!= "--yes-i-really-mean-it")) {
320 err
<< "are you SURE? this will mean the monitor store will be erased "
321 "the next time the monitor is restarted. pass "
322 "'--yes-i-really-mean-it' if you really do.";
// NOTE(review): the first 23-character prefix test already matches
// "add_bootstrap_peer_hintv", so the second comparison is redundant
327 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
328 command
.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
329 if (!_add_bootstrap_peer_hint(command
, cmdmap
, out
))
331 } else if (command
== "quorum enter") {
332 elector
.start_participating();
334 out
<< "started responding to quorum, initiated new election";
335 } else if (command
== "quorum exit") {
337 elector
.stop_participating();
338 out
<< "stopped responding to quorum, initiated new election";
339 } else if (command
== "ops") {
340 (void)op_tracker
.dump_ops_in_flight(f
);
341 } else if (command
== "sessions") {
342 f
->open_array_section("sessions");
343 for (auto p
: session_map
.sessions
) {
344 f
->dump_object("session", *p
);
347 } else if (command
== "dump_historic_ops") {
348 if (!op_tracker
.dump_historic_ops(f
)) {
349 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
350 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
352 } else if (command
== "dump_historic_ops_by_duration" ) {
// report "not enabled" only when the dump call fails, exactly as the
// dump_historic_ops branch above does (the condition used to be inverted)
353 if (!op_tracker
.dump_historic_ops(f
, true)) {
354 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
355 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
357 } else if (command
== "dump_historic_slow_ops") {
// same fix as above: the error belongs to the failure case
358 if (!op_tracker
.dump_historic_slow_ops(f
, {})) {
359 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
360 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
362 } else if (command
== "quorum") {
// "quorum" with a quorumcmd argument of "exit" or "enter"
364 cmd_getval(cmdmap
, "quorumcmd", quorumcmd
);
365 if (quorumcmd
== "exit") {
367 elector
.stop_participating();
368 out
<< "stopped responding to quorum, initiated new election" << std::endl
;
369 } else if (quorumcmd
== "enter") {
370 elector
.start_participating();
372 out
<< "started responding to quorum, initiated new election" << std::endl
;
374 err
<< "needs a valid 'quorum' command" << std::endl
;
376 } else if (command
== "smart") {
// collect device metrics for the raw devices behind the store, optionally
// restricted to the device id given as "devid"; output is pretty-printed JSON
378 cmd_getval(cmdmap
, "devid", want_devid
);
380 string devname
= store
->get_devname();
381 set
<string
> devnames
;
382 get_raw_devices(devname
, &devnames
);
383 json_spirit::mObject json_map
;
384 uint64_t smart_timeout
= cct
->_conf
.get_val
<uint64_t>(
385 "mon_smart_report_timeout");
386 for (auto& devname
: devnames
) {
388 string devid
= get_device_id(devname
, &err
);
// NOTE(review): this branch is taken whenever the id differs from the one
// requested, yet the message reports a get_device_id failure -- confirm intent
389 if (want_devid
.size() && want_devid
!= devid
) {
390 derr
<< "get_device_id failed on " << devname
<< ": " << err
<< dendl
;
393 json_spirit::mValue smart_json
;
394 if (block_device_get_metrics(devname
, smart_timeout
,
396 dout(10) << "block_device_get_metrics failed for /dev/" << devname
400 json_map
[devid
] = smart_json
;
402 json_spirit::write(json_map
, out
, json_spirit::pretty_print
);
403 } else if (command
== "heap") {
// heap profiler commands are only available when built against tcmalloc
404 if (!ceph_using_tcmalloc()) {
405 err
<< "could not issue heap profiler command -- not using tcmalloc!";
410 if (!cmd_getval(cmdmap
, "heapcmd", cmd
)) {
411 err
<< "unable to get value for command \"" << cmd
<< "\"";
415 std::vector
<std::string
> cmd_vec
;
416 get_str_vec(cmd
, cmd_vec
);
418 if (cmd_getval(cmdmap
, "value", val
)) {
419 cmd_vec
.push_back(val
);
421 ceph_heap_profiler_handle_command(cmd_vec
, out
);
422 } else if (command
== "compact") {
// NOTE(review): the timed call is compact_async(); presumably the reported
// duration covers only that call, not the compaction itself -- confirm
423 dout(1) << "triggering manual compaction" << dendl
;
424 auto start
= ceph::coarse_mono_clock::now();
425 store
->compact_async();
426 auto end
= ceph::coarse_mono_clock::now();
427 auto duration
= ceph::to_seconds
<double>(end
- start
);
428 dout(1) << "finished manual compaction in "
429 << duration
<< " seconds" << dendl
;
430 out
<< "compacted " << g_conf().get_val
<std::string
>("mon_keyvaluedb")
431 << " in " << duration
<< " seconds";
433 ceph_abort_msg("bad AdminSocket command binding");
// audit trail: normal completion
435 (read_only
? audit_clog
->debug() : audit_clog
->info())
436 << "from='admin socket' "
437 << "entity='admin socket' "
438 << "cmd=" << command
<< " "
439 << "args=" << args
<< ": finished";
// audit trail: the command was rejected or failed part-way
443 (read_only
? audit_clog
->debug() : audit_clog
->info())
444 << "from='admin socket' "
445 << "entity='admin socket' "
446 << "cmd=" << command
<< " "
447 << "args=" << args
<< ": aborted";
// Signal entry point.  Only SIGINT and SIGTERM are expected (asserted); the
// received signal is logged.
451 void Monitor::handle_signal(int signum
)
453 ceph_assert(signum
== SIGINT
|| signum
== SIGTERM
);
454 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
// Baseline feature set: empty compat and ro_compat sets, and an incompat set
// holding BASE and SINGLE_PAXOS.
458 CompatSet
Monitor::get_initial_supported_features()
460 CompatSet::FeatureSet ceph_mon_feature_compat
;
461 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
462 CompatSet::FeatureSet ceph_mon_feature_incompat
;
463 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
464 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
465 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
466 ceph_mon_feature_incompat
);
// Feature set supported by this binary: starts from
// get_initial_supported_features() and adds the incompat features listed
// below (erasure-code related ones, OSDMAP_ENC, and the per-release ones
// from KRAKEN through OCTOPUS).
469 CompatSet
Monitor::get_supported_features()
471 CompatSet compat
= get_initial_supported_features();
472 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
473 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
474 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
475 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
476 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
477 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
478 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC
);
479 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
);
480 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
);
// Legacy feature set: only the BASE incompat feature, with empty compat and
// ro_compat sets.  read_features_off_disk() falls back to this when the
// store has no feature list.
484 CompatSet
Monitor::get_legacy_features()
486 CompatSet::FeatureSet ceph_mon_feature_compat
;
487 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
488 CompatSet::FeatureSet ceph_mon_feature_incompat
;
489 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
490 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
491 ceph_mon_feature_incompat
);
// Compare the feature set recorded in `store` with what this binary supports.
// When the on-disk set is not writeable by us, the unsupported features are
// logged as an error.  (The return statements are on lines not visible here.)
494 int Monitor::check_features(MonitorDBStore
*store
)
496 CompatSet required
= get_supported_features();
499 read_features_off_disk(store
, &ondisk
);
501 if (!required
.writeable(ondisk
)) {
502 CompatSet diff
= required
.unsupported(ondisk
);
503 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
// Load the CompatSet stored under MONITOR_NAME/COMPAT_SET_LOC into *features.
// When nothing is stored, a warning is logged, *features is set to the legacy
// feature set, and that set is encoded and written back to the store.
510 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
512 bufferlist featuresbl
;
513 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
514 if (featuresbl
.length() == 0) {
515 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
516 << "Assuming it is old-style and introducing one." << dendl
;
517 //we only want the baseline ~v.18 features assumed to be on disk.
518 //If new features are introduced this code needs to disappear or
520 *features
= get_legacy_features();
// persist the assumed feature list so later startups find it
522 features
->encode(featuresbl
);
523 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
524 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
525 store
->apply_transaction(t
);
// decode the feature list from the buffer
527 auto it
= featuresbl
.cbegin();
528 features
->decode(it
);
// Load the on-disk feature set into the `features` member, then recompute the
// quorum requirements; both results are logged at debug level 10.
532 void Monitor::read_features()
534 read_features_off_disk(store
, &features
);
535 dout(10) << "features " << features
<< dendl
;
537 calc_quorum_requirements();
538 dout(10) << "required_features " << required_features
<< dendl
;
// Queue a put of `bl` under MONITOR_NAME/COMPAT_SET_LOC on transaction t.
// `bl` is presumably the encoded feature set; its setup is on lines not
// visible here -- confirm against the full file.
541 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
545 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
// Static table of configuration option names this monitor observes.  Several
// of them are acted on in handle_conf_change(); the entries marked
// "observed, not handled" below are only registered.
548 const char** Monitor::get_tracked_conf_keys() const
550 static const char* KEYS
[] = {
551 "crushtool", // helpful for testing
552 "mon_election_timeout",
554 "mon_lease_renew_interval_factor",
555 "mon_lease_ack_timeout_factor",
556 "mon_accept_timeout_factor",
560 "clog_to_syslog_facility",
561 "clog_to_syslog_level",
563 "clog_to_graylog_host",
564 "clog_to_graylog_port",
567 // periodic health to clog
568 "mon_health_to_clog",
569 "mon_health_to_clog_interval",
570 "mon_health_to_clog_tick_interval",
572 "mon_scrub_interval",
573 "mon_allow_pool_delete",
574 // osdmap pruning - observed, not handled.
575 "mon_osdmap_full_prune_enabled",
576 "mon_osdmap_full_prune_min",
577 "mon_osdmap_full_prune_interval",
578 "mon_osdmap_full_prune_txsize",
579 // debug options - observed, not handled
580 "mon_debug_extra_checks",
581 "mon_debug_block_osdmap_trim",
// React to changed configuration options.
//   - any clog_to_*, host or fsid change: refresh the log clients immediately
//   - mon_health_to_clog*: queue health_to_clog_update_conf() on the finisher
//   - mon_scrub_interval: queue scrub_update_interval() on the finisher
// The queued callbacks take `lock` themselves before touching monitor state.
587 void Monitor::handle_conf_change(const ConfigProxy
& conf
,
588 const std::set
<std::string
> &changed
)
592 dout(10) << __func__
<< " " << changed
<< dendl
;
594 if (changed
.count("clog_to_monitors") ||
595 changed
.count("clog_to_syslog") ||
596 changed
.count("clog_to_syslog_level") ||
597 changed
.count("clog_to_syslog_facility") ||
598 changed
.count("clog_to_graylog") ||
599 changed
.count("clog_to_graylog_host") ||
600 changed
.count("clog_to_graylog_port") ||
601 changed
.count("host") ||
602 changed
.count("fsid")) {
603 update_log_clients();
606 if (changed
.count("mon_health_to_clog") ||
607 changed
.count("mon_health_to_clog_interval") ||
608 changed
.count("mon_health_to_clog_tick_interval")) {
// `changed` is captured by copy because the callback runs later
609 finisher
.queue(new C_MonContext
{this, [this, changed
](int) {
610 std::lock_guard l
{lock
};
611 health_to_clog_update_conf(changed
);
615 if (changed
.count("mon_scrub_interval")) {
// read the new value now and hand it to the deferred callback by value
616 int scrub_interval
= conf
->mon_scrub_interval
;
617 finisher
.queue(new C_MonContext
{this, [this, scrub_interval
](int) {
618 std::lock_guard l
{lock
};
619 scrub_update_interval(scrub_interval
);
// Re-read the log client options via parse_log_client_options() and apply
// the resulting per-channel maps to both the cluster log channel (clog) and
// the audit channel (audit_clog).
624 void Monitor::update_log_clients()
626 map
<string
,string
> log_to_monitors
;
627 map
<string
,string
> log_to_syslog
;
628 map
<string
,string
> log_channel
;
629 map
<string
,string
> log_prio
;
630 map
<string
,string
> log_to_graylog
;
631 map
<string
,string
> log_to_graylog_host
;
632 map
<string
,string
> log_to_graylog_port
;
636 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
637 log_channel
, log_prio
, log_to_graylog
,
638 log_to_graylog_host
, log_to_graylog_port
,
// both channels receive the same parsed configuration
642 clog
->update_config(log_to_monitors
, log_to_syslog
,
643 log_channel
, log_prio
, log_to_graylog
,
644 log_to_graylog_host
, log_to_graylog_port
,
647 audit_clog
->update_config(log_to_monitors
, log_to_syslog
,
648 log_channel
, log_prio
, log_to_graylog
,
649 log_to_graylog_host
, log_to_graylog_port
,
// Validate the lease-related factors: mon_lease_renew_interval_factor must be
// below 1.0 and mon_lease_ack_timeout_factor must be above 1.0.  Violations
// are reported on the cluster log as errors.
653 int Monitor::sanitize_options()
657 // mon_lease must be greater than mon_lease_renewal; otherwise we
658 // may end up with leases expiring before they are renewed.
659 if (g_conf()->mon_lease_renew_interval_factor
>= 1.0) {
660 clog
->error() << "mon_lease_renew_interval_factor ("
661 << g_conf()->mon_lease_renew_interval_factor
662 << ") must be less than 1.0";
666 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
667 // got time to renew the lease and get an ack for it. Having both options
668 // with the same value, for a given small value, could mean timing out if
669 // the monitors happened to be overloaded -- or even under normal load for
670 // a small enough value.
671 if (g_conf()->mon_lease_ack_timeout_factor
<= 1.0) {
672 clog
->error() << "mon_lease_ack_timeout_factor ("
673 << g_conf()->mon_lease_ack_timeout_factor
674 << ") must be greater than 1.0";
// Pre-initialization, run with the monitor lock held.  The visible steps are:
// sanitize options, create the "mon" and "cluster" perf counters, initialize
// the Paxos logger, check the fsid, apply initial-member filtering when this
// monitor has never joined a quorum, clean up leftover sync state, bootstrap
// or load the keyring when one is required, register admin-socket commands,
// and register this object as config observer and messenger auth
// client/server.
681 int Monitor::preinit()
683 std::unique_lock
l(lock
);
685 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
687 int r
= sanitize_options();
689 derr
<< "option sanitization failed!" << dendl
;
693 ceph_assert(!logger
);
// per-monitor counters: sessions and elections
695 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
696 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess",
697 PerfCountersBuilder::PRIO_USEFUL
);
698 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions",
699 "sadd", PerfCountersBuilder::PRIO_INTERESTING
);
700 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions",
701 "srm", PerfCountersBuilder::PRIO_INTERESTING
);
702 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions",
703 "strm", PerfCountersBuilder::PRIO_USEFUL
);
704 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in",
705 "ecnt", PerfCountersBuilder::PRIO_USEFUL
);
706 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started",
707 "estt", PerfCountersBuilder::PRIO_INTERESTING
);
708 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won",
709 "ewon", PerfCountersBuilder::PRIO_INTERESTING
);
710 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost",
711 "elst", PerfCountersBuilder::PRIO_INTERESTING
);
712 logger
= pcb
.create_perf_counters();
713 cct
->get_perfcounters_collection()->add(logger
);
716 ceph_assert(!cluster_logger
);
// cluster-wide counters; created here but not added to the collection
// (register_cluster_logger() does that)
718 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
719 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
720 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
721 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
722 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
723 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
724 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
725 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster", NULL
, 0, unit_t(UNIT_BYTES
));
726 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space", NULL
, 0, unit_t(UNIT_BYTES
));
727 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space", NULL
, 0, unit_t(UNIT_BYTES
));
728 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
729 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
730 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
731 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
732 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
733 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
734 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
735 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
736 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
737 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects", NULL
, 0, unit_t(UNIT_BYTES
));
738 cluster_logger
= pcb
.create_perf_counters();
741 paxos
->init_logger();
743 // verify cluster_uuid
745 int r
= check_fsid();
756 // have we ever joined a quorum?
757 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
758 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
760 if (!has_ever_joined
) {
761 // impose initial quorum restrictions?
762 list
<string
> initial_members
;
763 get_str_list(g_conf()->mon_initial_members
, initial_members
);
765 if (!initial_members
.empty()) {
766 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
768 monmap
->set_initial_members(
769 g_ceph_context
, initial_members
, name
, messenger
->get_myaddrs(),
772 dout(10) << " monmap is " << *monmap
<< dendl
;
773 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
// joined a quorum before but no longer listed in the monmap
775 } else if (!monmap
->contains(name
)) {
776 derr
<< "not in monmap and have been in a quorum before; "
777 << "must have been removed" << dendl
;
778 if (g_conf()->mon_force_quorum_join
) {
779 dout(0) << "we should have died but "
780 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
782 derr
<< "commit suicide!" << dendl
;
788 // We have a potentially inconsistent store state in hands. Get rid of it
790 bool clear_store
= false;
// an "in_sync" marker left in the store means a sync did not complete
791 if (store
->exists("mon_sync", "in_sync")) {
792 dout(1) << __func__
<< " clean up potentially inconsistent store state"
797 if (store
->get("mon_sync", "force_sync") > 0) {
798 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
803 set
<string
> sync_prefixes
= get_sync_targets_names();
804 store
->clear(sync_prefixes
);
808 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
809 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
813 if (is_keyring_required()) {
814 // we need to bootstrap authentication keys so we can form an
816 if (authmon()->get_last_committed() == 0) {
817 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
819 int err
= store
->get("mkfs", "keyring", bl
);
820 if (err
== 0 && bl
.length() > 0) {
821 // Attempt to decode and extract keyring only if it is found.
823 auto p
= bl
.cbegin();
825 extract_save_mon_key(keyring
);
// load the keyring file from the monitor data directory
829 string keyring_loc
= g_conf()->mon_data
+ "/keyring";
831 r
= keyring
.load(cct
, keyring_loc
);
834 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
836 if (key_server
.get_auth(mon_name
, mon_key
)) {
837 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
838 keyring
.add(mon_name
, mon_key
);
840 keyring
.encode_plaintext(bl
);
841 write_default_keyring(bl
);
843 derr
<< "unable to load initial keyring " << g_conf()->keyring
<< dendl
;
849 admin_hook
= new AdminHook(this);
850 AdminSocket
* admin_socket
= cct
->get_admin_socket();
852 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
854 // register tell/asock commands
855 for (const auto& command
: local_mon_commands
) {
856 if (!command
.is_tell()) {
859 const auto prefix
= cmddesc_get_prefix(command
.cmdstring
);
860 if (prefix
== "injectargs" ||
861 prefix
== "version" ||
863 // not registered by me
866 r
= admin_socket
->register_command(command
.cmdstring
, admin_hook
,
872 // add ourselves as a conf observer
873 g_conf().add_observer(this);
// this object acts as auth client and server for the public messenger, and
// as auth client for the mgr messenger
875 messenger
->set_auth_client(this);
876 messenger
->set_auth_server(this);
877 mgr_messenger
->set_auth_client(this);
879 auth_registry
.refresh_config();
886 dout(2) << "init" << dendl
;
887 std::lock_guard
l(lock
);
898 messenger
->add_dispatcher_tail(this);
900 // kickstart pet mgrclient
902 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
903 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
904 mgrmon()->prime_mgr_client();
906 state
= STATE_PROBING
;
908 // add features of myself into feature_map
909 session_map
.feature_map
.add_mon(con_self
->get_features());
// Initialize Paxos state: iterates over every Paxos service (the loop body
// is on lines not visible here) and then refreshes from Paxos without asking
// for a bootstrap hint.
913 void Monitor::init_paxos()
915 dout(10) << __func__
<< dendl
;
919 for (auto& svc
: paxos_service
) {
923 refresh_from_paxos(NULL
);
// Reload state from the store: read and decode the "cluster_fingerprint" key
// (decode failures and a missing key are only logged), then call refresh()
// on every Paxos service, passing need_bootstrap through.  A second pass over
// the services follows; its body is on lines not visible here.
926 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
928 dout(10) << __func__
<< dendl
;
931 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
934 auto p
= bl
.cbegin();
935 decode(fingerprint
, p
);
937 catch (buffer::error
& e
) {
938 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
941 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
944 for (auto& svc
: paxos_service
) {
945 svc
->refresh(need_bootstrap
);
947 for (auto& svc
: paxos_service
) {
// Add cluster_logger to the perf counters collection unless it is already
// registered; the flag keeps repeated calls from adding it twice.
953 void Monitor::register_cluster_logger()
955 if (!cluster_logger_registered
) {
956 dout(10) << "register_cluster_logger" << dendl
;
957 cluster_logger_registered
= true;
958 cct
->get_perfcounters_collection()->add(cluster_logger
);
960 dout(10) << "register_cluster_logger - already registered" << dendl
;
// Remove cluster_logger from the perf counters collection if it is currently
// registered; otherwise only log that there was nothing to do.
964 void Monitor::unregister_cluster_logger()
966 if (cluster_logger_registered
) {
967 dout(10) << "unregister_cluster_logger" << dendl
;
968 cluster_logger_registered
= false;
969 cct
->get_perfcounters_collection()->remove(cluster_logger
);
971 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
// Publish the monitor count (monmap size) and the quorum size to the cluster
// perf counters.
975 void Monitor::update_logger()
977 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
978 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
// Shut the monitor down.  Visible order: wait for any pending Paxos write,
// reset the auth monitor's mon num/rank under auth_lock, enter
// STATE_SHUTDOWN, drop the config observer and admin-socket commands, stop
// the mgr client, drain the finisher, cancel quorum waiters with -ECANCELED,
// remove all sessions, shut down the log client and both messengers, and
// finally remove the perf counters.
981 void Monitor::shutdown()
983 dout(1) << "shutdown" << dendl
;
987 wait_for_paxos_write();
990 std::lock_guard
l(auth_lock
);
991 authmon()->_set_mon_num_rank(0, 0);
994 state
= STATE_SHUTDOWN
;
997 g_conf().remove_observer(this);
1001 cct
->get_admin_socket()->unregister_commands(admin_hook
);
1008 mgr_client
.shutdown();
1011 finisher
.wait_for_empty();
1017 for (auto& svc
: paxos_service
) {
// anything still waiting for quorum is completed with -ECANCELED
1021 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
1022 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
1028 remove_all_sessions();
1030 log_client
.shutdown();
1032 // unlock before msgr shutdown...
1035 // shutdown messenger before removing logger from perfcounter collection,
1036 // otherwise _ms_dispatch() will try to update deleted logger
1037 messenger
->shutdown();
1038 mgr_messenger
->shutdown();
1041 cct
->get_perfcounters_collection()->remove(logger
);
// cluster_logger is removed from the collection only if it was registered
1045 if (cluster_logger
) {
1046 if (cluster_logger_registered
)
1047 cct
->get_perfcounters_collection()->remove(cluster_logger
);
1048 delete cluster_logger
;
1049 cluster_logger
= NULL
;
// If Paxos is in the middle of a write (current or previous), wait for it to
// be flushed; the wait itself is on lines not visible here.  Logged before
// and after.
1053 void Monitor::wait_for_paxos_write()
1055 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
1056 dout(10) << __func__
<< " flushing pending write" << dendl
;
1060 dout(10) << __func__
<< " flushed pending write" << dendl
;
// Re-exec this process with its original arguments.  The executable path is
// taken from /proc/self/exe when that link is readable, otherwise from
// orig_argv[0].  execv() only returns on failure, which is logged; callers
// expect this function never to return.
1064 void Monitor::respawn()
1066 // --- WARNING TO FUTURE COPY/PASTERS ---
1067 // You must also add a call like
1069 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1071 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1072 // instead of "(exe)", so that killall (and log rotation) will work.
1074 dout(0) << __func__
<< dendl
;
// copy of the original argv, NULL-terminated as execv() requires
1076 char *new_argv
[orig_argc
+1];
1077 dout(1) << " e: '" << orig_argv
[0] << "'" << dendl
;
1078 for (int i
=0; i
<orig_argc
; i
++) {
1079 new_argv
[i
] = (char *)orig_argv
[i
];
1080 dout(1) << " " << i
<< ": '" << orig_argv
[i
] << "'" << dendl
;
1082 new_argv
[orig_argc
] = NULL
;
1084 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1085 * This allows us to exec the same executable even if it has since been
1088 char exe_path
[PATH_MAX
] = "";
1090 if (readlink(PROCPREFIX
"/proc/self/exe", exe_path
, PATH_MAX
-1) != -1) {
1091 dout(1) << "respawning with exe " << exe_path
<< dendl
;
1092 strcpy(exe_path
, PROCPREFIX
"/proc/self/exe");
1097 /* Print CWD for the user's interest */
1099 char *cwd
= getcwd(buf
, sizeof(buf
));
1101 dout(1) << " cwd " << cwd
<< dendl
;
1103 /* Fall back to a best-effort: just running in our CWD */
// exe_path was zero-initialized, so copying at most PATH_MAX-1 bytes keeps it
// NUL-terminated
1104 strncpy(exe_path
, orig_argv
[0], PATH_MAX
-1);
1107 dout(1) << " exe_path " << exe_path
<< dendl
;
1109 unblock_all_signals(NULL
);
1110 execv(exe_path
, new_argv
);
1112 dout(0) << "respawn execv " << orig_argv
[0]
1113 << " failed with " << cpp_strerror(errno
) << dendl
;
1115 // We have to assert out here, because suicide() returns, and callers
1116 // to respawn expect it never to return.
1120 void Monitor::bootstrap()
1122 dout(10) << "bootstrap" << dendl
;
1123 wait_for_paxos_write();
1125 sync_reset_requester();
1126 unregister_cluster_logger();
1127 cancel_probe_timeout();
1129 if (monmap
->get_epoch() == 0) {
1130 dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl
;
1131 monmap
->calc_legacy_ranks();
1133 dout(10) << "monmap " << *monmap
<< dendl
;
1135 auto from_release
= monmap
->min_mon_release
;
1137 if (!can_upgrade_from(from_release
, "min_mon_release", err
)) {
1138 derr
<< "current monmap has " << err
.str() << " stopping." << dendl
;
1143 int newrank
= monmap
->get_rank(messenger
->get_myaddrs());
1144 if (newrank
< 0 && rank
>= 0) {
1145 // was i ever part of the quorum?
1146 if (has_ever_joined
) {
1147 dout(0) << " removed from monmap, suicide." << dendl
;
1152 monmap
->get_addrs(newrank
) != messenger
->get_myaddrs()) {
1153 dout(0) << " monmap addrs for rank " << newrank
<< " changed, i am "
1154 << messenger
->get_myaddrs()
1155 << ", monmap is " << monmap
->get_addrs(newrank
) << ", respawning"
1158 if (monmap
->get_epoch()) {
1159 // store this map in temp mon_sync location so that we use it on
1161 derr
<< " stashing newest monmap " << monmap
->get_epoch()
1162 << " for next startup" << dendl
;
1164 monmap
->encode(bl
, -1);
1165 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1166 t
->put("mon_sync", "temp_newer_monmap", bl
);
1167 store
->apply_transaction(t
);
1172 if (newrank
!= rank
) {
1173 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
1174 messenger
->set_myname(entity_name_t::MON(newrank
));
1177 // reset all connections, or else our peers will think we are someone else.
1178 messenger
->mark_down_all();
1182 state
= STATE_PROBING
;
1187 if (g_conf()->mon_compact_on_bootstrap
) {
1188 dout(10) << "bootstrap -- triggering compaction" << dendl
;
1190 dout(10) << "bootstrap -- finished compaction" << dendl
;
1193 // singleton monitor?
1194 if (monmap
->size() == 1 && rank
== 0) {
1195 win_standalone_election();
1199 reset_probe_timeout();
1201 // i'm outside the quorum
1202 if (monmap
->contains(name
))
1203 outside_quorum
.insert(name
);
1206 dout(10) << "probing other monitors" << dendl
;
1207 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1210 new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
,
1214 for (auto& av
: extra_probe_peers
) {
1215 if (av
!= messenger
->get_myaddrs()) {
1216 messenger
->send_to_mon(
1217 new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
,
1224 bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd
,
1225 const cmdmap_t
& cmdmap
,
1228 if (is_leader() || is_peon()) {
1229 ss
<< "mon already active; ignoring bootstrap hint";
1233 entity_addrvec_t addrs
;
1235 if (cmd_getval(cmdmap
, "addr", addrstr
)) {
1236 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' addr '"
1237 << addrstr
<< "'" << dendl
;
1240 const char *end
= 0;
1241 if (!addr
.parse(addrstr
.c_str(), &end
, entity_addr_t::TYPE_ANY
)) {
1242 ss
<< "failed to parse addrs '" << addrstr
1243 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1247 addrs
.v
.push_back(addr
);
1248 if (addr
.get_port() == 0) {
1249 addrs
.v
[0].set_type(entity_addr_t::TYPE_MSGR2
);
1250 addrs
.v
[0].set_port(CEPH_MON_PORT_IANA
);
1251 addrs
.v
.push_back(addr
);
1252 addrs
.v
[1].set_type(entity_addr_t::TYPE_LEGACY
);
1253 addrs
.v
[1].set_port(CEPH_MON_PORT_LEGACY
);
1254 } else if (addr
.get_type() == entity_addr_t::TYPE_ANY
) {
1255 if (addr
.get_port() == CEPH_MON_PORT_LEGACY
) {
1256 addrs
.v
[0].set_type(entity_addr_t::TYPE_LEGACY
);
1258 addrs
.v
[0].set_type(entity_addr_t::TYPE_MSGR2
);
1261 } else if (cmd_getval(cmdmap
, "addrv", addrstr
)) {
1262 dout(10) << "_add_bootstrap_peer_hintv '" << cmd
<< "' addrv '"
1263 << addrstr
<< "'" << dendl
;
1264 const char *end
= 0;
1265 if (!addrs
.parse(addrstr
.c_str(), &end
)) {
1266 ss
<< "failed to parse addrs '" << addrstr
1267 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1271 ss
<< "no addr or addrv provided";
1275 extra_probe_peers
.insert(addrs
);
1276 ss
<< "adding peer " << addrs
<< " to list: " << extra_probe_peers
;
1280 // called by bootstrap(), or on leader|peon -> electing
1281 void Monitor::_reset()
1283 dout(10) << __func__
<< dendl
;
1285 // disable authentication
1287 std::lock_guard
l(auth_lock
);
1288 authmon()->_set_mon_num_rank(0, 0);
1291 cancel_probe_timeout();
1293 health_events_cleanup();
1294 health_check_log_times
.clear();
1295 scrub_event_cancel();
1297 leader_since
= utime_t();
1299 if (!quorum
.empty()) {
1300 exited_quorum
= ceph_clock_now();
1303 outside_quorum
.clear();
1304 quorum_feature_map
.clear();
1310 for (auto& svc
: paxos_service
) {
1316 // -----------------------------------------------------------
1319 set
<string
> Monitor::get_sync_targets_names()
1321 set
<string
> targets
;
1322 targets
.insert(paxos
->get_name());
1323 for (auto& svc
: paxos_service
) {
1324 svc
->get_store_prefixes(targets
);
1326 ConfigKeyService
*config_key_service_ptr
= dynamic_cast<ConfigKeyService
*>(config_key_service
);
1327 ceph_assert(config_key_service_ptr
);
1328 config_key_service_ptr
->get_store_prefixes(targets
);
1333 void Monitor::sync_timeout()
1335 dout(10) << __func__
<< dendl
;
1336 ceph_assert(state
== STATE_SYNCHRONIZING
);
1340 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1342 dout(1) << __func__
<< dendl
;
1344 MonMap latest_monmap
;
1346 // Grab latest monmap from MonmapMonitor
1347 bufferlist monmon_bl
;
1348 int err
= monmon()->get_monmap(monmon_bl
);
1350 if (err
!= -ENOENT
) {
1352 << " something wrong happened while reading the store: "
1353 << cpp_strerror(err
) << dendl
;
1354 ceph_abort_msg("error reading the store");
1357 latest_monmap
.decode(monmon_bl
);
1360 // Grab last backed up monmap (if any) and compare epochs
1361 if (store
->exists("mon_sync", "latest_monmap")) {
1362 bufferlist backup_bl
;
1363 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1366 << " something wrong happened while reading the store: "
1367 << cpp_strerror(err
) << dendl
;
1368 ceph_abort_msg("error reading the store");
1370 ceph_assert(backup_bl
.length() > 0);
1372 MonMap backup_monmap
;
1373 backup_monmap
.decode(backup_bl
);
1375 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1376 latest_monmap
= backup_monmap
;
1379 // Check if our current monmap's epoch is greater than the one we've
1381 if (monmap
->epoch
> latest_monmap
.epoch
)
1382 latest_monmap
= *monmap
;
1384 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1386 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
1389 void Monitor::sync_reset_requester()
1391 dout(10) << __func__
<< dendl
;
1393 if (sync_timeout_event
) {
1394 timer
.cancel_event(sync_timeout_event
);
1395 sync_timeout_event
= NULL
;
1398 sync_provider
= entity_addrvec_t();
1401 sync_start_version
= 0;
1404 void Monitor::sync_reset_provider()
1406 dout(10) << __func__
<< dendl
;
1407 sync_providers
.clear();
1410 void Monitor::sync_start(entity_addrvec_t
&addrs
, bool full
)
1412 dout(10) << __func__
<< " " << addrs
<< (full
? " full" : " recent") << dendl
;
1414 ceph_assert(state
== STATE_PROBING
||
1415 state
== STATE_SYNCHRONIZING
);
1416 state
= STATE_SYNCHRONIZING
;
1418 // make sure we are not a provider for anyone!
1419 sync_reset_provider();
1424 // stash key state, and mark that we are syncing
1425 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1426 sync_stash_critical_state(t
);
1427 t
->put("mon_sync", "in_sync", 1);
1429 sync_last_committed_floor
= std::max(sync_last_committed_floor
, paxos
->get_version());
1430 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1431 << sync_last_committed_floor
<< dendl
;
1432 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1434 store
->apply_transaction(t
);
1436 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 1);
1438 // clear the underlying store
1439 set
<string
> targets
= get_sync_targets_names();
1440 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1441 store
->clear(targets
);
1443 // make sure paxos knows it has been reset. this prevents a
1444 // bootstrap and then different probe reply order from possibly
1445 // deciding a partial or no sync is needed.
1448 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 2);
1451 // assume 'other' as the leader. We will update the leader once we receive
1452 // a reply to the sync start.
1453 sync_provider
= addrs
;
1455 sync_reset_timeout();
1457 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1459 m
->last_committed
= paxos
->get_version();
1460 messenger
->send_to_mon(m
, sync_provider
);
1463 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1465 dout(10) << __func__
<< dendl
;
1466 bufferlist backup_monmap
;
1467 sync_obtain_latest_monmap(backup_monmap
);
1468 ceph_assert(backup_monmap
.length() > 0);
1469 t
->put("mon_sync", "latest_monmap", backup_monmap
);
1472 void Monitor::sync_reset_timeout()
1474 dout(10) << __func__
<< dendl
;
1475 if (sync_timeout_event
)
1476 timer
.cancel_event(sync_timeout_event
);
1477 sync_timeout_event
= timer
.add_event_after(
1478 g_conf()->mon_sync_timeout
,
1479 new C_MonContext
{this, [this](int) {
1484 void Monitor::sync_finish(version_t last_committed
)
1486 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
1488 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 7);
1491 // finalize the paxos commits
1492 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1493 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1495 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
1497 dout(30) << __func__
<< " final tx dump:\n";
1498 JSONFormatter
f(true);
1503 store
->apply_transaction(tx
);
1506 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 8);
1508 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1509 t
->erase("mon_sync", "in_sync");
1510 t
->erase("mon_sync", "force_sync");
1511 t
->erase("mon_sync", "last_committed_floor");
1512 store
->apply_transaction(t
);
1514 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 9);
1518 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 10);
1523 void Monitor::handle_sync(MonOpRequestRef op
)
1525 auto m
= op
->get_req
<MMonSync
>();
1526 dout(10) << __func__
<< " " << *m
<< dendl
;
1529 // provider ---------
1531 case MMonSync::OP_GET_COOKIE_FULL
:
1532 case MMonSync::OP_GET_COOKIE_RECENT
:
1533 handle_sync_get_cookie(op
);
1535 case MMonSync::OP_GET_CHUNK
:
1536 handle_sync_get_chunk(op
);
1539 // client -----------
1541 case MMonSync::OP_COOKIE
:
1542 handle_sync_cookie(op
);
1545 case MMonSync::OP_CHUNK
:
1546 case MMonSync::OP_LAST_CHUNK
:
1547 handle_sync_chunk(op
);
1549 case MMonSync::OP_NO_COOKIE
:
1550 handle_sync_no_cookie(op
);
1554 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1555 ceph_abort_msg("unknown op");
1561 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1563 auto m
= op
->get_req
<MMonSync
>();
1564 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1565 m
->get_connection()->send_message(reply
);
1568 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1570 auto m
= op
->get_req
<MMonSync
>();
1571 if (is_synchronizing()) {
1572 _sync_reply_no_cookie(op
);
1576 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 1);
1578 // make sure they can understand us.
1579 if ((required_features
^ m
->get_connection()->get_features()) &
1580 required_features
) {
1581 dout(5) << " ignoring peer mon." << m
->get_source().num()
1582 << " has features " << std::hex
1583 << m
->get_connection()->get_features()
1584 << " but we require " << required_features
<< std::dec
<< dendl
;
1588 // make up a unique cookie. include election epoch (which persists
1589 // across restarts for the whole cluster) and a counter for this
1590 // process instance. there is no need to be unique *across*
1591 // monitors, though.
1592 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1593 ceph_assert(sync_providers
.count(cookie
) == 0);
1595 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
1597 SyncProvider
& sp
= sync_providers
[cookie
];
1599 sp
.addrs
= m
->get_source_addrs();
1600 sp
.reset_timeout(g_ceph_context
, g_conf()->mon_sync_timeout
* 2);
1602 set
<string
> sync_targets
;
1603 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1605 sync_targets
= get_sync_targets_names();
1606 sp
.last_committed
= paxos
->get_version();
1607 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1609 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1611 // just catch up paxos
1612 sp
.last_committed
= m
->last_committed
;
1614 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1616 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1617 reply
->last_committed
= sp
.last_committed
;
1618 m
->get_connection()->send_message(reply
);
1621 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1623 auto m
= op
->get_req
<MMonSync
>();
1624 dout(10) << __func__
<< " " << *m
<< dendl
;
1626 if (sync_providers
.count(m
->cookie
) == 0) {
1627 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1628 _sync_reply_no_cookie(op
);
1632 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 2);
1634 SyncProvider
& sp
= sync_providers
[m
->cookie
];
1635 sp
.reset_timeout(g_ceph_context
, g_conf()->mon_sync_timeout
* 2);
1637 if (sp
.last_committed
< paxos
->get_first_committed() &&
1638 paxos
->get_first_committed() > 1) {
1639 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1640 << " < our fc " << paxos
->get_first_committed() << dendl
;
1641 sync_providers
.erase(m
->cookie
);
1642 _sync_reply_no_cookie(op
);
1646 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1647 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1649 int bytes_left
= g_conf()->mon_sync_max_payload_size
;
1650 int keys_left
= g_conf()->mon_sync_max_payload_keys
;
1651 while (sp
.last_committed
< paxos
->get_version() &&
1655 sp
.last_committed
++;
1657 int err
= store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1658 ceph_assert(err
== 0);
1660 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1661 bytes_left
-= bl
.length();
1663 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1666 reply
->last_committed
= sp
.last_committed
;
1668 if (sp
.full
&& bytes_left
> 0 && keys_left
> 0) {
1669 sp
.synchronizer
->get_chunk_tx(tx
, bytes_left
, keys_left
);
1670 sp
.last_key
= sp
.synchronizer
->get_last_key();
1671 reply
->last_key
= sp
.last_key
;
1674 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1675 sp
.last_committed
< paxos
->get_version()) {
1676 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1677 << " key " << sp
.last_key
<< dendl
;
1679 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1680 << " key " << sp
.last_key
<< dendl
;
1681 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1683 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 3);
1685 // clean up our local state
1686 sync_providers
.erase(sp
.cookie
);
1689 encode(*tx
, reply
->chunk_bl
);
1691 m
->get_connection()->send_message(reply
);
1696 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1698 auto m
= op
->get_req
<MMonSync
>();
1699 dout(10) << __func__
<< " " << *m
<< dendl
;
1701 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
1704 if (m
->get_source_addrs() != sync_provider
) {
1705 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1708 sync_cookie
= m
->cookie
;
1709 sync_start_version
= m
->last_committed
;
1711 sync_reset_timeout();
1712 sync_get_next_chunk();
1714 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 3);
1717 void Monitor::sync_get_next_chunk()
1719 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1720 if (g_conf()->mon_inject_sync_get_chunk_delay
> 0) {
1721 dout(20) << __func__
<< " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay
<< dendl
;
1722 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay
* 1000000.0));
1724 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1725 messenger
->send_to_mon(r
, sync_provider
);
1727 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 4);
1730 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1732 auto m
= op
->get_req
<MMonSync
>();
1733 dout(10) << __func__
<< " " << *m
<< dendl
;
1735 if (m
->cookie
!= sync_cookie
) {
1736 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1739 if (m
->get_source_addrs() != sync_provider
) {
1740 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1744 ceph_assert(state
== STATE_SYNCHRONIZING
);
1745 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 5);
1747 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1748 tx
->append_from_encoded(m
->chunk_bl
);
1750 dout(30) << __func__
<< " tx dump:\n";
1751 JSONFormatter
f(true);
1756 store
->apply_transaction(tx
);
1758 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 6);
1761 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
1762 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1763 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1765 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1767 dout(30) << __func__
<< " tx dump:\n";
1768 JSONFormatter
f(true);
1773 store
->apply_transaction(tx
);
1774 paxos
->init(); // to refresh what we just wrote
1777 if (m
->op
== MMonSync::OP_CHUNK
) {
1778 sync_reset_timeout();
1779 sync_get_next_chunk();
1780 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1781 sync_finish(m
->last_committed
);
1785 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1787 dout(10) << __func__
<< dendl
;
1791 void Monitor::sync_trim_providers()
1793 dout(20) << __func__
<< dendl
;
1795 utime_t now
= ceph_clock_now();
1796 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1797 while (p
!= sync_providers
.end()) {
1798 if (now
> p
->second
.timeout
) {
1799 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
1800 << " for " << p
->second
.addrs
<< dendl
;
1801 sync_providers
.erase(p
++);
1808 // ---------------------------------------------------
1811 void Monitor::cancel_probe_timeout()
1813 if (probe_timeout_event
) {
1814 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1815 timer
.cancel_event(probe_timeout_event
);
1816 probe_timeout_event
= NULL
;
1818 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
1822 void Monitor::reset_probe_timeout()
1824 cancel_probe_timeout();
1825 probe_timeout_event
= new C_MonContext
{this, [this](int r
) {
1828 double t
= g_conf()->mon_probe_timeout
;
1829 if (timer
.add_event_after(t
, probe_timeout_event
)) {
1830 dout(10) << "reset_probe_timeout " << probe_timeout_event
1831 << " after " << t
<< " seconds" << dendl
;
1833 probe_timeout_event
= nullptr;
1837 void Monitor::probe_timeout(int r
)
1839 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1840 ceph_assert(is_probing() || is_synchronizing());
1841 ceph_assert(probe_timeout_event
);
1842 probe_timeout_event
= NULL
;
1846 void Monitor::handle_probe(MonOpRequestRef op
)
1848 auto m
= op
->get_req
<MMonProbe
>();
1849 dout(10) << "handle_probe " << *m
<< dendl
;
1851 if (m
->fsid
!= monmap
->fsid
) {
1852 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1857 case MMonProbe::OP_PROBE
:
1858 handle_probe_probe(op
);
1861 case MMonProbe::OP_REPLY
:
1862 handle_probe_reply(op
);
1865 case MMonProbe::OP_MISSING_FEATURES
:
1866 derr
<< __func__
<< " require release " << (int)m
->mon_release
<< " > "
1867 << (int)ceph_release()
1868 << ", or missing features (have " << CEPH_FEATURES_ALL
1869 << ", required " << m
->required_features
1870 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
) << ")"
1876 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1878 auto m
= op
->get_req
<MMonProbe
>();
1880 dout(10) << "handle_probe_probe " << m
->get_source_inst() << *m
1881 << " features " << m
->get_connection()->get_features() << dendl
;
1882 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1883 if ((m
->mon_release
!= ceph_release_t::unknown
&&
1884 m
->mon_release
< monmap
->min_mon_release
) ||
1886 dout(1) << " peer " << m
->get_source_addr()
1887 << " release " << m
->mon_release
1888 << " < min_mon_release " << monmap
->min_mon_release
1889 << ", or missing features " << missing
<< dendl
;
1890 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1891 name
, has_ever_joined
, monmap
->min_mon_release
);
1892 m
->required_features
= required_features
;
1893 m
->get_connection()->send_message(r
);
1897 if (!is_probing() && !is_synchronizing()) {
1898 // If the probing mon is way ahead of us, we need to re-bootstrap.
1899 // Normally we capture this case when we initially bootstrap, but
1900 // it is possible we pass those checks (we overlap with
1901 // quorum-to-be) but fail to join a quorum before it moves past
1902 // us. We need to be kicked back to bootstrap so we can
1903 // synchronize, not keep calling elections.
1904 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1905 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1906 << "ahead of us, re-bootstrapping" << dendl
;
1914 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
,
1918 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1919 r
->paxos_first_version
= paxos
->get_first_committed();
1920 r
->paxos_last_version
= paxos
->get_version();
1921 m
->get_connection()->send_message(r
);
1923 // did we discover a peer here?
1924 if (!monmap
->contains(m
->get_source_addr())) {
1925 dout(1) << " adding peer " << m
->get_source_addrs()
1926 << " to list of hints" << dendl
;
1927 extra_probe_peers
.insert(m
->get_source_addrs());
1934 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1936 auto m
= op
->get_req
<MMonProbe
>();
1937 dout(10) << "handle_probe_reply " << m
->get_source_inst()
1938 << " " << *m
<< dendl
;
1939 dout(10) << " monmap is " << *monmap
<< dendl
;
1941 // discover name and addrs during probing or electing states.
1942 if (!is_probing() && !is_electing()) {
1946 // newer map, or they've joined a quorum and we haven't?
1948 monmap
->encode(mybl
, m
->get_connection()->get_features());
1949 // make sure it's actually different; the checks below err toward
1950 // taking the other guy's map, which could cause us to loop.
1951 if (!mybl
.contents_equal(m
->monmap_bl
)) {
1952 MonMap
*newmap
= new MonMap
;
1953 newmap
->decode(m
->monmap_bl
);
1954 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
1955 !has_ever_joined
)) {
1956 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
1957 << ", mine was " << monmap
->get_epoch() << dendl
;
1959 monmap
->decode(m
->monmap_bl
);
1968 string peer_name
= monmap
->get_name(m
->get_source_addr());
1969 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
1970 dout(10) << " renaming peer " << m
->get_source_addr() << " "
1971 << peer_name
<< " -> " << m
->name
<< " in my monmap"
1973 monmap
->rename(peer_name
, m
->name
);
1975 if (is_electing()) {
1979 } else if (peer_name
.size()) {
1980 dout(10) << " peer name is " << peer_name
<< dendl
;
1982 dout(10) << " peer " << m
->get_source_addr() << " not in map" << dendl
;
1985 // new initial peer?
1986 if (monmap
->get_epoch() == 0 &&
1987 monmap
->contains(m
->name
) &&
1988 monmap
->get_addrs(m
->name
).front().is_blank_ip()) {
1989 dout(1) << " learned initial mon " << m
->name
1990 << " addrs " << m
->get_source_addrs() << dendl
;
1991 monmap
->set_addrvec(m
->name
, m
->get_source_addrs());
1997 // end discover phase
1998 if (!is_probing()) {
2002 ceph_assert(paxos
!= NULL
);
2004 if (is_synchronizing()) {
2005 dout(10) << " currently syncing" << dendl
;
2009 entity_addrvec_t other
= m
->get_source_addrs();
2011 if (m
->paxos_last_version
< sync_last_committed_floor
) {
2012 dout(10) << " peer paxos versions [" << m
->paxos_first_version
2013 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
2014 << sync_last_committed_floor
<< ", ignoring"
2017 if (paxos
->get_version() < m
->paxos_first_version
&&
2018 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
2019 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
2020 << "," << m
->paxos_last_version
<< "]"
2021 << " vs my version " << paxos
->get_version()
2022 << " (too far ahead)"
2024 cancel_probe_timeout();
2025 sync_start(other
, true);
2028 if (paxos
->get_version() + g_conf()->paxos_max_join_drift
< m
->paxos_last_version
) {
2029 dout(10) << " peer paxos last version " << m
->paxos_last_version
2030 << " vs my version " << paxos
->get_version()
2031 << " (too far ahead)"
2033 cancel_probe_timeout();
2034 sync_start(other
, false);
2039 // did the existing cluster complete upgrade to luminous?
2040 if (osdmon()->osdmap
.get_epoch()) {
2041 if (osdmon()->osdmap
.require_osd_release
< ceph_release_t::luminous
) {
2042 derr
<< __func__
<< " existing cluster has not completed upgrade to"
2043 << " luminous; 'ceph osd require_osd_release luminous' before"
2044 << " upgrading" << dendl
;
2047 if (!osdmon()->osdmap
.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS
) ||
2048 !osdmon()->osdmap
.test_flag(CEPH_OSDMAP_RECOVERY_DELETES
)) {
2049 derr
<< __func__
<< " existing cluster has not completed a full luminous"
2050 << " scrub to purge legacy snapdir objects; please scrub before"
2051 << " upgrading beyond luminous." << dendl
;
2056 // is there an existing quorum?
2057 if (m
->quorum
.size()) {
2058 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
2060 dout(10) << " peer paxos version " << m
->paxos_last_version
2061 << " vs my version " << paxos
->get_version()
2065 if (monmap
->contains(name
) &&
2066 !monmap
->get_addrs(name
).front().is_blank_ip()) {
2067 // i'm part of the cluster; just initiate a new election
2070 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl
;
2072 new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddrs()),
2073 *m
->quorum
.begin());
2076 if (monmap
->contains(m
->name
)) {
2077 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
2078 outside_quorum
.insert(m
->name
);
2080 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
2084 unsigned need
= monmap
->min_quorum_size();
2085 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
2086 if (outside_quorum
.size() >= need
) {
2087 if (outside_quorum
.count(name
)) {
2088 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
2091 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
2094 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
2099 void Monitor::join_election()
2101 dout(10) << __func__
<< dendl
;
2102 wait_for_paxos_write();
2104 state
= STATE_ELECTING
;
2106 logger
->inc(l_mon_num_elections
);
2109 void Monitor::start_election()
2111 dout(10) << "start_election" << dendl
;
2112 wait_for_paxos_write();
2114 state
= STATE_ELECTING
;
2116 logger
->inc(l_mon_num_elections
);
2117 logger
->inc(l_mon_election_call
);
2119 clog
->info() << "mon." << name
<< " calling monitor election";
2120 elector
.call_election();
2123 void Monitor::win_standalone_election()
2125 dout(1) << "win_standalone_election" << dendl
;
2127 // bump election epoch, in case the previous epoch included other
2128 // monitors; we need to be able to make the distinction.
2129 elector
.declare_standalone_victory();
2131 rank
= monmap
->get_rank(name
);
2132 ceph_assert(rank
== 0);
2136 map
<int,Metadata
> metadata
;
2137 collect_metadata(&metadata
[0]);
2139 win_election(elector
.get_epoch(), q
,
2141 ceph::features::mon::get_supported(),
2146 const utime_t
& Monitor::get_leader_since() const
2148 ceph_assert(state
== STATE_LEADER
);
2149 return leader_since
;
2152 epoch_t
Monitor::get_epoch()
2154 return elector
.get_epoch();
2157 void Monitor::_finish_svc_election()
2159 ceph_assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
2161 for (auto& svc
: paxos_service
) {
2162 // we already called election_finished() on monmon(); avoid calling twice
2163 if (state
== STATE_LEADER
&& svc
.get() == monmon())
2165 svc
->election_finished();
2169 void Monitor::win_election(epoch_t epoch
, const set
<int>& active
, uint64_t features
,
2170 const mon_feature_t
& mon_features
,
2171 ceph_release_t min_mon_release
,
2172 const map
<int,Metadata
>& metadata
)
2174 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
2175 << " features " << features
2176 << " mon_features " << mon_features
2177 << " min_mon_release " << min_mon_release
2179 ceph_assert(is_electing());
2180 state
= STATE_LEADER
;
2181 leader_since
= ceph_clock_now();
2182 quorum_since
= mono_clock::now();
2185 quorum_con_features
= features
;
2186 quorum_mon_features
= mon_features
;
2187 quorum_min_mon_release
= min_mon_release
;
2188 pending_metadata
= metadata
;
2189 outside_quorum
.clear();
2191 clog
->info() << "mon." << name
<< " is new leader, mons " << get_quorum_names()
2192 << " in quorum (ranks " << quorum
<< ")";
2194 set_leader_commands(get_local_commands(mon_features
));
2196 paxos
->leader_init();
2197 // NOTE: tell monmap monitor first. This is important for the
2198 // bootstrap case to ensure that the very first paxos proposal
2199 // codifies the monmap. Otherwise any manner of chaos can ensue
2200 // when monitors are calling elections or participating in a paxos
2201 // round without agreeing on who the participants are.
2202 monmon()->election_finished();
2203 _finish_svc_election();
2205 logger
->inc(l_mon_election_win
);
2207 // inject new metadata in first transaction.
2209 // include previous metadata for missing mons (that aren't part of
2210 // the current quorum).
2211 map
<int,Metadata
> m
= metadata
;
2212 for (unsigned rank
= 0; rank
< monmap
->size(); ++rank
) {
2213 if (m
.count(rank
) == 0 &&
2214 mon_metadata
.count(rank
)) {
2215 m
[rank
] = mon_metadata
[rank
];
2219 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2220 // a new transaction immediately after the election finishes. We should
2221 // do that anyway for other reasons, though.
2222 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
2225 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
2229 if (monmap
->size() > 1 &&
2230 monmap
->get_epoch() > 0) {
2232 health_tick_start();
2234 // Freshen the health status before doing health_to_clog in case
2235 // our just-completed election changed the health
2236 healthmon()->wait_for_active_ctx(new LambdaContext([this](int r
){
2237 dout(20) << "healthmon now active" << dendl
;
2238 healthmon()->tick();
2239 if (healthmon()->is_proposing()) {
2240 dout(20) << __func__
<< " healthmon proposing, waiting" << dendl
;
2241 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext
{this,
2243 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
2244 do_health_to_clog_interval();
2248 do_health_to_clog_interval();
2252 scrub_event_start();
2256 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
2258 const mon_feature_t
& mon_features
,
2259 ceph_release_t min_mon_release
)
2262 leader_since
= utime_t();
2263 quorum_since
= mono_clock::now();
2266 outside_quorum
.clear();
2267 quorum_con_features
= features
;
2268 quorum_mon_features
= mon_features
;
2269 quorum_min_mon_release
= min_mon_release
;
2270 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
2271 << " quorum is " << quorum
<< " features are " << quorum_con_features
2272 << " mon_features are " << quorum_mon_features
2273 << " min_mon_release " << min_mon_release
2277 _finish_svc_election();
2279 logger
->inc(l_mon_election_lose
);
2285 std::string
collect_compression_algorithms()
2288 bool printed
= false;
2289 for (auto [name
, key
] : Compressor::compression_algorithms
) {
2302 void Monitor::collect_metadata(Metadata
*m
)
2304 collect_sys_info(m
, g_ceph_context
);
2305 (*m
)["addrs"] = stringify(messenger
->get_myaddrs());
2306 (*m
)["compression_algorithms"] = collect_compression_algorithms();
2308 // infer storage device
2309 string devname
= store
->get_devname();
2310 set
<string
> devnames
;
2311 get_raw_devices(devname
, &devnames
);
2312 map
<string
,string
> errs
;
2313 get_device_metadata(devnames
, m
, &errs
);
2314 for (auto& i
: errs
) {
2315 dout(1) << __func__
<< " " << i
.first
<< ": " << i
.second
<< dendl
;
2319 void Monitor::finish_election()
2321 apply_quorum_to_compatset_features();
2322 apply_monmap_to_compatset_features();
2324 exited_quorum
= utime_t();
2325 finish_contexts(g_ceph_context
, waitfor_quorum
);
2326 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
2327 resend_routed_requests();
2329 register_cluster_logger();
2331 // enable authentication
2333 std::lock_guard
l(auth_lock
);
2334 authmon()->_set_mon_num_rank(monmap
->size(), rank
);
2337 // am i named properly?
2338 string cur_name
= monmap
->get_name(messenger
->get_myaddrs());
2339 if (cur_name
!= name
) {
2340 dout(10) << " renaming myself from " << cur_name
<< " -> " << name
<< dendl
;
2342 new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddrs()),
2347 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2349 if (new_features
.compare(features
) != 0) {
2350 CompatSet diff
= features
.unsupported(new_features
);
2351 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2352 features
= new_features
;
2354 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2356 store
->apply_transaction(t
);
2358 calc_quorum_requirements();
2362 void Monitor::apply_quorum_to_compatset_features()
2364 CompatSet
new_features(features
);
2365 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2366 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2367 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2369 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2370 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2371 dout(5) << __func__
<< dendl
;
2372 _apply_compatset_features(new_features
);
2375 void Monitor::apply_monmap_to_compatset_features()
2377 CompatSet
new_features(features
);
2378 mon_feature_t monmap_features
= monmap
->get_required_features();
2380 /* persistent monmap features may go into the compatset.
2381 * optional monmap features may not - why?
2382 * because optional monmap features may be set/unset by the admin,
2383 * and possibly by other means that haven't yet been thought out,
2384 * so we can't make the monitor enforce them on start - because they
2386 * this, of course, does not invalidate setting a compatset feature
2387 * for an optional feature - as long as you make sure to clean it up
2388 * once you unset it.
2390 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2391 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2392 ceph::features::mon::FEATURE_KRAKEN
));
2393 // this feature should only ever be set if the quorum supports it.
2394 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2395 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2397 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
2398 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2399 ceph::features::mon::FEATURE_LUMINOUS
));
2400 // this feature should only ever be set if the quorum supports it.
2401 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
));
2402 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
2404 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_MIMIC
)) {
2405 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2406 ceph::features::mon::FEATURE_MIMIC
));
2407 // this feature should only ever be set if the quorum supports it.
2408 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_MIMIC
));
2409 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC
);
2411 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_NAUTILUS
)) {
2412 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2413 ceph::features::mon::FEATURE_NAUTILUS
));
2414 // this feature should only ever be set if the quorum supports it.
2415 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_NAUTILUS
));
2416 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
);
2418 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_OCTOPUS
)) {
2419 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2420 ceph::features::mon::FEATURE_OCTOPUS
));
2421 // this feature should only ever be set if the quorum supports it.
2422 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_OCTOPUS
));
2423 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
);
2426 dout(5) << __func__
<< dendl
;
2427 _apply_compatset_features(new_features
);
2430 void Monitor::calc_quorum_requirements()
2432 required_features
= 0;
2435 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2436 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2438 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2439 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2441 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
)) {
2442 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2444 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC
)) {
2445 required_features
|= CEPH_FEATUREMASK_SERVER_MIMIC
;
2447 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
)) {
2448 required_features
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
|
2449 CEPH_FEATUREMASK_CEPHX_V2
;
2451 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
)) {
2452 required_features
|= CEPH_FEATUREMASK_SERVER_OCTOPUS
;
2456 if (monmap
->get_required_features().contains_all(
2457 ceph::features::mon::FEATURE_KRAKEN
)) {
2458 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2460 if (monmap
->get_required_features().contains_all(
2461 ceph::features::mon::FEATURE_LUMINOUS
)) {
2462 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2464 if (monmap
->get_required_features().contains_all(
2465 ceph::features::mon::FEATURE_MIMIC
)) {
2466 required_features
|= CEPH_FEATUREMASK_SERVER_MIMIC
;
2468 if (monmap
->get_required_features().contains_all(
2469 ceph::features::mon::FEATURE_NAUTILUS
)) {
2470 required_features
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
|
2471 CEPH_FEATUREMASK_CEPHX_V2
;
2473 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
2476 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2478 *fm
+= session_map
.feature_map
;
2479 for (auto id
: quorum
) {
2481 *fm
+= quorum_feature_map
[id
];
2486 void Monitor::sync_force(Formatter
*f
)
2488 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2489 sync_stash_critical_state(tx
);
2490 tx
->put("mon_sync", "force_sync", 1);
2491 store
->apply_transaction(tx
);
2493 f
->open_object_section("sync_force");
2494 f
->dump_int("ret", 0);
2495 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2496 f
->close_section(); // sync_force
2499 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
2501 bool free_formatter
= false;
2504 // louzy/lazy hack: default to json if no formatter has been defined
2505 f
= new JSONFormatter();
2506 free_formatter
= true;
2508 f
->open_object_section("quorum_status");
2509 f
->dump_int("election_epoch", get_epoch());
2511 f
->open_array_section("quorum");
2512 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2513 f
->dump_int("mon", *p
);
2514 f
->close_section(); // quorum
2516 list
<string
> quorum_names
= get_quorum_names();
2517 f
->open_array_section("quorum_names");
2518 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2519 f
->dump_string("mon", *p
);
2520 f
->close_section(); // quorum_names
2522 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(*quorum
.begin()));
2524 if (!quorum
.empty()) {
2527 std::chrono::duration_cast
<std::chrono::seconds
>(
2528 mono_clock::now() - quorum_since
).count());
2531 f
->open_object_section("features");
2532 f
->dump_stream("quorum_con") << quorum_con_features
;
2533 quorum_mon_features
.dump(f
, "quorum_mon");
2536 f
->open_object_section("monmap");
2538 f
->close_section(); // monmap
2540 f
->close_section(); // quorum_status
2546 void Monitor::get_mon_status(Formatter
*f
)
2548 f
->open_object_section("mon_status");
2549 f
->dump_string("name", name
);
2550 f
->dump_int("rank", rank
);
2551 f
->dump_string("state", get_state_name());
2552 f
->dump_int("election_epoch", get_epoch());
2554 f
->open_array_section("quorum");
2555 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2556 f
->dump_int("mon", *p
);
2558 f
->close_section(); // quorum
2560 if (!quorum
.empty()) {
2563 std::chrono::duration_cast
<std::chrono::seconds
>(
2564 mono_clock::now() - quorum_since
).count());
2567 f
->open_object_section("features");
2568 f
->dump_stream("required_con") << required_features
;
2569 mon_feature_t req_mon_features
= get_required_mon_features();
2570 req_mon_features
.dump(f
, "required_mon");
2571 f
->dump_stream("quorum_con") << quorum_con_features
;
2572 quorum_mon_features
.dump(f
, "quorum_mon");
2573 f
->close_section(); // features
2575 f
->open_array_section("outside_quorum");
2576 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2577 f
->dump_string("mon", *p
);
2578 f
->close_section(); // outside_quorum
2580 f
->open_array_section("extra_probe_peers");
2581 for (set
<entity_addrvec_t
>::iterator p
= extra_probe_peers
.begin();
2582 p
!= extra_probe_peers
.end();
2584 f
->dump_object("peer", *p
);
2586 f
->close_section(); // extra_probe_peers
2588 f
->open_array_section("sync_provider");
2589 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2590 p
!= sync_providers
.end();
2592 f
->dump_unsigned("cookie", p
->second
.cookie
);
2593 f
->dump_object("addrs", p
->second
.addrs
);
2594 f
->dump_stream("timeout") << p
->second
.timeout
;
2595 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2596 f
->dump_stream("last_key") << p
->second
.last_key
;
2600 if (is_synchronizing()) {
2601 f
->open_object_section("sync");
2602 f
->dump_stream("sync_provider") << sync_provider
;
2603 f
->dump_unsigned("sync_cookie", sync_cookie
);
2604 f
->dump_unsigned("sync_start_version", sync_start_version
);
2608 if (g_conf()->mon_sync_provider_kill_at
> 0)
2609 f
->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at
);
2610 if (g_conf()->mon_sync_requester_kill_at
> 0)
2611 f
->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at
);
2613 f
->open_object_section("monmap");
2617 f
->dump_object("feature_map", session_map
.feature_map
);
2618 f
->close_section(); // mon_status
2622 // health status to clog
2624 void Monitor::health_tick_start()
2626 if (!cct
->_conf
->mon_health_to_clog
||
2627 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2630 dout(15) << __func__
<< dendl
;
2633 health_tick_event
= timer
.add_event_after(
2634 cct
->_conf
->mon_health_to_clog_tick_interval
,
2635 new C_MonContext
{this, [this](int r
) {
2638 health_tick_start();
2642 void Monitor::health_tick_stop()
2644 dout(15) << __func__
<< dendl
;
2646 if (health_tick_event
) {
2647 timer
.cancel_event(health_tick_event
);
2648 health_tick_event
= NULL
;
2652 ceph::real_clock::time_point
Monitor::health_interval_calc_next_update()
2654 auto now
= ceph::real_clock::now();
2656 auto secs
= std::chrono::duration_cast
<std::chrono::seconds
>(now
.time_since_epoch());
2657 int remainder
= secs
.count() % cct
->_conf
->mon_health_to_clog_interval
;
2658 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2659 auto next
= secs
+ std::chrono::seconds(adjustment
);
2661 dout(20) << __func__
2662 << " now: " << now
<< ","
2663 << " next: " << next
<< ","
2664 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
2667 return ceph::real_clock::time_point
{next
};
2670 void Monitor::health_interval_start()
2672 dout(15) << __func__
<< dendl
;
2674 if (!cct
->_conf
->mon_health_to_clog
||
2675 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2679 health_interval_stop();
2680 auto next
= health_interval_calc_next_update();
2681 health_interval_event
= new C_MonContext
{this, [this](int r
) {
2684 do_health_to_clog_interval();
2686 if (!timer
.add_event_at(next
, health_interval_event
)) {
2687 health_interval_event
= nullptr;
2691 void Monitor::health_interval_stop()
2693 dout(15) << __func__
<< dendl
;
2694 if (health_interval_event
) {
2695 timer
.cancel_event(health_interval_event
);
2697 health_interval_event
= NULL
;
2700 void Monitor::health_events_cleanup()
2703 health_interval_stop();
2704 health_status_cache
.reset();
2707 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2709 dout(20) << __func__
<< dendl
;
2711 if (changed
.count("mon_health_to_clog")) {
2712 if (!cct
->_conf
->mon_health_to_clog
) {
2713 health_events_cleanup();
2716 if (!health_tick_event
) {
2717 health_tick_start();
2719 if (!health_interval_event
) {
2720 health_interval_start();
2725 if (changed
.count("mon_health_to_clog_interval")) {
2726 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2727 health_interval_stop();
2729 health_interval_start();
2733 if (changed
.count("mon_health_to_clog_tick_interval")) {
2734 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2737 health_tick_start();
2742 void Monitor::do_health_to_clog_interval()
2744 // outputting to clog may have been disabled in the conf
2745 // since we were scheduled.
2746 if (!cct
->_conf
->mon_health_to_clog
||
2747 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2750 dout(10) << __func__
<< dendl
;
2752 // do we have a cached value for next_clog_update? if not,
2753 // do we know when the last update was?
2755 do_health_to_clog(true);
2756 health_interval_start();
2759 void Monitor::do_health_to_clog(bool force
)
2761 // outputting to clog may have been disabled in the conf
2762 // since we were scheduled.
2763 if (!cct
->_conf
->mon_health_to_clog
||
2764 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2767 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
2770 health_status_t level
= healthmon()->get_health_status(false, nullptr, &summary
);
2772 summary
== health_status_cache
.summary
&&
2773 level
== health_status_cache
.overall
)
2776 if (g_conf()->mon_health_detail_to_clog
&&
2777 summary
!= health_status_cache
.summary
&&
2778 level
!= HEALTH_OK
) {
2780 level
= healthmon()->get_health_status(true, nullptr, &details
);
2781 clog
->health(level
) << "Health detail: " << details
;
2783 clog
->health(level
) << "overall " << summary
;
2785 health_status_cache
.summary
= summary
;
2786 health_status_cache
.overall
= level
;
2789 void Monitor::log_health(
2790 const health_check_map_t
& updated
,
2791 const health_check_map_t
& previous
,
2792 MonitorDBStore::TransactionRef t
)
2794 if (!g_conf()->mon_health_to_clog
) {
2798 const utime_t now
= ceph_clock_now();
2800 // FIXME: log atomically as part of @t instead of using clog.
2801 dout(10) << __func__
<< " updated " << updated
.checks
.size()
2802 << " previous " << previous
.checks
.size()
2804 const auto min_log_period
= g_conf().get_val
<int64_t>(
2805 "mon_health_log_update_period");
2806 for (auto& p
: updated
.checks
) {
2807 auto q
= previous
.checks
.find(p
.first
);
2808 bool logged
= false;
2809 if (q
== previous
.checks
.end()) {
2812 ss
<< "Health check failed: " << p
.second
.summary
<< " ("
2814 clog
->health(p
.second
.severity
) << ss
.str();
2818 if (p
.second
.summary
!= q
->second
.summary
||
2819 p
.second
.severity
!= q
->second
.severity
) {
2821 auto status_iter
= health_check_log_times
.find(p
.first
);
2822 if (status_iter
!= health_check_log_times
.end()) {
2823 if (p
.second
.severity
== q
->second
.severity
&&
2824 now
- status_iter
->second
.updated_at
< min_log_period
) {
2825 // We already logged this recently and the severity is unchanged,
2826 // so skip emitting an update of the summary string.
2827 // We'll get an update out of tick() later if the check
2828 // is still failing.
2833 // summary or severity changed (ignore detail changes at this level)
2835 ss
<< "Health check update: " << p
.second
.summary
<< " (" << p
.first
<< ")";
2836 clog
->health(p
.second
.severity
) << ss
.str();
2841 // Record the time at which we last logged, so that we can check this
2842 // when considering whether/when to print update messages.
2844 auto iter
= health_check_log_times
.find(p
.first
);
2845 if (iter
== health_check_log_times
.end()) {
2846 health_check_log_times
.emplace(p
.first
, HealthCheckLogStatus(
2847 p
.second
.severity
, p
.second
.summary
, now
));
2849 iter
->second
= HealthCheckLogStatus(
2850 p
.second
.severity
, p
.second
.summary
, now
);
2854 for (auto& p
: previous
.checks
) {
2855 if (!updated
.checks
.count(p
.first
)) {
2858 if (p
.first
== "DEGRADED_OBJECTS") {
2859 clog
->info() << "All degraded objects recovered";
2860 } else if (p
.first
== "OSD_FLAGS") {
2861 clog
->info() << "OSD flags cleared";
2863 clog
->info() << "Health check cleared: " << p
.first
<< " (was: "
2864 << p
.second
.summary
<< ")";
2867 if (health_check_log_times
.count(p
.first
)) {
2868 health_check_log_times
.erase(p
.first
);
2873 if (previous
.checks
.size() && updated
.checks
.size() == 0) {
2874 // We might be going into a fully healthy state, check
2876 bool any_checks
= false;
2877 for (auto& svc
: paxos_service
) {
2878 if (&(svc
->get_health_checks()) == &(previous
)) {
2879 // Ignore the ones we're clearing right now
2883 if (svc
->get_health_checks().checks
.size() > 0) {
2889 clog
->info() << "Cluster is now healthy";
2894 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
)
2897 f
->open_object_section("status");
2899 mono_clock::time_point now
= mono_clock::now();
2901 f
->dump_stream("fsid") << monmap
->get_fsid();
2902 healthmon()->get_health_status(false, f
, nullptr);
2903 f
->dump_unsigned("election_epoch", get_epoch());
2905 f
->open_array_section("quorum");
2906 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2907 f
->dump_int("rank", *p
);
2909 f
->open_array_section("quorum_names");
2910 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2911 f
->dump_string("id", monmap
->get_name(*p
));
2915 std::chrono::duration_cast
<std::chrono::seconds
>(
2916 mono_clock::now() - quorum_since
).count());
2918 f
->open_object_section("monmap");
2919 monmap
->dump_summary(f
);
2921 f
->open_object_section("osdmap");
2922 osdmon()->osdmap
.print_summary(f
, cout
, string(12, ' '));
2924 f
->open_object_section("pgmap");
2925 mgrstatmon()->print_summary(f
, NULL
);
2927 f
->open_object_section("fsmap");
2928 mdsmon()->get_fsmap().print_summary(f
, NULL
);
2930 f
->open_object_section("mgrmap");
2931 mgrmon()->get_map().print_summary(f
, nullptr);
2934 f
->dump_object("servicemap", mgrstatmon()->get_service_map());
2936 f
->open_object_section("progress_events");
2937 for (auto& i
: mgrstatmon()->get_progress_events()) {
2938 f
->dump_object(i
.first
.c_str(), i
.second
);
2944 ss
<< " cluster:\n";
2945 ss
<< " id: " << monmap
->get_fsid() << "\n";
2948 healthmon()->get_health_status(false, nullptr, &health
,
2950 ss
<< " health: " << health
<< "\n";
2952 ss
<< "\n \n services:\n";
2955 auto& service_map
= mgrstatmon()->get_service_map();
2956 for (auto& p
: service_map
.services
) {
2957 maxlen
= std::max(maxlen
, p
.first
.size());
2959 string
spacing(maxlen
- 3, ' ');
2960 const auto quorum_names
= get_quorum_names();
2961 const auto mon_count
= monmap
->mon_info
.size();
2962 ss
<< " mon: " << spacing
<< mon_count
<< " daemons, quorum "
2963 << quorum_names
<< " (age " << timespan_str(now
- quorum_since
) << ")";
2964 if (quorum_names
.size() != mon_count
) {
2965 std::list
<std::string
> out_of_q
;
2966 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
2967 if (quorum
.count(i
) == 0) {
2968 out_of_q
.push_back(monmap
->ranks
[i
]);
2971 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
2972 out_of_q
.end(), std::string(", "));
2975 if (mgrmon()->in_use()) {
2976 ss
<< " mgr: " << spacing
;
2977 mgrmon()->get_map().print_summary(nullptr, &ss
);
2980 if (mdsmon()->should_print_status()) {
2981 ss
<< " mds: " << spacing
<< mdsmon()->get_fsmap() << "\n";
2983 ss
<< " osd: " << spacing
;
2984 osdmon()->osdmap
.print_summary(NULL
, ss
, string(maxlen
+ 6, ' '));
2986 for (auto& p
: service_map
.services
) {
2987 const std::string
&service
= p
.first
;
2988 // filter out normal ceph entity types
2989 if (ServiceMap::is_normal_ceph_entity(service
)) {
2992 ss
<< " " << p
.first
<< ": " << string(maxlen
- p
.first
.size(), ' ')
2993 << p
.second
.get_summary() << "\n";
2998 auto& service_map
= mgrstatmon()->get_service_map();
2999 if (!service_map
.services
.empty()) {
3000 ss
<< "\n \n task status:\n";
3002 for (auto &p
: service_map
.services
) {
3003 ss
<< p
.second
.get_task_summary(p
.first
);
3009 ss
<< "\n \n data:\n";
3010 mgrstatmon()->print_summary(NULL
, &ss
);
3012 auto& pem
= mgrstatmon()->get_progress_events();
3014 ss
<< "\n \n progress:\n";
3015 for (auto& i
: pem
) {
3016 ss
<< " " << i
.second
.message
<< "\n";
3023 void Monitor::_generate_command_map(cmdmap_t
& cmdmap
,
3024 map
<string
,string
> ¶m_str_map
)
3026 for (auto p
= cmdmap
.begin(); p
!= cmdmap
.end(); ++p
) {
3027 if (p
->first
== "prefix")
3029 if (p
->first
== "caps") {
3031 if (cmd_getval(cmdmap
, "caps", cv
) &&
3032 cv
.size() % 2 == 0) {
3033 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
3034 string k
= string("caps_") + cv
[i
];
3035 param_str_map
[k
] = cv
[i
+ 1];
3040 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
3044 const MonCommand
*Monitor::_get_moncommand(
3045 const string
&cmd_prefix
,
3046 const vector
<MonCommand
>& cmds
)
3048 for (auto& c
: cmds
) {
3049 if (c
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
3056 bool Monitor::_allowed_command(MonSession
*s
, const string
&module
,
3057 const string
&prefix
, const cmdmap_t
& cmdmap
,
3058 const map
<string
,string
>& param_str_map
,
3059 const MonCommand
*this_cmd
) {
3061 bool cmd_r
= this_cmd
->requires_perm('r');
3062 bool cmd_w
= this_cmd
->requires_perm('w');
3063 bool cmd_x
= this_cmd
->requires_perm('x');
3065 bool capable
= s
->caps
.is_capable(
3068 module
, prefix
, param_str_map
,
3069 cmd_r
, cmd_w
, cmd_x
,
3070 s
->get_peer_socket_addr());
3072 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
3076 void Monitor::format_command_descriptions(const std::vector
<MonCommand
> &commands
,
3082 f
->open_object_section("command_descriptions");
3083 for (const auto &cmd
: commands
) {
3084 unsigned flags
= cmd
.flags
;
3085 ostringstream secname
;
3086 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
3087 dump_cmddesc_to_json(f
, features
, secname
.str(),
3088 cmd
.cmdstring
, cmd
.helpstring
, cmd
.module
,
3089 cmd
.req_perms
, flags
);
3092 f
->close_section(); // command_descriptions
3097 bool Monitor::is_keyring_required()
3099 return auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
) ||
3100 auth_service_required
.is_supported_auth(CEPH_AUTH_CEPHX
) ||
3101 auth_cluster_required
.is_supported_auth(CEPH_AUTH_GSS
) ||
3102 auth_service_required
.is_supported_auth(CEPH_AUTH_GSS
);
3105 struct C_MgrProxyCommand
: public Context
{
3111 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
3112 : mon(mon
), op(op
), size(s
) { }
3113 void finish(int r
) {
3114 std::lock_guard
l(mon
->lock
);
3115 mon
->mgr_proxy_bytes
-= size
;
3116 mon
->reply_command(op
, r
, outs
, outbl
, 0);
3120 void Monitor::handle_tell_command(MonOpRequestRef op
)
3122 ceph_assert(op
->is_type_command());
3123 MCommand
*m
= static_cast<MCommand
*>(op
->get_req());
3124 if (m
->fsid
!= monmap
->fsid
) {
3125 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
3126 return reply_tell_command(op
, -EACCES
, "wrong fsid");
3128 MonSession
*session
= op
->get_session();
3130 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
3134 if (stringstream ss
; !cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
3135 return reply_tell_command(op
, -EINVAL
, ss
.str());
3137 map
<string
,string
> param_str_map
;
3138 _generate_command_map(cmdmap
, param_str_map
);
3140 if (!cmd_getval(cmdmap
, "prefix", prefix
)) {
3141 return reply_tell_command(op
, -EINVAL
, "no prefix");
3143 if (auto cmd
= _get_moncommand(prefix
,
3144 get_local_commands(quorum_mon_features
));
3146 if (cmd
->is_obsolete() ||
3147 (cct
->_conf
->mon_debug_deprecated_as_obsolete
&&
3148 cmd
->is_deprecated())) {
3149 return reply_tell_command(op
, -ENOTSUP
,
3150 "command is obsolete; "
3151 "please check usage and/or man page");
3154 // see if command is whitelisted
3155 if (!session
->caps
.is_capable(
3157 session
->entity_name
,
3158 "mon", prefix
, param_str_map
,
3160 session
->get_peer_socket_addr())) {
3161 return reply_tell_command(op
, -EACCES
, "insufficient caps");
3164 cct
->get_admin_socket()->queue_tell_command(m
);
3167 void Monitor::handle_command(MonOpRequestRef op
)
3169 ceph_assert(op
->is_type_command());
3170 auto m
= op
->get_req
<MMonCommand
>();
3171 if (m
->fsid
!= monmap
->fsid
) {
3172 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
3174 reply_command(op
, -EPERM
, "wrong fsid", 0);
3178 MonSession
*session
= op
->get_session();
3180 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
3184 if (m
->cmd
.empty()) {
3185 reply_command(op
, -EINVAL
, "no command specified", 0);
3190 vector
<string
> fullcmd
;
3192 stringstream ss
, ds
;
3196 rs
= "unrecognized command";
3198 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
3199 // ss has reason for failure
3202 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3203 reply_command(op
, r
, rs
, 0);
3207 // check return value. If no prefix parameter provided,
3208 // return value will be false, then return error info.
3209 if (!cmd_getval(cmdmap
, "prefix", prefix
)) {
3210 reply_command(op
, -EINVAL
, "command prefix not found", 0);
3214 // check prefix is empty
3215 if (prefix
.empty()) {
3216 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
3220 if (prefix
== "get_command_descriptions") {
3222 Formatter
*f
= Formatter::create("json");
3224 std::vector
<MonCommand
> commands
= static_cast<MgrMonitor
*>(
3225 paxos_service
[PAXOS_MGR
].get())->get_command_descs();
3227 for (auto& c
: leader_mon_commands
) {
3228 commands
.push_back(c
);
3231 auto features
= m
->get_connection()->get_features();
3232 format_command_descriptions(commands
, f
, features
, &rdata
);
3234 reply_command(op
, 0, "", rdata
, 0);
3241 dout(0) << "handle_command " << *m
<< dendl
;
3244 cmd_getval(cmdmap
, "format", format
, string("plain"));
3245 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
3247 get_str_vec(prefix
, fullcmd
);
3249 // make sure fullcmd is not empty.
3250 // invalid prefix will cause empty vector fullcmd.
3251 // such as, prefix=";,,;"
3252 if (fullcmd
.empty()) {
3253 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
3257 module
= fullcmd
[0];
3259 // validate command is in leader map
3261 const MonCommand
*leader_cmd
;
3262 const auto& mgr_cmds
= mgrmon()->get_command_descs();
3263 const MonCommand
*mgr_cmd
= nullptr;
3264 if (!mgr_cmds
.empty()) {
3265 mgr_cmd
= _get_moncommand(prefix
, mgr_cmds
);
3267 leader_cmd
= _get_moncommand(prefix
, leader_mon_commands
);
3269 leader_cmd
= mgr_cmd
;
3271 reply_command(op
, -EINVAL
, "command not known", 0);
3275 // validate command is in our map & matches, or forward if it is allowed
3276 const MonCommand
*mon_cmd
= _get_moncommand(
3278 get_local_commands(quorum_mon_features
));
3284 if (leader_cmd
->is_noforward()) {
3285 reply_command(op
, -EINVAL
,
3286 "command not locally supported and not allowed to forward",
3290 dout(10) << "Command not locally supported, forwarding request "
3292 forward_request_leader(op
);
3294 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
3295 if (mon_cmd
->is_noforward()) {
3296 reply_command(op
, -EINVAL
,
3297 "command not compatible with leader and not allowed to forward",
3301 dout(10) << "Command not compatible with leader, forwarding request "
3303 forward_request_leader(op
);
3308 if (mon_cmd
->is_obsolete() ||
3309 (cct
->_conf
->mon_debug_deprecated_as_obsolete
3310 && mon_cmd
->is_deprecated())) {
3311 reply_command(op
, -ENOTSUP
,
3312 "command is obsolete; please check usage and/or man page",
3317 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
3318 dout(10) << "Got forward for noforward command " << m
<< dendl
;
3319 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
3323 /* what we perceive as being the service the command falls under */
3324 string
service(mon_cmd
->module
);
3326 dout(25) << __func__
<< " prefix='" << prefix
3327 << "' module='" << module
3328 << "' service='" << service
<< "'" << dendl
;
3331 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
3333 // validate user's permissions for requested command
3334 map
<string
,string
> param_str_map
;
3335 _generate_command_map(cmdmap
, param_str_map
);
3336 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
3337 param_str_map
, mon_cmd
)) {
3338 dout(1) << __func__
<< " access denied" << dendl
;
3339 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3340 << "from='" << session
->name
<< " " << session
->addrs
<< "' "
3341 << "entity='" << session
->entity_name
<< "' "
3342 << "cmd=" << m
->cmd
<< ": access denied";
3343 reply_command(op
, -EACCES
, "access denied", 0);
3347 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3348 << "from='" << session
->name
<< " " << session
->addrs
<< "' "
3349 << "entity='" << session
->entity_name
<< "' "
3350 << "cmd=" << m
->cmd
<< ": dispatch";
3352 // compat kludge for legacy clients trying to tell commands that are
3353 // new. see bottom of MonCommands.h. we need to handle both (1)
3354 // pre-octopus clients and (2) octopus clients with a mix of pre-octopus
3355 // and octopus mons.
3356 if ((!HAVE_FEATURE(m
->get_connection()->get_features(), SERVER_OCTOPUS
) ||
3357 monmap
->min_mon_release
< ceph_release_t::octopus
) &&
3358 (prefix
== "injectargs" ||
3359 prefix
== "smart" ||
3360 prefix
== "mon_status" ||
3361 prefix
== "heap")) {
3362 if (m
->get_connection()->get_messenger() == 0) {
3363 // Prior to octopus, monitors might forward these messages
3364 // around. that was broken at baseline, and if we try to process
3365 // this message now, it will assert out when we try to send a
3366 // message in reply from the asok/tell worker (see
3367 // AnonConnection). Just reply with an error.
3368 dout(5) << __func__
<< " failing forwarded command from a (presumably) "
3369 << "pre-octopus peer" << dendl
;
3372 "failing forwarded tell command in mixed-version mon cluster", 0);
3375 dout(5) << __func__
<< " passing command to tell/asok" << dendl
;
3376 cct
->get_admin_socket()->queue_tell_command(m
);
3380 if (mon_cmd
->is_mgr()) {
3381 const auto& hdr
= m
->get_header();
3382 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
3383 uint64_t max
= g_conf().get_val
<Option::size_t>("mon_client_bytes")
3384 * g_conf().get_val
<double>("mon_mgr_proxy_client_bytes_ratio");
3385 if (mgr_proxy_bytes
+ size
> max
) {
3386 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
3387 << " + " << size
<< " > max " << max
<< dendl
;
3388 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
3391 mgr_proxy_bytes
+= size
;
3392 dout(10) << __func__
<< " proxying mgr command (+" << size
3393 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
3394 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
3395 mgr_client
.start_command(m
->cmd
,
3399 new C_OnFinisher(fin
, &finisher
));
3403 if ((module
== "mds" || module
== "fs") &&
3404 prefix
!= "fs authorize") {
3405 mdsmon()->dispatch(op
);
3408 if ((module
== "osd" ||
3409 prefix
== "pg map" ||
3410 prefix
== "pg repeer") &&
3411 prefix
!= "osd last-stat-seq") {
3412 osdmon()->dispatch(op
);
3415 if (module
== "config") {
3416 configmon()->dispatch(op
);
3420 if (module
== "mon" &&
3421 /* Let the Monitor class handle the following commands:
3424 prefix
!= "mon scrub" &&
3425 prefix
!= "mon metadata" &&
3426 prefix
!= "mon versions" &&
3427 prefix
!= "mon count-metadata" &&
3428 prefix
!= "mon ok-to-stop" &&
3429 prefix
!= "mon ok-to-add-offline" &&
3430 prefix
!= "mon ok-to-rm") {
3431 monmon()->dispatch(op
);
3434 if (module
== "health" && prefix
!= "health") {
3435 healthmon()->dispatch(op
);
3438 if (module
== "auth" || prefix
== "fs authorize") {
3439 authmon()->dispatch(op
);
3442 if (module
== "log") {
3443 logmon()->dispatch(op
);
3447 if (module
== "config-key") {
3448 config_key_service
->dispatch(op
);
3452 if (module
== "mgr") {
3453 mgrmon()->dispatch(op
);
3457 if (prefix
== "fsid") {
3459 f
->open_object_section("fsid");
3460 f
->dump_stream("fsid") << monmap
->fsid
;
3467 reply_command(op
, 0, "", rdata
, 0);
3471 if (prefix
== "mon scrub") {
3472 wait_for_paxos_write();
3474 int r
= scrub_start();
3475 reply_command(op
, r
, "", rdata
, 0);
3476 } else if (is_peon()) {
3477 forward_request_leader(op
);
3479 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
3484 if (prefix
== "time-sync-status") {
3486 f
.reset(Formatter::create("json-pretty"));
3487 f
->open_object_section("time_sync");
3488 if (!timecheck_skews
.empty()) {
3489 f
->open_object_section("time_skew_status");
3490 for (auto& i
: timecheck_skews
) {
3491 double skew
= i
.second
;
3492 double latency
= timecheck_latencies
[i
.first
];
3493 string name
= monmap
->get_name(i
.first
);
3495 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
3496 f
->open_object_section(name
.c_str());
3497 f
->dump_float("skew", skew
);
3498 f
->dump_float("latency", latency
);
3499 f
->dump_stream("health") << tcstatus
;
3500 if (tcstatus
!= HEALTH_OK
) {
3501 f
->dump_stream("details") << tcss
.str();
3507 f
->open_object_section("timechecks");
3508 f
->dump_unsigned("epoch", get_epoch());
3509 f
->dump_int("round", timecheck_round
);
3510 f
->dump_stream("round_status") << ((timecheck_round
%2) ?
3511 "on-going" : "finished");
3517 } else if (prefix
== "status" ||
3518 prefix
== "health" ||
3521 cmd_getval(cmdmap
, "detail", detail
);
3523 if (prefix
== "status") {
3524 // get_cluster_status handles f == NULL
3525 get_cluster_status(ds
, f
.get());
3532 } else if (prefix
== "health") {
3534 healthmon()->get_health_status(detail
== "detail", f
.get(), f
? nullptr : &plain
);
3538 rdata
.append(plain
);
3540 } else if (prefix
== "df") {
3541 bool verbose
= (detail
== "detail");
3543 f
->open_object_section("stats");
3545 mgrstatmon()->dump_cluster_stats(&ds
, f
.get(), verbose
);
3549 mgrstatmon()->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3557 ceph_abort_msg("We should never get here!");
3563 } else if (prefix
== "report") {
3565 // this must be formatted, in its current form
3567 f
.reset(Formatter::create("json-pretty"));
3568 f
->open_object_section("report");
3569 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3570 f
->dump_string("version", ceph_version_to_str());
3571 f
->dump_string("commit", git_version_to_str());
3572 f
->dump_stream("timestamp") << ceph_clock_now();
3574 vector
<string
> tagsvec
;
3575 cmd_getval(cmdmap
, "tags", tagsvec
);
3576 string tagstr
= str_join(tagsvec
, " ");
3577 if (!tagstr
.empty())
3578 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3579 f
->dump_string("tag", tagstr
);
3581 healthmon()->get_health_status(true, f
.get(), nullptr);
3583 monmon()->dump_info(f
.get());
3584 osdmon()->dump_info(f
.get());
3585 mdsmon()->dump_info(f
.get());
3586 authmon()->dump_info(f
.get());
3587 mgrstatmon()->dump_info(f
.get());
3589 paxos
->dump_info(f
.get());
3595 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT_LEGACY
);
3598 } else if (prefix
== "osd last-stat-seq") {
3600 cmd_getval(cmdmap
, "id", osd
);
3601 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3603 f
->dump_unsigned("seq", seq
);
3611 } else if (prefix
== "node ls") {
3612 string
node_type("all");
3613 cmd_getval(cmdmap
, "type", node_type
);
3615 f
.reset(Formatter::create("json-pretty"));
3616 if (node_type
== "all") {
3617 f
->open_object_section("nodes");
3618 print_nodes(f
.get(), ds
);
3619 osdmon()->print_nodes(f
.get());
3620 mdsmon()->print_nodes(f
.get());
3621 mgrmon()->print_nodes(f
.get());
3623 } else if (node_type
== "mon") {
3624 print_nodes(f
.get(), ds
);
3625 } else if (node_type
== "osd") {
3626 osdmon()->print_nodes(f
.get());
3627 } else if (node_type
== "mds") {
3628 mdsmon()->print_nodes(f
.get());
3629 } else if (node_type
== "mgr") {
3630 mgrmon()->print_nodes(f
.get());
3636 } else if (prefix
== "features") {
3637 if (!is_leader() && !is_peon()) {
3638 dout(10) << " waiting for quorum" << dendl
;
3639 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3643 forward_request_leader(op
);
3647 f
.reset(Formatter::create("json-pretty"));
3649 get_combined_feature_map(&fm
);
3650 f
->dump_object("features", fm
);
3654 } else if (prefix
== "mon metadata") {
3656 f
.reset(Formatter::create("json-pretty"));
3659 bool all
= !cmd_getval(cmdmap
, "id", name
);
3661 // Dump a single mon's metadata
3662 int mon
= monmap
->get_rank(name
);
3664 rs
= "requested mon not found";
3668 f
->open_object_section("mon_metadata");
3669 r
= get_mon_metadata(mon
, f
.get(), ds
);
3672 // Dump all mons' metadata
3674 f
->open_array_section("mon_metadata");
3675 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3676 std::ostringstream get_err
;
3677 f
->open_object_section("mon");
3678 f
->dump_string("name", monmap
->get_name(rank
));
3679 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3681 if (r
== -ENOENT
|| r
== -EINVAL
) {
3682 dout(1) << get_err
.str() << dendl
;
3683 // Drop error, list what metadata we do have
3685 } else if (r
!= 0) {
3686 derr
<< "Unexpected error from get_mon_metadata: "
3687 << cpp_strerror(r
) << dendl
;
3688 ds
<< get_err
.str();
3698 } else if (prefix
== "mon versions") {
3700 f
.reset(Formatter::create("json-pretty"));
3701 count_metadata("ceph_version", f
.get());
3706 } else if (prefix
== "mon count-metadata") {
3708 f
.reset(Formatter::create("json-pretty"));
3710 cmd_getval(cmdmap
, "property", field
);
3711 count_metadata(field
, f
.get());
3716 } else if (prefix
== "quorum_status") {
3717 // make sure our map is readable and up to date
3718 if (!is_leader() && !is_peon()) {
3719 dout(10) << " waiting for quorum" << dendl
;
3720 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3723 _quorum_status(f
.get(), ds
);
3727 } else if (prefix
== "mon ok-to-stop") {
3729 if (!cmd_getval(cmdmap
, "ids", ids
)) {
3733 set
<string
> wouldbe
;
3734 for (auto rank
: quorum
) {
3735 wouldbe
.insert(monmap
->get_name(rank
));
3737 for (auto& n
: ids
) {
3738 if (monmap
->contains(n
)) {
3742 if (wouldbe
.size() < monmap
->min_quorum_size()) {
3744 rs
= "not enough monitors would be available (" + stringify(wouldbe
) +
3745 ") after stopping mons " + stringify(ids
);
3749 rs
= "quorum should be preserved (" + stringify(wouldbe
) +
3750 ") after stopping " + stringify(ids
);
3751 } else if (prefix
== "mon ok-to-add-offline") {
3752 if (quorum
.size() < monmap
->min_quorum_size(monmap
->size() + 1)) {
3753 rs
= "adding a monitor may break quorum (until that monitor starts)";
3757 rs
= "adding another mon that is not yet online will not break quorum";
3759 } else if (prefix
== "mon ok-to-rm") {
3761 if (!cmd_getval(cmdmap
, "id", id
)) {
3763 rs
= "must specify a monitor id";
3766 if (!monmap
->contains(id
)) {
3768 rs
= "mon." + id
+ " does not exist";
3771 int rank
= monmap
->get_rank(id
);
3772 if (quorum
.count(rank
) &&
3773 quorum
.size() - 1 < monmap
->min_quorum_size(monmap
->size() - 1)) {
3775 rs
= "removing mon." + id
+ " would break quorum";
3779 rs
= "safe to remove mon." + id
;
3780 } else if (prefix
== "version") {
3782 f
->open_object_section("version");
3783 f
->dump_string("version", pretty_version_to_str());
3787 ds
<< pretty_version_to_str();
3792 } else if (prefix
== "versions") {
3794 f
.reset(Formatter::create("json-pretty"));
3795 map
<string
,int> overall
;
3796 f
->open_object_section("version");
3797 map
<string
,int> mon
, mgr
, osd
, mds
;
3799 count_metadata("ceph_version", &mon
);
3800 f
->open_object_section("mon");
3801 for (auto& p
: mon
) {
3802 f
->dump_int(p
.first
.c_str(), p
.second
);
3803 overall
[p
.first
] += p
.second
;
3807 mgrmon()->count_metadata("ceph_version", &mgr
);
3808 f
->open_object_section("mgr");
3809 for (auto& p
: mgr
) {
3810 f
->dump_int(p
.first
.c_str(), p
.second
);
3811 overall
[p
.first
] += p
.second
;
3815 osdmon()->count_metadata("ceph_version", &osd
);
3816 f
->open_object_section("osd");
3817 for (auto& p
: osd
) {
3818 f
->dump_int(p
.first
.c_str(), p
.second
);
3819 overall
[p
.first
] += p
.second
;
3823 mdsmon()->count_metadata("ceph_version", &mds
);
3824 f
->open_object_section("mds");
3825 for (auto& p
: mds
) {
3826 f
->dump_int(p
.first
.c_str(), p
.second
);
3827 overall
[p
.first
] += p
.second
;
3831 for (auto& p
: mgrstatmon()->get_service_map().services
) {
3832 auto &service
= p
.first
;
3833 if (ServiceMap::is_normal_ceph_entity(service
)) {
3836 f
->open_object_section(service
.c_str());
3838 p
.second
.count_metadata("ceph_version", &m
);
3840 f
->dump_int(q
.first
.c_str(), q
.second
);
3841 overall
[q
.first
] += q
.second
;
3846 f
->open_object_section("overall");
3847 for (auto& p
: overall
) {
3848 f
->dump_int(p
.first
.c_str(), p
.second
);
3858 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3859 reply_command(op
, r
, rs
, rdata
, 0);
3862 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
3865 reply_command(op
, rc
, rs
, rdata
, version
);
3868 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
3869 bufferlist
& rdata
, version_t version
)
3871 auto m
= op
->get_req
<MMonCommand
>();
3872 ceph_assert(m
->get_type() == MSG_MON_COMMAND
);
3873 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
3874 reply
->set_tid(m
->get_tid());
3875 reply
->set_data(rdata
);
3876 send_reply(op
, reply
);
3879 void Monitor::reply_tell_command(
3880 MonOpRequestRef op
, int rc
, const string
&rs
)
3882 MCommand
*m
= static_cast<MCommand
*>(op
->get_req());
3883 ceph_assert(m
->get_type() == MSG_COMMAND
);
3884 MCommandReply
*reply
= new MCommandReply(rc
, rs
);
3885 reply
->set_tid(m
->get_tid());
3886 m
->get_connection()->send_message(reply
);
// ------------------------
// request/reply routing
//
// A client/mds/osd will connect to a random monitor.  We need to forward any
// messages requiring state updates to the leader, and then route any replies
// back through the monitor the sender is connected to.  (The monitor will not
// initiate any connections.)
// Forward a request to the monitor returned by get_leader().
//
// Nothing is forwarded when the request was sent by another monitor whose
// addresses differ from this messenger (non-local mon request), or when the
// session already has a proxy_con (no double forwarding).
// Otherwise, if the session is not closed, a RoutedRequest is created with a
// fresh tid from routed_request_tid.  It keeps the client connection, its
// features, a copy of the request encoded with CEPH_FEATURES_ALL and a
// reference on the session, and is registered both in routed_requests and in
// the session routed_request_tids set.  An MForward carrying the request
// priority is then sent with send_mon_message() and the op is marked as
// forwarded.  The entity name on the MForward comes from the session when it
// has an auth_handler; for monitor sources only the type is set to MON.
// A closed session is only logged.
3898 void Monitor::forward_request_leader(MonOpRequestRef op
)
3900 op
->mark_event(__func__
);
3902 int mon
= get_leader();
3903 MonSession
*session
= op
->get_session();
3904 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
3906 if (req
->get_source().is_mon() && req
->get_source_addrs() != messenger
->get_myaddrs()) {
3907 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
3908 } else if (session
->proxy_con
) {
3909 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
3910 } else if (!session
->closed
) {
3911 RoutedRequest
*rr
= new RoutedRequest
;
3912 rr
->tid
= ++routed_request_tid
;
3913 rr
->con
= req
->get_connection();
3914 rr
->con_features
= rr
->con
->get_features();
3915 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
3916 rr
->session
= static_cast<MonSession
*>(session
->get());
3918 routed_requests
[rr
->tid
] = rr
;
3919 session
->routed_request_tids
.insert(rr
->tid
);
3921 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
3922 << " features " << rr
->con_features
<< dendl
;
3924 MForward
*forward
= new MForward(rr
->tid
,
3928 forward
->set_priority(req
->get_priority());
3929 if (session
->auth_handler
) {
3930 forward
->entity_name
= session
->entity_name
;
3931 } else if (req
->get_source().is_mon()) {
3932 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
3934 send_mon_message(forward
, mon
);
3935 op
->mark_forwarded();
3936 ceph_assert(op
->get_req()->get_type() != 0);
3938 dout(10) << "forward_request no session for request " << *req
<< dendl
;
// Stand-in Connection used for requests that arrive wrapped in an MForward.
// It never carries traffic: send_message() and send_keepalive() assert,
// is_connected() always reports false, and the base Connection is built with
// a null second argument.  The constructor takes the socket address (sa) of
// the original client; socket_addr is the member that holds an address.
// Construction is meant to go through ceph::make_ref (FRIEND_MAKE_REF).
3942 // fake connection attached to forwarded messages
3943 struct AnonConnection
: public Connection
{
3944 entity_addr_t socket_addr
;
3946 int send_message(Message
*m
) override
{
3947 ceph_assert(!"send_message on anonymous connection");
3949 void send_keepalive() override
{
3950 ceph_assert(!"send_keepalive on anonymous connection");
3952 void mark_down() override
{
3955 void mark_disposable() override
{
3958 bool is_connected() override
{ return false; }
3959 entity_addr_t
get_peer_socket_addr() const override
{
3964 FRIEND_MAKE_REF(AnonConnection
);
3965 explicit AnonConnection(CephContext
*cct
, const entity_addr_t
& sa
)
3966 : Connection(cct
, nullptr),
// Unwrap an MForward received from another monitor.
//
// The sending session must hold MON_CAP_X on mon; a forward from a session
// without it is logged at level 0.  For an accepted forward the embedded
// request is claimed from the MForward and attached to a new AnonConnection
// that carries the client addrs, peer type and features recorded in the
// MForward.  A new MonSession is created for that connection, identified
// from the request source, marked authenticated, and given the forwarded
// caps and entity name.  Its proxy_con and proxy_tid point back at the
// forwarding monitor, which is what send_reply() and no_reply() use to route
// answers.  The request also inherits the receive stamp of the MForward and
// is stamped with the current election epoch.
3970 //extract the original message and put it into the regular dispatch function
3971 void Monitor::handle_forward(MonOpRequestRef op
)
3973 auto m
= op
->get_req
<MForward
>();
3974 dout(10) << "received forwarded message from "
3975 << ceph_entity_type_name(m
->client_type
)
3976 << " " << m
->client_addrs
3977 << " via " << m
->get_source_inst() << dendl
;
3978 MonSession
*session
= op
->get_session();
3979 ceph_assert(session
);
3981 if (!session
->is_capable("mon", MON_CAP_X
)) {
3982 dout(0) << "forward from entity with insufficient caps! "
3983 << session
->caps
<< dendl
;
3985 // see PaxosService::dispatch(); we rely on this being anon
3986 // (c->msgr == NULL)
3987 PaxosServiceMessage
*req
= m
->claim_message();
3988 ceph_assert(req
!= NULL
);
3990 auto c
= ceph::make_ref
<AnonConnection
>(cct
, m
->client_socket_addr
);
3991 MonSession
*s
= new MonSession(static_cast<Connection
*>(c
.get()));
3992 s
->_ident(req
->get_source(),
3993 req
->get_source_addrs());
3994 c
->set_priv(RefCountedPtr
{s
, false});
3995 c
->set_peer_addrs(m
->client_addrs
);
3996 c
->set_peer_type(m
->client_type
);
3997 c
->set_features(m
->con_features
);
3999 s
->authenticated
= true;
4000 s
->caps
= m
->client_caps
;
4001 dout(10) << " caps are " << s
->caps
<< dendl
;
4002 s
->entity_name
= m
->entity_name
;
4003 dout(10) << " entity name '" << s
->entity_name
<< "' type "
4004 << s
->entity_name
.get_type() << dendl
;
4005 s
->proxy_con
= m
->get_connection();
4006 s
->proxy_tid
= m
->tid
;
4008 req
->set_connection(c
);
4010 // not super accurate, but better than nothing.
4011 req
->set_recv_stamp(m
->get_recv_stamp());
4014 * note which election epoch this is; we will drop the message if
4015 * there is a future election since our peers will resend routed
4016 * requests in that case.
4018 req
->rx_election_epoch
= get_epoch();
4020 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
4023 // break the session <-> con ref loop by removing the con->session
4024 // reference, which is no longer needed once the MonOpRequest is
// Deliver reply for the request held by op.
//
// The session is required (asserted).  The reply is tagged with
// g_ceph_context and logged.  When the session has neither con nor
// proxy_con the reply is reported as dropped and the op is marked with
// "reply: no connection"; the same message and mark also appear for an
// earlier check on the op connection.
// With a proxy_con the reply is wrapped in an MRoute tagged with the session
// proxy_tid and sent to the proxying monitor; otherwise it is sent directly
// on session->con.  The op is marked with the path taken.
4030 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
4032 op
->mark_event(__func__
);
4034 MonSession
*session
= op
->get_session();
4035 ceph_assert(session
);
4036 Message
*req
= op
->get_req();
4037 ConnectionRef con
= op
->get_connection();
4039 reply
->set_cct(g_ceph_context
);
4040 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
4043 dout(2) << "send_reply no connection, dropping reply " << *reply
4044 << " to " << req
<< " " << *req
<< dendl
;
4046 op
->mark_event("reply: no connection");
4050 if (!session
->con
&& !session
->proxy_con
) {
4051 dout(2) << "send_reply no connection, dropping reply " << *reply
4052 << " to " << req
<< " " << *req
<< dendl
;
4054 op
->mark_event("reply: no connection");
4058 if (session
->proxy_con
) {
4059 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
4060 << " via " << session
->proxy_con
->get_peer_addr()
4061 << " for request " << *req
<< dendl
;
4062 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
4063 op
->mark_event("reply: send routed request");
4065 session
->con
->send_message(reply
);
4066 op
->mark_event("reply: send");
4070 void Monitor::no_reply(MonOpRequestRef op
)
4072 MonSession
*session
= op
->get_session();
4073 Message
*req
= op
->get_req();
4075 if (session
->proxy_con
) {
4076 dout(10) << "no_reply to " << req
->get_source_inst()
4077 << " via " << session
->proxy_con
->get_peer_addr()
4078 << " for request " << *req
<< dendl
;
4079 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
4080 op
->mark_event("no_reply: send routed request");
4082 dout(10) << "no_reply to " << req
->get_source_inst()
4083 << " " << *req
<< dendl
;
4084 op
->mark_event("no_reply");
// Handle an MRoute sent by a peer monitor in answer to a forwarded request.
//
// The sending session must hold MON_CAP_X on mon; otherwise the message is
// logged at level 0.  A nonzero session_mon_tid is looked up in
// routed_requests.  On a hit, the wrapped message has its payload cleared
// (its encoding may depend on the features of the final target) and is sent
// on the connection saved in the RoutedRequest.  If send_osdmap_first is
// set, osdmap incrementals from that epoch are sent to the saved session
// through osdmon().  The tid is then erased from routed_requests and from
// the session routed_request_tids set.  Unknown tids and a zero tid are only
// logged.
4088 void Monitor::handle_route(MonOpRequestRef op
)
4090 auto m
= op
->get_req
<MRoute
>();
4091 MonSession
*session
= op
->get_session();
4093 if (!session
->is_capable("mon", MON_CAP_X
)) {
4094 dout(0) << "MRoute received from entity without appropriate perms! "
4099 dout(10) << "handle_route tid " << m
->session_mon_tid
<< " " << *m
->msg
4102 dout(10) << "handle_route tid " << m
->session_mon_tid
<< " null" << dendl
;
4105 if (m
->session_mon_tid
) {
4106 if (routed_requests
.count(m
->session_mon_tid
)) {
4107 RoutedRequest
*rr
= routed_requests
[m
->session_mon_tid
];
4109 // reset payload, in case encoding is dependent on target features
4111 m
->msg
->clear_payload();
4112 rr
->con
->send_message(m
->msg
);
4115 if (m
->send_osdmap_first
) {
4116 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
4117 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
4118 true, MonOpRequestRef());
4120 ceph_assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
4121 routed_requests
.erase(m
->session_mon_tid
);
4122 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
4125 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
4128 dout(10) << " not a routed request, ignoring" << dendl
;
// Re-drive every entry of routed_requests toward the monitor returned by
// get_leader().
//
// Two treatments are visible for an entry:
//  - requeue for self: the saved op is queued behind a C_RetryMessage and
//    the tid is removed from the session routed_request_tids set;
//  - resend: the saved request_bl is decoded back into a
//    PaxosServiceMessage and sent in a new MForward that carries the peer
//    type, addrs and socket addr of the original connection and the
//    priority of the request.  The local reference on the decoded request
//    is dropped because the MForward holds its own.
// The tail of the function clears routed_requests and finishes the queued
// retry contexts.
// NOTE(review): presumably the first treatment applies when this monitor is
// itself the leader, and the final clear/finish is tied to the same case --
// confirm against the branch conditions.
4132 void Monitor::resend_routed_requests()
4134 dout(10) << "resend_routed_requests" << dendl
;
4135 int mon
= get_leader();
4136 list
<Context
*> retry
;
4137 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
4138 p
!= routed_requests
.end();
4140 RoutedRequest
*rr
= p
->second
;
4143 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
4144 rr
->op
->mark_event("retry routed request");
4145 retry
.push_back(new C_RetryMessage(this, rr
->op
));
4147 ceph_assert(rr
->session
->routed_request_tids
.count(p
->first
));
4148 rr
->session
->routed_request_tids
.erase(p
->first
);
4152 auto q
= rr
->request_bl
.cbegin();
4153 PaxosServiceMessage
*req
=
4154 (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
4155 rr
->op
->mark_event("resend forwarded message to leader");
4156 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
4158 MForward
*forward
= new MForward(rr
->tid
,
4162 req
->put(); // forward takes its own ref; drop ours.
4163 forward
->client_type
= rr
->con
->get_peer_type();
4164 forward
->client_addrs
= rr
->con
->get_peer_addrs();
4165 forward
->client_socket_addr
= rr
->con
->get_peer_socket_addr();
4166 forward
->set_priority(req
->get_priority());
4167 send_mon_message(forward
, mon
);
4171 routed_requests
.clear();
4172 finish_contexts(g_ceph_context
, retry
);
// Tear down a registered session.
//
// The session must still have a connection and must not already be closed
// (both asserted).  Every tid in s->routed_request_tids is expected to be
// present in routed_requests (asserted); each one is logged as dropped and
// erased from that map.  The session tid set is then cleared, the session is
// detached from its connection with set_priv(nullptr), removed from
// session_map, and the l_mon_num_sessions / l_mon_session_rm counters are
// updated.
// NOTE(review): callers in this file take session_map_lock before touching
// session_map -- confirm the locking expectation for this function.
4176 void Monitor::remove_session(MonSession
*s
)
4178 dout(10) << "remove_session " << s
<< " " << s
->name
<< " " << s
->addrs
4179 << " features 0x" << std::hex
<< s
->con_features
<< std::dec
<< dendl
;
4180 ceph_assert(s
->con
);
4181 ceph_assert(!s
->closed
);
4182 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
4183 p
!= s
->routed_request_tids
.end();
4185 ceph_assert(routed_requests
.count(*p
));
4186 RoutedRequest
*rr
= routed_requests
[*p
];
4187 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
4189 routed_requests
.erase(*p
);
4191 s
->routed_request_tids
.clear();
4192 s
->con
->set_priv(nullptr);
4193 session_map
.remove_session(s
);
4194 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4195 logger
->inc(l_mon_session_rm
);
// Drain session_map while holding session_map_lock.
// Each pass takes the front session and bumps l_mon_session_rm; once the
// list is empty the current session count is published to
// l_mon_num_sessions.
// NOTE(review): the loop only terminates if the front session is removed on
// each pass -- presumably via session_map; confirm against the loop body.
4198 void Monitor::remove_all_sessions()
4200 std::lock_guard
l(session_map_lock
);
4201 while (!session_map
.sessions
.empty()) {
4202 MonSession
*s
= session_map
.sessions
.front();
4204 logger
->inc(l_mon_session_rm
);
4207 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4210 void Monitor::send_mon_message(Message
*m
, int rank
)
4212 messenger
->send_to_mon(m
, monmap
->get_addrs(rank
));
// Decide what to do with a message that cannot be served right now.
//
// A message counts as fresh when its receive stamp is later than the current
// time minus mon_lease.  A fresh message on a still-connected connection is
// parked: a C_RetryMessage for the op is pushed on maybe_wait_for_quorum and
// the op is marked as waiting for quorum.  Anything else is discarded so the
// client goes elsewhere; on that path sessions without a proxy_con are
// handled under session_map_lock, because proxied sessions are not
// registered and have no con.
4215 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
4218 * Wait list the new session until we're in the quorum, assuming it's
4220 * tick() will periodically send them back through so we can send
4221 * the client elsewhere if we don't think we're getting back in.
4223 * But we whitelist a few sorts of messages:
4224 * 1) Monitors can talk to us at any time, of course.
4225 * 2) auth messages. It's unlikely to go through much faster, but
4226 * it's possible we've just lost our quorum status and we want to take...
4227 * 3) command messages. We want to accept these under all possible
4230 Message
*m
= op
->get_req();
4231 MonSession
*s
= op
->get_session();
4232 ConnectionRef con
= op
->get_connection();
4233 utime_t too_old
= ceph_clock_now();
4234 too_old
-= g_ceph_context
->_conf
->mon_lease
;
4235 if (m
->get_recv_stamp() > too_old
&&
4236 con
->is_connected()) {
4237 dout(5) << "waitlisting message " << *m
<< dendl
;
4238 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
4239 op
->mark_wait_for_quorum();
4241 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
4243 // proxied sessions aren't registered and don't have a con; don't remove
4245 if (!s
->proxy_con
) {
4246 std::lock_guard
l(session_map_lock
);
// First stop for an incoming message.
//
// The message is wrapped in a MonOpRequest from op_tracker and the session
// attached to it (if any) is examined:
//  - For a monitor peer with an existing session, a change in connection
//    features on a non-anonymous connection (one that has a messenger) is
//    logged, and a differing old session connection is marked down.
//  - A non-monitor sender whose message is not CEPH_MSG_AUTH,
//    CEPH_MSG_MON_GET_MAP or CEPH_MSG_PING is logged as a stray message.
//  - A new session is created through session_map.new_session() under
//    session_map_lock, attached to the connection with set_priv(), and the
//    session counters are updated.  The monitor-caps path also marks the
//    session authenticated.
// The session timeout is then set to now plus mon_session_timeout and the
// entity name is refreshed from the auth handler when one exists.
// Finally, while synchronizing, or when the session is unauthenticated and
// exited_quorum is set, messages other than CEPH_MSG_PING may be handed to
// waitlist_or_zap_client().
// NOTE(review): the exact exemptions on that last condition should be
// confirmed against the full expression.
4253 void Monitor::_ms_dispatch(Message
*m
)
4255 if (is_shutdown()) {
4260 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
4261 bool src_is_mon
= op
->is_src_mon();
4262 op
->mark_event("mon:_ms_dispatch");
4263 MonSession
*s
= op
->get_session();
4264 if (s
&& s
->closed
) {
4268 if (src_is_mon
&& s
) {
4269 ConnectionRef con
= m
->get_connection();
4270 if (con
->get_messenger() && con
->get_features() != s
->con_features
) {
4271 // only update features if this is a non-anonymous connection
4272 dout(10) << __func__
<< " feature change for " << m
->get_source_inst()
4273 << " (was " << s
->con_features
4274 << ", now " << con
->get_features() << ")" << dendl
;
4275 // connection features changed - recreate session.
4276 if (s
->con
&& s
->con
!= con
) {
4277 dout(10) << __func__
<< " connection for " << m
->get_source_inst()
4278 << " changed from session; mark down and replace" << dendl
;
4279 s
->con
->mark_down();
4281 if (s
->item
.is_on_list()) {
4282 // forwarded messages' sessions are not in the sessions map and
4283 // exist only while the op is being handled.
4284 std::lock_guard
l(session_map_lock
);
4292 // if the sender is not a monitor, make sure their first message for a
4293 // session is an MAuth. If it is not, assume it's a stray message,
4294 // and considering that we are creating a new session it is safe to
4295 // assume that the sender hasn't authenticated yet, so we have no way
4296 // of assessing whether we should handle it or not.
4297 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
4298 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
4299 m
->get_type() != CEPH_MSG_PING
)) {
4300 dout(1) << __func__
<< " dropping stray message " << *m
4301 << " from " << m
->get_source_inst() << dendl
;
4305 ConnectionRef con
= m
->get_connection();
4307 std::lock_guard
l(session_map_lock
);
4308 s
= session_map
.new_session(m
->get_source(),
4309 m
->get_source_addrs(),
4313 con
->set_priv(RefCountedPtr
{s
, false});
4314 dout(10) << __func__
<< " new session " << s
<< " " << *s
4315 << " features 0x" << std::hex
4316 << s
->con_features
<< std::dec
<< dendl
;
4319 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4320 logger
->inc(l_mon_session_add
);
4323 // give it monitor caps; the peer type has been authenticated
4324 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
4325 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
4327 s
->authenticated
= true;
4330 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->name
4336 s
->session_timeout
= ceph_clock_now();
4337 s
->session_timeout
+= g_conf()->mon_session_timeout
;
4339 if (s
->auth_handler
) {
4340 s
->entity_name
= s
->auth_handler
->get_entity_name();
4342 dout(20) << " entity " << s
->entity_name
4343 << " caps " << s
->caps
.get_str() << dendl
;
4345 if ((is_synchronizing() ||
4346 (!s
->authenticated
&& !exited_quorum
.is_zero())) &&
4348 m
->get_type() != CEPH_MSG_PING
) {
4349 waitlist_or_zap_client(op
);
// Route an op to its handler by message type, in stages of increasing
// requirements.  The op type starts as monitor and is switched to
// service, command, paxos or election as the matching stage is reached.
//
//  1. Messages that do not necessarily need caps: MSG_MON_GLOBAL_ID goes to
//     the PAXOS_AUTH service, and tell commands go to handle_tell_command().
//  2. From here on the session must be authenticated; otherwise the message
//     is logged as dropped.  Handled at this stage: CEPH_MSG_MON_GET_MAP,
//     MSG_GET_CONFIG, CEPH_MSG_MON_METADATA and CEPH_MSG_MON_SUBSCRIBE.
//  3. Service messages whose caps are checked by the service itself:
//     the osdmap, mdsmap, mgr, mgrstat and log paxos services, plus
//     MSG_MON_COMMAND (handle_command() does its own caps checking).
//  4. Messages the Monitor class handles that clients may send, guarded by
//     MON_CAP_R on mon: CEPH_MSG_MON_GET_VERSION.
//  5. Messages that only another monitor should send; a non-monitor source
//     is logged and dropped.  This stage covers log acks, the monmap
//     service, paxos (needs MON_CAP_X, ignored while synchronizing, with
//     epoch checks against get_epoch()), elections (needs MON_CAP_X, only
//     passed to the elector when neither probing nor synchronizing),
//     MSG_TIMECHECK2, the deprecated MSG_MON_HEALTH, and
//     MSG_MON_HEALTH_CHECKS for the health service.
// Anything not matched is logged as unexpected and dropped.
4356 void Monitor::dispatch_op(MonOpRequestRef op
)
4358 op
->mark_event("mon:dispatch_op");
4359 MonSession
*s
= op
->get_session();
4362 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
4366 /* we will consider the default type as being 'monitor' until proven wrong */
4367 op
->set_type_monitor();
4368 /* deal with all messages that do not necessarily need caps */
4369 switch (op
->get_req()->get_type()) {
4371 case MSG_MON_GLOBAL_ID
:
4373 op
->set_type_service();
4374 /* no need to check caps here */
4375 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
4382 op
->set_type_command();
4383 handle_tell_command(op
);
4387 if (!op
->get_session()->authenticated
) {
4388 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4389 << " is not authenticated, dropping " << *(op
->get_req())
4394 switch (op
->get_req()->get_type()) {
4395 case CEPH_MSG_MON_GET_MAP
:
4396 handle_mon_get_map(op
);
4399 case MSG_GET_CONFIG
:
4400 configmon()->handle_get_config(op
);
4403 case CEPH_MSG_MON_METADATA
:
4404 return handle_mon_metadata(op
);
4406 case CEPH_MSG_MON_SUBSCRIBE
:
4407 /* FIXME: check what's being subscribed, filter accordingly */
4408 handle_subscribe(op
);
4412 /* well, maybe the op belongs to a service... */
4413 op
->set_type_service();
4414 /* deal with all messages which caps should be checked somewhere else */
4415 switch (op
->get_req()->get_type()) {
4418 case CEPH_MSG_MON_GET_OSDMAP
:
4419 case CEPH_MSG_POOLOP
:
4420 case MSG_OSD_BEACON
:
4421 case MSG_OSD_MARK_ME_DOWN
:
4422 case MSG_OSD_MARK_ME_DEAD
:
4424 case MSG_OSD_FAILURE
:
4427 case MSG_OSD_PGTEMP
:
4428 case MSG_OSD_PG_CREATED
:
4429 case MSG_REMOVE_SNAPS
:
4430 case MSG_MON_GET_PURGED_SNAPS
:
4431 case MSG_OSD_PG_READY_TO_MERGE
:
4432 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
4436 case MSG_MDS_BEACON
:
4437 case MSG_MDS_OFFLOAD_TARGETS
:
4438 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
4442 case MSG_MGR_BEACON
:
4443 paxos_service
[PAXOS_MGR
]->dispatch(op
);
4447 case MSG_MON_MGR_REPORT
:
4448 case CEPH_MSG_STATFS
:
4449 case MSG_GETPOOLSTATS
:
4450 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
4455 paxos_service
[PAXOS_LOG
]->dispatch(op
);
4458 // handle_command() does its own caps checking
4459 case MSG_MON_COMMAND
:
4460 op
->set_type_command();
4465 /* nop, looks like it's not a service message; revert back to monitor */
4466 op
->set_type_monitor();
4468 /* messages we, the Monitor class, need to deal with
4469 * but may be sent by clients. */
4471 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
4472 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4473 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
4478 switch (op
->get_req()->get_type()) {
4480 case CEPH_MSG_MON_GET_VERSION
:
4481 handle_get_version(op
);
4485 if (!op
->is_src_mon()) {
4486 dout(1) << __func__
<< " unexpected monitor message from"
4487 << " non-monitor entity " << op
->get_req()->get_source_inst()
4488 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
4492 /* messages that should only be sent by another monitor */
4493 switch (op
->get_req()->get_type()) {
4503 // Sync (i.e., the new slurp, but on steroids)
4511 /* log acks are sent from a monitor we sent the MLog to, and are
4512 never sent by clients to us. */
4514 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
4519 op
->set_type_service();
4520 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
4526 op
->set_type_paxos();
4527 auto pm
= op
->get_req
<MMonPaxos
>();
4528 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4533 if (state
== STATE_SYNCHRONIZING
) {
4534 // we are synchronizing. These messages would do us no
4535 // good, thus just drop them and ignore them.
4536 dout(10) << __func__
<< " ignore paxos msg from "
4537 << pm
->get_source_inst() << dendl
;
4542 if (pm
->epoch
> get_epoch()) {
4546 if (pm
->epoch
!= get_epoch()) {
4550 paxos
->dispatch(op
);
4555 case MSG_MON_ELECTION
:
4556 op
->set_type_election();
4557 //check privileges here for simplicity
4558 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4559 dout(0) << "MMonElection received from entity without enough caps!"
4560 << op
->get_session()->caps
<< dendl
;
4563 if (!is_probing() && !is_synchronizing()) {
4564 elector
.dispatch(op
);
4573 dout(5) << __func__
<< " ignoring " << op
<< dendl
;
4575 case MSG_TIMECHECK2
:
4576 handle_timecheck(op
);
4579 case MSG_MON_HEALTH
:
4580 dout(5) << __func__
<< " dropping deprecated message: "
4581 << *op
->get_req() << dendl
;
4583 case MSG_MON_HEALTH_CHECKS
:
4584 op
->set_type_service();
4585 paxos_service
[PAXOS_HEALTH
]->dispatch(op
);
4588 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
// Answer an MPing request: opens a JSON "pong" section, adds the health
// status and the monitor status to it, encodes ss.str() into the reply
// payload and sends the reply MPing over the requester's connection.
// NOTE(review): the declarations of `payload` and `ss`, and whatever moves
// the formatter output into `ss`, are not visible in this listing.
4592 void Monitor::handle_ping(MonOpRequestRef op
)
4594 auto m
= op
->get_req
<MPing
>();
4595 dout(10) << __func__
<< " " << *m
<< dendl
;
4596 MPing
*reply
= new MPing
;
// The formatter is built with JSONFormatter(true).
4598 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4599 f
->open_object_section("pong");
4601 healthmon()->get_health_status(false, f
.get(), nullptr);
4602 get_mon_status(f
.get());
4607 encode(ss
.str(), payload
);
4608 reply
->set_payload(payload
);
4609 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
// Reply goes back on the connection the ping arrived on.
4610 m
->get_connection()->send_message(reply
);
// Begin time checking: clear any previous timecheck state, then start a
// round only when the quorum's monitor features contain FEATURE_NAUTILUS.
4613 void Monitor::timecheck_start()
4615 dout(10) << __func__
<< dendl
;
4616 timecheck_cleanup();
4617 if (get_quorum_mon_features().contains_all(
4618 ceph::features::mon::FEATURE_NAUTILUS
)) {
4619 timecheck_start_round();
// Stop time checking; all the work is delegated to timecheck_cleanup().
4623 void Monitor::timecheck_finish()
4625 dout(10) << __func__
<< dendl
;
4626 timecheck_cleanup();
// Leader-only: start a new timecheck round and re-arm the timecheck event.
// Aborts if the monmap holds a single monitor. If a round is already in
// flight it is kept while younger than 3 * mon_timecheck_interval and
// cancelled otherwise.
// NOTE(review): this listing skips source lines (see the gaps in the
// embedded numbers); the early exits, the round-counter update and the call
// that actually sends the pings are presumably among them -- confirm
// against the full file.
4629 void Monitor::timecheck_start_round()
4631 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4632 ceph_assert(is_leader());
4634 if (monmap
->size() == 1) {
4635 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
// An odd round number marks a round that is still running.
4639 if (timecheck_round
% 2) {
4640 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4641 utime_t curr_time
= ceph_clock_now();
// Upper bound on the age of a running round: three timecheck intervals.
4642 double max
= g_conf()->mon_timecheck_interval
*3;
4643 if (curr_time
- timecheck_round_start
< max
) {
4644 dout(10) << __func__
<< " keep current round going" << dendl
;
4647 dout(10) << __func__
4648 << " finish current timecheck and start new" << dendl
;
4649 timecheck_cancel_round();
// From here on no round may be in flight.
4653 ceph_assert(timecheck_round
% 2 == 0);
4656 timecheck_round_start
= ceph_clock_now();
4657 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4661 dout(10) << __func__
<< " setting up next event" << dendl
;
4662 timecheck_reset_event();
// Close the running (odd-numbered) timecheck round and reset its start time.
// Two groups of statements are visible: one asserts that nobody is still
// waiting and that every quorum member acked, then checks the skews; the
// other logs the peers still waiting. Both are followed by clearing
// timecheck_waiting.
// NOTE(review): the branch on `success` is not visible in this listing;
// presumably the first group is the success path and the second the
// cancelled path -- confirm. The round-counter update is not visible either.
4665 void Monitor::timecheck_finish_round(bool success
)
4667 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
// Only a running (odd) round can be finished.
4668 ceph_assert(timecheck_round
% 2);
4670 timecheck_round_start
= utime_t();
4673 ceph_assert(timecheck_waiting
.empty());
4674 ceph_assert(timecheck_acks
== quorum
.size());
4676 timecheck_check_skews();
// The log entry below is extended with one " mon.<rank>" item per peer
// that has not answered.
4680 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4681 << " peers still waiting:";
4682 for (auto& p
: timecheck_waiting
) {
4683 *_dout
<< " mon." << p
.first
;
4686 timecheck_waiting
.clear();
4688 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
// Cancel the current round: finish it with success == false.
4691 void Monitor::timecheck_cancel_round()
4693 timecheck_finish_round(false);
// Reset all timecheck state: round number and round start time go back to
// zero, a pending timecheck event is cancelled, the waiting/skew/latency
// maps are emptied and the rounds-since-clean counter is cleared.
4696 void Monitor::timecheck_cleanup()
4698 timecheck_round
= 0;
4700 timecheck_round_start
= utime_t();
// Cancel the scheduled event, if any, and forget the handle.
4702 if (timecheck_event
) {
4703 timer
.cancel_event(timecheck_event
);
4704 timecheck_event
= NULL
;
4706 timecheck_waiting
.clear();
4707 timecheck_skews
.clear();
4708 timecheck_latencies
.clear();
4710 timecheck_rounds_since_clean
= 0;
// (Re)schedule the timer event that starts the next timecheck round.
// Any pending event is cancelled first. The delay is derived from
// mon_timecheck_skew_interval * timecheck_rounds_since_clean and falls back
// to mon_timecheck_interval when that product is <= 0 or exceeds
// mon_timecheck_interval.
// NOTE(review): the declaration of `delay` and the delay argument passed to
// add_event_after() are not visible in this listing.
4713 void Monitor::timecheck_reset_event()
4715 if (timecheck_event
) {
4716 timer
.cancel_event(timecheck_event
);
4717 timecheck_event
= NULL
;
4721 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
// Out-of-range delays are replaced by the regular interval.
4723 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4724 delay
= cct
->_conf
->mon_timecheck_interval
;
4727 dout(10) << __func__
<< " delay " << delay
4728 << " rounds_since_clean " << timecheck_rounds_since_clean
4731 timecheck_event
= timer
.add_event_after(
// The callback starts the next round when the timer fires.
4733 new C_MonContext
{this, [this](int) {
4734 timecheck_start_round();
// Leader-only, between rounds (even round number): inspect the recorded
// skews and adapt the schedule of the next timecheck. Aborts if the monmap
// holds a single monitor.
// NOTE(review): the declaration of `abs_skew`, the statement that sets
// `found_skew` and the `if` that opens the final if/else chain are not
// visible in this listing; presumably the first branch is taken when a skew
// was found -- confirm.
4738 void Monitor::timecheck_check_skews()
4740 dout(10) << __func__
<< dendl
;
4741 ceph_assert(is_leader());
4742 ceph_assert((timecheck_round
% 2) == 0);
4743 if (monmap
->size() == 1) {
4744 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
// Every peer with a latency sample must also have a skew sample.
4747 ceph_assert(timecheck_latencies
.size() == timecheck_skews
.size());
4749 bool found_skew
= false;
4750 for (auto& p
: timecheck_skews
) {
4752 if (timecheck_has_skew(p
.second
, &abs_skew
)) {
4753 dout(10) << __func__
4754 << " " << p
.first
<< " skew " << abs_skew
<< dendl
;
// One more round in a row that was not clean; reschedule accordingly.
4760 ++timecheck_rounds_since_clean
;
4761 timecheck_reset_event();
4762 } else if (timecheck_rounds_since_clean
> 0) {
4764 << " no clock skews found after " << timecheck_rounds_since_clean
4765 << " rounds" << dendl
;
4766 // make sure the skews are really gone and not just a transient success
4767 // this will run just once if not in the presence of skews again.
// The counter is set to 1 only for the reschedule call, then cleared.
4768 timecheck_rounds_since_clean
= 1;
4769 timecheck_reset_event();
4770 timecheck_rounds_since_clean
= 0;
// Leader-only, between rounds (even round number): send an OP_REPORT
// MTimeCheck2 carrying the current epoch, round, skews and latencies to
// quorum members. Aborts if the monmap holds a single monitor.
// NOTE(review): the statement guarded by the name comparison is not visible
// in this listing; presumably it skips this monitor itself. The use of
// `do_output` is not visible either.
4775 void Monitor::timecheck_report()
4777 dout(10) << __func__
<< dendl
;
4778 ceph_assert(is_leader());
4779 ceph_assert((timecheck_round
% 2) == 0);
4780 if (monmap
->size() == 1) {
4781 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4785 ceph_assert(timecheck_latencies
.size() == timecheck_skews
.size());
4786 bool do_output
= true; // only output report once
4787 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
4788 if (monmap
->get_name(*q
) == name
)
4791 MTimeCheck2
*m
= new MTimeCheck2(MTimeCheck2::OP_REPORT
);
4792 m
->epoch
= get_epoch();
4793 m
->round
= timecheck_round
;
// Copy every per-monitor skew and latency into the message.
4795 for (auto& it
: timecheck_skews
) {
4796 double skew
= it
.second
;
4797 double latency
= timecheck_latencies
[it
.first
];
4799 m
->skews
[it
.first
] = skew
;
4800 m
->latencies
[it
.first
] = latency
;
4803 dout(25) << __func__
<< " mon." << it
.first
4804 << " latency " << latency
4805 << " skew " << skew
<< dendl
;
4809 dout(10) << __func__
<< " send report to mon." << *q
<< dendl
;
4810 send_mon_message(m
, *q
);
// Leader-only, during a running (odd) round: record this monitor as the
// zero-skew, zero-latency reference, then send an OP_PING MTimeCheck2 to
// quorum members, remembering the send time of each in timecheck_waiting.
// Aborts if the monmap holds a single monitor.
// NOTE(review): the statement guarded by the name comparison is not visible
// in this listing; presumably it skips this monitor itself.
4814 void Monitor::timecheck()
4816 dout(10) << __func__
<< dendl
;
4817 ceph_assert(is_leader());
4818 if (monmap
->size() == 1) {
4819 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4822 ceph_assert(timecheck_round
% 2 != 0);
4824 timecheck_acks
= 1; // we ack ourselves
4826 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
4827 << " round " << timecheck_round
<< dendl
;
4829 // we are at the eye of the storm; the point of reference
4830 timecheck_skews
[rank
] = 0.0;
4831 timecheck_latencies
[rank
] = 0.0;
4833 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
4834 if (monmap
->get_name(*it
) == name
)
// The send time is stored per peer; the leader-side pong handler reads it
// back to compute the latency.
4837 utime_t curr_time
= ceph_clock_now();
4838 timecheck_waiting
[*it
] = curr_time
;
4839 MTimeCheck2
*m
= new MTimeCheck2(MTimeCheck2::OP_PING
);
4840 m
->epoch
= get_epoch();
4841 m
->round
= timecheck_round
;
4842 dout(10) << __func__
<< " send " << *m
<< " to mon." << *it
<< dendl
;
4843 send_mon_message(m
, *it
);
// Classify a skew bound as a health status. The status starts as HEALTH_OK
// and becomes HEALTH_WARN, with an explanatory message written to `ss`,
// when timecheck_has_skew() reports a skew for `skew_bound`.
// `latency` is only asserted to be non-negative in the visible lines.
// NOTE(review): the declaration of `abs_skew` and the return statement are
// not visible in this listing; presumably `status` is returned.
4847 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
4848 const double skew_bound
,
4849 const double latency
)
4851 health_status_t status
= HEALTH_OK
;
4852 ceph_assert(latency
>= 0);
4855 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
4856 status
= HEALTH_WARN
;
4857 ss
<< "clock skew " << abs_skew
<< "s"
4858 << " > max " << g_conf()->mon_clock_drift_allowed
<< "s";
// Leader-side handler for OP_PONG replies to a timecheck ping.
// Pongs from an older epoch or an older round are logged as discarded.
// For an accepted pong the latency is the time since the matching ping was
// sent, smoothed into timecheck_latencies; the skew bound is derived from
// the peer's timestamp and stored in timecheck_skews. Once every quorum
// member has acked, the round is finished.
// NOTE(review): this listing skips source lines (see the gaps in the
// embedded numbers). Not visible here: the early exits after the "discard"
// logs, the declaration of `ss`, the increment of timecheck_acks, and the
// conditions guarding the latency update and the sign flip of skew_bound.
4864 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
4866 auto m
= op
->get_req
<MTimeCheck2
>();
4867 dout(10) << __func__
<< " " << *m
<< dendl
;
4868 /* handles PONG's */
4869 ceph_assert(m
->op
== MTimeCheck2::OP_PONG
);
// Rank of the monitor that sent the pong.
4871 int other
= m
->get_source().num();
4872 if (m
->epoch
< get_epoch()) {
4873 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
4874 << " from " << other
4875 << " curr " << get_epoch()
4876 << " -- severely lagged? discard" << dendl
;
4879 ceph_assert(m
->epoch
== get_epoch());
4881 if (m
->round
< timecheck_round
) {
4882 dout(1) << __func__
<< " got old round " << m
->round
4883 << " from " << other
4884 << " curr " << timecheck_round
<< " -- discard" << dendl
;
4888 utime_t curr_time
= ceph_clock_now();
// A pong is only acceptable from a peer we are still waiting for; its
// entry holds the time the ping was sent and is consumed here.
4890 ceph_assert(timecheck_waiting
.count(other
) > 0);
4891 utime_t timecheck_sent
= timecheck_waiting
[other
];
4892 timecheck_waiting
.erase(other
);
4893 if (curr_time
< timecheck_sent
) {
4894 // our clock was readjusted -- drop everything until it all makes sense.
4895 dout(1) << __func__
<< " our clock was readjusted --"
4896 << " bump round and drop current check"
4898 timecheck_cancel_round();
4902 /* update peer latencies */
4903 double latency
= (double)(curr_time
- timecheck_sent
);
// First sample is stored as-is; the other visible update blends 80% of the
// stored value with 20% of the new sample.
4905 if (timecheck_latencies
.count(other
) == 0)
4906 timecheck_latencies
[other
] = latency
;
4908 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
4909 timecheck_latencies
[other
] = avg_latency
;
4915 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4916 * and 'a' happens to be lower than 'b'; so we use double instead.
4918 * latency is always expected to be >= 0.
4920 * delta, the difference between theirs timestamp and ours, may either be
4921 * lower or higher than 0; will hardly ever be 0.
4923 * The absolute skew is the absolute delta minus the latency, which is
4924 * taken as a whole instead of an rtt given that there is some queueing
4925 * and dispatch times involved and it's hard to assess how long exactly
4926 * it took for the message to travel to the other side and be handled. So
4927 * we call it a bounded skew, the worst case scenario.
4931 * Given that the latency is always positive, we can establish that the
4932 * bounded skew will be:
4934 * 1. positive if the absolute delta is higher than the latency and
4936 * 2. negative if the absolute delta is higher than the latency and
4937 * delta is negative.
4938 * 3. zero if the absolute delta is lower than the latency.
4940 * On 3. we make a judgement call and treat the skew as non-existent.
4941 * This is because that, if the absolute delta is lower than the
4942 * latency, then the apparently existing skew is nothing more than a
4943 * side-effect of the high latency at work.
4945 * This may not be entirely true though, as a severely skewed clock
4946 * may be masked by an even higher latency, but with high latencies
4947 * we probably have worse issues to deal with than just skewed clocks.
4949 ceph_assert(latency
>= 0);
4951 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
4952 double abs_delta
= (delta
> 0 ? delta
: -delta
);
4953 double skew_bound
= abs_delta
- latency
;
4957 skew_bound
= -skew_bound
;
// Anything other than HEALTH_OK is reported to the cluster log.
4960 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
4961 if (status
!= HEALTH_OK
) {
4962 clog
->health(status
) << other
<< " " << ss
.str();
4965 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
4966 << " delta " << delta
<< " skew_bound " << skew_bound
4967 << " latency " << latency
<< dendl
;
4969 timecheck_skews
[other
] = skew_bound
;
4972 if (timecheck_acks
== quorum
.size()) {
4973 dout(10) << __func__
<< " got pongs from everybody ("
4974 << timecheck_acks
<< " total)" << dendl
;
4975 ceph_assert(timecheck_skews
.size() == timecheck_acks
);
4976 ceph_assert(timecheck_waiting
.empty());
4977 // everyone has acked, so bump the round to finish it.
4978 timecheck_finish_round();
4982 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
4984 auto m
= op
->get_req
<MTimeCheck2
>();
4985 dout(10) << __func__
<< " " << *m
<< dendl
;
4987 ceph_assert(is_peon());
4988 ceph_assert(m
->op
== MTimeCheck2::OP_PING
|| m
->op
== MTimeCheck2::OP_REPORT
);
4990 if (m
->epoch
!= get_epoch()) {
4991 dout(1) << __func__
<< " got wrong epoch "
4992 << "(ours " << get_epoch()
4993 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
4997 if (m
->round
< timecheck_round
) {
4998 dout(1) << __func__
<< " got old round " << m
->round
4999 << " current " << timecheck_round
5000 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
5004 timecheck_round
= m
->round
;
5006 if (m
->op
== MTimeCheck2::OP_REPORT
) {
5007 ceph_assert((timecheck_round
% 2) == 0);
5008 timecheck_latencies
.swap(m
->latencies
);
5009 timecheck_skews
.swap(m
->skews
);
5013 ceph_assert((timecheck_round
% 2) != 0);
5014 MTimeCheck2
*reply
= new MTimeCheck2(MTimeCheck2::OP_PONG
);
5015 utime_t curr_time
= ceph_clock_now();
5016 reply
->timestamp
= curr_time
;
5017 reply
->epoch
= m
->epoch
;
5018 reply
->round
= m
->round
;
5019 dout(10) << __func__
<< " send " << *m
5020 << " to " << m
->get_source_inst() << dendl
;
5021 m
->get_connection()->send_message(reply
);
// Entry point for MTimeCheck2 messages; routes by this monitor's role.
// One branch accepts only OP_PONG and hands it to the leader handler; the
// peon branch accepts only OP_PING and OP_REPORT and hands them to the peon
// handler. Everything else is logged as dropped.
// NOTE(review): the condition opening the first branch is not visible in
// this listing (presumably is_leader()), nor are the `else` lines between
// each "drop" log and the handler call.
5024 void Monitor::handle_timecheck(MonOpRequestRef op
)
5026 auto m
= op
->get_req
<MTimeCheck2
>();
5027 dout(10) << __func__
<< " " << *m
<< dendl
;
5030 if (m
->op
!= MTimeCheck2::OP_PONG
) {
5031 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
5033 handle_timecheck_leader(op
);
5035 } else if (is_peon()) {
5036 if (m
->op
!= MTimeCheck2::OP_PING
&& m
->op
!= MTimeCheck2::OP_REPORT
) {
5037 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
5039 handle_timecheck_peon(op
);
5042 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
// Process an MMonSubscribe: for every entry of m->what, check the session's
// capabilities, drop conflicting log subscriptions, register or update the
// subscription in session_map, and let the service owning that map name
// check the new subscription. Clients without
// CEPH_FEATURE_MON_STATEFUL_SUB get an MMonSubscribeAck.
// NOTE(review): this listing skips source lines (see the gaps in the
// embedded numbers). Not visible here: the rest of the outer loop header,
// the statements run when caps are insufficient or a non-onetime flag is
// seen, the non-erasing branch of the inner loop, and the condition (if
// any) around the final ack.
5046 void Monitor::handle_subscribe(MonOpRequestRef op
)
5048 auto m
= op
->get_req
<MMonSubscribe
>();
5049 dout(10) << "handle_subscribe " << *m
<< dendl
;
5053 MonSession
*s
= op
->get_session();
// Remember the client's hostname on the session when it sent one.
5056 if (m
->hostname
.size()) {
5057 s
->remote_host
= m
->hostname
;
5060 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
5063 if (p
->first
== "monmap" || p
->first
== "config") {
5064 // these require no caps
5065 } else if (!s
->is_capable("mon", MON_CAP_R
)) {
5066 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
5067 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
5072 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5073 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
5076 // remove conflicting subscribes
// Only names that logmon maps to an id (>= 0) conflict with each other;
// every other such subscription of this session is removed.
5077 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
5078 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
5079 it
!= s
->sub_map
.end(); ) {
5080 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
5081 std::lock_guard
l(session_map_lock
);
// Post-increment advances the iterator before its entry is removed.
5082 session_map
.remove_sub((it
++)->second
);
5090 std::lock_guard
l(session_map_lock
);
5091 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
5092 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
5093 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
// Dispatch on the subscription name; matching is by prefix for
// "mdsmap"/"fsmap" and exact for the other names.
5096 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
5097 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
5098 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
5099 Subscription
*sub
= s
->sub_map
[p
->first
];
5100 ceph_assert(sub
!= nullptr);
5101 mdsmon()->check_sub(sub
);
5103 } else if (p
->first
== "osdmap") {
5104 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
5105 if (s
->osd_epoch
> p
->second
.start
) {
5106 // client needs earlier osdmaps on purpose, so reset the sent epoch
5109 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
5111 } else if (p
->first
== "osd_pg_creates") {
5112 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
5113 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
5115 } else if (p
->first
== "monmap") {
5116 monmon()->check_sub(s
->sub_map
[p
->first
]);
5117 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
5118 logmon()->check_sub(s
->sub_map
[p
->first
]);
5119 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
5120 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
5121 } else if (p
->first
== "servicemap") {
5122 mgrstatmon()->check_sub(s
->sub_map
[p
->first
]);
5123 } else if (p
->first
== "config") {
5124 configmon()->check_sub(s
);
5129 // we only need to reply if the client is old enough to think it
5130 // has to send renewals.
5131 ConnectionRef con
= m
->get_connection();
5132 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
5133 m
->get_connection()->send_message(new MMonSubscribeAck(
5134 monmap
->get_fsid(), (int)g_conf()->mon_subscribe_interval
));
// Answer an MMonGetVersion with the last and first committed versions of
// the paxos service selected by m->what ("mdsmap", "fsmap", "osdmap" or
// "monmap"; anything else is logged as an invalid map type).
// The request is queued for retry while this monitor is neither leader nor
// peon, or while the selected service is not readable.
// NOTE(review): the assignments to `svc`, the early exits and any null
// check on `svc` are not visible in this listing.
5139 void Monitor::handle_get_version(MonOpRequestRef op
)
5141 auto m
= op
->get_req
<MMonGetVersion
>();
5142 dout(10) << "handle_get_version " << *m
<< dendl
;
5143 PaxosService
*svc
= NULL
;
5145 MonSession
*s
= op
->get_session();
// Out of quorum: park the request until a quorum forms.
5148 if (!is_leader() && !is_peon()) {
5149 dout(10) << " waiting for quorum" << dendl
;
5150 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
5154 if (m
->what
== "mdsmap") {
5156 } else if (m
->what
== "fsmap") {
5158 } else if (m
->what
== "osdmap") {
5160 } else if (m
->what
== "monmap") {
5163 derr
<< "invalid map type " << m
->what
<< dendl
;
5167 if (!svc
->is_readable()) {
5168 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
// The reply echoes the request's handle and tid.
5172 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
5173 reply
->handle
= m
->handle
;
5174 reply
->version
= svc
->get_last_committed();
5175 reply
->oldest_version
= svc
->get_first_committed();
5176 reply
->set_tid(m
->get_tid());
5178 m
->get_connection()->send_message(reply
);
// Messenger callback for a reset connection. Connections whose peer is a
// monitor are treated specially (see the comment below); for the others the
// MonSession stored as the connection's private data is detached from the
// connection and, under `lock`, handled as a reset/close.
// NOTE(review): the return statements, the null check on the session and
// the statement that removes the session under session_map_lock are not
// visible in this listing.
5184 bool Monitor::ms_handle_reset(Connection
*con
)
5186 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
5188 // ignore lossless monitor sessions
5189 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
// `priv` holds the connection's private data for the rest of the
// function; `s` is a raw pointer obtained from it.
5192 auto priv
= con
->get_priv();
5193 auto s
= static_cast<MonSession
*>(priv
.get());
5197 // break any con <-> session ref cycle
5198 s
->con
->set_priv(nullptr);
5203 std::lock_guard
l(lock
);
5205 dout(10) << "reset/close on session " << s
->name
<< " " << s
->addrs
<< dendl
;
// Only sessions that are not closed and are still on a list are touched.
5206 if (!s
->closed
&& s
->item
.is_on_list()) {
5207 std::lock_guard
l(session_map_lock
);
// Messenger callback for a refused connection; the visible body only logs
// the connection and its peer address.
// NOTE(review): the return statement is not visible in this listing.
5213 bool Monitor::ms_handle_refused(Connection
*con
)
5215 // just log for now...
5216 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
// Encode the current monmap with the features of `con` and send it to that
// connection as an MMonMap.
// NOTE(review): the declaration of `bl` is not visible in this listing.
5222 void Monitor::send_latest_monmap(Connection
*con
)
5225 monmap
->encode(bl
, con
->get_features());
5226 con
->send_message(new MMonMap(bl
));
// Handle an MMonGetMap by sending the latest monmap back on the
// connection the request arrived on.
5229 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
5231 auto m
= op
->get_req
<MMonGetMap
>();
5232 dout(10) << "handle_mon_get_map" << dendl
;
5233 send_latest_monmap(m
->get_connection().get());
// Handle an MMonMetadata: the sender's metadata is moved out of the
// message and recorded under the sender's rank via update_mon_metadata().
// NOTE(review): a source line between the get_req and the log is missing
// from this listing; it may be a guard around the update -- confirm.
5236 void Monitor::handle_mon_metadata(MonOpRequestRef op
)
5238 auto m
= op
->get_req
<MMonMetadata
>();
5240 dout(10) << __func__
<< dendl
;
5241 update_mon_metadata(m
->get_source().num(), std::move(m
->data
));
// Store `m` as the pending metadata of monitor `from`, write the whole
// pending_metadata map under MONITOR_STORE_PREFIX / "last_metadata" in the
// pending paxos transaction, and trigger a proposal.
// NOTE(review): the declaration of `bl` is not visible in this listing.
5245 void Monitor::update_mon_metadata(int from
, Metadata
&& m
)
5247 // NOTE: this is now for legacy (kraken or jewel) mons only.
5248 pending_metadata
[from
] = std::move(m
);
5250 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5252 encode(pending_metadata
, bl
);
5253 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5254 paxos
->trigger_propose();
// Read MONITOR_STORE_PREFIX / "last_metadata" from the store, decode it
// into mon_metadata and seed pending_metadata with a copy.
// NOTE(review): the declaration of `bl`, the handling of `r` and the
// return statements are not visible in this listing.
5257 int Monitor::load_metadata()
5260 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5263 auto it
= bl
.cbegin();
5264 decode(mon_metadata
, it
);
5266 pending_metadata
= mon_metadata
;
// Dump the metadata of monitor `mon` into `f`, one string per key/value
// pair. When `mon` has no entry in mon_metadata, "mon.<mon> not found" is
// written to `err`.
// NOTE(review): the return statements are not visible in this listing.
5270 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
5273 if (!mon_metadata
.count(mon
)) {
5274 err
<< "mon." << mon
<< " not found";
5277 const Metadata
& m
= mon_metadata
[mon
];
5278 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
5279 f
->dump_string(p
->first
.c_str(), p
->second
);
// Tally the values of metadata key `field` over all monitors in
// mon_metadata. Monitors lacking the key are counted under "unknown";
// counts are added to *out.
// NOTE(review): the `else` between the two increments is not visible in
// this listing.
5284 void Monitor::count_metadata(const string
& field
, map
<string
,int> *out
)
5286 for (auto& p
: mon_metadata
) {
5287 auto q
= p
.second
.find(field
);
5288 if (q
== p
.second
.end()) {
5289 (*out
)["unknown"]++;
5291 (*out
)[q
->second
]++;
// Formatter overload: compute the per-value counts for `field` with the
// map overload and dump them as integers inside an object section named
// after `field`.
// NOTE(review): the call closing the section is not visible in this
// listing.
5296 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
5298 map
<string
,int> by_val
;
5299 count_metadata(field
, &by_val
);
5300 f
->open_object_section(field
.c_str());
5301 for (auto& p
: by_val
) {
5302 f
->dump_int(p
.first
.c_str(), p
.second
);
// Group monitor names by the "hostname" value of their metadata and pass
// the grouping to dump_services() under the type "mon".
// NOTE(review): the statement executed when a monitor has no "hostname"
// entry and the return statement are not visible in this listing.
5307 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
5309 map
<string
, list
<string
> > mons
; // hostname => mon
5310 for (map
<int, Metadata
>::iterator it
= mon_metadata
.begin();
5311 it
!= mon_metadata
.end(); ++it
) {
5312 const Metadata
& m
= it
->second
;
5313 Metadata::const_iterator hostname
= m
.find("hostname");
5314 if (hostname
== m
.end()) {
5315 // not likely though
// it->first is the monitor's rank; the monmap turns it into a name.
5318 mons
[hostname
->second
].push_back(monmap
->get_name(it
->first
));
5321 dump_services(f
, mons
, "mon");
5325 // ----------------------------------------------
// Leader-only: begin a store scrub. When scrub_result is not empty,
// "scrub already in progress" is logged to the cluster log; otherwise the
// scrub event is cancelled, old results are cleared and a fresh ScrubState
// is allocated.
// NOTE(review): the return statements and the call that starts the first
// scrub chunk are not visible in this listing.
5328 int Monitor::scrub_start()
5330 dout(10) << __func__
<< dendl
;
5331 ceph_assert(is_leader());
5333 if (!scrub_result
.empty()) {
5334 clog
->info() << "scrub already in progress";
5338 scrub_event_cancel();
5339 scrub_result
.clear();
5340 scrub_state
.reset(new ScrubState
);
// Leader-only: scrub one chunk of the store. Pins the scrub to the current
// paxos version, sends an OP_SCRUB request starting at
// scrub_state->last_key to quorum members, scrubs the local store into
// scrub_result[rank], and then (re)arms the scrub timeout.
// NOTE(review): this listing skips source lines (see the gaps in the
// embedded numbers). Not visible here: the variable receiving the key limit
// computed at 5358, the rest of the loop header (and any skip of this
// monitor), the remaining arguments of the MMonScrub constructor and of
// _scrub(), and the return statements.
5346 int Monitor::scrub()
5348 ceph_assert(is_leader());
5349 ceph_assert(scrub_state
);
5351 scrub_cancel_timeout();
5352 wait_for_paxos_write();
5353 scrub_version
= paxos
->get_version();
5356 // scrub all keys if we're the only monitor in the quorum
5358 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
5360 for (set
<int>::iterator p
= quorum
.begin();
5365 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
5367 r
->key
= scrub_state
->last_key
;
5368 send_mon_message(r
, *p
);
5372 bool r
= _scrub(&scrub_result
[rank
],
5373 &scrub_state
->last_key
,
// _scrub() returning false marks the scrub as finished.
5376 scrub_state
->finished
= !r
;
5378 // only after we got our scrub results do we really care whether the
5379 // other monitors are late on their results. Also, this way we avoid
5380 // triggering the timeout if we end up getting stuck in _scrub() for
5381 // longer than the duration of the timeout.
5382 scrub_reset_timeout();
5384 if (quorum
.size() == 1) {
5385 ceph_assert(scrub_state
->finished
== true);
// Handle an MMonScrub.
// OP_SCRUB: after pending paxos writes, scrub the local store starting at
// the requested key and send an OP_RESULT reply on the request's
// connection.
// OP_RESULT: record the sender's result; once a result is present for every
// quorum member, compare them and clear the collected results.
// NOTE(review): this listing skips source lines (see the gaps in the
// embedded numbers). Not visible here: the switch header, role checks, the
// statements guarded by the two version comparisons, the remaining
// constructor arguments of the reply, and what follows the `finished`
// test.
5391 void Monitor::handle_scrub(MonOpRequestRef op
)
5393 auto m
= op
->get_req
<MMonScrub
>();
5394 dout(10) << __func__
<< " " << *m
<< dendl
;
5396 case MMonScrub::OP_SCRUB
:
5401 wait_for_paxos_write();
5403 if (m
->version
!= paxos
->get_version())
5406 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
// reply->key is seeded with the requested start key and passed to _scrub()
// together with the result and key-count fields of the reply.
5410 reply
->key
= m
->key
;
5411 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
5412 m
->get_connection()->send_message(reply
);
5416 case MMonScrub::OP_RESULT
:
5420 if (m
->version
!= scrub_version
)
5422 // reset the timeout each time we get a result
5423 scrub_reset_timeout();
5425 int from
= m
->get_source().num();
// A monitor must not report twice for the same chunk.
5426 ceph_assert(scrub_result
.count(from
) == 0);
5427 scrub_result
[from
] = m
->result
;
5429 if (scrub_result
.size() == quorum
.size()) {
5430 scrub_check_results();
5431 scrub_result
.clear();
5432 if (scrub_state
->finished
)
// Scrub one chunk of the local store, starting at *start, over every sync
// target prefix except "paxos". For each key the per-prefix key count in
// r->prefix_keys is incremented and the value's crc32c is chained into
// r->prefix_crc. *num_keys is read as the limit (only enforced when > 0)
// and overwritten with the number of keys scrubbed.
// Returns whether the store iterator still has data after this chunk.
// Two config options (mon_scrub_inject_missing_keys and
// mon_scrub_inject_crc_mismatch) inject faults with the configured
// probability.
// NOTE(review): this listing skips source lines (see the gaps in the
// embedded numbers). Not visible here: the third parameter (`num_keys`),
// the statements guarded by the limit test and the prefix test, the
// declaration of `bl`, the updates of `scrubbed_keys` and `last_key`, and
// any write-back to *start.
5442 bool Monitor::_scrub(ScrubResult
*r
,
5443 pair
<string
,string
> *start
,
5446 ceph_assert(r
!= NULL
);
5447 ceph_assert(start
!= NULL
);
5448 ceph_assert(num_keys
!= NULL
);
5450 set
<string
> prefixes
= get_sync_targets_names();
5451 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5453 dout(10) << __func__
<< " start (" << *start
<< ")"
5454 << " num_keys " << *num_keys
<< dendl
;
5456 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
5458 int scrubbed_keys
= 0;
5459 pair
<string
,string
> last_key
;
5461 while (it
->has_next_chunk()) {
// A limit of zero or less means "no limit".
5463 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
5466 pair
<string
,string
> k
= it
->get_next_key();
5467 if (prefixes
.count(k
.first
) == 0)
// Fault injection: pretend a key is missing.
5470 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
5471 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
5472 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
5478 int err
= store
->get(k
.first
, k
.second
, bl
);
5479 ceph_assert(err
== 0);
5481 uint32_t key_crc
= bl
.crc32c(0);
5482 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
5483 << " crc " << key_crc
<< dendl
;
5484 r
->prefix_keys
[k
.first
]++;
// The per-prefix crc starts at 0 and is chained over every value.
5485 if (r
->prefix_crc
.count(k
.first
) == 0) {
5486 r
->prefix_crc
[k
.first
] = 0;
5488 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
// Fault injection: corrupt the running crc of this prefix.
5490 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
5491 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
5492 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
5493 r
->prefix_crc
[k
.first
] += 1;
5500 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
5501 << " scrubbed_keys " << scrubbed_keys
5502 << " has_next " << it
->has_next_chunk() << dendl
;
5505 *num_keys
= scrubbed_keys
;
5507 return it
->has_next_chunk();
// Compare every collected scrub result against this monitor's own
// (scrub_result[rank]). A differing result produces three cluster-log
// errors: "scrub mismatch" followed by both results. A "scrub ok" debug
// entry is also visible.
// NOTE(review): the rest of the loop header, the statement guarded by the
// rank comparison (presumably skipping our own entry) and the condition
// around the "scrub ok" message are not visible in this listing.
5510 void Monitor::scrub_check_results()
5512 dout(10) << __func__
<< dendl
;
5516 ScrubResult
& mine
= scrub_result
[rank
];
5517 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
5518 p
!= scrub_result
.end();
5520 if (p
->first
== rank
)
5522 if (p
->second
!= mine
) {
5524 clog
->error() << "scrub mismatch";
5525 clog
->error() << " mon." << rank
<< " " << mine
;
5526 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5530 clog
->debug() << "scrub ok on " << quorum
<< ": " << mine
;
// Called when the scrub timeout fires; the visible body logs that the
// scrub is being restarted.
// NOTE(review): the statements performing the restart are not visible in
// this listing.
5533 inline void Monitor::scrub_timeout()
5535 dout(1) << __func__
<< " restarting scrub" << dendl
;
// Finish a scrub and schedule the next one through scrub_event_start().
// NOTE(review): one source line before the scrub_event_start() call is
// missing from this listing.
5540 void Monitor::scrub_finish()
5542 dout(10) << __func__
<< dendl
;
5544 scrub_event_start();
// Drop all in-progress scrub state: cancel the scrub timeout, clear the
// collected results and release the ScrubState.
// NOTE(review): one source line after scrub_cancel_timeout() is missing
// from this listing.
5547 void Monitor::scrub_reset()
5549 dout(10) << __func__
<< dendl
;
5550 scrub_cancel_timeout();
5552 scrub_result
.clear();
5553 scrub_state
.reset();
// React to a change of the scrub interval (`secs` is only logged in the
// visible lines): the scrub event is cancelled and started again.
// NOTE(review): the leader check announced by the first comment and the
// statement guarded by the scrub_state test (presumably an early exit) are
// not visible in this listing.
5556 inline void Monitor::scrub_update_interval(int secs
)
5558 // we don't care about changes if we are not the leader.
5559 // changes will be visible if we become the leader.
5563 dout(1) << __func__
<< " new interval = " << secs
<< dendl
;
5565 // if scrub already in progress, all changes will already be visible during
5566 // the next round. Nothing to do.
5567 if (scrub_state
!= NULL
)
5570 scrub_event_cancel();
5571 scrub_event_start();
// Schedule the next periodic scrub after mon_scrub_interval, cancelling
// any previously scheduled scrub event first. An interval <= 0 is logged
// as "scrub event is disabled".
// NOTE(review): the end of the "disabled" log statement, the early exit
// that presumably follows it and the body of the timer callback are not
// visible in this listing.
5574 void Monitor::scrub_event_start()
5576 dout(10) << __func__
<< dendl
;
5579 scrub_event_cancel();
5581 if (cct
->_conf
->mon_scrub_interval
<= 0) {
5582 dout(1) << __func__
<< " scrub event is disabled"
5583 << " (mon_scrub_interval = " << cct
->_conf
->mon_scrub_interval
5588 scrub_event
= timer
.add_event_after(
5589 cct
->_conf
->mon_scrub_interval
,
5590 new C_MonContext
{this, [this](int) {
// Cancel the scheduled scrub event on the monitor timer.
// NOTE(review): any null check on scrub_event and the reset of the handle
// are not visible in this listing.
5595 void Monitor::scrub_event_cancel()
5597 dout(10) << __func__
<< dendl
;
5599 timer
.cancel_event(scrub_event
);
// Cancel the scrub timeout event, if one is scheduled, and forget its
// handle.
5604 inline void Monitor::scrub_cancel_timeout()
5606 if (scrub_timeout_event
) {
5607 timer
.cancel_event(scrub_timeout_event
);
5608 scrub_timeout_event
= NULL
;
// Restart the scrub timeout: cancel the current timeout event and schedule
// a new one after mon_scrub_timeout.
// NOTE(review): the body of the timer callback is not visible in this
// listing.
5612 void Monitor::scrub_reset_timeout()
5614 dout(15) << __func__
<< " reset timeout event" << dendl
;
5615 scrub_cancel_timeout();
5616 scrub_timeout_event
= timer
.add_event_after(
5617 g_conf()->mon_scrub_timeout
,
5618 new C_MonContext
{this, [this](int) {
5623 /************ TICK ***************/
// Schedule a timer callback after mon_tick_interval.
// NOTE(review): the body of the callback is not visible in this listing;
// presumably it runs tick() -- confirm.
5624 void Monitor::new_tick()
5626 timer
.add_event_after(g_conf()->mon_tick_interval
, new C_MonContext
{this, [this](int) {
// Periodic housekeeping. The visible steps are:
//  - log "Health check update" entries for health checks whose summary or
//    severity changed and whose last logged update is older than
//    mon_health_log_update_period;
//  - walk the paxos services a second time;
//  - under session_map_lock, trim non-monitor sessions that timed out, or
//    all of them when this monitor has been out of quorum for more than
//    2 * mon_lease, marking their connections down;
//  - trim sync providers and complete the maybe_wait_for_quorum waiters;
//  - as active leader with a zero fingerprint, propose a new cluster
//    fingerprint;
//  - push the health metrics to the mgr client.
// NOTE(review): this listing skips source lines (see the gaps in the
// embedded numbers). Not visible here: the declaration of `ss`, the body of
// the second service loop, the session loop header and the declaration of
// `s`, the statements guarded by "don't trim monitors", and the removal of
// trimmed sessions.
5631 void Monitor::tick()
5634 dout(11) << "tick" << dendl
;
5635 const utime_t now
= ceph_clock_now();
5637 // Check if we need to emit any delayed health check updated messages
5639 const auto min_period
= g_conf().get_val
<int64_t>(
5640 "mon_health_log_update_period");
5641 for (auto& svc
: paxos_service
) {
5642 auto health
= svc
->get_health_checks();
5644 for (const auto &i
: health
.checks
) {
5645 const std::string
&code
= i
.first
;
5646 const std::string
&summary
= i
.second
.summary
;
5647 const health_status_t severity
= i
.second
.severity
;
// Look up the logging state recorded for this check code.
5649 auto status_iter
= health_check_log_times
.find(code
);
5650 if (status_iter
== health_check_log_times
.end()) {
5654 auto &log_status
= status_iter
->second
;
5655 bool const changed
= log_status
.last_message
!= summary
5656 || log_status
.severity
!= severity
;
// Rate limit: a change is logged only after min_period has elapsed since
// the previous logged update.
5658 if (changed
&& now
- log_status
.updated_at
> min_period
) {
5659 log_status
.last_message
= summary
;
5660 log_status
.updated_at
= now
;
5661 log_status
.severity
= severity
;
5664 ss
<< "Health check update: " << summary
<< " (" << code
<< ")";
5665 clog
->health(severity
) << ss
.str();
5672 for (auto& svc
: paxos_service
) {
5679 std::lock_guard
l(session_map_lock
);
5680 auto p
= session_map
.sessions
.begin();
// True once we have been outside the quorum for more than two leases.
5682 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5683 now
> (exited_quorum
+ 2*g_conf()->mon_lease
));
5689 // don't trim monitors
5690 if (s
->name
.is_mon())
5693 if (s
->session_timeout
< now
&& s
->con
) {
5694 // check keepalive, too
5695 s
->session_timeout
= s
->con
->get_last_keepalive();
5696 s
->session_timeout
+= g_conf()->mon_session_timeout
;
5698 if (s
->session_timeout
< now
) {
5699 dout(10) << " trimming session " << s
->con
<< " " << s
->name
5701 << " (timeout " << s
->session_timeout
5702 << " < now " << now
<< ")" << dendl
;
5703 } else if (out_for_too_long
) {
5704 // boot the client Session because we've taken too long getting back in
5705 dout(10) << " trimming session " << s
->con
<< " " << s
->name
5706 << " because we've been out of quorum too long" << dendl
;
5711 s
->con
->mark_down();
5713 logger
->inc(l_mon_session_trim
);
5716 sync_trim_providers();
5718 if (!maybe_wait_for_quorum
.empty()) {
5719 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5722 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5723 // this is only necessary on upgraded clusters.
5724 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5725 prepare_new_fingerprint(t
);
5726 paxos
->trigger_propose();
5729 mgr_client
.update_daemon_health(get_health_metrics());
// Build the daemon health metrics reported to the mgr. In-flight ops are
// visited with a callback that compares each op's initiation time against
// `too_old` and tracks the oldest op. Depending on the outcome either a
// SLOW_OPS metric with the slow-op count and `oldest_secs`, or a SLOW_OPS
// metric of (0, 0), is appended.
// NOTE(review): this listing skips source lines (see the gaps in the
// embedded numbers). Not visible here: the declarations of `slow` and
// `too_old` (the latter has mon_op_complaint_time seconds subtracted from
// it), the counting inside the callback, the conditions choosing between
// the two emplace_back calls, and the return statement.
5733 vector
<DaemonHealthMetric
> Monitor::get_health_metrics()
5735 vector
<DaemonHealthMetric
> metrics
;
5737 utime_t oldest_secs
;
5738 const utime_t now
= ceph_clock_now();
5740 too_old
-= g_conf().get_val
<std::chrono::seconds
>("mon_op_complaint_time").count();
5742 TrackedOpRef oldest_op
;
// The callback captures by reference.
5743 auto count_slow_ops
= [&](TrackedOp
& op
) {
5744 if (op
.get_initiated() < too_old
) {
5746 if (!oldest_op
|| op
.get_initiated() < oldest_op
->get_initiated()) {
5754 if (op_tracker
.visit_ops_in_flight(&oldest_secs
, count_slow_ops
)) {
5756 derr
<< __func__
<< " reporting " << slow
<< " slow ops, oldest is "
5757 << oldest_op
->get_desc() << dendl
;
5759 metrics
.emplace_back(daemon_metric::SLOW_OPS
, slow
, oldest_secs
);
5761 metrics
.emplace_back(daemon_metric::SLOW_OPS
, 0, 0);
// Generate a random cluster fingerprint and stage it in transaction `t`
// under MONITOR_NAME / "cluster_fingerprint".
// NOTE(review): the declarations of `nf` and `bl` and the encoding of `nf`
// into `bl` are not visible in this listing.
5766 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5769 nf
.generate_random();
5770 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5774 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
// Compare the fsid stored under MONITOR_NAME / "cluster_uuid" with the
// monmap's fsid. Only the first line of the stored value is used; a value
// that cannot be parsed as a uuid, or that differs from the monmap's fsid,
// is reported through derr.
// NOTE(review): the declarations of `ebl` and `ondisk`, the handling of a
// failed read before the assert, the truncation at `pos` and all return
// statements are not visible in this listing.
5777 int Monitor::check_fsid()
5780 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
5783 ceph_assert(r
== 0);
5785 string
es(ebl
.c_str(), ebl
.length());
5787 // only keep the first line
5788 size_t pos
= es
.find_first_of('\n');
5789 if (pos
!= string::npos
)
5792 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
5794 if (!ondisk
.parse(es
.c_str())) {
5795 derr
<< "error: unable to parse uuid" << dendl
;
5799 if (monmap
->get_fsid() != ondisk
) {
5800 derr
<< "error: cluster_uuid file exists with value " << ondisk
5801 << ", != our uuid " << monmap
->get_fsid() << dendl
;
// Convenience overload: create a fresh transaction and apply it to the
// store.
// NOTE(review): the call that fills the transaction (presumably the
// write_fsid(t) overload) and the return statement are not visible in this
// listing.
5808 int Monitor::write_fsid()
5810 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5812 int r
= store
->apply_transaction(t
);
// Stage the monmap's fsid, rendered as text followed by a newline, in
// transaction `t` under MONITOR_NAME / "cluster_uuid".
// NOTE(review): the declarations of `ss` and `b`, the copy of `us` into
// `b` and the return statement are not visible in this listing.
5816 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
5819 ss
<< monmap
->get_fsid() << "\n";
5820 string us
= ss
.str();
5825 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
5830 * this is the closest thing to a traditional 'mkfs' for ceph.
5831 * initialize the monitor state machines to their initial values.
// Create the initial monitor store contents in a single transaction:
// the on-disk magic (MONITOR_NAME / "magic"), the encoded monmap
// ("mkfs" / "monmap"), the caller's osdmap if one was supplied and it
// decodes ("mkfs" / "osdmap"), and, when a keyring is required, the initial
// keyring ("mkfs" / "keyring"). The keyring comes from the file found via
// the `keyring` option, or is synthesised from the `key` option.
// NOTE(review): this listing skips source lines (see the gaps in the
// embedded numbers). Not visible here: the declarations of `magicbl`, `om`,
// `keyring` and `bl`, the try blocks around the decodes, the conditions on
// `r`, the error returns and the final return.
5833 int Monitor::mkfs(bufferlist
& osdmapbl
)
5835 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5837 // verify cluster fsid
// -ENOENT from check_fsid() is tolerated; other errors are not.
5838 int r
= check_fsid();
5839 if (r
< 0 && r
!= -ENOENT
)
5843 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
5844 magicbl
.append("\n");
5845 t
->put(MONITOR_NAME
, "magic", magicbl
);
5848 features
= get_initial_supported_features();
5851 // save monmap, osdmap, keyring.
// NOTE(review): the monmap is encoded before set_epoch(0) is called, so
// the stored blob carries the epoch the map had on entry -- confirm this
// is intended.
5852 bufferlist monmapbl
;
5853 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
5854 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5855 t
->put("mkfs", "monmap", monmapbl
);
5857 if (osdmapbl
.length()) {
5858 // make sure it's a valid osdmap
5861 om
.decode(osdmapbl
);
5863 catch (buffer::error
& e
) {
5864 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
5867 t
->put("mkfs", "osdmap", osdmapbl
);
5870 if (is_keyring_required()) {
5872 string keyring_filename
;
5874 r
= ceph_resolve_file_search(g_conf()->keyring
, keyring_filename
);
5876 derr
<< "unable to find a keyring file on " << g_conf()->keyring
5877 << ": " << cpp_strerror(r
) << dendl
;
// Fallback: build a one-entry plaintext keyring for "mon." from the `key`
// option, granting "allow *" on mon.
5878 if (g_conf()->key
!= "") {
5879 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf()->key
+
5880 "\n\tcaps mon = \"allow *\"\n";
5882 bl
.append(keyring_plaintext
);
5884 auto i
= bl
.cbegin();
5885 keyring
.decode_plaintext(i
);
5887 catch (const buffer::error
& e
) {
5888 derr
<< "error decoding keyring " << keyring_plaintext
5889 << ": " << e
.what() << dendl
;
5896 r
= keyring
.load(g_ceph_context
, keyring_filename
);
5898 derr
<< "unable to load initial keyring " << g_conf()->keyring
<< dendl
;
5903 // put mon. key in external keyring; seed with everything else.
5904 extract_save_mon_key(keyring
);
5906 bufferlist keyringbl
;
5907 keyring
.encode_plaintext(keyringbl
);
5908 t
->put("mkfs", "keyring", keyringbl
);
// NOTE(review): the result of apply_transaction() is not captured on this
// line -- confirm whether a failure here should be reported to the caller.
5911 store
->apply_transaction(t
);
// Write `bl` to the file "<mon_data>/keyring".
// The file is opened write-only, created if absent with mode 0600, and
// marked close-on-exec. `err` receives the result of bl.write_fd().
// NOTE(review): the declarations of `os` and `err`, the failure check on
// `fd` and the return statements are not visible in this listing.
5916 int Monitor::write_default_keyring(bufferlist
& bl
)
5919 os
<< g_conf()->mon_data
<< "/keyring";
// NOTE(review): the flags do not include O_TRUNC, so an existing, longer
// file would keep its trailing bytes -- confirm this is intended.
5922 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
|O_CLOEXEC
, 0600);
5925 dout(0) << __func__
<< " failed to open " << os
.str()
5926 << ": " << cpp_strerror(err
) << dendl
;
5930 err
= bl
.write_fd(fd
);
// The close() result is deliberately discarded by the VOID_ macro.
5933 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// Move the MON-type entry out of `keyring` into the default keyring file.
// If `keyring` has an auth entry for an EntityName of type
// CEPH_ENTITY_TYPE_MON, that entry is added to `pkey`, encoded as plaintext
// into `bl`, written with write_default_keyring(), and then removed from
// `keyring`.
// NOTE(review): the declarations of `mon_key`, `pkey` and `bl` are not
// visible in this listing.
5938 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
5940 EntityName mon_name
;
5941 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
5943 if (keyring
.get_auth(mon_name
, mon_key
)) {
5944 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
5946 pkey
.add(mon_name
, mon_key
);
5948 pkey
.encode_plaintext(bl
);
// NOTE(review): the int result of write_default_keyring() is ignored, and
// the entry is removed from `keyring` on the next statement regardless.
5949 write_default_keyring(bl
);
5950 keyring
.remove(mon_name
);
5954 // AuthClient methods -- for mon <-> mon communication
// Prepare an outgoing auth request for a connection to a MON or MGR peer.
// Under `auth_lock`: obtains an authorizer for the peer type via
// get_authorizer(), hands it to auth_meta->authorizer, queries
// auth_registry.get_supported_modes() for the peer type, and stores
// auth->protocol in *method.
// NOTE(review): part of the parameter list, the bodies of the two guard
// branches and the return statements are not visible in this listing.
5955 int Monitor::get_auth_request(
5957 AuthConnectionMeta
*auth_meta
,
5959 vector
<uint32_t> *preferred_modes
,
5962 std::scoped_lock
l(auth_lock
);
// Guard: only MON and MGR peers are handled past this point.
5963 if (con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
&&
5964 con
->get_peer_type() != CEPH_ENTITY_TYPE_MGR
) {
5967 AuthAuthorizer
*auth
;
5968 if (!get_authorizer(con
->get_peer_type(), &auth
)) {
// auth_meta->authorizer takes over the raw pointer via reset().
5971 auth_meta
->authorizer
.reset(auth
);
5972 auth_registry
.get_supported_modes(con
->get_peer_type(),
5975 *method
= auth
->protocol
;
// Handle a follow-up message from the peer during authentication.
// Under `auth_lock`: logs an error if auth_meta has no authorizer; otherwise
// passes `bl` to the authorizer's add_challenge() and copies the
// authorizer's `bl` into *reply.
// NOTE(review): part of the parameter list and the return statements are not
// visible in this listing.
5980 int Monitor::handle_auth_reply_more(
5982 AuthConnectionMeta
*auth_meta
,
5983 const bufferlist
& bl
,
5986 std::scoped_lock
l(auth_lock
);
5987 if (!auth_meta
->authorizer
) {
5988 derr
<< __func__
<< " no authorizer?" << dendl
;
5991 auth_meta
->authorizer
->add_challenge(cct
, bl
);
5992 *reply
= auth_meta
->authorizer
->bl
;
// Finish an outgoing authentication exchange.
// Under `auth_lock`: verifies the peer's reply `bl` with the authorizer's
// verify_reply() (which also receives `connection_secret`), logs on failure,
// and otherwise copies the authorizer's session key into
// auth_meta->session_key.
// NOTE(review): part of the parameter list and the return statements are not
// visible in this listing; the `session_key` out-parameter is not assigned on
// any visible line.
5996 int Monitor::handle_auth_done(
5998 AuthConnectionMeta
*auth_meta
,
6001 const bufferlist
& bl
,
6002 CryptoKey
*session_key
,
6003 std::string
*connection_secret
)
6005 std::scoped_lock
l(auth_lock
);
6006 // verify authorizer reply
6007 auto p
= bl
.begin();
6008 if (!auth_meta
->authorizer
->verify_reply(p
, connection_secret
)) {
6009 dout(0) << __func__
<< " failed verifying authorizer reply" << dendl
;
6012 auth_meta
->session_key
= auth_meta
->authorizer
->session_key
;
// Called when the peer rejected the auth method we offered. The visible
// code only logs the rejected method and the error text for `result`.
// NOTE(review): part of the parameter list and the return statement are not
// visible in this listing; `allowed_methods` and `allowed_modes` are not
// used on any visible line.
6016 int Monitor::handle_auth_bad_method(
6018 AuthConnectionMeta
*auth_meta
,
6019 uint32_t old_auth_method
,
6021 const std::vector
<uint32_t>& allowed_methods
,
6022 const std::vector
<uint32_t>& allowed_modes
)
6024 derr
<< __func__
<< " hmm, they didn't like " << old_auth_method
6025 << " result " << cpp_strerror(result
) << dendl
;
// Build an authorizer for an outgoing connection to service `service_id`
// and store it in *authorizer.
// Only CEPH_ENTITY_TYPE_MON and CEPH_ENTITY_TYPE_MGR are handled. When
// cephx is not among auth_cluster_required, an auth_none authorizer is
// built. Otherwise a cephx session-auth info is built through key_server,
// turned into a service ticket blob, loaded into a CephXTicketHandler, and
// the authorizer is built from that handler.
// NOTE(review): the declarations of `name`, `secret` and `ret`, several
// conditions and all return statements are not visible in this listing;
// presumably true means success -- confirm against the full file.
6029 bool Monitor::get_authorizer(int service_id
, AuthAuthorizer
**authorizer
)
6031 dout(10) << "get_authorizer for " << ceph_entity_type_name(service_id
)
6037 // we only connect to other monitors and mgr; everyone else connects to us.
6038 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
6039 service_id
!= CEPH_ENTITY_TYPE_MGR
)
// Without cephx among the required cluster methods, fall back to auth_none.
6042 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
6044 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
6045 AuthNoneClientHandler handler
{g_ceph_context
};
6046 handler
.set_global_id(0);
6047 *authorizer
= handler
.build_authorizer(service_id
);
6051 CephXServiceTicketInfo auth_ticket_info
;
6052 CephXSessionAuthInfo info
;
// The ticket is issued for an entity of type MON with global_id 0.
6056 name
.set_type(CEPH_ENTITY_TYPE_MON
);
6057 auth_ticket_info
.ticket
.name
= name
;
6058 auth_ticket_info
.ticket
.global_id
= 0;
6060 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
6061 // mon to mon authentication uses the private monitor shared key and not the
// The secret is looked up in `keyring` first and in `key_server` second.
6064 if (!keyring
.get_secret(name
, secret
) &&
6065 !key_server
.get_secret(name
, secret
)) {
6066 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
// For diagnosis, the installed auth entries are listed and logged.
6068 stringstream ss
, ds
;
6069 int err
= key_server
.list_secrets(ds
);
6071 ss
<< "no installed auth entries!";
6073 ss
<< "installed auth entries:";
6074 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
// The MON case passes the looked-up secret and (uint64_t)-1 explicitly.
6078 ret
= key_server
.build_session_auth_info(
6079 service_id
, auth_ticket_info
.ticket
, info
, secret
, (uint64_t)-1);
6081 dout(0) << __func__
<< " failed to build mon session_auth_info "
6082 << cpp_strerror(ret
) << dendl
;
6085 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
// The MGR case uses the overload that takes no explicit secret.
6087 ret
= key_server
.build_session_auth_info(
6088 service_id
, auth_ticket_info
.ticket
, info
);
6090 derr
<< __func__
<< " failed to build mgr service session_auth_info "
6091 << cpp_strerror(ret
) << dendl
;
6095 ceph_abort(); // see check at top of fn
6098 CephXTicketBlob blob
;
6099 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
6100 dout(0) << "get_authorizer failed to build service ticket" << dendl
;
// Round-trip the blob through a bufferlist to populate handler.ticket.
6103 bufferlist ticket_data
;
6104 encode(blob
, ticket_data
);
6106 auto iter
= ticket_data
.cbegin();
6107 CephXTicketHandler
handler(g_ceph_context
, service_id
);
6108 decode(handler
.ticket
, iter
);
6110 handler
.session_key
= info
.session_key
;
6112 *authorizer
= handler
.build_authorizer(0);
// Handle an incoming authentication request on connection `con`.
// Runs under `auth_lock`. The first payload byte selects the auth mode:
//   - AUTH_MODE_AUTHORIZER..AUTH_MODE_AUTHORIZER_MAX: the payload is an
//     authorizer, checked with an AuthAuthorizeHandler's verify_authorizer().
//   - AUTH_MODE_MON..AUTH_MODE_MON_MAX: a monitor auth exchange; the first
//     message sets up an AuthServiceHandler and a partial MonSession and
//     calls start_session(); the other path feeds the session's handler via
//     handle_request().
//   - anything else is logged as an unrecognized auth mode.
// `more` distinguishes a continuation from the start of an exchange (it
// selects " (more)" vs " (start)" in the log line below).
// NOTE(review): part of the parameter list, several conditions and most
// return statements are not visible in this listing; confirm the exact
// return codes against the full file.
6117 int Monitor::handle_auth_request(
6119 AuthConnectionMeta
*auth_meta
,
6121 uint32_t auth_method
,
6122 const bufferlist
&payload
,
6125 std::scoped_lock
l(auth_lock
);
6127 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6128 // e.g., peer_features, peer_addrs, and others are still unknown.
6130 dout(10) << __func__
<< " con " << con
<< (more
? " (more)":" (start)")
6131 << " method " << auth_method
6132 << " payload " << payload
.length()
6134 if (!payload
.length()) {
6135 if (!con
->is_msgr2() &&
6136 con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
) {
6137 // for v1 connections, we tolerate no authorizer (from
6138 // non-monitors), because authentication happens via MAuth
// The first payload byte carries the auth mode.
6145 auth_meta
->auth_mode
= payload
[0];
6148 if (auth_meta
->auth_mode
>= AUTH_MODE_AUTHORIZER
&&
6149 auth_meta
->auth_mode
<= AUTH_MODE_AUTHORIZER_MAX
) {
6150 AuthAuthorizeHandler
*ah
= get_auth_authorize_handler(con
->get_peer_type(),
6153 lderr(cct
) << __func__
<< " no AuthAuthorizeHandler found for auth method "
6154 << auth_method
<< dendl
;
// Remember whether a challenge already existed before verification, so a
// challenge created by this call can be told apart below.
6157 bool was_challenge
= (bool)auth_meta
->authorizer_challenge
;
6158 bool isvalid
= ah
->verify_authorizer(
6162 auth_meta
->get_connection_secret_length(),
6165 &con
->peer_global_id
,
6166 &con
->peer_caps_info
,
6167 &auth_meta
->session_key
,
6168 &auth_meta
->connection_secret
,
6169 &auth_meta
->authorizer_challenge
);
6171 ms_handle_authentication(con
);
6174 if (!more
&& !was_challenge
&& auth_meta
->authorizer_challenge
) {
6177 dout(10) << __func__
<< " bad authorizer on " << con
<< dendl
;
6179 } else if (auth_meta
->auth_mode
< AUTH_MODE_MON
||
6180 auth_meta
->auth_mode
> AUTH_MODE_MON_MAX
) {
6181 derr
<< __func__
<< " unrecognized auth mode " << auth_meta
->auth_mode
6186 // wait until we've formed an initial quorum on mkfs so that we have
6187 // the initial keys (e.g., client.admin).
6188 if (authmon()->get_last_committed() == 0) {
6189 dout(10) << __func__
<< " haven't formed initial quorum, EBUSY" << dendl
;
6196 auto p
= payload
.begin();
// A connection that already carries a priv object is rejected with -EACCES.
6198 if (con
->get_priv()) {
6199 return -EACCES
; // wtf
6203 unique_ptr
<AuthServiceHandler
> auth_handler
{get_auth_service_handler(
6204 auth_method
, g_ceph_context
, &key_server
)};
6205 if (!auth_handler
) {
6206 dout(1) << __func__
<< " auth_method " << auth_method
<< " not supported"
6212 EntityName entity_name
;
6216 if (mode
< AUTH_MODE_MON
||
6217 mode
> AUTH_MODE_MON_MAX
) {
6218 dout(1) << __func__
<< " invalid mode " << (int)mode
<< dendl
;
6221 assert(mode
>= AUTH_MODE_MON
&& mode
<= AUTH_MODE_MON_MAX
);
6222 decode(entity_name
, p
);
6223 decode(con
->peer_global_id
, p
);
6224 } catch (buffer::error
& e
) {
6225 dout(1) << __func__
<< " failed to decode, " << e
.what() << dendl
;
6229 // supported method?
// MON/OSD/MDS/MGR entities are checked against auth_cluster_required.
6230 if (entity_name
.get_type() == CEPH_ENTITY_TYPE_MON
||
6231 entity_name
.get_type() == CEPH_ENTITY_TYPE_OSD
||
6232 entity_name
.get_type() == CEPH_ENTITY_TYPE_MDS
||
6233 entity_name
.get_type() == CEPH_ENTITY_TYPE_MGR
) {
6234 if (!auth_cluster_required
.is_supported_auth(auth_method
)) {
6235 dout(10) << __func__
<< " entity " << entity_name
<< " method "
6236 << auth_method
<< " not among supported "
6237 << auth_cluster_required
.get_supported_set() << dendl
;
// This check is against auth_service_required, so the log line reports that
// same set (it previously printed the cluster set by mistake).
6241 if (!auth_service_required
.is_supported_auth(auth_method
)) {
6242 dout(10) << __func__
<< " entity " << entity_name
<< " method "
6243 << auth_method
<< " not among supported "
6244 << auth_service_required
.get_supported_set() << dendl
;
6249 // for msgr1 we would do some weirdness here to ensure signatures
6250 // are supported by the client if we require it. for msgr2 that
6251 // is not necessary.
// A peer that did not send a global_id gets one assigned by the auth monitor.
6253 if (!con
->peer_global_id
) {
6254 con
->peer_global_id
= authmon()->_assign_global_id();
6255 if (!con
->peer_global_id
) {
6256 dout(1) << __func__
<< " failed to assign global_id" << dendl
;
6259 dout(10) << __func__
<< " assigned global_id " << con
->peer_global_id
6263 // set up partial session
6264 s
= new MonSession(con
);
// The session takes over the handler from the unique_ptr.
6265 s
->auth_handler
= auth_handler
.release();
6266 con
->set_priv(RefCountedPtr
{s
, false});
6268 r
= s
->auth_handler
->start_session(
6270 auth_meta
->get_connection_secret_length(),
6272 &con
->peer_caps_info
,
6273 &auth_meta
->session_key
,
6274 &auth_meta
->connection_secret
);
// Other path: reuse the session previously attached to the connection.
6276 priv
= con
->get_priv();
6278 // this can happen if the async ms_handle_reset event races with
6279 // the unlocked call into handle_auth_request
6282 s
= static_cast<MonSession
*>(priv
.get());
6283 r
= s
->auth_handler
->handle_request(
6285 auth_meta
->get_connection_secret_length(),
6287 &con
->peer_global_id
,
6288 &con
->peer_caps_info
,
6289 &auth_meta
->session_key
,
6290 &auth_meta
->connection_secret
);
// ms_handle_authentication() is invoked only for a session that is not
// already marked authenticated.
6293 !s
->authenticated
) {
6294 ms_handle_authentication(con
);
6297 dout(30) << " r " << r
<< " reply:\n";
6298 reply
->hexdump(*_dout
);
// Called when a connection has been accepted. Looks up the MonSession
// attached to `con` as its priv object and, if that session is not yet on
// the session list, identifies it with the peer's entity name and addresses
// and adds it to `session_map` under `session_map_lock`.
// NOTE(review): the null check on `s` and the branch structure are not
// visible in this listing; the " no session" log line presumably belongs to
// the case where no session is attached -- confirm against the full file.
6303 void Monitor::ms_handle_accept(Connection
*con
)
6305 auto priv
= con
->get_priv();
6306 MonSession
*s
= static_cast<MonSession
*>(priv
.get());
6308 // legacy protocol v1?
6309 dout(10) << __func__
<< " con " << con
<< " no session" << dendl
;
// A session already on the list is only logged.
6313 if (s
->item
.is_on_list()) {
6314 dout(10) << __func__
<< " con " << con
<< " session " << s
6315 << " already on list" << dendl
;
6317 dout(10) << __func__
<< " con " << con
<< " session " << s
6318 << " registering session for "
6319 << con
->get_peer_addrs() << dendl
;
// Fill in the session identity from the connection's peer type, id and
// addresses before registering it.
6320 s
->_ident(entity_name_t(con
->get_peer_type(), con
->get_peer_id()),
6321 con
->get_peer_addrs());
6322 std::lock_guard
l(session_map_lock
);
6323 session_map
.add_session(s
);
6327 int Monitor::ms_handle_authentication(Connection
*con
)
6329 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
6330 // mon <-> mon connections need no Session, and setting one up
6331 // creates an awkward ref cycle between Session and Connection.
6335 auto priv
= con
->get_priv();
6336 MonSession
*s
= static_cast<MonSession
*>(priv
.get());
6338 // must be msgr2, otherwise dispatch would have set up the session.
6339 s
= session_map
.new_session(
6340 entity_name_t(con
->get_peer_type(), -1), // we don't know yet
6341 con
->get_peer_addrs(),
6344 dout(10) << __func__
<< " adding session " << s
<< " to con " << con
6347 logger
->set(l_mon_num_sessions
, session_map
.get_size());
6348 logger
->inc(l_mon_session_add
);
6350 dout(10) << __func__
<< " session " << s
<< " con " << con
6351 << " addr " << s
->con
->get_peer_addr()
6352 << " " << *s
<< dendl
;
6354 AuthCapsInfo
&caps_info
= con
->get_peer_caps_info();
6356 if (caps_info
.allow_all
) {
6357 s
->caps
.set_allow_all();
6358 s
->authenticated
= true;
6360 } else if (caps_info
.caps
.length()) {
6361 bufferlist::const_iterator p
= caps_info
.caps
.cbegin();
6365 } catch (const buffer::error
&err
) {
6366 derr
<< __func__
<< " corrupt cap data for " << con
->get_peer_entity_name()
6367 << " in auth db" << dendl
;
6372 if (s
->caps
.parse(str
, NULL
)) {
6373 s
->authenticated
= true;
6376 derr
<< __func__
<< " unparseable caps '" << str
<< "' for "
6377 << con
->get_peer_entity_name() << dendl
;