1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
25 #include "common/version.h"
27 #include "osd/OSDMap.h"
29 #include "MonitorDBStore.h"
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
48 #include "messages/MStatfs.h"
50 #include "messages/MMonSubscribe.h"
51 #include "messages/MMonSubscribeAck.h"
53 #include "messages/MAuthReply.h"
55 #include "messages/MTimeCheck.h"
56 #include "messages/MPing.h"
58 #include "common/strtol.h"
59 #include "common/ceph_argparse.h"
60 #include "common/Timer.h"
61 #include "common/Clock.h"
62 #include "common/errno.h"
63 #include "common/perf_counters.h"
64 #include "common/admin_socket.h"
65 #include "global/signal_handler.h"
66 #include "common/Formatter.h"
67 #include "include/stringify.h"
68 #include "include/color.h"
69 #include "include/ceph_fs.h"
70 #include "include/str_list.h"
72 #include "OSDMonitor.h"
73 #include "MDSMonitor.h"
74 #include "MonmapMonitor.h"
75 #include "PGMonitor.h"
76 #include "LogMonitor.h"
77 #include "AuthMonitor.h"
78 #include "MgrMonitor.h"
79 #include "MgrStatMonitor.h"
80 #include "mon/QuorumService.h"
81 #include "mon/OldHealthMonitor.h"
82 #include "mon/HealthMonitor.h"
83 #include "mon/ConfigKeyService.h"
84 #include "common/config.h"
85 #include "common/cmdparse.h"
86 #include "include/assert.h"
87 #include "include/compat.h"
88 #include "perfglue/heap_profiler.h"
90 #include "auth/none/AuthNoneClientHandler.h"
92 #define dout_subsys ceph_subsys_mon
94 #define dout_prefix _prefix(_dout, this)
95 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
96 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
97 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
100 const string
Monitor::MONITOR_NAME
= "monitor";
101 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
// Command tables.  COMMAND() and COMMAND_WITH_FLAG() are (re)defined here so
// that every entry of the headers included below expands to one MonCommand
// aggregate initializer; COMMAND() fills the flags slot with FLAG(NONE).
106 #undef COMMAND_WITH_FLAG
107 #define FLAG(f) (MonCommand::FLAG_##f)
108 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
109 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
110 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
111 {parsesig, helptext, modulename, req_perms, avail, flags},
// Table built from the entries in mon/MonCommands.h.
112 MonCommand mon_commands
[] = {
113 #include <mon/MonCommands.h>
// Table built from the entries in mon/PGMonitorCommands.h.
115 MonCommand pgmonitor_commands
[] = {
116 #include <mon/PGMonitorCommands.h>
// Drop the helper macro again so it does not leak into the rest of the file.
119 #undef COMMAND_WITH_FLAG
122 void C_MonContext::finish(int r
) {
123 if (mon
->is_shutdown())
125 FunctionContext::finish(r
);
128 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
129 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
134 con_self(m
? m
->get_loopback_connection() : NULL
),
135 lock("Monitor::lock"),
137 finisher(cct_
, "mon_finisher", "fin"),
138 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf
->mon_cpu_threads
),
139 has_ever_joined(false),
140 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
142 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
143 key_server(cct
, &keyring
),
144 auth_cluster_required(cct
,
145 cct
->_conf
->auth_supported
.empty() ?
146 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
147 auth_service_required(cct
,
148 cct
->_conf
->auth_supported
.empty() ?
149 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
150 mgr_messenger(mgr_m
),
151 mgr_client(cct_
, mgr_m
),
155 state(STATE_PROBING
),
158 required_features(0),
160 quorum_con_features(0),
164 scrub_timeout_event(NULL
),
167 sync_provider_count(0),
170 sync_start_version(0),
171 sync_timeout_event(NULL
),
172 sync_last_committed_floor(0),
176 timecheck_rounds_since_clean(0),
177 timecheck_event(NULL
),
179 paxos_service(PAXOS_NUM
),
181 routed_request_tid(0),
182 op_tracker(cct
, true, 1)
184 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
185 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
187 update_log_clients();
189 paxos
= new Paxos(this, "paxos");
191 paxos_service
[PAXOS_MDSMAP
] = new MDSMonitor(this, paxos
, "mdsmap");
192 paxos_service
[PAXOS_MONMAP
] = new MonmapMonitor(this, paxos
, "monmap");
193 paxos_service
[PAXOS_OSDMAP
] = new OSDMonitor(cct
, this, paxos
, "osdmap");
194 paxos_service
[PAXOS_PGMAP
] = new PGMonitor(this, paxos
, "pgmap");
195 paxos_service
[PAXOS_LOG
] = new LogMonitor(this, paxos
, "logm");
196 paxos_service
[PAXOS_AUTH
] = new AuthMonitor(this, paxos
, "auth");
197 paxos_service
[PAXOS_MGR
] = new MgrMonitor(this, paxos
, "mgr");
198 paxos_service
[PAXOS_MGRSTAT
] = new MgrStatMonitor(this, paxos
, "mgrstat");
199 paxos_service
[PAXOS_HEALTH
] = new HealthMonitor(this, paxos
, "health");
201 health_monitor
= new OldHealthMonitor(this);
202 config_key_service
= new ConfigKeyService(this, paxos
);
204 mon_caps
= new MonCap();
205 bool r
= mon_caps
->parse("allow *", NULL
);
208 exited_quorum
= ceph_clock_now();
210 // prepare local commands
211 local_mon_commands
.resize(ARRAY_SIZE(mon_commands
));
212 for (unsigned i
= 0; i
< ARRAY_SIZE(mon_commands
); ++i
) {
213 local_mon_commands
[i
] = mon_commands
[i
];
215 MonCommand::encode_vector(local_mon_commands
, local_mon_commands_bl
);
217 local_upgrading_mon_commands
= local_mon_commands
;
218 for (unsigned i
= 0; i
< ARRAY_SIZE(pgmonitor_commands
); ++i
) {
219 local_upgrading_mon_commands
.push_back(pgmonitor_commands
[i
]);
221 MonCommand::encode_vector(local_upgrading_mon_commands
,
222 local_upgrading_mon_commands_bl
);
224 // assume our commands until we have an election. this only means
225 // we won't reply with EINVAL before the election; any command that
226 // actually matters will wait until we have quorum etc and then
227 // retry (and revalidate).
228 leader_mon_commands
= local_mon_commands
;
230 // note: OSDMonitor may update this based on the luminous flag.
231 pgservice
= mgrstatmon()->get_pg_stat_service();
236 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
238 delete health_monitor
;
239 delete config_key_service
;
241 assert(session_map
.sessions
.empty());
246 class AdminHook
: public AdminSocketHook
{
249 explicit AdminHook(Monitor
*m
) : mon(m
) {}
250 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
251 bufferlist
& out
) override
{
253 mon
->do_admin_command(command
, cmdmap
, format
, ss
);
259 void Monitor::do_admin_command(string command
, cmdmap_t
& cmdmap
, string format
,
262 Mutex::Locker
l(lock
);
264 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
267 for (cmdmap_t::iterator p
= cmdmap
.begin();
268 p
!= cmdmap
.end(); ++p
) {
269 if (p
->first
== "prefix")
273 args
+= cmd_vartype_stringify(p
->second
);
275 args
= "[" + args
+ "]";
277 bool read_only
= (command
== "mon_status" ||
278 command
== "mon metadata" ||
279 command
== "quorum_status" ||
281 command
== "sessions");
283 (read_only
? audit_clog
->debug() : audit_clog
->info())
284 << "from='admin socket' entity='admin socket' "
285 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
287 if (command
== "mon_status") {
288 get_mon_status(f
.get(), ss
);
291 } else if (command
== "quorum_status") {
292 _quorum_status(f
.get(), ss
);
293 } else if (command
== "sync_force") {
295 if ((!cmd_getval(g_ceph_context
, cmdmap
, "validate", validate
)) ||
296 (validate
!= "--yes-i-really-mean-it")) {
297 ss
<< "are you SURE? this will mean the monitor store will be erased "
298 "the next time the monitor is restarted. pass "
299 "'--yes-i-really-mean-it' if you really do.";
302 sync_force(f
.get(), ss
);
303 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
304 if (!_add_bootstrap_peer_hint(command
, cmdmap
, ss
))
306 } else if (command
== "quorum enter") {
307 elector
.start_participating();
309 ss
<< "started responding to quorum, initiated new election";
310 } else if (command
== "quorum exit") {
312 elector
.stop_participating();
313 ss
<< "stopped responding to quorum, initiated new election";
314 } else if (command
== "ops") {
315 (void)op_tracker
.dump_ops_in_flight(f
.get());
319 } else if (command
== "sessions") {
322 f
->open_array_section("sessions");
323 for (auto p
: session_map
.sessions
) {
324 f
->dump_stream("session") << *p
;
331 assert(0 == "bad AdminSocket command binding");
333 (read_only
? audit_clog
->debug() : audit_clog
->info())
334 << "from='admin socket' "
335 << "entity='admin socket' "
336 << "cmd=" << command
<< " "
337 << "args=" << args
<< ": finished";
341 (read_only
? audit_clog
->debug() : audit_clog
->info())
342 << "from='admin socket' "
343 << "entity='admin socket' "
344 << "cmd=" << command
<< " "
345 << "args=" << args
<< ": aborted";
348 void Monitor::handle_signal(int signum
)
350 assert(signum
== SIGINT
|| signum
== SIGTERM
);
351 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
355 CompatSet
Monitor::get_initial_supported_features()
357 CompatSet::FeatureSet ceph_mon_feature_compat
;
358 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
359 CompatSet::FeatureSet ceph_mon_feature_incompat
;
360 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
361 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
362 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
363 ceph_mon_feature_incompat
);
366 CompatSet
Monitor::get_supported_features()
368 CompatSet compat
= get_initial_supported_features();
369 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
370 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
371 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
372 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
373 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
374 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
378 CompatSet
Monitor::get_legacy_features()
380 CompatSet::FeatureSet ceph_mon_feature_compat
;
381 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
382 CompatSet::FeatureSet ceph_mon_feature_incompat
;
383 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
384 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
385 ceph_mon_feature_incompat
);
388 int Monitor::check_features(MonitorDBStore
*store
)
390 CompatSet required
= get_supported_features();
393 read_features_off_disk(store
, &ondisk
);
395 if (!required
.writeable(ondisk
)) {
396 CompatSet diff
= required
.unsupported(ondisk
);
397 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
404 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
406 bufferlist featuresbl
;
407 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
408 if (featuresbl
.length() == 0) {
409 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
410 << "Assuming it is old-style and introducing one." << dendl
;
411 //we only want the baseline ~v.18 features assumed to be on disk.
412 //If new features are introduced this code needs to disappear or
414 *features
= get_legacy_features();
416 features
->encode(featuresbl
);
417 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
418 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
419 store
->apply_transaction(t
);
421 bufferlist::iterator it
= featuresbl
.begin();
422 features
->decode(it
);
426 void Monitor::read_features()
428 read_features_off_disk(store
, &features
);
429 dout(10) << "features " << features
<< dendl
;
431 calc_quorum_requirements();
432 dout(10) << "required_features " << required_features
<< dendl
;
435 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
439 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
442 const char** Monitor::get_tracked_conf_keys() const
444 static const char* KEYS
[] = {
445 "crushtool", // helpful for testing
446 "mon_election_timeout",
448 "mon_lease_renew_interval_factor",
449 "mon_lease_ack_timeout_factor",
450 "mon_accept_timeout_factor",
454 "clog_to_syslog_facility",
455 "clog_to_syslog_level",
457 "clog_to_graylog_host",
458 "clog_to_graylog_port",
461 // periodic health to clog
462 "mon_health_to_clog",
463 "mon_health_to_clog_interval",
464 "mon_health_to_clog_tick_interval",
466 "mon_scrub_interval",
472 void Monitor::handle_conf_change(const struct md_config_t
*conf
,
473 const std::set
<std::string
> &changed
)
477 dout(10) << __func__
<< " " << changed
<< dendl
;
479 if (changed
.count("clog_to_monitors") ||
480 changed
.count("clog_to_syslog") ||
481 changed
.count("clog_to_syslog_level") ||
482 changed
.count("clog_to_syslog_facility") ||
483 changed
.count("clog_to_graylog") ||
484 changed
.count("clog_to_graylog_host") ||
485 changed
.count("clog_to_graylog_port") ||
486 changed
.count("host") ||
487 changed
.count("fsid")) {
488 update_log_clients();
491 if (changed
.count("mon_health_to_clog") ||
492 changed
.count("mon_health_to_clog_interval") ||
493 changed
.count("mon_health_to_clog_tick_interval")) {
494 health_to_clog_update_conf(changed
);
497 if (changed
.count("mon_scrub_interval")) {
498 scrub_update_interval(conf
->mon_scrub_interval
);
502 void Monitor::update_log_clients()
504 map
<string
,string
> log_to_monitors
;
505 map
<string
,string
> log_to_syslog
;
506 map
<string
,string
> log_channel
;
507 map
<string
,string
> log_prio
;
508 map
<string
,string
> log_to_graylog
;
509 map
<string
,string
> log_to_graylog_host
;
510 map
<string
,string
> log_to_graylog_port
;
514 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
515 log_channel
, log_prio
, log_to_graylog
,
516 log_to_graylog_host
, log_to_graylog_port
,
520 clog
->update_config(log_to_monitors
, log_to_syslog
,
521 log_channel
, log_prio
, log_to_graylog
,
522 log_to_graylog_host
, log_to_graylog_port
,
525 audit_clog
->update_config(log_to_monitors
, log_to_syslog
,
526 log_channel
, log_prio
, log_to_graylog
,
527 log_to_graylog_host
, log_to_graylog_port
,
531 int Monitor::sanitize_options()
535 // mon_lease must be greater than mon_lease_renewal; otherwise we
536 // may incur in leases expiring before they are renewed.
537 if (g_conf
->mon_lease_renew_interval_factor
>= 1.0) {
538 clog
->error() << "mon_lease_renew_interval_factor ("
539 << g_conf
->mon_lease_renew_interval_factor
540 << ") must be less than 1.0";
544 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
545 // got time to renew the lease and get an ack for it. Having both options
546 // with the same value, for a given small vale, could mean timing out if
547 // the monitors happened to be overloaded -- or even under normal load for
548 // a small enough value.
549 if (g_conf
->mon_lease_ack_timeout_factor
<= 1.0) {
550 clog
->error() << "mon_lease_ack_timeout_factor ("
551 << g_conf
->mon_lease_ack_timeout_factor
552 << ") must be greater than 1.0";
559 int Monitor::preinit()
563 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
565 int r
= sanitize_options();
567 derr
<< "option sanitization failed!" << dendl
;
574 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
575 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess",
576 PerfCountersBuilder::PRIO_USEFUL
);
577 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions",
578 "sadd", PerfCountersBuilder::PRIO_INTERESTING
);
579 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions",
580 "srm", PerfCountersBuilder::PRIO_INTERESTING
);
581 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions",
582 "strm", PerfCountersBuilder::PRIO_USEFUL
);
583 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in",
584 "ecnt", PerfCountersBuilder::PRIO_USEFUL
);
585 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started",
586 "estt", PerfCountersBuilder::PRIO_INTERESTING
);
587 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won",
588 "ewon", PerfCountersBuilder::PRIO_INTERESTING
);
589 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost",
590 "elst", PerfCountersBuilder::PRIO_INTERESTING
);
591 logger
= pcb
.create_perf_counters();
592 cct
->get_perfcounters_collection()->add(logger
);
595 assert(!cluster_logger
);
597 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
598 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
599 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
600 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
601 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
602 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
603 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
604 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster");
605 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space");
606 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space");
607 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
608 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
609 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
610 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
611 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
612 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
613 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
614 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
615 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
616 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects");
617 pcb
.add_u64(l_cluster_num_mds_up
, "num_mds_up", "MDSs that are up");
618 pcb
.add_u64(l_cluster_num_mds_in
, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
619 pcb
.add_u64(l_cluster_num_mds_failed
, "num_mds_failed", "Failed MDS");
620 pcb
.add_u64(l_cluster_mds_epoch
, "mds_epoch", "Current epoch of MDS map");
621 cluster_logger
= pcb
.create_perf_counters();
624 paxos
->init_logger();
626 // verify cluster_uuid
628 int r
= check_fsid();
640 // have we ever joined a quorum?
641 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
642 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
644 if (!has_ever_joined
) {
645 // impose initial quorum restrictions?
646 list
<string
> initial_members
;
647 get_str_list(g_conf
->mon_initial_members
, initial_members
);
649 if (!initial_members
.empty()) {
650 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
652 monmap
->set_initial_members(g_ceph_context
, initial_members
, name
, messenger
->get_myaddr(),
655 dout(10) << " monmap is " << *monmap
<< dendl
;
656 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
658 } else if (!monmap
->contains(name
)) {
659 derr
<< "not in monmap and have been in a quorum before; "
660 << "must have been removed" << dendl
;
661 if (g_conf
->mon_force_quorum_join
) {
662 dout(0) << "we should have died but "
663 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
665 derr
<< "commit suicide!" << dendl
;
672 // We have a potentially inconsistent store state in hands. Get rid of it
674 bool clear_store
= false;
675 if (store
->exists("mon_sync", "in_sync")) {
676 dout(1) << __func__
<< " clean up potentially inconsistent store state"
681 if (store
->get("mon_sync", "force_sync") > 0) {
682 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
687 set
<string
> sync_prefixes
= get_sync_targets_names();
688 store
->clear(sync_prefixes
);
692 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
693 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
696 health_monitor
->init();
698 if (is_keyring_required()) {
699 // we need to bootstrap authentication keys so we can form an
701 if (authmon()->get_last_committed() == 0) {
702 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
704 int err
= store
->get("mkfs", "keyring", bl
);
705 if (err
== 0 && bl
.length() > 0) {
706 // Attempt to decode and extract keyring only if it is found.
708 bufferlist::iterator p
= bl
.begin();
709 ::decode(keyring
, p
);
710 extract_save_mon_key(keyring
);
714 string keyring_loc
= g_conf
->mon_data
+ "/keyring";
716 r
= keyring
.load(cct
, keyring_loc
);
719 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
721 if (key_server
.get_auth(mon_name
, mon_key
)) {
722 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
723 keyring
.add(mon_name
, mon_key
);
725 keyring
.encode_plaintext(bl
);
726 write_default_keyring(bl
);
728 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
735 admin_hook
= new AdminHook(this);
736 AdminSocket
* admin_socket
= cct
->get_admin_socket();
738 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
740 r
= admin_socket
->register_command("mon_status", "mon_status", admin_hook
,
741 "show current monitor status");
743 r
= admin_socket
->register_command("quorum_status", "quorum_status",
744 admin_hook
, "show current quorum status");
746 r
= admin_socket
->register_command("sync_force",
747 "sync_force name=validate,"
749 "strings=--yes-i-really-mean-it",
751 "force sync of and clear monitor store");
753 r
= admin_socket
->register_command("add_bootstrap_peer_hint",
754 "add_bootstrap_peer_hint name=addr,"
757 "add peer address as potential bootstrap"
758 " peer for cluster bringup");
760 r
= admin_socket
->register_command("quorum enter", "quorum enter",
762 "force monitor back into quorum");
764 r
= admin_socket
->register_command("quorum exit", "quorum exit",
766 "force monitor out of the quorum");
768 r
= admin_socket
->register_command("ops",
771 "show the ops currently in flight");
773 r
= admin_socket
->register_command("sessions",
776 "list existing sessions");
781 // add ourselves as a conf observer
782 g_conf
->add_observer(this);
790 dout(2) << "init" << dendl
;
791 Mutex::Locker
l(lock
);
802 messenger
->add_dispatcher_tail(this);
805 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
806 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
809 // add features of myself into feature_map
810 session_map
.feature_map
.add_mon(con_self
->get_features());
814 void Monitor::init_paxos()
816 dout(10) << __func__
<< dendl
;
820 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
821 paxos_service
[i
]->init();
824 refresh_from_paxos(NULL
);
827 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
829 dout(10) << __func__
<< dendl
;
832 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
835 bufferlist::iterator p
= bl
.begin();
836 ::decode(fingerprint
, p
);
838 catch (buffer::error
& e
) {
839 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
842 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
845 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
846 paxos_service
[i
]->refresh(need_bootstrap
);
848 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
849 paxos_service
[i
]->post_refresh();
854 void Monitor::register_cluster_logger()
856 if (!cluster_logger_registered
) {
857 dout(10) << "register_cluster_logger" << dendl
;
858 cluster_logger_registered
= true;
859 cct
->get_perfcounters_collection()->add(cluster_logger
);
861 dout(10) << "register_cluster_logger - already registered" << dendl
;
865 void Monitor::unregister_cluster_logger()
867 if (cluster_logger_registered
) {
868 dout(10) << "unregister_cluster_logger" << dendl
;
869 cluster_logger_registered
= false;
870 cct
->get_perfcounters_collection()->remove(cluster_logger
);
872 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
876 void Monitor::update_logger()
878 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
879 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
882 void Monitor::shutdown()
884 dout(1) << "shutdown" << dendl
;
888 wait_for_paxos_write();
890 state
= STATE_SHUTDOWN
;
892 g_conf
->remove_observer(this);
895 AdminSocket
* admin_socket
= cct
->get_admin_socket();
896 admin_socket
->unregister_command("mon_status");
897 admin_socket
->unregister_command("quorum_status");
898 admin_socket
->unregister_command("sync_force");
899 admin_socket
->unregister_command("add_bootstrap_peer_hint");
900 admin_socket
->unregister_command("quorum enter");
901 admin_socket
->unregister_command("quorum exit");
902 admin_socket
->unregister_command("ops");
903 admin_socket
->unregister_command("sessions");
910 mgr_client
.shutdown();
913 finisher
.wait_for_empty();
919 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
921 health_monitor
->shutdown();
923 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
924 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
930 remove_all_sessions();
933 cct
->get_perfcounters_collection()->remove(logger
);
937 if (cluster_logger
) {
938 if (cluster_logger_registered
)
939 cct
->get_perfcounters_collection()->remove(cluster_logger
);
940 delete cluster_logger
;
941 cluster_logger
= NULL
;
944 log_client
.shutdown();
946 // unlock before msgr shutdown...
949 messenger
->shutdown(); // last thing! ceph_mon.cc will delete mon.
950 mgr_messenger
->shutdown();
953 void Monitor::wait_for_paxos_write()
955 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
956 dout(10) << __func__
<< " flushing pending write" << dendl
;
960 dout(10) << __func__
<< " flushed pending write" << dendl
;
964 void Monitor::bootstrap()
966 dout(10) << "bootstrap" << dendl
;
967 wait_for_paxos_write();
969 sync_reset_requester();
970 unregister_cluster_logger();
971 cancel_probe_timeout();
974 int newrank
= monmap
->get_rank(messenger
->get_myaddr());
975 if (newrank
< 0 && rank
>= 0) {
976 // was i ever part of the quorum?
977 if (has_ever_joined
) {
978 dout(0) << " removed from monmap, suicide." << dendl
;
982 if (newrank
!= rank
) {
983 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
984 messenger
->set_myname(entity_name_t::MON(newrank
));
987 // reset all connections, or else our peers will think we are someone else.
988 messenger
->mark_down_all();
992 state
= STATE_PROBING
;
997 if (g_conf
->mon_compact_on_bootstrap
) {
998 dout(10) << "bootstrap -- triggering compaction" << dendl
;
1000 dout(10) << "bootstrap -- finished compaction" << dendl
;
1003 // singleton monitor?
1004 if (monmap
->size() == 1 && rank
== 0) {
1005 win_standalone_election();
1009 reset_probe_timeout();
1011 // i'm outside the quorum
1012 if (monmap
->contains(name
))
1013 outside_quorum
.insert(name
);
1016 dout(10) << "probing other monitors" << dendl
;
1017 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1019 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
),
1020 monmap
->get_inst(i
));
1022 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
1023 p
!= extra_probe_peers
.end();
1025 if (*p
!= messenger
->get_myaddr()) {
1027 i
.name
= entity_name_t::MON(-1);
1029 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
), i
);
1034 bool Monitor::_add_bootstrap_peer_hint(string cmd
, cmdmap_t
& cmdmap
, ostream
& ss
)
1037 if (!cmd_getval(g_ceph_context
, cmdmap
, "addr", addrstr
)) {
1038 ss
<< "unable to parse address string value '"
1039 << cmd_vartype_stringify(cmdmap
["addr"]) << "'";
1042 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' '"
1043 << addrstr
<< "'" << dendl
;
1046 const char *end
= 0;
1047 if (!addr
.parse(addrstr
.c_str(), &end
)) {
1048 ss
<< "failed to parse addr '" << addrstr
<< "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1052 if (is_leader() || is_peon()) {
1053 ss
<< "mon already active; ignoring bootstrap hint";
1057 if (addr
.get_port() == 0)
1058 addr
.set_port(CEPH_MON_PORT
);
1060 extra_probe_peers
.insert(addr
);
1061 ss
<< "adding peer " << addr
<< " to list: " << extra_probe_peers
;
1065 // called by bootstrap(), or on leader|peon -> electing
1066 void Monitor::_reset()
1068 dout(10) << __func__
<< dendl
;
1070 cancel_probe_timeout();
1072 health_events_cleanup();
1073 health_check_log_times
.clear();
1074 scrub_event_cancel();
1076 leader_since
= utime_t();
1077 if (!quorum
.empty()) {
1078 exited_quorum
= ceph_clock_now();
1081 outside_quorum
.clear();
1082 quorum_feature_map
.clear();
1088 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
1090 health_monitor
->finish();
1094 // -----------------------------------------------------------
1097 set
<string
> Monitor::get_sync_targets_names()
1099 set
<string
> targets
;
1100 targets
.insert(paxos
->get_name());
1101 for (int i
= 0; i
< PAXOS_NUM
; ++i
)
1102 paxos_service
[i
]->get_store_prefixes(targets
);
1103 ConfigKeyService
*config_key_service_ptr
= dynamic_cast<ConfigKeyService
*>(config_key_service
);
1104 assert(config_key_service_ptr
);
1105 config_key_service_ptr
->get_store_prefixes(targets
);
1110 void Monitor::sync_timeout()
1112 dout(10) << __func__
<< dendl
;
1113 assert(state
== STATE_SYNCHRONIZING
);
1117 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1119 dout(1) << __func__
<< dendl
;
1121 MonMap latest_monmap
;
1123 // Grab latest monmap from MonmapMonitor
1124 bufferlist monmon_bl
;
1125 int err
= monmon()->get_monmap(monmon_bl
);
1127 if (err
!= -ENOENT
) {
1129 << " something wrong happened while reading the store: "
1130 << cpp_strerror(err
) << dendl
;
1131 assert(0 == "error reading the store");
1134 latest_monmap
.decode(monmon_bl
);
1137 // Grab last backed up monmap (if any) and compare epochs
1138 if (store
->exists("mon_sync", "latest_monmap")) {
1139 bufferlist backup_bl
;
1140 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1143 << " something wrong happened while reading the store: "
1144 << cpp_strerror(err
) << dendl
;
1145 assert(0 == "error reading the store");
1147 assert(backup_bl
.length() > 0);
1149 MonMap backup_monmap
;
1150 backup_monmap
.decode(backup_bl
);
1152 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1153 latest_monmap
= backup_monmap
;
1156 // Check if our current monmap's epoch is greater than the one we've
1158 if (monmap
->epoch
> latest_monmap
.epoch
)
1159 latest_monmap
= *monmap
;
1161 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1163 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
1166 void Monitor::sync_reset_requester()
1168 dout(10) << __func__
<< dendl
;
1170 if (sync_timeout_event
) {
1171 timer
.cancel_event(sync_timeout_event
);
1172 sync_timeout_event
= NULL
;
1175 sync_provider
= entity_inst_t();
1178 sync_start_version
= 0;
1181 void Monitor::sync_reset_provider()
1183 dout(10) << __func__
<< dendl
;
1184 sync_providers
.clear();
1187 void Monitor::sync_start(entity_inst_t
&other
, bool full
)
1189 dout(10) << __func__
<< " " << other
<< (full
? " full" : " recent") << dendl
;
1191 assert(state
== STATE_PROBING
||
1192 state
== STATE_SYNCHRONIZING
);
1193 state
= STATE_SYNCHRONIZING
;
1195 // make sure are not a provider for anyone!
1196 sync_reset_provider();
1201 // stash key state, and mark that we are syncing
1202 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1203 sync_stash_critical_state(t
);
1204 t
->put("mon_sync", "in_sync", 1);
1206 sync_last_committed_floor
= MAX(sync_last_committed_floor
, paxos
->get_version());
1207 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1208 << sync_last_committed_floor
<< dendl
;
1209 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1211 store
->apply_transaction(t
);
1213 assert(g_conf
->mon_sync_requester_kill_at
!= 1);
1215 // clear the underlying store
1216 set
<string
> targets
= get_sync_targets_names();
1217 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1218 store
->clear(targets
);
1220 // make sure paxos knows it has been reset. this prevents a
1221 // bootstrap and then different probe reply order from possibly
1222 // deciding a partial or no sync is needed.
1225 assert(g_conf
->mon_sync_requester_kill_at
!= 2);
1228 // assume 'other' as the leader. We will update the leader once we receive
1229 // a reply to the sync start.
1230 sync_provider
= other
;
1232 sync_reset_timeout();
1234 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1236 m
->last_committed
= paxos
->get_version();
1237 messenger
->send_message(m
, sync_provider
);
1240 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1242 dout(10) << __func__
<< dendl
;
1243 bufferlist backup_monmap
;
1244 sync_obtain_latest_monmap(backup_monmap
);
1245 assert(backup_monmap
.length() > 0);
1246 t
->put("mon_sync", "latest_monmap", backup_monmap
);
1249 void Monitor::sync_reset_timeout()
1251 dout(10) << __func__
<< dendl
;
1252 if (sync_timeout_event
)
1253 timer
.cancel_event(sync_timeout_event
);
1254 sync_timeout_event
= timer
.add_event_after(
1255 g_conf
->mon_sync_timeout
,
1256 new C_MonContext(this, [this](int) {
1261 void Monitor::sync_finish(version_t last_committed
)
1263 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
1265 assert(g_conf
->mon_sync_requester_kill_at
!= 7);
1268 // finalize the paxos commits
1269 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1270 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1272 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
1274 dout(30) << __func__
<< " final tx dump:\n";
1275 JSONFormatter
f(true);
1280 store
->apply_transaction(tx
);
1283 assert(g_conf
->mon_sync_requester_kill_at
!= 8);
1285 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1286 t
->erase("mon_sync", "in_sync");
1287 t
->erase("mon_sync", "force_sync");
1288 t
->erase("mon_sync", "last_committed_floor");
1289 store
->apply_transaction(t
);
1291 assert(g_conf
->mon_sync_requester_kill_at
!= 9);
1295 assert(g_conf
->mon_sync_requester_kill_at
!= 10);
1300 void Monitor::handle_sync(MonOpRequestRef op
)
1302 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1303 dout(10) << __func__
<< " " << *m
<< dendl
;
1306 // provider ---------
1308 case MMonSync::OP_GET_COOKIE_FULL
:
1309 case MMonSync::OP_GET_COOKIE_RECENT
:
1310 handle_sync_get_cookie(op
);
1312 case MMonSync::OP_GET_CHUNK
:
1313 handle_sync_get_chunk(op
);
1316 // client -----------
1318 case MMonSync::OP_COOKIE
:
1319 handle_sync_cookie(op
);
1322 case MMonSync::OP_CHUNK
:
1323 case MMonSync::OP_LAST_CHUNK
:
1324 handle_sync_chunk(op
);
1326 case MMonSync::OP_NO_COOKIE
:
1327 handle_sync_no_cookie(op
);
1331 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1332 assert(0 == "unknown op");
1338 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1340 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1341 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1342 m
->get_connection()->send_message(reply
);
1345 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1347 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1348 if (is_synchronizing()) {
1349 _sync_reply_no_cookie(op
);
1353 assert(g_conf
->mon_sync_provider_kill_at
!= 1);
1355 // make sure they can understand us.
1356 if ((required_features
^ m
->get_connection()->get_features()) &
1357 required_features
) {
1358 dout(5) << " ignoring peer mon." << m
->get_source().num()
1359 << " has features " << std::hex
1360 << m
->get_connection()->get_features()
1361 << " but we require " << required_features
<< std::dec
<< dendl
;
1365 // make up a unique cookie. include election epoch (which persists
1366 // across restarts for the whole cluster) and a counter for this
1367 // process instance. there is no need to be unique *across*
1368 // monitors, though.
1369 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1370 assert(sync_providers
.count(cookie
) == 0);
1372 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
1374 SyncProvider
& sp
= sync_providers
[cookie
];
1376 sp
.entity
= m
->get_source_inst();
1377 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1379 set
<string
> sync_targets
;
1380 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1382 sync_targets
= get_sync_targets_names();
1383 sp
.last_committed
= paxos
->get_version();
1384 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1386 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1388 // just catch up paxos
1389 sp
.last_committed
= m
->last_committed
;
1391 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1393 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1394 reply
->last_committed
= sp
.last_committed
;
1395 m
->get_connection()->send_message(reply
);
1398 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1400 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1401 dout(10) << __func__
<< " " << *m
<< dendl
;
1403 if (sync_providers
.count(m
->cookie
) == 0) {
1404 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1405 _sync_reply_no_cookie(op
);
1409 assert(g_conf
->mon_sync_provider_kill_at
!= 2);
1411 SyncProvider
& sp
= sync_providers
[m
->cookie
];
1412 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1414 if (sp
.last_committed
< paxos
->get_first_committed() &&
1415 paxos
->get_first_committed() > 1) {
1416 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1417 << " < our fc " << paxos
->get_first_committed() << dendl
;
1418 sync_providers
.erase(m
->cookie
);
1419 _sync_reply_no_cookie(op
);
1423 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1424 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1426 int left
= g_conf
->mon_sync_max_payload_size
;
1427 while (sp
.last_committed
< paxos
->get_version() && left
> 0) {
1429 sp
.last_committed
++;
1431 int err
= store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1434 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1435 left
-= bl
.length();
1436 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1439 reply
->last_committed
= sp
.last_committed
;
1441 if (sp
.full
&& left
> 0) {
1442 sp
.synchronizer
->get_chunk_tx(tx
, left
);
1443 sp
.last_key
= sp
.synchronizer
->get_last_key();
1444 reply
->last_key
= sp
.last_key
;
1447 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1448 sp
.last_committed
< paxos
->get_version()) {
1449 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1450 << " key " << sp
.last_key
<< dendl
;
1452 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1453 << " key " << sp
.last_key
<< dendl
;
1454 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1456 assert(g_conf
->mon_sync_provider_kill_at
!= 3);
1458 // clean up our local state
1459 sync_providers
.erase(sp
.cookie
);
1462 ::encode(*tx
, reply
->chunk_bl
);
1464 m
->get_connection()->send_message(reply
);
1469 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1471 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1472 dout(10) << __func__
<< " " << *m
<< dendl
;
1474 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
1477 if (m
->get_source_inst() != sync_provider
) {
1478 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1481 sync_cookie
= m
->cookie
;
1482 sync_start_version
= m
->last_committed
;
1484 sync_reset_timeout();
1485 sync_get_next_chunk();
1487 assert(g_conf
->mon_sync_requester_kill_at
!= 3);
1490 void Monitor::sync_get_next_chunk()
1492 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1493 if (g_conf
->mon_inject_sync_get_chunk_delay
> 0) {
1494 dout(20) << __func__
<< " injecting delay of " << g_conf
->mon_inject_sync_get_chunk_delay
<< dendl
;
1495 usleep((long long)(g_conf
->mon_inject_sync_get_chunk_delay
* 1000000.0));
1497 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1498 messenger
->send_message(r
, sync_provider
);
1500 assert(g_conf
->mon_sync_requester_kill_at
!= 4);
1503 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1505 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1506 dout(10) << __func__
<< " " << *m
<< dendl
;
1508 if (m
->cookie
!= sync_cookie
) {
1509 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1512 if (m
->get_source_inst() != sync_provider
) {
1513 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1517 assert(state
== STATE_SYNCHRONIZING
);
1518 assert(g_conf
->mon_sync_requester_kill_at
!= 5);
1520 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1521 tx
->append_from_encoded(m
->chunk_bl
);
1523 dout(30) << __func__
<< " tx dump:\n";
1524 JSONFormatter
f(true);
1529 store
->apply_transaction(tx
);
1531 assert(g_conf
->mon_sync_requester_kill_at
!= 6);
1534 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
1535 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1536 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1538 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1540 dout(30) << __func__
<< " tx dump:\n";
1541 JSONFormatter
f(true);
1546 store
->apply_transaction(tx
);
1547 paxos
->init(); // to refresh what we just wrote
1550 if (m
->op
== MMonSync::OP_CHUNK
) {
1551 sync_reset_timeout();
1552 sync_get_next_chunk();
1553 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1554 sync_finish(m
->last_committed
);
1558 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1560 dout(10) << __func__
<< dendl
;
1564 void Monitor::sync_trim_providers()
1566 dout(20) << __func__
<< dendl
;
1568 utime_t now
= ceph_clock_now();
1569 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1570 while (p
!= sync_providers
.end()) {
1571 if (now
> p
->second
.timeout
) {
1572 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
<< " for " << p
->second
.entity
<< dendl
;
1573 sync_providers
.erase(p
++);
1580 // ---------------------------------------------------
1583 void Monitor::cancel_probe_timeout()
1585 if (probe_timeout_event
) {
1586 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1587 timer
.cancel_event(probe_timeout_event
);
1588 probe_timeout_event
= NULL
;
1590 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
1594 void Monitor::reset_probe_timeout()
1596 cancel_probe_timeout();
1597 probe_timeout_event
= new C_MonContext(this, [this](int r
) {
1600 double t
= g_conf
->mon_probe_timeout
;
1601 if (timer
.add_event_after(t
, probe_timeout_event
)) {
1602 dout(10) << "reset_probe_timeout " << probe_timeout_event
1603 << " after " << t
<< " seconds" << dendl
;
1605 probe_timeout_event
= nullptr;
1609 void Monitor::probe_timeout(int r
)
1611 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1612 assert(is_probing() || is_synchronizing());
1613 assert(probe_timeout_event
);
1614 probe_timeout_event
= NULL
;
1618 void Monitor::handle_probe(MonOpRequestRef op
)
1620 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1621 dout(10) << "handle_probe " << *m
<< dendl
;
1623 if (m
->fsid
!= monmap
->fsid
) {
1624 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1629 case MMonProbe::OP_PROBE
:
1630 handle_probe_probe(op
);
1633 case MMonProbe::OP_REPLY
:
1634 handle_probe_reply(op
);
1637 case MMonProbe::OP_MISSING_FEATURES
:
1638 derr
<< __func__
<< " missing features, have " << CEPH_FEATURES_ALL
1639 << ", required " << m
->required_features
1640 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
)
1646 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1648 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1650 dout(10) << "handle_probe_probe " << m
->get_source_inst() << *m
1651 << " features " << m
->get_connection()->get_features() << dendl
;
1652 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1654 dout(1) << " peer " << m
->get_source_addr() << " missing features "
1655 << missing
<< dendl
;
1656 if (m
->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY
)) {
1657 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1658 name
, has_ever_joined
);
1659 m
->required_features
= required_features
;
1660 m
->get_connection()->send_message(r
);
1665 if (!is_probing() && !is_synchronizing()) {
1666 // If the probing mon is way ahead of us, we need to re-bootstrap.
1667 // Normally we capture this case when we initially bootstrap, but
1668 // it is possible we pass those checks (we overlap with
1669 // quorum-to-be) but fail to join a quorum before it moves past
1670 // us. We need to be kicked back to bootstrap so we can
1671 // synchonize, not keep calling elections.
1672 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1673 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1674 << "ahead of us, re-bootstrapping" << dendl
;
1682 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
);
1685 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1686 r
->paxos_first_version
= paxos
->get_first_committed();
1687 r
->paxos_last_version
= paxos
->get_version();
1688 m
->get_connection()->send_message(r
);
1690 // did we discover a peer here?
1691 if (!monmap
->contains(m
->get_source_addr())) {
1692 dout(1) << " adding peer " << m
->get_source_addr()
1693 << " to list of hints" << dendl
;
1694 extra_probe_peers
.insert(m
->get_source_addr());
1701 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1703 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1704 dout(10) << "handle_probe_reply " << m
->get_source_inst() << *m
<< dendl
;
1705 dout(10) << " monmap is " << *monmap
<< dendl
;
1707 // discover name and addrs during probing or electing states.
1708 if (!is_probing() && !is_electing()) {
1712 // newer map, or they've joined a quorum and we haven't?
1714 monmap
->encode(mybl
, m
->get_connection()->get_features());
1715 // make sure it's actually different; the checks below err toward
1716 // taking the other guy's map, which could cause us to loop.
1717 if (!mybl
.contents_equal(m
->monmap_bl
)) {
1718 MonMap
*newmap
= new MonMap
;
1719 newmap
->decode(m
->monmap_bl
);
1720 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
1721 !has_ever_joined
)) {
1722 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
1723 << ", mine was " << monmap
->get_epoch() << dendl
;
1725 monmap
->decode(m
->monmap_bl
);
1734 string peer_name
= monmap
->get_name(m
->get_source_addr());
1735 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
1736 dout(10) << " renaming peer " << m
->get_source_addr() << " "
1737 << peer_name
<< " -> " << m
->name
<< " in my monmap"
1739 monmap
->rename(peer_name
, m
->name
);
1741 if (is_electing()) {
1746 dout(10) << " peer name is " << peer_name
<< dendl
;
1749 // new initial peer?
1750 if (monmap
->get_epoch() == 0 &&
1751 monmap
->contains(m
->name
) &&
1752 monmap
->get_addr(m
->name
).is_blank_ip()) {
1753 dout(1) << " learned initial mon " << m
->name
<< " addr " << m
->get_source_addr() << dendl
;
1754 monmap
->set_addr(m
->name
, m
->get_source_addr());
1760 // end discover phase
1761 if (!is_probing()) {
1765 assert(paxos
!= NULL
);
1767 if (is_synchronizing()) {
1768 dout(10) << " currently syncing" << dendl
;
1772 entity_inst_t other
= m
->get_source_inst();
1774 if (m
->paxos_last_version
< sync_last_committed_floor
) {
1775 dout(10) << " peer paxos versions [" << m
->paxos_first_version
1776 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
1777 << sync_last_committed_floor
<< ", ignoring"
1780 if (paxos
->get_version() < m
->paxos_first_version
&&
1781 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
1782 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
1783 << "," << m
->paxos_last_version
<< "]"
1784 << " vs my version " << paxos
->get_version()
1785 << " (too far ahead)"
1787 cancel_probe_timeout();
1788 sync_start(other
, true);
1791 if (paxos
->get_version() + g_conf
->paxos_max_join_drift
< m
->paxos_last_version
) {
1792 dout(10) << " peer paxos last version " << m
->paxos_last_version
1793 << " vs my version " << paxos
->get_version()
1794 << " (too far ahead)"
1796 cancel_probe_timeout();
1797 sync_start(other
, false);
1802 // is there an existing quorum?
1803 if (m
->quorum
.size()) {
1804 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
1806 dout(10) << " peer paxos version " << m
->paxos_last_version
1807 << " vs my version " << paxos
->get_version()
1811 if (monmap
->contains(name
) &&
1812 !monmap
->get_addr(name
).is_blank_ip()) {
1813 // i'm part of the cluster; just initiate a new election
1816 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl
;
1817 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
1818 monmap
->get_inst(*m
->quorum
.begin()));
1821 if (monmap
->contains(m
->name
)) {
1822 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
1823 outside_quorum
.insert(m
->name
);
1825 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
1829 unsigned need
= monmap
->size() / 2 + 1;
1830 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
1831 if (outside_quorum
.size() >= need
) {
1832 if (outside_quorum
.count(name
)) {
1833 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
1836 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
1839 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
1844 void Monitor::join_election()
1846 dout(10) << __func__
<< dendl
;
1847 wait_for_paxos_write();
1849 state
= STATE_ELECTING
;
1851 logger
->inc(l_mon_num_elections
);
1854 void Monitor::start_election()
1856 dout(10) << "start_election" << dendl
;
1857 wait_for_paxos_write();
1859 state
= STATE_ELECTING
;
1861 logger
->inc(l_mon_num_elections
);
1862 logger
->inc(l_mon_election_call
);
1864 clog
->info() << "mon." << name
<< " calling monitor election";
1865 elector
.call_election();
1868 void Monitor::win_standalone_election()
1870 dout(1) << "win_standalone_election" << dendl
;
1872 // bump election epoch, in case the previous epoch included other
1873 // monitors; we need to be able to make the distinction.
1875 elector
.advance_epoch();
1877 rank
= monmap
->get_rank(name
);
1882 map
<int,Metadata
> metadata
;
1883 collect_metadata(&metadata
[0]);
1885 win_election(elector
.get_epoch(), q
,
1887 ceph::features::mon::get_supported(),
1891 const utime_t
& Monitor::get_leader_since() const
1893 assert(state
== STATE_LEADER
);
1894 return leader_since
;
1897 epoch_t
Monitor::get_epoch()
1899 return elector
.get_epoch();
1902 void Monitor::_finish_svc_election()
1904 assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
1906 for (auto p
: paxos_service
) {
1907 // we already called election_finished() on monmon(); avoid callig twice
1908 if (state
== STATE_LEADER
&& p
== monmon())
1910 p
->election_finished();
1914 void Monitor::win_election(epoch_t epoch
, set
<int>& active
, uint64_t features
,
1915 const mon_feature_t
& mon_features
,
1916 const map
<int,Metadata
>& metadata
)
1918 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
1919 << " features " << features
1920 << " mon_features " << mon_features
1922 assert(is_electing());
1923 state
= STATE_LEADER
;
1924 leader_since
= ceph_clock_now();
1927 quorum_con_features
= features
;
1928 quorum_mon_features
= mon_features
;
1929 pending_metadata
= metadata
;
1930 outside_quorum
.clear();
1932 clog
->info() << "mon." << name
<< " is new leader, mons " << get_quorum_names()
1933 << " in quorum (ranks " << quorum
<< ")";
1935 set_leader_commands(get_local_commands(mon_features
));
1937 paxos
->leader_init();
1938 // NOTE: tell monmap monitor first. This is important for the
1939 // bootstrap case to ensure that the very first paxos proposal
1940 // codifies the monmap. Otherwise any manner of chaos can ensue
1941 // when monitors are call elections or participating in a paxos
1942 // round without agreeing on who the participants are.
1943 monmon()->election_finished();
1944 _finish_svc_election();
1945 health_monitor
->start(epoch
);
1947 logger
->inc(l_mon_election_win
);
1949 // inject new metadata in first transaction.
1951 // include previous metadata for missing mons (that aren't part of
1952 // the current quorum).
1953 map
<int,Metadata
> m
= metadata
;
1954 for (unsigned rank
= 0; rank
< monmap
->size(); ++rank
) {
1955 if (m
.count(rank
) == 0 &&
1956 mon_metadata
.count(rank
)) {
1957 m
[rank
] = mon_metadata
[rank
];
1961 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1962 // a new transaction immediately after the election finishes. We should
1963 // do that anyway for other reasons, though.
1964 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
1967 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
1971 if (monmap
->size() > 1 &&
1972 monmap
->get_epoch() > 0) {
1974 health_tick_start();
1976 // Freshen the health status before doing health_to_clog in case
1977 // our just-completed election changed the health
1978 healthmon()->wait_for_active_ctx(new FunctionContext([this](int r
){
1979 dout(20) << "healthmon now active" << dendl
;
1980 healthmon()->tick();
1981 if (healthmon()->is_proposing()) {
1982 dout(20) << __func__
<< " healthmon proposing, waiting" << dendl
;
1983 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this,
1985 assert(lock
.is_locked_by_me());
1986 do_health_to_clog_interval();
1990 do_health_to_clog_interval();
1994 scrub_event_start();
1998 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
2000 const mon_feature_t
& mon_features
)
2003 leader_since
= utime_t();
2006 outside_quorum
.clear();
2007 quorum_con_features
= features
;
2008 quorum_mon_features
= mon_features
;
2009 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
2010 << " quorum is " << quorum
<< " features are " << quorum_con_features
2011 << " mon_features are " << quorum_mon_features
2015 _finish_svc_election();
2016 health_monitor
->start(epoch
);
2018 logger
->inc(l_mon_election_lose
);
2022 if ((quorum_con_features
& CEPH_FEATURE_MON_METADATA
) &&
2023 !HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
)) {
2024 // for pre-luminous mons only
2026 collect_metadata(&sys_info
);
2027 messenger
->send_message(new MMonMetadata(sys_info
),
2028 monmap
->get_inst(get_leader()));
2032 void Monitor::collect_metadata(Metadata
*m
)
2034 collect_sys_info(m
, g_ceph_context
);
2035 (*m
)["addr"] = stringify(messenger
->get_myaddr());
2038 void Monitor::finish_election()
2040 apply_quorum_to_compatset_features();
2041 apply_monmap_to_compatset_features();
2043 exited_quorum
= utime_t();
2044 finish_contexts(g_ceph_context
, waitfor_quorum
);
2045 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
2046 resend_routed_requests();
2048 register_cluster_logger();
2050 // am i named properly?
2051 string cur_name
= monmap
->get_name(messenger
->get_myaddr());
2052 if (cur_name
!= name
) {
2053 dout(10) << " renaming myself from " << cur_name
<< " -> " << name
<< dendl
;
2054 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
2055 monmap
->get_inst(*quorum
.begin()));
2059 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2061 if (new_features
.compare(features
) != 0) {
2062 CompatSet diff
= features
.unsupported(new_features
);
2063 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2064 features
= new_features
;
2066 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2068 store
->apply_transaction(t
);
2070 calc_quorum_requirements();
2074 void Monitor::apply_quorum_to_compatset_features()
2076 CompatSet
new_features(features
);
2077 if (quorum_con_features
& CEPH_FEATURE_OSD_ERASURE_CODES
) {
2078 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2080 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2081 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2083 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
) {
2084 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2086 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
) {
2087 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2089 dout(5) << __func__
<< dendl
;
2090 _apply_compatset_features(new_features
);
2093 void Monitor::apply_monmap_to_compatset_features()
2095 CompatSet
new_features(features
);
2096 mon_feature_t monmap_features
= monmap
->get_required_features();
2098 /* persistent monmap features may go into the compatset.
2099 * optional monmap features may not - why?
2100 * because optional monmap features may be set/unset by the admin,
2101 * and possibly by other means that haven't yet been thought out,
2102 * so we can't make the monitor enforce them on start - because they
2104 * this, of course, does not invalidate setting a compatset feature
2105 * for an optional feature - as long as you make sure to clean it up
2106 * once you unset it.
2108 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2109 assert(ceph::features::mon::get_persistent().contains_all(
2110 ceph::features::mon::FEATURE_KRAKEN
));
2111 // this feature should only ever be set if the quorum supports it.
2112 assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2113 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2115 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
2116 assert(ceph::features::mon::get_persistent().contains_all(
2117 ceph::features::mon::FEATURE_LUMINOUS
));
2118 // this feature should only ever be set if the quorum supports it.
2119 assert(HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
));
2120 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
2123 dout(5) << __func__
<< dendl
;
2124 _apply_compatset_features(new_features
);
2127 void Monitor::calc_quorum_requirements()
2129 required_features
= 0;
2132 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
)) {
2133 required_features
|= CEPH_FEATURE_OSD_ERASURE_CODES
;
2135 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2136 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2138 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
)) {
2139 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
;
2141 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
)) {
2142 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
;
2144 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2145 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2147 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
)) {
2148 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2152 if (monmap
->get_required_features().contains_all(
2153 ceph::features::mon::FEATURE_KRAKEN
)) {
2154 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2156 if (monmap
->get_required_features().contains_all(
2157 ceph::features::mon::FEATURE_LUMINOUS
)) {
2158 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2160 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
2163 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2165 *fm
+= session_map
.feature_map
;
2166 for (auto id
: quorum
) {
2168 *fm
+= quorum_feature_map
[id
];
2173 void Monitor::sync_force(Formatter
*f
, ostream
& ss
)
2175 bool free_formatter
= false;
2178 // louzy/lazy hack: default to json if no formatter has been defined
2179 f
= new JSONFormatter();
2180 free_formatter
= true;
2183 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2184 sync_stash_critical_state(tx
);
2185 tx
->put("mon_sync", "force_sync", 1);
2186 store
->apply_transaction(tx
);
2188 f
->open_object_section("sync_force");
2189 f
->dump_int("ret", 0);
2190 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2191 f
->close_section(); // sync_force
2197 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
2199 bool free_formatter
= false;
2202 // louzy/lazy hack: default to json if no formatter has been defined
2203 f
= new JSONFormatter();
2204 free_formatter
= true;
2206 f
->open_object_section("quorum_status");
2207 f
->dump_int("election_epoch", get_epoch());
2209 f
->open_array_section("quorum");
2210 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2211 f
->dump_int("mon", *p
);
2212 f
->close_section(); // quorum
2214 list
<string
> quorum_names
= get_quorum_names();
2215 f
->open_array_section("quorum_names");
2216 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2217 f
->dump_string("mon", *p
);
2218 f
->close_section(); // quorum_names
2220 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(*quorum
.begin()));
2222 f
->open_object_section("monmap");
2224 f
->close_section(); // monmap
2226 f
->close_section(); // quorum_status
// Dump this monitor's local status into formatter `f` as a "mon_status"
// object. Visible fields, in order:
//   - "name", "rank", "state" (get_state_name()), "election_epoch"
//   - "quorum": ranks in `quorum`
//   - "features": required_con / required_mon / quorum_con / quorum_mon
//   - "outside_quorum": names in `outside_quorum`
//   - "extra_probe_peers": addresses in `extra_probe_peers`
//   - "sync_provider": cookie, entity, timeout, last_committed and last_key
//     of every entry in `sync_providers`
//   - "sync" (only when is_synchronizing()): sync_provider, sync_cookie,
//     sync_start_version
//   - "provider_kill_at" / "requester_kill_at": only when the matching
//     mon_sync_*_kill_at config value is > 0
//   - "monmap" section and "feature_map" (session_map.feature_map)
// A JSONFormatter is allocated as a fallback and `free_formatter` marks it
// as owned by this function; the trailing branch handles that case.
// NOTE(review): the embedded line numbering has gaps (e.g. original 2235-2236,
// 2271, 2278, 2284-2286, 2292-2294, 2301-2303, 2309+), so loop increments,
// several section open/close calls and the final flush to `ss` are not
// visible in this listing -- confirm against the real file.
2232 void Monitor::get_mon_status(Formatter
*f
, ostream
& ss
)
2234 bool free_formatter
= false;
2237 // louzy/lazy hack: default to json if no formatter has been defined
2238 f
= new JSONFormatter();
2239 free_formatter
= true;
2242 f
->open_object_section("mon_status");
2243 f
->dump_string("name", name
);
2244 f
->dump_int("rank", rank
);
2245 f
->dump_string("state", get_state_name());
2246 f
->dump_int("election_epoch", get_epoch());
2248 f
->open_array_section("quorum");
2249 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2250 f
->dump_int("mon", *p
);
2253 f
->close_section(); // quorum
// feature bits: connection-level ("*_con") and monitor-level ("*_mon")
2255 f
->open_object_section("features");
2256 f
->dump_stream("required_con") << required_features
;
2257 mon_feature_t req_mon_features
= get_required_mon_features();
2258 req_mon_features
.dump(f
, "required_mon");
2259 f
->dump_stream("quorum_con") << quorum_con_features
;
2260 quorum_mon_features
.dump(f
, "quorum_mon");
2261 f
->close_section(); // features
2263 f
->open_array_section("outside_quorum");
2264 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2265 f
->dump_string("mon", *p
);
2266 f
->close_section(); // outside_quorum
2268 f
->open_array_section("extra_probe_peers");
2269 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
2270 p
!= extra_probe_peers
.end();
2272 f
->dump_stream("peer") << *p
;
2273 f
->close_section(); // extra_probe_peers
// one entry per peer in sync_providers
2275 f
->open_array_section("sync_provider");
2276 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2277 p
!= sync_providers
.end();
2279 f
->dump_unsigned("cookie", p
->second
.cookie
);
2280 f
->dump_stream("entity") << p
->second
.entity
;
2281 f
->dump_stream("timeout") << p
->second
.timeout
;
2282 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2283 f
->dump_stream("last_key") << p
->second
.last_key
;
// state of our own in-progress sync, if any
2287 if (is_synchronizing()) {
2288 f
->open_object_section("sync");
2289 f
->dump_stream("sync_provider") << sync_provider
;
2290 f
->dump_unsigned("sync_cookie", sync_cookie
);
2291 f
->dump_unsigned("sync_start_version", sync_start_version
);
// the *_kill_at config values are only reported when set to a positive value
2295 if (g_conf
->mon_sync_provider_kill_at
> 0)
2296 f
->dump_int("provider_kill_at", g_conf
->mon_sync_provider_kill_at
);
2297 if (g_conf
->mon_sync_requester_kill_at
> 0)
2298 f
->dump_int("requester_kill_at", g_conf
->mon_sync_requester_kill_at
);
2300 f
->open_object_section("monmap");
2304 f
->dump_object("feature_map", session_map
.feature_map
);
2305 f
->close_section(); // mon_status
2307 if (free_formatter
) {
2308 // flush formatter to ss and delete it iff we created the formatter
2315 // health status to clog
// Arm the periodic health-to-clog tick: schedule `health_tick_event` on
// `timer` to fire mon_health_to_clog_tick_interval after now. The callback
// calls health_tick_start() again, so the tick re-arms itself each time it
// fires.
// The leading condition tests whether mon_health_to_clog is disabled or the
// tick interval is <= 0.
// NOTE(review): original lines 2321-2322, 2324-2325, 2329-2330 and 2332+
// are missing from this listing; the guarded statement is presumably an early
// return, and the rest of the callback body is not visible -- confirm.
2317 void Monitor::health_tick_start()
2319 if (!cct
->_conf
->mon_health_to_clog
||
2320 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2323 dout(15) << __func__
<< dendl
;
2326 health_tick_event
= timer
.add_event_after(
2327 cct
->_conf
->mon_health_to_clog_tick_interval
,
2328 new C_MonContext(this, [this](int r
) {
2331 health_tick_start();
2335 void Monitor::health_tick_stop()
2337 dout(15) << __func__
<< dendl
;
2339 if (health_tick_event
) {
2340 timer
.cancel_event(health_tick_event
);
2341 health_tick_event
= NULL
;
// Compute the next wall-clock instant at which the interval health report
// should be emitted: the current time in whole seconds, rounded up to the
// next multiple of mon_health_to_clog_interval. When the current second is
// already an exact multiple (remainder == 0) the adjustment is a full
// interval, so the result is always strictly in the future.
// The modulo assumes mon_health_to_clog_interval is non-zero; the visible
// callers test `<= 0` before scheduling -- TODO confirm that holds on every
// path.
// NOTE(review): the end of the log statement and the return statement
// (original lines 2358+) are missing from this listing; presumably `next`
// is returned.
2345 utime_t
Monitor::health_interval_calc_next_update()
2347 utime_t now
= ceph_clock_now();
2349 time_t secs
= now
.sec();
// seconds elapsed since the last interval boundary
2350 int remainder
= secs
% cct
->_conf
->mon_health_to_clog_interval
;
// seconds remaining until the next interval boundary
2351 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2352 utime_t next
= utime_t(secs
+ adjustment
, 0);
2354 dout(20) << __func__
2355 << " now: " << now
<< ","
2356 << " next: " << next
<< ","
2357 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
// (Re)schedule the interval-aligned health report. Any previously scheduled
// event is dropped via health_interval_stop(), the next boundary is taken
// from health_interval_calc_next_update(), and a new C_MonContext whose
// callback runs do_health_to_clog_interval() is registered with
// timer.add_event_at(). If add_event_at() reports failure the stored
// pointer is reset to nullptr so no stale event is remembered.
// The leading condition tests whether mon_health_to_clog is disabled or the
// interval is <= 0.
// NOTE(review): original lines 2369-2371, 2375-2376, 2378 and 2381+ are
// missing from this listing; the guarded branch is presumably an early
// return and the callback may check `r` first -- confirm.
2363 void Monitor::health_interval_start()
2365 dout(15) << __func__
<< dendl
;
2367 if (!cct
->_conf
->mon_health_to_clog
||
2368 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2372 health_interval_stop();
2373 utime_t next
= health_interval_calc_next_update();
2374 health_interval_event
= new C_MonContext(this, [this](int r
) {
2377 do_health_to_clog_interval();
2379 if (!timer
.add_event_at(next
, health_interval_event
)) {
2380 health_interval_event
= nullptr;
2384 void Monitor::health_interval_stop()
2386 dout(15) << __func__
<< dendl
;
2387 if (health_interval_event
) {
2388 timer
.cancel_event(health_interval_event
);
2390 health_interval_event
= NULL
;
// Tear down health-to-clog state: stops the interval event and resets
// health_status_cache.
// NOTE(review): original line 2395 is missing from this listing; presumably
// it also stops the tick event -- confirm against the real file.
2393 void Monitor::health_events_cleanup()
2396 health_interval_stop();
2397 health_status_cache
.reset();
// React to runtime changes of the health-to-clog options. `changed` is the
// set of option names that were modified; three names are examined:
//   - "mon_health_to_clog": when now disabled, all health events are cleaned
//     up; the visible start calls only run for events that are not already
//     armed.
//   - "mon_health_to_clog_interval": a value <= 0 stops the interval event;
//     health_interval_start() appears in the same section.
//   - "mon_health_to_clog_tick_interval": tests the tick interval for <= 0;
//     health_tick_start() appears in the same section.
// NOTE(review): the `else` lines and closing braces (original 2407, 2410,
// 2413-2416, 2420, 2422-2424, 2427-2428, 2430+) are missing from this
// listing, so which branch each start/stop call belongs to is inferred --
// confirm against the real file.
2400 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2402 dout(20) << __func__
<< dendl
;
2404 if (changed
.count("mon_health_to_clog")) {
2405 if (!cct
->_conf
->mon_health_to_clog
) {
2406 health_events_cleanup();
2408 if (!health_tick_event
) {
2409 health_tick_start();
2411 if (!health_interval_event
) {
2412 health_interval_start();
2417 if (changed
.count("mon_health_to_clog_interval")) {
2418 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2419 health_interval_stop();
2421 health_interval_start();
2425 if (changed
.count("mon_health_to_clog_tick_interval")) {
2426 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2429 health_tick_start();
// Timer callback for the interval-aligned health report: forces a health
// summary into the cluster log via do_health_to_clog(true) and then
// re-arms itself through health_interval_start().
// The leading condition re-checks the configuration because it may have
// changed between scheduling and firing.
// NOTE(review): original lines 2440-2441 are missing from this listing; the
// guarded statement is presumably an early return -- confirm.
2434 void Monitor::do_health_to_clog_interval()
2436 // outputting to clog may have been disabled in the conf
2437 // since we were scheduled.
2438 if (!cct
->_conf
->mon_health_to_clog
||
2439 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2442 dout(10) << __func__
<< dendl
;
2444 // do we have a cached value for next_clog_update? if not,
2445 // do we know when the last update was?
2447 do_health_to_clog(true);
2448 health_interval_start();
// Write the current cluster health summary to the cluster log and remember
// it in health_status_cache.
//
// Two paths are visible, selected by the osdmap's require_osd_release:
//   - >= CEPH_RELEASE_LUMINOUS: the level and one-line summary come from
//     get_health_status(); they are compared with the cached summary/overall
//     and logged as clog->health(level) << "overall " << summary.
//   - otherwise: the status list from get_health() is joined with "; ",
//     compared with the cached overall/summary, and logged via clog->info().
// In both paths the cache is updated with the values just computed.
//
// @param force  only visibly used to tag the debug output with " (force)".
// NOTE(review): the listing skips original lines such as 2457-2458, 2462,
// 2464, 2467, 2471-2472, 2477-2478, 2480-2481 and 2485-2488, so the
// early-return statements and any use of `force` in the comparisons
// (presumably to bypass the "unchanged, skip" check) are not visible --
// confirm against the real file.
2451 void Monitor::do_health_to_clog(bool force
)
2453 // outputting to clog may have been disabled in the conf
2454 // since we were scheduled.
2455 if (!cct
->_conf
->mon_health_to_clog
||
2456 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2459 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
// luminous and later: summary built from the health checks
2461 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2463 health_status_t level
= get_health_status(false, nullptr, &summary
);
2465 summary
== health_status_cache
.summary
&&
2466 level
== health_status_cache
.overall
)
2468 clog
->health(level
) << "overall " << summary
;
2469 health_status_cache
.summary
= summary
;
2470 health_status_cache
.overall
= level
;
// pre-luminous: summary built from the legacy get_health() status list
2473 list
<string
> status
;
2474 health_status_t overall
= get_health(status
, NULL
, NULL
);
2475 dout(25) << __func__
2476 << (force
? " (force)" : "")
2479 string summary
= joinify(status
.begin(), status
.end(), string("; "));
2482 overall
== health_status_cache
.overall
&&
2483 !health_status_cache
.summary
.empty() &&
2484 health_status_cache
.summary
== summary
) {
2489 clog
->info() << summary
;
2491 health_status_cache
.overall
= overall
;
2492 health_status_cache
.summary
= summary
;
// Compute the overall cluster health from the per-service health checks and
// render it either into a Formatter or into a plain string.
//
// Visible behaviour:
//   - the result `r` starts at HEALTH_OK and is lowered with std::min() to
//     the value returned by each paxos service's
//     get_health_checks().dump_summary(...)
//   - with a formatter: opens "health" and "checks" sections and dumps
//     "status"
//   - `*plain` is assigned stringify(r); the summary is consulted when it is
//     non-empty
//   - when a formatter is present and either mon_health_preluminous_compat
//     or mon_health_preluminous_compat_warning is set, legacy "summary",
//     "overall_status" and "detail" fields are emitted as well; the warning
//     variant inserts `old_fields_message` and caps the reported legacy
//     status at HEALTH_WARN
//   - dump_detail() is invoked on every service's health checks
// NOTE(review): the parameter list (original lines 2497-2502) and many
// conditionals/closing calls (e.g. 2506, 2509-2511, 2516-2519, 2521,
// 2525-2530, 2539, 2543-2545, 2548-2550, 2552-2554, 2557, 2559-2561,
// 2564-2565, 2567+) are missing from this listing. Which statements are
// guarded by `f`, `plain` or `want_detail`, and what is returned, cannot be
// confirmed from here.
2496 health_status_t
Monitor::get_health_status(
2503 health_status_t r
= HEALTH_OK
;
2504 bool compat
= g_conf
->mon_health_preluminous_compat
;
2505 bool compat_warn
= g_conf
->get_val
<bool>("mon_health_preluminous_compat_warning");
2507 f
->open_object_section("health");
2508 f
->open_object_section("checks");
// the text summary is only collected when there is no formatter
2512 string
*psummary
= f
? nullptr : &summary
;
2513 for (auto& svc
: paxos_service
) {
2514 r
= std::min(r
, svc
->get_health_checks().dump_summary(
2515 f
, psummary
, sep2
, want_detail
));
2520 f
->dump_stream("status") << r
;
2522 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2523 *plain
= stringify(r
);
2524 if (summary
.size()) {
2531 const std::string old_fields_message
= "'ceph health' JSON format has "
2532 "changed in luminous. If you see this your monitoring system is "
2533 "scraping the wrong fields. Disable this with 'mon health preluminous "
2534 "compat warning = false'";
// legacy (pre-luminous) "summary" / "overall_status" fields
2536 if (f
&& (compat
|| compat_warn
)) {
2537 health_status_t cr
= compat_warn
? min(HEALTH_WARN
, r
) : r
;
2538 f
->open_array_section("summary");
2540 f
->open_object_section("item");
2541 f
->dump_stream("severity") << HEALTH_WARN
;
2542 f
->dump_string("summary", old_fields_message
);
2546 for (auto& svc
: paxos_service
) {
2547 svc
->get_health_checks().dump_summary_compat(f
);
2551 f
->dump_stream("overall_status") << cr
;
// legacy (pre-luminous) "detail" field
2555 if (f
&& (compat
|| compat_warn
)) {
2556 f
->open_array_section("detail");
2558 f
->dump_string("item", old_fields_message
);
2562 for (auto& svc
: paxos_service
) {
2563 svc
->get_health_checks().dump_detail(f
, plain
, compat
);
2566 if (f
&& (compat
|| compat_warn
)) {
// Emit cluster-log messages describing how one service's health checks
// changed between `previous` and `updated`.
//
// Visible behaviour:
//   - does nothing useful unless mon_health_to_clog is enabled (the guarded
//     statement itself is not visible; presumably an early return)
//   - for every check in `updated`:
//       * not present in `previous`  -> "Health check failed: ..." is logged
//         at the check's severity
//       * summary or severity differs -> "Health check update: ..." is
//         logged, unless the severity is unchanged and the check was last
//         logged less than mon_health_log_update_period ago
//         (health_check_log_times)
//       * the time, severity and summary last logged are recorded in
//         health_check_log_times
//   - for every check in `previous` that is absent from `updated`: a
//     "cleared" message is logged (with dedicated wording for
//     DEGRADED_OBJECTS and OSD_FLAGS) and its health_check_log_times entry
//     is erased
//   - when `previous` had checks and `updated` has none, the other paxos
//     services are scanned for remaining checks and "Cluster is now healthy"
//     is logged
//
// @param updated   the service's new set of health checks
// @param previous  the service's prior set of health checks
// @param t         transaction handle; not referenced in the visible lines
//                  (see the FIXME below)
// NOTE(review): many original lines (e.g. 2582-2584, 2590, 2597-2598, 2600,
// 2602-2604, 2607, 2616-2619, 2621, 2624-2627, 2630, 2635, 2638-2640,
// 2643-2644, 2649, 2652-2653, 2656-2659, 2662, 2667-2669, 2671-2675, 2677+)
// are missing from this listing, so the exact nesting, how `logged` gates
// the bookkeeping, and the condition on `any_checks` are inferred --
// confirm against the real file.
2576 void Monitor::log_health(
2577 const health_check_map_t
& updated
,
2578 const health_check_map_t
& previous
,
2579 MonitorDBStore::TransactionRef t
)
2581 if (!g_conf
->mon_health_to_clog
) {
2585 const utime_t now
= ceph_clock_now();
2587 // FIXME: log atomically as part of @t instead of using clog.
2588 dout(10) << __func__
<< " updated " << updated
.checks
.size()
2589 << " previous " << previous
.checks
.size()
// minimum spacing between repeated "update" messages for the same check
2591 const auto min_log_period
= g_conf
->get_val
<int64_t>(
2592 "mon_health_log_update_period");
2593 for (auto& p
: updated
.checks
) {
2594 auto q
= previous
.checks
.find(p
.first
);
2595 bool logged
= false;
// check is new: it did not exist in the previous map
2596 if (q
== previous
.checks
.end()) {
2599 ss
<< "Health check failed: " << p
.second
.summary
<< " ("
2601 clog
->health(p
.second
.severity
) << ss
.str();
// check already existed: only log when summary or severity changed
2605 if (p
.second
.summary
!= q
->second
.summary
||
2606 p
.second
.severity
!= q
->second
.severity
) {
2608 auto status_iter
= health_check_log_times
.find(p
.first
);
2609 if (status_iter
!= health_check_log_times
.end()) {
2610 if (p
.second
.severity
== q
->second
.severity
&&
2611 now
- status_iter
->second
.updated_at
< min_log_period
) {
2612 // We already logged this recently and the severity is unchanged,
2613 // so skip emitting an update of the summary string.
2614 // We'll get an update out of tick() later if the check
2615 // is still failing.
2620 // summary or severity changed (ignore detail changes at this level)
2622 ss
<< "Health check update: " << p
.second
.summary
<< " (" << p
.first
<< ")";
2623 clog
->health(p
.second
.severity
) << ss
.str();
2628 // Record the time at which we last logged, so that we can check this
2629 // when considering whether/when to print update messages.
2631 auto iter
= health_check_log_times
.find(p
.first
);
2632 if (iter
== health_check_log_times
.end()) {
2633 health_check_log_times
.emplace(p
.first
, HealthCheckLogStatus(
2634 p
.second
.severity
, p
.second
.summary
, now
));
2636 iter
->second
= HealthCheckLogStatus(
2637 p
.second
.severity
, p
.second
.summary
, now
);
// checks that existed before but are gone now have been cleared
2641 for (auto& p
: previous
.checks
) {
2642 if (!updated
.checks
.count(p
.first
)) {
2645 if (p
.first
== "DEGRADED_OBJECTS") {
2646 clog
->info() << "All degraded objects recovered";
2647 } else if (p
.first
== "OSD_FLAGS") {
2648 clog
->info() << "OSD flags cleared";
2650 clog
->info() << "Health check cleared: " << p
.first
<< " (was: "
2651 << p
.second
.summary
<< ")";
2654 if (health_check_log_times
.count(p
.first
)) {
2655 health_check_log_times
.erase(p
.first
);
2660 if (previous
.checks
.size() && updated
.checks
.size() == 0) {
2661 // We might be going into a fully healthy state, check
2663 bool any_checks
= false;
2664 for (auto& svc
: paxos_service
) {
// identity comparison: skip the service whose map is `previous` itself
2665 if (&(svc
->get_health_checks()) == &(previous
)) {
2666 // Ignore the ones we're clearing right now
2670 if (svc
->get_health_checks().checks
.size() > 0) {
2676 clog
->info() << "Cluster is now healthy";
// Legacy health report: collect health summaries and details from every
// paxos service and the health monitor, add monitor clock-skew findings,
// and render the result.
//
// Visible behaviour:
//   - every PaxosService and `health_monitor` append to the local `summary`
//     list; `detail` is only requested when `detailbl` is non-null
//   - for every entry in timecheck_skews, timecheck_status() is evaluated;
//     entries that are not HEALTH_OK contribute a warning name and a detail
//     line ("mon.<name> addr <addr> <reason> (latency <n>s)")
//   - when there are skew warnings, a "clock skew detected on mon.X ..."
//     line is appended to `status` and a HEALTH_WARN entry to `summary`
//   - `overall` starts at HEALTH_OK and is lowered to the lowest severity
//     value found in `summary`; each summary string is appended to `status`
//     and the list is drained
//   - a line built in `fss` is pushed to the front of `status`
//   - formatter output: "health" object with "summary" items (severity,
//     summary), "overall_status" and "detail" items
//   - detail strings are appended to `detailbl`, one per line, in the
//     non-formatter branch
//
// @param status    output list of human-readable status lines
// @param detailbl  optional buffer receiving detail lines; may be NULL
// NOTE(review): the third parameter (original lines 2683-2684, presumably
// the Formatter `f`) and many conditionals/loop tails (e.g. 2687-2688,
// 2690, 2693, 2696-2697, 2702, 2709, 2713, 2720-2721, 2723, 2727-2730,
// 2733-2736, 2743, 2747-2748, 2750-2756, 2758, 2760-2761, 2764, 2769+) are
// missing from this listing; which calls are guarded by `f`, and the return
// value, cannot be confirmed from here.
2681 health_status_t
Monitor::get_health(list
<string
>& status
,
2682 bufferlist
*detailbl
,
2685 list
<pair
<health_status_t
,string
> > summary
;
2686 list
<pair
<health_status_t
,string
> > detail
;
2689 f
->open_object_section("health");
// gather summary (and, if requested, detail) from every paxos service
2691 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin();
2692 p
!= paxos_service
.end();
2694 PaxosService
*s
= *p
;
2695 s
->get_health(summary
, detailbl
? &detail
: NULL
, cct
);
2698 health_monitor
->get_health(summary
, (detailbl
? &detail
: NULL
));
2700 health_status_t overall
= HEALTH_OK
;
// monitor clock skew findings
2701 if (!timecheck_skews
.empty()) {
2703 for (map
<entity_inst_t
,double>::iterator i
= timecheck_skews
.begin();
2704 i
!= timecheck_skews
.end(); ++i
) {
2705 entity_inst_t inst
= i
->first
;
2706 double skew
= i
->second
;
2707 double latency
= timecheck_latencies
[inst
];
2708 string name
= monmap
->get_name(inst
.addr
);
2710 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
2711 if (tcstatus
!= HEALTH_OK
) {
2712 if (overall
> tcstatus
)
2714 warns
.push_back(name
);
2715 ostringstream tmp_ss
;
2716 tmp_ss
<< "mon." << name
2717 << " addr " << inst
.addr
<< " " << tcss
.str()
2718 << " (latency " << latency
<< "s)";
2719 detail
.push_back(make_pair(tcstatus
, tmp_ss
.str()));
2722 if (!warns
.empty()) {
2724 ss
<< "clock skew detected on";
2725 while (!warns
.empty()) {
2726 ss
<< " mon." << warns
.front();
2731 status
.push_back(ss
.str());
2732 summary
.push_back(make_pair(HEALTH_WARN
, "Monitor clock skew detected "));
// drain the summary list, tracking the lowest severity value seen
2737 f
->open_array_section("summary");
2738 if (!summary
.empty()) {
2739 while (!summary
.empty()) {
2740 if (overall
> summary
.front().first
)
2741 overall
= summary
.front().first
;
2742 status
.push_back(summary
.front().second
);
2744 f
->open_object_section("item");
2745 f
->dump_stream("severity") << summary
.front().first
;
2746 f
->dump_string("summary", summary
.front().second
);
2749 summary
.pop_front();
2757 status
.push_front(fss
.str());
2759 f
->dump_stream("overall_status") << overall
;
// drain the detail list into the formatter or into detailbl
2762 f
->open_array_section("detail");
2763 while (!detail
.empty()) {
2765 f
->dump_string("item", detail
.front().second
);
2766 else if (detailbl
!= NULL
) {
2767 detailbl
->append(detail
.front().second
);
2768 detailbl
->append('\n');
// Produce the cluster status report (the data behind the "status" command,
// see handle_command) either structured through `f` or as indented plain
// text on `ss`.
//
// Formatter output, as visible here: a "status" object with "fsid", the
// health section (get_health_status() when the osdmap requires luminous or
// later, legacy get_health() otherwise), "election_epoch", "quorum" ranks,
// "quorum_names", and "monmap", "osdmap", "pgmap", "fsmap", "mgrmap" and
// "servicemap" sections.
//
// Plain-text output, as visible here:
//   - "cluster:" block with the fsid and a one-line health string
//   - "services:" block with mon (daemon count, quorum, and any monitors
//     out of quorum), mgr (only when mgrmon()->in_use()), mds (only when at
//     least one filesystem exists), osd, and one line per entry of the
//     service map; labels are padded using `maxlen`, the longest service
//     name
//   - "data:" block from pgservice->print_summary()
//
// @param ss  stream receiving the plain-text report
// @param f   formatter receiving the structured report
// NOTE(review): the branch selecting formatter vs. plain output and many
// section close calls (original lines such as 2782-2783, 2785-2786, 2790,
// 2793, 2795, 2799, 2803-2804, 2806-2807, 2810, 2813, 2816, 2819-2820,
// 2822-2823, 2826-2827, 2830-2832, 2835-2836, 2838, 2840-2841, 2845, 2850,
// 2856-2857, 2860-2861, 2865-2866, 2869, 2872, 2876-2878, 2881+) are
// missing from this listing; presumably `f` may be NULL and selects the
// plain-text path -- confirm against the real file. The initial value of
// `maxlen` is likewise not visible, so the safety of `maxlen - 3` cannot be
// checked from here.
2781 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
)
2784 f
->open_object_section("status");
2787 f
->dump_stream("fsid") << monmap
->get_fsid();
2788 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2789 get_health_status(false, f
, nullptr);
2791 list
<string
> health_str
;
2792 get_health(health_str
, nullptr, f
);
2794 f
->dump_unsigned("election_epoch", get_epoch());
2796 f
->open_array_section("quorum");
2797 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2798 f
->dump_int("rank", *p
);
2800 f
->open_array_section("quorum_names");
2801 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2802 f
->dump_string("id", monmap
->get_name(*p
));
2805 f
->open_object_section("monmap");
2808 f
->open_object_section("osdmap");
2809 osdmon()->osdmap
.print_summary(f
, cout
, string(12, ' '));
2811 f
->open_object_section("pgmap");
2812 pgservice
->print_summary(f
, NULL
);
2814 f
->open_object_section("fsmap");
2815 mdsmon()->get_fsmap().print_summary(f
, NULL
);
2817 f
->open_object_section("mgrmap");
2818 mgrmon()->get_map().print_summary(f
, nullptr);
2821 f
->dump_object("servicemap", mgrstatmon()->get_service_map());
// ---- plain-text report ----
2824 ss
<< " cluster:\n";
2825 ss
<< " id: " << monmap
->get_fsid() << "\n";
2828 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2829 get_health_status(false, nullptr, &health
,
2833 get_health(ls
, NULL
, f
);
2834 health
= joinify(ls
.begin(), ls
.end(),
2837 ss
<< " health: " << health
<< "\n";
2839 ss
<< "\n \n services:\n";
// widen the label column to fit the longest service-map name
2842 auto& service_map
= mgrstatmon()->get_service_map();
2843 for (auto& p
: service_map
.services
) {
2844 maxlen
= std::max(maxlen
, p
.first
.size());
2846 string
spacing(maxlen
- 3, ' ');
2847 const auto quorum_names
= get_quorum_names();
2848 const auto mon_count
= monmap
->mon_info
.size();
2849 ss
<< " mon: " << spacing
<< mon_count
<< " daemons, quorum "
// list the monitors whose rank is not in the quorum set
2851 if (quorum_names
.size() != mon_count
) {
2852 std::list
<std::string
> out_of_q
;
2853 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
2854 if (quorum
.count(i
) == 0) {
2855 out_of_q
.push_back(monmap
->ranks
[i
]);
2858 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
2859 out_of_q
.end(), std::string(", "));
2862 if (mgrmon()->in_use()) {
2863 ss
<< " mgr: " << spacing
;
2864 mgrmon()->get_map().print_summary(nullptr, &ss
);
2867 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2868 ss
<< " mds: " << spacing
<< mdsmon()->get_fsmap() << "\n";
2870 ss
<< " osd: " << spacing
;
2871 osdmon()->osdmap
.print_summary(NULL
, ss
, string(maxlen
+ 6, ' '));
2873 for (auto& p
: service_map
.services
) {
2874 ss
<< " " << p
.first
<< ": " << string(maxlen
- p
.first
.size(), ' ')
2875 << p
.second
.get_summary() << "\n";
2879 ss
<< "\n \n data:\n";
2880 pgservice
->print_summary(NULL
, &ss
);
// Flatten a parsed command map into a string-to-string map.
//
// Visible behaviour, per entry of `cmdmap`:
//   - the "prefix" key is tested first (the guarded statement is not
//     visible; presumably the entry is skipped)
//   - the "caps" key is read as a vector of strings; when it has an even
//     number of elements, each (name, value) pair is stored as
//     param_str_map["caps_" + name] = value
//   - otherwise the value is stored under its own key via
//     cmd_vartype_stringify()
//
// @param cmdmap         parsed command arguments
// @param param_str_map  output map, filled in place
// NOTE(review): the second parameter reads "¶m_str_map" in this listing;
// this looks like "&param_str_map" damaged by HTML-entity decoding --
// confirm against the real file. Original lines 2887, 2891, 2893,
// 2899-2902 and 2904+ are also missing (declaration of `cv`, the `continue`
// / `else` structure), so the exact control flow is inferred.
2885 void Monitor::_generate_command_map(map
<string
,cmd_vartype
>& cmdmap
,
2886 map
<string
,string
> ¶m_str_map
)
2888 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
2889 p
!= cmdmap
.end(); ++p
) {
2890 if (p
->first
== "prefix")
2892 if (p
->first
== "caps") {
// caps arrive as a flat [name, value, name, value, ...] vector
2894 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
2895 cv
.size() % 2 == 0) {
2896 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
2897 string k
= string("caps_") + cv
[i
];
2898 param_str_map
[k
] = cv
[i
+ 1];
2903 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
// Look up a command description by prefix: walks `cmds` and tests whether
// each entry's cmdstring begins with `cmd_prefix` (compare() over the first
// cmd_prefix.size() characters).
//
// @param cmd_prefix  command prefix to search for
// @param cmds        command table to search
// NOTE(review): the return statements (original lines 2913-2916) are
// missing from this listing; presumably the first matching entry is
// returned and a null pointer when nothing matches -- confirm.
2907 const MonCommand
*Monitor::_get_moncommand(
2908 const string
&cmd_prefix
,
2909 const vector
<MonCommand
>& cmds
)
2911 for (auto& c
: cmds
) {
2912 if (c
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
// Check whether session `s` holds the capabilities needed to run a command.
// The r/w/x requirements are read from `this_cmd` and passed, together with
// the module, prefix and stringified parameters, to
// s->caps.is_capable(); the outcome is logged at debug level 10.
//
// @param s              session issuing the command
// @param module         service/module the command belongs to
// @param prefix         command prefix
// @param cmdmap         parsed command arguments (not referenced in the
//                       visible lines)
// @param param_str_map  stringified arguments used for cap matching
// @param this_cmd       command description supplying required permissions
// NOTE(review): original lines 2929, 2931, 2934 and 2936+ are missing from
// this listing, so two of the is_capable() arguments and the return
// statement are not visible; presumably `capable` is returned -- confirm.
2919 bool Monitor::_allowed_command(MonSession
*s
, string
&module
, string
&prefix
,
2920 const map
<string
,cmd_vartype
>& cmdmap
,
2921 const map
<string
,string
>& param_str_map
,
2922 const MonCommand
*this_cmd
) {
2924 bool cmd_r
= this_cmd
->requires_perm('r');
2925 bool cmd_w
= this_cmd
->requires_perm('w');
2926 bool cmd_x
= this_cmd
->requires_perm('x');
2928 bool capable
= s
->caps
.is_capable(
2930 CEPH_ENTITY_TYPE_MON
,
2932 module
, prefix
, param_str_map
,
2933 cmd_r
, cmd_w
, cmd_x
);
2935 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
// Serialize a command table through formatter `f` as a
// "command_descriptions" object. Each command is emitted by
// dump_cmddesc_to_json() under a section named "cmd" followed by a
// zero-padded, three-digit counter (`cmdnum`). When `hide_mgr_flag` is set,
// MonCommand::FLAG_MGR is masked out of the flags that are reported.
//
// @param commands  command table to describe
// NOTE(review): the remaining parameters (original lines 2940-2943) and the
// lines that initialise/advance `cmdnum` and flush the formatter (2944,
// 2950, 2956-2957, 2959+) are missing from this listing. The visible call
// site in handle_command passes a formatter, an output buffer and the
// hide_mgr_flag boolean.
2939 void Monitor::format_command_descriptions(const std::vector
<MonCommand
> &commands
,
2945 f
->open_object_section("command_descriptions");
2946 for (const auto &cmd
: commands
) {
2947 unsigned flags
= cmd
.flags
;
2948 if (hide_mgr_flag
) {
2949 flags
&= ~MonCommand::FLAG_MGR
;
// section name: "cmd" + zero-padded 3-digit index
2951 ostringstream secname
;
2952 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
2953 dump_cmddesc_to_json(f
, secname
.str(),
2954 cmd
.cmdstring
, cmd
.helpstring
, cmd
.module
,
2955 cmd
.req_perms
, cmd
.availability
, flags
);
2958 f
->close_section(); // command_descriptions
2963 bool Monitor::is_keyring_required()
2965 string auth_cluster_required
= g_conf
->auth_supported
.empty() ?
2966 g_conf
->auth_cluster_required
: g_conf
->auth_supported
;
2967 string auth_service_required
= g_conf
->auth_supported
.empty() ?
2968 g_conf
->auth_service_required
: g_conf
->auth_supported
;
2970 return auth_service_required
== "cephx" ||
2971 auth_cluster_required
== "cephx";
// Completion context for a command proxied to the manager. It is
// constructed with the owning monitor, the originating op and the byte
// size that was charged for the request. On completion, finish() takes
// mon->lock, subtracts that size from mon->mgr_proxy_bytes, and sends the
// reply to the client with the result code `r` and the `outs` / `outbl`
// output members.
// NOTE(review): the member declarations (original lines 2975-2979) and the
// closing braces are missing from this listing; `outs` and `outbl` are
// presumably filled in by the manager client before finish() runs --
// confirm against the real file.
2974 struct C_MgrProxyCommand
: public Context
{
2980 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
2981 : mon(mon
), op(op
), size(s
) { }
2982 void finish(int r
) {
2983 Mutex::Locker
l(mon
->lock
);
// release the bytes charged against the proxy budget for this command
2984 mon
->mgr_proxy_bytes
-= size
;
2985 mon
->reply_command(op
, r
, outs
, outbl
, 0);
2989 void Monitor::handle_command(MonOpRequestRef op
)
2991 assert(op
->is_type_command());
2992 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
2993 if (m
->fsid
!= monmap
->fsid
) {
2994 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
2995 reply_command(op
, -EPERM
, "wrong fsid", 0);
2999 MonSession
*session
= static_cast<MonSession
*>(
3000 m
->get_connection()->get_priv());
3002 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
3005 BOOST_SCOPE_EXIT_ALL(=) {
3009 if (m
->cmd
.empty()) {
3010 string rs
= "No command supplied";
3011 reply_command(op
, -EINVAL
, rs
, 0);
3016 vector
<string
> fullcmd
;
3017 map
<string
, cmd_vartype
> cmdmap
;
3018 stringstream ss
, ds
;
3022 rs
= "unrecognized command";
3024 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
3025 // ss has reason for failure
3028 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3029 reply_command(op
, r
, rs
, 0);
3033 // check return value. If no prefix parameter provided,
3034 // return value will be false, then return error info.
3035 if (!cmd_getval(g_ceph_context
, cmdmap
, "prefix", prefix
)) {
3036 reply_command(op
, -EINVAL
, "command prefix not found", 0);
3040 // check prefix is empty
3041 if (prefix
.empty()) {
3042 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
3046 if (prefix
== "get_command_descriptions") {
3048 Formatter
*f
= Formatter::create("json");
3049 // hide mgr commands until luminous upgrade is complete
3050 bool hide_mgr_flag
=
3051 osdmon()->osdmap
.require_osd_release
< CEPH_RELEASE_LUMINOUS
;
3053 std::vector
<MonCommand
> commands
;
3055 // only include mgr commands once all mons are upgrade (and we've dropped
3056 // the hard-coded PGMonitor commands)
3057 if (quorum_mon_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
3058 commands
= static_cast<MgrMonitor
*>(
3059 paxos_service
[PAXOS_MGR
])->get_command_descs();
3062 for (auto& c
: leader_mon_commands
) {
3063 commands
.push_back(c
);
3066 format_command_descriptions(commands
, f
, &rdata
, hide_mgr_flag
);
3068 reply_command(op
, 0, "", rdata
, 0);
3075 dout(0) << "handle_command " << *m
<< dendl
;
3078 cmd_getval(g_ceph_context
, cmdmap
, "format", format
, string("plain"));
3079 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
3081 get_str_vec(prefix
, fullcmd
);
3083 // make sure fullcmd is not empty.
3084 // invalid prefix will cause empty vector fullcmd.
3085 // such as, prefix=";,,;"
3086 if (fullcmd
.empty()) {
3087 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
3091 module
= fullcmd
[0];
3093 // validate command is in leader map
3095 const MonCommand
*leader_cmd
;
3096 const auto& mgr_cmds
= mgrmon()->get_command_descs();
3097 const MonCommand
*mgr_cmd
= nullptr;
3098 if (!mgr_cmds
.empty()) {
3099 mgr_cmd
= _get_moncommand(prefix
, mgr_cmds
);
3101 leader_cmd
= _get_moncommand(prefix
, leader_mon_commands
);
3103 leader_cmd
= mgr_cmd
;
3105 reply_command(op
, -EINVAL
, "command not known", 0);
3109 // validate command is in our map & matches, or forward if it is allowed
3110 const MonCommand
*mon_cmd
= _get_moncommand(
3112 get_local_commands(quorum_mon_features
));
3118 if (leader_cmd
->is_noforward()) {
3119 reply_command(op
, -EINVAL
,
3120 "command not locally supported and not allowed to forward",
3124 dout(10) << "Command not locally supported, forwarding request "
3126 forward_request_leader(op
);
3128 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
3129 if (mon_cmd
->is_noforward()) {
3130 reply_command(op
, -EINVAL
,
3131 "command not compatible with leader and not allowed to forward",
3135 dout(10) << "Command not compatible with leader, forwarding request "
3137 forward_request_leader(op
);
3142 if (mon_cmd
->is_obsolete() ||
3143 (cct
->_conf
->mon_debug_deprecated_as_obsolete
3144 && mon_cmd
->is_deprecated())) {
3145 reply_command(op
, -ENOTSUP
,
3146 "command is obsolete; please check usage and/or man page",
3151 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
3152 dout(10) << "Got forward for noforward command " << m
<< dendl
;
3153 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
3157 /* what we perceive as being the service the command falls under */
3158 string
service(mon_cmd
->module
);
3160 dout(25) << __func__
<< " prefix='" << prefix
3161 << "' module='" << module
3162 << "' service='" << service
<< "'" << dendl
;
3165 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
3167 // validate user's permissions for requested command
3168 map
<string
,string
> param_str_map
;
3169 _generate_command_map(cmdmap
, param_str_map
);
3170 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
3171 param_str_map
, mon_cmd
)) {
3172 dout(1) << __func__
<< " access denied" << dendl
;
3173 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3174 << "from='" << session
->inst
<< "' "
3175 << "entity='" << session
->entity_name
<< "' "
3176 << "cmd=" << m
->cmd
<< ": access denied";
3177 reply_command(op
, -EACCES
, "access denied", 0);
3181 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3182 << "from='" << session
->inst
<< "' "
3183 << "entity='" << session
->entity_name
<< "' "
3184 << "cmd=" << m
->cmd
<< ": dispatch";
3186 if (mon_cmd
->is_mgr() &&
3187 osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3188 const auto& hdr
= m
->get_header();
3189 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
3190 uint64_t max
= g_conf
->get_val
<uint64_t>("mon_client_bytes")
3191 * g_conf
->get_val
<double>("mon_mgr_proxy_client_bytes_ratio");
3192 if (mgr_proxy_bytes
+ size
> max
) {
3193 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
3194 << " + " << size
<< " > max " << max
<< dendl
;
3195 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
3198 mgr_proxy_bytes
+= size
;
3199 dout(10) << __func__
<< " proxying mgr command (+" << size
3200 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
3201 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
3202 mgr_client
.start_command(m
->cmd
,
3206 new C_OnFinisher(fin
, &finisher
));
3210 if ((module
== "mds" || module
== "fs") &&
3211 prefix
!= "fs authorize") {
3212 mdsmon()->dispatch(op
);
3215 if ((module
== "osd" || prefix
== "pg map") &&
3216 prefix
!= "osd last-stat-seq") {
3217 osdmon()->dispatch(op
);
3221 if (module
== "pg") {
3222 pgmon()->dispatch(op
);
3225 if (module
== "mon" &&
3226 /* Let the Monitor class handle the following commands:
3231 prefix
!= "mon compact" &&
3232 prefix
!= "mon scrub" &&
3233 prefix
!= "mon sync force" &&
3234 prefix
!= "mon metadata" &&
3235 prefix
!= "mon versions" &&
3236 prefix
!= "mon count-metadata") {
3237 monmon()->dispatch(op
);
3240 if (module
== "auth" || prefix
== "fs authorize") {
3241 authmon()->dispatch(op
);
3244 if (module
== "log") {
3245 logmon()->dispatch(op
);
3249 if (module
== "config-key") {
3250 config_key_service
->dispatch(op
);
3254 if (module
== "mgr") {
3255 mgrmon()->dispatch(op
);
3259 if (prefix
== "fsid") {
3261 f
->open_object_section("fsid");
3262 f
->dump_stream("fsid") << monmap
->fsid
;
3269 reply_command(op
, 0, "", rdata
, 0);
3273 if (prefix
== "scrub" || prefix
== "mon scrub") {
3274 wait_for_paxos_write();
3276 int r
= scrub_start();
3277 reply_command(op
, r
, "", rdata
, 0);
3278 } else if (is_peon()) {
3279 forward_request_leader(op
);
3281 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
3286 if (prefix
== "compact" || prefix
== "mon compact") {
3287 dout(1) << "triggering manual compaction" << dendl
;
3288 utime_t start
= ceph_clock_now();
3290 utime_t end
= ceph_clock_now();
3292 dout(1) << "finished manual compaction in " << end
<< " seconds" << dendl
;
3294 oss
<< "compacted " << g_conf
->get_val
<std::string
>("mon_keyvaluedb") << " in " << end
<< " seconds";
3298 else if (prefix
== "injectargs") {
3299 vector
<string
> injected_args
;
3300 cmd_getval(g_ceph_context
, cmdmap
, "injected_args", injected_args
);
3301 if (!injected_args
.empty()) {
3302 dout(0) << "parsing injected options '" << injected_args
<< "'" << dendl
;
3304 r
= g_conf
->injectargs(str_join(injected_args
, " "), &oss
);
3305 ss
<< "injectargs:" << oss
.str();
3309 rs
= "must supply options to be parsed in a single string";
3312 } else if (prefix
== "time-sync-status") {
3314 f
.reset(Formatter::create("json-pretty"));
3315 f
->open_object_section("time_sync");
3316 if (!timecheck_skews
.empty()) {
3317 f
->open_object_section("time_skew_status");
3318 for (auto& i
: timecheck_skews
) {
3319 entity_inst_t inst
= i
.first
;
3320 double skew
= i
.second
;
3321 double latency
= timecheck_latencies
[inst
];
3322 string name
= monmap
->get_name(inst
.addr
);
3324 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
3325 f
->open_object_section(name
.c_str());
3326 f
->dump_float("skew", skew
);
3327 f
->dump_float("latency", latency
);
3328 f
->dump_stream("health") << tcstatus
;
3329 if (tcstatus
!= HEALTH_OK
) {
3330 f
->dump_stream("details") << tcss
.str();
3336 f
->open_object_section("timechecks");
3337 f
->dump_unsigned("epoch", get_epoch());
3338 f
->dump_int("round", timecheck_round
);
3339 f
->dump_stream("round_status") << ((timecheck_round
%2) ?
3340 "on-going" : "finished");
3346 } else if (prefix
== "config set") {
3348 cmd_getval(cct
, cmdmap
, "key", key
);
3350 cmd_getval(cct
, cmdmap
, "value", val
);
3351 r
= g_conf
->set_val(key
, val
, true, &ss
);
3353 g_conf
->apply_changes(nullptr);
3357 } else if (prefix
== "status" ||
3358 prefix
== "health" ||
3361 cmd_getval(g_ceph_context
, cmdmap
, "detail", detail
);
3363 if (prefix
== "status") {
3364 // get_cluster_status handles f == NULL
3365 get_cluster_status(ds
, f
.get());
3372 } else if (prefix
== "health") {
3373 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3375 get_health_status(detail
== "detail", f
.get(), f
? nullptr : &plain
);
3379 rdata
.append(plain
);
3382 list
<string
> health_str
;
3383 get_health(health_str
, detail
== "detail" ? &rdata
: NULL
, f
.get());
3388 assert(!health_str
.empty());
3389 ds
<< health_str
.front();
3390 health_str
.pop_front();
3391 if (!health_str
.empty()) {
3393 ds
<< joinify(health_str
.begin(), health_str
.end(), string("; "));
3398 if (detail
== "detail")
3402 } else if (prefix
== "df") {
3403 bool verbose
= (detail
== "detail");
3405 f
->open_object_section("stats");
3407 pgservice
->dump_fs_stats(&ds
, f
.get(), verbose
);
3410 pgservice
->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3418 assert(0 == "We should never get here!");
3424 } else if (prefix
== "report") {
3426 // this must be formatted, in its current form
3428 f
.reset(Formatter::create("json-pretty"));
3429 f
->open_object_section("report");
3430 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3431 f
->dump_string("version", ceph_version_to_str());
3432 f
->dump_string("commit", git_version_to_str());
3433 f
->dump_stream("timestamp") << ceph_clock_now();
3435 vector
<string
> tagsvec
;
3436 cmd_getval(g_ceph_context
, cmdmap
, "tags", tagsvec
);
3437 string tagstr
= str_join(tagsvec
, " ");
3438 if (!tagstr
.empty())
3439 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3440 f
->dump_string("tag", tagstr
);
3442 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3443 get_health_status(true, f
.get(), nullptr);
3445 list
<string
> health_str
;
3446 get_health(health_str
, nullptr, f
.get());
3449 monmon()->dump_info(f
.get());
3450 osdmon()->dump_info(f
.get());
3451 mdsmon()->dump_info(f
.get());
3452 authmon()->dump_info(f
.get());
3453 pgservice
->dump_info(f
.get());
3455 paxos
->dump_info(f
.get());
3461 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT
);
3464 } else if (prefix
== "osd last-stat-seq") {
3466 cmd_getval(g_ceph_context
, cmdmap
, "id", osd
);
3467 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3469 f
->dump_unsigned("seq", seq
);
3477 } else if (prefix
== "node ls") {
3478 string
node_type("all");
3479 cmd_getval(g_ceph_context
, cmdmap
, "type", node_type
);
3481 f
.reset(Formatter::create("json-pretty"));
3482 if (node_type
== "all") {
3483 f
->open_object_section("nodes");
3484 print_nodes(f
.get(), ds
);
3485 osdmon()->print_nodes(f
.get());
3486 mdsmon()->print_nodes(f
.get());
3488 } else if (node_type
== "mon") {
3489 print_nodes(f
.get(), ds
);
3490 } else if (node_type
== "osd") {
3491 osdmon()->print_nodes(f
.get());
3492 } else if (node_type
== "mds") {
3493 mdsmon()->print_nodes(f
.get());
3499 } else if (prefix
== "features") {
3500 if (!is_leader() && !is_peon()) {
3501 dout(10) << " waiting for quorum" << dendl
;
3502 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3506 forward_request_leader(op
);
3510 f
.reset(Formatter::create("json-pretty"));
3512 get_combined_feature_map(&fm
);
3513 f
->dump_object("features", fm
);
3517 } else if (prefix
== "mon metadata") {
3519 f
.reset(Formatter::create("json-pretty"));
3522 bool all
= !cmd_getval(g_ceph_context
, cmdmap
, "id", name
);
3524 // Dump a single mon's metadata
3525 int mon
= monmap
->get_rank(name
);
3527 rs
= "requested mon not found";
3531 f
->open_object_section("mon_metadata");
3532 r
= get_mon_metadata(mon
, f
.get(), ds
);
3535 // Dump all mons' metadata
3537 f
->open_array_section("mon_metadata");
3538 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3539 std::ostringstream get_err
;
3540 f
->open_object_section("mon");
3541 f
->dump_string("name", monmap
->get_name(rank
));
3542 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3544 if (r
== -ENOENT
|| r
== -EINVAL
) {
3545 dout(1) << get_err
.str() << dendl
;
3546 // Drop error, list what metadata we do have
3548 } else if (r
!= 0) {
3549 derr
<< "Unexpected error from get_mon_metadata: "
3550 << cpp_strerror(r
) << dendl
;
3551 ds
<< get_err
.str();
3561 } else if (prefix
== "mon versions") {
3563 f
.reset(Formatter::create("json-pretty"));
3564 count_metadata("ceph_version", f
.get());
3569 } else if (prefix
== "mon count-metadata") {
3571 f
.reset(Formatter::create("json-pretty"));
3573 cmd_getval(g_ceph_context
, cmdmap
, "property", field
);
3574 count_metadata(field
, f
.get());
3579 } else if (prefix
== "quorum_status") {
3580 // make sure our map is readable and up to date
3581 if (!is_leader() && !is_peon()) {
3582 dout(10) << " waiting for quorum" << dendl
;
3583 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3586 _quorum_status(f
.get(), ds
);
3590 } else if (prefix
== "mon_status") {
3591 get_mon_status(f
.get(), ds
);
3597 } else if (prefix
== "sync force" ||
3598 prefix
== "mon sync force") {
3599 string validate1
, validate2
;
3600 cmd_getval(g_ceph_context
, cmdmap
, "validate1", validate1
);
3601 cmd_getval(g_ceph_context
, cmdmap
, "validate2", validate2
);
3602 if (validate1
!= "--yes-i-really-mean-it" ||
3603 validate2
!= "--i-know-what-i-am-doing") {
3605 rs
= "are you SURE? this will mean the monitor store will be "
3606 "erased. pass '--yes-i-really-mean-it "
3607 "--i-know-what-i-am-doing' if you really do.";
3610 sync_force(f
.get(), ds
);
3613 } else if (prefix
== "heap") {
3614 if (!ceph_using_tcmalloc())
3615 rs
= "tcmalloc not enabled, can't use heap profiler commands\n";
3618 cmd_getval(g_ceph_context
, cmdmap
, "heapcmd", heapcmd
);
3619 // XXX 1-element vector, change at callee or make vector here?
3620 vector
<string
> heapcmd_vec
;
3621 get_str_vec(heapcmd
, heapcmd_vec
);
3622 ceph_heap_profiler_handle_command(heapcmd_vec
, ds
);
3627 } else if (prefix
== "quorum") {
3629 cmd_getval(g_ceph_context
, cmdmap
, "quorumcmd", quorumcmd
);
3630 if (quorumcmd
== "exit") {
3632 elector
.stop_participating();
3633 rs
= "stopped responding to quorum, initiated new election";
3635 } else if (quorumcmd
== "enter") {
3636 elector
.start_participating();
3638 rs
= "started responding to quorum, initiated new election";
3641 rs
= "needs a valid 'quorum' command";
3644 } else if (prefix
== "version") {
3646 f
->open_object_section("version");
3647 f
->dump_string("version", pretty_version_to_str());
3651 ds
<< pretty_version_to_str();
3656 } else if (prefix
== "versions") {
3658 f
.reset(Formatter::create("json-pretty"));
3659 map
<string
,int> overall
;
3660 f
->open_object_section("version");
3661 map
<string
,int> mon
, mgr
, osd
, mds
;
3663 count_metadata("ceph_version", &mon
);
3664 f
->open_object_section("mon");
3665 for (auto& p
: mon
) {
3666 f
->dump_int(p
.first
.c_str(), p
.second
);
3667 overall
[p
.first
] += p
.second
;
3671 mgrmon()->count_metadata("ceph_version", &mgr
);
3672 f
->open_object_section("mgr");
3673 for (auto& p
: mgr
) {
3674 f
->dump_int(p
.first
.c_str(), p
.second
);
3675 overall
[p
.first
] += p
.second
;
3679 osdmon()->count_metadata("ceph_version", &osd
);
3680 f
->open_object_section("osd");
3681 for (auto& p
: osd
) {
3682 f
->dump_int(p
.first
.c_str(), p
.second
);
3683 overall
[p
.first
] += p
.second
;
3687 mdsmon()->count_metadata("ceph_version", &mds
);
3688 f
->open_object_section("mds");
3689 for (auto& p
: mds
) {
3690 f
->dump_int(p
.first
.c_str(), p
.second
);
3691 overall
[p
.first
] += p
.second
;
3695 for (auto& p
: mgrstatmon()->get_service_map().services
) {
3696 f
->open_object_section(p
.first
.c_str());
3698 p
.second
.count_metadata("ceph_version", &m
);
3700 f
->dump_int(q
.first
.c_str(), q
.second
);
3701 overall
[q
.first
] += q
.second
;
3706 f
->open_object_section("overall");
3707 for (auto& p
: overall
) {
3708 f
->dump_int(p
.first
.c_str(), p
.second
);
3718 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3719 reply_command(op
, r
, rs
, rdata
, 0);
3722 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
3725 reply_command(op
, rc
, rs
, rdata
, version
);
3728 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
3729 bufferlist
& rdata
, version_t version
)
3731 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
3732 assert(m
->get_type() == MSG_MON_COMMAND
);
3733 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
3734 reply
->set_tid(m
->get_tid());
3735 reply
->set_data(rdata
);
3736 send_reply(op
, reply
);
3740 // ------------------------
3741 // request/reply routing
3743 // a client/mds/osd will connect to a random monitor. we need to forward any
3744 // messages requiring state updates to the leader, and then route any replies
3745 // back via the correct monitor and back to them. (the monitor will not
3746 // initiate any connections.)
3748 void Monitor::forward_request_leader(MonOpRequestRef op
)
3750 op
->mark_event(__func__
);
3752 int mon
= get_leader();
3753 MonSession
*session
= op
->get_session();
3754 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
3756 if (req
->get_source().is_mon() && req
->get_source_addr() != messenger
->get_myaddr()) {
3757 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
3758 } else if (session
->proxy_con
) {
3759 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
3760 } else if (!session
->closed
) {
3761 RoutedRequest
*rr
= new RoutedRequest
;
3762 rr
->tid
= ++routed_request_tid
;
3763 rr
->client_inst
= req
->get_source_inst();
3764 rr
->con
= req
->get_connection();
3765 rr
->con_features
= rr
->con
->get_features();
3766 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
3767 rr
->session
= static_cast<MonSession
*>(session
->get());
3769 routed_requests
[rr
->tid
] = rr
;
3770 session
->routed_request_tids
.insert(rr
->tid
);
3772 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
3773 << " features " << rr
->con_features
<< dendl
;
3775 MForward
*forward
= new MForward(rr
->tid
,
3779 forward
->set_priority(req
->get_priority());
3780 if (session
->auth_handler
) {
3781 forward
->entity_name
= session
->entity_name
;
3782 } else if (req
->get_source().is_mon()) {
3783 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
3785 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3786 op
->mark_forwarded();
3787 assert(op
->get_req()->get_type() != 0);
3789 dout(10) << "forward_request no session for request " << *req
<< dendl
;
3793 // fake connection attached to forwarded messages
3794 struct AnonConnection
: public Connection
{
3795 explicit AnonConnection(CephContext
*cct
) : Connection(cct
, NULL
) {}
3797 int send_message(Message
*m
) override
{
3798 assert(!"send_message on anonymous connection");
3800 void send_keepalive() override
{
3801 assert(!"send_keepalive on anonymous connection");
3803 void mark_down() override
{
3806 void mark_disposable() override
{
3809 bool is_connected() override
{ return false; }
3812 //extract the original message and put it into the regular dispatch function
3813 void Monitor::handle_forward(MonOpRequestRef op
)
3815 MForward
*m
= static_cast<MForward
*>(op
->get_req());
3816 dout(10) << "received forwarded message from " << m
->client
3817 << " via " << m
->get_source_inst() << dendl
;
3818 MonSession
*session
= op
->get_session();
3821 if (!session
->is_capable("mon", MON_CAP_X
)) {
3822 dout(0) << "forward from entity with insufficient caps! "
3823 << session
->caps
<< dendl
;
3825 // see PaxosService::dispatch(); we rely on this being anon
3826 // (c->msgr == NULL)
3827 PaxosServiceMessage
*req
= m
->claim_message();
3828 assert(req
!= NULL
);
3830 ConnectionRef
c(new AnonConnection(cct
));
3831 MonSession
*s
= new MonSession(req
->get_source_inst(),
3832 static_cast<Connection
*>(c
.get()));
3833 c
->set_priv(s
->get());
3834 c
->set_peer_addr(m
->client
.addr
);
3835 c
->set_peer_type(m
->client
.name
.type());
3836 c
->set_features(m
->con_features
);
3838 s
->caps
= m
->client_caps
;
3839 dout(10) << " caps are " << s
->caps
<< dendl
;
3840 s
->entity_name
= m
->entity_name
;
3841 dout(10) << " entity name '" << s
->entity_name
<< "' type "
3842 << s
->entity_name
.get_type() << dendl
;
3843 s
->proxy_con
= m
->get_connection();
3844 s
->proxy_tid
= m
->tid
;
3846 req
->set_connection(c
);
3848 // not super accurate, but better than nothing.
3849 req
->set_recv_stamp(m
->get_recv_stamp());
3852 * note which election epoch this is; we will drop the message if
3853 * there is a future election since our peers will resend routed
3854 * requests in that case.
3856 req
->rx_election_epoch
= get_epoch();
3858 /* Because this is a special fake connection, we need to break
3859 the ref loop between Connection and MonSession differently
3860 than we normally do. Here, the Message refers to the Connection
3861 which refers to the Session, and nobody else refers to the Connection
3862 or the Session. And due to the special nature of this message,
3863 nobody refers to the Connection via the Session. So, clear out that
3864 half of the ref loop.*/
3867 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
3874 void Monitor::try_send_message(Message
*m
, const entity_inst_t
& to
)
3876 dout(10) << "try_send_message " << *m
<< " to " << to
<< dendl
;
3879 encode_message(m
, quorum_con_features
, bl
);
3881 messenger
->send_message(m
, to
);
3883 for (int i
=0; i
<(int)monmap
->size(); i
++) {
3885 messenger
->send_message(new MRoute(bl
, to
), monmap
->get_inst(i
));
3889 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
3891 op
->mark_event(__func__
);
3893 MonSession
*session
= op
->get_session();
3895 Message
*req
= op
->get_req();
3896 ConnectionRef con
= op
->get_connection();
3898 reply
->set_cct(g_ceph_context
);
3899 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
3902 dout(2) << "send_reply no connection, dropping reply " << *reply
3903 << " to " << req
<< " " << *req
<< dendl
;
3905 op
->mark_event("reply: no connection");
3909 if (!session
->con
&& !session
->proxy_con
) {
3910 dout(2) << "send_reply no connection, dropping reply " << *reply
3911 << " to " << req
<< " " << *req
<< dendl
;
3913 op
->mark_event("reply: no connection");
3917 if (session
->proxy_con
) {
3918 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
3919 << " via " << session
->proxy_con
->get_peer_addr()
3920 << " for request " << *req
<< dendl
;
3921 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
3922 op
->mark_event("reply: send routed request");
3924 session
->con
->send_message(reply
);
3925 op
->mark_event("reply: send");
3929 void Monitor::no_reply(MonOpRequestRef op
)
3931 MonSession
*session
= op
->get_session();
3932 Message
*req
= op
->get_req();
3934 if (session
->proxy_con
) {
3935 dout(10) << "no_reply to " << req
->get_source_inst()
3936 << " via " << session
->proxy_con
->get_peer_addr()
3937 << " for request " << *req
<< dendl
;
3938 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
3939 op
->mark_event("no_reply: send routed request");
3941 dout(10) << "no_reply to " << req
->get_source_inst()
3942 << " " << *req
<< dendl
;
3943 op
->mark_event("no_reply");
3947 void Monitor::handle_route(MonOpRequestRef op
)
3949 MRoute
*m
= static_cast<MRoute
*>(op
->get_req());
3950 MonSession
*session
= op
->get_session();
3952 if (!session
->is_capable("mon", MON_CAP_X
)) {
3953 dout(0) << "MRoute received from entity without appropriate perms! "
3958 dout(10) << "handle_route " << *m
->msg
<< " to " << m
->dest
<< dendl
;
3960 dout(10) << "handle_route null to " << m
->dest
<< dendl
;
3963 if (m
->session_mon_tid
) {
3964 if (routed_requests
.count(m
->session_mon_tid
)) {
3965 RoutedRequest
*rr
= routed_requests
[m
->session_mon_tid
];
3967 // reset payload, in case encoding is dependent on target features
3969 m
->msg
->clear_payload();
3970 rr
->con
->send_message(m
->msg
);
3973 if (m
->send_osdmap_first
) {
3974 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
3975 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
3976 true, MonOpRequestRef());
3978 assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
3979 routed_requests
.erase(m
->session_mon_tid
);
3980 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
3983 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
3986 dout(10) << " not a routed request, trying to send anyway" << dendl
;
3988 messenger
->send_message(m
->msg
, m
->dest
);
3994 void Monitor::resend_routed_requests()
3996 dout(10) << "resend_routed_requests" << dendl
;
3997 int mon
= get_leader();
3998 list
<Context
*> retry
;
3999 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
4000 p
!= routed_requests
.end();
4002 RoutedRequest
*rr
= p
->second
;
4005 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
4006 rr
->op
->mark_event("retry routed request");
4007 retry
.push_back(new C_RetryMessage(this, rr
->op
));
4009 assert(rr
->session
->routed_request_tids
.count(p
->first
));
4010 rr
->session
->routed_request_tids
.erase(p
->first
);
4014 bufferlist::iterator q
= rr
->request_bl
.begin();
4015 PaxosServiceMessage
*req
= (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
4016 rr
->op
->mark_event("resend forwarded message to leader");
4017 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
<< dendl
;
4018 MForward
*forward
= new MForward(rr
->tid
, req
, rr
->con_features
,
4020 req
->put(); // forward takes its own ref; drop ours.
4021 forward
->client
= rr
->client_inst
;
4022 forward
->set_priority(req
->get_priority());
4023 messenger
->send_message(forward
, monmap
->get_inst(mon
));
4027 routed_requests
.clear();
4028 finish_contexts(g_ceph_context
, retry
);
4032 void Monitor::remove_session(MonSession
*s
)
4034 dout(10) << "remove_session " << s
<< " " << s
->inst
4035 << " features 0x" << std::hex
<< s
->con_features
<< std::dec
<< dendl
;
4038 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
4039 p
!= s
->routed_request_tids
.end();
4041 assert(routed_requests
.count(*p
));
4042 RoutedRequest
*rr
= routed_requests
[*p
];
4043 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
4045 routed_requests
.erase(*p
);
4047 s
->routed_request_tids
.clear();
4048 s
->con
->set_priv(NULL
);
4049 session_map
.remove_session(s
);
4050 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4051 logger
->inc(l_mon_session_rm
);
4054 void Monitor::remove_all_sessions()
4056 Mutex::Locker
l(session_map_lock
);
4057 while (!session_map
.sessions
.empty()) {
4058 MonSession
*s
= session_map
.sessions
.front();
4061 logger
->inc(l_mon_session_rm
);
4064 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4067 void Monitor::send_command(const entity_inst_t
& inst
,
4068 const vector
<string
>& com
)
4070 dout(10) << "send_command " << inst
<< "" << com
<< dendl
;
4071 MMonCommand
*c
= new MMonCommand(monmap
->fsid
);
4073 try_send_message(c
, inst
);
4076 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
4079 * Wait list the new session until we're in the quorum, assuming it's
4081 * tick() will periodically send them back through so we can send
4082 * the client elsewhere if we don't think we're getting back in.
4084 * But we whitelist a few sorts of messages:
4085 * 1) Monitors can talk to us at any time, of course.
4086 * 2) auth messages. It's unlikely to go through much faster, but
4087 * it's possible we've just lost our quorum status and we want to take...
4088 * 3) command messages. We want to accept these under all possible
4091 Message
*m
= op
->get_req();
4092 MonSession
*s
= op
->get_session();
4093 ConnectionRef con
= op
->get_connection();
4094 utime_t too_old
= ceph_clock_now();
4095 too_old
-= g_ceph_context
->_conf
->mon_lease
;
4096 if (m
->get_recv_stamp() > too_old
&&
4097 con
->is_connected()) {
4098 dout(5) << "waitlisting message " << *m
<< dendl
;
4099 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
4100 op
->mark_wait_for_quorum();
4102 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
4104 // proxied sessions aren't registered and don't have a con; don't remove
4106 if (!s
->proxy_con
) {
4107 Mutex::Locker
l(session_map_lock
);
4114 void Monitor::_ms_dispatch(Message
*m
)
4116 if (is_shutdown()) {
4121 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
4122 bool src_is_mon
= op
->is_src_mon();
4123 op
->mark_event("mon:_ms_dispatch");
4124 MonSession
*s
= op
->get_session();
4125 if (s
&& s
->closed
) {
4129 if (src_is_mon
&& s
) {
4130 ConnectionRef con
= m
->get_connection();
4131 if (con
->get_messenger() && con
->get_features() != s
->con_features
) {
4132 // only update features if this is a non-anonymous connection
4133 dout(10) << __func__
<< " feature change for " << m
->get_source_inst()
4134 << " (was " << s
->con_features
4135 << ", now " << con
->get_features() << ")" << dendl
;
4136 // connection features changed - recreate session.
4137 if (s
->con
&& s
->con
!= con
) {
4138 dout(10) << __func__
<< " connection for " << m
->get_source_inst()
4139 << " changed from session; mark down and replace" << dendl
;
4140 s
->con
->mark_down();
4142 if (s
->item
.is_on_list()) {
4143 // forwarded messages' sessions are not in the sessions map and
4144 // exist only while the op is being handled.
4153 // if the sender is not a monitor, make sure their first message for a
4154 // session is an MAuth. If it is not, assume it's a stray message,
4155 // and considering that we are creating a new session it is safe to
4156 // assume that the sender hasn't authenticated yet, so we have no way
4157 // of assessing whether we should handle it or not.
4158 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
4159 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
4160 m
->get_type() != CEPH_MSG_PING
)) {
4161 dout(1) << __func__
<< " dropping stray message " << *m
4162 << " from " << m
->get_source_inst() << dendl
;
4166 ConnectionRef con
= m
->get_connection();
4168 Mutex::Locker
l(session_map_lock
);
4169 s
= session_map
.new_session(m
->get_source_inst(), con
.get());
4172 con
->set_priv(s
->get());
4173 dout(10) << __func__
<< " new session " << s
<< " " << *s
4174 << " features 0x" << std::hex
4175 << s
->con_features
<< std::dec
<< dendl
;
4178 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4179 logger
->inc(l_mon_session_add
);
4182 // give it monitor caps; the peer type has been authenticated
4183 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
4184 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
4185 s
->caps
= *mon_caps
;
4189 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->inst
4195 s
->session_timeout
= ceph_clock_now();
4196 s
->session_timeout
+= g_conf
->mon_session_timeout
;
4198 if (s
->auth_handler
) {
4199 s
->entity_name
= s
->auth_handler
->get_entity_name();
4201 dout(20) << " caps " << s
->caps
.get_str() << dendl
;
4203 if ((is_synchronizing() ||
4204 (s
->global_id
== 0 && !exited_quorum
.is_zero())) &&
4206 m
->get_type() != CEPH_MSG_PING
) {
4207 waitlist_or_zap_client(op
);
4214 void Monitor::dispatch_op(MonOpRequestRef op
)
4216 op
->mark_event("mon:dispatch_op");
4217 MonSession
*s
= op
->get_session();
4220 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
4224 /* we will consider the default type as being 'monitor' until proven wrong */
4225 op
->set_type_monitor();
4226 /* deal with all messages that do not necessarily need caps */
4227 bool dealt_with
= true;
4228 switch (op
->get_req()->get_type()) {
4230 case MSG_MON_GLOBAL_ID
:
4232 op
->set_type_service();
4233 /* no need to check caps here */
4234 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
4241 /* MMonGetMap may be used by clients to obtain a monmap *before*
4242 * authenticating with the monitor. We need to handle these without
4243 * checking caps because, even on a cluster without cephx, we only set
4244 * session caps *after* the auth handshake. A good example of this
4245 * is when a client calls MonClient::get_monmap_privately(), which does
4246 * not authenticate when obtaining a monmap.
4248 case CEPH_MSG_MON_GET_MAP
:
4249 handle_mon_get_map(op
);
4252 case CEPH_MSG_MON_METADATA
:
4253 return handle_mon_metadata(op
);
4262 /* well, maybe the op belongs to a service... */
4263 op
->set_type_service();
4264 /* deal with all messages which caps should be checked somewhere else */
4266 switch (op
->get_req()->get_type()) {
4269 case CEPH_MSG_MON_GET_OSDMAP
:
4270 case CEPH_MSG_POOLOP
:
4271 case MSG_OSD_BEACON
:
4272 case MSG_OSD_MARK_ME_DOWN
:
4274 case MSG_OSD_FAILURE
:
4277 case MSG_OSD_PGTEMP
:
4278 case MSG_OSD_PG_CREATED
:
4279 case MSG_REMOVE_SNAPS
:
4280 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
4284 case MSG_MDS_BEACON
:
4285 case MSG_MDS_OFFLOAD_TARGETS
:
4286 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
4290 case MSG_MGR_BEACON
:
4291 paxos_service
[PAXOS_MGR
]->dispatch(op
);
4295 case CEPH_MSG_STATFS
:
4296 // this is an ugly hack, sorry! force the version to 1 so that we do
4297 // not run afoul of the is_readable() paxos check. the client is going
4298 // by the pgmonitor version and the MgrStatMonitor version will lag behind
4299 // that until we complete the upgrade. The paxos ordering crap really
4300 // doesn't matter for statfs results, so just kludge around it here.
4301 if (osdmon()->osdmap
.require_osd_release
< CEPH_RELEASE_LUMINOUS
) {
4302 ((MStatfs
*)op
->get_req())->version
= 1;
4304 case MSG_MON_MGR_REPORT
:
4305 case MSG_GETPOOLSTATS
:
4306 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
4311 paxos_service
[PAXOS_PGMAP
]->dispatch(op
);
4316 paxos_service
[PAXOS_LOG
]->dispatch(op
);
4319 // handle_command() does its own caps checking
4320 case MSG_MON_COMMAND
:
4321 op
->set_type_command();
4332 /* nop, looks like it's not a service message; revert back to monitor */
4333 op
->set_type_monitor();
4335 /* messages we, the Monitor class, need to deal with
4336 * but may be sent by clients. */
4338 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
4339 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4340 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
4346 switch (op
->get_req()->get_type()) {
4349 case CEPH_MSG_MON_GET_VERSION
:
4350 handle_get_version(op
);
4353 case CEPH_MSG_MON_SUBSCRIBE
:
4354 /* FIXME: check what's being subscribed, filter accordingly */
4355 handle_subscribe(op
);
4365 if (!op
->is_src_mon()) {
4366 dout(1) << __func__
<< " unexpected monitor message from"
4367 << " non-monitor entity " << op
->get_req()->get_source_inst()
4368 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
4372 /* messages that should only be sent by another monitor */
4374 switch (op
->get_req()->get_type()) {
4384 // Sync (i.e., the new slurp, but on steroids)
4392 /* log acks are sent from a monitor we sent the MLog to, and are
4393 never sent by clients to us. */
4395 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
4400 op
->set_type_service();
4401 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
4407 op
->set_type_paxos();
4408 MMonPaxos
*pm
= static_cast<MMonPaxos
*>(op
->get_req());
4409 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4414 if (state
== STATE_SYNCHRONIZING
) {
4415 // we are synchronizing. These messages would do us no
4416 // good, thus just drop them and ignore them.
4417 dout(10) << __func__
<< " ignore paxos msg from "
4418 << pm
->get_source_inst() << dendl
;
4423 if (pm
->epoch
> get_epoch()) {
4427 if (pm
->epoch
!= get_epoch()) {
4431 paxos
->dispatch(op
);
4436 case MSG_MON_ELECTION
:
4437 op
->set_type_election();
4438 //check privileges here for simplicity
4439 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4440 dout(0) << "MMonElection received from entity without enough caps!"
4441 << op
->get_session()->caps
<< dendl
;
4444 if (!is_probing() && !is_synchronizing()) {
4445 elector
.dispatch(op
);
4454 handle_timecheck(op
);
4457 case MSG_MON_HEALTH
:
4458 health_monitor
->dispatch(op
);
4461 case MSG_MON_HEALTH_CHECKS
:
4462 op
->set_type_service();
4463 paxos_service
[PAXOS_HEALTH
]->dispatch(op
);
4471 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
4480 void Monitor::handle_ping(MonOpRequestRef op
)
4482 MPing
*m
= static_cast<MPing
*>(op
->get_req());
4483 dout(10) << __func__
<< " " << *m
<< dendl
;
4484 MPing
*reply
= new MPing
;
4485 entity_inst_t inst
= m
->get_source_inst();
4487 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4488 f
->open_object_section("pong");
4490 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
4491 get_health_status(false, f
.get(), nullptr);
4493 list
<string
> health_str
;
4494 get_health(health_str
, nullptr, f
.get());
4499 get_mon_status(f
.get(), ss
);
4505 ::encode(ss
.str(), payload
);
4506 reply
->set_payload(payload
);
4507 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
4508 messenger
->send_message(reply
, inst
);
4511 void Monitor::timecheck_start()
4513 dout(10) << __func__
<< dendl
;
4514 timecheck_cleanup();
4515 timecheck_start_round();
4518 void Monitor::timecheck_finish()
4520 dout(10) << __func__
<< dendl
;
4521 timecheck_cleanup();
4524 void Monitor::timecheck_start_round()
4526 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4527 assert(is_leader());
4529 if (monmap
->size() == 1) {
4530 assert(0 == "We are alone; this shouldn't have been scheduled!");
4534 if (timecheck_round
% 2) {
4535 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4536 utime_t curr_time
= ceph_clock_now();
4537 double max
= g_conf
->mon_timecheck_interval
*3;
4538 if (curr_time
- timecheck_round_start
< max
) {
4539 dout(10) << __func__
<< " keep current round going" << dendl
;
4542 dout(10) << __func__
4543 << " finish current timecheck and start new" << dendl
;
4544 timecheck_cancel_round();
4548 assert(timecheck_round
% 2 == 0);
4551 timecheck_round_start
= ceph_clock_now();
4552 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4556 dout(10) << __func__
<< " setting up next event" << dendl
;
4557 timecheck_reset_event();
4560 void Monitor::timecheck_finish_round(bool success
)
4562 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4563 assert(timecheck_round
% 2);
4565 timecheck_round_start
= utime_t();
4568 assert(timecheck_waiting
.empty());
4569 assert(timecheck_acks
== quorum
.size());
4571 timecheck_check_skews();
4575 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4576 << " peers still waiting:";
4577 for (map
<entity_inst_t
,utime_t
>::iterator p
= timecheck_waiting
.begin();
4578 p
!= timecheck_waiting
.end(); ++p
) {
4579 *_dout
<< " " << p
->first
.name
;
4582 timecheck_waiting
.clear();
4584 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
4587 void Monitor::timecheck_cancel_round()
4589 timecheck_finish_round(false);
4592 void Monitor::timecheck_cleanup()
4594 timecheck_round
= 0;
4596 timecheck_round_start
= utime_t();
4598 if (timecheck_event
) {
4599 timer
.cancel_event(timecheck_event
);
4600 timecheck_event
= NULL
;
4602 timecheck_waiting
.clear();
4603 timecheck_skews
.clear();
4604 timecheck_latencies
.clear();
4606 timecheck_rounds_since_clean
= 0;
4609 void Monitor::timecheck_reset_event()
4611 if (timecheck_event
) {
4612 timer
.cancel_event(timecheck_event
);
4613 timecheck_event
= NULL
;
4617 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
4619 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4620 delay
= cct
->_conf
->mon_timecheck_interval
;
4623 dout(10) << __func__
<< " delay " << delay
4624 << " rounds_since_clean " << timecheck_rounds_since_clean
4627 timecheck_event
= timer
.add_event_after(
4629 new C_MonContext(this, [this](int) {
4630 timecheck_start_round();
4634 void Monitor::timecheck_check_skews()
4636 dout(10) << __func__
<< dendl
;
4637 assert(is_leader());
4638 assert((timecheck_round
% 2) == 0);
4639 if (monmap
->size() == 1) {
4640 assert(0 == "We are alone; we shouldn't have gotten here!");
4643 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4645 bool found_skew
= false;
4646 for (map
<entity_inst_t
, double>::iterator p
= timecheck_skews
.begin();
4647 p
!= timecheck_skews
.end(); ++p
) {
4650 if (timecheck_has_skew(p
->second
, &abs_skew
)) {
4651 dout(10) << __func__
4652 << " " << p
->first
<< " skew " << abs_skew
<< dendl
;
4658 ++timecheck_rounds_since_clean
;
4659 timecheck_reset_event();
4660 } else if (timecheck_rounds_since_clean
> 0) {
4662 << " no clock skews found after " << timecheck_rounds_since_clean
4663 << " rounds" << dendl
;
4664 // make sure the skews are really gone and not just a transient success
4665 // this will run just once if not in the presence of skews again.
4666 timecheck_rounds_since_clean
= 1;
4667 timecheck_reset_event();
4668 timecheck_rounds_since_clean
= 0;
4673 void Monitor::timecheck_report()
4675 dout(10) << __func__
<< dendl
;
4676 assert(is_leader());
4677 assert((timecheck_round
% 2) == 0);
4678 if (monmap
->size() == 1) {
4679 assert(0 == "We are alone; we shouldn't have gotten here!");
4683 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4684 bool do_output
= true; // only output report once
4685 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
4686 if (monmap
->get_name(*q
) == name
)
4689 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_REPORT
);
4690 m
->epoch
= get_epoch();
4691 m
->round
= timecheck_round
;
4693 for (map
<entity_inst_t
, double>::iterator it
= timecheck_skews
.begin();
4694 it
!= timecheck_skews
.end(); ++it
) {
4695 double skew
= it
->second
;
4696 double latency
= timecheck_latencies
[it
->first
];
4698 m
->skews
[it
->first
] = skew
;
4699 m
->latencies
[it
->first
] = latency
;
4702 dout(25) << __func__
<< " " << it
->first
4703 << " latency " << latency
4704 << " skew " << skew
<< dendl
;
4708 entity_inst_t inst
= monmap
->get_inst(*q
);
4709 dout(10) << __func__
<< " send report to " << inst
<< dendl
;
4710 messenger
->send_message(m
, inst
);
4714 void Monitor::timecheck()
4716 dout(10) << __func__
<< dendl
;
4717 assert(is_leader());
4718 if (monmap
->size() == 1) {
4719 assert(0 == "We are alone; we shouldn't have gotten here!");
4722 assert(timecheck_round
% 2 != 0);
4724 timecheck_acks
= 1; // we ack ourselves
4726 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
4727 << " round " << timecheck_round
<< dendl
;
4729 // we are at the eye of the storm; the point of reference
4730 timecheck_skews
[messenger
->get_myinst()] = 0.0;
4731 timecheck_latencies
[messenger
->get_myinst()] = 0.0;
4733 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
4734 if (monmap
->get_name(*it
) == name
)
4737 entity_inst_t inst
= monmap
->get_inst(*it
);
4738 utime_t curr_time
= ceph_clock_now();
4739 timecheck_waiting
[inst
] = curr_time
;
4740 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_PING
);
4741 m
->epoch
= get_epoch();
4742 m
->round
= timecheck_round
;
4743 dout(10) << __func__
<< " send " << *m
<< " to " << inst
<< dendl
;
4744 messenger
->send_message(m
, inst
);
// Monitor::timecheck_status -- classify a measured skew as a health state.
// @param ss          stream that receives a human readable explanation
//                    when a skew is reported
// @param skew_bound  bounded skew to evaluate
// @param latency     measured latency; asserted to be non-negative
// Visible behaviour: status starts as HEALTH_OK and becomes HEALTH_WARN
// when timecheck_has_skew(skew_bound, &abs_skew) is true, in which case
// the absolute skew and mon_clock_drift_allowed are written to ss.
// NOTE(review): the declaration of abs_skew and the return statement are
// not present in this listing.
4748 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
4749 const double skew_bound
,
4750 const double latency
)
4752 health_status_t status
= HEALTH_OK
;
4753 assert(latency
>= 0);
4756 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
4757 status
= HEALTH_WARN
;
4758 ss
<< "clock skew " << abs_skew
<< "s"
4759 << " > max " << g_conf
->mon_clock_drift_allowed
<< "s";
// Monitor::handle_timecheck_leader -- leader-side handling of an OP_PONG
// reply to a timecheck ping.
// Visible flow: compare the message epoch and round with ours, look up
// when the ping to this peer was sent (timecheck_waiting), derive the
// latency from send time to now, blend it into timecheck_latencies,
// compute a bounded skew from the peer timestamp, report a non-OK status
// through clog, store the skew, and finish the round once timecheck_acks
// equals the quorum size.
// NOTE(review): several original lines are missing from this listing
// (whatever follows the discard logs, the declaration of ss, the update
// of timecheck_acks, the condition guarding the skew negation, braces).
// Confirm against the full file before relying on exact control flow.
4765 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
4767 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4768 dout(10) << __func__
<< " " << *m
<< dendl
;
4769 /* handles PONG's */
4770 assert(m
->op
== MTimeCheck::OP_PONG
);
4772 entity_inst_t other
= m
->get_source_inst();
4773 if (m
->epoch
< get_epoch()) {
4774 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
4775 << " from " << other
4776 << " curr " << get_epoch()
4777 << " -- severely lagged? discard" << dendl
;
4780 assert(m
->epoch
== get_epoch());
4782 if (m
->round
< timecheck_round
) {
4783 dout(1) << __func__
<< " got old round " << m
->round
4784 << " from " << other
4785 << " curr " << timecheck_round
<< " -- discard" << dendl
;
4789 utime_t curr_time
= ceph_clock_now();
// A pong is only expected from a peer we pinged in this round; the
// pending entry is consumed here.
4791 assert(timecheck_waiting
.count(other
) > 0);
4792 utime_t timecheck_sent
= timecheck_waiting
[other
];
4793 timecheck_waiting
.erase(other
);
4794 if (curr_time
< timecheck_sent
) {
4795 // our clock was readjusted -- drop everything until it all makes sense.
4796 dout(1) << __func__
<< " our clock was readjusted --"
4797 << " bump round and drop current check"
4799 timecheck_cancel_round();
4803 /* update peer latencies */
4804 double latency
= (double)(curr_time
- timecheck_sent
);
// First sample is stored as is; the visible averaging weights the stored
// value at 0.8 and the new sample at 0.2.
4806 if (timecheck_latencies
.count(other
) == 0)
4807 timecheck_latencies
[other
] = latency
;
4809 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
4810 timecheck_latencies
[other
] = avg_latency
;
4816 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4817 * and 'a' happens to be lower than 'b'; so we use double instead.
4819 * latency is always expected to be >= 0.
4821 * delta, the difference between theirs timestamp and ours, may either be
4822 * lower or higher than 0; will hardly ever be 0.
4824 * The absolute skew is the absolute delta minus the latency, which is
4825 * taken as a whole instead of an rtt given that there is some queueing
4826 * and dispatch times involved and it's hard to assess how long exactly
4827 * it took for the message to travel to the other side and be handled. So
4828 * we call it a bounded skew, the worst case scenario.
4832 * Given that the latency is always positive, we can establish that the
4833 * bounded skew will be:
4835 * 1. positive if the absolute delta is higher than the latency and
4837 * 2. negative if the absolute delta is higher than the latency and
4838 * delta is negative.
4839 * 3. zero if the absolute delta is lower than the latency.
4841 * On 3. we make a judgement call and treat the skew as non-existent.
4842 * This is because that, if the absolute delta is lower than the
4843 * latency, then the apparently existing skew is nothing more than a
4844 * side-effect of the high latency at work.
4846 * This may not be entirely true though, as a severely skewed clock
4847 * may be masked by an even higher latency, but with high latencies
4848 * we probably have worse issues to deal with than just skewed clocks.
4850 assert(latency
>= 0);
4852 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
4853 double abs_delta
= (delta
> 0 ? delta
: -delta
);
4854 double skew_bound
= abs_delta
- latency
;
4858 skew_bound
= -skew_bound
;
4861 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
4862 if (status
!= HEALTH_OK
) {
4863 clog
->health(status
) << other
<< " " << ss
.str();
4866 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
4867 << " delta " << delta
<< " skew_bound " << skew_bound
4868 << " latency " << latency
<< dendl
;
4870 timecheck_skews
[other
] = skew_bound
;
// Round completion: every quorum member has answered, so the skew map
// must hold one entry per ack and nothing may still be pending.
4873 if (timecheck_acks
== quorum
.size()) {
4874 dout(10) << __func__
<< " got pongs from everybody ("
4875 << timecheck_acks
<< " total)" << dendl
;
4876 assert(timecheck_skews
.size() == timecheck_acks
);
4877 assert(timecheck_waiting
.empty());
4878 // everyone has acked, so bump the round to finish it.
4879 timecheck_finish_round();
// Monitor::handle_timecheck_peon -- peon-side handling of OP_PING and
// OP_REPORT timecheck messages.
// Visible flow: assert the op is PING or REPORT, log and discard on an
// epoch mismatch or an older round, adopt the round from the message,
// then either swap in the reported latencies and skews (REPORT, round
// must be even) or answer with an OP_PONG carrying our current clock
// (round must be odd).
// NOTE(review): the statements that leave the function after each
// discard log and after the REPORT branch are not present in this
// listing -- confirm against the full file.
4883 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
4885 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4886 dout(10) << __func__
<< " " << *m
<< dendl
;
4889 assert(m
->op
== MTimeCheck::OP_PING
|| m
->op
== MTimeCheck::OP_REPORT
);
4891 if (m
->epoch
!= get_epoch()) {
4892 dout(1) << __func__
<< " got wrong epoch "
4893 << "(ours " << get_epoch()
4894 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
4898 if (m
->round
< timecheck_round
) {
4899 dout(1) << __func__
<< " got old round " << m
->round
4900 << " current " << timecheck_round
4901 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
4905 timecheck_round
= m
->round
;
4907 if (m
->op
== MTimeCheck::OP_REPORT
) {
4908 assert((timecheck_round
% 2) == 0);
4909 timecheck_latencies
.swap(m
->latencies
);
4910 timecheck_skews
.swap(m
->skews
);
// PING path: reply with our own clock reading, echoing epoch and round.
4914 assert((timecheck_round
% 2) != 0);
4915 MTimeCheck
*reply
= new MTimeCheck(MTimeCheck::OP_PONG
);
4916 utime_t curr_time
= ceph_clock_now();
4917 reply
->timestamp
= curr_time
;
4918 reply
->epoch
= m
->epoch
;
4919 reply
->round
= m
->round
;
// NOTE(review): the log below prints *m (the received ping) although the
// message being sent is reply -- looks unintended, confirm.
4920 dout(10) << __func__
<< " send " << *m
4921 << " to " << m
->get_source_inst() << dendl
;
4922 m
->get_connection()->send_message(reply
);
// Monitor::handle_timecheck -- entry point for MTimeCheck messages.
// Visible dispatch: a branch that only accepts OP_PONG and forwards to
// handle_timecheck_leader, an is_peon() branch that only accepts OP_PING
// or OP_REPORT and forwards to handle_timecheck_peon, and a final branch
// that logs and drops the message.
// NOTE(review): the opening condition of the first branch (presumably a
// leader check) and the statements following each drop log are not
// present in this listing.
4925 void Monitor::handle_timecheck(MonOpRequestRef op
)
4927 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4928 dout(10) << __func__
<< " " << *m
<< dendl
;
4931 if (m
->op
!= MTimeCheck::OP_PONG
) {
4932 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
4934 handle_timecheck_leader(op
);
4936 } else if (is_peon()) {
4937 if (m
->op
!= MTimeCheck::OP_PING
&& m
->op
!= MTimeCheck::OP_REPORT
) {
4938 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
4940 handle_timecheck_peon(op
);
4943 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
// Monitor::handle_subscribe -- process an MMonSubscribe request.
// Visible flow, per entry of m->what:
//   - test whether the subscription lacks CEPH_SUBSCRIBE_ONETIME,
//   - when the name maps to a log channel id, remove other log channel
//     subscriptions from the session under session_map_lock,
//   - register or update the subscription via session_map.add_update_sub,
//   - dispatch to the matching service check_sub routine, with a
//     capability check for the mds and osd related names.
// Afterwards an MMonSubscribeAck is sent on connections that lack
// CEPH_FEATURE_MON_STATEFUL_SUB.
// NOTE(review): a number of original lines are missing from this listing
// (loop end condition, the statement run for non-onetime subscriptions,
// else branches, the osd_epoch reset, braces). Confirm details against
// the full file.
4947 void Monitor::handle_subscribe(MonOpRequestRef op
)
4949 MMonSubscribe
*m
= static_cast<MMonSubscribe
*>(op
->get_req());
4950 dout(10) << "handle_subscribe " << *m
<< dendl
;
4954 MonSession
*s
= op
->get_session();
4957 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
4960 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4961 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
4964 // remove conflicting subscribes
4965 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4966 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
4967 it
!= s
->sub_map
.end(); ) {
4968 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
4969 Mutex::Locker
l(session_map_lock
);
4970 session_map
.remove_sub((it
++)->second
);
4978 Mutex::Locker
l(session_map_lock
);
4979 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
4980 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
4981 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
// Dispatch on the subscription name; the compare calls below match on a
// name prefix rather than the whole name.
4984 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
4985 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
4986 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
4987 Subscription
*sub
= s
->sub_map
[p
->first
];
4988 assert(sub
!= nullptr);
4989 mdsmon()->check_sub(sub
);
4991 } else if (p
->first
== "osdmap") {
4992 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
4993 if (s
->osd_epoch
> p
->second
.start
) {
4994 // client needs earlier osdmaps on purpose, so reset the sent epoch
4997 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
4999 } else if (p
->first
== "osd_pg_creates") {
5000 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
5001 if (monmap
->get_required_features().contains_all(
5002 ceph::features::mon::FEATURE_LUMINOUS
)) {
5003 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
5005 pgmon()->check_sub(s
->sub_map
["osd_pg_creates"]);
5008 } else if (p
->first
== "monmap") {
5009 monmon()->check_sub(s
->sub_map
[p
->first
]);
5010 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
5011 logmon()->check_sub(s
->sub_map
[p
->first
]);
5012 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
5013 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
5014 } else if (p
->first
== "servicemap") {
5015 mgrstatmon()->check_sub(s
->sub_map
[p
->first
]);
5020 // we only need to reply if the client is old enough to think it
5021 // has to send renewals.
5022 ConnectionRef con
= m
->get_connection();
5023 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
5024 m
->get_connection()->send_message(new MMonSubscribeAck(
5025 monmap
->get_fsid(), (int)g_conf
->mon_subscribe_interval
));
// Monitor::handle_get_version -- answer an MMonGetVersion query with the
// first and last committed versions of one paxos service.
// Visible flow: when we are neither leader nor peon the request is queued
// on waitfor_quorum for a retry; m->what is matched against mdsmap,
// fsmap, osdmap and monmap, anything else being logged as an invalid map
// type; when the chosen service is not readable the request is re-queued
// through wait_for_readable; otherwise an MMonGetVersionReply is built
// (handle, version, oldest_version, tid) and sent on the request
// connection.
// NOTE(review): the assignments to svc inside the name chain, the session
// check and the early exits are not present in this listing.
5030 void Monitor::handle_get_version(MonOpRequestRef op
)
5032 MMonGetVersion
*m
= static_cast<MMonGetVersion
*>(op
->get_req());
5033 dout(10) << "handle_get_version " << *m
<< dendl
;
5034 PaxosService
*svc
= NULL
;
5036 MonSession
*s
= op
->get_session();
5039 if (!is_leader() && !is_peon()) {
5040 dout(10) << " waiting for quorum" << dendl
;
5041 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
5045 if (m
->what
== "mdsmap") {
5047 } else if (m
->what
== "fsmap") {
5049 } else if (m
->what
== "osdmap") {
5051 } else if (m
->what
== "monmap") {
5054 derr
<< "invalid map type " << m
->what
<< dendl
;
5058 if (!svc
->is_readable()) {
5059 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
5063 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
5064 reply
->handle
= m
->handle
;
5065 reply
->version
= svc
->get_last_committed();
5066 reply
->oldest_version
= svc
->get_first_committed();
5067 reply
->set_tid(m
->get_tid());
5069 m
->get_connection()->send_message(reply
);
// Monitor::ms_handle_reset -- messenger callback for a reset connection.
// Visible flow: log the connection and peer address, special-case peers
// of type CEPH_ENTITY_TYPE_MON, fetch the MonSession stored as the
// connection private data, clear the private pointer on the session
// connection to break the reference cycle, then take the monitor lock
// and session_map_lock around the session teardown.
// NOTE(review): the return statements, the null-session check and the
// actual session removal calls are not present in this listing.
5075 bool Monitor::ms_handle_reset(Connection
*con
)
5077 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
5079 // ignore lossless monitor sessions
5080 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
5083 MonSession
*s
= static_cast<MonSession
*>(con
->get_priv());
5087 // break any con <-> session ref cycle
5088 s
->con
->set_priv(NULL
);
5093 Mutex::Locker
l(lock
);
5095 dout(10) << "reset/close on session " << s
->inst
<< dendl
;
5097 Mutex::Locker
l(session_map_lock
);
// Monitor::ms_handle_refused -- messenger callback for a refused
// connection. The only visible action is a debug log of the connection
// and its peer address.
// NOTE(review): the return statement is not present in this listing.
5104 bool Monitor::ms_handle_refused(Connection
*con
)
5106 // just log for now...
5107 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
// Monitor::send_latest_monmap -- encode the current monmap using the
// feature bits of the given connection and send it as an MMonMap.
// @param con  connection that receives the map
// NOTE(review): the declaration of bl is not present in this listing.
5113 void Monitor::send_latest_monmap(Connection
*con
)
5116 monmap
->encode(bl
, con
->get_features());
5117 con
->send_message(new MMonMap(bl
));
5120 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
5122 MMonGetMap
*m
= static_cast<MMonGetMap
*>(op
->get_req());
5123 dout(10) << "handle_mon_get_map" << dendl
;
5124 send_latest_monmap(m
->get_connection().get());
// Monitor::handle_mon_metadata -- handle an MMonMetadata message.
// Visible behaviour: the metadata carried by the message is moved into
// update_mon_metadata, keyed by the numeric id of the sender.
// NOTE(review): one original line between the cast and the log is missing
// from this listing and may guard the update (for example a role check);
// confirm against the full file.
5127 void Monitor::handle_mon_metadata(MonOpRequestRef op
)
5129 MMonMetadata
*m
= static_cast<MMonMetadata
*>(op
->get_req());
5131 dout(10) << __func__
<< dendl
;
5132 update_mon_metadata(m
->get_source().num(), std::move(m
->data
));
// Monitor::update_mon_metadata -- record metadata for one monitor and
// propose the updated set through paxos.
// @param from  numeric id of the monitor the metadata belongs to
// @param m     metadata, moved into pending_metadata[from]
// Visible flow: store into pending_metadata, encode the whole pending map
// into bl, put it under MONITOR_STORE_PREFIX / last_metadata in the
// pending paxos transaction and trigger a proposal.
// NOTE(review): the declaration of bl is not present in this listing.
5136 void Monitor::update_mon_metadata(int from
, Metadata
&& m
)
5138 // NOTE: this is now for legacy (kraken or jewel) mons only.
5139 pending_metadata
[from
] = std::move(m
);
5141 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5143 ::encode(pending_metadata
, bl
);
5144 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5145 paxos
->trigger_propose();
// Monitor::load_metadata -- read the stored monitor metadata.
// Visible flow: fetch the last_metadata key under MONITOR_STORE_PREFIX
// into bl, decode it into mon_metadata and copy the result into
// pending_metadata.
// NOTE(review): the declaration of bl, the handling of the store return
// code r and the return statement are not present in this listing.
5148 int Monitor::load_metadata()
5151 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5154 bufferlist::iterator it
= bl
.begin();
5155 ::decode(mon_metadata
, it
);
5157 pending_metadata
= mon_metadata
;
// Monitor::get_mon_metadata -- dump the metadata of one monitor.
// @param mon  numeric monitor id used as key into mon_metadata
// @param f    formatter that receives one dump_string per metadata entry
// @param err  stream that receives a not-found message when mon has no
//             entry in mon_metadata
// NOTE(review): the return statements (including the error code used for
// the not-found case) are not present in this listing.
5161 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
5164 if (!mon_metadata
.count(mon
)) {
5165 err
<< "mon." << mon
<< " not found";
5168 const Metadata
& m
= mon_metadata
[mon
];
5169 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
5170 f
->dump_string(p
->first
.c_str(), p
->second
);
5175 void Monitor::count_metadata(const string
& field
, map
<string
,int> *out
)
5177 for (auto& p
: mon_metadata
) {
5178 auto q
= p
.second
.find(field
);
5179 if (q
== p
.second
.end()) {
5180 (*out
)["unknown"]++;
5182 (*out
)[q
->second
]++;
// Monitor::count_metadata (Formatter overload) -- compute the per-value
// counts for a metadata field via the map overload and emit them as an
// object section named after the field, one dump_int per distinct value.
// @param field  metadata key to count
// @param f      formatter receiving the section
// NOTE(review): the statement that closes the section is not present in
// this listing.
5187 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
5189 map
<string
,int> by_val
;
5190 count_metadata(field
, &by_val
);
5191 f
->open_object_section(field
.c_str());
5192 for (auto& p
: by_val
) {
5193 f
->dump_int(p
.first
.c_str(), p
.second
);
// Monitor::print_nodes -- group monitor ids by the hostname entry of
// their metadata and hand the grouping to dump_services under the
// service name mon.
// @param f    formatter passed through to dump_services
// @param err  error stream; not used by any line visible here
// NOTE(review): the handling of monitors without a hostname entry and the
// return statement are not present in this listing.
5198 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
5200 map
<string
, list
<int> > mons
; // hostname => mon
5201 for (map
<int, Metadata
>::iterator it
= mon_metadata
.begin();
5202 it
!= mon_metadata
.end(); ++it
) {
5203 const Metadata
& m
= it
->second
;
5204 Metadata::const_iterator hostname
= m
.find("hostname");
5205 if (hostname
== m
.end()) {
5206 // not likely though
5209 mons
[hostname
->second
].push_back(it
->first
);
5212 dump_services(f
, mons
, "mon");
5216 // ----------------------------------------------
// Monitor::scrub_start -- begin a new scrub on the leader.
// Visible flow: assert leadership, log to the cluster log when results of
// a previous scrub are still pending, cancel the periodic scrub event,
// clear old results and allocate a fresh ScrubState.
// NOTE(review): the return statements and the call that actually kicks
// off the first scrub chunk are not present in this listing.
5219 int Monitor::scrub_start()
5221 dout(10) << __func__
<< dendl
;
5222 assert(is_leader());
5224 if (!scrub_result
.empty()) {
5225 clog
->info() << "scrub already in progress";
5229 scrub_event_cancel();
5230 scrub_result
.clear();
5231 scrub_state
.reset(new ScrubState
);
// Monitor::scrub -- run one scrub chunk from the leader.
// Visible flow: require leadership and an active scrub_state, stop the
// timeout, wait for pending paxos writes, pin scrub_version to the
// current paxos version, send an OP_SCRUB request starting at
// scrub_state->last_key to quorum members, scrub the local store into
// scrub_result[rank], mark the state finished when the local scrub
// reports no further data, and re-arm the timeout.
// NOTE(review): the declaration receiving the key limit, the loop
// condition and body guard, the remaining arguments of the MMonScrub
// constructor and of _scrub, and the return are not present in this
// listing.
5237 int Monitor::scrub()
5239 assert(is_leader());
5240 assert(scrub_state
);
5242 scrub_cancel_timeout();
5243 wait_for_paxos_write();
5244 scrub_version
= paxos
->get_version();
5247 // scrub all keys if we're the only monitor in the quorum
5249 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
5251 for (set
<int>::iterator p
= quorum
.begin();
5256 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
5258 r
->key
= scrub_state
->last_key
;
5259 messenger
->send_message(r
, monmap
->get_inst(*p
));
5263 bool r
= _scrub(&scrub_result
[rank
],
5264 &scrub_state
->last_key
,
5267 scrub_state
->finished
= !r
;
5269 // only after we got our scrub results do we really care whether the
5270 // other monitors are late on their results. Also, this way we avoid
5271 // triggering the timeout if we end up getting stuck in _scrub() for
5272 // longer than the duration of the timeout.
5273 scrub_reset_timeout();
5275 if (quorum
.size() == 1) {
5276 assert(scrub_state
->finished
== true);
// Monitor::handle_scrub -- handle an MMonScrub message.
// Visible cases:
//   OP_SCRUB  -- wait for pending paxos writes, compare the requested
//                version with our paxos version, scrub locally starting
//                at the requested key and send an OP_RESULT reply on the
//                request connection.
//   OP_RESULT -- compare the version with scrub_version, re-arm the
//                timeout, store the result under the sender id (which
//                must not have reported yet) and, once every quorum
//                member has reported, check and clear the results.
// NOTE(review): the switch header, role checks, the statements run on a
// version mismatch, the rest of the reply constructor arguments and the
// action taken on scrub_state->finished are not present in this listing.
5282 void Monitor::handle_scrub(MonOpRequestRef op
)
5284 MMonScrub
*m
= static_cast<MMonScrub
*>(op
->get_req());
5285 dout(10) << __func__
<< " " << *m
<< dendl
;
5287 case MMonScrub::OP_SCRUB
:
5292 wait_for_paxos_write();
5294 if (m
->version
!= paxos
->get_version())
5297 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
5301 reply
->key
= m
->key
;
5302 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
5303 m
->get_connection()->send_message(reply
);
5307 case MMonScrub::OP_RESULT
:
5311 if (m
->version
!= scrub_version
)
5313 // reset the timeout each time we get a result
5314 scrub_reset_timeout();
5316 int from
= m
->get_source().num();
5317 assert(scrub_result
.count(from
) == 0);
5318 scrub_result
[from
] = m
->result
;
5320 if (scrub_result
.size() == quorum
.size()) {
5321 scrub_check_results();
5322 scrub_result
.clear();
5323 if (scrub_state
->finished
)
// Monitor::_scrub -- scrub a chunk of the local store.
// @param r         result accumulator; per-prefix key counts and running
//                  crc values are updated in place
// @param start     position to start from; must not be NULL
// @param num_keys  in: chunk limit as tested by the loop below; out: the
//                  number of keys scrubbed (written near the end); must
//                  not be NULL
// @return          whether the store iterator still has another chunk
// Visible flow: iterate the sync target prefixes except paxos, stop when
// the key limit is reached, optionally skip keys or perturb the crc for
// fault injection (driven by the mon_scrub_inject_* options), read each
// value, and fold its crc into the running crc for the key prefix.
// NOTE(review): the third parameter declaration, the statements run when
// the limit or an unknown prefix is hit, the declaration of bl, the
// handling of err and the bookkeeping of last_key and scrubbed_keys are
// not present in this listing.
5333 bool Monitor::_scrub(ScrubResult
*r
,
5334 pair
<string
,string
> *start
,
5338 assert(start
!= NULL
);
5339 assert(num_keys
!= NULL
);
5341 set
<string
> prefixes
= get_sync_targets_names();
5342 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5344 dout(10) << __func__
<< " start (" << *start
<< ")"
5345 << " num_keys " << *num_keys
<< dendl
;
5347 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
5349 int scrubbed_keys
= 0;
5350 pair
<string
,string
> last_key
;
5352 while (it
->has_next_chunk()) {
// A positive limit caps the chunk; the test only applies when the value
// is greater than zero.
5354 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
5357 pair
<string
,string
> k
= it
->get_next_key();
5358 if (prefixes
.count(k
.first
) == 0)
// Fault injection: with the configured probability this key is treated
// as missing.
5361 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
5362 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
5363 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
5369 int err
= store
->get(k
.first
, k
.second
, bl
);
5372 uint32_t key_crc
= bl
.crc32c(0);
5373 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
5374 << " crc " << key_crc
<< dendl
;
5375 r
->prefix_keys
[k
.first
]++;
5376 if (r
->prefix_crc
.count(k
.first
) == 0) {
5377 r
->prefix_crc
[k
.first
] = 0;
// The running per-prefix crc is used as the seed for the crc of this
// value, chaining the values in iteration order.
5379 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
5381 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
5382 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
5383 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
5384 r
->prefix_crc
[k
.first
] += 1;
5391 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
5392 << " scrubbed_keys " << scrubbed_keys
5393 << " has_next " << it
->has_next_chunk() << dendl
;
5396 *num_keys
= scrubbed_keys
;
5398 return it
->has_next_chunk();
// Monitor::scrub_check_results -- compare every collected scrub result
// with our own (scrub_result[rank]).
// Visible behaviour: entries for our own rank are special-cased, a
// differing result is reported as three cluster log error lines (a
// header, our result, the peer result), and a debug line reporting scrub
// ok for the quorum is emitted.
// NOTE(review): the error counter, the loop increment, the statement
// after the rank comparison and the condition guarding the ok message
// are not present in this listing.
5401 void Monitor::scrub_check_results()
5403 dout(10) << __func__
<< dendl
;
5407 ScrubResult
& mine
= scrub_result
[rank
];
5408 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
5409 p
!= scrub_result
.end();
5411 if (p
->first
== rank
)
5413 if (p
->second
!= mine
) {
5415 clog
->error() << "scrub mismatch";
5416 clog
->error() << " mon." << rank
<< " " << mine
;
5417 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5421 clog
->debug() << "scrub ok on " << quorum
<< ": " << mine
;
// Monitor::scrub_timeout -- handler for an expired scrub timeout. The
// only visible action is the log line announcing a scrub restart.
// NOTE(review): the statements that perform the restart are not present
// in this listing.
5424 inline void Monitor::scrub_timeout()
5426 dout(1) << __func__
<< " restarting scrub" << dendl
;
// Monitor::scrub_finish -- wrap up a scrub; the visible lines log the
// call and schedule the next periodic scrub via scrub_event_start.
// NOTE(review): one original line between the log and the scheduling
// call is not present in this listing.
5431 void Monitor::scrub_finish()
5433 dout(10) << __func__
<< dendl
;
5435 scrub_event_start();
// Monitor::scrub_reset -- drop all in-progress scrub state: cancel the
// timeout, clear collected results and release the ScrubState.
// NOTE(review): one original line after the timeout cancellation is not
// present in this listing.
5438 void Monitor::scrub_reset()
5440 dout(10) << __func__
<< dendl
;
5441 scrub_cancel_timeout();
5443 scrub_result
.clear();
5444 scrub_state
.reset();
// Monitor::scrub_update_interval -- react to a changed scrub interval.
// @param secs  new interval, only logged by the lines visible here
// Visible flow: log the new interval, do nothing further when a scrub is
// already in progress (scrub_state set), otherwise cancel and restart the
// periodic scrub event.
// NOTE(review): the leader check announced by the first comment and the
// early exits are not present in this listing.
5447 inline void Monitor::scrub_update_interval(int secs
)
5449 // we don't care about changes if we are not the leader.
5450 // changes will be visible if we become the leader.
5454 dout(1) << __func__
<< " new interval = " << secs
<< dendl
;
5456 // if scrub already in progress, all changes will already be visible during
5457 // the next round. Nothing to do.
5458 if (scrub_state
!= NULL
)
5461 scrub_event_cancel();
5462 scrub_event_start();
// Monitor::scrub_event_start -- (re)schedule the periodic scrub event.
// Visible flow: cancel any existing event, log that scrubbing is disabled
// when mon_scrub_interval is zero or negative, otherwise register a timer
// event mon_scrub_interval from now.
// NOTE(review): the condition guarding the cancel call, the early exit
// for the disabled case and the body of the timer callback are not
// present in this listing.
5465 void Monitor::scrub_event_start()
5467 dout(10) << __func__
<< dendl
;
5470 scrub_event_cancel();
5472 if (cct
->_conf
->mon_scrub_interval
<= 0) {
5473 dout(1) << __func__
<< " scrub event is disabled"
5474 << " (mon_scrub_interval = " << cct
->_conf
->mon_scrub_interval
5479 scrub_event
= timer
.add_event_after(
5480 cct
->_conf
->mon_scrub_interval
,
5481 new C_MonContext(this, [this](int) {
// Monitor::scrub_event_cancel -- cancel the periodic scrub timer event.
// NOTE(review): lines around the cancel call (possibly a guard and a
// reset of scrub_event) are not present in this listing.
5486 void Monitor::scrub_event_cancel()
5488 dout(10) << __func__
<< dendl
;
5490 timer
.cancel_event(scrub_event
);
5495 inline void Monitor::scrub_cancel_timeout()
5497 if (scrub_timeout_event
) {
5498 timer
.cancel_event(scrub_timeout_event
);
5499 scrub_timeout_event
= NULL
;
// Monitor::scrub_reset_timeout -- re-arm the scrub timeout: cancel any
// pending timeout event and register a new one mon_scrub_timeout from
// now.
// NOTE(review): the body of the timer callback is not present in this
// listing.
5503 void Monitor::scrub_reset_timeout()
5505 dout(15) << __func__
<< " reset timeout event" << dendl
;
5506 scrub_cancel_timeout();
5507 scrub_timeout_event
= timer
.add_event_after(
5508 g_conf
->mon_scrub_timeout
,
5509 new C_MonContext(this, [this](int) {
5514 /************ TICK ***************/
// Monitor::new_tick -- schedule the next tick: registers a timer event
// mon_tick_interval from now.
// NOTE(review): the body of the timer callback is not present in this
// listing.
5515 void Monitor::new_tick()
5517 timer
.add_event_after(g_conf
->mon_tick_interval
, new C_MonContext(this, [this](int) {
// Monitor::tick -- periodic housekeeping.
// Visible work:
//   1. For every paxos service, compare each current health check with
//      the last logged state in health_check_log_times and, when it
//      changed and more than mon_health_log_update_period has passed
//      since the last update, log a health check update to the cluster
//      log.
//   2. Iterate the paxos services (loop body not visible).
//   3. Under session_map_lock, walk the sessions and trim those whose
//      timeout has passed (after refreshing it from the last keepalive)
//      or, when we have been out of quorum for more than twice
//      mon_lease, trim regardless; monitor sessions are special-cased.
//   4. Trim sync providers, release contexts waiting in
//      maybe_wait_for_quorum, and, as leader with active paxos and a zero
//      fingerprint, propose a new cluster fingerprint.
// NOTE(review): many original lines are missing from this listing (role
// checks, the declaration of ss and s, loop headers, continue and else
// statements, the session removal call, rescheduling of the tick).
// Confirm against the full file.
5522 void Monitor::tick()
5525 dout(11) << "tick" << dendl
;
5526 const utime_t now
= ceph_clock_now();
5528 // Check if we need to emit any delayed health check updated messages
5530 const auto min_period
= g_conf
->get_val
<int64_t>(
5531 "mon_health_log_update_period");
5532 for (auto& svc
: paxos_service
) {
5533 auto health
= svc
->get_health_checks();
5535 for (const auto &i
: health
.checks
) {
5536 const std::string
&code
= i
.first
;
5537 const std::string
&summary
= i
.second
.summary
;
5538 const health_status_t severity
= i
.second
.severity
;
5540 auto status_iter
= health_check_log_times
.find(code
);
5541 if (status_iter
== health_check_log_times
.end()) {
5545 auto &log_status
= status_iter
->second
;
5546 bool const changed
= log_status
.last_message
!= summary
5547 || log_status
.severity
!= severity
;
// Rate limit: a changed check is only logged again once min_period has
// elapsed since the previous logged update.
5549 if (changed
&& now
- log_status
.updated_at
> min_period
) {
5550 log_status
.last_message
= summary
;
5551 log_status
.updated_at
= now
;
5552 log_status
.severity
= severity
;
5555 ss
<< "Health check update: " << summary
<< " (" << code
<< ")";
5556 clog
->health(severity
) << ss
.str();
5563 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
) {
5570 Mutex::Locker
l(session_map_lock
);
5571 auto p
= session_map
.sessions
.begin();
5573 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5574 now
> (exited_quorum
+ 2*g_conf
->mon_lease
));
5580 // don't trim monitors
5581 if (s
->inst
.name
.is_mon())
5584 if (s
->session_timeout
< now
&& s
->con
) {
5585 // check keepalive, too
5586 s
->session_timeout
= s
->con
->get_last_keepalive();
5587 s
->session_timeout
+= g_conf
->mon_session_timeout
;
5589 if (s
->session_timeout
< now
) {
5590 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5591 << " (timeout " << s
->session_timeout
5592 << " < now " << now
<< ")" << dendl
;
5593 } else if (out_for_too_long
) {
5594 // boot the client Session because we've taken too long getting back in
5595 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5596 << " because we've been out of quorum too long" << dendl
;
5601 s
->con
->mark_down();
5603 logger
->inc(l_mon_session_trim
);
5606 sync_trim_providers();
5608 if (!maybe_wait_for_quorum
.empty()) {
5609 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5612 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5613 // this is only necessary on upgraded clusters.
5614 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5615 prepare_new_fingerprint(t
);
5616 paxos
->trigger_propose();
// Monitor::prepare_new_fingerprint -- generate a random fingerprint nf
// and stage it in transaction t under MONITOR_NAME / cluster_fingerprint.
// @param t  transaction that receives the put; it is not applied here
// NOTE(review): the declarations of nf and bl and the encoding of nf into
// bl are not present in this listing.
5622 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5625 nf
.generate_random();
5626 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5630 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
// Monitor::check_fsid -- verify that the cluster uuid stored on disk
// matches the fsid of the current monmap.
// Visible flow: read MONITOR_NAME / cluster_uuid from the store, take the
// stored bytes as a string, locate the first newline (only the first line
// is meant to be kept), parse the text into ondisk and compare it with
// monmap->get_fsid(), logging an error on parse failure or mismatch.
// NOTE(review): the declarations of ebl and ondisk, the handling of r,
// the truncation statement and every return value are not present in
// this listing.
5633 int Monitor::check_fsid()
5636 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
5641 string
es(ebl
.c_str(), ebl
.length());
5643 // only keep the first line
5644 size_t pos
= es
.find_first_of('\n');
5645 if (pos
!= string::npos
)
5648 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
5650 if (!ondisk
.parse(es
.c_str())) {
5651 derr
<< "error: unable to parse uuid" << dendl
;
5655 if (monmap
->get_fsid() != ondisk
) {
5656 derr
<< "error: cluster_uuid file exists with value " << ondisk
5657 << ", != our uuid " << monmap
->get_fsid() << dendl
;
// Monitor::write_fsid (no-argument overload) -- create a fresh store
// transaction and apply it to the store.
// NOTE(review): the line between creating and applying the transaction
// (presumably the call that fills it) and the return statement are not
// present in this listing.
5664 int Monitor::write_fsid()
5666 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5668 int r
= store
->apply_transaction(t
);
// Monitor::write_fsid (transaction overload) -- stage the cluster uuid in
// transaction t under MONITOR_NAME / cluster_uuid.
// @param t  transaction that receives the put; it is not applied here
// Visible flow: format monmap->get_fsid() followed by a newline into a
// string and put buffer b into the transaction.
// NOTE(review): the declarations of ss and b, the copy of the string into
// b and the return statement are not present in this listing.
5672 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
5675 ss
<< monmap
->get_fsid() << "\n";
5676 string us
= ss
.str();
5681 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
5686 * this is the closest thing to a traditional 'mkfs' for ceph.
5687 * initialize the monitor state machines to their initial values.
// Monitor::mkfs -- write the initial monitor store contents.
// @param osdmapbl  optional encoded osdmap; when non-empty it is decoded
//                  as a validity check and stored under mkfs / osdmap
// Visible flow: check the on-disk fsid (a missing one, -ENOENT, is
// tolerated), stage the on-disk magic, set the initial supported
// features, stage the encoded monmap (epoch forced to 0), optionally
// stage the osdmap, and when a keyring is required load it from the
// configured keyring file or build it from the configured key, move the
// mon. key out via extract_save_mon_key and stage the rest under
// mkfs / keyring. Finally the transaction is applied.
// NOTE(review): error returns, the declarations of magicbl, features, om,
// keyring and bl, the try keywords and several conditions are not
// present in this listing.
5689 int Monitor::mkfs(bufferlist
& osdmapbl
)
5691 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5693 // verify cluster fsid
5694 int r
= check_fsid();
5695 if (r
< 0 && r
!= -ENOENT
)
5699 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
5700 magicbl
.append("\n");
5701 t
->put(MONITOR_NAME
, "magic", magicbl
);
5704 features
= get_initial_supported_features();
5707 // save monmap, osdmap, keyring.
5708 bufferlist monmapbl
;
5709 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
5710 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5711 t
->put("mkfs", "monmap", monmapbl
);
5713 if (osdmapbl
.length()) {
5714 // make sure it's a valid osdmap
5717 om
.decode(osdmapbl
);
5719 catch (buffer::error
& e
) {
5720 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
5723 t
->put("mkfs", "osdmap", osdmapbl
);
5726 if (is_keyring_required()) {
5728 string keyring_filename
;
5730 r
= ceph_resolve_file_search(g_conf
->keyring
, keyring_filename
);
5732 derr
<< "unable to find a keyring file on " << g_conf
->keyring
5733 << ": " << cpp_strerror(r
) << dendl
;
// Fallback visible here: a keyring is synthesised in plaintext form from
// the configured key with full mon caps.
5734 if (g_conf
->key
!= "") {
5735 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf
->key
+
5736 "\n\tcaps mon = \"allow *\"\n";
5738 bl
.append(keyring_plaintext
);
5740 bufferlist::iterator i
= bl
.begin();
5741 keyring
.decode_plaintext(i
);
5743 catch (const buffer::error
& e
) {
5744 derr
<< "error decoding keyring " << keyring_plaintext
5745 << ": " << e
.what() << dendl
;
5752 r
= keyring
.load(g_ceph_context
, keyring_filename
);
5754 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
5759 // put mon. key in external keyring; seed with everything else.
5760 extract_save_mon_key(keyring
);
5762 bufferlist keyringbl
;
5763 keyring
.encode_plaintext(keyringbl
);
5764 t
->put("mkfs", "keyring", keyringbl
);
5767 store
->apply_transaction(t
);
// Monitor::write_default_keyring -- write bl to the keyring file inside
// the monitor data directory (mon_data followed by /keyring).
// @param bl  bytes to write
// Visible flow: open the file write-only, creating it with mode 0600 when
// absent, log on open failure, write the buffer with write_fd and close
// the descriptor.
// NOTE(review): the open flags visible here do not include truncation, so
// an existing longer file would keep its tail -- confirm whether that is
// intended. The declarations of os and err, the failure checks and the
// return statements are not present in this listing.
5772 int Monitor::write_default_keyring(bufferlist
& bl
)
5775 os
<< g_conf
->mon_data
<< "/keyring";
5778 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
, 0600);
5781 dout(0) << __func__
<< " failed to open " << os
.str()
5782 << ": " << cpp_strerror(err
) << dendl
;
5786 err
= bl
.write_fd(fd
);
5789 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// Monitor::extract_save_mon_key -- move the mon. entry out of the given
// keyring into the default keyring file.
// @param keyring  source keyring; the mon. entry is removed from it when
//                 present
// Visible flow: build an EntityName of type CEPH_ENTITY_TYPE_MON, and
// when the keyring holds auth for it, copy that entry into a separate
// keyring pkey, encode pkey as plaintext, write it via
// write_default_keyring and remove the entry from the source keyring.
// NOTE(review): the declarations of mon_key, pkey and bl are not present
// in this listing; the result of write_default_keyring is not checked by
// any visible line.
5794 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
5796 EntityName mon_name
;
5797 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
5799 if (keyring
.get_auth(mon_name
, mon_key
)) {
5800 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
5802 pkey
.add(mon_name
, mon_key
);
5804 pkey
.encode_plaintext(bl
);
5805 write_default_keyring(bl
);
5806 keyring
.remove(mon_name
);
// Monitor::ms_get_authorizer -- build an authorizer for an outgoing
// connection to another monitor or to a mgr.
// @param service_id  CEPH_ENTITY_TYPE_* of the peer; only MON and MGR are
//                    handled, any other value reaching the final branch
//                    aborts
// @param authorizer  out: receives the authorizer that was built
// Visible flow: when cephx is not among the supported cluster auth
// methods an auth_none authorizer is built. Otherwise a ticket is filled
// in for a mon entity with global id 0; for MON peers the secret is
// looked up in the keyring and then the key server and the session auth
// info is built from that secret, for MGR peers the key server builds it
// directly. The resulting info is turned into a service ticket blob,
// round-tripped through encode and decode into a CephXTicketHandler, and
// the handler builds the authorizer.
// NOTE(review): the remaining parameters, the declarations of name,
// secret and ret, the failure checks and every return statement are not
// present in this listing.
5810 bool Monitor::ms_get_authorizer(int service_id
, AuthAuthorizer
**authorizer
,
5813 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id
)
5819 // we only connect to other monitors and mgr; every else connects to us.
5820 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
5821 service_id
!= CEPH_ENTITY_TYPE_MGR
)
5824 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5826 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
5827 AuthNoneClientHandler
handler(g_ceph_context
, nullptr);
5828 handler
.set_global_id(0);
5829 *authorizer
= handler
.build_authorizer(service_id
);
5833 CephXServiceTicketInfo auth_ticket_info
;
5834 CephXSessionAuthInfo info
;
5838 name
.set_type(CEPH_ENTITY_TYPE_MON
);
5839 auth_ticket_info
.ticket
.name
= name
;
5840 auth_ticket_info
.ticket
.global_id
= 0;
5842 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
5843 // mon to mon authentication uses the private monitor shared key and not the
5846 if (!keyring
.get_secret(name
, secret
) &&
5847 !key_server
.get_secret(name
, secret
)) {
5848 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5850 stringstream ss
, ds
;
5851 int err
= key_server
.list_secrets(ds
);
5853 ss
<< "no installed auth entries!";
5855 ss
<< "installed auth entries:";
5856 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
5860 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
,
5861 secret
, (uint64_t)-1);
5863 dout(0) << __func__
<< " failed to build mon session_auth_info "
5864 << cpp_strerror(ret
) << dendl
;
5867 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
5869 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
);
5871 derr
<< __func__
<< " failed to build mgr service session_auth_info "
5872 << cpp_strerror(ret
) << dendl
;
5876 ceph_abort(); // see check at top of fn
5879 CephXTicketBlob blob
;
5880 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
5881 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl
;
5884 bufferlist ticket_data
;
5885 ::encode(blob
, ticket_data
);
5887 bufferlist::iterator iter
= ticket_data
.begin();
5888 CephXTicketHandler
handler(g_ceph_context
, service_id
);
5889 ::decode(handler
.ticket
, iter
);
5891 handler
.session_key
= info
.session_key
;
5893 *authorizer
= handler
.build_authorizer(0);
5898 bool Monitor::ms_verify_authorizer(Connection
*con
, int peer_type
,
5899 int protocol
, bufferlist
& authorizer_data
,
5900 bufferlist
& authorizer_reply
,
5901 bool& isvalid
, CryptoKey
& session_key
,
5902 std::unique_ptr
<AuthAuthorizerChallenge
> *challenge
)
5904 dout(10) << "ms_verify_authorizer " << con
->get_peer_addr()
5905 << " " << ceph_entity_type_name(peer_type
)
5906 << " protocol " << protocol
<< dendl
;
5911 if (peer_type
== CEPH_ENTITY_TYPE_MON
&&
5912 auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5913 // monitor, and cephx is enabled
5915 if (protocol
== CEPH_AUTH_CEPHX
) {
5916 bufferlist::iterator iter
= authorizer_data
.begin();
5917 CephXServiceTicketInfo auth_ticket_info
;
5919 if (authorizer_data
.length()) {
5920 bool ret
= cephx_verify_authorizer(g_ceph_context
, &keyring
, iter
,
5921 auth_ticket_info
, challenge
, authorizer_reply
);
5923 session_key
= auth_ticket_info
.session_key
;
5926 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con
->get_peer_addr() << dendl
;
5930 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl
;