1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
25 #include "common/version.h"
27 #include "osd/OSDMap.h"
29 #include "MonitorDBStore.h"
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
49 #include "messages/MMonSubscribe.h"
50 #include "messages/MMonSubscribeAck.h"
52 #include "messages/MAuthReply.h"
54 #include "messages/MTimeCheck.h"
55 #include "messages/MPing.h"
57 #include "common/strtol.h"
58 #include "common/ceph_argparse.h"
59 #include "common/Timer.h"
60 #include "common/Clock.h"
61 #include "common/errno.h"
62 #include "common/perf_counters.h"
63 #include "common/admin_socket.h"
64 #include "global/signal_handler.h"
65 #include "common/Formatter.h"
66 #include "include/stringify.h"
67 #include "include/color.h"
68 #include "include/ceph_fs.h"
69 #include "include/str_list.h"
71 #include "OSDMonitor.h"
72 #include "MDSMonitor.h"
73 #include "MonmapMonitor.h"
74 #include "PGMonitor.h"
75 #include "LogMonitor.h"
76 #include "AuthMonitor.h"
77 #include "MgrMonitor.h"
78 #include "MgrStatMonitor.h"
79 #include "mon/QuorumService.h"
80 #include "mon/OldHealthMonitor.h"
81 #include "mon/HealthMonitor.h"
82 #include "mon/ConfigKeyService.h"
83 #include "common/config.h"
84 #include "common/cmdparse.h"
85 #include "include/assert.h"
86 #include "include/compat.h"
87 #include "perfglue/heap_profiler.h"
89 #include "auth/none/AuthNoneClientHandler.h"
91 #define dout_subsys ceph_subsys_mon
93 #define dout_prefix _prefix(_dout, this)
94 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
95 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
96 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
// Name constants.  MONITOR_NAME is passed as the first argument to the
// store get()/put() calls made elsewhere in this file.
99 const string
Monitor::MONITOR_NAME
= "monitor";
100 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
// Helper macros for the command tables below: every COMMAND entry in the
// included header becomes one MonCommand initializer with FLAG(NONE), and
// every COMMAND_WITH_FLAG entry carries its own flags value.
105 #undef COMMAND_WITH_FLAG
106 #define FLAG(f) (MonCommand::FLAG_##f)
107 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
108 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
109 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
110 {parsesig, helptext, modulename, req_perms, avail, flags},
// Command table populated from mon/MonCommands.h.
111 MonCommand mon_commands
[] = {
112 #include <mon/MonCommands.h>
// Second command table populated from mon/PGMonitorCommands.h; the
// constructor appends it to the "upgrading" command list.
114 MonCommand pgmonitor_commands
[] = {
115 #include <mon/PGMonitorCommands.h>
118 #undef COMMAND_WITH_FLAG
121 void C_MonContext::finish(int r
) {
122 if (mon
->is_shutdown())
124 FunctionContext::finish(r
);
// Monitor constructor.
//   cct_  - context handed to the finisher, log client and mgr client
//   nm    - monitor name
//   s     - backing MonitorDBStore
//   m     - main messenger (may be null: con_self is then NULL)
//   mgr_m - messenger used for the mgr client
//   map   - monitor map
// After member initialization the body creates the cluster and audit log
// channels, the Paxos instance, one PaxosService per PAXOS_* slot, the old
// health monitor and the config-key service, and builds the local command
// tables.
// NOTE(review): several initializer lines and braces are not visible in this
// listing; the comments here describe only the lines that are.
127 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
128 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
133 con_self(m
? m
->get_loopback_connection() : NULL
),
134 lock("Monitor::lock"),
136 finisher(cct_
, "mon_finisher", "fin"),
137 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf
->mon_cpu_threads
),
138 has_ever_joined(false),
139 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
141 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
142 key_server(cct
, &keyring
),
// auth_supported, when non-empty, overrides both the cluster and the
// service auth requirement settings.
143 auth_cluster_required(cct
,
144 cct
->_conf
->auth_supported
.empty() ?
145 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
146 auth_service_required(cct
,
147 cct
->_conf
->auth_supported
.empty() ?
148 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
149 mgr_messenger(mgr_m
),
150 mgr_client(cct_
, mgr_m
),
154 state(STATE_PROBING
),
157 required_features(0),
159 quorum_con_features(0),
163 scrub_timeout_event(NULL
),
166 sync_provider_count(0),
169 sync_start_version(0),
170 sync_timeout_event(NULL
),
171 sync_last_committed_floor(0),
175 timecheck_rounds_since_clean(0),
176 timecheck_event(NULL
),
178 paxos_service(PAXOS_NUM
),
180 routed_request_tid(0),
181 op_tracker(cct
, true, 1)
// log channels for cluster and audit messages
183 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
184 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
186 update_log_clients();
188 paxos
= new Paxos(this, "paxos");
// one service object per PAXOS_* slot, all sharing the single paxos instance
190 paxos_service
[PAXOS_MDSMAP
] = new MDSMonitor(this, paxos
, "mdsmap");
191 paxos_service
[PAXOS_MONMAP
] = new MonmapMonitor(this, paxos
, "monmap");
192 paxos_service
[PAXOS_OSDMAP
] = new OSDMonitor(cct
, this, paxos
, "osdmap");
193 paxos_service
[PAXOS_PGMAP
] = new PGMonitor(this, paxos
, "pgmap");
194 paxos_service
[PAXOS_LOG
] = new LogMonitor(this, paxos
, "logm");
195 paxos_service
[PAXOS_AUTH
] = new AuthMonitor(this, paxos
, "auth");
196 paxos_service
[PAXOS_MGR
] = new MgrMonitor(this, paxos
, "mgr");
197 paxos_service
[PAXOS_MGRSTAT
] = new MgrStatMonitor(this, paxos
, "mgrstat");
198 paxos_service
[PAXOS_HEALTH
] = new HealthMonitor(this, paxos
, "health");
200 health_monitor
= new OldHealthMonitor(this);
201 config_key_service
= new ConfigKeyService(this, paxos
);
// the monitor's own capability set is parsed from "allow *"
203 mon_caps
= new MonCap();
204 bool r
= mon_caps
->parse("allow *", NULL
);
207 exited_quorum
= ceph_clock_now();
209 // prepare local commands
210 local_mon_commands
.resize(ARRAY_SIZE(mon_commands
));
211 for (unsigned i
= 0; i
< ARRAY_SIZE(mon_commands
); ++i
) {
212 local_mon_commands
[i
] = mon_commands
[i
];
214 MonCommand::encode_vector(local_mon_commands
, local_mon_commands_bl
);
// the "upgrading" list is the local list plus every pgmonitor command
216 local_upgrading_mon_commands
= local_mon_commands
;
217 for (unsigned i
= 0; i
< ARRAY_SIZE(pgmonitor_commands
); ++i
) {
218 local_upgrading_mon_commands
.push_back(pgmonitor_commands
[i
]);
220 MonCommand::encode_vector(local_upgrading_mon_commands
,
221 local_upgrading_mon_commands_bl
);
223 // assume our commands until we have an election. this only means
224 // we won't reply with EINVAL before the election; any command that
225 // actually matters will wait until we have quorum etc and then
226 // retry (and revalidate).
227 leader_mon_commands
= local_mon_commands
;
229 // note: OSDMonitor may update this based on the luminous flag.
230 pgservice
= mgrstatmon()->get_pg_stat_service();
235 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
237 delete health_monitor
;
238 delete config_key_service
;
240 assert(session_map
.sessions
.empty());
// Admin-socket hook bound to one Monitor: call() forwards the command, its
// parsed argument map and the requested output format to
// Monitor::do_admin_command().
// NOTE(review): the member declarations and the tail of call() are not
// visible in this listing.
245 class AdminHook
: public AdminSocketHook
{
248 explicit AdminHook(Monitor
*m
) : mon(m
) {}
249 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
250 bufferlist
& out
) override
{
252 mon
->do_admin_command(command
, cmdmap
, format
, ss
);
// Executes one admin-socket command while holding the monitor lock.
//   command - command name as registered with the admin socket
//   cmdmap  - parsed arguments
//   format  - formatter name passed to Formatter::create()
// Output text is written to ss.  Every invocation is recorded on the audit
// log channel as ": dispatch" and afterwards as ": finished" or ": aborted";
// read-only commands are logged at debug level, all others at info level.
// A command name that matches none of the branches trips an assert.
258 void Monitor::do_admin_command(string command
, cmdmap_t
& cmdmap
, string format
,
261 Mutex::Locker
l(lock
);
263 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
// build a printable argument list for the audit log; the "prefix" entry is
// special-cased (the statement under that check is not visible here)
266 for (cmdmap_t::iterator p
= cmdmap
.begin();
267 p
!= cmdmap
.end(); ++p
) {
268 if (p
->first
== "prefix")
272 args
+= cmd_vartype_stringify(p
->second
);
274 args
= "[" + args
+ "]";
276 bool read_only
= (command
== "mon_status" ||
277 command
== "mon metadata" ||
278 command
== "quorum_status" ||
280 command
== "sessions");
282 (read_only
? audit_clog
->debug() : audit_clog
->info())
283 << "from='admin socket' entity='admin socket' "
284 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
286 if (command
== "mon_status") {
287 get_mon_status(f
.get(), ss
);
290 } else if (command
== "quorum_status") {
291 _quorum_status(f
.get(), ss
);
292 } else if (command
== "sync_force") {
// sync_force requires the explicit confirmation string
294 if ((!cmd_getval(g_ceph_context
, cmdmap
, "validate", validate
)) ||
295 (validate
!= "--yes-i-really-mean-it")) {
296 ss
<< "are you SURE? this will mean the monitor store will be erased "
297 "the next time the monitor is restarted. pass "
298 "'--yes-i-really-mean-it' if you really do.";
301 sync_force(f
.get(), ss
);
// prefix match: compares the first 23 characters of the command
302 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
303 if (!_add_bootstrap_peer_hint(command
, cmdmap
, ss
))
305 } else if (command
== "quorum enter") {
306 elector
.start_participating();
308 ss
<< "started responding to quorum, initiated new election";
309 } else if (command
== "quorum exit") {
311 elector
.stop_participating();
312 ss
<< "stopped responding to quorum, initiated new election";
313 } else if (command
== "ops") {
314 (void)op_tracker
.dump_ops_in_flight(f
.get());
318 } else if (command
== "sessions") {
321 f
->open_array_section("sessions");
322 for (auto p
: session_map
.sessions
) {
323 f
->dump_stream("session") << *p
;
330 assert(0 == "bad AdminSocket command binding");
332 (read_only
? audit_clog
->debug() : audit_clog
->info())
333 << "from='admin socket' "
334 << "entity='admin socket' "
335 << "cmd=" << command
<< " "
336 << "args=" << args
<< ": finished";
340 (read_only
? audit_clog
->debug() : audit_clog
->info())
341 << "from='admin socket' "
342 << "entity='admin socket' "
343 << "cmd=" << command
<< " "
344 << "args=" << args
<< ": aborted";
// Signal handler entry point.  Only SIGINT and SIGTERM are expected (any
// other value trips the assert); the received signal is logged via derr.
// NOTE(review): the action taken after logging is not visible in this listing.
347 void Monitor::handle_signal(int signum
)
349 assert(signum
== SIGINT
|| signum
== SIGTERM
);
350 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
354 CompatSet
Monitor::get_initial_supported_features()
356 CompatSet::FeatureSet ceph_mon_feature_compat
;
357 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
358 CompatSet::FeatureSet ceph_mon_feature_incompat
;
359 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
360 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
361 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
362 ceph_mon_feature_incompat
);
365 CompatSet
Monitor::get_supported_features()
367 CompatSet compat
= get_initial_supported_features();
368 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
369 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
370 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
371 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
372 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
373 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
377 CompatSet
Monitor::get_legacy_features()
379 CompatSet::FeatureSet ceph_mon_feature_compat
;
380 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
381 CompatSet::FeatureSet ceph_mon_feature_incompat
;
382 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
383 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
384 ceph_mon_feature_incompat
);
// Compares the feature set stored in `store` against what this build
// supports (get_supported_features()).  When the on-disk set is not
// writeable by this build, the unsupported features are logged as an error.
// NOTE(review): the declaration of `ondisk` and the return statements are
// not visible in this listing, so the exact return codes are not documented.
387 int Monitor::check_features(MonitorDBStore
*store
)
389 CompatSet required
= get_supported_features();
392 read_features_off_disk(store
, &ondisk
);
394 if (!required
.writeable(ondisk
)) {
395 CompatSet diff
= required
.unsupported(ondisk
);
396 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
// Loads the feature set stored under (MONITOR_NAME, COMPAT_SET_LOC) in
// `store` into *features.  If nothing is stored (zero-length buffer), a
// warning is logged, *features is set to get_legacy_features(), and that
// set is encoded and written back to the store in a transaction.
403 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
405 bufferlist featuresbl
;
406 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
407 if (featuresbl
.length() == 0) {
408 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
409 << "Assuming it is old-style and introducing one." << dendl
;
410 //we only want the baseline ~v.18 features assumed to be on disk.
411 //If new features are introduced this code needs to disappear or
413 *features
= get_legacy_features();
// persist the assumed legacy set so later reads find a feature list
415 features
->encode(featuresbl
);
416 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
417 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
418 store
->apply_transaction(t
);
// decode whatever featuresbl now holds
420 bufferlist::iterator it
= featuresbl
.begin();
421 features
->decode(it
);
425 void Monitor::read_features()
427 read_features_off_disk(store
, &features
);
428 dout(10) << "features " << features
<< dendl
;
430 calc_quorum_requirements();
431 dout(10) << "required_features " << required_features
<< dendl
;
// Queues a put of buffer `bl` under (MONITOR_NAME, COMPAT_SET_LOC) on
// transaction t; the transaction is not applied here.
// NOTE(review): the lines that declare and fill `bl` are not visible in this
// listing - presumably the encoded feature set; confirm against the full file.
434 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
438 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
// Config-observer hook: holds the static list of option names this monitor
// tracks.
// NOTE(review): some entries, the list terminator and the return statement
// are not visible in this listing.
441 const char** Monitor::get_tracked_conf_keys() const
443 static const char* KEYS
[] = {
444 "crushtool", // helpful for testing
445 "mon_election_timeout",
447 "mon_lease_renew_interval_factor",
448 "mon_lease_ack_timeout_factor",
449 "mon_accept_timeout_factor",
453 "clog_to_syslog_facility",
454 "clog_to_syslog_level",
456 "clog_to_graylog_host",
457 "clog_to_graylog_port",
460 // periodic health to clog
461 "mon_health_to_clog",
462 "mon_health_to_clog_interval",
463 "mon_health_to_clog_tick_interval",
465 "mon_scrub_interval",
// Config-observer callback.
//   conf    - current configuration
//   changed - names of the options that changed
// Reacts to three groups of options: cluster-log settings (plus "host" and
// "fsid") refresh the log clients, the mon_health_to_clog* options update
// the health-to-clog configuration, and mon_scrub_interval updates the
// scrub interval.
471 void Monitor::handle_conf_change(const struct md_config_t
*conf
,
472 const std::set
<std::string
> &changed
)
476 dout(10) << __func__
<< " " << changed
<< dendl
;
478 if (changed
.count("clog_to_monitors") ||
479 changed
.count("clog_to_syslog") ||
480 changed
.count("clog_to_syslog_level") ||
481 changed
.count("clog_to_syslog_facility") ||
482 changed
.count("clog_to_graylog") ||
483 changed
.count("clog_to_graylog_host") ||
484 changed
.count("clog_to_graylog_port") ||
485 changed
.count("host") ||
486 changed
.count("fsid")) {
487 update_log_clients();
490 if (changed
.count("mon_health_to_clog") ||
491 changed
.count("mon_health_to_clog_interval") ||
492 changed
.count("mon_health_to_clog_tick_interval")) {
493 health_to_clog_update_conf(changed
);
496 if (changed
.count("mon_scrub_interval")) {
497 scrub_update_interval(conf
->mon_scrub_interval
);
// Parses the log-client options into the per-setting maps below and pushes
// the same maps to both the cluster (clog) and audit (audit_clog) channels.
// NOTE(review): the trailing arguments of the three calls and the handling
// of a parse failure are not visible in this listing.
501 void Monitor::update_log_clients()
503 map
<string
,string
> log_to_monitors
;
504 map
<string
,string
> log_to_syslog
;
505 map
<string
,string
> log_channel
;
506 map
<string
,string
> log_prio
;
507 map
<string
,string
> log_to_graylog
;
508 map
<string
,string
> log_to_graylog_host
;
509 map
<string
,string
> log_to_graylog_port
;
513 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
514 log_channel
, log_prio
, log_to_graylog
,
515 log_to_graylog_host
, log_to_graylog_port
,
519 clog
->update_config(log_to_monitors
, log_to_syslog
,
520 log_channel
, log_prio
, log_to_graylog
,
521 log_to_graylog_host
, log_to_graylog_port
,
524 audit_clog
->update_config(log_to_monitors
, log_to_syslog
,
525 log_channel
, log_prio
, log_to_graylog
,
526 log_to_graylog_host
, log_to_graylog_port
,
// Validates the lease-related factors in g_conf and reports each violation
// on the cluster log: mon_lease_renew_interval_factor must be below 1.0 and
// mon_lease_ack_timeout_factor must be above 1.0.
// NOTE(review): the return-value handling is not visible in this listing.
530 int Monitor::sanitize_options()
534 // mon_lease must be greater than mon_lease_renewal; otherwise we
535 // may incur in leases expiring before they are renewed.
536 if (g_conf
->mon_lease_renew_interval_factor
>= 1.0) {
537 clog
->error() << "mon_lease_renew_interval_factor ("
538 << g_conf
->mon_lease_renew_interval_factor
539 << ") must be less than 1.0";
543 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
544 // got time to renew the lease and get an ack for it. Having both options
545 // with the same value, for a given small value, could mean timing out if
546 // the monitors happened to be overloaded -- or even under normal load for
547 // a small enough value.
548 if (g_conf
->mon_lease_ack_timeout_factor
<= 1.0) {
549 clog
->error() << "mon_lease_ack_timeout_factor ("
550 << g_conf
->mon_lease_ack_timeout_factor
551 << ") must be greater than 1.0";
// Pre-initialization.  In the order visible here it: sanitizes the lease
// options, builds the "mon" and "cluster" perf counters, initializes the
// paxos logger, checks the fsid, reads whether this monitor has ever joined
// a quorum, optionally filters the seed monmap by mon_initial_members,
// cleans up store state left by an interrupted or forced sync, initializes
// the health monitor, bootstraps the keyring when one is required,
// registers the admin-socket commands, and adds this object as a config
// observer.
// NOTE(review): error-return paths, lock handling and several argument
// lines are not visible in this listing.
558 int Monitor::preinit()
562 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
564 int r
= sanitize_options();
566 derr
<< "option sanitization failed!" << dendl
;
// per-monitor perf counters ("mon"): sessions and elections
573 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
574 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess");
575 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions", "sadd");
576 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions", "srm");
577 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions");
578 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in");
579 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started");
580 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won");
581 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost");
582 logger
= pcb
.create_perf_counters();
583 cct
->get_perfcounters_collection()->add(logger
);
// cluster-wide perf counters ("cluster"); created here but only added to the
// collection by register_cluster_logger()
586 assert(!cluster_logger
);
588 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
589 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
590 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
591 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
592 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
593 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
594 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
595 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster");
596 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space");
597 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space");
598 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
599 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
600 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
601 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
602 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
603 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
604 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
605 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
606 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
607 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects");
608 pcb
.add_u64(l_cluster_num_mds_up
, "num_mds_up", "MDSs that are up");
609 pcb
.add_u64(l_cluster_num_mds_in
, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
610 pcb
.add_u64(l_cluster_num_mds_failed
, "num_mds_failed", "Failed MDS");
611 pcb
.add_u64(l_cluster_mds_epoch
, "mds_epoch", "Current epoch of MDS map");
612 cluster_logger
= pcb
.create_perf_counters();
615 paxos
->init_logger();
617 // verify cluster_uuid
619 int r
= check_fsid();
631 // have we ever joined a quorum?
632 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
633 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
635 if (!has_ever_joined
) {
636 // impose initial quorum restrictions?
637 list
<string
> initial_members
;
638 get_str_list(g_conf
->mon_initial_members
, initial_members
);
640 if (!initial_members
.empty()) {
641 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
643 monmap
->set_initial_members(g_ceph_context
, initial_members
, name
, messenger
->get_myaddr(),
646 dout(10) << " monmap is " << *monmap
<< dendl
;
647 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
// joined a quorum before but no longer listed in the monmap
649 } else if (!monmap
->contains(name
)) {
650 derr
<< "not in monmap and have been in a quorum before; "
651 << "must have been removed" << dendl
;
652 if (g_conf
->mon_force_quorum_join
) {
653 dout(0) << "we should have died but "
654 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
656 derr
<< "commit suicide!" << dendl
;
663 // We have a potentially inconsistent store state in hands. Get rid of it
665 bool clear_store
= false;
666 if (store
->exists("mon_sync", "in_sync")) {
667 dout(1) << __func__
<< " clean up potentially inconsistent store state"
672 if (store
->get("mon_sync", "force_sync") > 0) {
673 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
678 set
<string
> sync_prefixes
= get_sync_targets_names();
679 store
->clear(sync_prefixes
);
683 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
684 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
687 health_monitor
->init();
689 if (is_keyring_required()) {
690 // we need to bootstrap authentication keys so we can form an
692 if (authmon()->get_last_committed() == 0) {
693 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
695 int err
= store
->get("mkfs", "keyring", bl
);
696 if (err
== 0 && bl
.length() > 0) {
697 // Attempt to decode and extract keyring only if it is found.
699 bufferlist::iterator p
= bl
.begin();
700 ::decode(keyring
, p
);
701 extract_save_mon_key(keyring
);
// load the keyring file kept under mon_data
705 string keyring_loc
= g_conf
->mon_data
+ "/keyring";
707 r
= keyring
.load(cct
, keyring_loc
);
710 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
712 if (key_server
.get_auth(mon_name
, mon_key
)) {
713 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
714 keyring
.add(mon_name
, mon_key
);
716 keyring
.encode_plaintext(bl
);
717 write_default_keyring(bl
);
719 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
// admin-socket commands; each is served by do_admin_command() through
// AdminHook and unregistered again in shutdown()
726 admin_hook
= new AdminHook(this);
727 AdminSocket
* admin_socket
= cct
->get_admin_socket();
729 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
731 r
= admin_socket
->register_command("mon_status", "mon_status", admin_hook
,
732 "show current monitor status");
734 r
= admin_socket
->register_command("quorum_status", "quorum_status",
735 admin_hook
, "show current quorum status");
737 r
= admin_socket
->register_command("sync_force",
738 "sync_force name=validate,"
740 "strings=--yes-i-really-mean-it",
742 "force sync of and clear monitor store");
744 r
= admin_socket
->register_command("add_bootstrap_peer_hint",
745 "add_bootstrap_peer_hint name=addr,"
748 "add peer address as potential bootstrap"
749 " peer for cluster bringup");
751 r
= admin_socket
->register_command("quorum enter", "quorum enter",
753 "force monitor back into quorum");
755 r
= admin_socket
->register_command("quorum exit", "quorum exit",
757 "force monitor out of the quorum");
759 r
= admin_socket
->register_command("ops",
762 "show the ops currently in flight");
764 r
= admin_socket
->register_command("sessions",
767 "list existing sessions");
772 // add ourselves as a conf observer
773 g_conf
->add_observer(this);
781 dout(2) << "init" << dendl
;
782 Mutex::Locker
l(lock
);
793 messenger
->add_dispatcher_tail(this);
796 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
797 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
800 // add features of myself into feature_map
801 session_map
.feature_map
.add_mon(con_self
->get_features());
// Calls init() on each of the PAXOS_NUM paxos services, then performs an
// initial refresh_from_paxos() with no need_bootstrap out-parameter.
// NOTE(review): lines between the log statement and the loop are not
// visible in this listing.
805 void Monitor::init_paxos()
807 dout(10) << __func__
<< dendl
;
811 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
812 paxos_service
[i
]->init();
815 refresh_from_paxos(NULL
);
// Re-reads state from the store: decodes the "cluster_fingerprint" key into
// `fingerprint` (decode errors and a missing key are only logged), then
// calls refresh(need_bootstrap) on every paxos service followed by
// post_refresh() on every paxos service, in two separate passes.
//   need_bootstrap - forwarded unchanged to each service's refresh(); may be
//                    NULL (init_paxos() passes NULL).
818 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
820 dout(10) << __func__
<< dendl
;
823 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
826 bufferlist::iterator p
= bl
.begin();
827 ::decode(fingerprint
, p
);
829 catch (buffer::error
& e
) {
830 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
833 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
// first pass: refresh every service
836 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
837 paxos_service
[i
]->refresh(need_bootstrap
);
// second pass: post_refresh runs only after all services were refreshed
839 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
840 paxos_service
[i
]->post_refresh();
845 void Monitor::register_cluster_logger()
847 if (!cluster_logger_registered
) {
848 dout(10) << "register_cluster_logger" << dendl
;
849 cluster_logger_registered
= true;
850 cct
->get_perfcounters_collection()->add(cluster_logger
);
852 dout(10) << "register_cluster_logger - already registered" << dendl
;
856 void Monitor::unregister_cluster_logger()
858 if (cluster_logger_registered
) {
859 dout(10) << "unregister_cluster_logger" << dendl
;
860 cluster_logger_registered
= false;
861 cct
->get_perfcounters_collection()->remove(cluster_logger
);
863 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
867 void Monitor::update_logger()
869 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
870 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
// Shuts the monitor down.  In the order visible here it: waits for any
// pending paxos write, sets the state to STATE_SHUTDOWN, removes the config
// observer, unregisters the admin-socket commands registered in preinit(),
// shuts down the mgr client, drains the finisher, shuts down the health
// monitor, completes quorum waiters with -ECANCELED, removes all sessions,
// removes the perf counters, shuts down the log client and finally both
// messengers.
// NOTE(review): lock handling and the bodies of some steps are not visible
// in this listing.
873 void Monitor::shutdown()
875 dout(1) << "shutdown" << dendl
;
879 wait_for_paxos_write();
881 state
= STATE_SHUTDOWN
;
883 g_conf
->remove_observer(this);
886 AdminSocket
* admin_socket
= cct
->get_admin_socket();
887 admin_socket
->unregister_command("mon_status");
888 admin_socket
->unregister_command("quorum_status");
889 admin_socket
->unregister_command("sync_force");
890 admin_socket
->unregister_command("add_bootstrap_peer_hint");
891 admin_socket
->unregister_command("quorum enter");
892 admin_socket
->unregister_command("quorum exit");
893 admin_socket
->unregister_command("ops");
894 admin_socket
->unregister_command("sessions");
901 mgr_client
.shutdown();
904 finisher
.wait_for_empty();
910 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
912 health_monitor
->shutdown();
// anything still waiting for a quorum is completed with -ECANCELED
914 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
915 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
921 remove_all_sessions();
924 cct
->get_perfcounters_collection()->remove(logger
);
// the cluster logger is only in the collection while registered
928 if (cluster_logger
) {
929 if (cluster_logger_registered
)
930 cct
->get_perfcounters_collection()->remove(cluster_logger
);
931 delete cluster_logger
;
932 cluster_logger
= NULL
;
935 log_client
.shutdown();
937 // unlock before msgr shutdown...
940 messenger
->shutdown(); // last thing! ceph_mon.cc will delete mon.
941 mgr_messenger
->shutdown();
// If paxos reports a write in progress (is_writing() or
// is_writing_previous()), logs before and after flushing it.
// NOTE(review): the statements that perform the flush sit between the two
// log lines and are not visible in this listing.
944 void Monitor::wait_for_paxos_write()
946 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
947 dout(10) << __func__
<< " flushing pending write" << dendl
;
951 dout(10) << __func__
<< " flushed pending write" << dendl
;
// (Re)starts the probing phase.  Visible steps: wait for pending paxos
// writes, reset sync-requester state, unregister the cluster logger, cancel
// the probe timeout, recompute this monitor's rank from the monmap (and
// reset all connections if it changed), set the state to STATE_PROBING,
// optionally compact the store, short-circuit to a standalone election win
// for a single-monitor map, and otherwise send OP_PROBE messages to the
// monmap members and to the extra probe peers.
// NOTE(review): several statements (exit paths, the compaction call, loop
// bodies) are not visible in this listing.
955 void Monitor::bootstrap()
957 dout(10) << "bootstrap" << dendl
;
958 wait_for_paxos_write();
960 sync_reset_requester();
961 unregister_cluster_logger();
962 cancel_probe_timeout();
// rank is derived from our address's position in the current monmap
965 int newrank
= monmap
->get_rank(messenger
->get_myaddr());
966 if (newrank
< 0 && rank
>= 0) {
967 // was i ever part of the quorum?
968 if (has_ever_joined
) {
969 dout(0) << " removed from monmap, suicide." << dendl
;
973 if (newrank
!= rank
) {
974 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
975 messenger
->set_myname(entity_name_t::MON(newrank
));
978 // reset all connections, or else our peers will think we are someone else.
979 messenger
->mark_down_all();
983 state
= STATE_PROBING
;
988 if (g_conf
->mon_compact_on_bootstrap
) {
989 dout(10) << "bootstrap -- triggering compaction" << dendl
;
991 dout(10) << "bootstrap -- finished compaction" << dendl
;
994 // singleton monitor?
995 if (monmap
->size() == 1 && rank
== 0) {
996 win_standalone_election();
1000 reset_probe_timeout();
1002 // i'm outside the quorum
1003 if (monmap
->contains(name
))
1004 outside_quorum
.insert(name
);
1007 dout(10) << "probing other monitors" << dendl
;
1008 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1010 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
),
1011 monmap
->get_inst(i
));
// also probe hinted peers that are not in the monmap, skipping our own address
1013 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
1014 p
!= extra_probe_peers
.end();
1016 if (*p
!= messenger
->get_myaddr()) {
1018 i
.name
= entity_name_t::MON(-1);
1020 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
), i
);
// Handles the "add_bootstrap_peer_hint" admin command.
//   cmd    - full command string (used for logging only in the lines shown)
//   cmdmap - parsed arguments; the peer address is read from "addr"
//   ss     - receives the human-readable result or error text
// The address must parse, and the hint is ignored with a message when this
// monitor is already leader or peon.  A port of 0 is replaced by
// CEPH_MON_PORT before the address is inserted into extra_probe_peers.
// NOTE(review): the return statements are not visible in this listing.
1025 bool Monitor::_add_bootstrap_peer_hint(string cmd
, cmdmap_t
& cmdmap
, ostream
& ss
)
1028 if (!cmd_getval(g_ceph_context
, cmdmap
, "addr", addrstr
)) {
1029 ss
<< "unable to parse address string value '"
1030 << cmd_vartype_stringify(cmdmap
["addr"]) << "'";
1033 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' '"
1034 << addrstr
<< "'" << dendl
;
1037 const char *end
= 0;
1038 if (!addr
.parse(addrstr
.c_str(), &end
)) {
1039 ss
<< "failed to parse addr '" << addrstr
<< "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1043 if (is_leader() || is_peon()) {
1044 ss
<< "mon already active; ignoring bootstrap hint";
// default to the standard monitor port when none was given
1048 if (addr
.get_port() == 0)
1049 addr
.set_port(CEPH_MON_PORT
);
1051 extra_probe_peers
.insert(addr
);
1052 ss
<< "adding peer " << addr
<< " to list: " << extra_probe_peers
;
// Resets per-quorum state.  Visible steps: cancel the probe timeout, clean
// up health events and the health-check log times, cancel the scrub event,
// clear leader_since, stamp exited_quorum with the current time if a quorum
// was held, clear outside_quorum and quorum_feature_map, and call finish()
// on the health monitor.
// NOTE(review): other reset steps, including the body of the paxos_service
// loop, are not visible in this listing.
1056 // called by bootstrap(), or on leader|peon -> electing
1057 void Monitor::_reset()
1059 dout(10) << __func__
<< dendl
;
1061 cancel_probe_timeout();
1063 health_events_cleanup();
1064 health_check_log_times
.clear();
1065 scrub_event_cancel();
1067 leader_since
= utime_t();
// only record an exit time if we actually were in a quorum
1068 if (!quorum
.empty()) {
1069 exited_quorum
= ceph_clock_now();
1072 outside_quorum
.clear();
1073 quorum_feature_map
.clear();
1079 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
1081 health_monitor
->finish();
1085 // -----------------------------------------------------------
1088 set
<string
> Monitor::get_sync_targets_names()
1090 set
<string
> targets
;
1091 targets
.insert(paxos
->get_name());
1092 for (int i
= 0; i
< PAXOS_NUM
; ++i
)
1093 paxos_service
[i
]->get_store_prefixes(targets
);
1094 ConfigKeyService
*config_key_service_ptr
= dynamic_cast<ConfigKeyService
*>(config_key_service
);
1095 assert(config_key_service_ptr
);
1096 config_key_service_ptr
->get_store_prefixes(targets
);
// Sync-timeout handler; only valid while the monitor is in
// STATE_SYNCHRONIZING (asserted).
// NOTE(review): the action taken after the assert is not visible in this
// listing.
1101 void Monitor::sync_timeout()
1103 dout(10) << __func__
<< dendl
;
1104 assert(state
== STATE_SYNCHRONIZING
);
// Encodes into `bl` the monmap with the highest epoch among three
// candidates: the one held by the MonmapMonitor, the backup stored under
// ("mon_sync", "latest_monmap") if present, and the in-memory *monmap.
// A store read error (other than -ENOENT for the MonmapMonitor copy) and an
// empty backup buffer both trip an assert.
1108 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1110 dout(1) << __func__
<< dendl
;
1112 MonMap latest_monmap
;
1114 // Grab latest monmap from MonmapMonitor
1115 bufferlist monmon_bl
;
1116 int err
= monmon()->get_monmap(monmon_bl
);
1118 if (err
!= -ENOENT
) {
1120 << " something wrong happened while reading the store: "
1121 << cpp_strerror(err
) << dendl
;
1122 assert(0 == "error reading the store");
1125 latest_monmap
.decode(monmon_bl
);
1128 // Grab last backed up monmap (if any) and compare epochs
1129 if (store
->exists("mon_sync", "latest_monmap")) {
1130 bufferlist backup_bl
;
1131 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1134 << " something wrong happened while reading the store: "
1135 << cpp_strerror(err
) << dendl
;
1136 assert(0 == "error reading the store");
1138 assert(backup_bl
.length() > 0);
1140 MonMap backup_monmap
;
1141 backup_monmap
.decode(backup_bl
);
// keep the backup only if it is strictly newer
1143 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1144 latest_monmap
= backup_monmap
;
1147 // Check if our current monmap's epoch is greater than the one we've
1149 if (monmap
->epoch
> latest_monmap
.epoch
)
1150 latest_monmap
= *monmap
;
1152 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1154 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
// Clears the requester-side sync state: cancels a pending sync timeout
// event, resets sync_provider to a default entity_inst_t and zeroes
// sync_start_version.
// NOTE(review): further resets between those statements are not visible in
// this listing.
1157 void Monitor::sync_reset_requester()
1159 dout(10) << __func__
<< dendl
;
1161 if (sync_timeout_event
) {
1162 timer
.cancel_event(sync_timeout_event
);
1163 sync_timeout_event
= NULL
;
1166 sync_provider
= entity_inst_t();
1169 sync_start_version
= 0;
1172 void Monitor::sync_reset_provider()
1174 dout(10) << __func__
<< dendl
;
1175 sync_providers
.clear();
// Starts synchronizing this monitor's store from `other`.
//   other - peer to request the sync from; recorded as sync_provider
//   full  - logged as " full" or " recent"
// Must be called in STATE_PROBING or STATE_SYNCHRONIZING (asserted).  The
// visible steps: drop provider state, stash the latest monmap and an
// "in_sync" marker plus the last-committed floor in one transaction, clear
// all sync-target prefixes from the store, arm the sync timeout, and send a
// GET_COOKIE request carrying the current paxos version.
// The asserts on mon_sync_requester_kill_at fire when that option is set to
// 1 or 2 respectively.
1178 void Monitor::sync_start(entity_inst_t
&other
, bool full
)
1180 dout(10) << __func__
<< " " << other
<< (full
? " full" : " recent") << dendl
;
1182 assert(state
== STATE_PROBING
||
1183 state
== STATE_SYNCHRONIZING
);
1184 state
= STATE_SYNCHRONIZING
;
1186 // make sure we are not a provider for anyone!
1187 sync_reset_provider();
1192 // stash key state, and mark that we are syncing
1193 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1194 sync_stash_critical_state(t
);
1195 t
->put("mon_sync", "in_sync", 1);
// the floor never moves backwards: keep the larger of the stored floor and
// the current paxos version
1197 sync_last_committed_floor
= MAX(sync_last_committed_floor
, paxos
->get_version());
1198 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1199 << sync_last_committed_floor
<< dendl
;
1200 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1202 store
->apply_transaction(t
);
1204 assert(g_conf
->mon_sync_requester_kill_at
!= 1);
1206 // clear the underlying store
1207 set
<string
> targets
= get_sync_targets_names();
1208 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1209 store
->clear(targets
);
1211 // make sure paxos knows it has been reset. this prevents a
1212 // bootstrap and then different probe reply order from possibly
1213 // deciding a partial or no sync is needed.
1216 assert(g_conf
->mon_sync_requester_kill_at
!= 2);
1219 // assume 'other' as the leader. We will update the leader once we receive
1220 // a reply to the sync start.
1221 sync_provider
= other
;
1223 sync_reset_timeout();
// the request type follows the sync_full member, not the `full` argument
1225 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1227 m
->last_committed
= paxos
->get_version();
1228 messenger
->send_message(m
, sync_provider
);
// Queue a backup of the latest monmap into transaction `t` under
// ("mon_sync", "latest_monmap"). The monmap comes from
// sync_obtain_latest_monmap() and must be non-empty. This function only
// queues the put; no apply_transaction() call is visible here.
1231 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1233 dout(10) << __func__
<< dendl
;
1234 bufferlist backup_monmap
;
1235 sync_obtain_latest_monmap(backup_monmap
);
1236 assert(backup_monmap
.length() > 0);
1237 t
->put("mon_sync", "latest_monmap", backup_monmap
);
// (Re)arm the sync timeout: cancel a pending event if there is one, create a
// fresh C_MonContext and schedule it mon_sync_timeout from now.
// NOTE(review): the body of the callback lambda is not visible in this
// listing -- presumably it invokes sync_timeout(); confirm.
1240 void Monitor::sync_reset_timeout()
1242 dout(10) << __func__
<< dendl
;
1243 if (sync_timeout_event
)
1244 timer
.cancel_event(sync_timeout_event
);
1245 sync_timeout_event
= new C_MonContext(this, [this](int) {
1248 timer
.add_event_after(g_conf
->mon_sync_timeout
, sync_timeout_event
);
// Finish a sync whose last chunk reached version `last_committed`.
//
// Visible sequence:
//   - build a transaction from the paxos versions starting at
//     sync_start_version (the upper bound argument is not visible in this
//     listing), record paxos "last_committed" and apply it;
//   - in a second transaction erase the "mon_sync" markers "in_sync",
//     "force_sync" and "last_committed_floor" and apply it.
// The asserts on mon_sync_requester_kill_at (7..10) abort when the option
// equals that value (look like fault-injection points -- confirm).
// NOTE(review): the steps between the last asserts (state re-init /
// bootstrap) are not visible in this listing.
1251 void Monitor::sync_finish(version_t last_committed
)
1253 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
1255 assert(g_conf
->mon_sync_requester_kill_at
!= 7);
1258 // finalize the paxos commits
1259 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1260 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1262 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
// debug-level-30 dump of the final transaction
1264 dout(30) << __func__
<< " final tx dump:\n";
1265 JSONFormatter
f(true);
1270 store
->apply_transaction(tx
);
1273 assert(g_conf
->mon_sync_requester_kill_at
!= 8);
// the sync is complete: remove the on-disk sync markers
1275 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1276 t
->erase("mon_sync", "in_sync");
1277 t
->erase("mon_sync", "force_sync");
1278 t
->erase("mon_sync", "last_committed_floor");
1279 store
->apply_transaction(t
);
1281 assert(g_conf
->mon_sync_requester_kill_at
!= 9);
1285 assert(g_conf
->mon_sync_requester_kill_at
!= 10);
// Entry point for MMonSync messages: route the message to the provider-side
// or requester-side handler that matches its op. An op that matches none of
// the cases is logged at level 0 and hits assert(0 == "unknown op").
// NOTE(review): the switch header and the break statements are not visible
// in this listing.
1290 void Monitor::handle_sync(MonOpRequestRef op
)
1292 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1293 dout(10) << __func__
<< " " << *m
<< dendl
;
1296 // provider ---------
1298 case MMonSync::OP_GET_COOKIE_FULL
:
1299 case MMonSync::OP_GET_COOKIE_RECENT
:
1300 handle_sync_get_cookie(op
);
1302 case MMonSync::OP_GET_CHUNK
:
1303 handle_sync_get_chunk(op
);
1306 // client -----------
1308 case MMonSync::OP_COOKIE
:
1309 handle_sync_cookie(op
);
1312 case MMonSync::OP_CHUNK
:
1313 case MMonSync::OP_LAST_CHUNK
:
1314 handle_sync_chunk(op
);
1316 case MMonSync::OP_NO_COOKIE
:
1317 handle_sync_no_cookie(op
);
1321 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1322 assert(0 == "unknown op");
// Answer the sender of `op` with an OP_NO_COOKIE message that echoes the
// cookie from the request, sent back over the request's connection.
1328 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1330 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1331 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1332 m
->get_connection()->send_message(reply
);
// Provider side: a peer asks for a sync cookie (full or recent).
//
// Visible behaviour:
//   - if we are synchronizing ourselves, answer OP_NO_COOKIE;
//   - if the peer's connection lacks any bit of required_features, log that
//     the peer is being ignored;
//   - otherwise make a new cookie, register a SyncProvider entry for it and
//     arm its timeout at twice mon_sync_timeout;
//   - for OP_GET_COOKIE_FULL start from our current paxos version and create
//     a store synchronizer over get_sync_targets_names(); for the other op
//     start from the version the requester reported;
//   - reply with OP_COOKIE carrying the cookie and the starting version.
// NOTE(review): early returns, the else branch and the assignments of
// sp.cookie / sp.full are not visible in this listing -- confirm.
1335 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1337 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1338 if (is_synchronizing()) {
1339 _sync_reply_no_cookie(op
);
1343 assert(g_conf
->mon_sync_provider_kill_at
!= 1);
1345 // make sure they can understand us.
// non-zero iff some required feature bit is missing on the peer's connection
1346 if ((required_features
^ m
->get_connection()->get_features()) &
1347 required_features
) {
1348 dout(5) << " ignoring peer mon." << m
->get_source().num()
1349 << " has features " << std::hex
1350 << m
->get_connection()->get_features()
1351 << " but we require " << required_features
<< std::dec
<< dendl
;
1355 // make up a unique cookie. include election epoch (which persists
1356 // across restarts for the whole cluster) and a counter for this
1357 // process instance. there is no need to be unique *across*
1358 // monitors, though.
1359 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1360 assert(sync_providers
.count(cookie
) == 0);
1362 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
// operator[] creates the provider entry for the new cookie
1364 SyncProvider
& sp
= sync_providers
[cookie
];
1366 sp
.entity
= m
->get_source_inst();
1367 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1369 set
<string
> sync_targets
;
1370 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1372 sync_targets
= get_sync_targets_names();
1373 sp
.last_committed
= paxos
->get_version();
1374 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1376 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1378 // just catch up paxos
1379 sp
.last_committed
= m
->last_committed
;
1381 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1383 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1384 reply
->last_committed
= sp
.last_committed
;
1385 m
->get_connection()->send_message(reply
);
// Provider side: send the next chunk for the cookie in the request.
//
// Visible behaviour:
//   - unknown cookie: answer OP_NO_COOKIE;
//   - the requester's version is older than our first committed paxos
//     version (and that first version is > 1): forget the cookie and answer
//     OP_NO_COOKIE;
//   - otherwise fill a transaction with consecutive paxos versions while the
//     byte budget `left` (initially mon_sync_max_payload_size) is positive;
//     for a full sync, spend the remaining budget on a store chunk from the
//     synchronizer;
//   - if more store chunks or paxos versions remain, the reply keeps op
//     OP_CHUNK; otherwise it is marked OP_LAST_CHUNK and the provider entry
//     is erased;
//   - the transaction is encoded into reply->chunk_bl and the reply is sent.
// NOTE(review): early returns, the `bl` declaration, the else keyword and
// closing braces are not visible in this listing.
1388 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1390 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1391 dout(10) << __func__
<< " " << *m
<< dendl
;
1393 if (sync_providers
.count(m
->cookie
) == 0) {
1394 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1395 _sync_reply_no_cookie(op
);
1399 assert(g_conf
->mon_sync_provider_kill_at
!= 2);
1401 SyncProvider
& sp
= sync_providers
[m
->cookie
];
// every chunk request extends the provider-side timeout
1402 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1404 if (sp
.last_committed
< paxos
->get_first_committed() &&
1405 paxos
->get_first_committed() > 1) {
1406 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1407 << " < our fc " << paxos
->get_first_committed() << dendl
;
1408 sync_providers
.erase(m
->cookie
);
1409 _sync_reply_no_cookie(op
);
1413 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1414 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
// remaining payload budget in bytes; may go negative after the last put
1416 int left
= g_conf
->mon_sync_max_payload_size
;
1417 while (sp
.last_committed
< paxos
->get_version() && left
> 0) {
1419 sp
.last_committed
++;
1421 int err
= store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1424 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1425 left
-= bl
.length();
1426 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1429 reply
->last_committed
= sp
.last_committed
;
1431 if (sp
.full
&& left
> 0) {
1432 sp
.synchronizer
->get_chunk_tx(tx
, left
);
1433 sp
.last_key
= sp
.synchronizer
->get_last_key();
1434 reply
->last_key
= sp
.last_key
;
1437 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1438 sp
.last_committed
< paxos
->get_version()) {
1439 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1440 << " key " << sp
.last_key
<< dendl
;
1442 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1443 << " key " << sp
.last_key
<< dendl
;
1444 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1446 assert(g_conf
->mon_sync_provider_kill_at
!= 3);
1448 // clean up our local state
1449 sync_providers
.erase(sp
.cookie
);
1452 ::encode(*tx
, reply
->chunk_bl
);
1454 m
->get_connection()->send_message(reply
);
// Requester side: the provider answered our cookie request.
// A message is dropped when we already hold a cookie or when it does not
// come from sync_provider. Otherwise the cookie and the provider's starting
// version are recorded, the sync timeout is re-armed and the first chunk is
// requested.
// NOTE(review): the "already have a cookie" condition and the early returns
// are not visible in this listing.
1459 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1461 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1462 dout(10) << __func__
<< " " << *m
<< dendl
;
1464 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
1467 if (m
->get_source_inst() != sync_provider
) {
1468 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1471 sync_cookie
= m
->cookie
;
1472 sync_start_version
= m
->last_committed
;
1474 sync_reset_timeout();
1475 sync_get_next_chunk();
1477 assert(g_conf
->mon_sync_requester_kill_at
!= 3);
// Requester side: ask sync_provider for the next chunk of the sync
// identified by sync_cookie. When mon_inject_sync_get_chunk_delay is
// positive, the call first sleeps for that many seconds (the value is
// multiplied by 1e6 and passed to usleep).
1480 void Monitor::sync_get_next_chunk()
1482 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1483 if (g_conf
->mon_inject_sync_get_chunk_delay
> 0) {
1484 dout(20) << __func__
<< " injecting delay of " << g_conf
->mon_inject_sync_get_chunk_delay
<< dendl
;
1485 usleep((long long)(g_conf
->mon_inject_sync_get_chunk_delay
* 1000000.0));
1487 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1488 messenger
->send_message(r
, sync_provider
);
1490 assert(g_conf
->mon_sync_requester_kill_at
!= 4);
// Requester side: apply a chunk (OP_CHUNK or OP_LAST_CHUNK) from the
// provider.
//
// Visible behaviour:
//   - drop the message if its cookie or its source does not match the sync
//     in progress;
//   - decode the chunk into a transaction and apply it to the store;
//   - in the "applying recent paxos transactions as we go" section, prepare
//     the paxos versions after our current one, record "last_committed" from
//     the message, apply, and re-init paxos;
//   - OP_CHUNK: re-arm the timeout and request the next chunk;
//     OP_LAST_CHUNK: call sync_finish() with the message's version.
// NOTE(review): the condition guarding the "recent paxos" section and the
// early returns are not visible in this listing -- confirm.
1493 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1495 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1496 dout(10) << __func__
<< " " << *m
<< dendl
;
1498 if (m
->cookie
!= sync_cookie
) {
1499 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1502 if (m
->get_source_inst() != sync_provider
) {
1503 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1507 assert(state
== STATE_SYNCHRONIZING
);
1508 assert(g_conf
->mon_sync_requester_kill_at
!= 5);
1510 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1511 tx
->append_from_encoded(m
->chunk_bl
);
1513 dout(30) << __func__
<< " tx dump:\n";
1514 JSONFormatter
f(true);
1519 store
->apply_transaction(tx
);
1521 assert(g_conf
->mon_sync_requester_kill_at
!= 6);
1524 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
1525 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1526 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1528 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1530 dout(30) << __func__
<< " tx dump:\n";
1531 JSONFormatter
f(true);
1536 store
->apply_transaction(tx
);
1537 paxos
->init(); // to refresh what we just wrote
1540 if (m
->op
== MMonSync::OP_CHUNK
) {
1541 sync_reset_timeout();
1542 sync_get_next_chunk();
1543 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1544 sync_finish(m
->last_committed
);
// Requester side: the provider answered OP_NO_COOKIE. Only the log line is
// visible in this listing.
// NOTE(review): the follow-up action is not shown -- confirm against the
// full source.
1548 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1550 dout(10) << __func__
<< dendl
;
// Walk sync_providers and erase every entry whose timeout lies before the
// current time, logging the expired cookie and entity.
// NOTE(review): the branch that advances the iterator for entries that have
// not expired is not visible in this listing.
1554 void Monitor::sync_trim_providers()
1556 dout(20) << __func__
<< dendl
;
1558 utime_t now
= ceph_clock_now();
1559 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1560 while (p
!= sync_providers
.end()) {
1561 if (now
> p
->second
.timeout
) {
1562 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
<< " for " << p
->second
.entity
<< dendl
;
// post-increment hands erase() the current position while p already points
// at the next entry
1563 sync_providers
.erase(p
++);
1570 // ---------------------------------------------------
// Cancel the pending probe timeout event, if one is scheduled, and clear
// probe_timeout_event. When nothing is scheduled only a log line is emitted.
1573 void Monitor::cancel_probe_timeout()
1575 if (probe_timeout_event
) {
1576 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1577 timer
.cancel_event(probe_timeout_event
);
1578 probe_timeout_event
= NULL
;
1580 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
// (Re)arm the probe timeout: cancel any pending event, create a new
// C_MonContext and schedule it mon_probe_timeout seconds from now.
// NOTE(review): the body of the callback lambda is not visible in this
// listing -- presumably it calls probe_timeout(r); confirm.
1584 void Monitor::reset_probe_timeout()
1586 cancel_probe_timeout();
1587 probe_timeout_event
= new C_MonContext(this, [this](int r
) {
1590 double t
= g_conf
->mon_probe_timeout
;
1591 timer
.add_event_after(t
, probe_timeout_event
);
1592 dout(10) << "reset_probe_timeout " << probe_timeout_event
<< " after " << t
<< " seconds" << dendl
;
// Probe timeout callback. Requires that we are probing or synchronizing and
// that a timeout event is set, then clears probe_timeout_event.
// `r` is not used in the visible lines.
// NOTE(review): the action taken after clearing the event is not visible in
// this listing.
1595 void Monitor::probe_timeout(int r
)
1597 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1598 assert(is_probing() || is_synchronizing());
1599 assert(probe_timeout_event
);
1600 probe_timeout_event
= NULL
;
// Entry point for MMonProbe messages. A message whose fsid differs from our
// monmap's fsid is logged as ignored. Otherwise the message is routed by op:
// OP_PROBE to handle_probe_probe(), OP_REPLY to handle_probe_reply(), and
// OP_MISSING_FEATURES produces an error log listing the features we have,
// the ones the peer requires and the ones we lack.
// NOTE(review): the switch header, breaks/returns and the end of the
// OP_MISSING_FEATURES log statement are not visible in this listing.
1604 void Monitor::handle_probe(MonOpRequestRef op
)
1606 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1607 dout(10) << "handle_probe " << *m
<< dendl
;
1609 if (m
->fsid
!= monmap
->fsid
) {
1610 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1615 case MMonProbe::OP_PROBE
:
1616 handle_probe_probe(op
);
1619 case MMonProbe::OP_REPLY
:
1620 handle_probe_reply(op
);
1623 case MMonProbe::OP_MISSING_FEATURES
:
1624 derr
<< __func__
<< " missing features, have " << CEPH_FEATURES_ALL
1625 << ", required " << m
->required_features
// required bits that are not part of CEPH_FEATURES_ALL
1626 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
)
// Handle an incoming probe (OP_PROBE) from another monitor.
//
// Visible behaviour:
//   - compute the required feature bits missing on the peer's connection; in
//     the missing-features path, log them and (if the peer has
//     CEPH_FEATURE_OSD_PRIMARY_AFFINITY) send an OP_MISSING_FEATURES reply;
//   - when we are neither probing nor synchronizing and the peer's first
//     paxos version is more than one ahead of our version, log that we are
//     re-bootstrapping;
//   - otherwise reply with OP_REPLY carrying our monmap (encoded for the
//     peer's features) and our paxos first/last versions;
//   - remember the peer's address in extra_probe_peers when it is not in our
//     monmap.
// NOTE(review): the guard around the missing-features path, the bootstrap
// call and the early exits are not visible in this listing.
1632 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1634 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1636 dout(10) << "handle_probe_probe " << m
->get_source_inst() << *m
1637 << " features " << m
->get_connection()->get_features() << dendl
;
1638 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1640 dout(1) << " peer " << m
->get_source_addr() << " missing features "
1641 << missing
<< dendl
;
1642 if (m
->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY
)) {
1643 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1644 name
, has_ever_joined
);
// NOTE(review): this assigns required_features on the incoming request `m`,
// not on the reply `r` that is sent on the next statement -- looks
// unintended; confirm whether `r->required_features` was meant.
1645 m
->required_features
= required_features
;
1646 m
->get_connection()->send_message(r
);
1651 if (!is_probing() && !is_synchronizing()) {
1652 // If the probing mon is way ahead of us, we need to re-bootstrap.
1653 // Normally we capture this case when we initially bootstrap, but
1654 // it is possible we pass those checks (we overlap with
1655 // quorum-to-be) but fail to join a quorum before it moves past
1656 // us. We need to be kicked back to bootstrap so we can
1657 // synchronize, not keep calling elections.
1658 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1659 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1660 << "ahead of us, re-bootstrapping" << dendl
;
1668 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
);
1671 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1672 r
->paxos_first_version
= paxos
->get_first_committed();
1673 r
->paxos_last_version
= paxos
->get_version();
1674 m
->get_connection()->send_message(r
);
1676 // did we discover a peer here?
1677 if (!monmap
->contains(m
->get_source_addr())) {
1678 dout(1) << " adding peer " << m
->get_source_addr()
1679 << " to list of hints" << dendl
;
1680 extra_probe_peers
.insert(m
->get_source_addr());
// Handle a probe reply (OP_REPLY) from a peer.
//
// Visible phases:
//   1. Discovery (only while probing or electing):
//      - if the peer's encoded monmap differs from ours and the peer has
//        joined a quorum while either its epoch is newer or we have never
//        joined, adopt the peer's monmap;
//      - at monmap epoch 0, rename a "noname-" placeholder entry to the
//        peer's real name, and fill in a blank address for a known name.
//   2. Sync decision (only while probing, and not while already syncing):
//      - ignore peers whose last paxos version is below
//        sync_last_committed_floor;
//      - start a full sync when our version is below the peer's first
//        version (and that first version is > 1);
//      - start a recent sync when the peer's last version is ahead of ours
//        by more than paxos_max_join_drift.
//   3. Joining:
//      - if the peer reports a quorum: when we are in the monmap with a
//        non-blank address the code takes the "initiate a new election"
//        path, otherwise it sends MMonJoin to the first quorum member;
//      - if not: track the peer in outside_quorum (when it is in the monmap)
//        and, once a majority (size/2 + 1) is reached and includes us, take
//        the "calling election" path.
// NOTE(review): many control-flow lines (returns, else keywords, the
// bootstrap/election calls) are not visible in this listing; the summary
// above is inferred from the visible conditions and log messages -- confirm
// against the full source.
1687 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1689 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1690 dout(10) << "handle_probe_reply " << m
->get_source_inst() << *m
<< dendl
;
1691 dout(10) << " monmap is " << *monmap
<< dendl
;
1693 // discover name and addrs during probing or electing states.
1694 if (!is_probing() && !is_electing()) {
1698 // newer map, or they've joined a quorum and we haven't?
1700 monmap
->encode(mybl
, m
->get_connection()->get_features());
1701 // make sure it's actually different; the checks below err toward
1702 // taking the other guy's map, which could cause us to loop.
1703 if (!mybl
.contents_equal(m
->monmap_bl
)) {
1704 MonMap
*newmap
= new MonMap
;
1705 newmap
->decode(m
->monmap_bl
);
1706 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
1707 !has_ever_joined
)) {
1708 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
1709 << ", mine was " << monmap
->get_epoch() << dendl
;
1711 monmap
->decode(m
->monmap_bl
);
1720 string peer_name
= monmap
->get_name(m
->get_source_addr());
1721 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
1722 dout(10) << " renaming peer " << m
->get_source_addr() << " "
1723 << peer_name
<< " -> " << m
->name
<< " in my monmap"
1725 monmap
->rename(peer_name
, m
->name
);
1727 if (is_electing()) {
1732 dout(10) << " peer name is " << peer_name
<< dendl
;
1735 // new initial peer?
1736 if (monmap
->get_epoch() == 0 &&
1737 monmap
->contains(m
->name
) &&
1738 monmap
->get_addr(m
->name
).is_blank_ip()) {
1739 dout(1) << " learned initial mon " << m
->name
<< " addr " << m
->get_source_addr() << dendl
;
1740 monmap
->set_addr(m
->name
, m
->get_source_addr());
1746 // end discover phase
1747 if (!is_probing()) {
1751 assert(paxos
!= NULL
);
1753 if (is_synchronizing()) {
1754 dout(10) << " currently syncing" << dendl
;
1758 entity_inst_t other
= m
->get_source_inst();
1760 if (m
->paxos_last_version
< sync_last_committed_floor
) {
1761 dout(10) << " peer paxos versions [" << m
->paxos_first_version
1762 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
1763 << sync_last_committed_floor
<< ", ignoring"
1766 if (paxos
->get_version() < m
->paxos_first_version
&&
1767 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
1768 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
1769 << "," << m
->paxos_last_version
<< "]"
1770 << " vs my version " << paxos
->get_version()
1771 << " (too far ahead)"
1773 cancel_probe_timeout();
// second argument true: sync_start() logs this as " full"
1774 sync_start(other
, true);
1777 if (paxos
->get_version() + g_conf
->paxos_max_join_drift
< m
->paxos_last_version
) {
1778 dout(10) << " peer paxos last version " << m
->paxos_last_version
1779 << " vs my version " << paxos
->get_version()
1780 << " (too far ahead)"
1782 cancel_probe_timeout();
// second argument false: sync_start() logs this as " recent"
1783 sync_start(other
, false);
1788 // is there an existing quorum?
1789 if (m
->quorum
.size()) {
1790 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
1792 dout(10) << " peer paxos version " << m
->paxos_last_version
1793 << " vs my version " << paxos
->get_version()
1797 if (monmap
->contains(name
) &&
1798 !monmap
->get_addr(name
).is_blank_ip()) {
1799 // i'm part of the cluster; just initiate a new election
1802 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl
;
1803 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
1804 monmap
->get_inst(*m
->quorum
.begin()));
1807 if (monmap
->contains(m
->name
)) {
1808 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
1809 outside_quorum
.insert(m
->name
);
1811 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
// strict majority of the monmap
1815 unsigned need
= monmap
->size() / 2 + 1;
1816 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
1817 if (outside_quorum
.size() >= need
) {
1818 if (outside_quorum
.count(name
)) {
1819 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
1822 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
1825 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
// Enter an election started elsewhere: wait for any in-flight paxos write,
// switch to STATE_ELECTING and bump the l_mon_num_elections counter.
// NOTE(review): lines between these statements are not visible in this
// listing.
1830 void Monitor::join_election()
1832 dout(10) << __func__
<< dendl
;
1833 wait_for_paxos_write();
1835 state
= STATE_ELECTING
;
1837 logger
->inc(l_mon_num_elections
);
// Start a new election ourselves: wait for any in-flight paxos write, switch
// to STATE_ELECTING, bump the election counters, announce the election in
// the cluster log and ask the elector to call it.
// NOTE(review): lines between these statements are not visible in this
// listing.
1840 void Monitor::start_election()
1842 dout(10) << "start_election" << dendl
;
1843 wait_for_paxos_write();
1845 state
= STATE_ELECTING
;
1847 logger
->inc(l_mon_num_elections
);
1848 logger
->inc(l_mon_election_call
);
1850 clog
->info() << "mon." << name
<< " calling new monitor election";
1851 elector
.call_election();
// Declare ourselves the winner without running an election round. Advances
// the election epoch, refreshes our rank from the monmap, collects our own
// metadata into slot 0 and calls win_election().
// NOTE(review): the construction of the quorum set `q` and some
// win_election() arguments are not visible in this listing.
1854 void Monitor::win_standalone_election()
1856 dout(1) << "win_standalone_election" << dendl
;
1858 // bump election epoch, in case the previous epoch included other
1859 // monitors; we need to be able to make the distinction.
1861 elector
.advance_epoch();
1863 rank
= monmap
->get_rank(name
);
1868 map
<int,Metadata
> metadata
;
1869 collect_metadata(&metadata
[0]);
1871 win_election(elector
.get_epoch(), q
,
1873 ceph::features::mon::get_supported(),
// Return the time recorded in leader_since. Only valid on the leader: the
// call asserts state == STATE_LEADER.
1877 const utime_t
& Monitor::get_leader_since() const
1879 assert(state
== STATE_LEADER
);
1880 return leader_since
;
// Return the elector's current election epoch.
1883 epoch_t
Monitor::get_epoch()
1885 return elector
.get_epoch();
// Notify the paxos services that an election finished. Requires that we are
// now leader or peon. On the leader the monmap service is special-cased
// because the caller already notified it.
// NOTE(review): the statement taken for that special case (presumably
// `continue`) is not visible in this listing.
1888 void Monitor::_finish_svc_election()
1890 assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
1892 for (auto p
: paxos_service
) {
1893 // we already called election_finished() on monmon(); avoid calling twice
1894 if (state
== STATE_LEADER
&& p
== monmon())
1896 p
->election_finished();
// We won the election for `epoch`.
//
// @param epoch         election epoch, passed to health_monitor->start()
// @param active        the quorum members (logged as " quorum ")
// @param features      connection features of the quorum, stored in
//                      quorum_con_features
// @param mon_features  monitor features of the quorum, stored in
//                      quorum_mon_features
// @param metadata      per-rank metadata collected during the election
//
// Visible sequence: assert we were electing, become STATE_LEADER and stamp
// leader_since, record quorum features and pending metadata, clear
// outside_quorum, announce the win in the cluster log, set the leader
// commands, initialise paxos as leader, notify the monmap service first and
// then the remaining services, start the health monitor, and queue a
// "last_metadata" record into the pending paxos transaction. Finally start
// the health tick, the health-to-clog interval and the scrub event.
// NOTE(review): the assignments of quorum/leader, the encoding of `bl` and
// the action guarded by the monmap size/epoch check are not visible in this
// listing.
1900 void Monitor::win_election(epoch_t epoch
, set
<int>& active
, uint64_t features
,
1901 const mon_feature_t
& mon_features
,
1902 const map
<int,Metadata
>& metadata
)
1904 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
1905 << " features " << features
1906 << " mon_features " << mon_features
1908 assert(is_electing());
1909 state
= STATE_LEADER
;
1910 leader_since
= ceph_clock_now();
1913 quorum_con_features
= features
;
1914 quorum_mon_features
= mon_features
;
1915 pending_metadata
= metadata
;
1916 outside_quorum
.clear();
1918 clog
->info() << "mon." << name
<< "@" << rank
1919 << " won leader election with quorum " << quorum
;
1921 set_leader_commands(get_local_commands(mon_features
));
1923 paxos
->leader_init();
1924 // NOTE: tell monmap monitor first. This is important for the
1925 // bootstrap case to ensure that the very first paxos proposal
1926 // codifies the monmap. Otherwise any manner of chaos can ensue
1927 // when monitors are calling elections or participating in a paxos
1928 // round without agreeing on who the participants are.
1929 monmon()->election_finished();
1930 _finish_svc_election();
1931 health_monitor
->start(epoch
);
1933 logger
->inc(l_mon_election_win
);
1935 // inject new metadata in first transaction.
1937 // include previous metadata for missing mons (that aren't part of
1938 // the current quorum).
1939 map
<int,Metadata
> m
= metadata
;
// note: the loop variable `rank` shadows the `rank` used in the log line
// above
1940 for (unsigned rank
= 0; rank
< monmap
->size(); ++rank
) {
1941 if (m
.count(rank
) == 0 &&
1942 mon_metadata
.count(rank
)) {
1943 m
[rank
] = mon_metadata
[rank
];
1947 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1948 // a new transaction immediately after the election finishes. We should
1949 // do that anyway for other reasons, though.
1950 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
1953 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
1957 if (monmap
->size() > 1 &&
1958 monmap
->get_epoch() > 0) {
1960 health_tick_start();
1961 do_health_to_clog_interval();
1962 scrub_event_start();
// We lost the election for `epoch`.
//
// Visible sequence: reset leader_since, clear outside_quorum, record the
// quorum's connection and monitor features, notify the paxos services,
// start the health monitor and bump the "lose" counter. When the quorum has
// CEPH_FEATURE_MON_METADATA but not SERVER_LUMINOUS, our system metadata is
// collected and sent to the leader in an MMonMetadata message.
// NOTE(review): part of the parameter list (including `features`) and the
// assignments of state/leader/quorum are not visible in this listing.
1966 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
1968 const mon_feature_t
& mon_features
)
1971 leader_since
= utime_t();
1974 outside_quorum
.clear();
1975 quorum_con_features
= features
;
1976 quorum_mon_features
= mon_features
;
1977 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
1978 << " quorum is " << quorum
<< " features are " << quorum_con_features
1979 << " mon_features are " << quorum_mon_features
1983 _finish_svc_election();
1984 health_monitor
->start(epoch
);
1986 logger
->inc(l_mon_election_lose
);
1990 if ((quorum_con_features
& CEPH_FEATURE_MON_METADATA
) &&
1991 !HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
)) {
1992 // for pre-luminous mons only
1994 collect_metadata(&sys_info
);
1995 messenger
->send_message(new MMonMetadata(sys_info
),
1996 monmap
->get_inst(get_leader()));
// Fill `*m` with this monitor's metadata: the entries produced by
// collect_sys_info() plus an "addr" entry holding our messenger address as a
// string.
2000 void Monitor::collect_metadata(Metadata
*m
)
2002 collect_sys_info(m
, g_ceph_context
);
2003 (*m
)["addr"] = stringify(messenger
->get_myaddr());
// Common post-election work: fold the quorum and monmap features into the
// compat set, reset exited_quorum, complete the contexts waiting for a
// quorum, resend routed requests and register the cluster logger. If the
// monmap lists our address under a different name than `name`, an MMonJoin
// is sent to the first quorum member to rename us.
// NOTE(review): lines between these statements are not visible in this
// listing.
2006 void Monitor::finish_election()
2008 apply_quorum_to_compatset_features();
2009 apply_monmap_to_compatset_features();
2011 exited_quorum
= utime_t();
2012 finish_contexts(g_ceph_context
, waitfor_quorum
);
2013 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
2014 resend_routed_requests();
2016 register_cluster_logger();
2018 // am i named properly?
2019 string cur_name
= monmap
->get_name(messenger
->get_myaddr());
2020 if (cur_name
!= name
) {
2021 dout(10) << " renaming myself from " << cur_name
<< " -> " << name
<< dendl
;
2022 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
2023 monmap
->get_inst(*quorum
.begin()));
// Adopt `new_features` as our compat set when it differs from the current
// one: log the difference, replace `features`, apply a store transaction and
// recompute the quorum requirements.
// NOTE(review): the statement that fills the transaction before it is
// applied is not visible in this listing -- presumably it persists the new
// feature set; confirm.
2027 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2029 if (new_features
.compare(features
) != 0) {
2030 CompatSet diff
= features
.unsupported(new_features
);
2031 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2032 features
= new_features
;
2034 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2036 store
->apply_transaction(t
);
2038 calc_quorum_requirements();
// Derive compat-set features from the quorum's connection features: start
// from a copy of the current set and add the matching incompat entry for
// each of OSD_ERASURE_CODES, OSDMAP_ENC, ERASURE_CODE_PLUGINS_V2 and
// ERASURE_CODE_PLUGINS_V3 that the quorum supports, then hand the result to
// _apply_compatset_features().
2042 void Monitor::apply_quorum_to_compatset_features()
2044 CompatSet
new_features(features
);
2045 if (quorum_con_features
& CEPH_FEATURE_OSD_ERASURE_CODES
) {
2046 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2048 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2049 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2051 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
) {
2052 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2054 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
) {
2055 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2057 dout(5) << __func__
<< dendl
;
2058 _apply_compatset_features(new_features
);
// Derive compat-set features from the monmap's required features: start from
// a copy of the current set and, for FEATURE_KRAKEN and FEATURE_LUMINOUS,
// add the matching incompat entry when the monmap requires the feature. Each
// case asserts that the feature is persistent and that the quorum supports
// the corresponding server release. The result is handed to
// _apply_compatset_features().
2061 void Monitor::apply_monmap_to_compatset_features()
2063 CompatSet
new_features(features
);
2064 mon_feature_t monmap_features
= monmap
->get_required_features();
2066 /* persistent monmap features may go into the compatset.
2067 * optional monmap features may not - why?
2068 * because optional monmap features may be set/unset by the admin,
2069 * and possibly by other means that haven't yet been thought out,
2070 * so we can't make the monitor enforce them on start - because they
2072 * this, of course, does not invalidate setting a compatset feature
2073 * for an optional feature - as long as you make sure to clean it up
2074 * once you unset it.
2076 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2077 assert(ceph::features::mon::get_persistent().contains_all(
2078 ceph::features::mon::FEATURE_KRAKEN
));
2079 // this feature should only ever be set if the quorum supports it.
2080 assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2081 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2083 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
2084 assert(ceph::features::mon::get_persistent().contains_all(
2085 ceph::features::mon::FEATURE_LUMINOUS
));
2086 // this feature should only ever be set if the quorum supports it.
2087 assert(HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
));
2088 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
2091 dout(5) << __func__
<< dendl
;
2092 _apply_compatset_features(new_features
);
// Recompute required_features (the connection feature bits checked against
// peers) from scratch. Each incompat entry present in the compat set
// `features` contributes its connection feature bit or mask; the monmap's
// required KRAKEN / LUMINOUS features contribute the corresponding server
// feature masks as well.
2095 void Monitor::calc_quorum_requirements()
2097 required_features
= 0;
2100 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
)) {
2101 required_features
|= CEPH_FEATURE_OSD_ERASURE_CODES
;
2103 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2104 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2106 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
)) {
2107 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
;
2109 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
)) {
2110 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
;
2112 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2113 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2115 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
)) {
2116 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
// the monmap's own required features map onto the same server masks
2120 if (monmap
->get_required_features().contains_all(
2121 ceph::features::mon::FEATURE_KRAKEN
)) {
2122 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2124 if (monmap
->get_required_features().contains_all(
2125 ceph::features::mon::FEATURE_LUMINOUS
)) {
2126 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2128 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
// Accumulate into `*fm` this monitor's session feature map plus
// quorum_feature_map entries of quorum members.
// NOTE(review): a statement inside the loop before the accumulation is not
// visible in this listing (possibly a filter on `id`) -- confirm.
2131 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2133 *fm
+= session_map
.feature_map
;
2134 for (auto id
: quorum
) {
2136 *fm
+= quorum_feature_map
[id
];
// Mark the store so that a sync is forced later: stash the critical state
// and set ("mon_sync", "force_sync") in one applied transaction, then report
// the outcome as a "sync_force" object (ret 0 plus a message) through `f`.
// A JSONFormatter is created locally when no formatter is supplied, and
// free_formatter records that.
// NOTE(review): the null check on `f`, the flush to `ss` and the deletion of
// the local formatter are not visible in this listing.
2141 void Monitor::sync_force(Formatter
*f
, ostream
& ss
)
2143 bool free_formatter
= false;
2146 // lousy/lazy hack: default to json if no formatter has been defined
2147 f
= new JSONFormatter();
2148 free_formatter
= true;
2151 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2152 sync_stash_critical_state(tx
);
2153 tx
->put("mon_sync", "force_sync", 1);
2154 store
->apply_transaction(tx
);
2156 f
->open_object_section("sync_force");
2157 f
->dump_int("ret", 0);
2158 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2159 f
->close_section(); // sync_force
// Dump the quorum status through `f` as a "quorum_status" object: the
// election epoch, the quorum ranks, the quorum member names, the name of the
// first quorum member as "quorum_leader_name" (empty string when the quorum
// is empty) and a "monmap" section. A JSONFormatter is created locally when
// no formatter is supplied.
// NOTE(review): the null check on `f`, the monmap dump call and the final
// flush/delete of a locally created formatter are not visible in this
// listing.
2165 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
2167 bool free_formatter
= false;
2170 // lousy/lazy hack: default to json if no formatter has been defined
2171 f
= new JSONFormatter();
2172 free_formatter
= true;
2174 f
->open_object_section("quorum_status");
2175 f
->dump_int("election_epoch", get_epoch());
2177 f
->open_array_section("quorum");
2178 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2179 f
->dump_int("mon", *p
);
2180 f
->close_section(); // quorum
2182 list
<string
> quorum_names
= get_quorum_names();
2183 f
->open_array_section("quorum_names");
2184 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2185 f
->dump_string("mon", *p
);
2186 f
->close_section(); // quorum_names
2188 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(*quorum
.begin()));
2190 f
->open_object_section("monmap");
2192 f
->close_section(); // monmap
2194 f
->close_section(); // quorum_status
// Dump this monitor's status through `f` as a "mon_status" object.
// Visible fields, in order: name, rank, state, election_epoch, the quorum
// ranks, a "features" object (required/quorum connection and monitor
// features), outside_quorum, extra_probe_peers, one entry per active sync
// provider, a "sync" object while synchronizing (including the kill_at
// settings when they are positive), a "monmap" section and the session
// feature map. A JSONFormatter is created locally when no formatter is
// supplied.
// NOTE(review): the null check on `f`, several loop increments and section
// open/close calls, the monmap dump and the final flush/delete are not
// visible in this listing.
2200 void Monitor::get_mon_status(Formatter
*f
, ostream
& ss
)
2202 bool free_formatter
= false;
2205 // lousy/lazy hack: default to json if no formatter has been defined
2206 f
= new JSONFormatter();
2207 free_formatter
= true;
2210 f
->open_object_section("mon_status");
2211 f
->dump_string("name", name
);
2212 f
->dump_int("rank", rank
);
2213 f
->dump_string("state", get_state_name());
2214 f
->dump_int("election_epoch", get_epoch());
2216 f
->open_array_section("quorum");
2217 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2218 f
->dump_int("mon", *p
);
2221 f
->close_section(); // quorum
2223 f
->open_object_section("features");
2224 f
->dump_stream("required_con") << required_features
;
2225 mon_feature_t req_mon_features
= get_required_mon_features();
2226 req_mon_features
.dump(f
, "required_mon");
2227 f
->dump_stream("quorum_con") << quorum_con_features
;
2228 quorum_mon_features
.dump(f
, "quorum_mon");
2229 f
->close_section(); // features
2231 f
->open_array_section("outside_quorum");
2232 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2233 f
->dump_string("mon", *p
);
2234 f
->close_section(); // outside_quorum
2236 f
->open_array_section("extra_probe_peers");
2237 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
2238 p
!= extra_probe_peers
.end();
2240 f
->dump_stream("peer") << *p
;
2241 f
->close_section(); // extra_probe_peers
2243 f
->open_array_section("sync_provider");
2244 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2245 p
!= sync_providers
.end();
2247 f
->dump_unsigned("cookie", p
->second
.cookie
);
2248 f
->dump_stream("entity") << p
->second
.entity
;
2249 f
->dump_stream("timeout") << p
->second
.timeout
;
2250 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2251 f
->dump_stream("last_key") << p
->second
.last_key
;
2255 if (is_synchronizing()) {
2256 f
->open_object_section("sync");
2257 f
->dump_stream("sync_provider") << sync_provider
;
2258 f
->dump_unsigned("sync_cookie", sync_cookie
);
2259 f
->dump_unsigned("sync_start_version", sync_start_version
);
2263 if (g_conf
->mon_sync_provider_kill_at
> 0)
2264 f
->dump_int("provider_kill_at", g_conf
->mon_sync_provider_kill_at
);
2265 if (g_conf
->mon_sync_requester_kill_at
> 0)
2266 f
->dump_int("requester_kill_at", g_conf
->mon_sync_requester_kill_at
);
2268 f
->open_object_section("monmap");
2272 f
->dump_object("feature_map", session_map
.feature_map
);
2273 f
->close_section(); // mon_status
2275 if (free_formatter
) {
2276 // flush formatter to ss and delete it iff we created the formatter
2283 // health status to clog
// Arm the health-to-clog tick.
// The mon_health_to_clog switch and mon_health_to_clog_tick_interval (<= 0)
// are tested first. NOTE(review): the body of that guard is not visible
// here; presumably it returns without scheduling anything -- confirm.
// The scheduled C_MonContext callback runs do_health_to_clog() and then calls
// health_tick_start() again, so each tick re-arms the next one. The event is
// registered through timer.add_event_after() using the tick interval.
2285 void Monitor::health_tick_start()
2287 if (!cct
->_conf
->mon_health_to_clog
||
2288 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2291 dout(15) << __func__
<< dendl
;
2294 health_tick_event
= new C_MonContext(this, [this](int r
) {
2297 do_health_to_clog();
2298 health_tick_start();
2300 timer
.add_event_after(cct
->_conf
->mon_health_to_clog_tick_interval
,
// Stop the health-to-clog tick: when a tick event is pending it is cancelled
// on the timer and the stored health_tick_event pointer is reset to NULL.
2304 void Monitor::health_tick_stop()
2306 dout(15) << __func__
<< dendl
;
2308 if (health_tick_event
) {
2309 timer
.cancel_event(health_tick_event
);
2310 health_tick_event
= NULL
;
// Compute when the next periodic health summary should fire.
// The current time is truncated to whole seconds and advanced to the next
// multiple of mon_health_to_clog_interval. When the current second already
// is such a multiple, remainder is 0 and a full interval is added. The
// resulting `next` has a zero sub-second component.
// The modulo requires a non-zero interval; health_interval_start() tests the
// interval against <= 0 before calling this function.
// NOTE(review): the return statement is not visible here; presumably `next`
// is returned -- confirm.
2314 utime_t
Monitor::health_interval_calc_next_update()
2316 utime_t now
= ceph_clock_now();
2318 time_t secs
= now
.sec();
// Seconds elapsed since the last interval boundary ...
2319 int remainder
= secs
% cct
->_conf
->mon_health_to_clog_interval
;
// ... and seconds remaining until the next one.
2320 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2321 utime_t next
= utime_t(secs
+ adjustment
, 0);
2323 dout(20) << __func__
2324 << " now: " << now
<< ","
2325 << " next: " << next
<< ","
2326 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
// (Re)schedule the periodic health summary.
// mon_health_to_clog and mon_health_to_clog_interval (<= 0) are tested first.
// NOTE(review): the guard's body is not visible here; presumably it returns
// without scheduling -- confirm.
// Any previously scheduled event is dropped through health_interval_stop(),
// then a C_MonContext running do_health_to_clog_interval() is registered with
// timer.add_event_at() for the time computed by
// health_interval_calc_next_update().
2332 void Monitor::health_interval_start()
2334 dout(15) << __func__
<< dendl
;
2336 if (!cct
->_conf
->mon_health_to_clog
||
2337 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2341 health_interval_stop();
2342 utime_t next
= health_interval_calc_next_update();
2343 health_interval_event
= new C_MonContext(this, [this](int r
) {
2346 do_health_to_clog_interval();
2348 timer
.add_event_at(next
, health_interval_event
);
// Stop the periodic health summary: a pending health_interval_event is
// cancelled on the timer, and the stored pointer is reset to NULL.
2351 void Monitor::health_interval_stop()
2353 dout(15) << __func__
<< dendl
;
2354 if (health_interval_event
) {
2355 timer
.cancel_event(health_interval_event
);
2357 health_interval_event
= NULL
;
// Tear down health-to-clog state: the periodic summary event is stopped via
// health_interval_stop() and health_status_cache is reset, dropping the last
// logged summary/status.
// NOTE(review): a line of this body is not visible here; it may stop the
// tick event as well -- confirm against the full source.
2360 void Monitor::health_events_cleanup()
2363 health_interval_stop();
2364 health_status_cache
.reset();
// React to runtime changes of the health-to-clog configuration options.
// changed: names of the options whose values were modified.
// Three option names are examined independently:
//  - "mon_health_to_clog": when now disabled, health_events_cleanup() runs;
//    otherwise whichever of the tick / interval events is not currently
//    scheduled gets started.
//  - "mon_health_to_clog_interval": a value <= 0 stops the periodic summary,
//    and health_interval_start() re-schedules it.
//  - "mon_health_to_clog_tick_interval": the new value is tested against
//    <= 0 and health_tick_start() re-arms the tick.
// NOTE(review): the else/brace structure between these calls is not visible
// here, so the exact branch each call belongs to should be confirmed.
2367 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2369 dout(20) << __func__
<< dendl
;
2371 if (changed
.count("mon_health_to_clog")) {
2372 if (!cct
->_conf
->mon_health_to_clog
) {
2373 health_events_cleanup();
// Only start events that are not already scheduled.
2375 if (!health_tick_event
) {
2376 health_tick_start();
2378 if (!health_interval_event
) {
2379 health_interval_start();
2384 if (changed
.count("mon_health_to_clog_interval")) {
2385 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2386 health_interval_stop();
2388 health_interval_start();
2392 if (changed
.count("mon_health_to_clog_tick_interval")) {
2393 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2396 health_tick_start();
// Callback body for the periodic health summary event scheduled by
// health_interval_start(). The configuration is re-checked because it may
// have changed since scheduling. The health is then logged with
// do_health_to_clog(true) and the next occurrence is scheduled through
// health_interval_start().
2401 void Monitor::do_health_to_clog_interval()
2403 // outputting to clog may have been disabled in the conf
2404 // since we were scheduled.
2405 if (!cct
->_conf
->mon_health_to_clog
||
2406 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2409 dout(10) << __func__
<< dendl
;
2411 // do we have a cached value for next_clog_update? if not,
2412 // do we know when the last update was?
2414 do_health_to_clog(true);
2415 health_interval_start();
// Write the current cluster health to the cluster log and remember what was
// written in health_status_cache.
// force: reported as " (force)" in the debug output.
//   NOTE(review): presumably it also bypasses the "unchanged since the last
//   log" comparison below -- the lines that would show this are not visible.
// Two paths, selected by osdmap.require_osd_release:
//  - luminous or later: get_health_status() provides a level and a summary
//    string, logged as "overall <summary>" through clog->health(level);
//  - otherwise: get_health() fills a list of status strings, which are joined
//    with "; " and logged through clog->info().
// Both paths compare the fresh values with health_status_cache and then
// store the fresh values back into it.
2418 void Monitor::do_health_to_clog(bool force
)
2420 // outputting to clog may have been disabled in the conf
2421 // since we were scheduled.
2422 if (!cct
->_conf
->mon_health_to_clog
||
2423 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2426 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
2428 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2430 health_status_t level
= get_health_status(false, nullptr, &summary
);
// Same summary and same level as the cached values.
2432 summary
== health_status_cache
.summary
&&
2433 level
== health_status_cache
.overall
)
2435 clog
->health(level
) << "overall " << summary
;
2436 health_status_cache
.summary
= summary
;
2437 health_status_cache
.overall
= level
;
// Path for releases before luminous, built on get_health().
2440 list
<string
> status
;
2441 health_status_t overall
= get_health(status
, NULL
, NULL
);
2442 dout(25) << __func__
2443 << (force
? " (force)" : "")
2446 string summary
= joinify(status
.begin(), status
.end(), string("; "));
// Same status as cached, cache not empty, and same summary text.
2449 overall
== health_status_cache
.overall
&&
2450 !health_status_cache
.summary
.empty() &&
2451 health_status_cache
.summary
== summary
) {
2456 clog
->info() << summary
;
2458 health_status_cache
.overall
= overall
;
2459 health_status_cache
.summary
= summary
;
// Build the health report from the per-service health checks.
// The parameter list is not visible in this listing. Call sites in this file
// pass (bool want_detail, Formatter *f, string *plain), and the body also
// reads sep2 and summary.
// Behavior visible here:
//  - r starts at HEALTH_OK and becomes the std::min of itself and the value
//    returned by each paxos service's get_health_checks().dump_summary();
//  - with a formatter, "health"/"checks" sections are opened and the result
//    is dumped as "status"; the per-service summary text goes to `summary`
//    only when there is no formatter (psummary is null when f is set);
//  - *plain receives stringify(r), forming the one-line "HEALTH_FOO[ ...]";
//  - when a formatter is present and either compat option
//    (mon_health_preluminous_compat / mon_health_preluminous_compat_warning)
//    is on, legacy "summary", "overall_status" and "detail" sections are
//    emitted, including fixed notices that the JSON format has changed.
// NOTE(review): the return statement is not visible; presumably r is
// returned -- confirm.
2463 health_status_t
Monitor::get_health_status(
2470 health_status_t r
= HEALTH_OK
;
2471 bool compat
= g_conf
->mon_health_preluminous_compat
;
2472 bool compat_warn
= g_conf
->get_val
<bool>("mon_health_preluminous_compat_warning");
2474 f
->open_object_section("health");
2475 f
->open_object_section("checks");
// Text summary is collected only when no formatter is in use.
2479 string
*psummary
= f
? nullptr : &summary
;
2480 for (auto& svc
: paxos_service
) {
2481 r
= std::min(r
, svc
->get_health_checks().dump_summary(
2482 f
, psummary
, sep2
, want_detail
));
2487 f
->dump_stream("status") << r
;
2489 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2490 *plain
= stringify(r
);
2491 if (summary
.size()) {
// Legacy (pre-luminous style) summary section.
2498 if (f
&& (compat
|| compat_warn
)) {
// With the compat warning enabled, the legacy overall status is the minimum
// of HEALTH_WARN and r.
2499 health_status_t cr
= compat_warn
? min(HEALTH_WARN
, r
) : r
;
2501 f
->open_array_section("summary");
2503 f
->open_object_section("item");
2504 f
->dump_stream("severity") << HEALTH_WARN
;
2505 f
->dump_string("summary", "'ceph health' JSON format has changed in luminous; update your health monitoring scripts");
2508 for (auto& svc
: paxos_service
) {
2509 svc
->get_health_checks().dump_summary_compat(f
);
2513 f
->dump_stream("overall_status") << cr
;
// Legacy detail section.
2517 if (f
&& (compat
|| compat_warn
)) {
2518 f
->open_array_section("detail");
2520 f
->dump_string("item", "'ceph health' JSON format has changed in luminous. If you see this your monitoring system is scraping the wrong fields. Disable this with 'mon health preluminous compat warning = false'");
2524 for (auto& svc
: paxos_service
) {
2525 svc
->get_health_checks().dump_detail(f
, plain
, compat
);
2528 if (f
&& (compat
|| compat_warn
)) {
// Emit cluster-log messages describing how a set of health checks changed.
// updated:  the new health check map.
// previous: the health check map being replaced.
// t:        transaction handle; not referenced by the visible code (see the
//           FIXME below about logging as part of @t).
// Nothing is logged unless mon_health_to_clog is enabled.
// For every check in `updated`:
//  - absent from `previous`: "Health check failed: ..." at the check's
//    severity;
//  - summary or severity differs: "Health check update: ...", unless the
//    severity is unchanged and the check was logged less than
//    mon_health_log_update_period ago according to health_check_log_times.
// health_check_log_times is updated with the severity, summary and time of
// the log entry. For every check in `previous` that is gone from `updated`,
// a "cleared" message is logged (with dedicated wording for DEGRADED_OBJECTS
// and OSD_FLAGS) and its health_check_log_times entry is erased. Finally,
// when `previous` had checks and `updated` has none, the other services are
// inspected and "Cluster is now healthy" is logged.
2538 void Monitor::log_health(
2539 const health_check_map_t
& updated
,
2540 const health_check_map_t
& previous
,
2541 MonitorDBStore::TransactionRef t
)
2543 if (!g_conf
->mon_health_to_clog
) {
2547 const utime_t now
= ceph_clock_now();
2549 // FIXME: log atomically as part of @t instead of using clog.
2550 dout(10) << __func__
<< " updated " << updated
.checks
.size()
2551 << " previous " << previous
.checks
.size()
// Minimum spacing between "update" messages for an unchanged severity.
2553 const auto min_log_period
= g_conf
->get_val
<int64_t>(
2554 "mon_health_log_update_period");
2555 for (auto& p
: updated
.checks
) {
2556 auto q
= previous
.checks
.find(p
.first
);
2557 bool logged
= false;
// Check was not present before.
2558 if (q
== previous
.checks
.end()) {
2561 ss
<< "Health check failed: " << p
.second
.summary
<< " ("
2563 clog
->health(p
.second
.severity
) << ss
.str();
2567 if (p
.second
.summary
!= q
->second
.summary
||
2568 p
.second
.severity
!= q
->second
.severity
) {
2570 auto status_iter
= health_check_log_times
.find(p
.first
);
2571 if (status_iter
!= health_check_log_times
.end()) {
2572 if (p
.second
.severity
== q
->second
.severity
&&
2573 now
- status_iter
->second
.updated_at
< min_log_period
) {
2574 // We already logged this recently and the severity is unchanged,
2575 // so skip emitting an update of the summary string.
2576 // We'll get an update out of tick() later if the check
2577 // is still failing.
2582 // summary or severity changed (ignore detail changes at this level)
2584 ss
<< "Health check update: " << p
.second
.summary
<< " (" << p
.first
<< ")";
2585 clog
->health(p
.second
.severity
) << ss
.str();
2590 // Record the time at which we last logged, so that we can check this
2591 // when considering whether/when to print update messages.
2593 auto iter
= health_check_log_times
.find(p
.first
);
2594 if (iter
== health_check_log_times
.end()) {
2595 health_check_log_times
.emplace(p
.first
, HealthCheckLogStatus(
2596 p
.second
.severity
, p
.second
.summary
, now
));
2598 iter
->second
= HealthCheckLogStatus(
2599 p
.second
.severity
, p
.second
.summary
, now
);
// Checks that existed before but are no longer reported.
2603 for (auto& p
: previous
.checks
) {
2604 if (!updated
.checks
.count(p
.first
)) {
2607 if (p
.first
== "DEGRADED_OBJECTS") {
2608 clog
->info() << "All degraded objects recovered";
2609 } else if (p
.first
== "OSD_FLAGS") {
2610 clog
->info() << "OSD flags cleared";
2612 clog
->info() << "Health check cleared: " << p
.first
<< " (was: "
2613 << p
.second
.summary
<< ")";
2616 if (health_check_log_times
.count(p
.first
)) {
2617 health_check_log_times
.erase(p
.first
);
2622 if (previous
.checks
.size() && updated
.checks
.size() == 0) {
2623 // We might be going into a fully healthy state, check
2625 bool any_checks
= false;
2626 for (auto& svc
: paxos_service
) {
// Identity comparison: skip the service whose check map is `previous`.
2627 if (&(svc
->get_health_checks()) == &(previous
)) {
2628 // Ignore the ones we're clearing right now
2632 if (svc
->get_health_checks().checks
.size() > 0) {
2638 clog
->info() << "Cluster is now healthy";
// Health report built from (severity, string) pairs gathered from the
// services; used on the paths for releases before luminous.
// status:   output list of human-readable status strings.
// detailbl: when non-NULL, detail entries are requested from the services
//           and appended here as text, one per line.
// A third parameter is not visible in this listing; call sites in this file
// pass a Formatter* (possibly NULL) and the body uses it as f.
// Steps visible here:
//  - each PaxosService and health_monitor contribute to summary / detail;
//  - timecheck_skews is walked: every monitor whose timecheck_status() is
//    not HEALTH_OK is added to `warns` and gets a detail line, and a
//    "clock skew detected on mon.X ..." status string plus a HEALTH_WARN
//    summary entry are produced;
//  - summary entries are drained: `overall` becomes the lowest severity
//    value seen, each text is appended to `status`, and with a formatter each
//    is dumped as an "item" (severity + summary);
//  - the overall status is dumped as "overall_status" and pushed to the
//    front of `status`;
//  - detail entries go to the formatter's "detail" array or to detailbl.
// NOTE(review): the return statement is not visible; presumably `overall`
// is returned -- confirm.
2643 health_status_t
Monitor::get_health(list
<string
>& status
,
2644 bufferlist
*detailbl
,
2647 list
<pair
<health_status_t
,string
> > summary
;
2648 list
<pair
<health_status_t
,string
> > detail
;
2651 f
->open_object_section("health");
2653 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin();
2654 p
!= paxos_service
.end();
2656 PaxosService
*s
= *p
;
// Detail is collected only when the caller supplied detailbl.
2657 s
->get_health(summary
, detailbl
? &detail
: NULL
, cct
);
2660 health_monitor
->get_health(summary
, (detailbl
? &detail
: NULL
));
2662 health_status_t overall
= HEALTH_OK
;
// Clock skew between monitors.
2663 if (!timecheck_skews
.empty()) {
2665 for (map
<entity_inst_t
,double>::iterator i
= timecheck_skews
.begin();
2666 i
!= timecheck_skews
.end(); ++i
) {
2667 entity_inst_t inst
= i
->first
;
2668 double skew
= i
->second
;
2669 double latency
= timecheck_latencies
[inst
];
2670 string name
= monmap
->get_name(inst
.addr
);
2672 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
2673 if (tcstatus
!= HEALTH_OK
) {
2674 if (overall
> tcstatus
)
2676 warns
.push_back(name
);
2677 ostringstream tmp_ss
;
2678 tmp_ss
<< "mon." << name
2679 << " addr " << inst
.addr
<< " " << tcss
.str()
2680 << " (latency " << latency
<< "s)";
2681 detail
.push_back(make_pair(tcstatus
, tmp_ss
.str()));
2684 if (!warns
.empty()) {
2686 ss
<< "clock skew detected on";
2687 while (!warns
.empty()) {
2688 ss
<< " mon." << warns
.front();
2693 status
.push_back(ss
.str());
2694 summary
.push_back(make_pair(HEALTH_WARN
, "Monitor clock skew detected "));
// Drain the summary list, tracking the lowest severity value seen.
2699 f
->open_array_section("summary");
2700 if (!summary
.empty()) {
2701 while (!summary
.empty()) {
2702 if (overall
> summary
.front().first
)
2703 overall
= summary
.front().first
;
2704 status
.push_back(summary
.front().second
);
2706 f
->open_object_section("item");
2707 f
->dump_stream("severity") << summary
.front().first
;
2708 f
->dump_string("summary", summary
.front().second
);
2711 summary
.pop_front();
2719 status
.push_front(fss
.str());
2721 f
->dump_stream("overall_status") << overall
;
// Detail goes either to the formatter or, as text lines, to detailbl.
2724 f
->open_array_section("detail");
2725 while (!detail
.empty()) {
2727 f
->dump_string("item", detail
.front().second
);
2728 else if (detailbl
!= NULL
) {
2729 detailbl
->append(detail
.front().second
);
2730 detailbl
->append('\n');
// Produce the cluster status report.
// ss: receives the plain-text report.
// f:  optional formatter; a call site in this file notes that
//     get_cluster_status handles f == NULL.
// Formatter output: a "status" object with fsid, health, election_epoch,
// quorum ranks, quorum_names, and monmap / osdmap / pgmap / fsmap / mgrmap /
// servicemap sections.
// Plain-text output: a "cluster:" part (id and health), a "services:" part
// (mon, mgr, mds, osd and every entry of the service map, aligned using the
// longest service name) and a "data:" part from pgservice.
// In both forms the health comes from get_health_status() when
// require_osd_release is luminous or later, and from get_health() otherwise.
2743 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
)
2746 f
->open_object_section("status");
2749 f
->dump_stream("fsid") << monmap
->get_fsid();
2750 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2751 get_health_status(false, f
, nullptr);
2753 list
<string
> health_str
;
2754 get_health(health_str
, nullptr, f
);
2756 f
->dump_unsigned("election_epoch", get_epoch());
2758 f
->open_array_section("quorum");
2759 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2760 f
->dump_int("rank", *p
);
2762 f
->open_array_section("quorum_names");
2763 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2764 f
->dump_string("id", monmap
->get_name(*p
));
2767 f
->open_object_section("monmap");
2770 f
->open_object_section("osdmap");
2771 osdmon()->osdmap
.print_summary(f
, cout
, string(12, ' '));
2773 f
->open_object_section("pgmap");
2774 pgservice
->print_summary(f
, NULL
);
2776 f
->open_object_section("fsmap");
2777 mdsmon()->get_fsmap().print_summary(f
, NULL
);
2779 f
->open_object_section("mgrmap");
2780 mgrmon()->get_map().print_summary(f
, nullptr);
2783 f
->dump_object("servicemap", mgrstatmon()->get_service_map());
// Plain-text report starts here.
2786 ss
<< " cluster:\n";
2787 ss
<< " id: " << monmap
->get_fsid() << "\n";
2790 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2791 get_health_status(false, nullptr, &health
,
2795 get_health(ls
, NULL
, f
);
2796 health
= joinify(ls
.begin(), ls
.end(),
2799 ss
<< " health: " << health
<< "\n";
2801 ss
<< "\n \n services:\n";
// maxlen tracks the longest service name so the columns line up.
2804 auto& service_map
= mgrstatmon()->get_service_map();
2805 for (auto& p
: service_map
.services
) {
2806 maxlen
= std::max(maxlen
, p
.first
.size());
// Padding for the three-letter built-in service labels (mon/mgr/mds/osd).
2808 string
spacing(maxlen
- 3, ' ');
2809 const auto quorum_names
= get_quorum_names();
2810 const auto mon_count
= monmap
->mon_info
.size();
2811 ss
<< " mon: " << spacing
<< mon_count
<< " daemons, quorum "
// List the monitors missing from quorum when not all of them are in it.
2813 if (quorum_names
.size() != mon_count
) {
2814 std::list
<std::string
> out_of_q
;
2815 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
2816 if (quorum
.count(i
) == 0) {
2817 out_of_q
.push_back(monmap
->ranks
[i
]);
2820 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
2821 out_of_q
.end(), std::string(", "));
2824 if (mgrmon()->in_use()) {
2825 ss
<< " mgr: " << spacing
;
2826 mgrmon()->get_map().print_summary(nullptr, &ss
);
// The mds line is printed only when at least one filesystem exists.
2829 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2830 ss
<< " mds: " << spacing
<< mdsmon()->get_fsmap() << "\n";
2832 ss
<< " osd: " << spacing
;
2833 osdmon()->osdmap
.print_summary(NULL
, ss
, string(maxlen
+ 6, ' '));
2835 for (auto& p
: service_map
.services
) {
2836 ss
<< " " << p
.first
<< ": " << string(maxlen
- p
.first
.size(), ' ')
2837 << p
.second
.get_summary() << "\n";
2841 ss
<< "\n \n data:\n";
2842 pgservice
->print_summary(NULL
, &ss
);
// Flatten a parsed command map into the string->string parameter map that is
// later handed to the capability check.
// cmdmap:        parsed command arguments keyed by argument name.
// param_str_map: output map, filled in by this function.
// Handling per key, as visible here:
//  - "prefix" is tested first and gets no entry from the generic path
//    (NOTE(review): the statement under that test is not visible; presumably
//    `continue` -- confirm);
//  - "caps" is read as a vector of strings and, when it has an even number
//    of elements, expanded pairwise into "caps_<name>" -> value entries;
//  - other keys are stored as cmd_vartype_stringify(value).
2847 void Monitor::_generate_command_map(map
<string
,cmd_vartype
>& cmdmap
,
2848 map
<string
,string
> &param_str_map
)
2850 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
2851 p
!= cmdmap
.end(); ++p
) {
2852 if (p
->first
== "prefix")
2854 if (p
->first
== "caps") {
// Only a well-formed list of (name, value) pairs is expanded.
2856 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
2857 cv
.size() % 2 == 0) {
2858 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
2859 string k
= string("caps_") + cv
[i
];
2860 param_str_map
[k
] = cv
[i
+ 1];
2865 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
// Find the command description matching a command prefix.
// cmd_prefix: prefix string taken from the request.
// cmds:       command table to search.
// An entry matches when its cmdstring begins with cmd_prefix, i.e. the first
// cmd_prefix.size() characters compare equal.
// NOTE(review): the return statements are not visible here; presumably the
// first matching entry's address is returned and a null pointer when nothing
// matches (callers in this file test the result for null) -- confirm.
2869 const MonCommand
*Monitor::_get_moncommand(
2870 const string
&cmd_prefix
,
2871 const vector
<MonCommand
>& cmds
)
2873 for (auto& c
: cmds
) {
2874 if (c
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
// Decide whether a session's capabilities permit running a command.
// s:             session whose caps are consulted.
// module/prefix: service name and command prefix passed to the caps check.
// cmdmap:        parsed arguments; not referenced by the visible code.
// param_str_map: flattened arguments passed to the caps check.
// this_cmd:      command description supplying the required r/w/x perms.
// The r, w and x requirements are read from this_cmd and handed, together
// with CEPH_ENTITY_TYPE_MON, module, prefix and param_str_map, to
// s->caps.is_capable(). The outcome is logged at debug level 10.
// NOTE(review): the return statement is not visible; presumably `capable`
// is returned -- confirm.
2881 bool Monitor::_allowed_command(MonSession
*s
, string
&module
, string
&prefix
,
2882 const map
<string
,cmd_vartype
>& cmdmap
,
2883 const map
<string
,string
>& param_str_map
,
2884 const MonCommand
*this_cmd
) {
2886 bool cmd_r
= this_cmd
->requires_perm('r');
2887 bool cmd_w
= this_cmd
->requires_perm('w');
2888 bool cmd_x
= this_cmd
->requires_perm('x');
2890 bool capable
= s
->caps
.is_capable(
2892 CEPH_ENTITY_TYPE_MON
,
2894 module
, prefix
, param_str_map
,
2895 cmd_r
, cmd_w
, cmd_x
);
2897 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
// Serialize a command table as a "command_descriptions" object.
// commands: the command descriptions to dump.
// The remaining parameters are not visible in this listing; the call site in
// this file passes (commands, Formatter *f, bufferlist *rdata,
// bool hide_mgr_flag), and the body uses f and hide_mgr_flag.
// Each command is written by dump_cmddesc_to_json() under a section named
// "cmd" followed by cmdnum zero-padded to three digits. When hide_mgr_flag
// is set, MonCommand::FLAG_MGR is cleared from the flags that are dumped;
// the command table itself is not modified.
2901 void Monitor::format_command_descriptions(const std::vector
<MonCommand
> &commands
,
2907 f
->open_object_section("command_descriptions");
2908 for (const auto &cmd
: commands
) {
// Work on a copy of the flags so that masking does not touch `cmd`.
2909 unsigned flags
= cmd
.flags
;
2910 if (hide_mgr_flag
) {
2911 flags
&= ~MonCommand::FLAG_MGR
;
2913 ostringstream secname
;
2914 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
2915 dump_cmddesc_to_json(f
, secname
.str(),
2916 cmd
.cmdstring
, cmd
.helpstring
, cmd
.module
,
2917 cmd
.req_perms
, cmd
.availability
, flags
);
2920 f
->close_section(); // command_descriptions
// Report whether the configured authentication settings call for a keyring.
// When auth_supported is non-empty it is used in place of both
// auth_cluster_required and auth_service_required. The result is true when
// either effective value is exactly the string "cephx" (whole-string
// comparison).
2925 bool Monitor::is_keyring_required()
2927 string auth_cluster_required
= g_conf
->auth_supported
.empty() ?
2928 g_conf
->auth_cluster_required
: g_conf
->auth_supported
;
2929 string auth_service_required
= g_conf
->auth_supported
.empty() ?
2930 g_conf
->auth_service_required
: g_conf
->auth_supported
;
2932 return auth_service_required
== "cephx" ||
2933 auth_cluster_required
== "cephx";
// Completion context for a command that the monitor proxied to the manager.
// Constructed with the owning Monitor, the original request op and the byte
// size accounted for the request. The member declarations are not visible in
// this listing; finish() also uses outs and outbl.
// finish(r): takes mon->lock, subtracts `size` from mon->mgr_proxy_bytes
// (handle_command adds the same amount when it starts the proxy), and sends
// the reply for `op` with result r, outs and outbl.
2936 struct C_MgrProxyCommand
: public Context
{
2942 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
2943 : mon(mon
), op(op
), size(s
) { }
2944 void finish(int r
) {
2945 Mutex::Locker
l(mon
->lock
);
2946 mon
->mgr_proxy_bytes
-= size
;
2947 mon
->reply_command(op
, r
, outs
, outbl
, 0);
2951 void Monitor::handle_command(MonOpRequestRef op
)
2953 assert(op
->is_type_command());
2954 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
2955 if (m
->fsid
!= monmap
->fsid
) {
2956 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
2957 reply_command(op
, -EPERM
, "wrong fsid", 0);
2961 MonSession
*session
= static_cast<MonSession
*>(
2962 m
->get_connection()->get_priv());
2964 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
2967 BOOST_SCOPE_EXIT_ALL(=) {
2971 if (m
->cmd
.empty()) {
2972 string rs
= "No command supplied";
2973 reply_command(op
, -EINVAL
, rs
, 0);
2978 vector
<string
> fullcmd
;
2979 map
<string
, cmd_vartype
> cmdmap
;
2980 stringstream ss
, ds
;
2984 rs
= "unrecognized command";
2986 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
2987 // ss has reason for failure
2990 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
2991 reply_command(op
, r
, rs
, 0);
2995 // check return value. If no prefix parameter provided,
2996 // return value will be false, then return error info.
2997 if (!cmd_getval(g_ceph_context
, cmdmap
, "prefix", prefix
)) {
2998 reply_command(op
, -EINVAL
, "command prefix not found", 0);
3002 // check prefix is empty
3003 if (prefix
.empty()) {
3004 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
3008 if (prefix
== "get_command_descriptions") {
3010 Formatter
*f
= Formatter::create("json");
3011 // hide mgr commands until luminous upgrade is complete
3012 bool hide_mgr_flag
=
3013 osdmon()->osdmap
.require_osd_release
< CEPH_RELEASE_LUMINOUS
;
3015 std::vector
<MonCommand
> commands
;
3017 // only include mgr commands once all mons are upgrade (and we've dropped
3018 // the hard-coded PGMonitor commands)
3019 if (quorum_mon_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
3020 commands
= static_cast<MgrMonitor
*>(
3021 paxos_service
[PAXOS_MGR
])->get_command_descs();
3024 for (auto& c
: leader_mon_commands
) {
3025 commands
.push_back(c
);
3028 format_command_descriptions(commands
, f
, &rdata
, hide_mgr_flag
);
3030 reply_command(op
, 0, "", rdata
, 0);
3037 dout(0) << "handle_command " << *m
<< dendl
;
3040 cmd_getval(g_ceph_context
, cmdmap
, "format", format
, string("plain"));
3041 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
3043 get_str_vec(prefix
, fullcmd
);
3045 // make sure fullcmd is not empty.
3046 // invalid prefix will cause empty vector fullcmd.
3047 // such as, prefix=";,,;"
3048 if (fullcmd
.empty()) {
3049 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
3053 module
= fullcmd
[0];
3055 // validate command is in leader map
3057 const MonCommand
*leader_cmd
;
3058 const auto& mgr_cmds
= mgrmon()->get_command_descs();
3059 const MonCommand
*mgr_cmd
= nullptr;
3060 if (!mgr_cmds
.empty()) {
3061 mgr_cmd
= _get_moncommand(prefix
, mgr_cmds
);
3063 leader_cmd
= _get_moncommand(prefix
, leader_mon_commands
);
3065 leader_cmd
= mgr_cmd
;
3067 reply_command(op
, -EINVAL
, "command not known", 0);
3071 // validate command is in our map & matches, or forward if it is allowed
3072 const MonCommand
*mon_cmd
= _get_moncommand(
3074 get_local_commands(quorum_mon_features
));
3080 if (leader_cmd
->is_noforward()) {
3081 reply_command(op
, -EINVAL
,
3082 "command not locally supported and not allowed to forward",
3086 dout(10) << "Command not locally supported, forwarding request "
3088 forward_request_leader(op
);
3090 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
3091 if (mon_cmd
->is_noforward()) {
3092 reply_command(op
, -EINVAL
,
3093 "command not compatible with leader and not allowed to forward",
3097 dout(10) << "Command not compatible with leader, forwarding request "
3099 forward_request_leader(op
);
3104 if (mon_cmd
->is_obsolete() ||
3105 (cct
->_conf
->mon_debug_deprecated_as_obsolete
3106 && mon_cmd
->is_deprecated())) {
3107 reply_command(op
, -ENOTSUP
,
3108 "command is obsolete; please check usage and/or man page",
3113 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
3114 dout(10) << "Got forward for noforward command " << m
<< dendl
;
3115 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
3119 /* what we perceive as being the service the command falls under */
3120 string
service(mon_cmd
->module
);
3122 dout(25) << __func__
<< " prefix='" << prefix
3123 << "' module='" << module
3124 << "' service='" << service
<< "'" << dendl
;
3127 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
3129 // validate user's permissions for requested command
3130 map
<string
,string
> param_str_map
;
3131 _generate_command_map(cmdmap
, param_str_map
);
3132 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
3133 param_str_map
, mon_cmd
)) {
3134 dout(1) << __func__
<< " access denied" << dendl
;
3135 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3136 << "from='" << session
->inst
<< "' "
3137 << "entity='" << session
->entity_name
<< "' "
3138 << "cmd=" << m
->cmd
<< ": access denied";
3139 reply_command(op
, -EACCES
, "access denied", 0);
3143 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3144 << "from='" << session
->inst
<< "' "
3145 << "entity='" << session
->entity_name
<< "' "
3146 << "cmd=" << m
->cmd
<< ": dispatch";
3148 if (mon_cmd
->is_mgr() &&
3149 osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3150 const auto& hdr
= m
->get_header();
3151 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
3153 g_conf
->mon_client_bytes
* g_conf
->mon_mgr_proxy_client_bytes_ratio
;
3154 if (mgr_proxy_bytes
+ size
> max
) {
3155 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
3156 << " + " << size
<< " > max " << max
<< dendl
;
3157 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
3160 mgr_proxy_bytes
+= size
;
3161 dout(10) << __func__
<< " proxying mgr command (+" << size
3162 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
3163 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
3164 mgr_client
.start_command(m
->cmd
,
3168 new C_OnFinisher(fin
, &finisher
));
3172 if ((module
== "mds" || module
== "fs") &&
3173 prefix
!= "fs authorize") {
3174 mdsmon()->dispatch(op
);
3177 if ((module
== "osd" || prefix
== "pg map") &&
3178 prefix
!= "osd last-stat-seq") {
3179 osdmon()->dispatch(op
);
3183 if (module
== "pg") {
3184 pgmon()->dispatch(op
);
3187 if (module
== "mon" &&
3188 /* Let the Monitor class handle the following commands:
3193 prefix
!= "mon compact" &&
3194 prefix
!= "mon scrub" &&
3195 prefix
!= "mon sync force" &&
3196 prefix
!= "mon metadata" &&
3197 prefix
!= "mon versions" &&
3198 prefix
!= "mon count-metadata") {
3199 monmon()->dispatch(op
);
3202 if (module
== "auth" || prefix
== "fs authorize") {
3203 authmon()->dispatch(op
);
3206 if (module
== "log") {
3207 logmon()->dispatch(op
);
3211 if (module
== "config-key") {
3212 config_key_service
->dispatch(op
);
3216 if (module
== "mgr") {
3217 mgrmon()->dispatch(op
);
3221 if (prefix
== "fsid") {
3223 f
->open_object_section("fsid");
3224 f
->dump_stream("fsid") << monmap
->fsid
;
3231 reply_command(op
, 0, "", rdata
, 0);
3235 if (prefix
== "scrub" || prefix
== "mon scrub") {
3236 wait_for_paxos_write();
3238 int r
= scrub_start();
3239 reply_command(op
, r
, "", rdata
, 0);
3240 } else if (is_peon()) {
3241 forward_request_leader(op
);
3243 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
3248 if (prefix
== "compact" || prefix
== "mon compact") {
3249 dout(1) << "triggering manual compaction" << dendl
;
3250 utime_t start
= ceph_clock_now();
3252 utime_t end
= ceph_clock_now();
3254 dout(1) << "finished manual compaction in " << end
<< " seconds" << dendl
;
3256 oss
<< "compacted " << g_conf
->get_val
<std::string
>("mon_keyvaluedb") << " in " << end
<< " seconds";
3260 else if (prefix
== "injectargs") {
3261 vector
<string
> injected_args
;
3262 cmd_getval(g_ceph_context
, cmdmap
, "injected_args", injected_args
);
3263 if (!injected_args
.empty()) {
3264 dout(0) << "parsing injected options '" << injected_args
<< "'" << dendl
;
3266 r
= g_conf
->injectargs(str_join(injected_args
, " "), &oss
);
3267 ss
<< "injectargs:" << oss
.str();
3271 rs
= "must supply options to be parsed in a single string";
3274 } else if (prefix
== "time-sync-status") {
3276 f
.reset(Formatter::create("json-pretty"));
3277 f
->open_object_section("time_sync");
3278 if (!timecheck_skews
.empty()) {
3279 f
->open_object_section("time_skew_status");
3280 for (auto& i
: timecheck_skews
) {
3281 entity_inst_t inst
= i
.first
;
3282 double skew
= i
.second
;
3283 double latency
= timecheck_latencies
[inst
];
3284 string name
= monmap
->get_name(inst
.addr
);
3286 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
3287 f
->open_object_section(name
.c_str());
3288 f
->dump_float("skew", skew
);
3289 f
->dump_float("latency", latency
);
3290 f
->dump_stream("health") << tcstatus
;
3291 if (tcstatus
!= HEALTH_OK
) {
3292 f
->dump_stream("details") << tcss
.str();
3298 f
->open_object_section("timechecks");
3299 f
->dump_unsigned("epoch", get_epoch());
3300 f
->dump_int("round", timecheck_round
);
3301 f
->dump_stream("round_status") << ((timecheck_round
%2) ?
3302 "on-going" : "finished");
3308 } else if (prefix
== "config set") {
3310 cmd_getval(cct
, cmdmap
, "key", key
);
3312 cmd_getval(cct
, cmdmap
, "value", val
);
3313 r
= g_conf
->set_val(key
, val
, true, &ss
);
3315 g_conf
->apply_changes(nullptr);
3319 } else if (prefix
== "status" ||
3320 prefix
== "health" ||
3323 cmd_getval(g_ceph_context
, cmdmap
, "detail", detail
);
3325 if (prefix
== "status") {
3326 // get_cluster_status handles f == NULL
3327 get_cluster_status(ds
, f
.get());
3334 } else if (prefix
== "health") {
3335 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3337 get_health_status(detail
== "detail", f
.get(), f
? nullptr : &plain
);
3341 rdata
.append(plain
);
3344 list
<string
> health_str
;
3345 get_health(health_str
, detail
== "detail" ? &rdata
: NULL
, f
.get());
3350 assert(!health_str
.empty());
3351 ds
<< health_str
.front();
3352 health_str
.pop_front();
3353 if (!health_str
.empty()) {
3355 ds
<< joinify(health_str
.begin(), health_str
.end(), string("; "));
3360 if (detail
== "detail")
3364 } else if (prefix
== "df") {
3365 bool verbose
= (detail
== "detail");
3367 f
->open_object_section("stats");
3369 pgservice
->dump_fs_stats(&ds
, f
.get(), verbose
);
3372 pgservice
->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3380 assert(0 == "We should never get here!");
3386 } else if (prefix
== "report") {
3388 // this must be formatted, in its current form
3390 f
.reset(Formatter::create("json-pretty"));
3391 f
->open_object_section("report");
3392 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3393 f
->dump_string("version", ceph_version_to_str());
3394 f
->dump_string("commit", git_version_to_str());
3395 f
->dump_stream("timestamp") << ceph_clock_now();
3397 vector
<string
> tagsvec
;
3398 cmd_getval(g_ceph_context
, cmdmap
, "tags", tagsvec
);
3399 string tagstr
= str_join(tagsvec
, " ");
3400 if (!tagstr
.empty())
3401 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3402 f
->dump_string("tag", tagstr
);
3404 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3405 get_health_status(true, f
.get(), nullptr);
3407 list
<string
> health_str
;
3408 get_health(health_str
, nullptr, f
.get());
3411 monmon()->dump_info(f
.get());
3412 osdmon()->dump_info(f
.get());
3413 mdsmon()->dump_info(f
.get());
3414 authmon()->dump_info(f
.get());
3415 pgservice
->dump_info(f
.get());
3417 paxos
->dump_info(f
.get());
3423 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT
);
3426 } else if (prefix
== "osd last-stat-seq") {
3428 cmd_getval(g_ceph_context
, cmdmap
, "id", osd
);
3429 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3431 f
->dump_unsigned("seq", seq
);
3439 } else if (prefix
== "node ls") {
3440 string
node_type("all");
3441 cmd_getval(g_ceph_context
, cmdmap
, "type", node_type
);
3443 f
.reset(Formatter::create("json-pretty"));
3444 if (node_type
== "all") {
3445 f
->open_object_section("nodes");
3446 print_nodes(f
.get(), ds
);
3447 osdmon()->print_nodes(f
.get());
3448 mdsmon()->print_nodes(f
.get());
3450 } else if (node_type
== "mon") {
3451 print_nodes(f
.get(), ds
);
3452 } else if (node_type
== "osd") {
3453 osdmon()->print_nodes(f
.get());
3454 } else if (node_type
== "mds") {
3455 mdsmon()->print_nodes(f
.get());
3461 } else if (prefix
== "features") {
3462 if (!is_leader() && !is_peon()) {
3463 dout(10) << " waiting for quorum" << dendl
;
3464 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3468 forward_request_leader(op
);
3472 f
.reset(Formatter::create("json-pretty"));
3474 get_combined_feature_map(&fm
);
3475 f
->dump_object("features", fm
);
3479 } else if (prefix
== "mon metadata") {
3481 f
.reset(Formatter::create("json-pretty"));
3484 bool all
= !cmd_getval(g_ceph_context
, cmdmap
, "id", name
);
3486 // Dump a single mon's metadata
3487 int mon
= monmap
->get_rank(name
);
3489 rs
= "requested mon not found";
3493 f
->open_object_section("mon_metadata");
3494 r
= get_mon_metadata(mon
, f
.get(), ds
);
3497 // Dump all mons' metadata
3499 f
->open_array_section("mon_metadata");
3500 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3501 std::ostringstream get_err
;
3502 f
->open_object_section("mon");
3503 f
->dump_string("name", monmap
->get_name(rank
));
3504 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3506 if (r
== -ENOENT
|| r
== -EINVAL
) {
3507 dout(1) << get_err
.str() << dendl
;
3508 // Drop error, list what metadata we do have
3510 } else if (r
!= 0) {
3511 derr
<< "Unexpected error from get_mon_metadata: "
3512 << cpp_strerror(r
) << dendl
;
3513 ds
<< get_err
.str();
3523 } else if (prefix
== "mon versions") {
3525 f
.reset(Formatter::create("json-pretty"));
3526 count_metadata("ceph_version", f
.get());
3531 } else if (prefix
== "mon count-metadata") {
3533 f
.reset(Formatter::create("json-pretty"));
3535 cmd_getval(g_ceph_context
, cmdmap
, "property", field
);
3536 count_metadata(field
, f
.get());
3541 } else if (prefix
== "quorum_status") {
3542 // make sure our map is readable and up to date
3543 if (!is_leader() && !is_peon()) {
3544 dout(10) << " waiting for quorum" << dendl
;
3545 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3548 _quorum_status(f
.get(), ds
);
3552 } else if (prefix
== "mon_status") {
3553 get_mon_status(f
.get(), ds
);
3559 } else if (prefix
== "sync force" ||
3560 prefix
== "mon sync force") {
3561 string validate1
, validate2
;
3562 cmd_getval(g_ceph_context
, cmdmap
, "validate1", validate1
);
3563 cmd_getval(g_ceph_context
, cmdmap
, "validate2", validate2
);
3564 if (validate1
!= "--yes-i-really-mean-it" ||
3565 validate2
!= "--i-know-what-i-am-doing") {
3567 rs
= "are you SURE? this will mean the monitor store will be "
3568 "erased. pass '--yes-i-really-mean-it "
3569 "--i-know-what-i-am-doing' if you really do.";
3572 sync_force(f
.get(), ds
);
3575 } else if (prefix
== "heap") {
3576 if (!ceph_using_tcmalloc())
3577 rs
= "tcmalloc not enabled, can't use heap profiler commands\n";
3580 cmd_getval(g_ceph_context
, cmdmap
, "heapcmd", heapcmd
);
3581 // XXX 1-element vector, change at callee or make vector here?
3582 vector
<string
> heapcmd_vec
;
3583 get_str_vec(heapcmd
, heapcmd_vec
);
3584 ceph_heap_profiler_handle_command(heapcmd_vec
, ds
);
3589 } else if (prefix
== "quorum") {
3591 cmd_getval(g_ceph_context
, cmdmap
, "quorumcmd", quorumcmd
);
3592 if (quorumcmd
== "exit") {
3594 elector
.stop_participating();
3595 rs
= "stopped responding to quorum, initiated new election";
3597 } else if (quorumcmd
== "enter") {
3598 elector
.start_participating();
3600 rs
= "started responding to quorum, initiated new election";
3603 rs
= "needs a valid 'quorum' command";
3606 } else if (prefix
== "version") {
3608 f
->open_object_section("version");
3609 f
->dump_string("version", pretty_version_to_str());
3613 ds
<< pretty_version_to_str();
3618 } else if (prefix
== "versions") {
3620 f
.reset(Formatter::create("json-pretty"));
3621 map
<string
,int> overall
;
3622 f
->open_object_section("version");
3623 map
<string
,int> mon
, mgr
, osd
, mds
;
3625 count_metadata("ceph_version", &mon
);
3626 f
->open_object_section("mon");
3627 for (auto& p
: mon
) {
3628 f
->dump_int(p
.first
.c_str(), p
.second
);
3629 overall
[p
.first
] += p
.second
;
3633 mgrmon()->count_metadata("ceph_version", &mgr
);
3634 f
->open_object_section("mgr");
3635 for (auto& p
: mgr
) {
3636 f
->dump_int(p
.first
.c_str(), p
.second
);
3637 overall
[p
.first
] += p
.second
;
3641 osdmon()->count_metadata("ceph_version", &osd
);
3642 f
->open_object_section("osd");
3643 for (auto& p
: osd
) {
3644 f
->dump_int(p
.first
.c_str(), p
.second
);
3645 overall
[p
.first
] += p
.second
;
3649 mdsmon()->count_metadata("ceph_version", &mds
);
3650 f
->open_object_section("mds");
3651 for (auto& p
: mds
) {
3652 f
->dump_int(p
.first
.c_str(), p
.second
);
3653 overall
[p
.first
] += p
.second
;
3657 for (auto& p
: mgrstatmon()->get_service_map().services
) {
3658 f
->open_object_section(p
.first
.c_str());
3660 p
.second
.count_metadata("ceph_version", &m
);
3662 f
->dump_int(q
.first
.c_str(), q
.second
);
3663 overall
[q
.first
] += q
.second
;
3668 f
->open_object_section("overall");
3669 for (auto& p
: overall
) {
3670 f
->dump_int(p
.first
.c_str(), p
.second
);
3680 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3681 reply_command(op
, r
, rs
, rdata
, 0);
3684 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
3687 reply_command(op
, rc
, rs
, rdata
, version
);
3690 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
3691 bufferlist
& rdata
, version_t version
)
3693 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
3694 assert(m
->get_type() == MSG_MON_COMMAND
);
3695 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
3696 reply
->set_tid(m
->get_tid());
3697 reply
->set_data(rdata
);
3698 send_reply(op
, reply
);
3702 // ------------------------
3703 // request/reply routing
3705 // a client/mds/osd will connect to a random monitor. we need to forward any
3706 // messages requiring state updates to the leader, and then route any replies
3707 // back via the correct monitor and back to them. (the monitor will not
3708 // initiate any connections.)
3710 void Monitor::forward_request_leader(MonOpRequestRef op
)
3712 op
->mark_event(__func__
);
3714 int mon
= get_leader();
3715 MonSession
*session
= op
->get_session();
3716 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
3718 if (req
->get_source().is_mon() && req
->get_source_addr() != messenger
->get_myaddr()) {
3719 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
3720 } else if (session
->proxy_con
) {
3721 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
3722 } else if (!session
->closed
) {
3723 RoutedRequest
*rr
= new RoutedRequest
;
3724 rr
->tid
= ++routed_request_tid
;
3725 rr
->client_inst
= req
->get_source_inst();
3726 rr
->con
= req
->get_connection();
3727 rr
->con_features
= rr
->con
->get_features();
3728 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
3729 rr
->session
= static_cast<MonSession
*>(session
->get());
3731 routed_requests
[rr
->tid
] = rr
;
3732 session
->routed_request_tids
.insert(rr
->tid
);
3734 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
3735 << " features " << rr
->con_features
<< dendl
;
3737 MForward
*forward
= new MForward(rr
->tid
,
3741 forward
->set_priority(req
->get_priority());
3742 if (session
->auth_handler
) {
3743 forward
->entity_name
= session
->entity_name
;
3744 } else if (req
->get_source().is_mon()) {
3745 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
3747 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3748 op
->mark_forwarded();
3749 assert(op
->get_req()->get_type() != 0);
3751 dout(10) << "forward_request no session for request " << *req
<< dendl
;
3755 // fake connection attached to forwarded messages
3756 struct AnonConnection
: public Connection
{
3757 explicit AnonConnection(CephContext
*cct
) : Connection(cct
, NULL
) {}
3759 int send_message(Message
*m
) override
{
3760 assert(!"send_message on anonymous connection");
3762 void send_keepalive() override
{
3763 assert(!"send_keepalive on anonymous connection");
3765 void mark_down() override
{
3768 void mark_disposable() override
{
3771 bool is_connected() override
{ return false; }
3774 //extract the original message and put it into the regular dispatch function
3775 void Monitor::handle_forward(MonOpRequestRef op
)
3777 MForward
*m
= static_cast<MForward
*>(op
->get_req());
3778 dout(10) << "received forwarded message from " << m
->client
3779 << " via " << m
->get_source_inst() << dendl
;
3780 MonSession
*session
= op
->get_session();
3783 if (!session
->is_capable("mon", MON_CAP_X
)) {
3784 dout(0) << "forward from entity with insufficient caps! "
3785 << session
->caps
<< dendl
;
3787 // see PaxosService::dispatch(); we rely on this being anon
3788 // (c->msgr == NULL)
3789 PaxosServiceMessage
*req
= m
->claim_message();
3790 assert(req
!= NULL
);
3792 ConnectionRef
c(new AnonConnection(cct
));
3793 MonSession
*s
= new MonSession(req
->get_source_inst(),
3794 static_cast<Connection
*>(c
.get()));
3795 c
->set_priv(s
->get());
3796 c
->set_peer_addr(m
->client
.addr
);
3797 c
->set_peer_type(m
->client
.name
.type());
3798 c
->set_features(m
->con_features
);
3800 s
->caps
= m
->client_caps
;
3801 dout(10) << " caps are " << s
->caps
<< dendl
;
3802 s
->entity_name
= m
->entity_name
;
3803 dout(10) << " entity name '" << s
->entity_name
<< "' type "
3804 << s
->entity_name
.get_type() << dendl
;
3805 s
->proxy_con
= m
->get_connection();
3806 s
->proxy_tid
= m
->tid
;
3808 req
->set_connection(c
);
3810 // not super accurate, but better than nothing.
3811 req
->set_recv_stamp(m
->get_recv_stamp());
3814 * note which election epoch this is; we will drop the message if
3815 * there is a future election since our peers will resend routed
3816 * requests in that case.
3818 req
->rx_election_epoch
= get_epoch();
3820 /* Because this is a special fake connection, we need to break
3821 the ref loop between Connection and MonSession differently
3822 than we normally do. Here, the Message refers to the Connection
3823 which refers to the Session, and nobody else refers to the Connection
3824 or the Session. And due to the special nature of this message,
3825 nobody refers to the Connection via the Session. So, clear out that
3826 half of the ref loop.*/
3829 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
3836 void Monitor::try_send_message(Message
*m
, const entity_inst_t
& to
)
3838 dout(10) << "try_send_message " << *m
<< " to " << to
<< dendl
;
3841 encode_message(m
, quorum_con_features
, bl
);
3843 messenger
->send_message(m
, to
);
3845 for (int i
=0; i
<(int)monmap
->size(); i
++) {
3847 messenger
->send_message(new MRoute(bl
, to
), monmap
->get_inst(i
));
3851 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
3853 op
->mark_event(__func__
);
3855 MonSession
*session
= op
->get_session();
3857 Message
*req
= op
->get_req();
3858 ConnectionRef con
= op
->get_connection();
3860 reply
->set_cct(g_ceph_context
);
3861 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
3864 dout(2) << "send_reply no connection, dropping reply " << *reply
3865 << " to " << req
<< " " << *req
<< dendl
;
3867 op
->mark_event("reply: no connection");
3871 if (!session
->con
&& !session
->proxy_con
) {
3872 dout(2) << "send_reply no connection, dropping reply " << *reply
3873 << " to " << req
<< " " << *req
<< dendl
;
3875 op
->mark_event("reply: no connection");
3879 if (session
->proxy_con
) {
3880 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
3881 << " via " << session
->proxy_con
->get_peer_addr()
3882 << " for request " << *req
<< dendl
;
3883 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
3884 op
->mark_event("reply: send routed request");
3886 session
->con
->send_message(reply
);
3887 op
->mark_event("reply: send");
3891 void Monitor::no_reply(MonOpRequestRef op
)
3893 MonSession
*session
= op
->get_session();
3894 Message
*req
= op
->get_req();
3896 if (session
->proxy_con
) {
3897 dout(10) << "no_reply to " << req
->get_source_inst()
3898 << " via " << session
->proxy_con
->get_peer_addr()
3899 << " for request " << *req
<< dendl
;
3900 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
3901 op
->mark_event("no_reply: send routed request");
3903 dout(10) << "no_reply to " << req
->get_source_inst()
3904 << " " << *req
<< dendl
;
3905 op
->mark_event("no_reply");
3909 void Monitor::handle_route(MonOpRequestRef op
)
3911 MRoute
*m
= static_cast<MRoute
*>(op
->get_req());
3912 MonSession
*session
= op
->get_session();
3914 if (!session
->is_capable("mon", MON_CAP_X
)) {
3915 dout(0) << "MRoute received from entity without appropriate perms! "
3920 dout(10) << "handle_route " << *m
->msg
<< " to " << m
->dest
<< dendl
;
3922 dout(10) << "handle_route null to " << m
->dest
<< dendl
;
3925 if (m
->session_mon_tid
) {
3926 if (routed_requests
.count(m
->session_mon_tid
)) {
3927 RoutedRequest
*rr
= routed_requests
[m
->session_mon_tid
];
3929 // reset payload, in case encoding is dependent on target features
3931 m
->msg
->clear_payload();
3932 rr
->con
->send_message(m
->msg
);
3935 if (m
->send_osdmap_first
) {
3936 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
3937 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
3938 true, MonOpRequestRef());
3940 assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
3941 routed_requests
.erase(m
->session_mon_tid
);
3942 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
3945 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
3948 dout(10) << " not a routed request, trying to send anyway" << dendl
;
3950 messenger
->send_message(m
->msg
, m
->dest
);
3956 void Monitor::resend_routed_requests()
3958 dout(10) << "resend_routed_requests" << dendl
;
3959 int mon
= get_leader();
3960 list
<Context
*> retry
;
3961 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
3962 p
!= routed_requests
.end();
3964 RoutedRequest
*rr
= p
->second
;
3967 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
3968 rr
->op
->mark_event("retry routed request");
3969 retry
.push_back(new C_RetryMessage(this, rr
->op
));
3971 assert(rr
->session
->routed_request_tids
.count(p
->first
));
3972 rr
->session
->routed_request_tids
.erase(p
->first
);
3976 bufferlist::iterator q
= rr
->request_bl
.begin();
3977 PaxosServiceMessage
*req
= (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
3978 rr
->op
->mark_event("resend forwarded message to leader");
3979 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
<< dendl
;
3980 MForward
*forward
= new MForward(rr
->tid
, req
, rr
->con_features
,
3982 req
->put(); // forward takes its own ref; drop ours.
3983 forward
->client
= rr
->client_inst
;
3984 forward
->set_priority(req
->get_priority());
3985 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3989 routed_requests
.clear();
3990 finish_contexts(g_ceph_context
, retry
);
3994 void Monitor::remove_session(MonSession
*s
)
3996 dout(10) << "remove_session " << s
<< " " << s
->inst
3997 << " features 0x" << std::hex
<< s
->con_features
<< std::dec
<< dendl
;
4000 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
4001 p
!= s
->routed_request_tids
.end();
4003 assert(routed_requests
.count(*p
));
4004 RoutedRequest
*rr
= routed_requests
[*p
];
4005 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
4007 routed_requests
.erase(*p
);
4009 s
->routed_request_tids
.clear();
4010 s
->con
->set_priv(NULL
);
4011 session_map
.remove_session(s
);
4012 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4013 logger
->inc(l_mon_session_rm
);
4016 void Monitor::remove_all_sessions()
4018 Mutex::Locker
l(session_map_lock
);
4019 while (!session_map
.sessions
.empty()) {
4020 MonSession
*s
= session_map
.sessions
.front();
4023 logger
->inc(l_mon_session_rm
);
4026 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4029 void Monitor::send_command(const entity_inst_t
& inst
,
4030 const vector
<string
>& com
)
4032 dout(10) << "send_command " << inst
<< "" << com
<< dendl
;
4033 MMonCommand
*c
= new MMonCommand(monmap
->fsid
);
4035 try_send_message(c
, inst
);
4038 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
4041 * Wait list the new session until we're in the quorum, assuming it's
4043 * tick() will periodically send them back through so we can send
4044 * the client elsewhere if we don't think we're getting back in.
4046 * But we whitelist a few sorts of messages:
4047 * 1) Monitors can talk to us at any time, of course.
4048 * 2) auth messages. It's unlikely to go through much faster, but
4049 * it's possible we've just lost our quorum status and we want to take...
4050 * 3) command messages. We want to accept these under all possible
4053 Message
*m
= op
->get_req();
4054 MonSession
*s
= op
->get_session();
4055 ConnectionRef con
= op
->get_connection();
4056 utime_t too_old
= ceph_clock_now();
4057 too_old
-= g_ceph_context
->_conf
->mon_lease
;
4058 if (m
->get_recv_stamp() > too_old
&&
4059 con
->is_connected()) {
4060 dout(5) << "waitlisting message " << *m
<< dendl
;
4061 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
4062 op
->mark_wait_for_quorum();
4064 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
4066 // proxied sessions aren't registered and don't have a con; don't remove
4068 if (!s
->proxy_con
) {
4069 Mutex::Locker
l(session_map_lock
);
4076 void Monitor::_ms_dispatch(Message
*m
)
4078 if (is_shutdown()) {
4083 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
4084 bool src_is_mon
= op
->is_src_mon();
4085 op
->mark_event("mon:_ms_dispatch");
4086 MonSession
*s
= op
->get_session();
4087 if (s
&& s
->closed
) {
4091 if (src_is_mon
&& s
) {
4092 ConnectionRef con
= m
->get_connection();
4093 if (con
->get_messenger() && con
->get_features() != s
->con_features
) {
4094 // only update features if this is a non-anonymous connection
4095 dout(10) << __func__
<< " feature change for " << m
->get_source_inst()
4096 << " (was " << s
->con_features
4097 << ", now " << con
->get_features() << ")" << dendl
;
4098 // connection features changed - recreate session.
4099 if (s
->con
&& s
->con
!= con
) {
4100 dout(10) << __func__
<< " connection for " << m
->get_source_inst()
4101 << " changed from session; mark down and replace" << dendl
;
4102 s
->con
->mark_down();
4104 if (s
->item
.is_on_list()) {
4105 // forwarded messages' sessions are not in the sessions map and
4106 // exist only while the op is being handled.
4115 // if the sender is not a monitor, make sure their first message for a
4116 // session is an MAuth. If it is not, assume it's a stray message,
4117 // and considering that we are creating a new session it is safe to
4118 // assume that the sender hasn't authenticated yet, so we have no way
4119 // of assessing whether we should handle it or not.
4120 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
4121 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
4122 m
->get_type() != CEPH_MSG_PING
)) {
4123 dout(1) << __func__
<< " dropping stray message " << *m
4124 << " from " << m
->get_source_inst() << dendl
;
4128 ConnectionRef con
= m
->get_connection();
4130 Mutex::Locker
l(session_map_lock
);
4131 s
= session_map
.new_session(m
->get_source_inst(), con
.get());
4134 con
->set_priv(s
->get());
4135 dout(10) << __func__
<< " new session " << s
<< " " << *s
4136 << " features 0x" << std::hex
4137 << s
->con_features
<< std::dec
<< dendl
;
4140 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4141 logger
->inc(l_mon_session_add
);
4144 // give it monitor caps; the peer type has been authenticated
4145 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
4146 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
4147 s
->caps
= *mon_caps
;
4151 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->inst
4157 s
->session_timeout
= ceph_clock_now();
4158 s
->session_timeout
+= g_conf
->mon_session_timeout
;
4160 if (s
->auth_handler
) {
4161 s
->entity_name
= s
->auth_handler
->get_entity_name();
4163 dout(20) << " caps " << s
->caps
.get_str() << dendl
;
4165 if ((is_synchronizing() ||
4166 (s
->global_id
== 0 && !exited_quorum
.is_zero())) &&
4168 m
->get_type() != CEPH_MSG_PING
) {
4169 waitlist_or_zap_client(op
);
4176 void Monitor::dispatch_op(MonOpRequestRef op
)
4178 op
->mark_event("mon:dispatch_op");
4179 MonSession
*s
= op
->get_session();
4182 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
4186 /* we will consider the default type as being 'monitor' until proven wrong */
4187 op
->set_type_monitor();
4188 /* deal with all messages that do not necessarily need caps */
4189 bool dealt_with
= true;
4190 switch (op
->get_req()->get_type()) {
4192 case MSG_MON_GLOBAL_ID
:
4194 op
->set_type_service();
4195 /* no need to check caps here */
4196 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
4203 /* MMonGetMap may be used by clients to obtain a monmap *before*
4204 * authenticating with the monitor. We need to handle these without
4205 * checking caps because, even on a cluster without cephx, we only set
4206 * session caps *after* the auth handshake. A good example of this
4207 * is when a client calls MonClient::get_monmap_privately(), which does
4208 * not authenticate when obtaining a monmap.
4210 case CEPH_MSG_MON_GET_MAP
:
4211 handle_mon_get_map(op
);
4214 case CEPH_MSG_MON_METADATA
:
4215 return handle_mon_metadata(op
);
4224 /* well, maybe the op belongs to a service... */
4225 op
->set_type_service();
4226 /* deal with all messages which caps should be checked somewhere else */
4228 switch (op
->get_req()->get_type()) {
4231 case CEPH_MSG_MON_GET_OSDMAP
:
4232 case CEPH_MSG_POOLOP
:
4233 case MSG_OSD_BEACON
:
4234 case MSG_OSD_MARK_ME_DOWN
:
4236 case MSG_OSD_FAILURE
:
4239 case MSG_OSD_PGTEMP
:
4240 case MSG_OSD_PG_CREATED
:
4241 case MSG_REMOVE_SNAPS
:
4242 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
4246 case MSG_MDS_BEACON
:
4247 case MSG_MDS_OFFLOAD_TARGETS
:
4248 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
4252 case MSG_MGR_BEACON
:
4253 paxos_service
[PAXOS_MGR
]->dispatch(op
);
4257 case MSG_MON_MGR_REPORT
:
4258 case CEPH_MSG_STATFS
:
4259 case MSG_GETPOOLSTATS
:
4260 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
4265 paxos_service
[PAXOS_PGMAP
]->dispatch(op
);
4270 paxos_service
[PAXOS_LOG
]->dispatch(op
);
4273 // handle_command() does its own caps checking
4274 case MSG_MON_COMMAND
:
4275 op
->set_type_command();
4286 /* nop, looks like it's not a service message; revert back to monitor */
4287 op
->set_type_monitor();
4289 /* messages we, the Monitor class, need to deal with
4290 * but may be sent by clients. */
4292 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
4293 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4294 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
4300 switch (op
->get_req()->get_type()) {
4303 case CEPH_MSG_MON_GET_VERSION
:
4304 handle_get_version(op
);
4307 case CEPH_MSG_MON_SUBSCRIBE
:
4308 /* FIXME: check what's being subscribed, filter accordingly */
4309 handle_subscribe(op
);
4319 if (!op
->is_src_mon()) {
4320 dout(1) << __func__
<< " unexpected monitor message from"
4321 << " non-monitor entity " << op
->get_req()->get_source_inst()
4322 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
4326 /* messages that should only be sent by another monitor */
4328 switch (op
->get_req()->get_type()) {
4338 // Sync (i.e., the new slurp, but on steroids)
4346 /* log acks are sent from a monitor we sent the MLog to, and are
4347 never sent by clients to us. */
4349 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
4354 op
->set_type_service();
4355 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
4361 op
->set_type_paxos();
4362 MMonPaxos
*pm
= static_cast<MMonPaxos
*>(op
->get_req());
4363 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4368 if (state
== STATE_SYNCHRONIZING
) {
4369 // we are synchronizing. These messages would do us no
4370 // good, thus just drop them and ignore them.
4371 dout(10) << __func__
<< " ignore paxos msg from "
4372 << pm
->get_source_inst() << dendl
;
4377 if (pm
->epoch
> get_epoch()) {
4381 if (pm
->epoch
!= get_epoch()) {
4385 paxos
->dispatch(op
);
4390 case MSG_MON_ELECTION
:
4391 op
->set_type_election();
4392 //check privileges here for simplicity
4393 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4394 dout(0) << "MMonElection received from entity without enough caps!"
4395 << op
->get_session()->caps
<< dendl
;
4398 if (!is_probing() && !is_synchronizing()) {
4399 elector
.dispatch(op
);
4408 handle_timecheck(op
);
4411 case MSG_MON_HEALTH
:
4412 health_monitor
->dispatch(op
);
4415 case MSG_MON_HEALTH_CHECKS
:
4416 op
->set_type_service();
4417 paxos_service
[PAXOS_HEALTH
]->dispatch(op
);
4425 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
4434 void Monitor::handle_ping(MonOpRequestRef op
)
4436 MPing
*m
= static_cast<MPing
*>(op
->get_req());
4437 dout(10) << __func__
<< " " << *m
<< dendl
;
4438 MPing
*reply
= new MPing
;
4439 entity_inst_t inst
= m
->get_source_inst();
4441 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4442 f
->open_object_section("pong");
4444 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
4445 get_health_status(false, f
.get(), nullptr);
4447 list
<string
> health_str
;
4448 get_health(health_str
, nullptr, f
.get());
4453 get_mon_status(f
.get(), ss
);
4459 ::encode(ss
.str(), payload
);
4460 reply
->set_payload(payload
);
4461 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
4462 messenger
->send_message(reply
, inst
);
4465 void Monitor::timecheck_start()
4467 dout(10) << __func__
<< dendl
;
4468 timecheck_cleanup();
4469 timecheck_start_round();
4472 void Monitor::timecheck_finish()
4474 dout(10) << __func__
<< dendl
;
4475 timecheck_cleanup();
4478 void Monitor::timecheck_start_round()
4480 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4481 assert(is_leader());
4483 if (monmap
->size() == 1) {
4484 assert(0 == "We are alone; this shouldn't have been scheduled!");
4488 if (timecheck_round
% 2) {
4489 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4490 utime_t curr_time
= ceph_clock_now();
4491 double max
= g_conf
->mon_timecheck_interval
*3;
4492 if (curr_time
- timecheck_round_start
< max
) {
4493 dout(10) << __func__
<< " keep current round going" << dendl
;
4496 dout(10) << __func__
4497 << " finish current timecheck and start new" << dendl
;
4498 timecheck_cancel_round();
4502 assert(timecheck_round
% 2 == 0);
4505 timecheck_round_start
= ceph_clock_now();
4506 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4510 dout(10) << __func__
<< " setting up next event" << dendl
;
4511 timecheck_reset_event();
4514 void Monitor::timecheck_finish_round(bool success
)
4516 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4517 assert(timecheck_round
% 2);
4519 timecheck_round_start
= utime_t();
4522 assert(timecheck_waiting
.empty());
4523 assert(timecheck_acks
== quorum
.size());
4525 timecheck_check_skews();
4529 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4530 << " peers still waiting:";
4531 for (map
<entity_inst_t
,utime_t
>::iterator p
= timecheck_waiting
.begin();
4532 p
!= timecheck_waiting
.end(); ++p
) {
4533 *_dout
<< " " << p
->first
.name
;
4536 timecheck_waiting
.clear();
4538 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
4541 void Monitor::timecheck_cancel_round()
4543 timecheck_finish_round(false);
4546 void Monitor::timecheck_cleanup()
4548 timecheck_round
= 0;
4550 timecheck_round_start
= utime_t();
4552 if (timecheck_event
) {
4553 timer
.cancel_event(timecheck_event
);
4554 timecheck_event
= NULL
;
4556 timecheck_waiting
.clear();
4557 timecheck_skews
.clear();
4558 timecheck_latencies
.clear();
4560 timecheck_rounds_since_clean
= 0;
4563 void Monitor::timecheck_reset_event()
4565 if (timecheck_event
) {
4566 timer
.cancel_event(timecheck_event
);
4567 timecheck_event
= NULL
;
4571 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
4573 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4574 delay
= cct
->_conf
->mon_timecheck_interval
;
4577 dout(10) << __func__
<< " delay " << delay
4578 << " rounds_since_clean " << timecheck_rounds_since_clean
4581 timecheck_event
= new C_MonContext(this, [this](int) {
4582 timecheck_start_round();
4584 timer
.add_event_after(delay
, timecheck_event
);
4587 void Monitor::timecheck_check_skews()
4589 dout(10) << __func__
<< dendl
;
4590 assert(is_leader());
4591 assert((timecheck_round
% 2) == 0);
4592 if (monmap
->size() == 1) {
4593 assert(0 == "We are alone; we shouldn't have gotten here!");
4596 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4598 bool found_skew
= false;
4599 for (map
<entity_inst_t
, double>::iterator p
= timecheck_skews
.begin();
4600 p
!= timecheck_skews
.end(); ++p
) {
4603 if (timecheck_has_skew(p
->second
, &abs_skew
)) {
4604 dout(10) << __func__
4605 << " " << p
->first
<< " skew " << abs_skew
<< dendl
;
4611 ++timecheck_rounds_since_clean
;
4612 timecheck_reset_event();
4613 } else if (timecheck_rounds_since_clean
> 0) {
4615 << " no clock skews found after " << timecheck_rounds_since_clean
4616 << " rounds" << dendl
;
4617 // make sure the skews are really gone and not just a transient success
4618 // this will run just once if not in the presence of skews again.
4619 timecheck_rounds_since_clean
= 1;
4620 timecheck_reset_event();
4621 timecheck_rounds_since_clean
= 0;
4626 void Monitor::timecheck_report()
4628 dout(10) << __func__
<< dendl
;
4629 assert(is_leader());
4630 assert((timecheck_round
% 2) == 0);
4631 if (monmap
->size() == 1) {
4632 assert(0 == "We are alone; we shouldn't have gotten here!");
4636 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4637 bool do_output
= true; // only output report once
4638 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
4639 if (monmap
->get_name(*q
) == name
)
4642 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_REPORT
);
4643 m
->epoch
= get_epoch();
4644 m
->round
= timecheck_round
;
4646 for (map
<entity_inst_t
, double>::iterator it
= timecheck_skews
.begin();
4647 it
!= timecheck_skews
.end(); ++it
) {
4648 double skew
= it
->second
;
4649 double latency
= timecheck_latencies
[it
->first
];
4651 m
->skews
[it
->first
] = skew
;
4652 m
->latencies
[it
->first
] = latency
;
4655 dout(25) << __func__
<< " " << it
->first
4656 << " latency " << latency
4657 << " skew " << skew
<< dendl
;
4661 entity_inst_t inst
= monmap
->get_inst(*q
);
4662 dout(10) << __func__
<< " send report to " << inst
<< dendl
;
4663 messenger
->send_message(m
, inst
);
4667 void Monitor::timecheck()
4669 dout(10) << __func__
<< dendl
;
4670 assert(is_leader());
4671 if (monmap
->size() == 1) {
4672 assert(0 == "We are alone; we shouldn't have gotten here!");
4675 assert(timecheck_round
% 2 != 0);
4677 timecheck_acks
= 1; // we ack ourselves
4679 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
4680 << " round " << timecheck_round
<< dendl
;
4682 // we are at the eye of the storm; the point of reference
4683 timecheck_skews
[messenger
->get_myinst()] = 0.0;
4684 timecheck_latencies
[messenger
->get_myinst()] = 0.0;
4686 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
4687 if (monmap
->get_name(*it
) == name
)
4690 entity_inst_t inst
= monmap
->get_inst(*it
);
4691 utime_t curr_time
= ceph_clock_now();
4692 timecheck_waiting
[inst
] = curr_time
;
4693 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_PING
);
4694 m
->epoch
= get_epoch();
4695 m
->round
= timecheck_round
;
4696 dout(10) << __func__
<< " send " << *m
<< " to " << inst
<< dendl
;
4697 messenger
->send_message(m
, inst
);
4701 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
4702 const double skew_bound
,
4703 const double latency
)
4705 health_status_t status
= HEALTH_OK
;
4706 assert(latency
>= 0);
4709 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
4710 status
= HEALTH_WARN
;
4711 ss
<< "clock skew " << abs_skew
<< "s"
4712 << " > max " << g_conf
->mon_clock_drift_allowed
<< "s";
4718 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
4720 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4721 dout(10) << __func__
<< " " << *m
<< dendl
;
4722 /* handles PONG's */
4723 assert(m
->op
== MTimeCheck::OP_PONG
);
4725 entity_inst_t other
= m
->get_source_inst();
4726 if (m
->epoch
< get_epoch()) {
4727 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
4728 << " from " << other
4729 << " curr " << get_epoch()
4730 << " -- severely lagged? discard" << dendl
;
4733 assert(m
->epoch
== get_epoch());
4735 if (m
->round
< timecheck_round
) {
4736 dout(1) << __func__
<< " got old round " << m
->round
4737 << " from " << other
4738 << " curr " << timecheck_round
<< " -- discard" << dendl
;
4742 utime_t curr_time
= ceph_clock_now();
4744 assert(timecheck_waiting
.count(other
) > 0);
4745 utime_t timecheck_sent
= timecheck_waiting
[other
];
4746 timecheck_waiting
.erase(other
);
4747 if (curr_time
< timecheck_sent
) {
4748 // our clock was readjusted -- drop everything until it all makes sense.
4749 dout(1) << __func__
<< " our clock was readjusted --"
4750 << " bump round and drop current check"
4752 timecheck_cancel_round();
4756 /* update peer latencies */
4757 double latency
= (double)(curr_time
- timecheck_sent
);
4759 if (timecheck_latencies
.count(other
) == 0)
4760 timecheck_latencies
[other
] = latency
;
4762 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
4763 timecheck_latencies
[other
] = avg_latency
;
4769 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4770 * and 'a' happens to be lower than 'b'; so we use double instead.
4772 * latency is always expected to be >= 0.
4774 * delta, the difference between theirs timestamp and ours, may either be
4775 * lower or higher than 0; will hardly ever be 0.
4777 * The absolute skew is the absolute delta minus the latency, which is
4778 * taken as a whole instead of an rtt given that there is some queueing
4779 * and dispatch times involved and it's hard to assess how long exactly
4780 * it took for the message to travel to the other side and be handled. So
4781 * we call it a bounded skew, the worst case scenario.
4785 * Given that the latency is always positive, we can establish that the
4786 * bounded skew will be:
4788 * 1. positive if the absolute delta is higher than the latency and
4790 * 2. negative if the absolute delta is higher than the latency and
4791 * delta is negative.
4792 * 3. zero if the absolute delta is lower than the latency.
4794 * On 3. we make a judgement call and treat the skew as non-existent.
4795 * This is because that, if the absolute delta is lower than the
4796 * latency, then the apparently existing skew is nothing more than a
4797 * side-effect of the high latency at work.
4799 * This may not be entirely true though, as a severely skewed clock
4800 * may be masked by an even higher latency, but with high latencies
4801 * we probably have worse issues to deal with than just skewed clocks.
4803 assert(latency
>= 0);
4805 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
4806 double abs_delta
= (delta
> 0 ? delta
: -delta
);
4807 double skew_bound
= abs_delta
- latency
;
4811 skew_bound
= -skew_bound
;
4814 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
4815 clog
->health(status
) << other
<< " " << ss
.str();
4817 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
4818 << " delta " << delta
<< " skew_bound " << skew_bound
4819 << " latency " << latency
<< dendl
;
4821 timecheck_skews
[other
] = skew_bound
;
4824 if (timecheck_acks
== quorum
.size()) {
4825 dout(10) << __func__
<< " got pongs from everybody ("
4826 << timecheck_acks
<< " total)" << dendl
;
4827 assert(timecheck_skews
.size() == timecheck_acks
);
4828 assert(timecheck_waiting
.empty());
4829 // everyone has acked, so bump the round to finish it.
4830 timecheck_finish_round();
4834 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
4836 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4837 dout(10) << __func__
<< " " << *m
<< dendl
;
4840 assert(m
->op
== MTimeCheck::OP_PING
|| m
->op
== MTimeCheck::OP_REPORT
);
4842 if (m
->epoch
!= get_epoch()) {
4843 dout(1) << __func__
<< " got wrong epoch "
4844 << "(ours " << get_epoch()
4845 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
4849 if (m
->round
< timecheck_round
) {
4850 dout(1) << __func__
<< " got old round " << m
->round
4851 << " current " << timecheck_round
4852 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
4856 timecheck_round
= m
->round
;
4858 if (m
->op
== MTimeCheck::OP_REPORT
) {
4859 assert((timecheck_round
% 2) == 0);
4860 timecheck_latencies
.swap(m
->latencies
);
4861 timecheck_skews
.swap(m
->skews
);
4865 assert((timecheck_round
% 2) != 0);
4866 MTimeCheck
*reply
= new MTimeCheck(MTimeCheck::OP_PONG
);
4867 utime_t curr_time
= ceph_clock_now();
4868 reply
->timestamp
= curr_time
;
4869 reply
->epoch
= m
->epoch
;
4870 reply
->round
= m
->round
;
4871 dout(10) << __func__
<< " send " << *m
4872 << " to " << m
->get_source_inst() << dendl
;
4873 m
->get_connection()->send_message(reply
);
4876 void Monitor::handle_timecheck(MonOpRequestRef op
)
4878 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4879 dout(10) << __func__
<< " " << *m
<< dendl
;
4882 if (m
->op
!= MTimeCheck::OP_PONG
) {
4883 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
4885 handle_timecheck_leader(op
);
4887 } else if (is_peon()) {
4888 if (m
->op
!= MTimeCheck::OP_PING
&& m
->op
!= MTimeCheck::OP_REPORT
) {
4889 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
4891 handle_timecheck_peon(op
);
4894 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
4898 void Monitor::handle_subscribe(MonOpRequestRef op
)
4900 MMonSubscribe
*m
= static_cast<MMonSubscribe
*>(op
->get_req());
4901 dout(10) << "handle_subscribe " << *m
<< dendl
;
4905 MonSession
*s
= op
->get_session();
4908 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
4911 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4912 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
4915 // remove conflicting subscribes
4916 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4917 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
4918 it
!= s
->sub_map
.end(); ) {
4919 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
4920 Mutex::Locker
l(session_map_lock
);
4921 session_map
.remove_sub((it
++)->second
);
4929 Mutex::Locker
l(session_map_lock
);
4930 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
4931 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
4932 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
4935 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
4936 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
4937 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
4938 Subscription
*sub
= s
->sub_map
[p
->first
];
4939 assert(sub
!= nullptr);
4940 mdsmon()->check_sub(sub
);
4942 } else if (p
->first
== "osdmap") {
4943 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
4944 if (s
->osd_epoch
> p
->second
.start
) {
4945 // client needs earlier osdmaps on purpose, so reset the sent epoch
4948 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
4950 } else if (p
->first
== "osd_pg_creates") {
4951 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
4952 if (monmap
->get_required_features().contains_all(
4953 ceph::features::mon::FEATURE_LUMINOUS
)) {
4954 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
4956 pgmon()->check_sub(s
->sub_map
["osd_pg_creates"]);
4959 } else if (p
->first
== "monmap") {
4960 monmon()->check_sub(s
->sub_map
[p
->first
]);
4961 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4962 logmon()->check_sub(s
->sub_map
[p
->first
]);
4963 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
4964 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
4965 } else if (p
->first
== "servicemap") {
4966 mgrstatmon()->check_sub(s
->sub_map
[p
->first
]);
4971 // we only need to reply if the client is old enough to think it
4972 // has to send renewals.
4973 ConnectionRef con
= m
->get_connection();
4974 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
4975 m
->get_connection()->send_message(new MMonSubscribeAck(
4976 monmap
->get_fsid(), (int)g_conf
->mon_subscribe_interval
));
4981 void Monitor::handle_get_version(MonOpRequestRef op
)
4983 MMonGetVersion
*m
= static_cast<MMonGetVersion
*>(op
->get_req());
4984 dout(10) << "handle_get_version " << *m
<< dendl
;
4985 PaxosService
*svc
= NULL
;
4987 MonSession
*s
= op
->get_session();
4990 if (!is_leader() && !is_peon()) {
4991 dout(10) << " waiting for quorum" << dendl
;
4992 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
4996 if (m
->what
== "mdsmap") {
4998 } else if (m
->what
== "fsmap") {
5000 } else if (m
->what
== "osdmap") {
5002 } else if (m
->what
== "monmap") {
5005 derr
<< "invalid map type " << m
->what
<< dendl
;
5009 if (!svc
->is_readable()) {
5010 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
5014 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
5015 reply
->handle
= m
->handle
;
5016 reply
->version
= svc
->get_last_committed();
5017 reply
->oldest_version
= svc
->get_first_committed();
5018 reply
->set_tid(m
->get_tid());
5020 m
->get_connection()->send_message(reply
);
5026 bool Monitor::ms_handle_reset(Connection
*con
)
5028 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
5030 // ignore lossless monitor sessions
5031 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
5034 MonSession
*s
= static_cast<MonSession
*>(con
->get_priv());
5038 // break any con <-> session ref cycle
5039 s
->con
->set_priv(NULL
);
5044 Mutex::Locker
l(lock
);
5046 dout(10) << "reset/close on session " << s
->inst
<< dendl
;
5048 Mutex::Locker
l(session_map_lock
);
5055 bool Monitor::ms_handle_refused(Connection
*con
)
5057 // just log for now...
5058 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
5064 void Monitor::send_latest_monmap(Connection
*con
)
5067 monmap
->encode(bl
, con
->get_features());
5068 con
->send_message(new MMonMap(bl
));
5071 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
5073 MMonGetMap
*m
= static_cast<MMonGetMap
*>(op
->get_req());
5074 dout(10) << "handle_mon_get_map" << dendl
;
5075 send_latest_monmap(m
->get_connection().get());
5078 void Monitor::handle_mon_metadata(MonOpRequestRef op
)
5080 MMonMetadata
*m
= static_cast<MMonMetadata
*>(op
->get_req());
5082 dout(10) << __func__
<< dendl
;
5083 update_mon_metadata(m
->get_source().num(), std::move(m
->data
));
5087 void Monitor::update_mon_metadata(int from
, Metadata
&& m
)
5089 // NOTE: this is now for legacy (kraken or jewel) mons only.
5090 pending_metadata
[from
] = std::move(m
);
5092 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5094 ::encode(pending_metadata
, bl
);
5095 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5096 paxos
->trigger_propose();
5099 int Monitor::load_metadata()
5102 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5105 bufferlist::iterator it
= bl
.begin();
5106 ::decode(mon_metadata
, it
);
5108 pending_metadata
= mon_metadata
;
5112 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
5115 if (!mon_metadata
.count(mon
)) {
5116 err
<< "mon." << mon
<< " not found";
5119 const Metadata
& m
= mon_metadata
[mon
];
5120 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
5121 f
->dump_string(p
->first
.c_str(), p
->second
);
5126 void Monitor::count_metadata(const string
& field
, map
<string
,int> *out
)
5128 for (auto& p
: mon_metadata
) {
5129 auto q
= p
.second
.find(field
);
5130 if (q
== p
.second
.end()) {
5131 (*out
)["unknown"]++;
5133 (*out
)[q
->second
]++;
5138 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
5140 map
<string
,int> by_val
;
5141 count_metadata(field
, &by_val
);
5142 f
->open_object_section(field
.c_str());
5143 for (auto& p
: by_val
) {
5144 f
->dump_int(p
.first
.c_str(), p
.second
);
5149 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
5151 map
<string
, list
<int> > mons
; // hostname => mon
5152 for (map
<int, Metadata
>::iterator it
= mon_metadata
.begin();
5153 it
!= mon_metadata
.end(); ++it
) {
5154 const Metadata
& m
= it
->second
;
5155 Metadata::const_iterator hostname
= m
.find("hostname");
5156 if (hostname
== m
.end()) {
5157 // not likely though
5160 mons
[hostname
->second
].push_back(it
->first
);
5163 dump_services(f
, mons
, "mon");
5167 // ----------------------------------------------
5170 int Monitor::scrub_start()
5172 dout(10) << __func__
<< dendl
;
5173 assert(is_leader());
5175 if (!scrub_result
.empty()) {
5176 clog
->info() << "scrub already in progress";
5180 scrub_event_cancel();
5181 scrub_result
.clear();
5182 scrub_state
.reset(new ScrubState
);
5188 int Monitor::scrub()
5190 assert(is_leader());
5191 assert(scrub_state
);
5193 scrub_cancel_timeout();
5194 wait_for_paxos_write();
5195 scrub_version
= paxos
->get_version();
5198 // scrub all keys if we're the only monitor in the quorum
5200 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
5202 for (set
<int>::iterator p
= quorum
.begin();
5207 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
5209 r
->key
= scrub_state
->last_key
;
5210 messenger
->send_message(r
, monmap
->get_inst(*p
));
5214 bool r
= _scrub(&scrub_result
[rank
],
5215 &scrub_state
->last_key
,
5218 scrub_state
->finished
= !r
;
5220 // only after we got our scrub results do we really care whether the
5221 // other monitors are late on their results. Also, this way we avoid
5222 // triggering the timeout if we end up getting stuck in _scrub() for
5223 // longer than the duration of the timeout.
5224 scrub_reset_timeout();
5226 if (quorum
.size() == 1) {
5227 assert(scrub_state
->finished
== true);
5233 void Monitor::handle_scrub(MonOpRequestRef op
)
5235 MMonScrub
*m
= static_cast<MMonScrub
*>(op
->get_req());
5236 dout(10) << __func__
<< " " << *m
<< dendl
;
5238 case MMonScrub::OP_SCRUB
:
5243 wait_for_paxos_write();
5245 if (m
->version
!= paxos
->get_version())
5248 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
5252 reply
->key
= m
->key
;
5253 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
5254 m
->get_connection()->send_message(reply
);
5258 case MMonScrub::OP_RESULT
:
5262 if (m
->version
!= scrub_version
)
5264 // reset the timeout each time we get a result
5265 scrub_reset_timeout();
5267 int from
= m
->get_source().num();
5268 assert(scrub_result
.count(from
) == 0);
5269 scrub_result
[from
] = m
->result
;
5271 if (scrub_result
.size() == quorum
.size()) {
5272 scrub_check_results();
5273 scrub_result
.clear();
5274 if (scrub_state
->finished
)
5284 bool Monitor::_scrub(ScrubResult
*r
,
5285 pair
<string
,string
> *start
,
5289 assert(start
!= NULL
);
5290 assert(num_keys
!= NULL
);
5292 set
<string
> prefixes
= get_sync_targets_names();
5293 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5295 dout(10) << __func__
<< " start (" << *start
<< ")"
5296 << " num_keys " << *num_keys
<< dendl
;
5298 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
5300 int scrubbed_keys
= 0;
5301 pair
<string
,string
> last_key
;
5303 while (it
->has_next_chunk()) {
5305 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
5308 pair
<string
,string
> k
= it
->get_next_key();
5309 if (prefixes
.count(k
.first
) == 0)
5312 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
5313 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
5314 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
5320 int err
= store
->get(k
.first
, k
.second
, bl
);
5323 uint32_t key_crc
= bl
.crc32c(0);
5324 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
5325 << " crc " << key_crc
<< dendl
;
5326 r
->prefix_keys
[k
.first
]++;
5327 if (r
->prefix_crc
.count(k
.first
) == 0) {
5328 r
->prefix_crc
[k
.first
] = 0;
5330 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
5332 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
5333 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
5334 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
5335 r
->prefix_crc
[k
.first
] += 1;
5342 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
5343 << " scrubbed_keys " << scrubbed_keys
5344 << " has_next " << it
->has_next_chunk() << dendl
;
5347 *num_keys
= scrubbed_keys
;
5349 return it
->has_next_chunk();
5352 void Monitor::scrub_check_results()
5354 dout(10) << __func__
<< dendl
;
5358 ScrubResult
& mine
= scrub_result
[rank
];
5359 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
5360 p
!= scrub_result
.end();
5362 if (p
->first
== rank
)
5364 if (p
->second
!= mine
) {
5366 clog
->error() << "scrub mismatch";
5367 clog
->error() << " mon." << rank
<< " " << mine
;
5368 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5372 clog
->debug() << "scrub ok on " << quorum
<< ": " << mine
;
5375 inline void Monitor::scrub_timeout()
5377 dout(1) << __func__
<< " restarting scrub" << dendl
;
5382 void Monitor::scrub_finish()
5384 dout(10) << __func__
<< dendl
;
5386 scrub_event_start();
5389 void Monitor::scrub_reset()
5391 dout(10) << __func__
<< dendl
;
5392 scrub_cancel_timeout();
5394 scrub_result
.clear();
5395 scrub_state
.reset();
5398 inline void Monitor::scrub_update_interval(int secs
)
5400 // we don't care about changes if we are not the leader.
5401 // changes will be visible if we become the leader.
5405 dout(1) << __func__
<< " new interval = " << secs
<< dendl
;
5407 // if scrub already in progress, all changes will already be visible during
5408 // the next round. Nothing to do.
5409 if (scrub_state
!= NULL
)
5412 scrub_event_cancel();
5413 scrub_event_start();
5416 void Monitor::scrub_event_start()
5418 dout(10) << __func__
<< dendl
;
5421 scrub_event_cancel();
5423 if (cct
->_conf
->mon_scrub_interval
<= 0) {
5424 dout(1) << __func__
<< " scrub event is disabled"
5425 << " (mon_scrub_interval = " << cct
->_conf
->mon_scrub_interval
5430 scrub_event
= new C_MonContext(this, [this](int) {
5433 timer
.add_event_after(cct
->_conf
->mon_scrub_interval
, scrub_event
);
5436 void Monitor::scrub_event_cancel()
5438 dout(10) << __func__
<< dendl
;
5440 timer
.cancel_event(scrub_event
);
5445 inline void Monitor::scrub_cancel_timeout()
5447 if (scrub_timeout_event
) {
5448 timer
.cancel_event(scrub_timeout_event
);
5449 scrub_timeout_event
= NULL
;
5453 void Monitor::scrub_reset_timeout()
5455 dout(15) << __func__
<< " reset timeout event" << dendl
;
5456 scrub_cancel_timeout();
5458 scrub_timeout_event
= new C_MonContext(this, [this](int) {
5461 timer
.add_event_after(g_conf
->mon_scrub_timeout
, scrub_timeout_event
);
5464 /************ TICK ***************/
5465 void Monitor::new_tick()
5467 timer
.add_event_after(g_conf
->mon_tick_interval
, new C_MonContext(this, [this](int) {
5472 void Monitor::tick()
5475 dout(11) << "tick" << dendl
;
5476 const utime_t now
= ceph_clock_now();
5478 // Check if we need to emit any delayed health check updated messages
5480 const auto min_period
= g_conf
->get_val
<int64_t>(
5481 "mon_health_log_update_period");
5482 for (auto& svc
: paxos_service
) {
5483 auto health
= svc
->get_health_checks();
5485 for (const auto &i
: health
.checks
) {
5486 const std::string
&code
= i
.first
;
5487 const std::string
&summary
= i
.second
.summary
;
5488 const health_status_t severity
= i
.second
.severity
;
5490 auto status_iter
= health_check_log_times
.find(code
);
5491 if (status_iter
== health_check_log_times
.end()) {
5495 auto &log_status
= status_iter
->second
;
5496 bool const changed
= log_status
.last_message
!= summary
5497 || log_status
.severity
!= severity
;
5499 if (changed
&& now
- log_status
.updated_at
> min_period
) {
5500 log_status
.last_message
= summary
;
5501 log_status
.updated_at
= now
;
5502 log_status
.severity
= severity
;
5505 ss
<< "Health check update: " << summary
<< " (" << code
<< ")";
5506 clog
->health(severity
) << ss
.str();
5513 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
) {
5520 Mutex::Locker
l(session_map_lock
);
5521 auto p
= session_map
.sessions
.begin();
5523 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5524 now
> (exited_quorum
+ 2*g_conf
->mon_lease
));
5530 // don't trim monitors
5531 if (s
->inst
.name
.is_mon())
5534 if (s
->session_timeout
< now
&& s
->con
) {
5535 // check keepalive, too
5536 s
->session_timeout
= s
->con
->get_last_keepalive();
5537 s
->session_timeout
+= g_conf
->mon_session_timeout
;
5539 if (s
->session_timeout
< now
) {
5540 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5541 << " (timeout " << s
->session_timeout
5542 << " < now " << now
<< ")" << dendl
;
5543 } else if (out_for_too_long
) {
5544 // boot the client Session because we've taken too long getting back in
5545 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5546 << " because we've been out of quorum too long" << dendl
;
5551 s
->con
->mark_down();
5553 logger
->inc(l_mon_session_trim
);
5556 sync_trim_providers();
5558 if (!maybe_wait_for_quorum
.empty()) {
5559 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5562 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5563 // this is only necessary on upgraded clusters.
5564 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5565 prepare_new_fingerprint(t
);
5566 paxos
->trigger_propose();
5572 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5575 nf
.generate_random();
5576 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5580 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
5583 int Monitor::check_fsid()
5586 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
5591 string
es(ebl
.c_str(), ebl
.length());
5593 // only keep the first line
5594 size_t pos
= es
.find_first_of('\n');
5595 if (pos
!= string::npos
)
5598 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
5600 if (!ondisk
.parse(es
.c_str())) {
5601 derr
<< "error: unable to parse uuid" << dendl
;
5605 if (monmap
->get_fsid() != ondisk
) {
5606 derr
<< "error: cluster_uuid file exists with value " << ondisk
5607 << ", != our uuid " << monmap
->get_fsid() << dendl
;
5614 int Monitor::write_fsid()
5616 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5618 int r
= store
->apply_transaction(t
);
5622 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
5625 ss
<< monmap
->get_fsid() << "\n";
5626 string us
= ss
.str();
5631 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
5636 * this is the closest thing to a traditional 'mkfs' for ceph.
5637 * initialize the monitor state machines to their initial values.
5639 int Monitor::mkfs(bufferlist
& osdmapbl
)
5641 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5643 // verify cluster fsid
5644 int r
= check_fsid();
5645 if (r
< 0 && r
!= -ENOENT
)
5649 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
5650 magicbl
.append("\n");
5651 t
->put(MONITOR_NAME
, "magic", magicbl
);
5654 features
= get_initial_supported_features();
5657 // save monmap, osdmap, keyring.
5658 bufferlist monmapbl
;
5659 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
5660 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5661 t
->put("mkfs", "monmap", monmapbl
);
5663 if (osdmapbl
.length()) {
5664 // make sure it's a valid osdmap
5667 om
.decode(osdmapbl
);
5669 catch (buffer::error
& e
) {
5670 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
5673 t
->put("mkfs", "osdmap", osdmapbl
);
5676 if (is_keyring_required()) {
5678 string keyring_filename
;
5680 r
= ceph_resolve_file_search(g_conf
->keyring
, keyring_filename
);
5682 derr
<< "unable to find a keyring file on " << g_conf
->keyring
5683 << ": " << cpp_strerror(r
) << dendl
;
5684 if (g_conf
->key
!= "") {
5685 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf
->key
+
5686 "\n\tcaps mon = \"allow *\"\n";
5688 bl
.append(keyring_plaintext
);
5690 bufferlist::iterator i
= bl
.begin();
5691 keyring
.decode_plaintext(i
);
5693 catch (const buffer::error
& e
) {
5694 derr
<< "error decoding keyring " << keyring_plaintext
5695 << ": " << e
.what() << dendl
;
5702 r
= keyring
.load(g_ceph_context
, keyring_filename
);
5704 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
5709 // put mon. key in external keyring; seed with everything else.
5710 extract_save_mon_key(keyring
);
5712 bufferlist keyringbl
;
5713 keyring
.encode_plaintext(keyringbl
);
5714 t
->put("mkfs", "keyring", keyringbl
);
5717 store
->apply_transaction(t
);
5722 int Monitor::write_default_keyring(bufferlist
& bl
)
5725 os
<< g_conf
->mon_data
<< "/keyring";
5728 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
, 0600);
5731 dout(0) << __func__
<< " failed to open " << os
.str()
5732 << ": " << cpp_strerror(err
) << dendl
;
5736 err
= bl
.write_fd(fd
);
5739 VOID_TEMP_FAILURE_RETRY(::close(fd
));
5744 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
5746 EntityName mon_name
;
5747 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
5749 if (keyring
.get_auth(mon_name
, mon_key
)) {
5750 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
5752 pkey
.add(mon_name
, mon_key
);
5754 pkey
.encode_plaintext(bl
);
5755 write_default_keyring(bl
);
5756 keyring
.remove(mon_name
);
5760 bool Monitor::ms_get_authorizer(int service_id
, AuthAuthorizer
**authorizer
,
5763 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id
)
5769 // we only connect to other monitors and mgr; every else connects to us.
5770 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
5771 service_id
!= CEPH_ENTITY_TYPE_MGR
)
5774 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5776 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
5777 AuthNoneClientHandler
handler(g_ceph_context
, nullptr);
5778 handler
.set_global_id(0);
5779 *authorizer
= handler
.build_authorizer(service_id
);
5783 CephXServiceTicketInfo auth_ticket_info
;
5784 CephXSessionAuthInfo info
;
5788 name
.set_type(CEPH_ENTITY_TYPE_MON
);
5789 auth_ticket_info
.ticket
.name
= name
;
5790 auth_ticket_info
.ticket
.global_id
= 0;
5792 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
5793 // mon to mon authentication uses the private monitor shared key and not the
5796 if (!keyring
.get_secret(name
, secret
) &&
5797 !key_server
.get_secret(name
, secret
)) {
5798 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5800 stringstream ss
, ds
;
5801 int err
= key_server
.list_secrets(ds
);
5803 ss
<< "no installed auth entries!";
5805 ss
<< "installed auth entries:";
5806 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
5810 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
,
5811 secret
, (uint64_t)-1);
5813 dout(0) << __func__
<< " failed to build mon session_auth_info "
5814 << cpp_strerror(ret
) << dendl
;
5817 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
5819 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
);
5821 derr
<< __func__
<< " failed to build mgr service session_auth_info "
5822 << cpp_strerror(ret
) << dendl
;
5826 ceph_abort(); // see check at top of fn
5829 CephXTicketBlob blob
;
5830 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
5831 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl
;
5834 bufferlist ticket_data
;
5835 ::encode(blob
, ticket_data
);
5837 bufferlist::iterator iter
= ticket_data
.begin();
5838 CephXTicketHandler
handler(g_ceph_context
, service_id
);
5839 ::decode(handler
.ticket
, iter
);
5841 handler
.session_key
= info
.session_key
;
5843 *authorizer
= handler
.build_authorizer(0);
5848 bool Monitor::ms_verify_authorizer(Connection
*con
, int peer_type
,
5849 int protocol
, bufferlist
& authorizer_data
,
5850 bufferlist
& authorizer_reply
,
5851 bool& isvalid
, CryptoKey
& session_key
)
5853 dout(10) << "ms_verify_authorizer " << con
->get_peer_addr()
5854 << " " << ceph_entity_type_name(peer_type
)
5855 << " protocol " << protocol
<< dendl
;
5860 if (peer_type
== CEPH_ENTITY_TYPE_MON
&&
5861 auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5862 // monitor, and cephx is enabled
5864 if (protocol
== CEPH_AUTH_CEPHX
) {
5865 bufferlist::iterator iter
= authorizer_data
.begin();
5866 CephXServiceTicketInfo auth_ticket_info
;
5868 if (authorizer_data
.length()) {
5869 bool ret
= cephx_verify_authorizer(g_ceph_context
, &keyring
, iter
,
5870 auth_ticket_info
, authorizer_reply
);
5872 session_key
= auth_ticket_info
.session_key
;
5875 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con
->get_peer_addr() << dendl
;
5879 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl
;