1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include <boost/scope_exit.hpp>
24 #include <boost/algorithm/string/predicate.hpp>
26 #include "json_spirit/json_spirit_reader.h"
27 #include "json_spirit/json_spirit_writer.h"
30 #include "common/version.h"
31 #include "common/blkdev.h"
32 #include "common/cmdparse.h"
33 #include "common/signal.h"
35 #include "osd/OSDMap.h"
37 #include "MonitorDBStore.h"
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MMonMap.h"
41 #include "messages/MMonGetMap.h"
42 #include "messages/MMonGetVersion.h"
43 #include "messages/MMonGetVersionReply.h"
44 #include "messages/MGenericMessage.h"
45 #include "messages/MMonCommand.h"
46 #include "messages/MMonCommandAck.h"
47 #include "messages/MMonMetadata.h"
48 #include "messages/MMonSync.h"
49 #include "messages/MMonScrub.h"
50 #include "messages/MMonProbe.h"
51 #include "messages/MMonJoin.h"
52 #include "messages/MMonPaxos.h"
53 #include "messages/MRoute.h"
54 #include "messages/MForward.h"
56 #include "messages/MMonSubscribe.h"
57 #include "messages/MMonSubscribeAck.h"
59 #include "messages/MCommand.h"
60 #include "messages/MCommandReply.h"
62 #include "messages/MAuthReply.h"
64 #include "messages/MTimeCheck2.h"
65 #include "messages/MPing.h"
67 #include "common/strtol.h"
68 #include "common/ceph_argparse.h"
69 #include "common/Timer.h"
70 #include "common/Clock.h"
71 #include "common/errno.h"
72 #include "common/perf_counters.h"
73 #include "common/admin_socket.h"
74 #include "global/signal_handler.h"
75 #include "common/Formatter.h"
76 #include "include/stringify.h"
77 #include "include/color.h"
78 #include "include/ceph_fs.h"
79 #include "include/str_list.h"
81 #include "OSDMonitor.h"
82 #include "MDSMonitor.h"
83 #include "MonmapMonitor.h"
84 #include "LogMonitor.h"
85 #include "AuthMonitor.h"
86 #include "MgrMonitor.h"
87 #include "MgrStatMonitor.h"
88 #include "ConfigMonitor.h"
89 #include "mon/QuorumService.h"
90 #include "mon/HealthMonitor.h"
91 #include "mon/ConfigKeyService.h"
92 #include "common/config.h"
93 #include "common/cmdparse.h"
94 #include "include/ceph_assert.h"
95 #include "include/compat.h"
96 #include "perfglue/heap_profiler.h"
98 #include "auth/none/AuthNoneClientHandler.h"
100 #define dout_subsys ceph_subsys_mon
102 #define dout_prefix _prefix(_dout, this)
103 using namespace TOPNSPC::common
;
// Log-line prefix helper used through the dout_prefix macro defined above.
// Writes "mon.<name>@<rank>(<state name>) e<monmap epoch> " to *_dout and
// returns that same stream so the caller can keep chaining output onto it.
//
// _dout: stream to write the prefix to (dereferenced unconditionally).
// mon:   monitor whose name, rank, state name and monmap epoch are printed.
104 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
105 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
106 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
// Definitions of the Monitor class string constants.
// MONITOR_NAME is the store prefix passed to store->get()/put() for the
// monitor's own keys elsewhere in this file (feature set, "joined",
// "cluster_fingerprint").
109 const string
Monitor::MONITOR_NAME
= "monitor";
// NOTE(review): no use of MONITOR_STORE_PREFIX is visible in this excerpt.
110 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
// Static table of monitor commands. mon/MonCommands.h is included with
// COMMAND / COMMAND_WITH_FLAG defined so that each entry expands to one
// brace-initialized MonCommand element:
//   COMMAND(...)            -> {parsesig, helptext, modulename, req_perms, FLAG(NONE)}
//   COMMAND_WITH_FLAG(...)  -> {parsesig, helptext, modulename, req_perms, flags}
// FLAG(f) expands to MonCommand::FLAG_<f>. The helper macros are undefined
// again once the table has been built.
115 #undef COMMAND_WITH_FLAG
116 #define FLAG(f) (MonCommand::FLAG_##f)
117 #define COMMAND(parsesig, helptext, modulename, req_perms) \
118 {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
119 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
120 {parsesig, helptext, modulename, req_perms, flags},
121 MonCommand mon_commands
[] = {
122 #include <mon/MonCommands.h>
125 #undef COMMAND_WITH_FLAG
127 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
128 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
134 con_self(m
? m
->get_loopback_connection() : NULL
),
136 finisher(cct_
, "mon_finisher", "fin"),
137 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads
),
138 has_ever_joined(false),
139 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
141 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
142 key_server(cct
, &keyring
),
143 auth_cluster_required(cct
,
144 cct
->_conf
->auth_supported
.empty() ?
145 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
146 auth_service_required(cct
,
147 cct
->_conf
->auth_supported
.empty() ?
148 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
149 mgr_messenger(mgr_m
),
150 mgr_client(cct_
, mgr_m
, monmap
),
151 gss_ktfile_client(cct
->_conf
.get_val
<std::string
>("gss_ktab_client_file")),
155 required_features(0),
157 quorum_con_features(0),
161 scrub_timeout_event(NULL
),
164 sync_provider_count(0),
167 sync_start_version(0),
168 sync_timeout_event(NULL
),
169 sync_last_committed_floor(0),
173 timecheck_rounds_since_clean(0),
174 timecheck_event(NULL
),
176 paxos_service(PAXOS_NUM
),
178 routed_request_tid(0),
179 op_tracker(cct
, g_conf().get_val
<bool>("mon_enable_op_tracker"), 1)
181 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
182 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
184 update_log_clients();
186 if (!gss_ktfile_client
.empty()) {
187 // Assert we can export environment variable
189 The default client keytab is used, if it is present and readable,
190 to automatically obtain initial credentials for GSSAPI client
191 applications. The principal name of the first entry in the client
192 keytab is used by default when obtaining initial credentials.
193 1. The KRB5_CLIENT_KTNAME environment variable.
194 2. The default_client_keytab_name profile variable in [libdefaults].
195 3. The hardcoded default, DEFCKTNAME.
197 const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
198 gss_ktfile_client
.c_str(), 1));
199 ceph_assert(set_result
== 0);
202 op_tracker
.set_complaint_and_threshold(
203 g_conf().get_val
<std::chrono::seconds
>("mon_op_complaint_time").count(),
204 g_conf().get_val
<int64_t>("mon_op_log_threshold"));
205 op_tracker
.set_history_size_and_duration(
206 g_conf().get_val
<uint64_t>("mon_op_history_size"),
207 g_conf().get_val
<std::chrono::seconds
>("mon_op_history_duration").count());
208 op_tracker
.set_history_slow_op_size_and_threshold(
209 g_conf().get_val
<uint64_t>("mon_op_history_slow_op_size"),
210 g_conf().get_val
<std::chrono::seconds
>("mon_op_history_slow_op_threshold").count());
212 paxos
= new Paxos(this, "paxos");
214 paxos_service
[PAXOS_MDSMAP
].reset(new MDSMonitor(this, paxos
, "mdsmap"));
215 paxos_service
[PAXOS_MONMAP
].reset(new MonmapMonitor(this, paxos
, "monmap"));
216 paxos_service
[PAXOS_OSDMAP
].reset(new OSDMonitor(cct
, this, paxos
, "osdmap"));
217 paxos_service
[PAXOS_LOG
].reset(new LogMonitor(this, paxos
, "logm"));
218 paxos_service
[PAXOS_AUTH
].reset(new AuthMonitor(this, paxos
, "auth"));
219 paxos_service
[PAXOS_MGR
].reset(new MgrMonitor(this, paxos
, "mgr"));
220 paxos_service
[PAXOS_MGRSTAT
].reset(new MgrStatMonitor(this, paxos
, "mgrstat"));
221 paxos_service
[PAXOS_HEALTH
].reset(new HealthMonitor(this, paxos
, "health"));
222 paxos_service
[PAXOS_CONFIG
].reset(new ConfigMonitor(this, paxos
, "config"));
224 config_key_service
= new ConfigKeyService(this, paxos
);
226 bool r
= mon_caps
.parse("allow *", NULL
);
229 exited_quorum
= ceph_clock_now();
231 // prepare local commands
232 local_mon_commands
.resize(std::size(mon_commands
));
233 for (unsigned i
= 0; i
< std::size(mon_commands
); ++i
) {
234 local_mon_commands
[i
] = mon_commands
[i
];
236 MonCommand::encode_vector(local_mon_commands
, local_mon_commands_bl
);
238 prenautilus_local_mon_commands
= local_mon_commands
;
239 for (auto& i
: prenautilus_local_mon_commands
) {
240 std::string n
= cmddesc_get_prenautilus_compat(i
.cmdstring
);
241 if (n
!= i
.cmdstring
) {
242 dout(20) << " pre-nautilus cmd " << i
.cmdstring
<< " -> " << n
<< dendl
;
246 MonCommand::encode_vector(prenautilus_local_mon_commands
, prenautilus_local_mon_commands_bl
);
248 // assume our commands until we have an election. this only means
249 // we won't reply with EINVAL before the election; any command that
250 // actually matters will wait until we have quorum etc and then
251 // retry (and revalidate).
252 leader_mon_commands
= local_mon_commands
;
257 op_tracker
.on_shutdown();
259 paxos_service
.clear();
260 delete config_key_service
;
262 ceph_assert(session_map
.sessions
.empty());
266 class AdminHook
: public AdminSocketHook
{
269 explicit AdminHook(Monitor
*m
) : mon(m
) {}
270 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
273 bufferlist
& out
) override
{
275 int r
= mon
->do_admin_command(command
, cmdmap
, f
, errss
, outss
);
// Execute one admin-socket command against this monitor.
//
// command: command prefix selecting the branch below (e.g. "mon_status",
//          "quorum_status", "sync_force", "ops", "sessions", "smart",
//          "heap", "compact").
// cmdmap:  parsed command arguments; every entry other than "prefix" is
//          stringified into the audit-log "args=[...]" text, and individual
//          branches pull their own values out with cmd_getval().
//
// The whole call runs with the monitor lock held (std::lock_guard on lock).
// A "dispatch" line is written to the audit channel before the command runs
// and a "finished" or "aborted" line afterwards; commands classified as
// read-only are audited at debug level, all others at info level.
// An unrecognised command reaches ceph_abort_msg(), i.e. it is treated as a
// registration bug rather than a user error.
281 int Monitor::do_admin_command(
282 std::string_view command
,
283 const cmdmap_t
& cmdmap
,
288 std::lock_guard
l(lock
);
// Build the printable argument list for the audit log; the "prefix" entry
// (the command name itself) is special-cased here.
292 for (auto p
= cmdmap
.begin();
293 p
!= cmdmap
.end(); ++p
) {
294 if (p
->first
== "prefix")
298 args
+= cmd_vartype_stringify(p
->second
);
300 args
= "[" + args
+ "]";
// Read-only commands are audited at debug level to keep the audit log quiet.
302 bool read_only
= (command
== "mon_status" ||
303 command
== "mon metadata" ||
304 command
== "quorum_status" ||
306 command
== "sessions");
308 (read_only
? audit_clog
->debug() : audit_clog
->info())
309 << "from='admin socket' entity='admin socket' "
310 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
312 if (command
== "mon_status") {
314 } else if (command
== "quorum_status") {
315 _quorum_status(f
, out
);
316 } else if (command
== "sync_force") {
// Destructive: requires the explicit confirmation token in "validate".
318 if ((!cmd_getval(cmdmap
, "validate", validate
)) ||
319 (validate
!= "--yes-i-really-mean-it")) {
320 err
<< "are you SURE? this will mean the monitor store will be erased "
321 "the next time the monitor is restarted. pass "
322 "'--yes-i-really-mean-it' if you really do.";
// Prefix match: the 23-character comparison already accepts the "...hintv"
// spelling as well, so the second comparison is redundant but harmless.
327 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
328 command
.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
329 if (!_add_bootstrap_peer_hint(command
, cmdmap
, out
))
331 } else if (command
== "quorum enter") {
332 elector
.start_participating();
334 out
<< "started responding to quorum, initiated new election";
335 } else if (command
== "quorum exit") {
337 elector
.stop_participating();
338 out
<< "stopped responding to quorum, initiated new election";
339 } else if (command
== "ops") {
// Result deliberately ignored: no error is reported for "ops".
340 (void)op_tracker
.dump_ops_in_flight(f
);
341 } else if (command
== "sessions") {
342 f
->open_array_section("sessions");
343 for (auto p
: session_map
.sessions
) {
344 f
->dump_object("session", *p
);
// The three historic-op dumps share one contract: a false result from the
// tracker is reported to the caller as "tracking is not enabled".
347 } else if (command
== "dump_historic_ops") {
348 if (!op_tracker
.dump_historic_ops(f
)) {
349 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
350 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
// Fixed: this branch used to test the un-negated result, so the "not
// enabled" error was attached to successful dumps and was missing when
// tracking really was disabled. It now matches "dump_historic_ops" above.
352 } else if (command
== "dump_historic_ops_by_duration" ) {
353 if (!op_tracker
.dump_historic_ops(f
, true)) {
354 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
355 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
// Fixed: same inverted test as the by-duration branch.
357 } else if (command
== "dump_historic_slow_ops") {
358 if (!op_tracker
.dump_historic_slow_ops(f
, {})) {
359 err
<< "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
360 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
// "quorum" with a "quorumcmd" argument is the argument-style spelling of the
// "quorum enter" / "quorum exit" commands handled above.
362 } else if (command
== "quorum") {
364 cmd_getval(cmdmap
, "quorumcmd", quorumcmd
);
365 if (quorumcmd
== "exit") {
367 elector
.stop_participating();
368 out
<< "stopped responding to quorum, initiated new election" << std::endl
;
369 } else if (quorumcmd
== "enter") {
370 elector
.start_participating();
372 out
<< "started responding to quorum, initiated new election" << std::endl
;
374 err
<< "needs a valid 'quorum' command" << std::endl
;
// "smart": collect device metrics for the raw devices backing the store,
// optionally restricted to the device id given in "devid", and emit them as
// one pretty-printed JSON object keyed by device id.
376 } else if (command
== "smart") {
378 cmd_getval(cmdmap
, "devid", want_devid
);
380 string devname
= store
->get_devname();
381 set
<string
> devnames
;
382 get_raw_devices(devname
, &devnames
);
383 json_spirit::mObject json_map
;
384 uint64_t smart_timeout
= cct
->_conf
.get_val
<uint64_t>(
385 "mon_smart_report_timeout");
// NOTE(review): the loop variable shadows the outer devname declared above.
386 for (auto& devname
: devnames
) {
388 string devid
= get_device_id(devname
, &err
);
389 if (want_devid
.size() && want_devid
!= devid
) {
390 derr
<< "get_device_id failed on " << devname
<< ": " << err
<< dendl
;
393 json_spirit::mValue smart_json
;
394 if (block_device_get_metrics(devname
, smart_timeout
,
396 dout(10) << "block_device_get_metrics failed for /dev/" << devname
400 json_map
[devid
] = smart_json
;
402 json_spirit::write(json_map
, out
, json_spirit::pretty_print
);
// "heap": forwarded to the tcmalloc heap profiler; rejected when the binary
// is not using tcmalloc.
403 } else if (command
== "heap") {
404 if (!ceph_using_tcmalloc()) {
405 err
<< "could not issue heap profiler command -- not using tcmalloc!";
410 if (!cmd_getval(cmdmap
, "heapcmd", cmd
)) {
411 err
<< "unable to get value for command \"" << cmd
<< "\"";
415 std::vector
<std::string
> cmd_vec
;
416 get_str_vec(cmd
, cmd_vec
);
418 if (cmd_getval(cmdmap
, "value", val
)) {
419 cmd_vec
.push_back(val
);
421 ceph_heap_profiler_handle_command(cmd_vec
, out
);
// "compact": the reported duration is the wall time of the compact_async()
// call itself.
// NOTE(review): if compact_async() only queues the work, the "finished ...
// in N seconds" text does not measure the compaction -- confirm.
422 } else if (command
== "compact") {
423 dout(1) << "triggering manual compaction" << dendl
;
424 auto start
= ceph::coarse_mono_clock::now();
425 store
->compact_async();
426 auto end
= ceph::coarse_mono_clock::now();
427 auto duration
= ceph::to_seconds
<double>(end
- start
);
428 dout(1) << "finished manual compaction in "
429 << duration
<< " seconds" << dendl
;
430 out
<< "compacted " << g_conf().get_val
<std::string
>("mon_keyvaluedb")
431 << " in " << duration
<< " seconds";
// Every command registered with the admin socket must have a branch above.
433 ceph_abort_msg("bad AdminSocket command binding");
// Audit trail: normal completion.
435 (read_only
? audit_clog
->debug() : audit_clog
->info())
436 << "from='admin socket' "
437 << "entity='admin socket' "
438 << "cmd=" << command
<< " "
439 << "args=" << args
<< ": finished";
// Audit trail: command was abandoned part-way.
443 (read_only
? audit_clog
->debug() : audit_clog
->info())
444 << "from='admin socket' "
445 << "entity='admin socket' "
446 << "cmd=" << command
<< " "
447 << "args=" << args
<< ": aborted";
// Signal entry point for the monitor.
// Asserts that the signal is SIGINT or SIGTERM (any other value is a
// programming error) and logs its name at error level via derr.
// NOTE(review): whatever follows the log line is not visible in this
// excerpt.
451 void Monitor::handle_signal(int signum
)
453 ceph_assert(signum
== SIGINT
|| signum
== SIGTERM
);
454 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
// Returns the baseline CompatSet: empty compat and ro_compat sets, and an
// incompat set containing CEPH_MON_FEATURE_INCOMPAT_BASE and
// CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS.
// get_supported_features() starts from this set and adds to it.
458 CompatSet
Monitor::get_initial_supported_features()
460 CompatSet::FeatureSet ceph_mon_feature_compat
;
461 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
462 CompatSet::FeatureSet ceph_mon_feature_incompat
;
463 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
464 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
465 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
466 ceph_mon_feature_incompat
);
// Builds the full feature set of this binary: the initial set from
// get_initial_supported_features() plus the incompat features inserted
// below (erasure-code related features, OSDMAP_ENC, and the per-release
// KRAKEN .. OCTOPUS features).
// check_features() uses this set as the "required" side when validating the
// on-disk feature list.
469 CompatSet
Monitor::get_supported_features()
471 CompatSet compat
= get_initial_supported_features();
472 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
473 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
474 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
475 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
476 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
477 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
478 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC
);
479 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
);
480 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
);
// Returns the feature set assumed for a store that carries no feature list:
// empty compat and ro_compat sets, with only
// CEPH_MON_FEATURE_INCOMPAT_BASE in the incompat set.
// read_features_off_disk() writes this set back when the on-disk entry is
// missing.
484 CompatSet
Monitor::get_legacy_features()
486 CompatSet::FeatureSet ceph_mon_feature_compat
;
487 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
488 CompatSet::FeatureSet ceph_mon_feature_incompat
;
489 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
490 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
491 ceph_mon_feature_incompat
);
// Verify that the feature set recorded in `store` can be handled by this
// binary.
// Loads the on-disk set with read_features_off_disk() and checks it with
// required.writeable(ondisk), where `required` is get_supported_features().
// On failure the unsupported features are computed and logged through
// generic_derr.
// NOTE(review): the return statements are not visible in this excerpt;
// the int result presumably distinguishes the two outcomes -- confirm.
494 int Monitor::check_features(MonitorDBStore
*store
)
496 CompatSet required
= get_supported_features();
499 read_features_off_disk(store
, &ondisk
);
501 if (!required
.writeable(ondisk
)) {
502 CompatSet diff
= required
.unsupported(ondisk
);
503 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
// Load the persisted CompatSet from `store` into *features.
//
// store:    monitor store to read (and, when seeding, write).
// features: output; overwritten with the decoded or seeded feature set.
//
// The set is stored under (MONITOR_NAME, COMPAT_SET_LOC). When that entry is
// empty, the store is assumed to predate feature tracking: *features is set
// to get_legacy_features(), encoded, and written back in its own
// transaction. Finally the buffer is decoded into *features.
// NOTE(review): the store->get() result is not checked here; an empty
// buffer is the only condition tested.
510 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
512 bufferlist featuresbl
;
513 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
514 if (featuresbl
.length() == 0) {
515 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
516 << "Assuming it is old-style and introducing one." << dendl
;
517 //we only want the baseline ~v.18 features assumed to be on disk.
518 //If new features are introduced this code needs to disappear or
520 *features
= get_legacy_features();
522 features
->encode(featuresbl
);
523 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
524 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
525 store
->apply_transaction(t
);
527 auto it
= featuresbl
.cbegin();
528 features
->decode(it
);
// Refresh the in-memory `features` member from this monitor's own store and
// then recompute the quorum requirements derived from it. Both the loaded
// feature set and the resulting required_features are logged at debug
// level 10.
532 void Monitor::read_features()
534 read_features_off_disk(store
, &features
);
535 dout(10) << "features " << features
<< dendl
;
537 calc_quorum_requirements();
538 dout(10) << "required_features " << required_features
<< dendl
;
// Queue a write of the feature set into transaction `t` under
// (MONITOR_NAME, COMPAT_SET_LOC) -- the same key read_features_off_disk()
// reads. The transaction is only populated here, not applied.
// NOTE(review): the lines that fill `bl` are not visible in this excerpt.
541 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
545 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
// Config-observer hook: the list of option names this monitor wants change
// notifications for, kept in a function-local static array.
// Entries marked "observed, not handled" are registered here but have no
// matching branch in handle_conf_change().
// NOTE(review): handle_conf_change() also tests keys such as
// "clog_to_monitors", "clog_to_syslog", "clog_to_graylog", "host" and
// "fsid"; they are presumably listed on lines missing from this excerpt --
// confirm.
548 const char** Monitor::get_tracked_conf_keys() const
550 static const char* KEYS
[] = {
551 "crushtool", // helpful for testing
552 "mon_election_timeout",
554 "mon_lease_renew_interval_factor",
555 "mon_lease_ack_timeout_factor",
556 "mon_accept_timeout_factor",
560 "clog_to_syslog_facility",
561 "clog_to_syslog_level",
563 "clog_to_graylog_host",
564 "clog_to_graylog_port",
567 // periodic health to clog
568 "mon_health_to_clog",
569 "mon_health_to_clog_interval",
570 "mon_health_to_clog_tick_interval",
572 "mon_scrub_interval",
573 "mon_allow_pool_delete",
574 // osdmap pruning - observed, not handled.
575 "mon_osdmap_full_prune_enabled",
576 "mon_osdmap_full_prune_min",
577 "mon_osdmap_full_prune_interval",
578 "mon_osdmap_full_prune_txsize",
579 // debug options - observed, not handled
580 "mon_debug_extra_checks",
581 "mon_debug_block_osdmap_trim",
// Config-observer callback.
//
// conf:    current configuration (read for mon_scrub_interval).
// changed: names of the options that changed.
//
// Three groups of options are acted on:
//  * any cluster-log routing option, "host" or "fsid": the log clients are
//    reconfigured immediately via update_log_clients();
//  * the mon_health_to_clog* options and mon_scrub_interval: the work is
//    queued on `finisher`, and the queued lambda takes the monitor lock
//    itself before touching monitor state.
// The lambdas capture `changed` / the interval by value, so they stay valid
// after this call returns.
587 void Monitor::handle_conf_change(const ConfigProxy
& conf
,
588 const std::set
<std::string
> &changed
)
592 dout(10) << __func__
<< " " << changed
<< dendl
;
// Cluster-log destinations or identity changed: rebuild log client config.
594 if (changed
.count("clog_to_monitors") ||
595 changed
.count("clog_to_syslog") ||
596 changed
.count("clog_to_syslog_level") ||
597 changed
.count("clog_to_syslog_facility") ||
598 changed
.count("clog_to_graylog") ||
599 changed
.count("clog_to_graylog_host") ||
600 changed
.count("clog_to_graylog_port") ||
601 changed
.count("host") ||
602 changed
.count("fsid")) {
603 update_log_clients();
// Health-to-clog settings: applied from the finisher, under the lock.
606 if (changed
.count("mon_health_to_clog") ||
607 changed
.count("mon_health_to_clog_interval") ||
608 changed
.count("mon_health_to_clog_tick_interval")) {
609 finisher
.queue(new C_MonContext
{this, [this, changed
](int) {
610 std::lock_guard l
{lock
};
611 health_to_clog_update_conf(changed
);
// Scrub interval: value is read now, applied from the finisher, under the lock.
615 if (changed
.count("mon_scrub_interval")) {
616 int scrub_interval
= conf
->mon_scrub_interval
;
617 finisher
.queue(new C_MonContext
{this, [this, scrub_interval
](int) {
618 std::lock_guard l
{lock
};
619 scrub_update_interval(scrub_interval
);
// Re-read the cluster-log options and push them to both log channels.
// parse_log_client_options() fills the per-channel string maps declared
// below from g_ceph_context; the same maps are then handed to
// clog->update_config() and audit_clog->update_config().
// NOTE(review): the trailing arguments of these calls and the handling of
// the parse result are on lines missing from this excerpt.
624 void Monitor::update_log_clients()
626 map
<string
,string
> log_to_monitors
;
627 map
<string
,string
> log_to_syslog
;
628 map
<string
,string
> log_channel
;
629 map
<string
,string
> log_prio
;
630 map
<string
,string
> log_to_graylog
;
631 map
<string
,string
> log_to_graylog_host
;
632 map
<string
,string
> log_to_graylog_port
;
636 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
637 log_channel
, log_prio
, log_to_graylog
,
638 log_to_graylog_host
, log_to_graylog_port
,
642 clog
->update_config(log_to_monitors
, log_to_syslog
,
643 log_channel
, log_prio
, log_to_graylog
,
644 log_to_graylog_host
, log_to_graylog_port
,
647 audit_clog
->update_config(log_to_monitors
, log_to_syslog
,
648 log_channel
, log_prio
, log_to_graylog
,
649 log_to_graylog_host
, log_to_graylog_port
,
// Validate lease-related configuration before the monitor starts.
// Two constraints are checked, each reported on the cluster log at error
// level when violated:
//   mon_lease_renew_interval_factor must be <  1.0
//   mon_lease_ack_timeout_factor    must be >  1.0
// preinit() treats the int result as a status code.
// NOTE(review): the lines that set and return that status are not visible
// in this excerpt.
653 int Monitor::sanitize_options()
657 // mon_lease must be greater than mon_lease_renewal; otherwise we
658 // may incur in leases expiring before they are renewed.
659 if (g_conf()->mon_lease_renew_interval_factor
>= 1.0) {
660 clog
->error() << "mon_lease_renew_interval_factor ("
661 << g_conf()->mon_lease_renew_interval_factor
662 << ") must be less than 1.0";
666 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
667 // got time to renew the lease and get an ack for it. Having both options
668 // with the same value, for a given small value, could mean timing out if
669 // the monitors happened to be overloaded -- or even under normal load for
670 // a small enough value.
671 if (g_conf()->mon_lease_ack_timeout_factor
<= 1.0) {
672 clog
->error() << "mon_lease_ack_timeout_factor ("
673 << g_conf()->mon_lease_ack_timeout_factor
674 << ") must be greater than 1.0";
681 int Monitor::preinit()
683 std::unique_lock
l(lock
);
685 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
687 int r
= sanitize_options();
689 derr
<< "option sanitization failed!" << dendl
;
693 ceph_assert(!logger
);
695 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
696 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess",
697 PerfCountersBuilder::PRIO_USEFUL
);
698 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions",
699 "sadd", PerfCountersBuilder::PRIO_INTERESTING
);
700 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions",
701 "srm", PerfCountersBuilder::PRIO_INTERESTING
);
702 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions",
703 "strm", PerfCountersBuilder::PRIO_USEFUL
);
704 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in",
705 "ecnt", PerfCountersBuilder::PRIO_USEFUL
);
706 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started",
707 "estt", PerfCountersBuilder::PRIO_INTERESTING
);
708 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won",
709 "ewon", PerfCountersBuilder::PRIO_INTERESTING
);
710 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost",
711 "elst", PerfCountersBuilder::PRIO_INTERESTING
);
712 logger
= pcb
.create_perf_counters();
713 cct
->get_perfcounters_collection()->add(logger
);
716 ceph_assert(!cluster_logger
);
718 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
719 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
720 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
721 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
722 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
723 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
724 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
725 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster", NULL
, 0, unit_t(UNIT_BYTES
));
726 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space", NULL
, 0, unit_t(UNIT_BYTES
));
727 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space", NULL
, 0, unit_t(UNIT_BYTES
));
728 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
729 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
730 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
731 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
732 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
733 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
734 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
735 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
736 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
737 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects", NULL
, 0, unit_t(UNIT_BYTES
));
738 cluster_logger
= pcb
.create_perf_counters();
741 paxos
->init_logger();
743 // verify cluster_uuid
745 int r
= check_fsid();
756 // have we ever joined a quorum?
757 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
758 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
760 if (!has_ever_joined
) {
761 // impose initial quorum restrictions?
762 list
<string
> initial_members
;
763 get_str_list(g_conf()->mon_initial_members
, initial_members
);
765 if (!initial_members
.empty()) {
766 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
768 monmap
->set_initial_members(
769 g_ceph_context
, initial_members
, name
, messenger
->get_myaddrs(),
772 dout(10) << " monmap is " << *monmap
<< dendl
;
773 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
775 } else if (!monmap
->contains(name
)) {
776 derr
<< "not in monmap and have been in a quorum before; "
777 << "must have been removed" << dendl
;
778 if (g_conf()->mon_force_quorum_join
) {
779 dout(0) << "we should have died but "
780 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
782 derr
<< "commit suicide!" << dendl
;
788 // We have a potentially inconsistent store state in hands. Get rid of it
790 bool clear_store
= false;
791 if (store
->exists("mon_sync", "in_sync")) {
792 dout(1) << __func__
<< " clean up potentially inconsistent store state"
797 if (store
->get("mon_sync", "force_sync") > 0) {
798 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
803 set
<string
> sync_prefixes
= get_sync_targets_names();
804 store
->clear(sync_prefixes
);
808 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
809 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
813 if (is_keyring_required()) {
814 // we need to bootstrap authentication keys so we can form an
816 if (authmon()->get_last_committed() == 0) {
817 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
819 int err
= store
->get("mkfs", "keyring", bl
);
820 if (err
== 0 && bl
.length() > 0) {
821 // Attempt to decode and extract keyring only if it is found.
823 auto p
= bl
.cbegin();
825 extract_save_mon_key(keyring
);
829 string keyring_loc
= g_conf()->mon_data
+ "/keyring";
831 r
= keyring
.load(cct
, keyring_loc
);
834 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
836 if (key_server
.get_auth(mon_name
, mon_key
)) {
837 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
838 keyring
.add(mon_name
, mon_key
);
840 keyring
.encode_plaintext(bl
);
841 write_default_keyring(bl
);
843 derr
<< "unable to load initial keyring " << g_conf()->keyring
<< dendl
;
849 admin_hook
= new AdminHook(this);
850 AdminSocket
* admin_socket
= cct
->get_admin_socket();
852 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
854 // register tell/asock commands
855 for (const auto& command
: local_mon_commands
) {
856 if (!command
.is_tell()) {
859 const auto prefix
= cmddesc_get_prefix(command
.cmdstring
);
860 if (prefix
== "injectargs" ||
861 prefix
== "version" ||
863 // not registered by me
866 r
= admin_socket
->register_command(command
.cmdstring
, admin_hook
,
872 // add ourselves as a conf observer
873 g_conf().add_observer(this);
875 messenger
->set_auth_client(this);
876 messenger
->set_auth_server(this);
877 mgr_messenger
->set_auth_client(this);
879 auth_registry
.refresh_config();
886 dout(2) << "init" << dendl
;
887 std::lock_guard
l(lock
);
898 messenger
->add_dispatcher_tail(this);
900 // kickstart pet mgrclient
902 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
903 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
904 mgrmon()->prime_mgr_client();
906 state
= STATE_PROBING
;
908 // add features of myself into feature_map
909 session_map
.feature_map
.add_mon(con_self
->get_features());
// Initialise paxos-backed state: iterates over every entry of
// paxos_service and then loads current state with refresh_from_paxos(NULL),
// i.e. without asking whether a bootstrap is needed.
// NOTE(review): the per-service call inside the loop is not visible in this
// excerpt.
913 void Monitor::init_paxos()
915 dout(10) << __func__
<< dendl
;
919 for (auto& svc
: paxos_service
) {
923 refresh_from_paxos(NULL
);
// Reload monitor state from the store after a paxos update.
//
// need_bootstrap: passed through unchanged to each service's refresh();
//                 callers may pass NULL (init_paxos() does).
//
// First reads "cluster_fingerprint" from the MONITOR_NAME prefix and decodes
// it into `fingerprint`; a decode failure (buffer::error) or a missing value
// is only logged at debug level. Then every paxos service is refreshed,
// followed by a second pass over all services.
// NOTE(review): the call made in that second pass is not visible in this
// excerpt.
926 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
928 dout(10) << __func__
<< dendl
;
931 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
934 auto p
= bl
.cbegin();
935 decode(fingerprint
, p
);
937 catch (buffer::error
& e
) {
938 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
941 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
944 for (auto& svc
: paxos_service
) {
945 svc
->refresh(need_bootstrap
);
947 for (auto& svc
: paxos_service
) {
// Add cluster_logger to the context's perf-counter collection, at most once.
// cluster_logger_registered guards against double registration; a repeated
// call only logs that the logger is already registered.
953 void Monitor::register_cluster_logger()
955 if (!cluster_logger_registered
) {
956 dout(10) << "register_cluster_logger" << dendl
;
957 cluster_logger_registered
= true;
958 cct
->get_perfcounters_collection()->add(cluster_logger
);
960 dout(10) << "register_cluster_logger - already registered" << dendl
;
// Counterpart of register_cluster_logger(): removes cluster_logger from the
// perf-counter collection if it is currently registered and clears the
// flag; otherwise only logs that it was not registered. The logger object
// itself is not deleted here.
964 void Monitor::unregister_cluster_logger()
966 if (cluster_logger_registered
) {
967 dout(10) << "unregister_cluster_logger" << dendl
;
968 cluster_logger_registered
= false;
969 cct
->get_perfcounters_collection()->remove(cluster_logger
);
971 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
// Publish the monitor counts to the cluster perf counters: total monitors
// in the monmap and the number currently in quorum.
// cluster_logger is dereferenced without a null check here.
975 void Monitor::update_logger()
977 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
978 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
// Orderly monitor shutdown. The steps visible here run in this order:
//   1. wait for any in-flight paxos write to finish;
//   2. under auth_lock, reset the auth monitor's mon count/rank to (0, 0);
//   3. mark the state STATE_SHUTDOWN and stop observing config changes;
//   4. unregister this monitor's admin-socket commands;
//   5. shut down the mgr client and drain the finisher;
//   6. visit every paxos service;
//   7. complete all contexts waiting for quorum with -ECANCELED;
//   8. drop all sessions and shut down the log client;
//   9. shut down both messengers, and only then remove the perf counters
//      (see the ordering comment below);
//  10. remove and delete the cluster logger.
// NOTE(review): lock acquisition/release and several intermediate calls are
// on lines missing from this excerpt, including the per-service call in
// step 6.
981 void Monitor::shutdown()
983 dout(1) << "shutdown" << dendl
;
987 wait_for_paxos_write();
990 std::lock_guard
l(auth_lock
);
991 authmon()->_set_mon_num_rank(0, 0);
994 state
= STATE_SHUTDOWN
;
997 g_conf().remove_observer(this);
1001 cct
->get_admin_socket()->unregister_commands(admin_hook
);
1008 mgr_client
.shutdown();
1011 finisher
.wait_for_empty();
1017 for (auto& svc
: paxos_service
) {
1021 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
1022 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
1028 remove_all_sessions();
1030 log_client
.shutdown();
1032 // unlock before msgr shutdown...
1035 // shutdown messenger before removing logger from perfcounter collection,
1036 // otherwise _ms_dispatch() will try to update deleted logger
1037 messenger
->shutdown();
1038 mgr_messenger
->shutdown();
1041 cct
->get_perfcounters_collection()->remove(logger
);
1045 if (cluster_logger
) {
1046 if (cluster_logger_registered
)
1047 cct
->get_perfcounters_collection()->remove(cluster_logger
);
1048 delete cluster_logger
;
1049 cluster_logger
= NULL
;
// If paxos is in the middle of a write (current or previous proposal), flush
// it before continuing; logs before and after the flush at debug level 10.
// Called at the start of shutdown() and bootstrap().
// NOTE(review): the flushing call itself is on lines missing from this
// excerpt.
1053 void Monitor::wait_for_paxos_write()
1055 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
1056 dout(10) << __func__
<< " flushing pending write" << dendl
;
1060 dout(10) << __func__
<< " flushed pending write" << dendl
;
// Replace the running monitor process with a fresh copy of itself.
// Rebuilds the argument vector from orig_argv/orig_argc (NULL-terminated),
// logging every argument; picks the executable path -- preferring
// PROCPREFIX "/proc/self/exe" when readlink() on it succeeds, otherwise
// falling back to orig_argv[0] after logging the current directory --
// then unblocks all signals and calls execv().
// Code after execv() only runs when execv() failed; the failure is logged
// with the errno text. Per the comment at the end, callers expect this
// function never to return.
// NOTE(review): new_argv is a variable-length array, which is a compiler
// extension in C++, and the exact if/else boundaries around the exe_path
// selection are on lines missing from this excerpt.
1064 void Monitor::respawn()
1066 // --- WARNING TO FUTURE COPY/PASTERS ---
1067 // You must also add a call like
1069 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1071 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1072 // instead of "(exe)", so that killall (and log rotation) will work.
1074 dout(0) << __func__
<< dendl
;
1076 char *new_argv
[orig_argc
+1];
1077 dout(1) << " e: '" << orig_argv
[0] << "'" << dendl
;
1078 for (int i
=0; i
<orig_argc
; i
++) {
1079 new_argv
[i
] = (char *)orig_argv
[i
];
1080 dout(1) << " " << i
<< ": '" << orig_argv
[i
] << "'" << dendl
;
1082 new_argv
[orig_argc
] = NULL
;
1084 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1085 * This allows us to exec the same executable even if it has since been
1088 char exe_path
[PATH_MAX
] = "";
1090 if (readlink(PROCPREFIX
"/proc/self/exe", exe_path
, PATH_MAX
-1) != -1) {
1091 dout(1) << "respawning with exe " << exe_path
<< dendl
;
1092 strcpy(exe_path
, PROCPREFIX
"/proc/self/exe");
1097 /* Print CWD for the user's interest */
1099 char *cwd
= getcwd(buf
, sizeof(buf
));
1101 dout(1) << " cwd " << cwd
<< dendl
;
1103 /* Fall back to a best-effort: just running in our CWD */
1104 strncpy(exe_path
, orig_argv
[0], PATH_MAX
-1);
1107 dout(1) << " exe_path " << exe_path
<< dendl
;
1109 unblock_all_signals(NULL
);
1110 execv(exe_path
, new_argv
);
1112 dout(0) << "respawn execv " << orig_argv
[0]
1113 << " failed with " << cpp_strerror(errno
) << dendl
;
1115 // We have to assert out here, because suicide() returns, and callers
1116 // to respawn expect it never to return.
1120 void Monitor::bootstrap()
1122 dout(10) << "bootstrap" << dendl
;
1123 wait_for_paxos_write();
1125 sync_reset_requester();
1126 unregister_cluster_logger();
1127 cancel_probe_timeout();
1129 if (monmap
->get_epoch() == 0) {
1130 dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl
;
1131 monmap
->calc_legacy_ranks();
1133 dout(10) << "monmap " << *monmap
<< dendl
;
1135 auto from_release
= monmap
->min_mon_release
;
1137 if (!can_upgrade_from(from_release
, "min_mon_release", err
)) {
1138 derr
<< "current monmap has " << err
.str() << " stopping." << dendl
;
1143 int newrank
= monmap
->get_rank(messenger
->get_myaddrs());
1144 if (newrank
< 0 && rank
>= 0) {
1145 // was i ever part of the quorum?
1146 if (has_ever_joined
) {
1147 dout(0) << " removed from monmap, suicide." << dendl
;
1152 monmap
->get_addrs(newrank
) != messenger
->get_myaddrs()) {
1153 dout(0) << " monmap addrs for rank " << newrank
<< " changed, i am "
1154 << messenger
->get_myaddrs()
1155 << ", monmap is " << monmap
->get_addrs(newrank
) << ", respawning"
1158 if (monmap
->get_epoch()) {
1159 // store this map in temp mon_sync location so that we use it on
1161 derr
<< " stashing newest monmap " << monmap
->get_epoch()
1162 << " for next startup" << dendl
;
1164 monmap
->encode(bl
, -1);
1165 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1166 t
->put("mon_sync", "temp_newer_monmap", bl
);
1167 store
->apply_transaction(t
);
1172 if (newrank
!= rank
) {
1173 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
1174 messenger
->set_myname(entity_name_t::MON(newrank
));
1177 // reset all connections, or else our peers will think we are someone else.
1178 messenger
->mark_down_all();
1182 state
= STATE_PROBING
;
1187 if (g_conf()->mon_compact_on_bootstrap
) {
1188 dout(10) << "bootstrap -- triggering compaction" << dendl
;
1190 dout(10) << "bootstrap -- finished compaction" << dendl
;
1193 // singleton monitor?
1194 if (monmap
->size() == 1 && rank
== 0) {
1195 win_standalone_election();
1199 reset_probe_timeout();
1201 // i'm outside the quorum
1202 if (monmap
->contains(name
))
1203 outside_quorum
.insert(name
);
1206 dout(10) << "probing other monitors" << dendl
;
1207 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1210 new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
,
1214 for (auto& av
: extra_probe_peers
) {
1215 if (av
!= messenger
->get_myaddrs()) {
1216 messenger
->send_to_mon(
1217 new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
,
1224 bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd
,
1225 const cmdmap_t
& cmdmap
,
1228 if (is_leader() || is_peon()) {
1229 ss
<< "mon already active; ignoring bootstrap hint";
1233 entity_addrvec_t addrs
;
1235 if (cmd_getval(cmdmap
, "addr", addrstr
)) {
1236 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' addr '"
1237 << addrstr
<< "'" << dendl
;
1240 const char *end
= 0;
1241 if (!addr
.parse(addrstr
.c_str(), &end
, entity_addr_t::TYPE_ANY
)) {
1242 ss
<< "failed to parse addrs '" << addrstr
1243 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1247 addrs
.v
.push_back(addr
);
1248 if (addr
.get_port() == 0) {
1249 addrs
.v
[0].set_type(entity_addr_t::TYPE_MSGR2
);
1250 addrs
.v
[0].set_port(CEPH_MON_PORT_IANA
);
1251 addrs
.v
.push_back(addr
);
1252 addrs
.v
[1].set_type(entity_addr_t::TYPE_LEGACY
);
1253 addrs
.v
[1].set_port(CEPH_MON_PORT_LEGACY
);
1254 } else if (addr
.get_type() == entity_addr_t::TYPE_ANY
) {
1255 if (addr
.get_port() == CEPH_MON_PORT_LEGACY
) {
1256 addrs
.v
[0].set_type(entity_addr_t::TYPE_LEGACY
);
1258 addrs
.v
[0].set_type(entity_addr_t::TYPE_MSGR2
);
1261 } else if (cmd_getval(cmdmap
, "addrv", addrstr
)) {
1262 dout(10) << "_add_bootstrap_peer_hintv '" << cmd
<< "' addrv '"
1263 << addrstr
<< "'" << dendl
;
1264 const char *end
= 0;
1265 if (!addrs
.parse(addrstr
.c_str(), &end
)) {
1266 ss
<< "failed to parse addrs '" << addrstr
1267 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1271 ss
<< "no addr or addrv provided";
1275 extra_probe_peers
.insert(addrs
);
1276 ss
<< "adding peer " << addrs
<< " to list: " << extra_probe_peers
;
1280 // called by bootstrap(), or on leader|peon -> electing
1281 void Monitor::_reset()
1283 dout(10) << __func__
<< dendl
;
1285 // disable authentication
1287 std::lock_guard
l(auth_lock
);
1288 authmon()->_set_mon_num_rank(0, 0);
1291 cancel_probe_timeout();
1293 health_events_cleanup();
1294 health_check_log_times
.clear();
1295 scrub_event_cancel();
1297 leader_since
= utime_t();
1299 if (!quorum
.empty()) {
1300 exited_quorum
= ceph_clock_now();
1303 outside_quorum
.clear();
1304 quorum_feature_map
.clear();
1310 for (auto& svc
: paxos_service
) {
1316 // -----------------------------------------------------------
1319 set
<string
> Monitor::get_sync_targets_names()
1321 set
<string
> targets
;
1322 targets
.insert(paxos
->get_name());
1323 for (auto& svc
: paxos_service
) {
1324 svc
->get_store_prefixes(targets
);
1326 ConfigKeyService
*config_key_service_ptr
= dynamic_cast<ConfigKeyService
*>(config_key_service
);
1327 ceph_assert(config_key_service_ptr
);
1328 config_key_service_ptr
->get_store_prefixes(targets
);
1333 void Monitor::sync_timeout()
1335 dout(10) << __func__
<< dendl
;
1336 ceph_assert(state
== STATE_SYNCHRONIZING
);
1340 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1342 dout(1) << __func__
<< dendl
;
1344 MonMap latest_monmap
;
1346 // Grab latest monmap from MonmapMonitor
1347 bufferlist monmon_bl
;
1348 int err
= monmon()->get_monmap(monmon_bl
);
1350 if (err
!= -ENOENT
) {
1352 << " something wrong happened while reading the store: "
1353 << cpp_strerror(err
) << dendl
;
1354 ceph_abort_msg("error reading the store");
1357 latest_monmap
.decode(monmon_bl
);
1360 // Grab last backed up monmap (if any) and compare epochs
1361 if (store
->exists("mon_sync", "latest_monmap")) {
1362 bufferlist backup_bl
;
1363 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1366 << " something wrong happened while reading the store: "
1367 << cpp_strerror(err
) << dendl
;
1368 ceph_abort_msg("error reading the store");
1370 ceph_assert(backup_bl
.length() > 0);
1372 MonMap backup_monmap
;
1373 backup_monmap
.decode(backup_bl
);
1375 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1376 latest_monmap
= backup_monmap
;
1379 // Check if our current monmap's epoch is greater than the one we've
1381 if (monmap
->epoch
> latest_monmap
.epoch
)
1382 latest_monmap
= *monmap
;
1384 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1386 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
1389 void Monitor::sync_reset_requester()
1391 dout(10) << __func__
<< dendl
;
1393 if (sync_timeout_event
) {
1394 timer
.cancel_event(sync_timeout_event
);
1395 sync_timeout_event
= NULL
;
1398 sync_provider
= entity_addrvec_t();
1401 sync_start_version
= 0;
1404 void Monitor::sync_reset_provider()
1406 dout(10) << __func__
<< dendl
;
1407 sync_providers
.clear();
1410 void Monitor::sync_start(entity_addrvec_t
&addrs
, bool full
)
1412 dout(10) << __func__
<< " " << addrs
<< (full
? " full" : " recent") << dendl
;
1414 ceph_assert(state
== STATE_PROBING
||
1415 state
== STATE_SYNCHRONIZING
);
1416 state
= STATE_SYNCHRONIZING
;
1418 // make sure are not a provider for anyone!
1419 sync_reset_provider();
1424 // stash key state, and mark that we are syncing
1425 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1426 sync_stash_critical_state(t
);
1427 t
->put("mon_sync", "in_sync", 1);
1429 sync_last_committed_floor
= std::max(sync_last_committed_floor
, paxos
->get_version());
1430 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1431 << sync_last_committed_floor
<< dendl
;
1432 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1434 store
->apply_transaction(t
);
1436 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 1);
1438 // clear the underlying store
1439 set
<string
> targets
= get_sync_targets_names();
1440 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1441 store
->clear(targets
);
1443 // make sure paxos knows it has been reset. this prevents a
1444 // bootstrap and then different probe reply order from possibly
1445 // deciding a partial or no sync is needed.
1448 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 2);
1451 // assume 'other' as the leader. We will update the leader once we receive
1452 // a reply to the sync start.
1453 sync_provider
= addrs
;
1455 sync_reset_timeout();
1457 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1459 m
->last_committed
= paxos
->get_version();
1460 messenger
->send_to_mon(m
, sync_provider
);
1463 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1465 dout(10) << __func__
<< dendl
;
1466 bufferlist backup_monmap
;
1467 sync_obtain_latest_monmap(backup_monmap
);
1468 ceph_assert(backup_monmap
.length() > 0);
1469 t
->put("mon_sync", "latest_monmap", backup_monmap
);
1472 void Monitor::sync_reset_timeout()
1474 dout(10) << __func__
<< dendl
;
1475 if (sync_timeout_event
)
1476 timer
.cancel_event(sync_timeout_event
);
1477 sync_timeout_event
= timer
.add_event_after(
1478 g_conf()->mon_sync_timeout
,
1479 new C_MonContext
{this, [this](int) {
1484 void Monitor::sync_finish(version_t last_committed
)
1486 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
1488 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 7);
1491 // finalize the paxos commits
1492 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1493 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1495 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
1497 dout(30) << __func__
<< " final tx dump:\n";
1498 JSONFormatter
f(true);
1503 store
->apply_transaction(tx
);
1506 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 8);
1508 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1509 t
->erase("mon_sync", "in_sync");
1510 t
->erase("mon_sync", "force_sync");
1511 t
->erase("mon_sync", "last_committed_floor");
1512 store
->apply_transaction(t
);
1514 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 9);
1518 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 10);
1523 void Monitor::handle_sync(MonOpRequestRef op
)
1525 auto m
= op
->get_req
<MMonSync
>();
1526 dout(10) << __func__
<< " " << *m
<< dendl
;
1529 // provider ---------
1531 case MMonSync::OP_GET_COOKIE_FULL
:
1532 case MMonSync::OP_GET_COOKIE_RECENT
:
1533 handle_sync_get_cookie(op
);
1535 case MMonSync::OP_GET_CHUNK
:
1536 handle_sync_get_chunk(op
);
1539 // client -----------
1541 case MMonSync::OP_COOKIE
:
1542 handle_sync_cookie(op
);
1545 case MMonSync::OP_CHUNK
:
1546 case MMonSync::OP_LAST_CHUNK
:
1547 handle_sync_chunk(op
);
1549 case MMonSync::OP_NO_COOKIE
:
1550 handle_sync_no_cookie(op
);
1554 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1555 ceph_abort_msg("unknown op");
1561 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1563 auto m
= op
->get_req
<MMonSync
>();
1564 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1565 m
->get_connection()->send_message(reply
);
1568 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1570 auto m
= op
->get_req
<MMonSync
>();
1571 if (is_synchronizing()) {
1572 _sync_reply_no_cookie(op
);
1576 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 1);
1578 // make sure they can understand us.
1579 if ((required_features
^ m
->get_connection()->get_features()) &
1580 required_features
) {
1581 dout(5) << " ignoring peer mon." << m
->get_source().num()
1582 << " has features " << std::hex
1583 << m
->get_connection()->get_features()
1584 << " but we require " << required_features
<< std::dec
<< dendl
;
1588 // make up a unique cookie. include election epoch (which persists
1589 // across restarts for the whole cluster) and a counter for this
1590 // process instance. there is no need to be unique *across*
1591 // monitors, though.
1592 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1593 ceph_assert(sync_providers
.count(cookie
) == 0);
1595 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
1597 SyncProvider
& sp
= sync_providers
[cookie
];
1599 sp
.addrs
= m
->get_source_addrs();
1600 sp
.reset_timeout(g_ceph_context
, g_conf()->mon_sync_timeout
* 2);
1602 set
<string
> sync_targets
;
1603 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1605 sync_targets
= get_sync_targets_names();
1606 sp
.last_committed
= paxos
->get_version();
1607 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1609 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1611 // just catch up paxos
1612 sp
.last_committed
= m
->last_committed
;
1614 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1616 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1617 reply
->last_committed
= sp
.last_committed
;
1618 m
->get_connection()->send_message(reply
);
1621 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1623 auto m
= op
->get_req
<MMonSync
>();
1624 dout(10) << __func__
<< " " << *m
<< dendl
;
1626 if (sync_providers
.count(m
->cookie
) == 0) {
1627 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1628 _sync_reply_no_cookie(op
);
1632 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 2);
1634 SyncProvider
& sp
= sync_providers
[m
->cookie
];
1635 sp
.reset_timeout(g_ceph_context
, g_conf()->mon_sync_timeout
* 2);
1637 if (sp
.last_committed
< paxos
->get_first_committed() &&
1638 paxos
->get_first_committed() > 1) {
1639 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1640 << " < our fc " << paxos
->get_first_committed() << dendl
;
1641 sync_providers
.erase(m
->cookie
);
1642 _sync_reply_no_cookie(op
);
1646 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1647 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1649 int bytes_left
= g_conf()->mon_sync_max_payload_size
;
1650 int keys_left
= g_conf()->mon_sync_max_payload_keys
;
1651 while (sp
.last_committed
< paxos
->get_version() &&
1655 sp
.last_committed
++;
1657 int err
= store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1658 ceph_assert(err
== 0);
1660 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1661 bytes_left
-= bl
.length();
1663 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1666 reply
->last_committed
= sp
.last_committed
;
1668 if (sp
.full
&& bytes_left
> 0 && keys_left
> 0) {
1669 sp
.synchronizer
->get_chunk_tx(tx
, bytes_left
, keys_left
);
1670 sp
.last_key
= sp
.synchronizer
->get_last_key();
1671 reply
->last_key
= sp
.last_key
;
1674 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1675 sp
.last_committed
< paxos
->get_version()) {
1676 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1677 << " key " << sp
.last_key
<< dendl
;
1679 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1680 << " key " << sp
.last_key
<< dendl
;
1681 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1683 ceph_assert(g_conf()->mon_sync_provider_kill_at
!= 3);
1685 // clean up our local state
1686 sync_providers
.erase(sp
.cookie
);
1689 encode(*tx
, reply
->chunk_bl
);
1691 m
->get_connection()->send_message(reply
);
1696 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1698 auto m
= op
->get_req
<MMonSync
>();
1699 dout(10) << __func__
<< " " << *m
<< dendl
;
1701 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
1704 if (m
->get_source_addrs() != sync_provider
) {
1705 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1708 sync_cookie
= m
->cookie
;
1709 sync_start_version
= m
->last_committed
;
1711 sync_reset_timeout();
1712 sync_get_next_chunk();
1714 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 3);
1717 void Monitor::sync_get_next_chunk()
1719 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1720 if (g_conf()->mon_inject_sync_get_chunk_delay
> 0) {
1721 dout(20) << __func__
<< " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay
<< dendl
;
1722 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay
* 1000000.0));
1724 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1725 messenger
->send_to_mon(r
, sync_provider
);
1727 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 4);
1730 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1732 auto m
= op
->get_req
<MMonSync
>();
1733 dout(10) << __func__
<< " " << *m
<< dendl
;
1735 if (m
->cookie
!= sync_cookie
) {
1736 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1739 if (m
->get_source_addrs() != sync_provider
) {
1740 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1744 ceph_assert(state
== STATE_SYNCHRONIZING
);
1745 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 5);
1747 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1748 tx
->append_from_encoded(m
->chunk_bl
);
1750 dout(30) << __func__
<< " tx dump:\n";
1751 JSONFormatter
f(true);
1756 store
->apply_transaction(tx
);
1758 ceph_assert(g_conf()->mon_sync_requester_kill_at
!= 6);
1761 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
1762 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1763 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1765 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1767 dout(30) << __func__
<< " tx dump:\n";
1768 JSONFormatter
f(true);
1773 store
->apply_transaction(tx
);
1774 paxos
->init(); // to refresh what we just wrote
1777 if (m
->op
== MMonSync::OP_CHUNK
) {
1778 sync_reset_timeout();
1779 sync_get_next_chunk();
1780 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1781 sync_finish(m
->last_committed
);
1785 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1787 dout(10) << __func__
<< dendl
;
1791 void Monitor::sync_trim_providers()
1793 dout(20) << __func__
<< dendl
;
1795 utime_t now
= ceph_clock_now();
1796 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1797 while (p
!= sync_providers
.end()) {
1798 if (now
> p
->second
.timeout
) {
1799 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
1800 << " for " << p
->second
.addrs
<< dendl
;
1801 sync_providers
.erase(p
++);
1808 // ---------------------------------------------------
1811 void Monitor::cancel_probe_timeout()
1813 if (probe_timeout_event
) {
1814 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1815 timer
.cancel_event(probe_timeout_event
);
1816 probe_timeout_event
= NULL
;
1818 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
1822 void Monitor::reset_probe_timeout()
1824 cancel_probe_timeout();
1825 probe_timeout_event
= new C_MonContext
{this, [this](int r
) {
1828 double t
= g_conf()->mon_probe_timeout
;
1829 if (timer
.add_event_after(t
, probe_timeout_event
)) {
1830 dout(10) << "reset_probe_timeout " << probe_timeout_event
1831 << " after " << t
<< " seconds" << dendl
;
1833 probe_timeout_event
= nullptr;
1837 void Monitor::probe_timeout(int r
)
1839 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1840 ceph_assert(is_probing() || is_synchronizing());
1841 ceph_assert(probe_timeout_event
);
1842 probe_timeout_event
= NULL
;
1846 void Monitor::handle_probe(MonOpRequestRef op
)
1848 auto m
= op
->get_req
<MMonProbe
>();
1849 dout(10) << "handle_probe " << *m
<< dendl
;
1851 if (m
->fsid
!= monmap
->fsid
) {
1852 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1857 case MMonProbe::OP_PROBE
:
1858 handle_probe_probe(op
);
1861 case MMonProbe::OP_REPLY
:
1862 handle_probe_reply(op
);
1865 case MMonProbe::OP_MISSING_FEATURES
:
1866 derr
<< __func__
<< " require release " << (int)m
->mon_release
<< " > "
1867 << (int)ceph_release()
1868 << ", or missing features (have " << CEPH_FEATURES_ALL
1869 << ", required " << m
->required_features
1870 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
) << ")"
1876 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1878 auto m
= op
->get_req
<MMonProbe
>();
1880 dout(10) << "handle_probe_probe " << m
->get_source_inst() << *m
1881 << " features " << m
->get_connection()->get_features() << dendl
;
1882 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1883 if ((m
->mon_release
!= ceph_release_t::unknown
&&
1884 m
->mon_release
< monmap
->min_mon_release
) ||
1886 dout(1) << " peer " << m
->get_source_addr()
1887 << " release " << m
->mon_release
1888 << " < min_mon_release " << monmap
->min_mon_release
1889 << ", or missing features " << missing
<< dendl
;
1890 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1891 name
, has_ever_joined
, monmap
->min_mon_release
);
1892 m
->required_features
= required_features
;
1893 m
->get_connection()->send_message(r
);
1897 if (!is_probing() && !is_synchronizing()) {
1898 // If the probing mon is way ahead of us, we need to re-bootstrap.
1899 // Normally we capture this case when we initially bootstrap, but
1900 // it is possible we pass those checks (we overlap with
1901 // quorum-to-be) but fail to join a quorum before it moves past
1902 // us. We need to be kicked back to bootstrap so we can
1903 // synchonize, not keep calling elections.
1904 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1905 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1906 << "ahead of us, re-bootstrapping" << dendl
;
1914 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
,
1918 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1919 r
->paxos_first_version
= paxos
->get_first_committed();
1920 r
->paxos_last_version
= paxos
->get_version();
1921 m
->get_connection()->send_message(r
);
1923 // did we discover a peer here?
1924 if (!monmap
->contains(m
->get_source_addr())) {
1925 dout(1) << " adding peer " << m
->get_source_addrs()
1926 << " to list of hints" << dendl
;
1927 extra_probe_peers
.insert(m
->get_source_addrs());
1934 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1936 auto m
= op
->get_req
<MMonProbe
>();
1937 dout(10) << "handle_probe_reply " << m
->get_source_inst()
1938 << " " << *m
<< dendl
;
1939 dout(10) << " monmap is " << *monmap
<< dendl
;
1941 // discover name and addrs during probing or electing states.
1942 if (!is_probing() && !is_electing()) {
1946 // newer map, or they've joined a quorum and we haven't?
1948 monmap
->encode(mybl
, m
->get_connection()->get_features());
1949 // make sure it's actually different; the checks below err toward
1950 // taking the other guy's map, which could cause us to loop.
1951 if (!mybl
.contents_equal(m
->monmap_bl
)) {
1952 MonMap
*newmap
= new MonMap
;
1953 newmap
->decode(m
->monmap_bl
);
1954 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
1955 !has_ever_joined
)) {
1956 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
1957 << ", mine was " << monmap
->get_epoch() << dendl
;
1959 monmap
->decode(m
->monmap_bl
);
1968 string peer_name
= monmap
->get_name(m
->get_source_addr());
1969 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
1970 dout(10) << " renaming peer " << m
->get_source_addr() << " "
1971 << peer_name
<< " -> " << m
->name
<< " in my monmap"
1973 monmap
->rename(peer_name
, m
->name
);
1975 if (is_electing()) {
1979 } else if (peer_name
.size()) {
1980 dout(10) << " peer name is " << peer_name
<< dendl
;
1982 dout(10) << " peer " << m
->get_source_addr() << " not in map" << dendl
;
1985 // new initial peer?
1986 if (monmap
->get_epoch() == 0 &&
1987 monmap
->contains(m
->name
) &&
1988 monmap
->get_addrs(m
->name
).front().is_blank_ip()) {
1989 dout(1) << " learned initial mon " << m
->name
1990 << " addrs " << m
->get_source_addrs() << dendl
;
1991 monmap
->set_addrvec(m
->name
, m
->get_source_addrs());
1997 // end discover phase
1998 if (!is_probing()) {
2002 ceph_assert(paxos
!= NULL
);
2004 if (is_synchronizing()) {
2005 dout(10) << " currently syncing" << dendl
;
2009 entity_addrvec_t other
= m
->get_source_addrs();
2011 if (m
->paxos_last_version
< sync_last_committed_floor
) {
2012 dout(10) << " peer paxos versions [" << m
->paxos_first_version
2013 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
2014 << sync_last_committed_floor
<< ", ignoring"
2017 if (paxos
->get_version() < m
->paxos_first_version
&&
2018 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
2019 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
2020 << "," << m
->paxos_last_version
<< "]"
2021 << " vs my version " << paxos
->get_version()
2022 << " (too far ahead)"
2024 cancel_probe_timeout();
2025 sync_start(other
, true);
2028 if (paxos
->get_version() + g_conf()->paxos_max_join_drift
< m
->paxos_last_version
) {
2029 dout(10) << " peer paxos last version " << m
->paxos_last_version
2030 << " vs my version " << paxos
->get_version()
2031 << " (too far ahead)"
2033 cancel_probe_timeout();
2034 sync_start(other
, false);
2039 // did the existing cluster complete upgrade to luminous?
2040 if (osdmon()->osdmap
.get_epoch()) {
2041 if (osdmon()->osdmap
.require_osd_release
< ceph_release_t::luminous
) {
2042 derr
<< __func__
<< " existing cluster has not completed upgrade to"
2043 << " luminous; 'ceph osd require_osd_release luminous' before"
2044 << " upgrading" << dendl
;
2047 if (!osdmon()->osdmap
.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS
) ||
2048 !osdmon()->osdmap
.test_flag(CEPH_OSDMAP_RECOVERY_DELETES
)) {
2049 derr
<< __func__
<< " existing cluster has not completed a full luminous"
2050 << " scrub to purge legacy snapdir objects; please scrub before"
2051 << " upgrading beyond luminous." << dendl
;
2056 // is there an existing quorum?
2057 if (m
->quorum
.size()) {
2058 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
2060 dout(10) << " peer paxos version " << m
->paxos_last_version
2061 << " vs my version " << paxos
->get_version()
2065 if (monmap
->contains(name
) &&
2066 !monmap
->get_addrs(name
).front().is_blank_ip()) {
2067 // i'm part of the cluster; just initiate a new election
2070 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl
;
2072 new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddrs()),
2073 *m
->quorum
.begin());
2076 if (monmap
->contains(m
->name
)) {
2077 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
2078 outside_quorum
.insert(m
->name
);
2080 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
2084 unsigned need
= monmap
->min_quorum_size();
2085 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
2086 if (outside_quorum
.size() >= need
) {
2087 if (outside_quorum
.count(name
)) {
2088 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
2091 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
2094 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
2099 void Monitor::join_election()
2101 dout(10) << __func__
<< dendl
;
2102 wait_for_paxos_write();
2104 state
= STATE_ELECTING
;
2106 logger
->inc(l_mon_num_elections
);
2109 void Monitor::start_election()
2111 dout(10) << "start_election" << dendl
;
2112 wait_for_paxos_write();
2114 state
= STATE_ELECTING
;
2116 logger
->inc(l_mon_num_elections
);
2117 logger
->inc(l_mon_election_call
);
2119 clog
->info() << "mon." << name
<< " calling monitor election";
2120 elector
.call_election();
2123 void Monitor::win_standalone_election()
2125 dout(1) << "win_standalone_election" << dendl
;
2127 // bump election epoch, in case the previous epoch included other
2128 // monitors; we need to be able to make the distinction.
2129 elector
.declare_standalone_victory();
2131 rank
= monmap
->get_rank(name
);
2132 ceph_assert(rank
== 0);
2136 map
<int,Metadata
> metadata
;
2137 collect_metadata(&metadata
[0]);
2139 win_election(elector
.get_epoch(), q
,
2141 ceph::features::mon::get_supported(),
2146 const utime_t
& Monitor::get_leader_since() const
2148 ceph_assert(state
== STATE_LEADER
);
2149 return leader_since
;
2152 epoch_t
Monitor::get_epoch()
2154 return elector
.get_epoch();
2157 void Monitor::_finish_svc_election()
2159 ceph_assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
2161 for (auto& svc
: paxos_service
) {
2162 // we already called election_finished() on monmon(); avoid callig twice
2163 if (state
== STATE_LEADER
&& svc
.get() == monmon())
2165 svc
->election_finished();
2169 void Monitor::win_election(epoch_t epoch
, const set
<int>& active
, uint64_t features
,
2170 const mon_feature_t
& mon_features
,
2171 ceph_release_t min_mon_release
,
2172 const map
<int,Metadata
>& metadata
)
2174 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
2175 << " features " << features
2176 << " mon_features " << mon_features
2177 << " min_mon_release " << min_mon_release
2179 ceph_assert(is_electing());
2180 state
= STATE_LEADER
;
2181 leader_since
= ceph_clock_now();
2182 quorum_since
= mono_clock::now();
2185 quorum_con_features
= features
;
2186 quorum_mon_features
= mon_features
;
2187 quorum_min_mon_release
= min_mon_release
;
2188 pending_metadata
= metadata
;
2189 outside_quorum
.clear();
2191 clog
->info() << "mon." << name
<< " is new leader, mons " << get_quorum_names()
2192 << " in quorum (ranks " << quorum
<< ")";
2194 set_leader_commands(get_local_commands(mon_features
));
2196 paxos
->leader_init();
2197 // NOTE: tell monmap monitor first. This is important for the
2198 // bootstrap case to ensure that the very first paxos proposal
2199 // codifies the monmap. Otherwise any manner of chaos can ensue
2200 // when monitors are call elections or participating in a paxos
2201 // round without agreeing on who the participants are.
2202 monmon()->election_finished();
2203 _finish_svc_election();
2205 logger
->inc(l_mon_election_win
);
2207 // inject new metadata in first transaction.
2209 // include previous metadata for missing mons (that aren't part of
2210 // the current quorum).
2211 map
<int,Metadata
> m
= metadata
;
2212 for (unsigned rank
= 0; rank
< monmap
->size(); ++rank
) {
2213 if (m
.count(rank
) == 0 &&
2214 mon_metadata
.count(rank
)) {
2215 m
[rank
] = mon_metadata
[rank
];
2219 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2220 // a new transaction immediately after the election finishes. We should
2221 // do that anyway for other reasons, though.
2222 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
2225 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
2229 if (monmap
->size() > 1 &&
2230 monmap
->get_epoch() > 0) {
2232 health_tick_start();
2234 // Freshen the health status before doing health_to_clog in case
2235 // our just-completed election changed the health
2236 healthmon()->wait_for_active_ctx(new LambdaContext([this](int r
){
2237 dout(20) << "healthmon now active" << dendl
;
2238 healthmon()->tick();
2239 if (healthmon()->is_proposing()) {
2240 dout(20) << __func__
<< " healthmon proposing, waiting" << dendl
;
2241 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext
{this,
2243 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
2244 do_health_to_clog_interval();
2248 do_health_to_clog_interval();
2252 scrub_event_start();
2256 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
2258 const mon_feature_t
& mon_features
,
2259 ceph_release_t min_mon_release
)
2262 leader_since
= utime_t();
2263 quorum_since
= mono_clock::now();
2266 outside_quorum
.clear();
2267 quorum_con_features
= features
;
2268 quorum_mon_features
= mon_features
;
2269 quorum_min_mon_release
= min_mon_release
;
2270 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
2271 << " quorum is " << quorum
<< " features are " << quorum_con_features
2272 << " mon_features are " << quorum_mon_features
2273 << " min_mon_release " << min_mon_release
2277 _finish_svc_election();
2279 logger
->inc(l_mon_election_lose
);
2285 std::string
collect_compression_algorithms()
2288 bool printed
= false;
2289 for (auto [name
, key
] : Compressor::compression_algorithms
) {
2302 void Monitor::collect_metadata(Metadata
*m
)
2304 collect_sys_info(m
, g_ceph_context
);
2305 (*m
)["addrs"] = stringify(messenger
->get_myaddrs());
2306 (*m
)["compression_algorithms"] = collect_compression_algorithms();
2308 // infer storage device
2309 string devname
= store
->get_devname();
2310 set
<string
> devnames
;
2311 get_raw_devices(devname
, &devnames
);
2312 map
<string
,string
> errs
;
2313 get_device_metadata(devnames
, m
, &errs
);
2314 for (auto& i
: errs
) {
2315 dout(1) << __func__
<< " " << i
.first
<< ": " << i
.second
<< dendl
;
2319 void Monitor::finish_election()
2321 apply_quorum_to_compatset_features();
2322 apply_monmap_to_compatset_features();
2324 exited_quorum
= utime_t();
2325 finish_contexts(g_ceph_context
, waitfor_quorum
);
2326 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
2327 resend_routed_requests();
2329 register_cluster_logger();
2331 // enable authentication
2333 std::lock_guard
l(auth_lock
);
2334 authmon()->_set_mon_num_rank(monmap
->size(), rank
);
2337 // am i named properly?
2338 string cur_name
= monmap
->get_name(messenger
->get_myaddrs());
2339 if (cur_name
!= name
) {
2340 dout(10) << " renaming myself from " << cur_name
<< " -> " << name
<< dendl
;
2342 new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddrs()),
2347 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2349 if (new_features
.compare(features
) != 0) {
2350 CompatSet diff
= features
.unsupported(new_features
);
2351 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2352 features
= new_features
;
2354 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2356 store
->apply_transaction(t
);
2358 calc_quorum_requirements();
2362 void Monitor::apply_quorum_to_compatset_features()
2364 CompatSet
new_features(features
);
2365 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2366 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2367 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2369 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2370 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2371 dout(5) << __func__
<< dendl
;
2372 _apply_compatset_features(new_features
);
2375 void Monitor::apply_monmap_to_compatset_features()
2377 CompatSet
new_features(features
);
2378 mon_feature_t monmap_features
= monmap
->get_required_features();
2380 /* persistent monmap features may go into the compatset.
2381 * optional monmap features may not - why?
2382 * because optional monmap features may be set/unset by the admin,
2383 * and possibly by other means that haven't yet been thought out,
2384 * so we can't make the monitor enforce them on start - because they
2386 * this, of course, does not invalidate setting a compatset feature
2387 * for an optional feature - as long as you make sure to clean it up
2388 * once you unset it.
2390 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2391 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2392 ceph::features::mon::FEATURE_KRAKEN
));
2393 // this feature should only ever be set if the quorum supports it.
2394 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2395 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2397 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
2398 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2399 ceph::features::mon::FEATURE_LUMINOUS
));
2400 // this feature should only ever be set if the quorum supports it.
2401 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
));
2402 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
2404 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_MIMIC
)) {
2405 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2406 ceph::features::mon::FEATURE_MIMIC
));
2407 // this feature should only ever be set if the quorum supports it.
2408 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_MIMIC
));
2409 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC
);
2411 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_NAUTILUS
)) {
2412 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2413 ceph::features::mon::FEATURE_NAUTILUS
));
2414 // this feature should only ever be set if the quorum supports it.
2415 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_NAUTILUS
));
2416 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
);
2418 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_OCTOPUS
)) {
2419 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2420 ceph::features::mon::FEATURE_OCTOPUS
));
2421 // this feature should only ever be set if the quorum supports it.
2422 ceph_assert(HAVE_FEATURE(quorum_con_features
, SERVER_OCTOPUS
));
2423 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
);
2426 dout(5) << __func__
<< dendl
;
2427 _apply_compatset_features(new_features
);
2430 void Monitor::calc_quorum_requirements()
2432 required_features
= 0;
2435 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2436 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2438 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2439 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2441 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
)) {
2442 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2444 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC
)) {
2445 required_features
|= CEPH_FEATUREMASK_SERVER_MIMIC
;
2447 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS
)) {
2448 required_features
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
|
2449 CEPH_FEATUREMASK_CEPHX_V2
;
2451 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS
)) {
2452 required_features
|= CEPH_FEATUREMASK_SERVER_OCTOPUS
;
2456 if (monmap
->get_required_features().contains_all(
2457 ceph::features::mon::FEATURE_KRAKEN
)) {
2458 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2460 if (monmap
->get_required_features().contains_all(
2461 ceph::features::mon::FEATURE_LUMINOUS
)) {
2462 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2464 if (monmap
->get_required_features().contains_all(
2465 ceph::features::mon::FEATURE_MIMIC
)) {
2466 required_features
|= CEPH_FEATUREMASK_SERVER_MIMIC
;
2468 if (monmap
->get_required_features().contains_all(
2469 ceph::features::mon::FEATURE_NAUTILUS
)) {
2470 required_features
|= CEPH_FEATUREMASK_SERVER_NAUTILUS
|
2471 CEPH_FEATUREMASK_CEPHX_V2
;
2473 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
2476 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2478 *fm
+= session_map
.feature_map
;
2479 for (auto id
: quorum
) {
2481 *fm
+= quorum_feature_map
[id
];
2486 void Monitor::sync_force(Formatter
*f
)
2488 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2489 sync_stash_critical_state(tx
);
2490 tx
->put("mon_sync", "force_sync", 1);
2491 store
->apply_transaction(tx
);
2493 f
->open_object_section("sync_force");
2494 f
->dump_int("ret", 0);
2495 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2496 f
->close_section(); // sync_force
2499 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
2501 bool free_formatter
= false;
2504 // louzy/lazy hack: default to json if no formatter has been defined
2505 f
= new JSONFormatter();
2506 free_formatter
= true;
2508 f
->open_object_section("quorum_status");
2509 f
->dump_int("election_epoch", get_epoch());
2511 f
->open_array_section("quorum");
2512 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2513 f
->dump_int("mon", *p
);
2514 f
->close_section(); // quorum
2516 list
<string
> quorum_names
= get_quorum_names();
2517 f
->open_array_section("quorum_names");
2518 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2519 f
->dump_string("mon", *p
);
2520 f
->close_section(); // quorum_names
2522 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(*quorum
.begin()));
2524 if (!quorum
.empty()) {
2527 std::chrono::duration_cast
<std::chrono::seconds
>(
2528 mono_clock::now() - quorum_since
).count());
2531 f
->open_object_section("features");
2532 f
->dump_stream("quorum_con") << quorum_con_features
;
2533 quorum_mon_features
.dump(f
, "quorum_mon");
2536 f
->open_object_section("monmap");
2538 f
->close_section(); // monmap
2540 f
->close_section(); // quorum_status
2546 void Monitor::get_mon_status(Formatter
*f
)
2548 f
->open_object_section("mon_status");
2549 f
->dump_string("name", name
);
2550 f
->dump_int("rank", rank
);
2551 f
->dump_string("state", get_state_name());
2552 f
->dump_int("election_epoch", get_epoch());
2554 f
->open_array_section("quorum");
2555 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2556 f
->dump_int("mon", *p
);
2558 f
->close_section(); // quorum
2560 if (!quorum
.empty()) {
2563 std::chrono::duration_cast
<std::chrono::seconds
>(
2564 mono_clock::now() - quorum_since
).count());
2567 f
->open_object_section("features");
2568 f
->dump_stream("required_con") << required_features
;
2569 mon_feature_t req_mon_features
= get_required_mon_features();
2570 req_mon_features
.dump(f
, "required_mon");
2571 f
->dump_stream("quorum_con") << quorum_con_features
;
2572 quorum_mon_features
.dump(f
, "quorum_mon");
2573 f
->close_section(); // features
2575 f
->open_array_section("outside_quorum");
2576 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2577 f
->dump_string("mon", *p
);
2578 f
->close_section(); // outside_quorum
2580 f
->open_array_section("extra_probe_peers");
2581 for (set
<entity_addrvec_t
>::iterator p
= extra_probe_peers
.begin();
2582 p
!= extra_probe_peers
.end();
2584 f
->dump_object("peer", *p
);
2586 f
->close_section(); // extra_probe_peers
2588 f
->open_array_section("sync_provider");
2589 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2590 p
!= sync_providers
.end();
2592 f
->dump_unsigned("cookie", p
->second
.cookie
);
2593 f
->dump_object("addrs", p
->second
.addrs
);
2594 f
->dump_stream("timeout") << p
->second
.timeout
;
2595 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2596 f
->dump_stream("last_key") << p
->second
.last_key
;
2600 if (is_synchronizing()) {
2601 f
->open_object_section("sync");
2602 f
->dump_stream("sync_provider") << sync_provider
;
2603 f
->dump_unsigned("sync_cookie", sync_cookie
);
2604 f
->dump_unsigned("sync_start_version", sync_start_version
);
2608 if (g_conf()->mon_sync_provider_kill_at
> 0)
2609 f
->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at
);
2610 if (g_conf()->mon_sync_requester_kill_at
> 0)
2611 f
->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at
);
2613 f
->open_object_section("monmap");
2617 f
->dump_object("feature_map", session_map
.feature_map
);
2618 f
->close_section(); // mon_status
2622 // health status to clog
2624 void Monitor::health_tick_start()
2626 if (!cct
->_conf
->mon_health_to_clog
||
2627 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2630 dout(15) << __func__
<< dendl
;
2633 health_tick_event
= timer
.add_event_after(
2634 cct
->_conf
->mon_health_to_clog_tick_interval
,
2635 new C_MonContext
{this, [this](int r
) {
2638 health_tick_start();
2642 void Monitor::health_tick_stop()
2644 dout(15) << __func__
<< dendl
;
2646 if (health_tick_event
) {
2647 timer
.cancel_event(health_tick_event
);
2648 health_tick_event
= NULL
;
2652 ceph::real_clock::time_point
Monitor::health_interval_calc_next_update()
2654 auto now
= ceph::real_clock::now();
2656 auto secs
= std::chrono::duration_cast
<std::chrono::seconds
>(now
.time_since_epoch());
2657 int remainder
= secs
.count() % cct
->_conf
->mon_health_to_clog_interval
;
2658 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2659 auto next
= secs
+ std::chrono::seconds(adjustment
);
2661 dout(20) << __func__
2662 << " now: " << now
<< ","
2663 << " next: " << next
<< ","
2664 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
2667 return ceph::real_clock::time_point
{next
};
2670 void Monitor::health_interval_start()
2672 dout(15) << __func__
<< dendl
;
2674 if (!cct
->_conf
->mon_health_to_clog
||
2675 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2679 health_interval_stop();
2680 auto next
= health_interval_calc_next_update();
2681 health_interval_event
= new C_MonContext
{this, [this](int r
) {
2684 do_health_to_clog_interval();
2686 if (!timer
.add_event_at(next
, health_interval_event
)) {
2687 health_interval_event
= nullptr;
2691 void Monitor::health_interval_stop()
2693 dout(15) << __func__
<< dendl
;
2694 if (health_interval_event
) {
2695 timer
.cancel_event(health_interval_event
);
2697 health_interval_event
= NULL
;
2700 void Monitor::health_events_cleanup()
2703 health_interval_stop();
2704 health_status_cache
.reset();
2707 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2709 dout(20) << __func__
<< dendl
;
2711 if (changed
.count("mon_health_to_clog")) {
2712 if (!cct
->_conf
->mon_health_to_clog
) {
2713 health_events_cleanup();
2716 if (!health_tick_event
) {
2717 health_tick_start();
2719 if (!health_interval_event
) {
2720 health_interval_start();
2725 if (changed
.count("mon_health_to_clog_interval")) {
2726 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2727 health_interval_stop();
2729 health_interval_start();
2733 if (changed
.count("mon_health_to_clog_tick_interval")) {
2734 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2737 health_tick_start();
2742 void Monitor::do_health_to_clog_interval()
2744 // outputting to clog may have been disabled in the conf
2745 // since we were scheduled.
2746 if (!cct
->_conf
->mon_health_to_clog
||
2747 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2750 dout(10) << __func__
<< dendl
;
2752 // do we have a cached value for next_clog_update? if not,
2753 // do we know when the last update was?
2755 do_health_to_clog(true);
2756 health_interval_start();
2759 void Monitor::do_health_to_clog(bool force
)
2761 // outputting to clog may have been disabled in the conf
2762 // since we were scheduled.
2763 if (!cct
->_conf
->mon_health_to_clog
||
2764 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2767 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
2770 health_status_t level
= healthmon()->get_health_status(false, nullptr, &summary
);
2772 summary
== health_status_cache
.summary
&&
2773 level
== health_status_cache
.overall
)
2775 clog
->health(level
) << "overall " << summary
;
2776 health_status_cache
.summary
= summary
;
2777 health_status_cache
.overall
= level
;
2780 void Monitor::log_health(
2781 const health_check_map_t
& updated
,
2782 const health_check_map_t
& previous
,
2783 MonitorDBStore::TransactionRef t
)
2785 if (!g_conf()->mon_health_to_clog
) {
2789 const utime_t now
= ceph_clock_now();
2791 // FIXME: log atomically as part of @t instead of using clog.
2792 dout(10) << __func__
<< " updated " << updated
.checks
.size()
2793 << " previous " << previous
.checks
.size()
2795 const auto min_log_period
= g_conf().get_val
<int64_t>(
2796 "mon_health_log_update_period");
2797 for (auto& p
: updated
.checks
) {
2798 auto q
= previous
.checks
.find(p
.first
);
2799 bool logged
= false;
2800 if (q
== previous
.checks
.end()) {
2803 ss
<< "Health check failed: " << p
.second
.summary
<< " ("
2805 clog
->health(p
.second
.severity
) << ss
.str();
2809 if (p
.second
.summary
!= q
->second
.summary
||
2810 p
.second
.severity
!= q
->second
.severity
) {
2812 auto status_iter
= health_check_log_times
.find(p
.first
);
2813 if (status_iter
!= health_check_log_times
.end()) {
2814 if (p
.second
.severity
== q
->second
.severity
&&
2815 now
- status_iter
->second
.updated_at
< min_log_period
) {
2816 // We already logged this recently and the severity is unchanged,
2817 // so skip emitting an update of the summary string.
2818 // We'll get an update out of tick() later if the check
2819 // is still failing.
2824 // summary or severity changed (ignore detail changes at this level)
2826 ss
<< "Health check update: " << p
.second
.summary
<< " (" << p
.first
<< ")";
2827 clog
->health(p
.second
.severity
) << ss
.str();
2832 // Record the time at which we last logged, so that we can check this
2833 // when considering whether/when to print update messages.
2835 auto iter
= health_check_log_times
.find(p
.first
);
2836 if (iter
== health_check_log_times
.end()) {
2837 health_check_log_times
.emplace(p
.first
, HealthCheckLogStatus(
2838 p
.second
.severity
, p
.second
.summary
, now
));
2840 iter
->second
= HealthCheckLogStatus(
2841 p
.second
.severity
, p
.second
.summary
, now
);
2845 for (auto& p
: previous
.checks
) {
2846 if (!updated
.checks
.count(p
.first
)) {
2849 if (p
.first
== "DEGRADED_OBJECTS") {
2850 clog
->info() << "All degraded objects recovered";
2851 } else if (p
.first
== "OSD_FLAGS") {
2852 clog
->info() << "OSD flags cleared";
2854 clog
->info() << "Health check cleared: " << p
.first
<< " (was: "
2855 << p
.second
.summary
<< ")";
2858 if (health_check_log_times
.count(p
.first
)) {
2859 health_check_log_times
.erase(p
.first
);
2864 if (previous
.checks
.size() && updated
.checks
.size() == 0) {
2865 // We might be going into a fully healthy state, check
2867 bool any_checks
= false;
2868 for (auto& svc
: paxos_service
) {
2869 if (&(svc
->get_health_checks()) == &(previous
)) {
2870 // Ignore the ones we're clearing right now
2874 if (svc
->get_health_checks().checks
.size() > 0) {
2880 clog
->info() << "Cluster is now healthy";
// Report the overall cluster status.
//
// Two renderings are visible in this body:
//  - a structured one written through Formatter `f`: a "status" object with
//    fsid, health, election_epoch, quorum, quorum_names, quorum age, monmap,
//    osdmap, pgmap, fsmap, mgrmap, servicemap and progress_events;
//  - a plain-text one appended to `ss`: "cluster", "services",
//    "task status", "data" and "progress" sections.
// NOTE(review): presumably `f` being non-null selects the structured form and
// the text form is the fallback -- confirm against the branch structure.
// NOTE(review): the text literals carry column padding that lines up the
// per-service rows (see `maxlen` / `spacing`); keep them byte-exact.
2885 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
)
2888 f
->open_object_section("status");
2890 mono_clock::time_point now
= mono_clock::now();
2892 f
->dump_stream("fsid") << monmap
->get_fsid();
2893 healthmon()->get_health_status(false, f
, nullptr);
2894 f
->dump_unsigned("election_epoch", get_epoch());
2896 f
->open_array_section("quorum");
2897 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2898 f
->dump_int("rank", *p
);
2900 f
->open_array_section("quorum_names");
2901 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2902 f
->dump_string("id", monmap
->get_name(*p
));
2906 std::chrono::duration_cast
<std::chrono::seconds
>(
2907 mono_clock::now() - quorum_since
).count());
2909 f
->open_object_section("monmap");
2910 monmap
->dump_summary(f
);
2912 f
->open_object_section("osdmap");
2913 osdmon()->osdmap
.print_summary(f
, cout
, string(12, ' '));
2915 f
->open_object_section("pgmap");
2916 mgrstatmon()->print_summary(f
, NULL
);
2918 f
->open_object_section("fsmap");
2919 mdsmon()->get_fsmap().print_summary(f
, NULL
);
2921 f
->open_object_section("mgrmap");
2922 mgrmon()->get_map().print_summary(f
, nullptr);
2925 f
->dump_object("servicemap", mgrstatmon()->get_service_map());
2927 f
->open_object_section("progress_events");
2928 for (auto& i
: mgrstatmon()->get_progress_events()) {
2929 f
->dump_object(i
.first
.c_str(), i
.second
);
2935 ss
<< " cluster:\n";
2936 ss
<< " id: " << monmap
->get_fsid() << "\n";
2939 healthmon()->get_health_status(false, nullptr, &health
,
2941 ss
<< " health: " << health
<< "\n";
2943 ss
<< "\n \n services:\n";
// maxlen tracks the longest service name; it sizes the padding used below
2946 auto& service_map
= mgrstatmon()->get_service_map();
2947 for (auto& p
: service_map
.services
) {
2948 maxlen
= std::max(maxlen
, p
.first
.size());
2950 string
spacing(maxlen
- 3, ' ');
2951 const auto quorum_names
= get_quorum_names();
2952 const auto mon_count
= monmap
->mon_info
.size();
2953 ss
<< " mon: " << spacing
<< mon_count
<< " daemons, quorum "
2954 << quorum_names
<< " (age " << timespan_str(now
- quorum_since
) << ")";
// list the monmap ranks that are not part of the quorum
2955 if (quorum_names
.size() != mon_count
) {
2956 std::list
<std::string
> out_of_q
;
2957 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
2958 if (quorum
.count(i
) == 0) {
2959 out_of_q
.push_back(monmap
->ranks
[i
]);
2962 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
2963 out_of_q
.end(), std::string(", "));
2966 if (mgrmon()->in_use()) {
2967 ss
<< " mgr: " << spacing
;
2968 mgrmon()->get_map().print_summary(nullptr, &ss
);
2971 if (mdsmon()->should_print_status()) {
2972 ss
<< " mds: " << spacing
<< mdsmon()->get_fsmap() << "\n";
2974 ss
<< " osd: " << spacing
;
2975 osdmon()->osdmap
.print_summary(NULL
, ss
, string(maxlen
+ 6, ' '));
2977 for (auto& p
: service_map
.services
) {
2978 const std::string
&service
= p
.first
;
2979 // filter out normal ceph entity types
2980 if (ServiceMap::is_normal_ceph_entity(service
)) {
2983 ss
<< " " << p
.first
<< ": " << string(maxlen
- p
.first
.size(), ' ')
2984 << p
.second
.get_summary() << "\n";
2989 auto& service_map
= mgrstatmon()->get_service_map();
2990 if (!service_map
.services
.empty()) {
2991 ss
<< "\n \n task status:\n";
2993 for (auto &p
: service_map
.services
) {
2994 ss
<< p
.second
.get_task_summary(p
.first
);
3000 ss
<< "\n \n data:\n";
3001 mgrstatmon()->print_summary(NULL
, &ss
);
3003 auto& pem
= mgrstatmon()->get_progress_events();
3005 ss
<< "\n \n progress:\n";
3006 for (auto& i
: pem
) {
3007 ss
<< " " << i
.second
.message
<< "\n";
3014 void Monitor::_generate_command_map(cmdmap_t
& cmdmap
,
3015 map
<string
,string
> ¶m_str_map
)
3017 for (auto p
= cmdmap
.begin(); p
!= cmdmap
.end(); ++p
) {
3018 if (p
->first
== "prefix")
3020 if (p
->first
== "caps") {
3022 if (cmd_getval(cmdmap
, "caps", cv
) &&
3023 cv
.size() % 2 == 0) {
3024 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
3025 string k
= string("caps_") + cv
[i
];
3026 param_str_map
[k
] = cv
[i
+ 1];
3031 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
3035 const MonCommand
*Monitor::_get_moncommand(
3036 const string
&cmd_prefix
,
3037 const vector
<MonCommand
>& cmds
)
3039 for (auto& c
: cmds
) {
3040 if (c
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
3047 bool Monitor::_allowed_command(MonSession
*s
, const string
&module
,
3048 const string
&prefix
, const cmdmap_t
& cmdmap
,
3049 const map
<string
,string
>& param_str_map
,
3050 const MonCommand
*this_cmd
) {
3052 bool cmd_r
= this_cmd
->requires_perm('r');
3053 bool cmd_w
= this_cmd
->requires_perm('w');
3054 bool cmd_x
= this_cmd
->requires_perm('x');
3056 bool capable
= s
->caps
.is_capable(
3059 module
, prefix
, param_str_map
,
3060 cmd_r
, cmd_w
, cmd_x
,
3061 s
->get_peer_socket_addr());
3063 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
3067 void Monitor::format_command_descriptions(const std::vector
<MonCommand
> &commands
,
3073 f
->open_object_section("command_descriptions");
3074 for (const auto &cmd
: commands
) {
3075 unsigned flags
= cmd
.flags
;
3076 ostringstream secname
;
3077 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
3078 dump_cmddesc_to_json(f
, features
, secname
.str(),
3079 cmd
.cmdstring
, cmd
.helpstring
, cmd
.module
,
3080 cmd
.req_perms
, flags
);
3083 f
->close_section(); // command_descriptions
3088 bool Monitor::is_keyring_required()
3090 return auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
) ||
3091 auth_service_required
.is_supported_auth(CEPH_AUTH_CEPHX
) ||
3092 auth_cluster_required
.is_supported_auth(CEPH_AUTH_GSS
) ||
3093 auth_service_required
.is_supported_auth(CEPH_AUTH_GSS
);
3096 struct C_MgrProxyCommand
: public Context
{
3102 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
3103 : mon(mon
), op(op
), size(s
) { }
3104 void finish(int r
) {
3105 std::lock_guard
l(mon
->lock
);
3106 mon
->mgr_proxy_bytes
-= size
;
3107 mon
->reply_command(op
, r
, outs
, outbl
, 0);
3111 void Monitor::handle_tell_command(MonOpRequestRef op
)
3113 ceph_assert(op
->is_type_command());
3114 MCommand
*m
= static_cast<MCommand
*>(op
->get_req());
3115 if (m
->fsid
!= monmap
->fsid
) {
3116 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
3117 return reply_tell_command(op
, -EACCES
, "wrong fsid");
3119 MonSession
*session
= op
->get_session();
3121 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
3125 if (stringstream ss
; !cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
3126 return reply_tell_command(op
, -EINVAL
, ss
.str());
3128 map
<string
,string
> param_str_map
;
3129 _generate_command_map(cmdmap
, param_str_map
);
3131 if (!cmd_getval(cmdmap
, "prefix", prefix
)) {
3132 return reply_tell_command(op
, -EINVAL
, "no prefix");
3134 if (auto cmd
= _get_moncommand(prefix
,
3135 get_local_commands(quorum_mon_features
));
3137 if (cmd
->is_obsolete() ||
3138 (cct
->_conf
->mon_debug_deprecated_as_obsolete
&&
3139 cmd
->is_deprecated())) {
3140 return reply_tell_command(op
, -ENOTSUP
,
3141 "command is obsolete; "
3142 "please check usage and/or man page");
3145 // see if command is whitelisted
3146 if (!session
->caps
.is_capable(
3148 session
->entity_name
,
3149 "mon", prefix
, param_str_map
,
3151 session
->get_peer_socket_addr())) {
3152 return reply_tell_command(op
, -EACCES
, "insufficient caps");
3155 cct
->get_admin_socket()->queue_tell_command(m
);
3158 void Monitor::handle_command(MonOpRequestRef op
)
3160 ceph_assert(op
->is_type_command());
3161 auto m
= op
->get_req
<MMonCommand
>();
3162 if (m
->fsid
!= monmap
->fsid
) {
3163 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
3165 reply_command(op
, -EPERM
, "wrong fsid", 0);
3169 MonSession
*session
= op
->get_session();
3171 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
3175 if (m
->cmd
.empty()) {
3176 reply_command(op
, -EINVAL
, "no command specified", 0);
3181 vector
<string
> fullcmd
;
3183 stringstream ss
, ds
;
3187 rs
= "unrecognized command";
3189 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
3190 // ss has reason for failure
3193 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3194 reply_command(op
, r
, rs
, 0);
3198 // check return value. If no prefix parameter provided,
3199 // return value will be false, then return error info.
3200 if (!cmd_getval(cmdmap
, "prefix", prefix
)) {
3201 reply_command(op
, -EINVAL
, "command prefix not found", 0);
3205 // check prefix is empty
3206 if (prefix
.empty()) {
3207 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
3211 if (prefix
== "get_command_descriptions") {
3213 Formatter
*f
= Formatter::create("json");
3215 std::vector
<MonCommand
> commands
= static_cast<MgrMonitor
*>(
3216 paxos_service
[PAXOS_MGR
].get())->get_command_descs();
3218 for (auto& c
: leader_mon_commands
) {
3219 commands
.push_back(c
);
3222 auto features
= m
->get_connection()->get_features();
3223 format_command_descriptions(commands
, f
, features
, &rdata
);
3225 reply_command(op
, 0, "", rdata
, 0);
3232 dout(0) << "handle_command " << *m
<< dendl
;
3235 cmd_getval(cmdmap
, "format", format
, string("plain"));
3236 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
3238 get_str_vec(prefix
, fullcmd
);
3240 // make sure fullcmd is not empty.
3241 // invalid prefix will cause empty vector fullcmd.
3242 // such as, prefix=";,,;"
3243 if (fullcmd
.empty()) {
3244 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
3248 module
= fullcmd
[0];
3250 // validate command is in leader map
3252 const MonCommand
*leader_cmd
;
3253 const auto& mgr_cmds
= mgrmon()->get_command_descs();
3254 const MonCommand
*mgr_cmd
= nullptr;
3255 if (!mgr_cmds
.empty()) {
3256 mgr_cmd
= _get_moncommand(prefix
, mgr_cmds
);
3258 leader_cmd
= _get_moncommand(prefix
, leader_mon_commands
);
3260 leader_cmd
= mgr_cmd
;
3262 reply_command(op
, -EINVAL
, "command not known", 0);
3266 // validate command is in our map & matches, or forward if it is allowed
3267 const MonCommand
*mon_cmd
= _get_moncommand(
3269 get_local_commands(quorum_mon_features
));
3275 if (leader_cmd
->is_noforward()) {
3276 reply_command(op
, -EINVAL
,
3277 "command not locally supported and not allowed to forward",
3281 dout(10) << "Command not locally supported, forwarding request "
3283 forward_request_leader(op
);
3285 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
3286 if (mon_cmd
->is_noforward()) {
3287 reply_command(op
, -EINVAL
,
3288 "command not compatible with leader and not allowed to forward",
3292 dout(10) << "Command not compatible with leader, forwarding request "
3294 forward_request_leader(op
);
3299 if (mon_cmd
->is_obsolete() ||
3300 (cct
->_conf
->mon_debug_deprecated_as_obsolete
3301 && mon_cmd
->is_deprecated())) {
3302 reply_command(op
, -ENOTSUP
,
3303 "command is obsolete; please check usage and/or man page",
3308 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
3309 dout(10) << "Got forward for noforward command " << m
<< dendl
;
3310 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
3314 /* what we perceive as being the service the command falls under */
3315 string
service(mon_cmd
->module
);
3317 dout(25) << __func__
<< " prefix='" << prefix
3318 << "' module='" << module
3319 << "' service='" << service
<< "'" << dendl
;
3322 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
3324 // validate user's permissions for requested command
3325 map
<string
,string
> param_str_map
;
3326 _generate_command_map(cmdmap
, param_str_map
);
3327 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
3328 param_str_map
, mon_cmd
)) {
3329 dout(1) << __func__
<< " access denied" << dendl
;
3330 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3331 << "from='" << session
->name
<< " " << session
->addrs
<< "' "
3332 << "entity='" << session
->entity_name
<< "' "
3333 << "cmd=" << m
->cmd
<< ": access denied";
3334 reply_command(op
, -EACCES
, "access denied", 0);
3338 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3339 << "from='" << session
->name
<< " " << session
->addrs
<< "' "
3340 << "entity='" << session
->entity_name
<< "' "
3341 << "cmd=" << m
->cmd
<< ": dispatch";
3343 // compat kludge for legacy clients trying to tell commands that are
3344 // new. see bottom of MonCommands.h. we need to handle both (1)
3345 // pre-octopus clients and (2) octopus clients with a mix of pre-octopus
3346 // and octopus mons.
3347 if ((!HAVE_FEATURE(m
->get_connection()->get_features(), SERVER_OCTOPUS
) ||
3348 monmap
->min_mon_release
< ceph_release_t::octopus
) &&
3349 (prefix
== "injectargs" ||
3350 prefix
== "smart" ||
3351 prefix
== "mon_status" ||
3352 prefix
== "heap")) {
3353 if (m
->get_connection()->get_messenger() == 0) {
3354 // Prior to octopus, monitors might forward these messages
3355 // around. that was broken at baseline, and if we try to process
3356 // this message now, it will assert out when we try to send a
3357 // message in reply from the asok/tell worker (see
3358 // AnonConnection). Just reply with an error.
3359 dout(5) << __func__
<< " failing forwarded command from a (presumably) "
3360 << "pre-octopus peer" << dendl
;
3363 "failing forwarded tell command in mixed-version mon cluster", 0);
3366 dout(5) << __func__
<< " passing command to tell/asok" << dendl
;
3367 cct
->get_admin_socket()->queue_tell_command(m
);
3371 if (mon_cmd
->is_mgr()) {
3372 const auto& hdr
= m
->get_header();
3373 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
3374 uint64_t max
= g_conf().get_val
<Option::size_t>("mon_client_bytes")
3375 * g_conf().get_val
<double>("mon_mgr_proxy_client_bytes_ratio");
3376 if (mgr_proxy_bytes
+ size
> max
) {
3377 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
3378 << " + " << size
<< " > max " << max
<< dendl
;
3379 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
3382 mgr_proxy_bytes
+= size
;
3383 dout(10) << __func__
<< " proxying mgr command (+" << size
3384 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
3385 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
3386 mgr_client
.start_command(m
->cmd
,
3390 new C_OnFinisher(fin
, &finisher
));
3394 if ((module
== "mds" || module
== "fs") &&
3395 prefix
!= "fs authorize") {
3396 mdsmon()->dispatch(op
);
3399 if ((module
== "osd" ||
3400 prefix
== "pg map" ||
3401 prefix
== "pg repeer") &&
3402 prefix
!= "osd last-stat-seq") {
3403 osdmon()->dispatch(op
);
3406 if (module
== "config") {
3407 configmon()->dispatch(op
);
3411 if (module
== "mon" &&
3412 /* Let the Monitor class handle the following commands:
3415 prefix
!= "mon scrub" &&
3416 prefix
!= "mon metadata" &&
3417 prefix
!= "mon versions" &&
3418 prefix
!= "mon count-metadata" &&
3419 prefix
!= "mon ok-to-stop" &&
3420 prefix
!= "mon ok-to-add-offline" &&
3421 prefix
!= "mon ok-to-rm") {
3422 monmon()->dispatch(op
);
3425 if (module
== "health" && prefix
!= "health") {
3426 healthmon()->dispatch(op
);
3429 if (module
== "auth" || prefix
== "fs authorize") {
3430 authmon()->dispatch(op
);
3433 if (module
== "log") {
3434 logmon()->dispatch(op
);
3438 if (module
== "config-key") {
3439 config_key_service
->dispatch(op
);
3443 if (module
== "mgr") {
3444 mgrmon()->dispatch(op
);
3448 if (prefix
== "fsid") {
3450 f
->open_object_section("fsid");
3451 f
->dump_stream("fsid") << monmap
->fsid
;
3458 reply_command(op
, 0, "", rdata
, 0);
3462 if (prefix
== "mon scrub") {
3463 wait_for_paxos_write();
3465 int r
= scrub_start();
3466 reply_command(op
, r
, "", rdata
, 0);
3467 } else if (is_peon()) {
3468 forward_request_leader(op
);
3470 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
3475 if (prefix
== "time-sync-status") {
3477 f
.reset(Formatter::create("json-pretty"));
3478 f
->open_object_section("time_sync");
3479 if (!timecheck_skews
.empty()) {
3480 f
->open_object_section("time_skew_status");
3481 for (auto& i
: timecheck_skews
) {
3482 double skew
= i
.second
;
3483 double latency
= timecheck_latencies
[i
.first
];
3484 string name
= monmap
->get_name(i
.first
);
3486 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
3487 f
->open_object_section(name
.c_str());
3488 f
->dump_float("skew", skew
);
3489 f
->dump_float("latency", latency
);
3490 f
->dump_stream("health") << tcstatus
;
3491 if (tcstatus
!= HEALTH_OK
) {
3492 f
->dump_stream("details") << tcss
.str();
3498 f
->open_object_section("timechecks");
3499 f
->dump_unsigned("epoch", get_epoch());
3500 f
->dump_int("round", timecheck_round
);
3501 f
->dump_stream("round_status") << ((timecheck_round
%2) ?
3502 "on-going" : "finished");
3508 } else if (prefix
== "status" ||
3509 prefix
== "health" ||
3512 cmd_getval(cmdmap
, "detail", detail
);
3514 if (prefix
== "status") {
3515 // get_cluster_status handles f == NULL
3516 get_cluster_status(ds
, f
.get());
3523 } else if (prefix
== "health") {
3525 healthmon()->get_health_status(detail
== "detail", f
.get(), f
? nullptr : &plain
);
3529 rdata
.append(plain
);
3531 } else if (prefix
== "df") {
3532 bool verbose
= (detail
== "detail");
3534 f
->open_object_section("stats");
3536 mgrstatmon()->dump_cluster_stats(&ds
, f
.get(), verbose
);
3540 mgrstatmon()->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3548 ceph_abort_msg("We should never get here!");
3554 } else if (prefix
== "report") {
3556 // this must be formatted, in its current form
3558 f
.reset(Formatter::create("json-pretty"));
3559 f
->open_object_section("report");
3560 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3561 f
->dump_string("version", ceph_version_to_str());
3562 f
->dump_string("commit", git_version_to_str());
3563 f
->dump_stream("timestamp") << ceph_clock_now();
3565 vector
<string
> tagsvec
;
3566 cmd_getval(cmdmap
, "tags", tagsvec
);
3567 string tagstr
= str_join(tagsvec
, " ");
3568 if (!tagstr
.empty())
3569 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3570 f
->dump_string("tag", tagstr
);
3572 healthmon()->get_health_status(true, f
.get(), nullptr);
3574 monmon()->dump_info(f
.get());
3575 osdmon()->dump_info(f
.get());
3576 mdsmon()->dump_info(f
.get());
3577 authmon()->dump_info(f
.get());
3578 mgrstatmon()->dump_info(f
.get());
3580 paxos
->dump_info(f
.get());
3586 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT_LEGACY
);
3589 } else if (prefix
== "osd last-stat-seq") {
3591 cmd_getval(cmdmap
, "id", osd
);
3592 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3594 f
->dump_unsigned("seq", seq
);
3602 } else if (prefix
== "node ls") {
3603 string
node_type("all");
3604 cmd_getval(cmdmap
, "type", node_type
);
3606 f
.reset(Formatter::create("json-pretty"));
3607 if (node_type
== "all") {
3608 f
->open_object_section("nodes");
3609 print_nodes(f
.get(), ds
);
3610 osdmon()->print_nodes(f
.get());
3611 mdsmon()->print_nodes(f
.get());
3612 mgrmon()->print_nodes(f
.get());
3614 } else if (node_type
== "mon") {
3615 print_nodes(f
.get(), ds
);
3616 } else if (node_type
== "osd") {
3617 osdmon()->print_nodes(f
.get());
3618 } else if (node_type
== "mds") {
3619 mdsmon()->print_nodes(f
.get());
3620 } else if (node_type
== "mgr") {
3621 mgrmon()->print_nodes(f
.get());
3627 } else if (prefix
== "features") {
3628 if (!is_leader() && !is_peon()) {
3629 dout(10) << " waiting for quorum" << dendl
;
3630 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3634 forward_request_leader(op
);
3638 f
.reset(Formatter::create("json-pretty"));
3640 get_combined_feature_map(&fm
);
3641 f
->dump_object("features", fm
);
3645 } else if (prefix
== "mon metadata") {
3647 f
.reset(Formatter::create("json-pretty"));
3650 bool all
= !cmd_getval(cmdmap
, "id", name
);
3652 // Dump a single mon's metadata
3653 int mon
= monmap
->get_rank(name
);
3655 rs
= "requested mon not found";
3659 f
->open_object_section("mon_metadata");
3660 r
= get_mon_metadata(mon
, f
.get(), ds
);
3663 // Dump all mons' metadata
3665 f
->open_array_section("mon_metadata");
3666 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3667 std::ostringstream get_err
;
3668 f
->open_object_section("mon");
3669 f
->dump_string("name", monmap
->get_name(rank
));
3670 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3672 if (r
== -ENOENT
|| r
== -EINVAL
) {
3673 dout(1) << get_err
.str() << dendl
;
3674 // Drop error, list what metadata we do have
3676 } else if (r
!= 0) {
3677 derr
<< "Unexpected error from get_mon_metadata: "
3678 << cpp_strerror(r
) << dendl
;
3679 ds
<< get_err
.str();
3689 } else if (prefix
== "mon versions") {
3691 f
.reset(Formatter::create("json-pretty"));
3692 count_metadata("ceph_version", f
.get());
3697 } else if (prefix
== "mon count-metadata") {
3699 f
.reset(Formatter::create("json-pretty"));
3701 cmd_getval(cmdmap
, "property", field
);
3702 count_metadata(field
, f
.get());
3707 } else if (prefix
== "quorum_status") {
3708 // make sure our map is readable and up to date
3709 if (!is_leader() && !is_peon()) {
3710 dout(10) << " waiting for quorum" << dendl
;
3711 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3714 _quorum_status(f
.get(), ds
);
3718 } else if (prefix
== "mon ok-to-stop") {
3720 if (!cmd_getval(cmdmap
, "ids", ids
)) {
3724 set
<string
> wouldbe
;
3725 for (auto rank
: quorum
) {
3726 wouldbe
.insert(monmap
->get_name(rank
));
3728 for (auto& n
: ids
) {
3729 if (monmap
->contains(n
)) {
3733 if (wouldbe
.size() < monmap
->min_quorum_size()) {
3735 rs
= "not enough monitors would be available (" + stringify(wouldbe
) +
3736 ") after stopping mons " + stringify(ids
);
3740 rs
= "quorum should be preserved (" + stringify(wouldbe
) +
3741 ") after stopping " + stringify(ids
);
3742 } else if (prefix
== "mon ok-to-add-offline") {
3743 if (quorum
.size() < monmap
->min_quorum_size(monmap
->size() + 1)) {
3744 rs
= "adding a monitor may break quorum (until that monitor starts)";
3748 rs
= "adding another mon that is not yet online will not break quorum";
3750 } else if (prefix
== "mon ok-to-rm") {
3752 if (!cmd_getval(cmdmap
, "id", id
)) {
3754 rs
= "must specify a monitor id";
3757 if (!monmap
->contains(id
)) {
3759 rs
= "mon." + id
+ " does not exist";
3762 int rank
= monmap
->get_rank(id
);
3763 if (quorum
.count(rank
) &&
3764 quorum
.size() - 1 < monmap
->min_quorum_size(monmap
->size() - 1)) {
3766 rs
= "removing mon." + id
+ " would break quorum";
3770 rs
= "safe to remove mon." + id
;
3771 } else if (prefix
== "version") {
3773 f
->open_object_section("version");
3774 f
->dump_string("version", pretty_version_to_str());
3778 ds
<< pretty_version_to_str();
3783 } else if (prefix
== "versions") {
3785 f
.reset(Formatter::create("json-pretty"));
3786 map
<string
,int> overall
;
3787 f
->open_object_section("version");
3788 map
<string
,int> mon
, mgr
, osd
, mds
;
3790 count_metadata("ceph_version", &mon
);
3791 f
->open_object_section("mon");
3792 for (auto& p
: mon
) {
3793 f
->dump_int(p
.first
.c_str(), p
.second
);
3794 overall
[p
.first
] += p
.second
;
3798 mgrmon()->count_metadata("ceph_version", &mgr
);
3799 f
->open_object_section("mgr");
3800 for (auto& p
: mgr
) {
3801 f
->dump_int(p
.first
.c_str(), p
.second
);
3802 overall
[p
.first
] += p
.second
;
3806 osdmon()->count_metadata("ceph_version", &osd
);
3807 f
->open_object_section("osd");
3808 for (auto& p
: osd
) {
3809 f
->dump_int(p
.first
.c_str(), p
.second
);
3810 overall
[p
.first
] += p
.second
;
3814 mdsmon()->count_metadata("ceph_version", &mds
);
3815 f
->open_object_section("mds");
3816 for (auto& p
: mds
) {
3817 f
->dump_int(p
.first
.c_str(), p
.second
);
3818 overall
[p
.first
] += p
.second
;
3822 for (auto& p
: mgrstatmon()->get_service_map().services
) {
3823 auto &service
= p
.first
;
3824 if (ServiceMap::is_normal_ceph_entity(service
)) {
3827 f
->open_object_section(service
.c_str());
3829 p
.second
.count_metadata("ceph_version", &m
);
3831 f
->dump_int(q
.first
.c_str(), q
.second
);
3832 overall
[q
.first
] += q
.second
;
3837 f
->open_object_section("overall");
3838 for (auto& p
: overall
) {
3839 f
->dump_int(p
.first
.c_str(), p
.second
);
3849 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3850 reply_command(op
, r
, rs
, rdata
, 0);
3853 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
3856 reply_command(op
, rc
, rs
, rdata
, version
);
3859 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
3860 bufferlist
& rdata
, version_t version
)
3862 auto m
= op
->get_req
<MMonCommand
>();
3863 ceph_assert(m
->get_type() == MSG_MON_COMMAND
);
3864 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
3865 reply
->set_tid(m
->get_tid());
3866 reply
->set_data(rdata
);
3867 send_reply(op
, reply
);
3870 void Monitor::reply_tell_command(
3871 MonOpRequestRef op
, int rc
, const string
&rs
)
3873 MCommand
*m
= static_cast<MCommand
*>(op
->get_req());
3874 ceph_assert(m
->get_type() == MSG_COMMAND
);
3875 MCommandReply
*reply
= new MCommandReply(rc
, rs
);
3876 reply
->set_tid(m
->get_tid());
3877 m
->get_connection()->send_message(reply
);
3881 // ------------------------
3882 // request/reply routing
3884 // a client/mds/osd will connect to a random monitor. we need to forward any
3885 // messages requiring state updates to the leader, and then route any replies
3886 // back via the correct monitor and back to them. (the monitor will not
3887 // initiate any connections.)
3889 void Monitor::forward_request_leader(MonOpRequestRef op
)
3891 op
->mark_event(__func__
);
3893 int mon
= get_leader();
3894 MonSession
*session
= op
->get_session();
3895 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
3897 if (req
->get_source().is_mon() && req
->get_source_addrs() != messenger
->get_myaddrs()) {
3898 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
3899 } else if (session
->proxy_con
) {
3900 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
3901 } else if (!session
->closed
) {
3902 RoutedRequest
*rr
= new RoutedRequest
;
3903 rr
->tid
= ++routed_request_tid
;
3904 rr
->con
= req
->get_connection();
3905 rr
->con_features
= rr
->con
->get_features();
3906 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
3907 rr
->session
= static_cast<MonSession
*>(session
->get());
3909 routed_requests
[rr
->tid
] = rr
;
3910 session
->routed_request_tids
.insert(rr
->tid
);
3912 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
3913 << " features " << rr
->con_features
<< dendl
;
3915 MForward
*forward
= new MForward(rr
->tid
,
3919 forward
->set_priority(req
->get_priority());
3920 if (session
->auth_handler
) {
3921 forward
->entity_name
= session
->entity_name
;
3922 } else if (req
->get_source().is_mon()) {
3923 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
3925 send_mon_message(forward
, mon
);
3926 op
->mark_forwarded();
3927 ceph_assert(op
->get_req()->get_type() != 0);
3929 dout(10) << "forward_request no session for request " << *req
<< dendl
;
3933 // fake connection attached to forwarded messages
3934 struct AnonConnection
: public Connection
{
3935 entity_addr_t socket_addr
;
3937 int send_message(Message
*m
) override
{
3938 ceph_assert(!"send_message on anonymous connection");
3940 void send_keepalive() override
{
3941 ceph_assert(!"send_keepalive on anonymous connection");
3943 void mark_down() override
{
3946 void mark_disposable() override
{
3949 bool is_connected() override
{ return false; }
3950 entity_addr_t
get_peer_socket_addr() const override
{
3955 FRIEND_MAKE_REF(AnonConnection
);
3956 explicit AnonConnection(CephContext
*cct
, const entity_addr_t
& sa
)
3957 : Connection(cct
, nullptr),
3961 //extract the original message and put it into the regular dispatch function
3962 void Monitor::handle_forward(MonOpRequestRef op
)
3964 auto m
= op
->get_req
<MForward
>();
3965 dout(10) << "received forwarded message from "
3966 << ceph_entity_type_name(m
->client_type
)
3967 << " " << m
->client_addrs
3968 << " via " << m
->get_source_inst() << dendl
;
3969 MonSession
*session
= op
->get_session();
3970 ceph_assert(session
);
3972 if (!session
->is_capable("mon", MON_CAP_X
)) {
3973 dout(0) << "forward from entity with insufficient caps! "
3974 << session
->caps
<< dendl
;
3976 // see PaxosService::dispatch(); we rely on this being anon
3977 // (c->msgr == NULL)
3978 PaxosServiceMessage
*req
= m
->claim_message();
3979 ceph_assert(req
!= NULL
);
3981 auto c
= ceph::make_ref
<AnonConnection
>(cct
, m
->client_socket_addr
);
3982 MonSession
*s
= new MonSession(static_cast<Connection
*>(c
.get()));
3983 s
->_ident(req
->get_source(),
3984 req
->get_source_addrs());
3985 c
->set_priv(RefCountedPtr
{s
, false});
3986 c
->set_peer_addrs(m
->client_addrs
);
3987 c
->set_peer_type(m
->client_type
);
3988 c
->set_features(m
->con_features
);
3990 s
->authenticated
= true;
3991 s
->caps
= m
->client_caps
;
3992 dout(10) << " caps are " << s
->caps
<< dendl
;
3993 s
->entity_name
= m
->entity_name
;
3994 dout(10) << " entity name '" << s
->entity_name
<< "' type "
3995 << s
->entity_name
.get_type() << dendl
;
3996 s
->proxy_con
= m
->get_connection();
3997 s
->proxy_tid
= m
->tid
;
3999 req
->set_connection(c
);
4001 // not super accurate, but better than nothing.
4002 req
->set_recv_stamp(m
->get_recv_stamp());
4005 * note which election epoch this is; we will drop the message if
4006 * there is a future election since our peers will resend routed
4007 * requests in that case.
4009 req
->rx_election_epoch
= get_epoch();
4011 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
4014 // break the session <-> con ref loop by removing the con->session
4015 // reference, which is no longer needed once the MonOpRequest is
4021 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
4023 op
->mark_event(__func__
);
4025 MonSession
*session
= op
->get_session();
4026 ceph_assert(session
);
4027 Message
*req
= op
->get_req();
4028 ConnectionRef con
= op
->get_connection();
4030 reply
->set_cct(g_ceph_context
);
4031 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
4034 dout(2) << "send_reply no connection, dropping reply " << *reply
4035 << " to " << req
<< " " << *req
<< dendl
;
4037 op
->mark_event("reply: no connection");
4041 if (!session
->con
&& !session
->proxy_con
) {
4042 dout(2) << "send_reply no connection, dropping reply " << *reply
4043 << " to " << req
<< " " << *req
<< dendl
;
4045 op
->mark_event("reply: no connection");
4049 if (session
->proxy_con
) {
4050 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
4051 << " via " << session
->proxy_con
->get_peer_addr()
4052 << " for request " << *req
<< dendl
;
4053 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
4054 op
->mark_event("reply: send routed request");
4056 session
->con
->send_message(reply
);
4057 op
->mark_event("reply: send");
4061 void Monitor::no_reply(MonOpRequestRef op
)
4063 MonSession
*session
= op
->get_session();
4064 Message
*req
= op
->get_req();
4066 if (session
->proxy_con
) {
4067 dout(10) << "no_reply to " << req
->get_source_inst()
4068 << " via " << session
->proxy_con
->get_peer_addr()
4069 << " for request " << *req
<< dendl
;
4070 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
4071 op
->mark_event("no_reply: send routed request");
4073 dout(10) << "no_reply to " << req
->get_source_inst()
4074 << " " << *req
<< dendl
;
4075 op
->mark_event("no_reply");
4079 void Monitor::handle_route(MonOpRequestRef op
)
4081 auto m
= op
->get_req
<MRoute
>();
4082 MonSession
*session
= op
->get_session();
4084 if (!session
->is_capable("mon", MON_CAP_X
)) {
4085 dout(0) << "MRoute received from entity without appropriate perms! "
4090 dout(10) << "handle_route tid " << m
->session_mon_tid
<< " " << *m
->msg
4093 dout(10) << "handle_route tid " << m
->session_mon_tid
<< " null" << dendl
;
4096 if (m
->session_mon_tid
) {
4097 if (routed_requests
.count(m
->session_mon_tid
)) {
4098 RoutedRequest
*rr
= routed_requests
[m
->session_mon_tid
];
4100 // reset payload, in case encoding is dependent on target features
4102 m
->msg
->clear_payload();
4103 rr
->con
->send_message(m
->msg
);
4106 if (m
->send_osdmap_first
) {
4107 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
4108 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
4109 true, MonOpRequestRef());
4111 ceph_assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
4112 routed_requests
.erase(m
->session_mon_tid
);
4113 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
4116 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
4119 dout(10) << " not a routed request, ignoring" << dendl
;
4123 void Monitor::resend_routed_requests()
4125 dout(10) << "resend_routed_requests" << dendl
;
4126 int mon
= get_leader();
4127 list
<Context
*> retry
;
4128 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
4129 p
!= routed_requests
.end();
4131 RoutedRequest
*rr
= p
->second
;
4134 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
4135 rr
->op
->mark_event("retry routed request");
4136 retry
.push_back(new C_RetryMessage(this, rr
->op
));
4138 ceph_assert(rr
->session
->routed_request_tids
.count(p
->first
));
4139 rr
->session
->routed_request_tids
.erase(p
->first
);
4143 auto q
= rr
->request_bl
.cbegin();
4144 PaxosServiceMessage
*req
=
4145 (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
4146 rr
->op
->mark_event("resend forwarded message to leader");
4147 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
4149 MForward
*forward
= new MForward(rr
->tid
,
4153 req
->put(); // forward takes its own ref; drop ours.
4154 forward
->client_type
= rr
->con
->get_peer_type();
4155 forward
->client_addrs
= rr
->con
->get_peer_addrs();
4156 forward
->client_socket_addr
= rr
->con
->get_peer_socket_addr();
4157 forward
->set_priority(req
->get_priority());
4158 send_mon_message(forward
, mon
);
4162 routed_requests
.clear();
4163 finish_contexts(g_ceph_context
, retry
);
4167 void Monitor::remove_session(MonSession
*s
)
4169 dout(10) << "remove_session " << s
<< " " << s
->name
<< " " << s
->addrs
4170 << " features 0x" << std::hex
<< s
->con_features
<< std::dec
<< dendl
;
4171 ceph_assert(s
->con
);
4172 ceph_assert(!s
->closed
);
4173 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
4174 p
!= s
->routed_request_tids
.end();
4176 ceph_assert(routed_requests
.count(*p
));
4177 RoutedRequest
*rr
= routed_requests
[*p
];
4178 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
4180 routed_requests
.erase(*p
);
4182 s
->routed_request_tids
.clear();
4183 s
->con
->set_priv(nullptr);
4184 session_map
.remove_session(s
);
4185 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4186 logger
->inc(l_mon_session_rm
);
4189 void Monitor::remove_all_sessions()
4191 std::lock_guard
l(session_map_lock
);
4192 while (!session_map
.sessions
.empty()) {
4193 MonSession
*s
= session_map
.sessions
.front();
4195 logger
->inc(l_mon_session_rm
);
4198 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4201 void Monitor::send_mon_message(Message
*m
, int rank
)
4203 messenger
->send_to_mon(m
, monmap
->get_addrs(rank
));
4206 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
4209 * Wait list the new session until we're in the quorum, assuming it's
4211 * tick() will periodically send them back through so we can send
4212 * the client elsewhere if we don't think we're getting back in.
4214 * But we whitelist a few sorts of messages:
4215 * 1) Monitors can talk to us at any time, of course.
4216 * 2) auth messages. It's unlikely to go through much faster, but
4217 * it's possible we've just lost our quorum status and we want to take...
4218 * 3) command messages. We want to accept these under all possible
4221 Message
*m
= op
->get_req();
4222 MonSession
*s
= op
->get_session();
4223 ConnectionRef con
= op
->get_connection();
4224 utime_t too_old
= ceph_clock_now();
4225 too_old
-= g_ceph_context
->_conf
->mon_lease
;
4226 if (m
->get_recv_stamp() > too_old
&&
4227 con
->is_connected()) {
4228 dout(5) << "waitlisting message " << *m
<< dendl
;
4229 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
4230 op
->mark_wait_for_quorum();
4232 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
4234 // proxied sessions aren't registered and don't have a con; don't remove
4236 if (!s
->proxy_con
) {
4237 std::lock_guard
l(session_map_lock
);
4244 void Monitor::_ms_dispatch(Message
*m
)
4246 if (is_shutdown()) {
4251 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
4252 bool src_is_mon
= op
->is_src_mon();
4253 op
->mark_event("mon:_ms_dispatch");
4254 MonSession
*s
= op
->get_session();
4255 if (s
&& s
->closed
) {
4259 if (src_is_mon
&& s
) {
4260 ConnectionRef con
= m
->get_connection();
4261 if (con
->get_messenger() && con
->get_features() != s
->con_features
) {
4262 // only update features if this is a non-anonymous connection
4263 dout(10) << __func__
<< " feature change for " << m
->get_source_inst()
4264 << " (was " << s
->con_features
4265 << ", now " << con
->get_features() << ")" << dendl
;
4266 // connection features changed - recreate session.
4267 if (s
->con
&& s
->con
!= con
) {
4268 dout(10) << __func__
<< " connection for " << m
->get_source_inst()
4269 << " changed from session; mark down and replace" << dendl
;
4270 s
->con
->mark_down();
4272 if (s
->item
.is_on_list()) {
4273 // forwarded messages' sessions are not in the sessions map and
4274 // exist only while the op is being handled.
4275 std::lock_guard
l(session_map_lock
);
4283 // if the sender is not a monitor, make sure their first message for a
4284 // session is an MAuth. If it is not, assume it's a stray message,
4285 // and considering that we are creating a new session it is safe to
4286 // assume that the sender hasn't authenticated yet, so we have no way
4287 // of assessing whether we should handle it or not.
4288 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
4289 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
4290 m
->get_type() != CEPH_MSG_PING
)) {
4291 dout(1) << __func__
<< " dropping stray message " << *m
4292 << " from " << m
->get_source_inst() << dendl
;
4296 ConnectionRef con
= m
->get_connection();
4298 std::lock_guard
l(session_map_lock
);
4299 s
= session_map
.new_session(m
->get_source(),
4300 m
->get_source_addrs(),
4304 con
->set_priv(RefCountedPtr
{s
, false});
4305 dout(10) << __func__
<< " new session " << s
<< " " << *s
4306 << " features 0x" << std::hex
4307 << s
->con_features
<< std::dec
<< dendl
;
4310 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4311 logger
->inc(l_mon_session_add
);
4314 // give it monitor caps; the peer type has been authenticated
4315 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
4316 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
4318 s
->authenticated
= true;
4321 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->name
4327 s
->session_timeout
= ceph_clock_now();
4328 s
->session_timeout
+= g_conf()->mon_session_timeout
;
4330 if (s
->auth_handler
) {
4331 s
->entity_name
= s
->auth_handler
->get_entity_name();
4333 dout(20) << " entity " << s
->entity_name
4334 << " caps " << s
->caps
.get_str() << dendl
;
4336 if ((is_synchronizing() ||
4337 (!s
->authenticated
&& !exited_quorum
.is_zero())) &&
4339 m
->get_type() != CEPH_MSG_PING
) {
4340 waitlist_or_zap_client(op
);
4347 void Monitor::dispatch_op(MonOpRequestRef op
)
4349 op
->mark_event("mon:dispatch_op");
4350 MonSession
*s
= op
->get_session();
4353 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
4357 /* we will consider the default type as being 'monitor' until proven wrong */
4358 op
->set_type_monitor();
4359 /* deal with all messages that do not necessarily need caps */
4360 switch (op
->get_req()->get_type()) {
4362 case MSG_MON_GLOBAL_ID
:
4364 op
->set_type_service();
4365 /* no need to check caps here */
4366 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
4373 op
->set_type_command();
4374 handle_tell_command(op
);
4378 if (!op
->get_session()->authenticated
) {
4379 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4380 << " is not authenticated, dropping " << *(op
->get_req())
4385 switch (op
->get_req()->get_type()) {
4386 case CEPH_MSG_MON_GET_MAP
:
4387 handle_mon_get_map(op
);
4390 case MSG_GET_CONFIG
:
4391 configmon()->handle_get_config(op
);
4394 case CEPH_MSG_MON_METADATA
:
4395 return handle_mon_metadata(op
);
4397 case CEPH_MSG_MON_SUBSCRIBE
:
4398 /* FIXME: check what's being subscribed, filter accordingly */
4399 handle_subscribe(op
);
4403 /* well, maybe the op belongs to a service... */
4404 op
->set_type_service();
4405 /* deal with all messages which caps should be checked somewhere else */
4406 switch (op
->get_req()->get_type()) {
4409 case CEPH_MSG_MON_GET_OSDMAP
:
4410 case CEPH_MSG_POOLOP
:
4411 case MSG_OSD_BEACON
:
4412 case MSG_OSD_MARK_ME_DOWN
:
4413 case MSG_OSD_MARK_ME_DEAD
:
4415 case MSG_OSD_FAILURE
:
4418 case MSG_OSD_PGTEMP
:
4419 case MSG_OSD_PG_CREATED
:
4420 case MSG_REMOVE_SNAPS
:
4421 case MSG_MON_GET_PURGED_SNAPS
:
4422 case MSG_OSD_PG_READY_TO_MERGE
:
4423 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
4427 case MSG_MDS_BEACON
:
4428 case MSG_MDS_OFFLOAD_TARGETS
:
4429 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
4433 case MSG_MGR_BEACON
:
4434 paxos_service
[PAXOS_MGR
]->dispatch(op
);
4438 case MSG_MON_MGR_REPORT
:
4439 case CEPH_MSG_STATFS
:
4440 case MSG_GETPOOLSTATS
:
4441 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
4446 paxos_service
[PAXOS_LOG
]->dispatch(op
);
4449 // handle_command() does its own caps checking
4450 case MSG_MON_COMMAND
:
4451 op
->set_type_command();
4456 /* nop, looks like it's not a service message; revert back to monitor */
4457 op
->set_type_monitor();
4459 /* messages we, the Monitor class, need to deal with
4460 * but may be sent by clients. */
4462 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
4463 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4464 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
4469 switch (op
->get_req()->get_type()) {
4471 case CEPH_MSG_MON_GET_VERSION
:
4472 handle_get_version(op
);
4476 if (!op
->is_src_mon()) {
4477 dout(1) << __func__
<< " unexpected monitor message from"
4478 << " non-monitor entity " << op
->get_req()->get_source_inst()
4479 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
4483 /* messages that should only be sent by another monitor */
4484 switch (op
->get_req()->get_type()) {
4494 // Sync (i.e., the new slurp, but on steroids)
4502 /* log acks are sent from a monitor we sent the MLog to, and are
4503 never sent by clients to us. */
4505 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
4510 op
->set_type_service();
4511 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
4517 op
->set_type_paxos();
4518 auto pm
= op
->get_req
<MMonPaxos
>();
4519 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4524 if (state
== STATE_SYNCHRONIZING
) {
4525 // we are synchronizing. These messages would do us no
4526 // good, thus just drop them and ignore them.
4527 dout(10) << __func__
<< " ignore paxos msg from "
4528 << pm
->get_source_inst() << dendl
;
4533 if (pm
->epoch
> get_epoch()) {
4537 if (pm
->epoch
!= get_epoch()) {
4541 paxos
->dispatch(op
);
4546 case MSG_MON_ELECTION
:
4547 op
->set_type_election();
4548 //check privileges here for simplicity
4549 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4550 dout(0) << "MMonElection received from entity without enough caps!"
4551 << op
->get_session()->caps
<< dendl
;
4554 if (!is_probing() && !is_synchronizing()) {
4555 elector
.dispatch(op
);
4564 dout(5) << __func__
<< " ignoring " << op
<< dendl
;
4566 case MSG_TIMECHECK2
:
4567 handle_timecheck(op
);
4570 case MSG_MON_HEALTH
:
4571 dout(5) << __func__
<< " dropping deprecated message: "
4572 << *op
->get_req() << dendl
;
4574 case MSG_MON_HEALTH_CHECKS
:
4575 op
->set_type_service();
4576 paxos_service
[PAXOS_HEALTH
]->dispatch(op
);
4579 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
// Answer an MPing with a freshly allocated MPing whose payload is the
// encoded text of a JSON "pong" object filled in by get_health_status()
// and get_mon_status(). The reply is sent on the connection the request
// arrived on.
// NOTE(review): the declarations of `ss` and `payload`, and whatever
// closes/flushes the formatter, are not visible in this listing -- confirm.
4583 void Monitor::handle_ping(MonOpRequestRef op
)
4585 auto m
= op
->get_req
<MPing
>();
4586 dout(10) << __func__
<< " " << *m
<< dendl
;
4587 MPing
*reply
= new MPing
;
4589 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4590 f
->open_object_section("pong");
4592 healthmon()->get_health_status(false, f
.get(), nullptr);
4593 get_mon_status(f
.get());
4598 encode(ss
.str(), payload
);
4599 reply
->set_payload(payload
);
4600 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
4601 m
->get_connection()->send_message(reply
);
// Begin time checking: wipe any previous timecheck state with
// timecheck_cleanup(), then start a round only when the quorum's monitor
// features contain FEATURE_NAUTILUS.
4604 void Monitor::timecheck_start()
4606 dout(10) << __func__
<< dendl
;
4607 timecheck_cleanup();
4608 if (get_quorum_mon_features().contains_all(
4609 ceph::features::mon::FEATURE_NAUTILUS
)) {
4610 timecheck_start_round();
// Stop time checking: logs the call and resets all timecheck state via
// timecheck_cleanup().
4614 void Monitor::timecheck_finish()
4616 dout(10) << __func__
<< dendl
;
4617 timecheck_cleanup();
// Start a new timecheck round. Must run on the leader (asserted) and aborts
// if the monmap holds a single monitor.
// An odd timecheck_round means a round is already in flight: if it began
// less than 3 * mon_timecheck_interval ago it is left running, otherwise it
// is cancelled with timecheck_cancel_round(). The round counter must be even
// before the new start time is recorded; the timer is re-armed at the end.
// NOTE(review): the statements that bump timecheck_round and kick off the
// actual check are not visible in this listing -- confirm against the file.
4620 void Monitor::timecheck_start_round()
4622 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4623 ceph_assert(is_leader());
4625 if (monmap
->size() == 1) {
4626 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
4630 if (timecheck_round
% 2) {
4631 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4632 utime_t curr_time
= ceph_clock_now();
4633 double max
= g_conf()->mon_timecheck_interval
*3;
4634 if (curr_time
- timecheck_round_start
< max
) {
4635 dout(10) << __func__
<< " keep current round going" << dendl
;
4638 dout(10) << __func__
4639 << " finish current timecheck and start new" << dendl
;
4640 timecheck_cancel_round();
4644 ceph_assert(timecheck_round
% 2 == 0);
4647 timecheck_round_start
= ceph_clock_now();
4648 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4652 dout(10) << __func__
<< " setting up next event" << dendl
;
4653 timecheck_reset_event();
// Close the round currently in flight (timecheck_round must be odd) and
// clear its start time.
// One path asserts that nobody is still being waited on and that every
// quorum member acked, then evaluates skews with timecheck_check_skews().
// The other path logs the peers still in timecheck_waiting and clears that
// map.
// NOTE(review): the branch on `success` and the round-counter bump are not
// visible in this listing; presumably the asserting path is the success
// case -- confirm.
4656 void Monitor::timecheck_finish_round(bool success
)
4658 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4659 ceph_assert(timecheck_round
% 2);
4661 timecheck_round_start
= utime_t();
4664 ceph_assert(timecheck_waiting
.empty());
4665 ceph_assert(timecheck_acks
== quorum
.size());
4667 timecheck_check_skews();
4671 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4672 << " peers still waiting:";
4673 for (auto& p
: timecheck_waiting
) {
4674 *_dout
<< " mon." << p
.first
;
4677 timecheck_waiting
.clear();
4679 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
// Abandon the round in flight: forwards to timecheck_finish_round(false).
4682 void Monitor::timecheck_cancel_round()
4684 timecheck_finish_round(false);
// Reset all timecheck bookkeeping: round counter and round start time,
// the pending timer event (cancelled if set), the waiting/skews/latencies
// maps, and the rounds-since-clean counter.
4687 void Monitor::timecheck_cleanup()
4689 timecheck_round
= 0;
4691 timecheck_round_start
= utime_t();
4693 if (timecheck_event
) {
4694 timer
.cancel_event(timecheck_event
);
4695 timecheck_event
= NULL
;
4697 timecheck_waiting
.clear();
4698 timecheck_skews
.clear();
4699 timecheck_latencies
.clear();
4701 timecheck_rounds_since_clean
= 0;
// Re-arm the timecheck timer. Any pending event is cancelled first.
// The delay comes from mon_timecheck_skew_interval scaled by
// timecheck_rounds_since_clean; when that is <= 0 or exceeds
// mon_timecheck_interval, mon_timecheck_interval is used instead.
// The scheduled callback runs timecheck_start_round().
// NOTE(review): the declaration of `delay` and the argument passing it to
// add_event_after() are not visible in this listing -- confirm.
4704 void Monitor::timecheck_reset_event()
4706 if (timecheck_event
) {
4707 timer
.cancel_event(timecheck_event
);
4708 timecheck_event
= NULL
;
4712 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
4714 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4715 delay
= cct
->_conf
->mon_timecheck_interval
;
4718 dout(10) << __func__
<< " delay " << delay
4719 << " rounds_since_clean " << timecheck_rounds_since_clean
4722 timecheck_event
= timer
.add_event_after(
4724 new C_MonContext
{this, [this](int) {
4725 timecheck_start_round();
// Evaluate the skews collected in the last round. Leader only, between
// rounds (even timecheck_round asserted); aborts on a single-monitor
// monmap; skews and latencies maps must have the same size.
// Each entry of timecheck_skews is tested with timecheck_has_skew() and
// logged when skewed. One visible branch increments
// timecheck_rounds_since_clean and re-arms the timer; the `else if` branch
// (counter > 0) sets the counter to 1 only for the timecheck_reset_event()
// call, then zeroes it, so one more quick check is scheduled.
// NOTE(review): the condition of the first branch and where found_skew is
// set are not visible here; presumably the branch tests found_skew.
4729 void Monitor::timecheck_check_skews()
4731 dout(10) << __func__
<< dendl
;
4732 ceph_assert(is_leader());
4733 ceph_assert((timecheck_round
% 2) == 0);
4734 if (monmap
->size() == 1) {
4735 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4738 ceph_assert(timecheck_latencies
.size() == timecheck_skews
.size());
4740 bool found_skew
= false;
4741 for (auto& p
: timecheck_skews
) {
4743 if (timecheck_has_skew(p
.second
, &abs_skew
)) {
4744 dout(10) << __func__
4745 << " " << p
.first
<< " skew " << abs_skew
<< dendl
;
4751 ++timecheck_rounds_since_clean
;
4752 timecheck_reset_event();
4753 } else if (timecheck_rounds_since_clean
> 0) {
4755 << " no clock skews found after " << timecheck_rounds_since_clean
4756 << " rounds" << dendl
;
4757 // make sure the skews are really gone and not just a transient success
4758 // this will run just once if not in the presence of skews again.
4759 timecheck_rounds_since_clean
= 1;
4760 timecheck_reset_event();
4761 timecheck_rounds_since_clean
= 0;
// Publish the collected skews and latencies to the quorum. Leader only,
// between rounds (even timecheck_round asserted); aborts on a
// single-monitor monmap.
// For quorum members an MTimeCheck2 OP_REPORT is built with the current
// epoch and round, filled with every entry of timecheck_skews and the
// matching timecheck_latencies value, and sent with send_mon_message().
// NOTE(review): the statement following the own-name test and the uses of
// do_output are not visible in this listing; presumably the local monitor
// is skipped -- confirm.
4766 void Monitor::timecheck_report()
4768 dout(10) << __func__
<< dendl
;
4769 ceph_assert(is_leader());
4770 ceph_assert((timecheck_round
% 2) == 0);
4771 if (monmap
->size() == 1) {
4772 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4776 ceph_assert(timecheck_latencies
.size() == timecheck_skews
.size());
4777 bool do_output
= true; // only output report once
4778 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
4779 if (monmap
->get_name(*q
) == name
)
4782 MTimeCheck2
*m
= new MTimeCheck2(MTimeCheck2::OP_REPORT
);
4783 m
->epoch
= get_epoch();
4784 m
->round
= timecheck_round
;
4786 for (auto& it
: timecheck_skews
) {
4787 double skew
= it
.second
;
4788 double latency
= timecheck_latencies
[it
.first
];
4790 m
->skews
[it
.first
] = skew
;
4791 m
->latencies
[it
.first
] = latency
;
4794 dout(25) << __func__
<< " mon." << it
.first
4795 << " latency " << latency
4796 << " skew " << skew
<< dendl
;
4800 dout(10) << __func__
<< " send report to mon." << *q
<< dendl
;
4801 send_mon_message(m
, *q
);
// Run the ping sweep of a timecheck round. Leader only, with a round in
// flight (odd timecheck_round asserted); aborts on a single-monitor monmap.
// The ack count starts at 1 for the leader itself, whose own skew and
// latency are recorded as 0.0. For quorum members the send time is stored
// in timecheck_waiting and an MTimeCheck2 OP_PING carrying the current
// epoch and round is sent.
// NOTE(review): the statement following the own-name test is not visible;
// presumably the local monitor is skipped -- confirm.
4805 void Monitor::timecheck()
4807 dout(10) << __func__
<< dendl
;
4808 ceph_assert(is_leader());
4809 if (monmap
->size() == 1) {
4810 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4813 ceph_assert(timecheck_round
% 2 != 0);
4815 timecheck_acks
= 1; // we ack ourselves
4817 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
4818 << " round " << timecheck_round
<< dendl
;
4820 // we are at the eye of the storm; the point of reference
4821 timecheck_skews
[rank
] = 0.0;
4822 timecheck_latencies
[rank
] = 0.0;
4824 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
4825 if (monmap
->get_name(*it
) == name
)
4828 utime_t curr_time
= ceph_clock_now();
4829 timecheck_waiting
[*it
] = curr_time
;
4830 MTimeCheck2
*m
= new MTimeCheck2(MTimeCheck2::OP_PING
);
4831 m
->epoch
= get_epoch();
4832 m
->round
= timecheck_round
;
4833 dout(10) << __func__
<< " send " << *m
<< " to mon." << *it
<< dendl
;
4834 send_mon_message(m
, *it
);
// Map a peer's bounded skew to a health status. Starts at HEALTH_OK; when
// timecheck_has_skew(skew_bound, ...) reports a skew the status becomes
// HEALTH_WARN and a "clock skew ... > max ..." message (using
// mon_clock_drift_allowed) is written to `ss`. `latency` must be >= 0
// (asserted).
// NOTE(review): the declaration of abs_skew and the return statement are
// not visible in this listing; presumably `status` is returned.
4838 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
4839 const double skew_bound
,
4840 const double latency
)
4842 health_status_t status
= HEALTH_OK
;
4843 ceph_assert(latency
>= 0);
4846 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
4847 status
= HEALTH_WARN
;
4848 ss
<< "clock skew " << abs_skew
<< "s"
4849 << " > max " << g_conf()->mon_clock_drift_allowed
<< "s";
// Leader-side handler for MTimeCheck2 OP_PONG replies (op asserted).
// Messages from an older epoch or an older round are logged and discarded.
// The sender must be present in timecheck_waiting (asserted); its send time
// is taken out of that map. If the local clock now reads earlier than the
// send time, the round is cancelled.
// Otherwise latency = now - send time; a first sample is stored directly
// and the stored value is then blended as 0.8 * stored + 0.2 * latency.
// delta = peer timestamp - now, and skew_bound = |delta| - latency.
// A non-OK timecheck_status() is reported through clog, the skew is stored,
// and once timecheck_acks equals quorum.size() the round is finished.
// NOTE(review): the early returns, the ack increment, the declaration of
// `ss`, and the conditions around the skew_bound adjustment at 4948 are not
// visible in this listing -- confirm against the full file.
4855 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
4857 auto m
= op
->get_req
<MTimeCheck2
>();
4858 dout(10) << __func__
<< " " << *m
<< dendl
;
4859 /* handles PONG's */
4860 ceph_assert(m
->op
== MTimeCheck2::OP_PONG
);
4862 int other
= m
->get_source().num();
4863 if (m
->epoch
< get_epoch()) {
4864 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
4865 << " from " << other
4866 << " curr " << get_epoch()
4867 << " -- severely lagged? discard" << dendl
;
4870 ceph_assert(m
->epoch
== get_epoch());
4872 if (m
->round
< timecheck_round
) {
4873 dout(1) << __func__
<< " got old round " << m
->round
4874 << " from " << other
4875 << " curr " << timecheck_round
<< " -- discard" << dendl
;
4879 utime_t curr_time
= ceph_clock_now();
4881 ceph_assert(timecheck_waiting
.count(other
) > 0);
4882 utime_t timecheck_sent
= timecheck_waiting
[other
];
4883 timecheck_waiting
.erase(other
);
4884 if (curr_time
< timecheck_sent
) {
4885 // our clock was readjusted -- drop everything until it all makes sense.
4886 dout(1) << __func__
<< " our clock was readjusted --"
4887 << " bump round and drop current check"
4889 timecheck_cancel_round();
4893 /* update peer latencies */
4894 double latency
= (double)(curr_time
- timecheck_sent
);
4896 if (timecheck_latencies
.count(other
) == 0)
4897 timecheck_latencies
[other
] = latency
;
4899 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
4900 timecheck_latencies
[other
] = avg_latency
;
4906 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4907 * and 'a' happens to be lower than 'b'; so we use double instead.
4909 * latency is always expected to be >= 0.
4911 * delta, the difference between theirs timestamp and ours, may either be
4912 * lower or higher than 0; will hardly ever be 0.
4914 * The absolute skew is the absolute delta minus the latency, which is
4915 * taken as a whole instead of an rtt given that there is some queueing
4916 * and dispatch times involved and it's hard to assess how long exactly
4917 * it took for the message to travel to the other side and be handled. So
4918 * we call it a bounded skew, the worst case scenario.
4922 * Given that the latency is always positive, we can establish that the
4923 * bounded skew will be:
4925 * 1. positive if the absolute delta is higher than the latency and
4927 * 2. negative if the absolute delta is higher than the latency and
4928 * delta is negative.
4929 * 3. zero if the absolute delta is lower than the latency.
4931 * On 3. we make a judgement call and treat the skew as non-existent.
4932 * This is because that, if the absolute delta is lower than the
4933 * latency, then the apparently existing skew is nothing more than a
4934 * side-effect of the high latency at work.
4936 * This may not be entirely true though, as a severely skewed clock
4937 * may be masked by an even higher latency, but with high latencies
4938 * we probably have worse issues to deal with than just skewed clocks.
4940 ceph_assert(latency
>= 0);
4942 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
4943 double abs_delta
= (delta
> 0 ? delta
: -delta
);
4944 double skew_bound
= abs_delta
- latency
;
4948 skew_bound
= -skew_bound
;
4951 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
4952 if (status
!= HEALTH_OK
) {
4953 clog
->health(status
) << other
<< " " << ss
.str();
4956 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
4957 << " delta " << delta
<< " skew_bound " << skew_bound
4958 << " latency " << latency
<< dendl
;
4960 timecheck_skews
[other
] = skew_bound
;
4963 if (timecheck_acks
== quorum
.size()) {
4964 dout(10) << __func__
<< " got pongs from everybody ("
4965 << timecheck_acks
<< " total)" << dendl
;
4966 ceph_assert(timecheck_skews
.size() == timecheck_acks
);
4967 ceph_assert(timecheck_waiting
.empty());
4968 // everyone has acked, so bump the round to finish it.
4969 timecheck_finish_round();
// Peon-side handler for MTimeCheck2 OP_PING and OP_REPORT (role and op are
// asserted). Messages with a different epoch, or with a round older than
// the local timecheck_round, are logged and discarded; otherwise the local
// round is set to the message's round.
// OP_REPORT (even round asserted): the message's latencies and skews maps
// are swapped into the local ones.
// OP_PING (odd round asserted): an OP_PONG carrying the local clock reading
// plus the message's epoch and round is sent back on the same connection.
// NOTE(review): the early returns after the discard/report paths are not
// visible in this listing -- confirm.
4973 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
4975 auto m
= op
->get_req
<MTimeCheck2
>();
4976 dout(10) << __func__
<< " " << *m
<< dendl
;
4978 ceph_assert(is_peon());
4979 ceph_assert(m
->op
== MTimeCheck2::OP_PING
|| m
->op
== MTimeCheck2::OP_REPORT
);
4981 if (m
->epoch
!= get_epoch()) {
4982 dout(1) << __func__
<< " got wrong epoch "
4983 << "(ours " << get_epoch()
4984 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
4988 if (m
->round
< timecheck_round
) {
4989 dout(1) << __func__
<< " got old round " << m
->round
4990 << " current " << timecheck_round
4991 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
4995 timecheck_round
= m
->round
;
4997 if (m
->op
== MTimeCheck2::OP_REPORT
) {
4998 ceph_assert((timecheck_round
% 2) == 0);
4999 timecheck_latencies
.swap(m
->latencies
);
5000 timecheck_skews
.swap(m
->skews
);
5004 ceph_assert((timecheck_round
% 2) != 0);
5005 MTimeCheck2
*reply
= new MTimeCheck2(MTimeCheck2::OP_PONG
);
5006 utime_t curr_time
= ceph_clock_now();
5007 reply
->timestamp
= curr_time
;
5008 reply
->epoch
= m
->epoch
;
5009 reply
->round
= m
->round
;
5010 dout(10) << __func__
<< " send " << *m
5011 << " to " << m
->get_source_inst() << dendl
;
5012 m
->get_connection()->send_message(reply
);
// Route an MTimeCheck2 by role. One branch accepts only OP_PONG and hands
// it to handle_timecheck_leader(); the peon branch accepts only OP_PING or
// OP_REPORT and hands it to handle_timecheck_peon(); anything else is
// logged as unexpected and dropped.
// NOTE(review): the condition opening the first branch is not visible in
// this listing; presumably is_leader() -- confirm.
5015 void Monitor::handle_timecheck(MonOpRequestRef op
)
5017 auto m
= op
->get_req
<MTimeCheck2
>();
5018 dout(10) << __func__
<< " " << *m
<< dendl
;
5021 if (m
->op
!= MTimeCheck2::OP_PONG
) {
5022 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
5024 handle_timecheck_leader(op
);
5026 } else if (is_peon()) {
5027 if (m
->op
!= MTimeCheck2::OP_PING
&& m
->op
!= MTimeCheck2::OP_REPORT
) {
5028 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
5030 handle_timecheck_peon(op
);
5033 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
// Process an MMonSubscribe request for the op's session.
// A non-empty hostname in the message is recorded as the session's
// remote_host. For each entry of m->what:
//  - "monmap" and "config" need no caps; every other name needs mon read
//    capability, otherwise the request is logged as dropped;
//  - when the name is a log channel (logmon()->sub_name_to_id() >= 0), the
//    session's other log-channel subscriptions are removed first;
//  - the subscription is added/updated in session_map under
//    session_map_lock;
//  - the owning service is then asked to check the new subscription
//    (mdsmon, osdmon, monmon, logmon, mgrmon, mgrstatmon, configmon), with
//    extra capability checks for the mds/osd cases.
// An MMonSubscribeAck is sent only when the connection lacks
// CEPH_FEATURE_MON_STATEFUL_SUB.
// NOTE(review): several statements (loop increment, early returns, the
// flag set at 5064, the osd_epoch reset at 5097) are not visible in this
// listing -- confirm against the full file.
5037 void Monitor::handle_subscribe(MonOpRequestRef op
)
5039 auto m
= op
->get_req
<MMonSubscribe
>();
5040 dout(10) << "handle_subscribe " << *m
<< dendl
;
5044 MonSession
*s
= op
->get_session();
5047 if (m
->hostname
.size()) {
5048 s
->remote_host
= m
->hostname
;
5051 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
5054 if (p
->first
== "monmap" || p
->first
== "config") {
5055 // these require no caps
5056 } else if (!s
->is_capable("mon", MON_CAP_R
)) {
5057 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
5058 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
5063 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5064 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
5067 // remove conflicting subscribes
5068 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
5069 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
5070 it
!= s
->sub_map
.end(); ) {
5071 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
5072 std::lock_guard
l(session_map_lock
);
5073 session_map
.remove_sub((it
++)->second
);
5081 std::lock_guard
l(session_map_lock
);
5082 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
5083 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
5084 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
5087 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
5088 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
5089 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
5090 Subscription
*sub
= s
->sub_map
[p
->first
];
5091 ceph_assert(sub
!= nullptr);
5092 mdsmon()->check_sub(sub
);
5094 } else if (p
->first
== "osdmap") {
5095 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
5096 if (s
->osd_epoch
> p
->second
.start
) {
5097 // client needs earlier osdmaps on purpose, so reset the sent epoch
5100 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
5102 } else if (p
->first
== "osd_pg_creates") {
5103 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
5104 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
5106 } else if (p
->first
== "monmap") {
5107 monmon()->check_sub(s
->sub_map
[p
->first
]);
5108 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
5109 logmon()->check_sub(s
->sub_map
[p
->first
]);
5110 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
5111 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
5112 } else if (p
->first
== "servicemap") {
5113 mgrstatmon()->check_sub(s
->sub_map
[p
->first
]);
5114 } else if (p
->first
== "config") {
5115 configmon()->check_sub(s
);
5120 // we only need to reply if the client is old enough to think it
5121 // has to send renewals.
5122 ConnectionRef con
= m
->get_connection();
5123 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
5124 m
->get_connection()->send_message(new MMonSubscribeAck(
5125 monmap
->get_fsid(), (int)g_conf()->mon_subscribe_interval
));
// Answer an MMonGetVersion query with the committed version range of one
// map. While the monitor is neither leader nor peon the op is parked on
// waitfor_quorum for retry. m->what selects the service ("mdsmap",
// "fsmap", "osdmap", "monmap"); any other value is logged as an invalid
// map type. If the service is not readable the op is retried once it is.
// The MMonGetVersionReply copies the request's handle and tid and carries
// the service's last and first committed versions.
// NOTE(review): the assignments to `svc` in each branch and the early
// returns are not visible in this listing -- confirm.
5130 void Monitor::handle_get_version(MonOpRequestRef op
)
5132 auto m
= op
->get_req
<MMonGetVersion
>();
5133 dout(10) << "handle_get_version " << *m
<< dendl
;
5134 PaxosService
*svc
= NULL
;
5136 MonSession
*s
= op
->get_session();
5139 if (!is_leader() && !is_peon()) {
5140 dout(10) << " waiting for quorum" << dendl
;
5141 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
5145 if (m
->what
== "mdsmap") {
5147 } else if (m
->what
== "fsmap") {
5149 } else if (m
->what
== "osdmap") {
5151 } else if (m
->what
== "monmap") {
5154 derr
<< "invalid map type " << m
->what
<< dendl
;
5158 if (!svc
->is_readable()) {
5159 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
5163 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
5164 reply
->handle
= m
->handle
;
5165 reply
->version
= svc
->get_last_committed();
5166 reply
->oldest_version
= svc
->get_first_committed();
5167 reply
->set_tid(m
->get_tid());
5169 m
->get_connection()->send_message(reply
);
// Connection-reset callback. Connections whose peer is another monitor are
// ignored. Otherwise the MonSession stored as the connection's private
// data is looked up, its back-reference on the connection is cleared to
// break the con <-> session cycle, and, under `lock`, a session that is
// not yet closed and still on the session list is handled under
// session_map_lock.
// NOTE(review): the return values, the null-session check, and the call
// that actually removes the session are not visible in this listing.
5175 bool Monitor::ms_handle_reset(Connection
*con
)
5177 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
5179 // ignore lossless monitor sessions
5180 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
5183 auto priv
= con
->get_priv();
5184 auto s
= static_cast<MonSession
*>(priv
.get());
5188 // break any con <-> session ref cycle
5189 s
->con
->set_priv(nullptr);
5194 std::lock_guard
l(lock
);
5196 dout(10) << "reset/close on session " << s
->name
<< " " << s
->addrs
<< dendl
;
5197 if (!s
->closed
&& s
->item
.is_on_list()) {
5198 std::lock_guard
l(session_map_lock
);
// Connection-refused callback: only logs the connection and its peer
// address.
// NOTE(review): the return statement is not visible in this listing.
5204 bool Monitor::ms_handle_refused(Connection
*con
)
5206 // just log for now...
5207 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
// Encode the current monmap for the features of `con` and send it on that
// connection wrapped in an MMonMap.
// NOTE(review): the declaration of `bl` is not visible in this listing.
5213 void Monitor::send_latest_monmap(Connection
*con
)
5216 monmap
->encode(bl
, con
->get_features());
5217 con
->send_message(new MMonMap(bl
));
// Handle an MMonGetMap request: reply with the latest monmap on the
// requester's connection via send_latest_monmap().
5220 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
5222 auto m
= op
->get_req
<MMonGetMap
>();
5223 dout(10) << "handle_mon_get_map" << dendl
;
5224 send_latest_monmap(m
->get_connection().get());
// Handle an MMonMetadata message: the message's `data` is moved into
// update_mon_metadata(), keyed by the sender's numeric id.
// NOTE(review): any guard around the call (line 5230 of the original) is
// not visible in this listing.
5227 void Monitor::handle_mon_metadata(MonOpRequestRef op
)
5229 auto m
= op
->get_req
<MMonMetadata
>();
5231 dout(10) << __func__
<< dendl
;
5232 update_mon_metadata(m
->get_source().num(), std::move(m
->data
));
// Record metadata `m` for monitor `from` in pending_metadata, then encode
// the whole pending map into the pending paxos transaction under
// MONITOR_STORE_PREFIX / "last_metadata" and trigger a proposal.
// NOTE(review): the declaration of `bl` is not visible in this listing.
5236 void Monitor::update_mon_metadata(int from
, Metadata
&& m
)
5238 // NOTE: this is now for legacy (kraken or jewel) mons only.
5239 pending_metadata
[from
] = std::move(m
);
5241 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5243 encode(pending_metadata
, bl
);
5244 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5245 paxos
->trigger_propose();
// Read MONITOR_STORE_PREFIX / "last_metadata" from the store, decode it
// into mon_metadata, and copy that into pending_metadata.
// NOTE(review): the declaration of `bl`, the handling of a failed get(),
// and the return statements are not visible in this listing.
5248 int Monitor::load_metadata()
5251 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5254 auto it
= bl
.cbegin();
5255 decode(mon_metadata
, it
);
5257 pending_metadata
= mon_metadata
;
// Dump the stored metadata of monitor `mon` to `f`, one dump_string() per
// key/value pair. When `mon` has no entry in mon_metadata,
// "mon.<id> not found" is written to `err`.
// NOTE(review): the return values are not visible in this listing.
5261 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
5264 if (!mon_metadata
.count(mon
)) {
5265 err
<< "mon." << mon
<< " not found";
5268 const Metadata
& m
= mon_metadata
[mon
];
5269 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
5270 f
->dump_string(p
->first
.c_str(), p
->second
);
// Tally, over every monitor in mon_metadata, how many report each value of
// metadata key `field`. Monitors without that key are counted under
// "unknown". Counts are accumulated into *out.
5275 void Monitor::count_metadata(const string
& field
, map
<string
,int> *out
)
5277 for (auto& p
: mon_metadata
) {
5278 auto q
= p
.second
.find(field
);
5279 if (q
== p
.second
.end()) {
5280 (*out
)["unknown"]++;
5282 (*out
)[q
->second
]++;
// Formatter overload: computes the per-value tally for `field` with the
// map overload and emits it to `f` as an object section named after the
// field, one dump_int() per distinct value.
// NOTE(review): the call closing the section is not visible in this
// listing.
5287 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
5289 map
<string
,int> by_val
;
5290 count_metadata(field
, &by_val
);
5291 f
->open_object_section(field
.c_str());
5292 for (auto& p
: by_val
) {
5293 f
->dump_int(p
.first
.c_str(), p
.second
);
// Group monitor names by the "hostname" entry of their metadata and pass
// the hostname => names map to dump_services(f, mons, "mon").
// NOTE(review): what happens for a monitor with no "hostname" entry, and
// the return value, are not visible in this listing.
5298 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
5300 map
<string
, list
<string
> > mons
; // hostname => mon
5301 for (map
<int, Metadata
>::iterator it
= mon_metadata
.begin();
5302 it
!= mon_metadata
.end(); ++it
) {
5303 const Metadata
& m
= it
->second
;
5304 Metadata::const_iterator hostname
= m
.find("hostname");
5305 if (hostname
== m
.end()) {
5306 // not likely though
5309 mons
[hostname
->second
].push_back(monmap
->get_name(it
->first
));
5312 dump_services(f
, mons
, "mon");
5316 // ----------------------------------------------
// Set up a new store scrub. Leader only (asserted). If scrub_result is
// non-empty, "scrub already in progress" is logged to clog. Otherwise the
// periodic scrub event is cancelled, previous results are cleared, and a
// fresh ScrubState is allocated.
// NOTE(review): the return codes and the call that starts scrubbing are
// not visible in this listing -- confirm.
5319 int Monitor::scrub_start()
5321 dout(10) << __func__
<< dendl
;
5322 ceph_assert(is_leader());
5324 if (!scrub_result
.empty()) {
5325 clog
->info() << "scrub already in progress";
5329 scrub_event_cancel();
5330 scrub_result
.clear();
5331 scrub_state
.reset(new ScrubState
);
// Run one scrub step. Leader only, with scrub_state set (both asserted).
// Cancels the scrub timeout, waits for pending paxos writes, and records
// the paxos version being scrubbed. The key budget is -1 (all keys) when
// the quorum has a single member, else mon_scrub_max_keys.
// An MMonScrub OP_SCRUB starting at scrub_state->last_key is sent to
// quorum members, then the local store is scrubbed with _scrub() into
// scrub_result[rank]; scrub_state->finished is set to the negation of
// _scrub()'s result. The timeout is re-armed only after the local scrub.
// NOTE(review): the skip-self test in the loop, the remaining arguments of
// the MMonScrub/_scrub calls, and the return value are not visible here.
5337 int Monitor::scrub()
5339 ceph_assert(is_leader());
5340 ceph_assert(scrub_state
);
5342 scrub_cancel_timeout();
5343 wait_for_paxos_write();
5344 scrub_version
= paxos
->get_version();
5347 // scrub all keys if we're the only monitor in the quorum
5349 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
5351 for (set
<int>::iterator p
= quorum
.begin();
5356 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
5358 r
->key
= scrub_state
->last_key
;
5359 send_mon_message(r
, *p
);
5363 bool r
= _scrub(&scrub_result
[rank
],
5364 &scrub_state
->last_key
,
5367 scrub_state
->finished
= !r
;
5369 // only after we got our scrub results do we really care whether the
5370 // other monitors are late on their results. Also, this way we avoid
5371 // triggering the timeout if we end up getting stuck in _scrub() for
5372 // longer than the duration of the timeout.
5373 scrub_reset_timeout();
5375 if (quorum
.size() == 1) {
5376 ceph_assert(scrub_state
->finished
== true);
// Handle an MMonScrub message.
// OP_SCRUB: wait for pending paxos writes, compare the requested version
// with the local paxos version, scrub locally with _scrub() starting at
// the requested key, and send the OP_RESULT reply on the same connection.
// OP_RESULT: compare the message version with scrub_version, re-arm the
// timeout, store the sender's result (a duplicate from the same sender is
// asserted against), and once a result is held for every quorum member,
// check results and clear them.
// NOTE(review): the role checks, what follows each version comparison,
// and the action taken when scrub_state->finished is true are not visible
// in this listing -- confirm.
5382 void Monitor::handle_scrub(MonOpRequestRef op
)
5384 auto m
= op
->get_req
<MMonScrub
>();
5385 dout(10) << __func__
<< " " << *m
<< dendl
;
5387 case MMonScrub::OP_SCRUB
:
5392 wait_for_paxos_write();
5394 if (m
->version
!= paxos
->get_version())
5397 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
5401 reply
->key
= m
->key
;
5402 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
5403 m
->get_connection()->send_message(reply
);
5407 case MMonScrub::OP_RESULT
:
5411 if (m
->version
!= scrub_version
)
5413 // reset the timeout each time we get a result
5414 scrub_reset_timeout();
5416 int from
= m
->get_source().num();
5417 ceph_assert(scrub_result
.count(from
) == 0);
5418 scrub_result
[from
] = m
->result
;
5420 if (scrub_result
.size() == quorum
.size()) {
5421 scrub_check_results();
5422 scrub_result
.clear();
5423 if (scrub_state
->finished
)
// Scrub a slice of the local store into *r.
// Iterates the store from *start over the sync-target prefixes, with
// "paxos" excluded. For each key whose prefix is in that set, the value is
// read (a failed read asserts), the per-prefix key count is incremented,
// and the per-prefix CRC is chained: crc32c of the value seeded with the
// prefix's running CRC (initialised to 0 on first use).
// When *num_keys > 0 the loop has a test for scrubbed_keys == *num_keys.
// On exit *num_keys is set to the number of keys scrubbed; the return
// value is whether the synchronizer still has chunks.
// Two test hooks: mon_scrub_inject_missing_keys and
// mon_scrub_inject_crc_mismatch (the latter adds 1 to the prefix CRC),
// each applied with a rand()-based probability.
// All three pointer arguments must be non-null (asserted).
// NOTE(review): the third parameter's declaration, the break/continue
// statements, the updates of scrubbed_keys/last_key and the write-back to
// *start are not visible in this listing -- confirm.
5433 bool Monitor::_scrub(ScrubResult
*r
,
5434 pair
<string
,string
> *start
,
5437 ceph_assert(r
!= NULL
);
5438 ceph_assert(start
!= NULL
);
5439 ceph_assert(num_keys
!= NULL
);
5441 set
<string
> prefixes
= get_sync_targets_names();
5442 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5444 dout(10) << __func__
<< " start (" << *start
<< ")"
5445 << " num_keys " << *num_keys
<< dendl
;
5447 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
5449 int scrubbed_keys
= 0;
5450 pair
<string
,string
> last_key
;
5452 while (it
->has_next_chunk()) {
5454 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
5457 pair
<string
,string
> k
= it
->get_next_key();
5458 if (prefixes
.count(k
.first
) == 0)
5461 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
5462 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
5463 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
5469 int err
= store
->get(k
.first
, k
.second
, bl
);
5470 ceph_assert(err
== 0);
5472 uint32_t key_crc
= bl
.crc32c(0);
5473 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
5474 << " crc " << key_crc
<< dendl
;
5475 r
->prefix_keys
[k
.first
]++;
5476 if (r
->prefix_crc
.count(k
.first
) == 0) {
5477 r
->prefix_crc
[k
.first
] = 0;
5479 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
5481 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
5482 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
5483 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
5484 r
->prefix_crc
[k
.first
] += 1;
5491 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
5492 << " scrubbed_keys " << scrubbed_keys
5493 << " has_next " << it
->has_next_chunk() << dendl
;
5496 *num_keys
= scrubbed_keys
;
5498 return it
->has_next_chunk();
// Compare every collected ScrubResult against this monitor's own
// (scrub_result[rank]). For a peer whose result differs, three clog error
// lines are emitted: "scrub mismatch", our result, and the peer's result.
// A clog debug "scrub ok on <quorum>" line covers the clean case.
// NOTE(review): the loop increment, the statement following the own-rank
// test, and the condition guarding the "scrub ok" message are not visible
// in this listing -- confirm.
5501 void Monitor::scrub_check_results()
5503 dout(10) << __func__
<< dendl
;
5507 ScrubResult
& mine
= scrub_result
[rank
];
5508 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
5509 p
!= scrub_result
.end();
5511 if (p
->first
== rank
)
5513 if (p
->second
!= mine
) {
5515 clog
->error() << "scrub mismatch";
5516 clog
->error() << " mon." << rank
<< " " << mine
;
5517 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5521 clog
->debug() << "scrub ok on " << quorum
<< ": " << mine
;
// Scrub timeout handler: logs " restarting scrub".
// NOTE(review): the statements that actually restart the scrub are not
// visible in this listing.
5524 inline void Monitor::scrub_timeout()
5526 dout(1) << __func__
<< " restarting scrub" << dendl
;
// Finish a scrub: logs the call and schedules the next periodic scrub with
// scrub_event_start().
// NOTE(review): one statement between the log and the reschedule (line
// 5534 of the original) is not visible in this listing.
5531 void Monitor::scrub_finish()
5533 dout(10) << __func__
<< dendl
;
5535 scrub_event_start();
// Drop all in-progress scrub state: cancel the scrub timeout, clear the
// collected results, and release scrub_state.
// NOTE(review): one statement at line 5542 of the original is not visible
// in this listing.
5538 void Monitor::scrub_reset()
5540 dout(10) << __func__
<< dendl
;
5541 scrub_cancel_timeout();
5543 scrub_result
.clear();
5544 scrub_state
.reset();
// React to a new scrub interval of `secs`: logs the value, tests whether a
// scrub is already in progress (scrub_state set), and reschedules the
// periodic scrub event by cancelling and restarting it.
// NOTE(review): the leader check described by the first comment and the
// statement following the scrub_state test are not visible in this
// listing; presumably both return early -- confirm.
5547 inline void Monitor::scrub_update_interval(int secs
)
5549 // we don't care about changes if we are not the leader.
5550 // changes will be visible if we become the leader.
5554 dout(1) << __func__
<< " new interval = " << secs
<< dendl
;
5556 // if scrub already in progress, all changes will already be visible during
5557 // the next round. Nothing to do.
5558 if (scrub_state
!= NULL
)
5561 scrub_event_cancel();
5562 scrub_event_start();
// Schedule the periodic scrub. Any existing scrub event is cancelled
// first. When mon_scrub_interval <= 0 the function logs that the scrub
// event is disabled; otherwise a timer event is added to fire after
// mon_scrub_interval.
// NOTE(review): the early return in the disabled case and the body of the
// timer callback are not visible in this listing.
5565 void Monitor::scrub_event_start()
5567 dout(10) << __func__
<< dendl
;
5570 scrub_event_cancel();
5572 if (cct
->_conf
->mon_scrub_interval
<= 0) {
5573 dout(1) << __func__
<< " scrub event is disabled"
5574 << " (mon_scrub_interval = " << cct
->_conf
->mon_scrub_interval
5579 scrub_event
= timer
.add_event_after(
5580 cct
->_conf
->mon_scrub_interval
,
5581 new C_MonContext
{this, [this](int) {
// Cancel the periodic scrub timer event.
// NOTE(review): any guard on scrub_event and its reset afterwards are not
// visible in this listing.
5586 void Monitor::scrub_event_cancel()
5588 dout(10) << __func__
<< dendl
;
5590 timer
.cancel_event(scrub_event
);
// Cancel the scrub timeout event, if one is pending, and clear the
// pointer.
5595 inline void Monitor::scrub_cancel_timeout()
5597 if (scrub_timeout_event
) {
5598 timer
.cancel_event(scrub_timeout_event
);
5599 scrub_timeout_event
= NULL
;
// Re-arm the scrub timeout: cancel any pending timeout event and add a new
// one that fires after mon_scrub_timeout.
// NOTE(review): the body of the timer callback is not visible in this
// listing; presumably it calls scrub_timeout().
5603 void Monitor::scrub_reset_timeout()
5605 dout(15) << __func__
<< " reset timeout event" << dendl
;
5606 scrub_cancel_timeout();
5607 scrub_timeout_event
= timer
.add_event_after(
5608 g_conf()->mon_scrub_timeout
,
5609 new C_MonContext
{this, [this](int) {
5614 /************ TICK ***************/
// Schedule the next periodic tick: adds a timer event that fires after
// mon_tick_interval.
// NOTE(review): the body of the timer callback is not visible in this
// listing; presumably it calls tick().
5615 void Monitor::new_tick()
5617 timer
.add_event_after(g_conf()->mon_tick_interval
, new C_MonContext
{this, [this](int) {
// Periodic housekeeping. Visible steps, in order:
//  1. Delayed health-check updates: for every paxos service's health
//     checks, an entry already tracked in health_check_log_times whose
//     summary or severity changed, and whose last update is older than
//     mon_health_log_update_period, is updated and re-logged to clog as
//     "Health check update: <summary> (<code>)".
//  2. A second pass over paxos_service (body not visible).
//  3. Session trimming under session_map_lock: monitor sessions are never
//     trimmed; a timed-out session has its timeout refreshed from the
//     connection's last keepalive plus mon_session_timeout before being
//     re-tested; sessions are also trimmed when this monitor has been out
//     of quorum for more than 2 * mon_lease. Trimmed connections are
//     marked down and l_mon_session_trim is incremented.
//  4. sync_trim_providers(); wake contexts in maybe_wait_for_quorum.
//  5. On an active leader with a zero fingerprint, propose a new cluster
//     fingerprint.
//  6. Report daemon health metrics to the mgr client.
// NOTE(review): many control-flow lines (guards, continues, loop headers,
// the session-removal calls, the next-tick scheduling) are not visible in
// this listing -- confirm against the full file.
5622 void Monitor::tick()
5625 dout(11) << "tick" << dendl
;
5626 const utime_t now
= ceph_clock_now();
5628 // Check if we need to emit any delayed health check updated messages
5630 const auto min_period
= g_conf().get_val
<int64_t>(
5631 "mon_health_log_update_period");
5632 for (auto& svc
: paxos_service
) {
5633 auto health
= svc
->get_health_checks();
5635 for (const auto &i
: health
.checks
) {
5636 const std::string
&code
= i
.first
;
5637 const std::string
&summary
= i
.second
.summary
;
5638 const health_status_t severity
= i
.second
.severity
;
5640 auto status_iter
= health_check_log_times
.find(code
);
5641 if (status_iter
== health_check_log_times
.end()) {
5645 auto &log_status
= status_iter
->second
;
5646 bool const changed
= log_status
.last_message
!= summary
5647 || log_status
.severity
!= severity
;
5649 if (changed
&& now
- log_status
.updated_at
> min_period
) {
5650 log_status
.last_message
= summary
;
5651 log_status
.updated_at
= now
;
5652 log_status
.severity
= severity
;
5655 ss
<< "Health check update: " << summary
<< " (" << code
<< ")";
5656 clog
->health(severity
) << ss
.str();
5663 for (auto& svc
: paxos_service
) {
5670 std::lock_guard
l(session_map_lock
);
5671 auto p
= session_map
.sessions
.begin();
5673 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5674 now
> (exited_quorum
+ 2*g_conf()->mon_lease
));
5680 // don't trim monitors
5681 if (s
->name
.is_mon())
5684 if (s
->session_timeout
< now
&& s
->con
) {
5685 // check keepalive, too
5686 s
->session_timeout
= s
->con
->get_last_keepalive();
5687 s
->session_timeout
+= g_conf()->mon_session_timeout
;
5689 if (s
->session_timeout
< now
) {
5690 dout(10) << " trimming session " << s
->con
<< " " << s
->name
5692 << " (timeout " << s
->session_timeout
5693 << " < now " << now
<< ")" << dendl
;
5694 } else if (out_for_too_long
) {
5695 // boot the client Session because we've taken too long getting back in
5696 dout(10) << " trimming session " << s
->con
<< " " << s
->name
5697 << " because we've been out of quorum too long" << dendl
;
5702 s
->con
->mark_down();
5704 logger
->inc(l_mon_session_trim
);
5707 sync_trim_providers();
5709 if (!maybe_wait_for_quorum
.empty()) {
5710 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5713 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5714 // this is only necessary on upgraded clusters.
5715 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5716 prepare_new_fingerprint(t
);
5717 paxos
->trigger_propose();
5720 mgr_client
.update_daemon_health(get_health_metrics());
// Build the daemon health metrics reported to the manager.
// Ops in flight are visited through op_tracker with a callback that
// compares each op's initiation time against a threshold `too_old`
// (derived from mon_op_complaint_time) and keeps track of the oldest such
// op. When the visit reports slow ops, a message naming the count and the
// oldest op's description is logged and a SLOW_OPS metric with the count
// and oldest_secs is appended; a SLOW_OPS metric of 0/0 is appended on the
// other visible path.
// NOTE(review): the declarations of `slow` and `too_old`, the callback's
// counting and return statements, the exact branch structure, and the
// final return are not visible in this listing -- confirm.
5724 vector
<DaemonHealthMetric
> Monitor::get_health_metrics()
5726 vector
<DaemonHealthMetric
> metrics
;
5728 utime_t oldest_secs
;
5729 const utime_t now
= ceph_clock_now();
5731 too_old
-= g_conf().get_val
<std::chrono::seconds
>("mon_op_complaint_time").count();
5733 TrackedOpRef oldest_op
;
5734 auto count_slow_ops
= [&](TrackedOp
& op
) {
5735 if (op
.get_initiated() < too_old
) {
5737 if (!oldest_op
|| op
.get_initiated() < oldest_op
->get_initiated()) {
5745 if (op_tracker
.visit_ops_in_flight(&oldest_secs
, count_slow_ops
)) {
5747 derr
<< __func__
<< " reporting " << slow
<< " slow ops, oldest is "
5748 << oldest_op
->get_desc() << dendl
;
5750 metrics
.emplace_back(daemon_metric::SLOW_OPS
, slow
, oldest_secs
);
5752 metrics
.emplace_back(daemon_metric::SLOW_OPS
, 0, 0);
// Generate a random cluster fingerprint and stage it in transaction `t`
// under MONITOR_NAME / "cluster_fingerprint".
// NOTE(review): the declarations of `nf` and `bl` and the encode step are
// not visible in this listing.
5757 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5760 nf
.generate_random();
5761 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5765 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
// Verify that the fsid stored on disk matches the monmap's fsid.
// Reads MONITOR_NAME / "cluster_uuid" from the store, looks for the first
// newline in the text (only the first line is kept), parses it as a uuid,
// and compares it with monmap->get_fsid(). Parse failures and mismatches
// are logged with derr.
// NOTE(review): the handling of a missing key (before the r == 0 assert),
// the truncation statement, the declarations of `ebl`/`ondisk`, and all
// return values are not visible in this listing -- confirm.
5768 int Monitor::check_fsid()
5771 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
5774 ceph_assert(r
== 0);
5776 string
es(ebl
.c_str(), ebl
.length());
5778 // only keep the first line
5779 size_t pos
= es
.find_first_of('\n');
5780 if (pos
!= string::npos
)
5783 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
5785 if (!ondisk
.parse(es
.c_str())) {
5786 derr
<< "error: unable to parse uuid" << dendl
;
5790 if (monmap
->get_fsid() != ondisk
) {
5791 derr
<< "error: cluster_uuid file exists with value " << ondisk
5792 << ", != our uuid " << monmap
->get_fsid() << dendl
;
// Persist the fsid immediately: creates a fresh store transaction and
// applies it to the store.
// NOTE(review): the call that fills the transaction (line 5802 of the
// original, presumably write_fsid(t)) and the return statement are not
// visible in this listing.
5799 int Monitor::write_fsid()
5801 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5803 int r
= store
->apply_transaction(t
);
// Stage the monmap's fsid, formatted as text followed by a newline, into
// transaction `t` under MONITOR_NAME / "cluster_uuid".
// NOTE(review): the declarations of `ss` and `b`, the copy of the text
// into `b`, and the return statement are not visible in this listing.
5807 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
5810 ss
<< monmap
->get_fsid() << "\n";
5811 string us
= ss
.str();
5816 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
5821 * this is the closest thing to a traditional 'mkfs' for ceph.
5822 * initialize the monitor state machines to their initial values.
// Initialise a new monitor store in a single transaction.
// Visible steps: check the cluster fsid (an error other than -ENOENT is
// tested for), store the on-disk magic string under MONITOR_NAME /
// "magic", take the initial supported feature set, encode the monmap with
// all features under "mkfs" / "monmap" and reset its epoch to 0, and, if
// `osdmapbl` is non-empty, store it under "mkfs" / "osdmap" after a trial
// decode (decode errors are logged).
// When a keyring is required, the keyring file is resolved from the
// `keyring` option; if that fails and the `key` option is set, a plaintext
// "[mon.]" keyring is synthesised from it and decoded. Otherwise the
// keyring is loaded from the resolved file. The mon. key is then split out
// with extract_save_mon_key() and the remainder is stored under
// "mkfs" / "keyring". Finally the transaction is applied.
// NOTE(review): error returns, the try blocks, the write of the fsid and
// feature set into the transaction, and the final return are not visible
// in this listing -- confirm against the full file.
5824 int Monitor::mkfs(bufferlist
& osdmapbl
)
5826 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5828 // verify cluster fsid
5829 int r
= check_fsid();
5830 if (r
< 0 && r
!= -ENOENT
)
5834 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
5835 magicbl
.append("\n");
5836 t
->put(MONITOR_NAME
, "magic", magicbl
);
5839 features
= get_initial_supported_features();
5842 // save monmap, osdmap, keyring.
5843 bufferlist monmapbl
;
5844 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
5845 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5846 t
->put("mkfs", "monmap", monmapbl
);
5848 if (osdmapbl
.length()) {
5849 // make sure it's a valid osdmap
5852 om
.decode(osdmapbl
);
5854 catch (buffer::error
& e
) {
5855 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
5858 t
->put("mkfs", "osdmap", osdmapbl
);
5861 if (is_keyring_required()) {
5863 string keyring_filename
;
5865 r
= ceph_resolve_file_search(g_conf()->keyring
, keyring_filename
);
5867 derr
<< "unable to find a keyring file on " << g_conf()->keyring
5868 << ": " << cpp_strerror(r
) << dendl
;
5869 if (g_conf()->key
!= "") {
5870 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf()->key
+
5871 "\n\tcaps mon = \"allow *\"\n";
5873 bl
.append(keyring_plaintext
);
5875 auto i
= bl
.cbegin();
5876 keyring
.decode_plaintext(i
);
5878 catch (const buffer::error
& e
) {
5879 derr
<< "error decoding keyring " << keyring_plaintext
5880 << ": " << e
.what() << dendl
;
5887 r
= keyring
.load(g_ceph_context
, keyring_filename
);
5889 derr
<< "unable to load initial keyring " << g_conf()->keyring
<< dendl
;
5894 // put mon. key in external keyring; seed with everything else.
5895 extract_save_mon_key(keyring
);
5897 bufferlist keyringbl
;
5898 keyring
.encode_plaintext(keyringbl
);
5899 t
->put("mkfs", "keyring", keyringbl
);
5902 store
->apply_transaction(t
);
// Write `bl` to the file "<mon_data>/keyring".
//
// @param bl  buffer whose contents are written to the file.
//
// The file is opened write-only, created if missing, with mode 0600 and
// O_CLOEXEC; an open failure is logged at level 0. The descriptor is closed
// with VOID_TEMP_FAILURE_RETRY.
5907 int Monitor::write_default_keyring(bufferlist
& bl
)
5910 os
<< g_conf()->mon_data
<< "/keyring";
// 0600: readable and writable by the owner only
5913 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
|O_CLOEXEC
, 0600);
5916 dout(0) << __func__
<< " failed to open " << os
.str()
5917 << ": " << cpp_strerror(err
) << dendl
;
5921 err
= bl
.write_fd(fd
);
5924 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// Move the monitor's own key out of `keyring` into the default keyring file.
//
// @param keyring  keyring to inspect; the monitor entry is removed from it
//                 when present.
//
// If `keyring` holds an entry for the entity of type CEPH_ENTITY_TYPE_MON,
// that entry is added to a separate keyring, encoded as plaintext, handed to
// write_default_keyring(), and then removed from `keyring`.
// NOTE(review): the return value of write_default_keyring() is not checked in
// the lines visible here, yet the key is removed afterwards -- confirm.
5929 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
5931 EntityName mon_name
;
5932 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
5934 if (keyring
.get_auth(mon_name
, mon_key
)) {
5935 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
5937 pkey
.add(mon_name
, mon_key
);
5939 pkey
.encode_plaintext(bl
);
5940 write_default_keyring(bl
);
5941 keyring
.remove(mon_name
);
5945 // AuthClient methods -- for mon <-> mon communication
// Prepare an outgoing authentication request for connection `con`.
//
// Runs under auth_lock. Peers that are neither monitors nor mgrs take the
// first branch below. For the others an authorizer is obtained from
// get_authorizer(), ownership is handed to auth_meta->authorizer, the
// supported modes for the peer type are looked up in auth_registry, and
// *method is set to the authorizer's protocol.
5946 int Monitor::get_auth_request(
5948 AuthConnectionMeta
*auth_meta
,
5950 vector
<uint32_t> *preferred_modes
,
5953 std::scoped_lock
l(auth_lock
);
5954 if (con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
&&
5955 con
->get_peer_type() != CEPH_ENTITY_TYPE_MGR
) {
5958 AuthAuthorizer
*auth
;
5959 if (!get_authorizer(con
->get_peer_type(), &auth
)) {
// auth_meta takes over the raw pointer returned through `auth`
5962 auth_meta
->authorizer
.reset(auth
);
5963 auth_registry
.get_supported_modes(con
->get_peer_type(),
5966 *method
= auth
->protocol
;
// Handle an intermediate authentication reply carrying a challenge.
//
// Runs under auth_lock. A missing authorizer is logged through derr.
// Otherwise the challenge in `bl` is added to the existing authorizer and
// *reply is set to the authorizer's buffer.
5971 int Monitor::handle_auth_reply_more(
5973 AuthConnectionMeta
*auth_meta
,
5974 const bufferlist
& bl
,
5977 std::scoped_lock
l(auth_lock
);
5978 if (!auth_meta
->authorizer
) {
5979 derr
<< __func__
<< " no authorizer?" << dendl
;
5982 auth_meta
->authorizer
->add_challenge(cct
, bl
);
5983 *reply
= auth_meta
->authorizer
->bl
;
// Handle the final authentication reply from the peer.
//
// Runs under auth_lock. The authorizer verifies the reply in `bl`, receiving
// `connection_secret` as an output argument; a verification failure is logged
// at level 0. The authorizer's session key is then copied into
// auth_meta->session_key.
// NOTE(review): unlike handle_auth_reply_more(), no null check on
// auth_meta->authorizer is visible before it is dereferenced here -- confirm
// that callers guarantee it is set.
5987 int Monitor::handle_auth_done(
5989 AuthConnectionMeta
*auth_meta
,
5992 const bufferlist
& bl
,
5993 CryptoKey
*session_key
,
5994 std::string
*connection_secret
)
5996 std::scoped_lock
l(auth_lock
);
5997 // verify authorizer reply
5998 auto p
= bl
.begin();
5999 if (!auth_meta
->authorizer
->verify_reply(p
, connection_secret
)) {
6000 dout(0) << __func__
<< " failed verifying authorizer reply" << dendl
;
6003 auth_meta
->session_key
= auth_meta
->authorizer
->session_key
;
// Called when the peer rejected the auth method we offered.
//
// The lines visible here only log the rejected method and the peer's result
// code through derr; the allowed_methods / allowed_modes lists are not used
// in them.
6007 int Monitor::handle_auth_bad_method(
6009 AuthConnectionMeta
*auth_meta
,
6010 uint32_t old_auth_method
,
6012 const std::vector
<uint32_t>& allowed_methods
,
6013 const std::vector
<uint32_t>& allowed_modes
)
6015 derr
<< __func__
<< " hmm, they didn't like " << old_auth_method
6016 << " result " << cpp_strerror(result
) << dendl
;
// Build an authorizer this monitor can present to a peer service.
//
// @param service_id  CEPH_ENTITY_TYPE_* of the peer; only MON and MGR get
//                    past the check near the top.
// @param authorizer  out: receives the newly built authorizer.
//
// Two paths are visible:
//   - cephx not among the required cluster auth methods: an auth_none
//     authorizer is built with global id 0.
//   - otherwise: a ticket is prepared for the monitor entity (global id 0),
//     session auth info is built through key_server (for MON with an
//     explicit secret looked up in the keyring or key_server, for MGR
//     without one), a service ticket blob is built, encoded and decoded into
//     a CephXTicketHandler, and that handler builds the authorizer.
// Failures along the way are logged.
6020 bool Monitor::get_authorizer(int service_id
, AuthAuthorizer
**authorizer
)
6022 dout(10) << "get_authorizer for " << ceph_entity_type_name(service_id
)
6028 // we only connect to other monitors and mgr; everyone else connects to us.
6029 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
6030 service_id
!= CEPH_ENTITY_TYPE_MGR
)
6033 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
6035 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
6036 AuthNoneClientHandler handler
{g_ceph_context
};
6037 handler
.set_global_id(0);
6038 *authorizer
= handler
.build_authorizer(service_id
);
6042 CephXServiceTicketInfo auth_ticket_info
;
6043 CephXSessionAuthInfo info
;
// the ticket is issued in the name of the monitor entity, with global id 0
6047 name
.set_type(CEPH_ENTITY_TYPE_MON
);
6048 auth_ticket_info
.ticket
.name
= name
;
6049 auth_ticket_info
.ticket
.global_id
= 0;
6051 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
6052 // mon to mon authentication uses the private monitor shared key and not the
// the local keyring is tried first, then the key server
6055 if (!keyring
.get_secret(name
, secret
) &&
6056 !key_server
.get_secret(name
, secret
)) {
6057 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
// dump the installed auth entries to help diagnose the missing secret
6059 stringstream ss
, ds
;
6060 int err
= key_server
.list_secrets(ds
);
6062 ss
<< "no installed auth entries!";
6064 ss
<< "installed auth entries:";
6065 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
6069 ret
= key_server
.build_session_auth_info(
6070 service_id
, auth_ticket_info
.ticket
, info
, secret
, (uint64_t)-1);
6072 dout(0) << __func__
<< " failed to build mon session_auth_info "
6073 << cpp_strerror(ret
) << dendl
;
6076 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
// for mgr no explicit secret is passed to build_session_auth_info
6078 ret
= key_server
.build_session_auth_info(
6079 service_id
, auth_ticket_info
.ticket
, info
);
6081 derr
<< __func__
<< " failed to build mgr service session_auth_info "
6082 << cpp_strerror(ret
) << dendl
;
6086 ceph_abort(); // see check at top of fn
6089 CephXTicketBlob blob
;
6090 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
6091 dout(0) << "get_authorizer failed to build service ticket" << dendl
;
// round-trip the blob through a bufferlist to populate the handler's ticket
6094 bufferlist ticket_data
;
6095 encode(blob
, ticket_data
);
6097 auto iter
= ticket_data
.cbegin();
6098 CephXTicketHandler
handler(g_ceph_context
, service_id
);
6099 decode(handler
.ticket
, iter
);
6101 handler
.session_key
= info
.session_key
;
6103 *authorizer
= handler
.build_authorizer(0);
// Process an incoming authentication request (or a continuation of one) on a
// connection that has not finished negotiating yet.
//
// Runs under auth_lock. The first payload byte selects the auth mode:
//   - AUTH_MODE_AUTHORIZER .. AUTH_MODE_AUTHORIZER_MAX: the payload is an
//     authorizer; it is checked by the AuthAuthorizeHandler for the peer
//     type, and ms_handle_authentication() is called for the connection.
//     A challenge issued during this call is handled separately from a plain
//     "bad authorizer".
//   - AUTH_MODE_MON .. AUTH_MODE_MON_MAX: a client is authenticating against
//     the monitor. A new exchange creates an AuthServiceHandler, decodes
//     mode / entity name / global id, checks that the auth method is
//     supported for that entity type, assigns a global id if the peer has
//     none, attaches a partial MonSession to the connection and calls
//     start_session(). A continuation fetches the session from the
//     connection and calls handle_request().
//   - anything else is logged as an unrecognized auth mode.
// An empty payload is tolerated only for v1 connections from non-monitors.
6108 int Monitor::handle_auth_request(
6110 AuthConnectionMeta
*auth_meta
,
6112 uint32_t auth_method
,
6113 const bufferlist
&payload
,
6116 std::scoped_lock
l(auth_lock
);
6118 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6119 // e.g., peer_features, peer_addrs, and others are still unknown.
6121 dout(10) << __func__
<< " con " << con
<< (more
? " (more)":" (start)")
6122 << " method " << auth_method
6123 << " payload " << payload
.length()
6125 if (!payload
.length()) {
6126 if (!con
->is_msgr2() &&
6127 con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
) {
6128 // for v1 connections, we tolerate no authorizer (from
6129 // non-monitors), because authentication happens via MAuth
// the first payload byte carries the auth mode
6136 auth_meta
->auth_mode
= payload
[0];
6139 if (auth_meta
->auth_mode
>= AUTH_MODE_AUTHORIZER
&&
6140 auth_meta
->auth_mode
<= AUTH_MODE_AUTHORIZER_MAX
) {
6141 AuthAuthorizeHandler
*ah
= get_auth_authorize_handler(con
->get_peer_type(),
6144 lderr(cct
) << __func__
<< " no AuthAuthorizeHandler found for auth method "
6145 << auth_method
<< dendl
;
// remember whether a challenge already existed before verification, so a
// challenge created by this call can be told apart below
6148 bool was_challenge
= (bool)auth_meta
->authorizer_challenge
;
6149 bool isvalid
= ah
->verify_authorizer(
6153 auth_meta
->get_connection_secret_length(),
6156 &con
->peer_global_id
,
6157 &con
->peer_caps_info
,
6158 &auth_meta
->session_key
,
6159 &auth_meta
->connection_secret
,
6160 &auth_meta
->authorizer_challenge
);
6162 ms_handle_authentication(con
);
// first round, no prior challenge, and verification produced one
6165 if (!more
&& !was_challenge
&& auth_meta
->authorizer_challenge
) {
6168 dout(10) << __func__
<< " bad authorizer on " << con
<< dendl
;
6170 } else if (auth_meta
->auth_mode
< AUTH_MODE_MON
||
6171 auth_meta
->auth_mode
> AUTH_MODE_MON_MAX
) {
6172 derr
<< __func__
<< " unrecognized auth mode " << auth_meta
->auth_mode
6177 // wait until we've formed an initial quorum on mkfs so that we have
6178 // the initial keys (e.g., client.admin).
6179 if (authmon()->get_last_committed() == 0) {
6180 dout(10) << __func__
<< " haven't formed initial quorum, EBUSY" << dendl
;
6187 auto p
= payload
.begin();
// a connection that already carries a priv object is rejected here
// NOTE(review): presumably this only applies when starting a new exchange --
// confirm the enclosing condition
6189 if (con
->get_priv()) {
6190 return -EACCES
; // wtf
6194 unique_ptr
<AuthServiceHandler
> auth_handler
{get_auth_service_handler(
6195 auth_method
, g_ceph_context
, &key_server
)};
6196 if (!auth_handler
) {
6197 dout(1) << __func__
<< " auth_method " << auth_method
<< " not supported"
6203 EntityName entity_name
;
6207 if (mode
< AUTH_MODE_MON
||
6208 mode
> AUTH_MODE_MON_MAX
) {
6209 dout(1) << __func__
<< " invalid mode " << (int)mode
<< dendl
;
// NOTE(review): plain assert() here, while this file otherwise uses
// ceph_assert() -- confirm this is intentional
6212 assert(mode
>= AUTH_MODE_MON
&& mode
<= AUTH_MODE_MON_MAX
);
6213 decode(entity_name
, p
);
6214 decode(con
->peer_global_id
, p
);
6215 } catch (buffer::error
& e
) {
6216 dout(1) << __func__
<< " failed to decode, " << e
.what() << dendl
;
6220 // supported method?
// cluster daemons (mon/osd/mds/mgr) are checked against the cluster
// requirements; the second check below uses the service requirements
6221 if (entity_name
.get_type() == CEPH_ENTITY_TYPE_MON
||
6222 entity_name
.get_type() == CEPH_ENTITY_TYPE_OSD
||
6223 entity_name
.get_type() == CEPH_ENTITY_TYPE_MDS
||
6224 entity_name
.get_type() == CEPH_ENTITY_TYPE_MGR
) {
6225 if (!auth_cluster_required
.is_supported_auth(auth_method
)) {
6226 dout(10) << __func__
<< " entity " << entity_name
<< " method "
6227 << auth_method
<< " not among supported "
6228 << auth_cluster_required
.get_supported_set() << dendl
;
6232 if (!auth_service_required
.is_supported_auth(auth_method
)) {
// NOTE(review): the check above is against auth_service_required, but the
// set printed below is auth_cluster_required's -- looks like a copy/paste
// slip in the log message; confirm and fix
6233 dout(10) << __func__
<< " entity " << entity_name
<< " method "
6234 << auth_method
<< " not among supported "
6235 << auth_cluster_required
.get_supported_set() << dendl
;
6240 // for msgr1 we would do some weirdness here to ensure signatures
6241 // are supported by the client if we require it. for msgr2 that
6242 // is not necessary.
// a peer that did not supply a global id gets one assigned now
6244 if (!con
->peer_global_id
) {
6245 con
->peer_global_id
= authmon()->_assign_global_id();
6246 if (!con
->peer_global_id
) {
6247 dout(1) << __func__
<< " failed to assign global_id" << dendl
;
6250 dout(10) << __func__
<< " assigned global_id " << con
->peer_global_id
6254 // set up partial session
6255 s
= new MonSession(con
);
// the session takes over the handler released from the unique_ptr
6256 s
->auth_handler
= auth_handler
.release();
6257 con
->set_priv(RefCountedPtr
{s
, false});
6259 r
= s
->auth_handler
->start_session(
6261 auth_meta
->get_connection_secret_length(),
6263 &con
->peer_caps_info
,
6264 &auth_meta
->session_key
,
6265 &auth_meta
->connection_secret
);
// continuation: pick the session back up from the connection
6267 priv
= con
->get_priv();
6269 // this can happen if the async ms_handle_reset event races with
6270 // the unlocked call into handle_auth_request
6273 s
= static_cast<MonSession
*>(priv
.get());
6274 r
= s
->auth_handler
->handle_request(
6276 auth_meta
->get_connection_secret_length(),
6278 &con
->peer_global_id
,
6279 &con
->peer_caps_info
,
6280 &auth_meta
->session_key
,
6281 &auth_meta
->connection_secret
);
// only sessions not yet marked authenticated are passed on
6284 !s
->authenticated
) {
6285 ms_handle_authentication(con
);
6288 dout(30) << " r " << r
<< " reply:\n";
6289 reply
->hexdump(*_dout
);
// Messenger callback for an accepted connection.
//
// Looks up the MonSession attached to `con`. Without a session (legacy v1
// protocol) this is only logged. A session that is already on the session
// list is logged and left alone; otherwise the session is given its identity
// (peer type / id and peer addresses) and added to session_map while holding
// session_map_lock.
6294 void Monitor::ms_handle_accept(Connection
*con
)
6296 auto priv
= con
->get_priv();
6297 MonSession
*s
= static_cast<MonSession
*>(priv
.get());
6299 // legacy protocol v1?
6300 dout(10) << __func__
<< " con " << con
<< " no session" << dendl
;
6304 if (s
->item
.is_on_list()) {
6305 dout(10) << __func__
<< " con " << con
<< " session " << s
6306 << " already on list" << dendl
;
6308 dout(10) << __func__
<< " con " << con
<< " session " << s
6309 << " registering session for "
6310 << con
->get_peer_addrs() << dendl
;
// the peer's type, id and addresses are taken from the connection here
6311 s
->_ident(entity_name_t(con
->get_peer_type(), con
->get_peer_id()),
6312 con
->get_peer_addrs());
6313 std::lock_guard
l(session_map_lock
);
6314 session_map
.add_session(s
);
6318 int Monitor::ms_handle_authentication(Connection
*con
)
6320 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
6321 // mon <-> mon connections need no Session, and setting one up
6322 // creates an awkward ref cycle between Session and Connection.
6326 auto priv
= con
->get_priv();
6327 MonSession
*s
= static_cast<MonSession
*>(priv
.get());
6329 // must be msgr2, otherwise dispatch would have set up the session.
6330 s
= session_map
.new_session(
6331 entity_name_t(con
->get_peer_type(), -1), // we don't know yet
6332 con
->get_peer_addrs(),
6335 dout(10) << __func__
<< " adding session " << s
<< " to con " << con
6338 logger
->set(l_mon_num_sessions
, session_map
.get_size());
6339 logger
->inc(l_mon_session_add
);
6341 dout(10) << __func__
<< " session " << s
<< " con " << con
6342 << " addr " << s
->con
->get_peer_addr()
6343 << " " << *s
<< dendl
;
6345 AuthCapsInfo
&caps_info
= con
->get_peer_caps_info();
6347 if (caps_info
.allow_all
) {
6348 s
->caps
.set_allow_all();
6349 s
->authenticated
= true;
6351 } else if (caps_info
.caps
.length()) {
6352 bufferlist::const_iterator p
= caps_info
.caps
.cbegin();
6356 } catch (const buffer::error
&err
) {
6357 derr
<< __func__
<< " corrupt cap data for " << con
->get_peer_entity_name()
6358 << " in auth db" << dendl
;
6363 if (s
->caps
.parse(str
, NULL
)) {
6364 s
->authenticated
= true;
6367 derr
<< __func__
<< " unparseable caps '" << str
<< "' for "
6368 << con
->get_peer_entity_name() << dendl
;