1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
25 #include "common/version.h"
27 #include "osd/OSDMap.h"
29 #include "MonitorDBStore.h"
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
49 #include "messages/MMonSubscribe.h"
50 #include "messages/MMonSubscribeAck.h"
52 #include "messages/MAuthReply.h"
54 #include "messages/MTimeCheck.h"
55 #include "messages/MPing.h"
57 #include "common/strtol.h"
58 #include "common/ceph_argparse.h"
59 #include "common/Timer.h"
60 #include "common/Clock.h"
61 #include "common/errno.h"
62 #include "common/perf_counters.h"
63 #include "common/admin_socket.h"
64 #include "global/signal_handler.h"
65 #include "common/Formatter.h"
66 #include "include/stringify.h"
67 #include "include/color.h"
68 #include "include/ceph_fs.h"
69 #include "include/str_list.h"
71 #include "OSDMonitor.h"
72 #include "MDSMonitor.h"
73 #include "MonmapMonitor.h"
74 #include "PGMonitor.h"
75 #include "LogMonitor.h"
76 #include "AuthMonitor.h"
77 #include "MgrMonitor.h"
78 #include "MgrStatMonitor.h"
79 #include "mon/QuorumService.h"
80 #include "mon/OldHealthMonitor.h"
81 #include "mon/HealthMonitor.h"
82 #include "mon/ConfigKeyService.h"
83 #include "common/config.h"
84 #include "common/cmdparse.h"
85 #include "include/assert.h"
86 #include "include/compat.h"
87 #include "perfglue/heap_profiler.h"
89 #include "auth/none/AuthNoneClientHandler.h"
91 #define dout_subsys ceph_subsys_mon
93 #define dout_prefix _prefix(_dout, this)
94 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
95 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
96 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
99 const string
Monitor::MONITOR_NAME
= "monitor";
100 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
105 #undef COMMAND_WITH_FLAG
106 MonCommand mon_commands
[] = {
107 #define FLAG(f) (MonCommand::FLAG_##f)
108 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
109 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
110 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
111 {parsesig, helptext, modulename, req_perms, avail, flags},
112 #include <mon/MonCommands.h>
114 #undef COMMAND_WITH_FLAG
116 // FIXME: slurp up the Mgr commands too
118 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
119 {parsesig, helptext, modulename, req_perms, avail, FLAG(MGR)},
120 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
121 {parsesig, helptext, modulename, req_perms, avail, flags | FLAG(MGR)},
122 #include <mgr/MgrCommands.h>
124 #undef COMMAND_WITH_FLAG
129 void C_MonContext::finish(int r
) {
130 if (mon
->is_shutdown())
132 FunctionContext::finish(r
);
135 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
136 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
141 con_self(m
? m
->get_loopback_connection() : NULL
),
142 lock("Monitor::lock"),
144 finisher(cct_
, "mon_finisher", "fin"),
145 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf
->mon_cpu_threads
),
146 has_ever_joined(false),
147 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
149 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
150 key_server(cct
, &keyring
),
151 auth_cluster_required(cct
,
152 cct
->_conf
->auth_supported
.empty() ?
153 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
154 auth_service_required(cct
,
155 cct
->_conf
->auth_supported
.empty() ?
156 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
157 leader_supported_mon_commands(NULL
),
158 leader_supported_mon_commands_size(0),
159 mgr_messenger(mgr_m
),
160 mgr_client(cct_
, mgr_m
),
164 state(STATE_PROBING
),
167 required_features(0),
169 quorum_con_features(0),
173 scrub_timeout_event(NULL
),
176 sync_provider_count(0),
179 sync_start_version(0),
180 sync_timeout_event(NULL
),
181 sync_last_committed_floor(0),
185 timecheck_rounds_since_clean(0),
186 timecheck_event(NULL
),
188 paxos_service(PAXOS_NUM
),
190 routed_request_tid(0),
191 op_tracker(cct
, true, 1)
193 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
194 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
196 update_log_clients();
198 paxos
= new Paxos(this, "paxos");
200 paxos_service
[PAXOS_MDSMAP
] = new MDSMonitor(this, paxos
, "mdsmap");
201 paxos_service
[PAXOS_MONMAP
] = new MonmapMonitor(this, paxos
, "monmap");
202 paxos_service
[PAXOS_OSDMAP
] = new OSDMonitor(cct
, this, paxos
, "osdmap");
203 paxos_service
[PAXOS_PGMAP
] = new PGMonitor(this, paxos
, "pgmap");
204 paxos_service
[PAXOS_LOG
] = new LogMonitor(this, paxos
, "logm");
205 paxos_service
[PAXOS_AUTH
] = new AuthMonitor(this, paxos
, "auth");
206 paxos_service
[PAXOS_MGR
] = new MgrMonitor(this, paxos
, "mgr");
207 paxos_service
[PAXOS_MGRSTAT
] = new MgrStatMonitor(this, paxos
, "mgrstat");
208 paxos_service
[PAXOS_HEALTH
] = new HealthMonitor(this, paxos
, "health");
210 health_monitor
= new OldHealthMonitor(this);
211 config_key_service
= new ConfigKeyService(this, paxos
);
213 mon_caps
= new MonCap();
214 bool r
= mon_caps
->parse("allow *", NULL
);
217 exited_quorum
= ceph_clock_now();
219 // assume our commands until we have an election. this only means
220 // we won't reply with EINVAL before the election; any command that
221 // actually matters will wait until we have quorum etc and then
222 // retry (and revalidate).
223 const MonCommand
*cmds
;
225 get_locally_supported_monitor_commands(&cmds
, &cmdsize
);
226 set_leader_supported_commands(cmds
, cmdsize
);
228 // note: OSDMonitor may update this based on the luminous flag.
229 pgservice
= mgrstatmon()->get_pg_stat_service();
234 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
236 delete health_monitor
;
237 delete config_key_service
;
239 assert(session_map
.sessions
.empty());
241 if (leader_supported_mon_commands
!= mon_commands
)
242 delete[] leader_supported_mon_commands
;
246 class AdminHook
: public AdminSocketHook
{
249 explicit AdminHook(Monitor
*m
) : mon(m
) {}
250 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
251 bufferlist
& out
) override
{
253 mon
->do_admin_command(command
, cmdmap
, format
, ss
);
259 void Monitor::do_admin_command(string command
, cmdmap_t
& cmdmap
, string format
,
262 Mutex::Locker
l(lock
);
264 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
267 for (cmdmap_t::iterator p
= cmdmap
.begin();
268 p
!= cmdmap
.end(); ++p
) {
269 if (p
->first
== "prefix")
273 args
+= cmd_vartype_stringify(p
->second
);
275 args
= "[" + args
+ "]";
277 bool read_only
= (command
== "mon_status" ||
278 command
== "mon metadata" ||
279 command
== "quorum_status" ||
281 command
== "sessions");
283 (read_only
? audit_clog
->debug() : audit_clog
->info())
284 << "from='admin socket' entity='admin socket' "
285 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
287 if (command
== "mon_status") {
288 get_mon_status(f
.get(), ss
);
291 } else if (command
== "quorum_status") {
292 _quorum_status(f
.get(), ss
);
293 } else if (command
== "sync_force") {
295 if ((!cmd_getval(g_ceph_context
, cmdmap
, "validate", validate
)) ||
296 (validate
!= "--yes-i-really-mean-it")) {
297 ss
<< "are you SURE? this will mean the monitor store will be erased "
298 "the next time the monitor is restarted. pass "
299 "'--yes-i-really-mean-it' if you really do.";
302 sync_force(f
.get(), ss
);
303 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
304 if (!_add_bootstrap_peer_hint(command
, cmdmap
, ss
))
306 } else if (command
== "quorum enter") {
307 elector
.start_participating();
309 ss
<< "started responding to quorum, initiated new election";
310 } else if (command
== "quorum exit") {
312 elector
.stop_participating();
313 ss
<< "stopped responding to quorum, initiated new election";
314 } else if (command
== "ops") {
315 (void)op_tracker
.dump_ops_in_flight(f
.get());
319 } else if (command
== "sessions") {
322 f
->open_array_section("sessions");
323 for (auto p
: session_map
.sessions
) {
324 f
->dump_stream("session") << *p
;
331 assert(0 == "bad AdminSocket command binding");
333 (read_only
? audit_clog
->debug() : audit_clog
->info())
334 << "from='admin socket' "
335 << "entity='admin socket' "
336 << "cmd=" << command
<< " "
337 << "args=" << args
<< ": finished";
341 (read_only
? audit_clog
->debug() : audit_clog
->info())
342 << "from='admin socket' "
343 << "entity='admin socket' "
344 << "cmd=" << command
<< " "
345 << "args=" << args
<< ": aborted";
348 void Monitor::handle_signal(int signum
)
350 assert(signum
== SIGINT
|| signum
== SIGTERM
);
351 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
355 CompatSet
Monitor::get_initial_supported_features()
357 CompatSet::FeatureSet ceph_mon_feature_compat
;
358 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
359 CompatSet::FeatureSet ceph_mon_feature_incompat
;
360 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
361 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
362 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
363 ceph_mon_feature_incompat
);
366 CompatSet
Monitor::get_supported_features()
368 CompatSet compat
= get_initial_supported_features();
369 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
370 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
371 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
372 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
373 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
377 CompatSet
Monitor::get_legacy_features()
379 CompatSet::FeatureSet ceph_mon_feature_compat
;
380 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
381 CompatSet::FeatureSet ceph_mon_feature_incompat
;
382 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
383 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
384 ceph_mon_feature_incompat
);
387 int Monitor::check_features(MonitorDBStore
*store
)
389 CompatSet required
= get_supported_features();
392 read_features_off_disk(store
, &ondisk
);
394 if (!required
.writeable(ondisk
)) {
395 CompatSet diff
= required
.unsupported(ondisk
);
396 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
403 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
405 bufferlist featuresbl
;
406 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
407 if (featuresbl
.length() == 0) {
408 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
409 << "Assuming it is old-style and introducing one." << dendl
;
410 //we only want the baseline ~v.18 features assumed to be on disk.
411 //If new features are introduced this code needs to disappear or
413 *features
= get_legacy_features();
415 features
->encode(featuresbl
);
416 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
417 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
418 store
->apply_transaction(t
);
420 bufferlist::iterator it
= featuresbl
.begin();
421 features
->decode(it
);
425 void Monitor::read_features()
427 read_features_off_disk(store
, &features
);
428 dout(10) << "features " << features
<< dendl
;
430 calc_quorum_requirements();
431 dout(10) << "required_features " << required_features
<< dendl
;
434 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
438 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
441 const char** Monitor::get_tracked_conf_keys() const
443 static const char* KEYS
[] = {
444 "crushtool", // helpful for testing
445 "mon_election_timeout",
447 "mon_lease_renew_interval_factor",
448 "mon_lease_ack_timeout_factor",
449 "mon_accept_timeout_factor",
453 "clog_to_syslog_facility",
454 "clog_to_syslog_level",
456 "clog_to_graylog_host",
457 "clog_to_graylog_port",
460 // periodic health to clog
461 "mon_health_to_clog",
462 "mon_health_to_clog_interval",
463 "mon_health_to_clog_tick_interval",
465 "mon_scrub_interval",
471 void Monitor::handle_conf_change(const struct md_config_t
*conf
,
472 const std::set
<std::string
> &changed
)
476 dout(10) << __func__
<< " " << changed
<< dendl
;
478 if (changed
.count("clog_to_monitors") ||
479 changed
.count("clog_to_syslog") ||
480 changed
.count("clog_to_syslog_level") ||
481 changed
.count("clog_to_syslog_facility") ||
482 changed
.count("clog_to_graylog") ||
483 changed
.count("clog_to_graylog_host") ||
484 changed
.count("clog_to_graylog_port") ||
485 changed
.count("host") ||
486 changed
.count("fsid")) {
487 update_log_clients();
490 if (changed
.count("mon_health_to_clog") ||
491 changed
.count("mon_health_to_clog_interval") ||
492 changed
.count("mon_health_to_clog_tick_interval")) {
493 health_to_clog_update_conf(changed
);
496 if (changed
.count("mon_scrub_interval")) {
497 scrub_update_interval(conf
->mon_scrub_interval
);
501 void Monitor::update_log_clients()
503 map
<string
,string
> log_to_monitors
;
504 map
<string
,string
> log_to_syslog
;
505 map
<string
,string
> log_channel
;
506 map
<string
,string
> log_prio
;
507 map
<string
,string
> log_to_graylog
;
508 map
<string
,string
> log_to_graylog_host
;
509 map
<string
,string
> log_to_graylog_port
;
513 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
514 log_channel
, log_prio
, log_to_graylog
,
515 log_to_graylog_host
, log_to_graylog_port
,
519 clog
->update_config(log_to_monitors
, log_to_syslog
,
520 log_channel
, log_prio
, log_to_graylog
,
521 log_to_graylog_host
, log_to_graylog_port
,
524 audit_clog
->update_config(log_to_monitors
, log_to_syslog
,
525 log_channel
, log_prio
, log_to_graylog
,
526 log_to_graylog_host
, log_to_graylog_port
,
530 int Monitor::sanitize_options()
534 // mon_lease must be greater than mon_lease_renewal; otherwise we
535 // may incur in leases expiring before they are renewed.
536 if (g_conf
->mon_lease_renew_interval_factor
>= 1.0) {
537 clog
->error() << "mon_lease_renew_interval_factor ("
538 << g_conf
->mon_lease_renew_interval_factor
539 << ") must be less than 1.0";
543 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
544 // got time to renew the lease and get an ack for it. Having both options
545 // with the same value, for a given small vale, could mean timing out if
546 // the monitors happened to be overloaded -- or even under normal load for
547 // a small enough value.
548 if (g_conf
->mon_lease_ack_timeout_factor
<= 1.0) {
549 clog
->error() << "mon_lease_ack_timeout_factor ("
550 << g_conf
->mon_lease_ack_timeout_factor
551 << ") must be greater than 1.0";
558 int Monitor::preinit()
562 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
564 int r
= sanitize_options();
566 derr
<< "option sanitization failed!" << dendl
;
573 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
574 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess");
575 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions", "sadd");
576 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions", "srm");
577 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions");
578 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in");
579 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started");
580 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won");
581 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost");
582 logger
= pcb
.create_perf_counters();
583 cct
->get_perfcounters_collection()->add(logger
);
586 assert(!cluster_logger
);
588 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
589 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
590 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
591 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
592 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
593 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
594 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
595 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster");
596 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space");
597 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space");
598 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
599 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
600 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
601 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
602 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
603 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
604 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
605 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
606 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
607 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects");
608 pcb
.add_u64(l_cluster_num_mds_up
, "num_mds_up", "MDSs that are up");
609 pcb
.add_u64(l_cluster_num_mds_in
, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
610 pcb
.add_u64(l_cluster_num_mds_failed
, "num_mds_failed", "Failed MDS");
611 pcb
.add_u64(l_cluster_mds_epoch
, "mds_epoch", "Current epoch of MDS map");
612 cluster_logger
= pcb
.create_perf_counters();
615 paxos
->init_logger();
617 // verify cluster_uuid
619 int r
= check_fsid();
631 // have we ever joined a quorum?
632 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
633 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
635 if (!has_ever_joined
) {
636 // impose initial quorum restrictions?
637 list
<string
> initial_members
;
638 get_str_list(g_conf
->mon_initial_members
, initial_members
);
640 if (!initial_members
.empty()) {
641 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
643 monmap
->set_initial_members(g_ceph_context
, initial_members
, name
, messenger
->get_myaddr(),
646 dout(10) << " monmap is " << *monmap
<< dendl
;
647 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
649 } else if (!monmap
->contains(name
)) {
650 derr
<< "not in monmap and have been in a quorum before; "
651 << "must have been removed" << dendl
;
652 if (g_conf
->mon_force_quorum_join
) {
653 dout(0) << "we should have died but "
654 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
656 derr
<< "commit suicide!" << dendl
;
663 // We have a potentially inconsistent store state in hands. Get rid of it
665 bool clear_store
= false;
666 if (store
->exists("mon_sync", "in_sync")) {
667 dout(1) << __func__
<< " clean up potentially inconsistent store state"
672 if (store
->get("mon_sync", "force_sync") > 0) {
673 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
678 set
<string
> sync_prefixes
= get_sync_targets_names();
679 store
->clear(sync_prefixes
);
683 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
684 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
687 health_monitor
->init();
689 if (is_keyring_required()) {
690 // we need to bootstrap authentication keys so we can form an
692 if (authmon()->get_last_committed() == 0) {
693 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
695 int err
= store
->get("mkfs", "keyring", bl
);
696 if (err
== 0 && bl
.length() > 0) {
697 // Attempt to decode and extract keyring only if it is found.
699 bufferlist::iterator p
= bl
.begin();
700 ::decode(keyring
, p
);
701 extract_save_mon_key(keyring
);
705 string keyring_loc
= g_conf
->mon_data
+ "/keyring";
707 r
= keyring
.load(cct
, keyring_loc
);
710 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
712 if (key_server
.get_auth(mon_name
, mon_key
)) {
713 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
714 keyring
.add(mon_name
, mon_key
);
716 keyring
.encode_plaintext(bl
);
717 write_default_keyring(bl
);
719 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
726 admin_hook
= new AdminHook(this);
727 AdminSocket
* admin_socket
= cct
->get_admin_socket();
729 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
731 r
= admin_socket
->register_command("mon_status", "mon_status", admin_hook
,
732 "show current monitor status");
734 r
= admin_socket
->register_command("quorum_status", "quorum_status",
735 admin_hook
, "show current quorum status");
737 r
= admin_socket
->register_command("sync_force",
738 "sync_force name=validate,"
740 "strings=--yes-i-really-mean-it",
742 "force sync of and clear monitor store");
744 r
= admin_socket
->register_command("add_bootstrap_peer_hint",
745 "add_bootstrap_peer_hint name=addr,"
748 "add peer address as potential bootstrap"
749 " peer for cluster bringup");
751 r
= admin_socket
->register_command("quorum enter", "quorum enter",
753 "force monitor back into quorum");
755 r
= admin_socket
->register_command("quorum exit", "quorum exit",
757 "force monitor out of the quorum");
759 r
= admin_socket
->register_command("ops",
762 "show the ops currently in flight");
764 r
= admin_socket
->register_command("sessions",
767 "list existing sessions");
772 // add ourselves as a conf observer
773 g_conf
->add_observer(this);
781 dout(2) << "init" << dendl
;
782 Mutex::Locker
l(lock
);
793 messenger
->add_dispatcher_tail(this);
796 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
797 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
801 // encode command sets
802 const MonCommand
*cmds
;
804 get_locally_supported_monitor_commands(&cmds
, &cmdsize
);
805 MonCommand::encode_array(cmds
, cmdsize
, supported_commands_bl
);
810 void Monitor::init_paxos()
812 dout(10) << __func__
<< dendl
;
816 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
817 paxos_service
[i
]->init();
820 refresh_from_paxos(NULL
);
823 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
825 dout(10) << __func__
<< dendl
;
828 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
831 bufferlist::iterator p
= bl
.begin();
832 ::decode(fingerprint
, p
);
834 catch (buffer::error
& e
) {
835 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
838 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
841 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
842 paxos_service
[i
]->refresh(need_bootstrap
);
844 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
845 paxos_service
[i
]->post_refresh();
850 void Monitor::register_cluster_logger()
852 if (!cluster_logger_registered
) {
853 dout(10) << "register_cluster_logger" << dendl
;
854 cluster_logger_registered
= true;
855 cct
->get_perfcounters_collection()->add(cluster_logger
);
857 dout(10) << "register_cluster_logger - already registered" << dendl
;
861 void Monitor::unregister_cluster_logger()
863 if (cluster_logger_registered
) {
864 dout(10) << "unregister_cluster_logger" << dendl
;
865 cluster_logger_registered
= false;
866 cct
->get_perfcounters_collection()->remove(cluster_logger
);
868 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
872 void Monitor::update_logger()
874 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
875 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
878 void Monitor::shutdown()
880 dout(1) << "shutdown" << dendl
;
884 wait_for_paxos_write();
886 state
= STATE_SHUTDOWN
;
888 g_conf
->remove_observer(this);
891 AdminSocket
* admin_socket
= cct
->get_admin_socket();
892 admin_socket
->unregister_command("mon_status");
893 admin_socket
->unregister_command("quorum_status");
894 admin_socket
->unregister_command("sync_force");
895 admin_socket
->unregister_command("add_bootstrap_peer_hint");
896 admin_socket
->unregister_command("quorum enter");
897 admin_socket
->unregister_command("quorum exit");
898 admin_socket
->unregister_command("ops");
899 admin_socket
->unregister_command("sessions");
906 mgr_client
.shutdown();
909 finisher
.wait_for_empty();
915 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
917 health_monitor
->shutdown();
919 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
920 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
926 remove_all_sessions();
929 cct
->get_perfcounters_collection()->remove(logger
);
933 if (cluster_logger
) {
934 if (cluster_logger_registered
)
935 cct
->get_perfcounters_collection()->remove(cluster_logger
);
936 delete cluster_logger
;
937 cluster_logger
= NULL
;
940 log_client
.shutdown();
942 // unlock before msgr shutdown...
945 messenger
->shutdown(); // last thing! ceph_mon.cc will delete mon.
946 mgr_messenger
->shutdown();
949 void Monitor::wait_for_paxos_write()
951 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
952 dout(10) << __func__
<< " flushing pending write" << dendl
;
956 dout(10) << __func__
<< " flushed pending write" << dendl
;
960 void Monitor::bootstrap()
962 dout(10) << "bootstrap" << dendl
;
963 wait_for_paxos_write();
965 sync_reset_requester();
966 unregister_cluster_logger();
967 cancel_probe_timeout();
970 int newrank
= monmap
->get_rank(messenger
->get_myaddr());
971 if (newrank
< 0 && rank
>= 0) {
972 // was i ever part of the quorum?
973 if (has_ever_joined
) {
974 dout(0) << " removed from monmap, suicide." << dendl
;
978 if (newrank
!= rank
) {
979 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
980 messenger
->set_myname(entity_name_t::MON(newrank
));
983 // reset all connections, or else our peers will think we are someone else.
984 messenger
->mark_down_all();
988 state
= STATE_PROBING
;
993 if (g_conf
->mon_compact_on_bootstrap
) {
994 dout(10) << "bootstrap -- triggering compaction" << dendl
;
996 dout(10) << "bootstrap -- finished compaction" << dendl
;
999 // singleton monitor?
1000 if (monmap
->size() == 1 && rank
== 0) {
1001 win_standalone_election();
1005 reset_probe_timeout();
1007 // i'm outside the quorum
1008 if (monmap
->contains(name
))
1009 outside_quorum
.insert(name
);
1012 dout(10) << "probing other monitors" << dendl
;
1013 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1015 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
),
1016 monmap
->get_inst(i
));
1018 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
1019 p
!= extra_probe_peers
.end();
1021 if (*p
!= messenger
->get_myaddr()) {
1023 i
.name
= entity_name_t::MON(-1);
1025 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
), i
);
1030 bool Monitor::_add_bootstrap_peer_hint(string cmd
, cmdmap_t
& cmdmap
, ostream
& ss
)
1033 if (!cmd_getval(g_ceph_context
, cmdmap
, "addr", addrstr
)) {
1034 ss
<< "unable to parse address string value '"
1035 << cmd_vartype_stringify(cmdmap
["addr"]) << "'";
1038 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' '"
1039 << addrstr
<< "'" << dendl
;
1042 const char *end
= 0;
1043 if (!addr
.parse(addrstr
.c_str(), &end
)) {
1044 ss
<< "failed to parse addr '" << addrstr
<< "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1048 if (is_leader() || is_peon()) {
1049 ss
<< "mon already active; ignoring bootstrap hint";
1053 if (addr
.get_port() == 0)
1054 addr
.set_port(CEPH_MON_PORT
);
1056 extra_probe_peers
.insert(addr
);
1057 ss
<< "adding peer " << addr
<< " to list: " << extra_probe_peers
;
1061 // called by bootstrap(), or on leader|peon -> electing
1062 void Monitor::_reset()
1064 dout(10) << __func__
<< dendl
;
1066 cancel_probe_timeout();
1068 health_events_cleanup();
1069 scrub_event_cancel();
1071 leader_since
= utime_t();
1072 if (!quorum
.empty()) {
1073 exited_quorum
= ceph_clock_now();
1076 outside_quorum
.clear();
1077 quorum_feature_map
.clear();
1083 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
1085 health_monitor
->finish();
1089 // -----------------------------------------------------------
1092 set
<string
> Monitor::get_sync_targets_names()
1094 set
<string
> targets
;
1095 targets
.insert(paxos
->get_name());
1096 for (int i
= 0; i
< PAXOS_NUM
; ++i
)
1097 paxos_service
[i
]->get_store_prefixes(targets
);
1098 ConfigKeyService
*config_key_service_ptr
= dynamic_cast<ConfigKeyService
*>(config_key_service
);
1099 assert(config_key_service_ptr
);
1100 config_key_service_ptr
->get_store_prefixes(targets
);
1105 void Monitor::sync_timeout()
1107 dout(10) << __func__
<< dendl
;
1108 assert(state
== STATE_SYNCHRONIZING
);
1112 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1114 dout(1) << __func__
<< dendl
;
1116 MonMap latest_monmap
;
1118 // Grab latest monmap from MonmapMonitor
1119 bufferlist monmon_bl
;
1120 int err
= monmon()->get_monmap(monmon_bl
);
1122 if (err
!= -ENOENT
) {
1124 << " something wrong happened while reading the store: "
1125 << cpp_strerror(err
) << dendl
;
1126 assert(0 == "error reading the store");
1129 latest_monmap
.decode(monmon_bl
);
1132 // Grab last backed up monmap (if any) and compare epochs
1133 if (store
->exists("mon_sync", "latest_monmap")) {
1134 bufferlist backup_bl
;
1135 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1138 << " something wrong happened while reading the store: "
1139 << cpp_strerror(err
) << dendl
;
1140 assert(0 == "error reading the store");
1142 assert(backup_bl
.length() > 0);
1144 MonMap backup_monmap
;
1145 backup_monmap
.decode(backup_bl
);
1147 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1148 latest_monmap
= backup_monmap
;
1151 // Check if our current monmap's epoch is greater than the one we've
1153 if (monmap
->epoch
> latest_monmap
.epoch
)
1154 latest_monmap
= *monmap
;
1156 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1158 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
1161 void Monitor::sync_reset_requester()
1163 dout(10) << __func__
<< dendl
;
1165 if (sync_timeout_event
) {
1166 timer
.cancel_event(sync_timeout_event
);
1167 sync_timeout_event
= NULL
;
1170 sync_provider
= entity_inst_t();
1173 sync_start_version
= 0;
1176 void Monitor::sync_reset_provider()
1178 dout(10) << __func__
<< dendl
;
1179 sync_providers
.clear();
1182 void Monitor::sync_start(entity_inst_t
&other
, bool full
)
1184 dout(10) << __func__
<< " " << other
<< (full
? " full" : " recent") << dendl
;
1186 assert(state
== STATE_PROBING
||
1187 state
== STATE_SYNCHRONIZING
);
1188 state
= STATE_SYNCHRONIZING
;
1190 // make sure are not a provider for anyone!
1191 sync_reset_provider();
1196 // stash key state, and mark that we are syncing
1197 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1198 sync_stash_critical_state(t
);
1199 t
->put("mon_sync", "in_sync", 1);
1201 sync_last_committed_floor
= MAX(sync_last_committed_floor
, paxos
->get_version());
1202 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1203 << sync_last_committed_floor
<< dendl
;
1204 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1206 store
->apply_transaction(t
);
1208 assert(g_conf
->mon_sync_requester_kill_at
!= 1);
1210 // clear the underlying store
1211 set
<string
> targets
= get_sync_targets_names();
1212 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1213 store
->clear(targets
);
1215 // make sure paxos knows it has been reset. this prevents a
1216 // bootstrap and then different probe reply order from possibly
1217 // deciding a partial or no sync is needed.
1220 assert(g_conf
->mon_sync_requester_kill_at
!= 2);
1223 // assume 'other' as the leader. We will update the leader once we receive
1224 // a reply to the sync start.
1225 sync_provider
= other
;
1227 sync_reset_timeout();
1229 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1231 m
->last_committed
= paxos
->get_version();
1232 messenger
->send_message(m
, sync_provider
);
1235 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1237 dout(10) << __func__
<< dendl
;
1238 bufferlist backup_monmap
;
1239 sync_obtain_latest_monmap(backup_monmap
);
1240 assert(backup_monmap
.length() > 0);
1241 t
->put("mon_sync", "latest_monmap", backup_monmap
);
1244 void Monitor::sync_reset_timeout()
1246 dout(10) << __func__
<< dendl
;
1247 if (sync_timeout_event
)
1248 timer
.cancel_event(sync_timeout_event
);
1249 sync_timeout_event
= new C_MonContext(this, [this](int) {
1252 timer
.add_event_after(g_conf
->mon_sync_timeout
, sync_timeout_event
);
1255 void Monitor::sync_finish(version_t last_committed
)
1257 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
1259 assert(g_conf
->mon_sync_requester_kill_at
!= 7);
1262 // finalize the paxos commits
1263 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1264 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1266 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
1268 dout(30) << __func__
<< " final tx dump:\n";
1269 JSONFormatter
f(true);
1274 store
->apply_transaction(tx
);
1277 assert(g_conf
->mon_sync_requester_kill_at
!= 8);
1279 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1280 t
->erase("mon_sync", "in_sync");
1281 t
->erase("mon_sync", "force_sync");
1282 t
->erase("mon_sync", "last_committed_floor");
1283 store
->apply_transaction(t
);
1285 assert(g_conf
->mon_sync_requester_kill_at
!= 9);
1289 assert(g_conf
->mon_sync_requester_kill_at
!= 10);
1294 void Monitor::handle_sync(MonOpRequestRef op
)
1296 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1297 dout(10) << __func__
<< " " << *m
<< dendl
;
1300 // provider ---------
1302 case MMonSync::OP_GET_COOKIE_FULL
:
1303 case MMonSync::OP_GET_COOKIE_RECENT
:
1304 handle_sync_get_cookie(op
);
1306 case MMonSync::OP_GET_CHUNK
:
1307 handle_sync_get_chunk(op
);
1310 // client -----------
1312 case MMonSync::OP_COOKIE
:
1313 handle_sync_cookie(op
);
1316 case MMonSync::OP_CHUNK
:
1317 case MMonSync::OP_LAST_CHUNK
:
1318 handle_sync_chunk(op
);
1320 case MMonSync::OP_NO_COOKIE
:
1321 handle_sync_no_cookie(op
);
1325 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1326 assert(0 == "unknown op");
1332 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1334 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1335 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1336 m
->get_connection()->send_message(reply
);
1339 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1341 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1342 if (is_synchronizing()) {
1343 _sync_reply_no_cookie(op
);
1347 assert(g_conf
->mon_sync_provider_kill_at
!= 1);
1349 // make sure they can understand us.
1350 if ((required_features
^ m
->get_connection()->get_features()) &
1351 required_features
) {
1352 dout(5) << " ignoring peer mon." << m
->get_source().num()
1353 << " has features " << std::hex
1354 << m
->get_connection()->get_features()
1355 << " but we require " << required_features
<< std::dec
<< dendl
;
1359 // make up a unique cookie. include election epoch (which persists
1360 // across restarts for the whole cluster) and a counter for this
1361 // process instance. there is no need to be unique *across*
1362 // monitors, though.
1363 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1364 assert(sync_providers
.count(cookie
) == 0);
1366 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
1368 SyncProvider
& sp
= sync_providers
[cookie
];
1370 sp
.entity
= m
->get_source_inst();
1371 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1373 set
<string
> sync_targets
;
1374 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1376 sync_targets
= get_sync_targets_names();
1377 sp
.last_committed
= paxos
->get_version();
1378 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1380 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1382 // just catch up paxos
1383 sp
.last_committed
= m
->last_committed
;
1385 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1387 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1388 reply
->last_committed
= sp
.last_committed
;
1389 m
->get_connection()->send_message(reply
);
1392 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1394 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1395 dout(10) << __func__
<< " " << *m
<< dendl
;
1397 if (sync_providers
.count(m
->cookie
) == 0) {
1398 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1399 _sync_reply_no_cookie(op
);
1403 assert(g_conf
->mon_sync_provider_kill_at
!= 2);
1405 SyncProvider
& sp
= sync_providers
[m
->cookie
];
1406 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1408 if (sp
.last_committed
< paxos
->get_first_committed() &&
1409 paxos
->get_first_committed() > 1) {
1410 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1411 << " < our fc " << paxos
->get_first_committed() << dendl
;
1412 sync_providers
.erase(m
->cookie
);
1413 _sync_reply_no_cookie(op
);
1417 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1418 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1420 int left
= g_conf
->mon_sync_max_payload_size
;
1421 while (sp
.last_committed
< paxos
->get_version() && left
> 0) {
1423 sp
.last_committed
++;
1425 int err
= store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1428 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1429 left
-= bl
.length();
1430 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1433 reply
->last_committed
= sp
.last_committed
;
1435 if (sp
.full
&& left
> 0) {
1436 sp
.synchronizer
->get_chunk_tx(tx
, left
);
1437 sp
.last_key
= sp
.synchronizer
->get_last_key();
1438 reply
->last_key
= sp
.last_key
;
1441 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1442 sp
.last_committed
< paxos
->get_version()) {
1443 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1444 << " key " << sp
.last_key
<< dendl
;
1446 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1447 << " key " << sp
.last_key
<< dendl
;
1448 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1450 assert(g_conf
->mon_sync_provider_kill_at
!= 3);
1452 // clean up our local state
1453 sync_providers
.erase(sp
.cookie
);
1456 ::encode(*tx
, reply
->chunk_bl
);
1458 m
->get_connection()->send_message(reply
);
1463 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1465 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1466 dout(10) << __func__
<< " " << *m
<< dendl
;
1468 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
1471 if (m
->get_source_inst() != sync_provider
) {
1472 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1475 sync_cookie
= m
->cookie
;
1476 sync_start_version
= m
->last_committed
;
1478 sync_reset_timeout();
1479 sync_get_next_chunk();
1481 assert(g_conf
->mon_sync_requester_kill_at
!= 3);
1484 void Monitor::sync_get_next_chunk()
1486 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1487 if (g_conf
->mon_inject_sync_get_chunk_delay
> 0) {
1488 dout(20) << __func__
<< " injecting delay of " << g_conf
->mon_inject_sync_get_chunk_delay
<< dendl
;
1489 usleep((long long)(g_conf
->mon_inject_sync_get_chunk_delay
* 1000000.0));
1491 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1492 messenger
->send_message(r
, sync_provider
);
1494 assert(g_conf
->mon_sync_requester_kill_at
!= 4);
1497 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1499 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1500 dout(10) << __func__
<< " " << *m
<< dendl
;
1502 if (m
->cookie
!= sync_cookie
) {
1503 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1506 if (m
->get_source_inst() != sync_provider
) {
1507 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1511 assert(state
== STATE_SYNCHRONIZING
);
1512 assert(g_conf
->mon_sync_requester_kill_at
!= 5);
1514 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1515 tx
->append_from_encoded(m
->chunk_bl
);
1517 dout(30) << __func__
<< " tx dump:\n";
1518 JSONFormatter
f(true);
1523 store
->apply_transaction(tx
);
1525 assert(g_conf
->mon_sync_requester_kill_at
!= 6);
1528 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
1529 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1530 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1532 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1534 dout(30) << __func__
<< " tx dump:\n";
1535 JSONFormatter
f(true);
1540 store
->apply_transaction(tx
);
1541 paxos
->init(); // to refresh what we just wrote
1544 if (m
->op
== MMonSync::OP_CHUNK
) {
1545 sync_reset_timeout();
1546 sync_get_next_chunk();
1547 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1548 sync_finish(m
->last_committed
);
1552 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1554 dout(10) << __func__
<< dendl
;
1558 void Monitor::sync_trim_providers()
1560 dout(20) << __func__
<< dendl
;
1562 utime_t now
= ceph_clock_now();
1563 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1564 while (p
!= sync_providers
.end()) {
1565 if (now
> p
->second
.timeout
) {
1566 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
<< " for " << p
->second
.entity
<< dendl
;
1567 sync_providers
.erase(p
++);
1574 // ---------------------------------------------------
1577 void Monitor::cancel_probe_timeout()
1579 if (probe_timeout_event
) {
1580 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1581 timer
.cancel_event(probe_timeout_event
);
1582 probe_timeout_event
= NULL
;
1584 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
1588 void Monitor::reset_probe_timeout()
1590 cancel_probe_timeout();
1591 probe_timeout_event
= new C_MonContext(this, [this](int r
) {
1594 double t
= g_conf
->mon_probe_timeout
;
1595 timer
.add_event_after(t
, probe_timeout_event
);
1596 dout(10) << "reset_probe_timeout " << probe_timeout_event
<< " after " << t
<< " seconds" << dendl
;
1599 void Monitor::probe_timeout(int r
)
1601 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1602 assert(is_probing() || is_synchronizing());
1603 assert(probe_timeout_event
);
1604 probe_timeout_event
= NULL
;
1608 void Monitor::handle_probe(MonOpRequestRef op
)
1610 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1611 dout(10) << "handle_probe " << *m
<< dendl
;
1613 if (m
->fsid
!= monmap
->fsid
) {
1614 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1619 case MMonProbe::OP_PROBE
:
1620 handle_probe_probe(op
);
1623 case MMonProbe::OP_REPLY
:
1624 handle_probe_reply(op
);
1627 case MMonProbe::OP_MISSING_FEATURES
:
1628 derr
<< __func__
<< " missing features, have " << CEPH_FEATURES_ALL
1629 << ", required " << m
->required_features
1630 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
)
1636 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1638 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1640 dout(10) << "handle_probe_probe " << m
->get_source_inst() << *m
1641 << " features " << m
->get_connection()->get_features() << dendl
;
1642 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1644 dout(1) << " peer " << m
->get_source_addr() << " missing features "
1645 << missing
<< dendl
;
1646 if (m
->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY
)) {
1647 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1648 name
, has_ever_joined
);
1649 m
->required_features
= required_features
;
1650 m
->get_connection()->send_message(r
);
1655 if (!is_probing() && !is_synchronizing()) {
1656 // If the probing mon is way ahead of us, we need to re-bootstrap.
1657 // Normally we capture this case when we initially bootstrap, but
1658 // it is possible we pass those checks (we overlap with
1659 // quorum-to-be) but fail to join a quorum before it moves past
1660 // us. We need to be kicked back to bootstrap so we can
1661 // synchonize, not keep calling elections.
1662 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1663 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1664 << "ahead of us, re-bootstrapping" << dendl
;
1672 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
);
1675 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1676 r
->paxos_first_version
= paxos
->get_first_committed();
1677 r
->paxos_last_version
= paxos
->get_version();
1678 m
->get_connection()->send_message(r
);
1680 // did we discover a peer here?
1681 if (!monmap
->contains(m
->get_source_addr())) {
1682 dout(1) << " adding peer " << m
->get_source_addr()
1683 << " to list of hints" << dendl
;
1684 extra_probe_peers
.insert(m
->get_source_addr());
1691 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1693 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1694 dout(10) << "handle_probe_reply " << m
->get_source_inst() << *m
<< dendl
;
1695 dout(10) << " monmap is " << *monmap
<< dendl
;
1697 // discover name and addrs during probing or electing states.
1698 if (!is_probing() && !is_electing()) {
1702 // newer map, or they've joined a quorum and we haven't?
1704 monmap
->encode(mybl
, m
->get_connection()->get_features());
1705 // make sure it's actually different; the checks below err toward
1706 // taking the other guy's map, which could cause us to loop.
1707 if (!mybl
.contents_equal(m
->monmap_bl
)) {
1708 MonMap
*newmap
= new MonMap
;
1709 newmap
->decode(m
->monmap_bl
);
1710 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
1711 !has_ever_joined
)) {
1712 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
1713 << ", mine was " << monmap
->get_epoch() << dendl
;
1715 monmap
->decode(m
->monmap_bl
);
1724 string peer_name
= monmap
->get_name(m
->get_source_addr());
1725 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
1726 dout(10) << " renaming peer " << m
->get_source_addr() << " "
1727 << peer_name
<< " -> " << m
->name
<< " in my monmap"
1729 monmap
->rename(peer_name
, m
->name
);
1731 if (is_electing()) {
1736 dout(10) << " peer name is " << peer_name
<< dendl
;
1739 // new initial peer?
1740 if (monmap
->get_epoch() == 0 &&
1741 monmap
->contains(m
->name
) &&
1742 monmap
->get_addr(m
->name
).is_blank_ip()) {
1743 dout(1) << " learned initial mon " << m
->name
<< " addr " << m
->get_source_addr() << dendl
;
1744 monmap
->set_addr(m
->name
, m
->get_source_addr());
1750 // end discover phase
1751 if (!is_probing()) {
1755 assert(paxos
!= NULL
);
1757 if (is_synchronizing()) {
1758 dout(10) << " currently syncing" << dendl
;
1762 entity_inst_t other
= m
->get_source_inst();
1764 if (m
->paxos_last_version
< sync_last_committed_floor
) {
1765 dout(10) << " peer paxos versions [" << m
->paxos_first_version
1766 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
1767 << sync_last_committed_floor
<< ", ignoring"
1770 if (paxos
->get_version() < m
->paxos_first_version
&&
1771 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
1772 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
1773 << "," << m
->paxos_last_version
<< "]"
1774 << " vs my version " << paxos
->get_version()
1775 << " (too far ahead)"
1777 cancel_probe_timeout();
1778 sync_start(other
, true);
1781 if (paxos
->get_version() + g_conf
->paxos_max_join_drift
< m
->paxos_last_version
) {
1782 dout(10) << " peer paxos last version " << m
->paxos_last_version
1783 << " vs my version " << paxos
->get_version()
1784 << " (too far ahead)"
1786 cancel_probe_timeout();
1787 sync_start(other
, false);
1792 // is there an existing quorum?
1793 if (m
->quorum
.size()) {
1794 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
1796 dout(10) << " peer paxos version " << m
->paxos_last_version
1797 << " vs my version " << paxos
->get_version()
1801 if (monmap
->contains(name
) &&
1802 !monmap
->get_addr(name
).is_blank_ip()) {
1803 // i'm part of the cluster; just initiate a new election
1806 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl
;
1807 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
1808 monmap
->get_inst(*m
->quorum
.begin()));
1811 if (monmap
->contains(m
->name
)) {
1812 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
1813 outside_quorum
.insert(m
->name
);
1815 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
1819 unsigned need
= monmap
->size() / 2 + 1;
1820 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
1821 if (outside_quorum
.size() >= need
) {
1822 if (outside_quorum
.count(name
)) {
1823 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
1826 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
1829 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
1834 void Monitor::join_election()
1836 dout(10) << __func__
<< dendl
;
1837 wait_for_paxos_write();
1839 state
= STATE_ELECTING
;
1841 logger
->inc(l_mon_num_elections
);
1844 void Monitor::start_election()
1846 dout(10) << "start_election" << dendl
;
1847 wait_for_paxos_write();
1849 state
= STATE_ELECTING
;
1851 logger
->inc(l_mon_num_elections
);
1852 logger
->inc(l_mon_election_call
);
1854 clog
->info() << "mon." << name
<< " calling new monitor election";
1855 elector
.call_election();
1858 void Monitor::win_standalone_election()
1860 dout(1) << "win_standalone_election" << dendl
;
1862 // bump election epoch, in case the previous epoch included other
1863 // monitors; we need to be able to make the distinction.
1865 elector
.advance_epoch();
1867 rank
= monmap
->get_rank(name
);
1872 map
<int,Metadata
> metadata
;
1873 collect_metadata(&metadata
[0]);
1875 const MonCommand
*my_cmds
= nullptr;
1877 get_locally_supported_monitor_commands(&my_cmds
, &cmdsize
);
1878 win_election(elector
.get_epoch(), q
,
1880 ceph::features::mon::get_supported(),
1885 const utime_t
& Monitor::get_leader_since() const
1887 assert(state
== STATE_LEADER
);
1888 return leader_since
;
1891 epoch_t
Monitor::get_epoch()
1893 return elector
.get_epoch();
1896 void Monitor::_finish_svc_election()
1898 assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
1900 for (auto p
: paxos_service
) {
1901 // we already called election_finished() on monmon(); avoid callig twice
1902 if (state
== STATE_LEADER
&& p
== monmon())
1904 p
->election_finished();
1908 void Monitor::win_election(epoch_t epoch
, set
<int>& active
, uint64_t features
,
1909 const mon_feature_t
& mon_features
,
1910 const map
<int,Metadata
>& metadata
,
1911 const MonCommand
*cmdset
, int cmdsize
)
1913 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
1914 << " features " << features
1915 << " mon_features " << mon_features
1917 assert(is_electing());
1918 state
= STATE_LEADER
;
1919 leader_since
= ceph_clock_now();
1922 quorum_con_features
= features
;
1923 quorum_mon_features
= mon_features
;
1924 pending_metadata
= metadata
;
1925 outside_quorum
.clear();
1927 clog
->info() << "mon." << name
<< "@" << rank
1928 << " won leader election with quorum " << quorum
;
1930 set_leader_supported_commands(cmdset
, cmdsize
);
1932 paxos
->leader_init();
1933 // NOTE: tell monmap monitor first. This is important for the
1934 // bootstrap case to ensure that the very first paxos proposal
1935 // codifies the monmap. Otherwise any manner of chaos can ensue
1936 // when monitors are call elections or participating in a paxos
1937 // round without agreeing on who the participants are.
1938 monmon()->election_finished();
1939 _finish_svc_election();
1940 health_monitor
->start(epoch
);
1942 logger
->inc(l_mon_election_win
);
1944 // inject new metadata in first transaction.
1946 // include previous metadata for missing mons (that aren't part of
1947 // the current quorum).
1948 map
<int,Metadata
> m
= metadata
;
1949 for (unsigned rank
= 0; rank
< monmap
->size(); ++rank
) {
1950 if (m
.count(rank
) == 0 &&
1951 mon_metadata
.count(rank
)) {
1952 m
[rank
] = mon_metadata
[rank
];
1956 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1957 // a new transaction immediately after the election finishes. We should
1958 // do that anyway for other reasons, though.
1959 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
1962 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
1966 if (monmap
->size() > 1 &&
1967 monmap
->get_epoch() > 0) {
1969 health_tick_start();
1970 do_health_to_clog_interval();
1971 scrub_event_start();
1975 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
1977 const mon_feature_t
& mon_features
)
1980 leader_since
= utime_t();
1983 outside_quorum
.clear();
1984 quorum_con_features
= features
;
1985 quorum_mon_features
= mon_features
;
1986 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
1987 << " quorum is " << quorum
<< " features are " << quorum_con_features
1988 << " mon_features are " << quorum_mon_features
1992 _finish_svc_election();
1993 health_monitor
->start(epoch
);
1995 logger
->inc(l_mon_election_lose
);
1999 if ((quorum_con_features
& CEPH_FEATURE_MON_METADATA
) &&
2000 !HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
)) {
2001 // for pre-luminous mons only
2003 collect_metadata(&sys_info
);
2004 messenger
->send_message(new MMonMetadata(sys_info
),
2005 monmap
->get_inst(get_leader()));
2009 void Monitor::collect_metadata(Metadata
*m
)
2011 collect_sys_info(m
, g_ceph_context
);
2012 (*m
)["addr"] = stringify(messenger
->get_myaddr());
2015 void Monitor::finish_election()
2017 apply_quorum_to_compatset_features();
2018 apply_monmap_to_compatset_features();
2020 exited_quorum
= utime_t();
2021 finish_contexts(g_ceph_context
, waitfor_quorum
);
2022 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
2023 resend_routed_requests();
2025 register_cluster_logger();
2027 // am i named properly?
2028 string cur_name
= monmap
->get_name(messenger
->get_myaddr());
2029 if (cur_name
!= name
) {
2030 dout(10) << " renaming myself from " << cur_name
<< " -> " << name
<< dendl
;
2031 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
2032 monmap
->get_inst(*quorum
.begin()));
2036 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2038 if (new_features
.compare(features
) != 0) {
2039 CompatSet diff
= features
.unsupported(new_features
);
2040 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2041 features
= new_features
;
2043 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2045 store
->apply_transaction(t
);
2047 calc_quorum_requirements();
2051 void Monitor::apply_quorum_to_compatset_features()
2053 CompatSet
new_features(features
);
2054 if (quorum_con_features
& CEPH_FEATURE_OSD_ERASURE_CODES
) {
2055 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2057 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2058 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2060 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
) {
2061 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2063 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
) {
2064 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2066 dout(5) << __func__
<< dendl
;
2067 _apply_compatset_features(new_features
);
2070 void Monitor::apply_monmap_to_compatset_features()
2072 CompatSet
new_features(features
);
2073 mon_feature_t monmap_features
= monmap
->get_required_features();
2075 /* persistent monmap features may go into the compatset.
2076 * optional monmap features may not - why?
2077 * because optional monmap features may be set/unset by the admin,
2078 * and possibly by other means that haven't yet been thought out,
2079 * so we can't make the monitor enforce them on start - because they
2081 * this, of course, does not invalidate setting a compatset feature
2082 * for an optional feature - as long as you make sure to clean it up
2083 * once you unset it.
2085 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2086 assert(ceph::features::mon::get_persistent().contains_all(
2087 ceph::features::mon::FEATURE_KRAKEN
));
2088 // this feature should only ever be set if the quorum supports it.
2089 assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2090 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2093 dout(5) << __func__
<< dendl
;
2094 _apply_compatset_features(new_features
);
2097 void Monitor::calc_quorum_requirements()
2099 required_features
= 0;
2102 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
)) {
2103 required_features
|= CEPH_FEATURE_OSD_ERASURE_CODES
;
2105 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2106 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2108 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
)) {
2109 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
;
2111 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
)) {
2112 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
;
2114 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2115 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2119 if (monmap
->get_required_features().contains_all(
2120 ceph::features::mon::FEATURE_KRAKEN
)) {
2121 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2123 if (monmap
->get_required_features().contains_all(
2124 ceph::features::mon::FEATURE_LUMINOUS
)) {
2125 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2127 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
2130 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2132 *fm
+= session_map
.feature_map
;
2133 for (auto id
: quorum
) {
2135 *fm
+= quorum_feature_map
[id
];
2140 void Monitor::sync_force(Formatter
*f
, ostream
& ss
)
2142 bool free_formatter
= false;
2145 // louzy/lazy hack: default to json if no formatter has been defined
2146 f
= new JSONFormatter();
2147 free_formatter
= true;
2150 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2151 sync_stash_critical_state(tx
);
2152 tx
->put("mon_sync", "force_sync", 1);
2153 store
->apply_transaction(tx
);
2155 f
->open_object_section("sync_force");
2156 f
->dump_int("ret", 0);
2157 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2158 f
->close_section(); // sync_force
2164 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
2166 bool free_formatter
= false;
2169 // louzy/lazy hack: default to json if no formatter has been defined
2170 f
= new JSONFormatter();
2171 free_formatter
= true;
2173 f
->open_object_section("quorum_status");
2174 f
->dump_int("election_epoch", get_epoch());
2176 f
->open_array_section("quorum");
2177 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2178 f
->dump_int("mon", *p
);
2179 f
->close_section(); // quorum
2181 list
<string
> quorum_names
= get_quorum_names();
2182 f
->open_array_section("quorum_names");
2183 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2184 f
->dump_string("mon", *p
);
2185 f
->close_section(); // quorum_names
2187 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(*quorum
.begin()));
2189 f
->open_object_section("monmap");
2191 f
->close_section(); // monmap
2193 f
->close_section(); // quorum_status
2199 void Monitor::get_mon_status(Formatter
*f
, ostream
& ss
)
2201 bool free_formatter
= false;
2204 // louzy/lazy hack: default to json if no formatter has been defined
2205 f
= new JSONFormatter();
2206 free_formatter
= true;
2209 f
->open_object_section("mon_status");
2210 f
->dump_string("name", name
);
2211 f
->dump_int("rank", rank
);
2212 f
->dump_string("state", get_state_name());
2213 f
->dump_int("election_epoch", get_epoch());
2215 f
->open_array_section("quorum");
2216 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2217 f
->dump_int("mon", *p
);
2220 f
->close_section(); // quorum
2222 f
->open_object_section("features");
2223 f
->dump_stream("required_con") << required_features
;
2224 mon_feature_t req_mon_features
= get_required_mon_features();
2225 req_mon_features
.dump(f
, "required_mon");
2226 f
->dump_stream("quorum_con") << quorum_con_features
;
2227 quorum_mon_features
.dump(f
, "quorum_mon");
2228 f
->close_section(); // features
2230 f
->open_array_section("outside_quorum");
2231 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2232 f
->dump_string("mon", *p
);
2233 f
->close_section(); // outside_quorum
2235 f
->open_array_section("extra_probe_peers");
2236 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
2237 p
!= extra_probe_peers
.end();
2239 f
->dump_stream("peer") << *p
;
2240 f
->close_section(); // extra_probe_peers
2242 f
->open_array_section("sync_provider");
2243 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2244 p
!= sync_providers
.end();
2246 f
->dump_unsigned("cookie", p
->second
.cookie
);
2247 f
->dump_stream("entity") << p
->second
.entity
;
2248 f
->dump_stream("timeout") << p
->second
.timeout
;
2249 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2250 f
->dump_stream("last_key") << p
->second
.last_key
;
2254 if (is_synchronizing()) {
2255 f
->open_object_section("sync");
2256 f
->dump_stream("sync_provider") << sync_provider
;
2257 f
->dump_unsigned("sync_cookie", sync_cookie
);
2258 f
->dump_unsigned("sync_start_version", sync_start_version
);
2262 if (g_conf
->mon_sync_provider_kill_at
> 0)
2263 f
->dump_int("provider_kill_at", g_conf
->mon_sync_provider_kill_at
);
2264 if (g_conf
->mon_sync_requester_kill_at
> 0)
2265 f
->dump_int("requester_kill_at", g_conf
->mon_sync_requester_kill_at
);
2267 f
->open_object_section("monmap");
2271 f
->dump_object("feature_map", session_map
.feature_map
);
2272 f
->close_section(); // mon_status
2274 if (free_formatter
) {
2275 // flush formatter to ss and delete it iff we created the formatter
2282 // health status to clog
2284 void Monitor::health_tick_start()
2286 if (!cct
->_conf
->mon_health_to_clog
||
2287 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2290 dout(15) << __func__
<< dendl
;
2293 health_tick_event
= new C_MonContext(this, [this](int r
) {
2296 do_health_to_clog();
2297 health_tick_start();
2299 timer
.add_event_after(cct
->_conf
->mon_health_to_clog_tick_interval
,
2303 void Monitor::health_tick_stop()
2305 dout(15) << __func__
<< dendl
;
2307 if (health_tick_event
) {
2308 timer
.cancel_event(health_tick_event
);
2309 health_tick_event
= NULL
;
2313 utime_t
Monitor::health_interval_calc_next_update()
2315 utime_t now
= ceph_clock_now();
2317 time_t secs
= now
.sec();
2318 int remainder
= secs
% cct
->_conf
->mon_health_to_clog_interval
;
2319 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2320 utime_t next
= utime_t(secs
+ adjustment
, 0);
2322 dout(20) << __func__
2323 << " now: " << now
<< ","
2324 << " next: " << next
<< ","
2325 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
2331 void Monitor::health_interval_start()
2333 dout(15) << __func__
<< dendl
;
2335 if (!cct
->_conf
->mon_health_to_clog
||
2336 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2340 health_interval_stop();
2341 utime_t next
= health_interval_calc_next_update();
2342 health_interval_event
= new C_MonContext(this, [this](int r
) {
2345 do_health_to_clog_interval();
2347 timer
.add_event_at(next
, health_interval_event
);
2350 void Monitor::health_interval_stop()
2352 dout(15) << __func__
<< dendl
;
2353 if (health_interval_event
) {
2354 timer
.cancel_event(health_interval_event
);
2356 health_interval_event
= NULL
;
2359 void Monitor::health_events_cleanup()
2362 health_interval_stop();
2363 health_status_cache
.reset();
2366 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2368 dout(20) << __func__
<< dendl
;
2370 if (changed
.count("mon_health_to_clog")) {
2371 if (!cct
->_conf
->mon_health_to_clog
) {
2372 health_events_cleanup();
2374 if (!health_tick_event
) {
2375 health_tick_start();
2377 if (!health_interval_event
) {
2378 health_interval_start();
2383 if (changed
.count("mon_health_to_clog_interval")) {
2384 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2385 health_interval_stop();
2387 health_interval_start();
2391 if (changed
.count("mon_health_to_clog_tick_interval")) {
2392 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2395 health_tick_start();
2400 void Monitor::do_health_to_clog_interval()
2402 // outputting to clog may have been disabled in the conf
2403 // since we were scheduled.
2404 if (!cct
->_conf
->mon_health_to_clog
||
2405 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2408 dout(10) << __func__
<< dendl
;
2410 // do we have a cached value for next_clog_update? if not,
2411 // do we know when the last update was?
2413 do_health_to_clog(true);
2414 health_interval_start();
2417 void Monitor::do_health_to_clog(bool force
)
2419 // outputting to clog may have been disabled in the conf
2420 // since we were scheduled.
2421 if (!cct
->_conf
->mon_health_to_clog
||
2422 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2425 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
2427 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2429 health_status_t level
= get_health_status(false, nullptr, &summary
);
2431 summary
== health_status_cache
.summary
&&
2432 level
== health_status_cache
.overall
)
2434 if (level
== HEALTH_OK
)
2435 clog
->info() << "overall " << summary
;
2436 else if (level
== HEALTH_WARN
)
2437 clog
->warn() << "overall " << summary
;
2438 else if (level
== HEALTH_ERR
)
2439 clog
->error() << "overall " << summary
;
2442 health_status_cache
.summary
= summary
;
2443 health_status_cache
.overall
= level
;
2446 list
<string
> status
;
2447 health_status_t overall
= get_health(status
, NULL
, NULL
);
2448 dout(25) << __func__
2449 << (force
? " (force)" : "")
2452 string summary
= joinify(status
.begin(), status
.end(), string("; "));
2455 overall
== health_status_cache
.overall
&&
2456 !health_status_cache
.summary
.empty() &&
2457 health_status_cache
.summary
== summary
) {
2462 clog
->info() << summary
;
2464 health_status_cache
.overall
= overall
;
2465 health_status_cache
.summary
= summary
;
2469 health_status_t
Monitor::get_health_status(
2476 health_status_t r
= HEALTH_OK
;
2477 bool compat
= g_conf
->mon_health_preluminous_compat
;
2479 f
->open_object_section("health");
2480 f
->open_object_section("checks");
2484 string
*psummary
= f
? nullptr : &summary
;
2485 for (auto& svc
: paxos_service
) {
2486 r
= std::min(r
, svc
->get_health_checks().dump_summary(
2487 f
, psummary
, sep2
, want_detail
));
2492 f
->dump_stream("status") << r
;
2494 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2495 *plain
= stringify(r
);
2496 if (summary
.size()) {
2504 f
->open_array_section("summary");
2505 for (auto& svc
: paxos_service
) {
2506 svc
->get_health_checks().dump_summary_compat(f
);
2509 f
->dump_stream("overall_status") << r
;
2514 f
->open_array_section("detail");
2517 for (auto& svc
: paxos_service
) {
2518 svc
->get_health_checks().dump_detail(f
, plain
, compat
);
2531 void Monitor::log_health(
2532 const health_check_map_t
& updated
,
2533 const health_check_map_t
& previous
,
2534 MonitorDBStore::TransactionRef t
)
2536 if (!g_conf
->mon_health_to_clog
) {
2539 // FIXME: log atomically as part of @t instead of using clog.
2540 dout(10) << __func__
<< " updated " << updated
.checks
.size()
2541 << " previous " << previous
.checks
.size()
2543 for (auto& p
: updated
.checks
) {
2544 auto q
= previous
.checks
.find(p
.first
);
2545 if (q
== previous
.checks
.end()) {
2548 ss
<< "Health check failed: " << p
.second
.summary
<< " ("
2550 if (p
.second
.severity
== HEALTH_WARN
)
2551 clog
->warn() << ss
.str();
2553 clog
->error() << ss
.str();
2555 if (p
.second
.summary
!= q
->second
.summary
||
2556 p
.second
.severity
!= q
->second
.severity
) {
2557 // summary or severity changed (ignore detail changes at this level)
2559 ss
<< "Health check update: " << p
.second
.summary
<< " (" << p
.first
<< ")";
2560 if (p
.second
.severity
== HEALTH_WARN
)
2561 clog
->warn() << ss
.str();
2563 clog
->error() << ss
.str();
2567 for (auto& p
: previous
.checks
) {
2568 if (!updated
.checks
.count(p
.first
)) {
2571 if (p
.first
== "DEGRADED_OBJECTS") {
2572 clog
->info() << "All degraded objects recovered";
2573 } else if (p
.first
== "OSD_FLAGS") {
2574 clog
->info() << "OSD flags cleared";
2576 clog
->info() << "Health check cleared: " << p
.first
<< " (was: "
2577 << p
.second
.summary
<< ")";
2582 if (previous
.checks
.size() && updated
.checks
.size() == 0) {
2583 // We might be going into a fully healthy state, check
2585 bool any_checks
= false;
2586 for (auto& svc
: paxos_service
) {
2587 if (&(svc
->get_health_checks()) == &(previous
)) {
2588 // Ignore the ones we're clearing right now
2592 if (svc
->get_health_checks().checks
.size() > 0) {
2598 clog
->info() << "Cluster is now healthy";
2603 health_status_t
Monitor::get_health(list
<string
>& status
,
2604 bufferlist
*detailbl
,
2607 list
<pair
<health_status_t
,string
> > summary
;
2608 list
<pair
<health_status_t
,string
> > detail
;
2611 f
->open_object_section("health");
2613 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin();
2614 p
!= paxos_service
.end();
2616 PaxosService
*s
= *p
;
2617 s
->get_health(summary
, detailbl
? &detail
: NULL
, cct
);
2620 health_monitor
->get_health(summary
, (detailbl
? &detail
: NULL
));
2622 health_status_t overall
= HEALTH_OK
;
2623 if (!timecheck_skews
.empty()) {
2625 for (map
<entity_inst_t
,double>::iterator i
= timecheck_skews
.begin();
2626 i
!= timecheck_skews
.end(); ++i
) {
2627 entity_inst_t inst
= i
->first
;
2628 double skew
= i
->second
;
2629 double latency
= timecheck_latencies
[inst
];
2630 string name
= monmap
->get_name(inst
.addr
);
2632 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
2633 if (tcstatus
!= HEALTH_OK
) {
2634 if (overall
> tcstatus
)
2636 warns
.push_back(name
);
2637 ostringstream tmp_ss
;
2638 tmp_ss
<< "mon." << name
2639 << " addr " << inst
.addr
<< " " << tcss
.str()
2640 << " (latency " << latency
<< "s)";
2641 detail
.push_back(make_pair(tcstatus
, tmp_ss
.str()));
2644 if (!warns
.empty()) {
2646 ss
<< "clock skew detected on";
2647 while (!warns
.empty()) {
2648 ss
<< " mon." << warns
.front();
2653 status
.push_back(ss
.str());
2654 summary
.push_back(make_pair(HEALTH_WARN
, "Monitor clock skew detected "));
2659 f
->open_array_section("summary");
2660 if (!summary
.empty()) {
2661 while (!summary
.empty()) {
2662 if (overall
> summary
.front().first
)
2663 overall
= summary
.front().first
;
2664 status
.push_back(summary
.front().second
);
2666 f
->open_object_section("item");
2667 f
->dump_stream("severity") << summary
.front().first
;
2668 f
->dump_string("summary", summary
.front().second
);
2671 summary
.pop_front();
2679 status
.push_front(fss
.str());
2681 f
->dump_stream("overall_status") << overall
;
2684 f
->open_array_section("detail");
2685 while (!detail
.empty()) {
2687 f
->dump_string("item", detail
.front().second
);
2688 else if (detailbl
!= NULL
) {
2689 detailbl
->append(detail
.front().second
);
2690 detailbl
->append('\n');
2703 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
)
2706 f
->open_object_section("status");
2709 f
->dump_stream("fsid") << monmap
->get_fsid();
2710 get_health_status(false, f
, nullptr);
2711 f
->dump_unsigned("election_epoch", get_epoch());
2713 f
->open_array_section("quorum");
2714 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2715 f
->dump_int("rank", *p
);
2717 f
->open_array_section("quorum_names");
2718 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2719 f
->dump_string("id", monmap
->get_name(*p
));
2722 f
->open_object_section("monmap");
2725 f
->open_object_section("osdmap");
2726 osdmon()->osdmap
.print_summary(f
, cout
, string(12, ' '));
2728 f
->open_object_section("pgmap");
2729 pgservice
->print_summary(f
, NULL
);
2731 f
->open_object_section("fsmap");
2732 mdsmon()->get_fsmap().print_summary(f
, NULL
);
2734 f
->open_object_section("mgrmap");
2735 mgrmon()->get_map().print_summary(f
, nullptr);
2738 f
->dump_object("servicemap", mgrstatmon()->get_service_map());
2741 ss
<< " cluster:\n";
2742 ss
<< " id: " << monmap
->get_fsid() << "\n";
2745 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2746 get_health_status(false, nullptr, &health
,
2750 get_health(ls
, NULL
, f
);
2751 health
= joinify(ls
.begin(), ls
.end(),
2754 ss
<< " health: " << health
<< "\n";
2756 ss
<< "\n \n services:\n";
2759 auto& service_map
= mgrstatmon()->get_service_map();
2760 for (auto& p
: service_map
.services
) {
2761 maxlen
= std::max(maxlen
, p
.first
.size());
2763 string
spacing(maxlen
- 3, ' ');
2764 const auto quorum_names
= get_quorum_names();
2765 const auto mon_count
= monmap
->mon_info
.size();
2766 ss
<< " mon: " << spacing
<< mon_count
<< " daemons, quorum "
2768 if (quorum_names
.size() != mon_count
) {
2769 std::list
<std::string
> out_of_q
;
2770 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
2771 if (quorum
.count(i
) == 0) {
2772 out_of_q
.push_back(monmap
->ranks
[i
]);
2775 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
2776 out_of_q
.end(), std::string(", "));
2779 if (mgrmon()->in_use()) {
2780 ss
<< " mgr: " << spacing
;
2781 mgrmon()->get_map().print_summary(nullptr, &ss
);
2784 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2785 ss
<< " mds: " << spacing
<< mdsmon()->get_fsmap() << "\n";
2787 ss
<< " osd: " << spacing
;
2788 osdmon()->osdmap
.print_summary(NULL
, ss
, string(maxlen
+ 6, ' '));
2790 for (auto& p
: service_map
.services
) {
2791 ss
<< " " << p
.first
<< ": " << string(maxlen
- p
.first
.size(), ' ')
2792 << p
.second
.get_summary() << "\n";
2796 ss
<< "\n \n data:\n";
2797 pgservice
->print_summary(NULL
, &ss
);
2802 void Monitor::_generate_command_map(map
<string
,cmd_vartype
>& cmdmap
,
2803 map
<string
,string
> ¶m_str_map
)
2805 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
2806 p
!= cmdmap
.end(); ++p
) {
2807 if (p
->first
== "prefix")
2809 if (p
->first
== "caps") {
2811 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
2812 cv
.size() % 2 == 0) {
2813 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
2814 string k
= string("caps_") + cv
[i
];
2815 param_str_map
[k
] = cv
[i
+ 1];
2820 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
2824 const MonCommand
*Monitor::_get_moncommand(const string
&cmd_prefix
,
2825 MonCommand
*cmds
, int cmds_size
)
2827 MonCommand
*this_cmd
= NULL
;
2828 for (MonCommand
*cp
= cmds
;
2829 cp
< &cmds
[cmds_size
]; cp
++) {
2830 if (cp
->cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
2838 bool Monitor::_allowed_command(MonSession
*s
, string
&module
, string
&prefix
,
2839 const map
<string
,cmd_vartype
>& cmdmap
,
2840 const map
<string
,string
>& param_str_map
,
2841 const MonCommand
*this_cmd
) {
2843 bool cmd_r
= this_cmd
->requires_perm('r');
2844 bool cmd_w
= this_cmd
->requires_perm('w');
2845 bool cmd_x
= this_cmd
->requires_perm('x');
2847 bool capable
= s
->caps
.is_capable(
2849 CEPH_ENTITY_TYPE_MON
,
2851 module
, prefix
, param_str_map
,
2852 cmd_r
, cmd_w
, cmd_x
);
2854 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
2858 void Monitor::format_command_descriptions(const MonCommand
*commands
,
2859 unsigned commands_size
,
2865 f
->open_object_section("command_descriptions");
2866 for (const MonCommand
*cp
= commands
;
2867 cp
< &commands
[commands_size
]; cp
++) {
2869 unsigned flags
= cp
->flags
;
2870 if (hide_mgr_flag
) {
2871 flags
&= ~MonCommand::FLAG_MGR
;
2873 ostringstream secname
;
2874 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
2875 dump_cmddesc_to_json(f
, secname
.str(),
2876 cp
->cmdstring
, cp
->helpstring
, cp
->module
,
2877 cp
->req_perms
, cp
->availability
, flags
);
2880 f
->close_section(); // command_descriptions
2885 void Monitor::get_locally_supported_monitor_commands(const MonCommand
**cmds
,
2888 *cmds
= mon_commands
;
2889 *count
= ARRAY_SIZE(mon_commands
);
2891 void Monitor::set_leader_supported_commands(const MonCommand
*cmds
, int size
)
2893 if (leader_supported_mon_commands
!= mon_commands
)
2894 delete[] leader_supported_mon_commands
;
2895 leader_supported_mon_commands
= cmds
;
2896 leader_supported_mon_commands_size
= size
;
2899 bool Monitor::is_keyring_required()
2901 string auth_cluster_required
= g_conf
->auth_supported
.empty() ?
2902 g_conf
->auth_cluster_required
: g_conf
->auth_supported
;
2903 string auth_service_required
= g_conf
->auth_supported
.empty() ?
2904 g_conf
->auth_service_required
: g_conf
->auth_supported
;
2906 return auth_service_required
== "cephx" ||
2907 auth_cluster_required
== "cephx";
2910 struct C_MgrProxyCommand
: public Context
{
2916 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
2917 : mon(mon
), op(op
), size(s
) { }
2918 void finish(int r
) {
2919 Mutex::Locker
l(mon
->lock
);
2920 mon
->mgr_proxy_bytes
-= size
;
2921 mon
->reply_command(op
, r
, outs
, outbl
, 0);
2925 void Monitor::handle_command(MonOpRequestRef op
)
2927 assert(op
->is_type_command());
2928 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
2929 if (m
->fsid
!= monmap
->fsid
) {
2930 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
2931 reply_command(op
, -EPERM
, "wrong fsid", 0);
2935 MonSession
*session
= static_cast<MonSession
*>(
2936 m
->get_connection()->get_priv());
2938 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
2941 BOOST_SCOPE_EXIT_ALL(=) {
2945 if (m
->cmd
.empty()) {
2946 string rs
= "No command supplied";
2947 reply_command(op
, -EINVAL
, rs
, 0);
2952 vector
<string
> fullcmd
;
2953 map
<string
, cmd_vartype
> cmdmap
;
2954 stringstream ss
, ds
;
2958 rs
= "unrecognized command";
2960 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
2961 // ss has reason for failure
2964 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
2965 reply_command(op
, r
, rs
, 0);
2969 // check return value. If no prefix parameter provided,
2970 // return value will be false, then return error info.
2971 if (!cmd_getval(g_ceph_context
, cmdmap
, "prefix", prefix
)) {
2972 reply_command(op
, -EINVAL
, "command prefix not found", 0);
2976 // check prefix is empty
2977 if (prefix
.empty()) {
2978 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
2982 if (prefix
== "get_command_descriptions") {
2984 Formatter
*f
= Formatter::create("json");
2985 // hide mgr commands until luminous upgrade is complete
2986 bool hide_mgr_flag
=
2987 osdmon()->osdmap
.require_osd_release
< CEPH_RELEASE_LUMINOUS
;
2988 format_command_descriptions(leader_supported_mon_commands
,
2989 leader_supported_mon_commands_size
, f
, &rdata
,
2992 reply_command(op
, 0, "", rdata
, 0);
2999 dout(0) << "handle_command " << *m
<< dendl
;
3002 cmd_getval(g_ceph_context
, cmdmap
, "format", format
, string("plain"));
3003 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
3005 get_str_vec(prefix
, fullcmd
);
3007 // make sure fullcmd is not empty.
3008 // invalid prefix will cause empty vector fullcmd.
3009 // such as, prefix=";,,;"
3010 if (fullcmd
.empty()) {
3011 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
3015 module
= fullcmd
[0];
3017 // validate command is in leader map
3019 const MonCommand
*leader_cmd
;
3020 leader_cmd
= _get_moncommand(prefix
,
3021 // the boost underlying this isn't const for some reason
3022 const_cast<MonCommand
*>(leader_supported_mon_commands
),
3023 leader_supported_mon_commands_size
);
3025 reply_command(op
, -EINVAL
, "command not known", 0);
3028 // validate command is in our map & matches, or forward if it is allowed
3029 const MonCommand
*mon_cmd
= _get_moncommand(prefix
, mon_commands
,
3030 ARRAY_SIZE(mon_commands
));
3033 if (leader_cmd
->is_noforward()) {
3034 reply_command(op
, -EINVAL
,
3035 "command not locally supported and not allowed to forward",
3039 dout(10) << "Command not locally supported, forwarding request "
3041 forward_request_leader(op
);
3043 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
3044 if (mon_cmd
->is_noforward()) {
3045 reply_command(op
, -EINVAL
,
3046 "command not compatible with leader and not allowed to forward",
3050 dout(10) << "Command not compatible with leader, forwarding request "
3052 forward_request_leader(op
);
3057 if (mon_cmd
->is_obsolete() ||
3058 (cct
->_conf
->mon_debug_deprecated_as_obsolete
3059 && mon_cmd
->is_deprecated())) {
3060 reply_command(op
, -ENOTSUP
,
3061 "command is obsolete; please check usage and/or man page",
3066 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
3067 dout(10) << "Got forward for noforward command " << m
<< dendl
;
3068 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
3072 /* what we perceive as being the service the command falls under */
3073 string
service(mon_cmd
->module
);
3075 dout(25) << __func__
<< " prefix='" << prefix
3076 << "' module='" << module
3077 << "' service='" << service
<< "'" << dendl
;
3080 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
3082 // validate user's permissions for requested command
3083 map
<string
,string
> param_str_map
;
3084 _generate_command_map(cmdmap
, param_str_map
);
3085 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
3086 param_str_map
, mon_cmd
)) {
3087 dout(1) << __func__
<< " access denied" << dendl
;
3088 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3089 << "from='" << session
->inst
<< "' "
3090 << "entity='" << session
->entity_name
<< "' "
3091 << "cmd=" << m
->cmd
<< ": access denied";
3092 reply_command(op
, -EACCES
, "access denied", 0);
3096 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3097 << "from='" << session
->inst
<< "' "
3098 << "entity='" << session
->entity_name
<< "' "
3099 << "cmd=" << m
->cmd
<< ": dispatch";
3101 if (mon_cmd
->is_mgr() &&
3102 osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3103 const auto& hdr
= m
->get_header();
3104 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
3106 g_conf
->mon_client_bytes
* g_conf
->mon_mgr_proxy_client_bytes_ratio
;
3107 if (mgr_proxy_bytes
+ size
> max
) {
3108 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
3109 << " + " << size
<< " > max " << max
<< dendl
;
3110 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
3113 mgr_proxy_bytes
+= size
;
3114 dout(10) << __func__
<< " proxying mgr command (+" << size
3115 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
3116 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
3117 mgr_client
.start_command(m
->cmd
,
3121 new C_OnFinisher(fin
, &finisher
));
3125 if (module
== "mds" || module
== "fs") {
3126 mdsmon()->dispatch(op
);
3129 if ((module
== "osd" || prefix
== "pg map") &&
3130 prefix
!= "osd last-stat-seq") {
3131 osdmon()->dispatch(op
);
3135 if (module
== "pg") {
3136 pgmon()->dispatch(op
);
3139 if (module
== "mon" &&
3140 /* Let the Monitor class handle the following commands:
3145 prefix
!= "mon compact" &&
3146 prefix
!= "mon scrub" &&
3147 prefix
!= "mon sync force" &&
3148 prefix
!= "mon metadata" &&
3149 prefix
!= "mon versions" &&
3150 prefix
!= "mon count-metadata") {
3151 monmon()->dispatch(op
);
3154 if (module
== "auth") {
3155 authmon()->dispatch(op
);
3158 if (module
== "log") {
3159 logmon()->dispatch(op
);
3163 if (module
== "config-key") {
3164 config_key_service
->dispatch(op
);
3168 if (module
== "mgr") {
3169 mgrmon()->dispatch(op
);
3173 if (prefix
== "fsid") {
3175 f
->open_object_section("fsid");
3176 f
->dump_stream("fsid") << monmap
->fsid
;
3183 reply_command(op
, 0, "", rdata
, 0);
3187 if (prefix
== "scrub" || prefix
== "mon scrub") {
3188 wait_for_paxos_write();
3190 int r
= scrub_start();
3191 reply_command(op
, r
, "", rdata
, 0);
3192 } else if (is_peon()) {
3193 forward_request_leader(op
);
3195 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
3200 if (prefix
== "compact" || prefix
== "mon compact") {
3201 dout(1) << "triggering manual compaction" << dendl
;
3202 utime_t start
= ceph_clock_now();
3204 utime_t end
= ceph_clock_now();
3206 dout(1) << "finished manual compaction in " << end
<< " seconds" << dendl
;
3208 oss
<< "compacted " << g_conf
->get_val
<std::string
>("mon_keyvaluedb") << " in " << end
<< " seconds";
3212 else if (prefix
== "injectargs") {
3213 vector
<string
> injected_args
;
3214 cmd_getval(g_ceph_context
, cmdmap
, "injected_args", injected_args
);
3215 if (!injected_args
.empty()) {
3216 dout(0) << "parsing injected options '" << injected_args
<< "'" << dendl
;
3218 r
= g_conf
->injectargs(str_join(injected_args
, " "), &oss
);
3219 ss
<< "injectargs:" << oss
.str();
3223 rs
= "must supply options to be parsed in a single string";
3226 } else if (prefix
== "time-sync-status") {
3228 f
.reset(Formatter::create("json-pretty"));
3229 f
->open_object_section("time_sync");
3230 if (!timecheck_skews
.empty()) {
3231 f
->open_object_section("time_skew_status");
3232 for (auto& i
: timecheck_skews
) {
3233 entity_inst_t inst
= i
.first
;
3234 double skew
= i
.second
;
3235 double latency
= timecheck_latencies
[inst
];
3236 string name
= monmap
->get_name(inst
.addr
);
3238 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
3239 f
->open_object_section(name
.c_str());
3240 f
->dump_float("skew", skew
);
3241 f
->dump_float("latency", latency
);
3242 f
->dump_stream("health") << tcstatus
;
3243 if (tcstatus
!= HEALTH_OK
) {
3244 f
->dump_stream("details") << tcss
.str();
3250 f
->open_object_section("timechecks");
3251 f
->dump_unsigned("epoch", get_epoch());
3252 f
->dump_int("round", timecheck_round
);
3253 f
->dump_stream("round_status") << ((timecheck_round
%2) ?
3254 "on-going" : "finished");
3260 } else if (prefix
== "status" ||
3261 prefix
== "health" ||
3264 cmd_getval(g_ceph_context
, cmdmap
, "detail", detail
);
3266 if (prefix
== "status") {
3267 // get_cluster_status handles f == NULL
3268 get_cluster_status(ds
, f
.get());
3275 } else if (prefix
== "health") {
3276 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3278 get_health_status(detail
== "detail", f
.get(), f
? nullptr : &plain
);
3282 rdata
.append(plain
);
3285 list
<string
> health_str
;
3286 get_health(health_str
, detail
== "detail" ? &rdata
: NULL
, f
.get());
3291 assert(!health_str
.empty());
3292 ds
<< health_str
.front();
3293 health_str
.pop_front();
3294 if (!health_str
.empty()) {
3296 ds
<< joinify(health_str
.begin(), health_str
.end(), string("; "));
3301 if (detail
== "detail")
3305 } else if (prefix
== "df") {
3306 bool verbose
= (detail
== "detail");
3308 f
->open_object_section("stats");
3310 pgservice
->dump_fs_stats(&ds
, f
.get(), verbose
);
3313 pgservice
->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3321 assert(0 == "We should never get here!");
3327 } else if (prefix
== "report") {
3329 // this must be formatted, in its current form
3331 f
.reset(Formatter::create("json-pretty"));
3332 f
->open_object_section("report");
3333 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3334 f
->dump_string("version", ceph_version_to_str());
3335 f
->dump_string("commit", git_version_to_str());
3336 f
->dump_stream("timestamp") << ceph_clock_now();
3338 vector
<string
> tagsvec
;
3339 cmd_getval(g_ceph_context
, cmdmap
, "tags", tagsvec
);
3340 string tagstr
= str_join(tagsvec
, " ");
3341 if (!tagstr
.empty())
3342 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3343 f
->dump_string("tag", tagstr
);
3346 get_health(hs
, NULL
, f
.get());
3348 monmon()->dump_info(f
.get());
3349 osdmon()->dump_info(f
.get());
3350 mdsmon()->dump_info(f
.get());
3351 authmon()->dump_info(f
.get());
3352 pgservice
->dump_info(f
.get());
3354 paxos
->dump_info(f
.get());
3360 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT
);
3363 } else if (prefix
== "osd last-stat-seq") {
3365 cmd_getval(g_ceph_context
, cmdmap
, "id", osd
);
3366 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3368 f
->dump_unsigned("seq", seq
);
3376 } else if (prefix
== "node ls") {
3377 string
node_type("all");
3378 cmd_getval(g_ceph_context
, cmdmap
, "type", node_type
);
3380 f
.reset(Formatter::create("json-pretty"));
3381 if (node_type
== "all") {
3382 f
->open_object_section("nodes");
3383 print_nodes(f
.get(), ds
);
3384 osdmon()->print_nodes(f
.get());
3385 mdsmon()->print_nodes(f
.get());
3387 } else if (node_type
== "mon") {
3388 print_nodes(f
.get(), ds
);
3389 } else if (node_type
== "osd") {
3390 osdmon()->print_nodes(f
.get());
3391 } else if (node_type
== "mds") {
3392 mdsmon()->print_nodes(f
.get());
3398 } else if (prefix
== "features") {
3399 if (!is_leader() && !is_peon()) {
3400 dout(10) << " waiting for quorum" << dendl
;
3401 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3405 forward_request_leader(op
);
3409 f
.reset(Formatter::create("json-pretty"));
3411 get_combined_feature_map(&fm
);
3412 f
->dump_object("features", fm
);
3416 } else if (prefix
== "mon metadata") {
3418 f
.reset(Formatter::create("json-pretty"));
3421 bool all
= !cmd_getval(g_ceph_context
, cmdmap
, "id", name
);
3423 // Dump a single mon's metadata
3424 int mon
= monmap
->get_rank(name
);
3426 rs
= "requested mon not found";
3430 f
->open_object_section("mon_metadata");
3431 r
= get_mon_metadata(mon
, f
.get(), ds
);
3434 // Dump all mons' metadata
3436 f
->open_array_section("mon_metadata");
3437 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3438 std::ostringstream get_err
;
3439 f
->open_object_section("mon");
3440 f
->dump_string("name", monmap
->get_name(rank
));
3441 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3443 if (r
== -ENOENT
|| r
== -EINVAL
) {
3444 dout(1) << get_err
.str() << dendl
;
3445 // Drop error, list what metadata we do have
3447 } else if (r
!= 0) {
3448 derr
<< "Unexpected error from get_mon_metadata: "
3449 << cpp_strerror(r
) << dendl
;
3450 ds
<< get_err
.str();
3460 } else if (prefix
== "mon versions") {
3462 f
.reset(Formatter::create("json-pretty"));
3463 count_metadata("ceph_version", f
.get());
3468 } else if (prefix
== "mon count-metadata") {
3470 f
.reset(Formatter::create("json-pretty"));
3472 cmd_getval(g_ceph_context
, cmdmap
, "property", field
);
3473 count_metadata(field
, f
.get());
3478 } else if (prefix
== "quorum_status") {
3479 // make sure our map is readable and up to date
3480 if (!is_leader() && !is_peon()) {
3481 dout(10) << " waiting for quorum" << dendl
;
3482 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3485 _quorum_status(f
.get(), ds
);
3489 } else if (prefix
== "mon_status") {
3490 get_mon_status(f
.get(), ds
);
3496 } else if (prefix
== "sync force" ||
3497 prefix
== "mon sync force") {
3498 string validate1
, validate2
;
3499 cmd_getval(g_ceph_context
, cmdmap
, "validate1", validate1
);
3500 cmd_getval(g_ceph_context
, cmdmap
, "validate2", validate2
);
3501 if (validate1
!= "--yes-i-really-mean-it" ||
3502 validate2
!= "--i-know-what-i-am-doing") {
3504 rs
= "are you SURE? this will mean the monitor store will be "
3505 "erased. pass '--yes-i-really-mean-it "
3506 "--i-know-what-i-am-doing' if you really do.";
3509 sync_force(f
.get(), ds
);
3512 } else if (prefix
== "heap") {
3513 if (!ceph_using_tcmalloc())
3514 rs
= "tcmalloc not enabled, can't use heap profiler commands\n";
3517 cmd_getval(g_ceph_context
, cmdmap
, "heapcmd", heapcmd
);
3518 // XXX 1-element vector, change at callee or make vector here?
3519 vector
<string
> heapcmd_vec
;
3520 get_str_vec(heapcmd
, heapcmd_vec
);
3521 ceph_heap_profiler_handle_command(heapcmd_vec
, ds
);
3526 } else if (prefix
== "quorum") {
3528 cmd_getval(g_ceph_context
, cmdmap
, "quorumcmd", quorumcmd
);
3529 if (quorumcmd
== "exit") {
3531 elector
.stop_participating();
3532 rs
= "stopped responding to quorum, initiated new election";
3534 } else if (quorumcmd
== "enter") {
3535 elector
.start_participating();
3537 rs
= "started responding to quorum, initiated new election";
3540 rs
= "needs a valid 'quorum' command";
3543 } else if (prefix
== "version") {
3545 f
->open_object_section("version");
3546 f
->dump_string("version", pretty_version_to_str());
3550 ds
<< pretty_version_to_str();
3558 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3559 reply_command(op
, r
, rs
, rdata
, 0);
3562 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
3565 reply_command(op
, rc
, rs
, rdata
, version
);
3568 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
3569 bufferlist
& rdata
, version_t version
)
3571 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
3572 assert(m
->get_type() == MSG_MON_COMMAND
);
3573 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
3574 reply
->set_tid(m
->get_tid());
3575 reply
->set_data(rdata
);
3576 send_reply(op
, reply
);
3580 // ------------------------
3581 // request/reply routing
3583 // a client/mds/osd will connect to a random monitor. we need to forward any
3584 // messages requiring state updates to the leader, and then route any replies
3585 // back via the correct monitor and back to them. (the monitor will not
3586 // initiate any connections.)
3588 void Monitor::forward_request_leader(MonOpRequestRef op
)
3590 op
->mark_event(__func__
);
3592 int mon
= get_leader();
3593 MonSession
*session
= op
->get_session();
3594 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
3596 if (req
->get_source().is_mon() && req
->get_source_addr() != messenger
->get_myaddr()) {
3597 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
3598 } else if (session
->proxy_con
) {
3599 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
3600 } else if (!session
->closed
) {
3601 RoutedRequest
*rr
= new RoutedRequest
;
3602 rr
->tid
= ++routed_request_tid
;
3603 rr
->client_inst
= req
->get_source_inst();
3604 rr
->con
= req
->get_connection();
3605 rr
->con_features
= rr
->con
->get_features();
3606 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
3607 rr
->session
= static_cast<MonSession
*>(session
->get());
3609 routed_requests
[rr
->tid
] = rr
;
3610 session
->routed_request_tids
.insert(rr
->tid
);
3612 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
3613 << " features " << rr
->con_features
<< dendl
;
3615 MForward
*forward
= new MForward(rr
->tid
,
3619 forward
->set_priority(req
->get_priority());
3620 if (session
->auth_handler
) {
3621 forward
->entity_name
= session
->entity_name
;
3622 } else if (req
->get_source().is_mon()) {
3623 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
3625 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3626 op
->mark_forwarded();
3627 assert(op
->get_req()->get_type() != 0);
3629 dout(10) << "forward_request no session for request " << *req
<< dendl
;
3633 // fake connection attached to forwarded messages
3634 struct AnonConnection
: public Connection
{
3635 explicit AnonConnection(CephContext
*cct
) : Connection(cct
, NULL
) {}
3637 int send_message(Message
*m
) override
{
3638 assert(!"send_message on anonymous connection");
3640 void send_keepalive() override
{
3641 assert(!"send_keepalive on anonymous connection");
3643 void mark_down() override
{
3646 void mark_disposable() override
{
3649 bool is_connected() override
{ return false; }
3652 //extract the original message and put it into the regular dispatch function
3653 void Monitor::handle_forward(MonOpRequestRef op
)
3655 MForward
*m
= static_cast<MForward
*>(op
->get_req());
3656 dout(10) << "received forwarded message from " << m
->client
3657 << " via " << m
->get_source_inst() << dendl
;
3658 MonSession
*session
= op
->get_session();
3661 if (!session
->is_capable("mon", MON_CAP_X
)) {
3662 dout(0) << "forward from entity with insufficient caps! "
3663 << session
->caps
<< dendl
;
3665 // see PaxosService::dispatch(); we rely on this being anon
3666 // (c->msgr == NULL)
3667 PaxosServiceMessage
*req
= m
->claim_message();
3668 assert(req
!= NULL
);
3670 ConnectionRef
c(new AnonConnection(cct
));
3671 MonSession
*s
= new MonSession(req
->get_source_inst(),
3672 static_cast<Connection
*>(c
.get()));
3673 c
->set_priv(s
->get());
3674 c
->set_peer_addr(m
->client
.addr
);
3675 c
->set_peer_type(m
->client
.name
.type());
3676 c
->set_features(m
->con_features
);
3678 s
->caps
= m
->client_caps
;
3679 dout(10) << " caps are " << s
->caps
<< dendl
;
3680 s
->entity_name
= m
->entity_name
;
3681 dout(10) << " entity name '" << s
->entity_name
<< "' type "
3682 << s
->entity_name
.get_type() << dendl
;
3683 s
->proxy_con
= m
->get_connection();
3684 s
->proxy_tid
= m
->tid
;
3686 req
->set_connection(c
);
3688 // not super accurate, but better than nothing.
3689 req
->set_recv_stamp(m
->get_recv_stamp());
3692 * note which election epoch this is; we will drop the message if
3693 * there is a future election since our peers will resend routed
3694 * requests in that case.
3696 req
->rx_election_epoch
= get_epoch();
3698 /* Because this is a special fake connection, we need to break
3699 the ref loop between Connection and MonSession differently
3700 than we normally do. Here, the Message refers to the Connection
3701 which refers to the Session, and nobody else refers to the Connection
3702 or the Session. And due to the special nature of this message,
3703 nobody refers to the Connection via the Session. So, clear out that
3704 half of the ref loop.*/
3707 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
3714 void Monitor::try_send_message(Message
*m
, const entity_inst_t
& to
)
3716 dout(10) << "try_send_message " << *m
<< " to " << to
<< dendl
;
3719 encode_message(m
, quorum_con_features
, bl
);
3721 messenger
->send_message(m
, to
);
3723 for (int i
=0; i
<(int)monmap
->size(); i
++) {
3725 messenger
->send_message(new MRoute(bl
, to
), monmap
->get_inst(i
));
3729 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
3731 op
->mark_event(__func__
);
3733 MonSession
*session
= op
->get_session();
3735 Message
*req
= op
->get_req();
3736 ConnectionRef con
= op
->get_connection();
3738 reply
->set_cct(g_ceph_context
);
3739 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
3742 dout(2) << "send_reply no connection, dropping reply " << *reply
3743 << " to " << req
<< " " << *req
<< dendl
;
3745 op
->mark_event("reply: no connection");
3749 if (!session
->con
&& !session
->proxy_con
) {
3750 dout(2) << "send_reply no connection, dropping reply " << *reply
3751 << " to " << req
<< " " << *req
<< dendl
;
3753 op
->mark_event("reply: no connection");
3757 if (session
->proxy_con
) {
3758 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
3759 << " via " << session
->proxy_con
->get_peer_addr()
3760 << " for request " << *req
<< dendl
;
3761 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
3762 op
->mark_event("reply: send routed request");
3764 session
->con
->send_message(reply
);
3765 op
->mark_event("reply: send");
3769 void Monitor::no_reply(MonOpRequestRef op
)
3771 MonSession
*session
= op
->get_session();
3772 Message
*req
= op
->get_req();
3774 if (session
->proxy_con
) {
3775 dout(10) << "no_reply to " << req
->get_source_inst()
3776 << " via " << session
->proxy_con
->get_peer_addr()
3777 << " for request " << *req
<< dendl
;
3778 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
3779 op
->mark_event("no_reply: send routed request");
3781 dout(10) << "no_reply to " << req
->get_source_inst()
3782 << " " << *req
<< dendl
;
3783 op
->mark_event("no_reply");
3787 void Monitor::handle_route(MonOpRequestRef op
)
3789 MRoute
*m
= static_cast<MRoute
*>(op
->get_req());
3790 MonSession
*session
= op
->get_session();
3792 if (!session
->is_capable("mon", MON_CAP_X
)) {
3793 dout(0) << "MRoute received from entity without appropriate perms! "
3798 dout(10) << "handle_route " << *m
->msg
<< " to " << m
->dest
<< dendl
;
3800 dout(10) << "handle_route null to " << m
->dest
<< dendl
;
3803 if (m
->session_mon_tid
) {
3804 if (routed_requests
.count(m
->session_mon_tid
)) {
3805 RoutedRequest
*rr
= routed_requests
[m
->session_mon_tid
];
3807 // reset payload, in case encoding is dependent on target features
3809 m
->msg
->clear_payload();
3810 rr
->con
->send_message(m
->msg
);
3813 if (m
->send_osdmap_first
) {
3814 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
3815 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
3816 true, MonOpRequestRef());
3818 assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
3819 routed_requests
.erase(m
->session_mon_tid
);
3820 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
3823 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
3826 dout(10) << " not a routed request, trying to send anyway" << dendl
;
3828 messenger
->send_message(m
->msg
, m
->dest
);
3834 void Monitor::resend_routed_requests()
3836 dout(10) << "resend_routed_requests" << dendl
;
3837 int mon
= get_leader();
3838 list
<Context
*> retry
;
3839 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
3840 p
!= routed_requests
.end();
3842 RoutedRequest
*rr
= p
->second
;
3845 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
3846 rr
->op
->mark_event("retry routed request");
3847 retry
.push_back(new C_RetryMessage(this, rr
->op
));
3849 assert(rr
->session
->routed_request_tids
.count(p
->first
));
3850 rr
->session
->routed_request_tids
.erase(p
->first
);
3854 bufferlist::iterator q
= rr
->request_bl
.begin();
3855 PaxosServiceMessage
*req
= (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
3856 rr
->op
->mark_event("resend forwarded message to leader");
3857 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
<< dendl
;
3858 MForward
*forward
= new MForward(rr
->tid
, req
, rr
->con_features
,
3860 req
->put(); // forward takes its own ref; drop ours.
3861 forward
->client
= rr
->client_inst
;
3862 forward
->set_priority(req
->get_priority());
3863 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3867 routed_requests
.clear();
3868 finish_contexts(g_ceph_context
, retry
);
3872 void Monitor::remove_session(MonSession
*s
)
3874 dout(10) << "remove_session " << s
<< " " << s
->inst
3875 << " features 0x" << std::hex
<< s
->con_features
<< std::dec
<< dendl
;
3878 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
3879 p
!= s
->routed_request_tids
.end();
3881 assert(routed_requests
.count(*p
));
3882 RoutedRequest
*rr
= routed_requests
[*p
];
3883 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
3885 routed_requests
.erase(*p
);
3887 s
->routed_request_tids
.clear();
3888 s
->con
->set_priv(NULL
);
3889 session_map
.remove_session(s
);
3890 logger
->set(l_mon_num_sessions
, session_map
.get_size());
3891 logger
->inc(l_mon_session_rm
);
3894 void Monitor::remove_all_sessions()
3896 Mutex::Locker
l(session_map_lock
);
3897 while (!session_map
.sessions
.empty()) {
3898 MonSession
*s
= session_map
.sessions
.front();
3901 logger
->inc(l_mon_session_rm
);
3904 logger
->set(l_mon_num_sessions
, session_map
.get_size());
3907 void Monitor::send_command(const entity_inst_t
& inst
,
3908 const vector
<string
>& com
)
3910 dout(10) << "send_command " << inst
<< "" << com
<< dendl
;
3911 MMonCommand
*c
= new MMonCommand(monmap
->fsid
);
3913 try_send_message(c
, inst
);
3916 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
3919 * Wait list the new session until we're in the quorum, assuming it's
3921 * tick() will periodically send them back through so we can send
3922 * the client elsewhere if we don't think we're getting back in.
3924 * But we whitelist a few sorts of messages:
3925 * 1) Monitors can talk to us at any time, of course.
3926 * 2) auth messages. It's unlikely to go through much faster, but
3927 * it's possible we've just lost our quorum status and we want to take...
3928 * 3) command messages. We want to accept these under all possible
3931 Message
*m
= op
->get_req();
3932 MonSession
*s
= op
->get_session();
3933 ConnectionRef con
= op
->get_connection();
3934 utime_t too_old
= ceph_clock_now();
3935 too_old
-= g_ceph_context
->_conf
->mon_lease
;
3936 if (m
->get_recv_stamp() > too_old
&&
3937 con
->is_connected()) {
3938 dout(5) << "waitlisting message " << *m
<< dendl
;
3939 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
3940 op
->mark_wait_for_quorum();
3942 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
3944 // proxied sessions aren't registered and don't have a con; don't remove
3946 if (!s
->proxy_con
) {
3947 Mutex::Locker
l(session_map_lock
);
3954 void Monitor::_ms_dispatch(Message
*m
)
3956 if (is_shutdown()) {
3961 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
3962 bool src_is_mon
= op
->is_src_mon();
3963 op
->mark_event("mon:_ms_dispatch");
3964 MonSession
*s
= op
->get_session();
3965 if (s
&& s
->closed
) {
3969 if (src_is_mon
&& s
) {
3970 ConnectionRef con
= m
->get_connection();
3971 if (con
->get_messenger() && con
->get_features() != s
->con_features
) {
3972 // only update features if this is a non-anonymous connection
3973 dout(10) << __func__
<< " feature change for " << m
->get_source_inst()
3974 << " (was " << s
->con_features
3975 << ", now " << con
->get_features() << ")" << dendl
;
3976 // connection features changed - recreate session.
3977 if (s
->con
&& s
->con
!= con
) {
3978 dout(10) << __func__
<< " connection for " << m
->get_source_inst()
3979 << " changed from session; mark down and replace" << dendl
;
3980 s
->con
->mark_down();
3982 if (s
->item
.is_on_list()) {
3983 // forwarded messages' sessions are not in the sessions map and
3984 // exist only while the op is being handled.
3993 // if the sender is not a monitor, make sure their first message for a
3994 // session is an MAuth. If it is not, assume it's a stray message,
3995 // and considering that we are creating a new session it is safe to
3996 // assume that the sender hasn't authenticated yet, so we have no way
3997 // of assessing whether we should handle it or not.
3998 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
3999 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
4000 m
->get_type() != CEPH_MSG_PING
)) {
4001 dout(1) << __func__
<< " dropping stray message " << *m
4002 << " from " << m
->get_source_inst() << dendl
;
4006 ConnectionRef con
= m
->get_connection();
4008 Mutex::Locker
l(session_map_lock
);
4009 s
= session_map
.new_session(m
->get_source_inst(), con
.get());
4012 con
->set_priv(s
->get());
4013 dout(10) << __func__
<< " new session " << s
<< " " << *s
4014 << " features 0x" << std::hex
4015 << s
->con_features
<< std::dec
<< dendl
;
4018 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4019 logger
->inc(l_mon_session_add
);
4022 // give it monitor caps; the peer type has been authenticated
4023 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
4024 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
4025 s
->caps
= *mon_caps
;
4029 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->inst
4035 s
->session_timeout
= ceph_clock_now();
4036 s
->session_timeout
+= g_conf
->mon_session_timeout
;
4038 if (s
->auth_handler
) {
4039 s
->entity_name
= s
->auth_handler
->get_entity_name();
4041 dout(20) << " caps " << s
->caps
.get_str() << dendl
;
4043 if ((is_synchronizing() ||
4044 (s
->global_id
== 0 && !exited_quorum
.is_zero())) &&
4046 m
->get_type() != CEPH_MSG_PING
) {
4047 waitlist_or_zap_client(op
);
4054 void Monitor::dispatch_op(MonOpRequestRef op
)
4056 op
->mark_event("mon:dispatch_op");
4057 MonSession
*s
= op
->get_session();
4060 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
4064 /* we will consider the default type as being 'monitor' until proven wrong */
4065 op
->set_type_monitor();
4066 /* deal with all messages that do not necessarily need caps */
4067 bool dealt_with
= true;
4068 switch (op
->get_req()->get_type()) {
4070 case MSG_MON_GLOBAL_ID
:
4072 op
->set_type_service();
4073 /* no need to check caps here */
4074 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
4081 /* MMonGetMap may be used by clients to obtain a monmap *before*
4082 * authenticating with the monitor. We need to handle these without
4083 * checking caps because, even on a cluster without cephx, we only set
4084 * session caps *after* the auth handshake. A good example of this
4085 * is when a client calls MonClient::get_monmap_privately(), which does
4086 * not authenticate when obtaining a monmap.
4088 case CEPH_MSG_MON_GET_MAP
:
4089 handle_mon_get_map(op
);
4092 case CEPH_MSG_MON_METADATA
:
4093 return handle_mon_metadata(op
);
4102 /* well, maybe the op belongs to a service... */
4103 op
->set_type_service();
4104 /* deal with all messages which caps should be checked somewhere else */
4106 switch (op
->get_req()->get_type()) {
4109 case CEPH_MSG_MON_GET_OSDMAP
:
4110 case CEPH_MSG_POOLOP
:
4111 case MSG_OSD_BEACON
:
4112 case MSG_OSD_MARK_ME_DOWN
:
4114 case MSG_OSD_FAILURE
:
4117 case MSG_OSD_PGTEMP
:
4118 case MSG_OSD_PG_CREATED
:
4119 case MSG_REMOVE_SNAPS
:
4120 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
4124 case MSG_MDS_BEACON
:
4125 case MSG_MDS_OFFLOAD_TARGETS
:
4126 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
4130 case MSG_MGR_BEACON
:
4131 paxos_service
[PAXOS_MGR
]->dispatch(op
);
4135 case MSG_MON_MGR_REPORT
:
4136 case CEPH_MSG_STATFS
:
4137 case MSG_GETPOOLSTATS
:
4138 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
4143 paxos_service
[PAXOS_PGMAP
]->dispatch(op
);
4148 paxos_service
[PAXOS_LOG
]->dispatch(op
);
4151 // handle_command() does its own caps checking
4152 case MSG_MON_COMMAND
:
4153 op
->set_type_command();
4164 /* nop, looks like it's not a service message; revert back to monitor */
4165 op
->set_type_monitor();
4167 /* messages we, the Monitor class, need to deal with
4168 * but may be sent by clients. */
4170 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
4171 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4172 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
4178 switch (op
->get_req()->get_type()) {
4181 case CEPH_MSG_MON_GET_VERSION
:
4182 handle_get_version(op
);
4185 case CEPH_MSG_MON_SUBSCRIBE
:
4186 /* FIXME: check what's being subscribed, filter accordingly */
4187 handle_subscribe(op
);
4197 if (!op
->is_src_mon()) {
4198 dout(1) << __func__
<< " unexpected monitor message from"
4199 << " non-monitor entity " << op
->get_req()->get_source_inst()
4200 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
4204 /* messages that should only be sent by another monitor */
4206 switch (op
->get_req()->get_type()) {
4216 // Sync (i.e., the new slurp, but on steroids)
4224 /* log acks are sent from a monitor we sent the MLog to, and are
4225 never sent by clients to us. */
4227 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
4232 op
->set_type_service();
4233 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
4239 op
->set_type_paxos();
4240 MMonPaxos
*pm
= static_cast<MMonPaxos
*>(op
->get_req());
4241 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4246 if (state
== STATE_SYNCHRONIZING
) {
4247 // we are synchronizing. These messages would do us no
4248 // good, thus just drop them and ignore them.
4249 dout(10) << __func__
<< " ignore paxos msg from "
4250 << pm
->get_source_inst() << dendl
;
4255 if (pm
->epoch
> get_epoch()) {
4259 if (pm
->epoch
!= get_epoch()) {
4263 paxos
->dispatch(op
);
4268 case MSG_MON_ELECTION
:
4269 op
->set_type_election();
4270 //check privileges here for simplicity
4271 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4272 dout(0) << "MMonElection received from entity without enough caps!"
4273 << op
->get_session()->caps
<< dendl
;
4276 if (!is_probing() && !is_synchronizing()) {
4277 elector
.dispatch(op
);
4286 handle_timecheck(op
);
4289 case MSG_MON_HEALTH
:
4290 health_monitor
->dispatch(op
);
4293 case MSG_MON_HEALTH_CHECKS
:
4294 op
->set_type_service();
4295 paxos_service
[PAXOS_HEALTH
]->dispatch(op
);
4303 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
4312 void Monitor::handle_ping(MonOpRequestRef op
)
4314 MPing
*m
= static_cast<MPing
*>(op
->get_req());
4315 dout(10) << __func__
<< " " << *m
<< dendl
;
4316 MPing
*reply
= new MPing
;
4317 entity_inst_t inst
= m
->get_source_inst();
4319 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4320 f
->open_object_section("pong");
4322 list
<string
> health_str
;
4323 get_health(health_str
, NULL
, f
.get());
4326 get_mon_status(f
.get(), ss
);
4332 ::encode(ss
.str(), payload
);
4333 reply
->set_payload(payload
);
4334 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
4335 messenger
->send_message(reply
, inst
);
4338 void Monitor::timecheck_start()
4340 dout(10) << __func__
<< dendl
;
4341 timecheck_cleanup();
4342 timecheck_start_round();
4345 void Monitor::timecheck_finish()
4347 dout(10) << __func__
<< dendl
;
4348 timecheck_cleanup();
4351 void Monitor::timecheck_start_round()
4353 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4354 assert(is_leader());
4356 if (monmap
->size() == 1) {
4357 assert(0 == "We are alone; this shouldn't have been scheduled!");
4361 if (timecheck_round
% 2) {
4362 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4363 utime_t curr_time
= ceph_clock_now();
4364 double max
= g_conf
->mon_timecheck_interval
*3;
4365 if (curr_time
- timecheck_round_start
< max
) {
4366 dout(10) << __func__
<< " keep current round going" << dendl
;
4369 dout(10) << __func__
4370 << " finish current timecheck and start new" << dendl
;
4371 timecheck_cancel_round();
4375 assert(timecheck_round
% 2 == 0);
4378 timecheck_round_start
= ceph_clock_now();
4379 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4383 dout(10) << __func__
<< " setting up next event" << dendl
;
4384 timecheck_reset_event();
4387 void Monitor::timecheck_finish_round(bool success
)
4389 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4390 assert(timecheck_round
% 2);
4392 timecheck_round_start
= utime_t();
4395 assert(timecheck_waiting
.empty());
4396 assert(timecheck_acks
== quorum
.size());
4398 timecheck_check_skews();
4402 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4403 << " peers still waiting:";
4404 for (map
<entity_inst_t
,utime_t
>::iterator p
= timecheck_waiting
.begin();
4405 p
!= timecheck_waiting
.end(); ++p
) {
4406 *_dout
<< " " << p
->first
.name
;
4409 timecheck_waiting
.clear();
4411 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
4414 void Monitor::timecheck_cancel_round()
4416 timecheck_finish_round(false);
4419 void Monitor::timecheck_cleanup()
4421 timecheck_round
= 0;
4423 timecheck_round_start
= utime_t();
4425 if (timecheck_event
) {
4426 timer
.cancel_event(timecheck_event
);
4427 timecheck_event
= NULL
;
4429 timecheck_waiting
.clear();
4430 timecheck_skews
.clear();
4431 timecheck_latencies
.clear();
4433 timecheck_rounds_since_clean
= 0;
4436 void Monitor::timecheck_reset_event()
4438 if (timecheck_event
) {
4439 timer
.cancel_event(timecheck_event
);
4440 timecheck_event
= NULL
;
4444 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
4446 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4447 delay
= cct
->_conf
->mon_timecheck_interval
;
4450 dout(10) << __func__
<< " delay " << delay
4451 << " rounds_since_clean " << timecheck_rounds_since_clean
4454 timecheck_event
= new C_MonContext(this, [this](int) {
4455 timecheck_start_round();
4457 timer
.add_event_after(delay
, timecheck_event
);
4460 void Monitor::timecheck_check_skews()
4462 dout(10) << __func__
<< dendl
;
4463 assert(is_leader());
4464 assert((timecheck_round
% 2) == 0);
4465 if (monmap
->size() == 1) {
4466 assert(0 == "We are alone; we shouldn't have gotten here!");
4469 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4471 bool found_skew
= false;
4472 for (map
<entity_inst_t
, double>::iterator p
= timecheck_skews
.begin();
4473 p
!= timecheck_skews
.end(); ++p
) {
4476 if (timecheck_has_skew(p
->second
, &abs_skew
)) {
4477 dout(10) << __func__
4478 << " " << p
->first
<< " skew " << abs_skew
<< dendl
;
4484 ++timecheck_rounds_since_clean
;
4485 timecheck_reset_event();
4486 } else if (timecheck_rounds_since_clean
> 0) {
4488 << " no clock skews found after " << timecheck_rounds_since_clean
4489 << " rounds" << dendl
;
4490 // make sure the skews are really gone and not just a transient success
4491 // this will run just once if not in the presence of skews again.
4492 timecheck_rounds_since_clean
= 1;
4493 timecheck_reset_event();
4494 timecheck_rounds_since_clean
= 0;
4499 void Monitor::timecheck_report()
4501 dout(10) << __func__
<< dendl
;
4502 assert(is_leader());
4503 assert((timecheck_round
% 2) == 0);
4504 if (monmap
->size() == 1) {
4505 assert(0 == "We are alone; we shouldn't have gotten here!");
4509 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4510 bool do_output
= true; // only output report once
4511 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
4512 if (monmap
->get_name(*q
) == name
)
4515 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_REPORT
);
4516 m
->epoch
= get_epoch();
4517 m
->round
= timecheck_round
;
4519 for (map
<entity_inst_t
, double>::iterator it
= timecheck_skews
.begin();
4520 it
!= timecheck_skews
.end(); ++it
) {
4521 double skew
= it
->second
;
4522 double latency
= timecheck_latencies
[it
->first
];
4524 m
->skews
[it
->first
] = skew
;
4525 m
->latencies
[it
->first
] = latency
;
4528 dout(25) << __func__
<< " " << it
->first
4529 << " latency " << latency
4530 << " skew " << skew
<< dendl
;
4534 entity_inst_t inst
= monmap
->get_inst(*q
);
4535 dout(10) << __func__
<< " send report to " << inst
<< dendl
;
4536 messenger
->send_message(m
, inst
);
4540 void Monitor::timecheck()
4542 dout(10) << __func__
<< dendl
;
4543 assert(is_leader());
4544 if (monmap
->size() == 1) {
4545 assert(0 == "We are alone; we shouldn't have gotten here!");
4548 assert(timecheck_round
% 2 != 0);
4550 timecheck_acks
= 1; // we ack ourselves
4552 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
4553 << " round " << timecheck_round
<< dendl
;
4555 // we are at the eye of the storm; the point of reference
4556 timecheck_skews
[messenger
->get_myinst()] = 0.0;
4557 timecheck_latencies
[messenger
->get_myinst()] = 0.0;
4559 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
4560 if (monmap
->get_name(*it
) == name
)
4563 entity_inst_t inst
= monmap
->get_inst(*it
);
4564 utime_t curr_time
= ceph_clock_now();
4565 timecheck_waiting
[inst
] = curr_time
;
4566 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_PING
);
4567 m
->epoch
= get_epoch();
4568 m
->round
= timecheck_round
;
4569 dout(10) << __func__
<< " send " << *m
<< " to " << inst
<< dendl
;
4570 messenger
->send_message(m
, inst
);
4574 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
4575 const double skew_bound
,
4576 const double latency
)
4578 health_status_t status
= HEALTH_OK
;
4579 assert(latency
>= 0);
4582 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
4583 status
= HEALTH_WARN
;
4584 ss
<< "clock skew " << abs_skew
<< "s"
4585 << " > max " << g_conf
->mon_clock_drift_allowed
<< "s";
4591 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
4593 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4594 dout(10) << __func__
<< " " << *m
<< dendl
;
4595 /* handles PONG's */
4596 assert(m
->op
== MTimeCheck::OP_PONG
);
4598 entity_inst_t other
= m
->get_source_inst();
4599 if (m
->epoch
< get_epoch()) {
4600 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
4601 << " from " << other
4602 << " curr " << get_epoch()
4603 << " -- severely lagged? discard" << dendl
;
4606 assert(m
->epoch
== get_epoch());
4608 if (m
->round
< timecheck_round
) {
4609 dout(1) << __func__
<< " got old round " << m
->round
4610 << " from " << other
4611 << " curr " << timecheck_round
<< " -- discard" << dendl
;
4615 utime_t curr_time
= ceph_clock_now();
4617 assert(timecheck_waiting
.count(other
) > 0);
4618 utime_t timecheck_sent
= timecheck_waiting
[other
];
4619 timecheck_waiting
.erase(other
);
4620 if (curr_time
< timecheck_sent
) {
4621 // our clock was readjusted -- drop everything until it all makes sense.
4622 dout(1) << __func__
<< " our clock was readjusted --"
4623 << " bump round and drop current check"
4625 timecheck_cancel_round();
4629 /* update peer latencies */
4630 double latency
= (double)(curr_time
- timecheck_sent
);
4632 if (timecheck_latencies
.count(other
) == 0)
4633 timecheck_latencies
[other
] = latency
;
4635 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
4636 timecheck_latencies
[other
] = avg_latency
;
4642 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4643 * and 'a' happens to be lower than 'b'; so we use double instead.
4645 * latency is always expected to be >= 0.
4647 * delta, the difference between theirs timestamp and ours, may either be
4648 * lower or higher than 0; will hardly ever be 0.
4650 * The absolute skew is the absolute delta minus the latency, which is
4651 * taken as a whole instead of an rtt given that there is some queueing
4652 * and dispatch times involved and it's hard to assess how long exactly
4653 * it took for the message to travel to the other side and be handled. So
4654 * we call it a bounded skew, the worst case scenario.
4658 * Given that the latency is always positive, we can establish that the
4659 * bounded skew will be:
4661 * 1. positive if the absolute delta is higher than the latency and
4663 * 2. negative if the absolute delta is higher than the latency and
4664 * delta is negative.
4665 * 3. zero if the absolute delta is lower than the latency.
4667 * On 3. we make a judgement call and treat the skew as non-existent.
4668 * This is because that, if the absolute delta is lower than the
4669 * latency, then the apparently existing skew is nothing more than a
4670 * side-effect of the high latency at work.
4672 * This may not be entirely true though, as a severely skewed clock
4673 * may be masked by an even higher latency, but with high latencies
4674 * we probably have worse issues to deal with than just skewed clocks.
4676 assert(latency
>= 0);
4678 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
4679 double abs_delta
= (delta
> 0 ? delta
: -delta
);
4680 double skew_bound
= abs_delta
- latency
;
4684 skew_bound
= -skew_bound
;
4687 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
4688 if (status
== HEALTH_ERR
)
4689 clog
->error() << other
<< " " << ss
.str();
4690 else if (status
== HEALTH_WARN
)
4691 clog
->warn() << other
<< " " << ss
.str();
4693 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
4694 << " delta " << delta
<< " skew_bound " << skew_bound
4695 << " latency " << latency
<< dendl
;
4697 timecheck_skews
[other
] = skew_bound
;
4700 if (timecheck_acks
== quorum
.size()) {
4701 dout(10) << __func__
<< " got pongs from everybody ("
4702 << timecheck_acks
<< " total)" << dendl
;
4703 assert(timecheck_skews
.size() == timecheck_acks
);
4704 assert(timecheck_waiting
.empty());
4705 // everyone has acked, so bump the round to finish it.
4706 timecheck_finish_round();
4710 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
4712 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4713 dout(10) << __func__
<< " " << *m
<< dendl
;
4716 assert(m
->op
== MTimeCheck::OP_PING
|| m
->op
== MTimeCheck::OP_REPORT
);
4718 if (m
->epoch
!= get_epoch()) {
4719 dout(1) << __func__
<< " got wrong epoch "
4720 << "(ours " << get_epoch()
4721 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
4725 if (m
->round
< timecheck_round
) {
4726 dout(1) << __func__
<< " got old round " << m
->round
4727 << " current " << timecheck_round
4728 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
4732 timecheck_round
= m
->round
;
4734 if (m
->op
== MTimeCheck::OP_REPORT
) {
4735 assert((timecheck_round
% 2) == 0);
4736 timecheck_latencies
.swap(m
->latencies
);
4737 timecheck_skews
.swap(m
->skews
);
4741 assert((timecheck_round
% 2) != 0);
4742 MTimeCheck
*reply
= new MTimeCheck(MTimeCheck::OP_PONG
);
4743 utime_t curr_time
= ceph_clock_now();
4744 reply
->timestamp
= curr_time
;
4745 reply
->epoch
= m
->epoch
;
4746 reply
->round
= m
->round
;
4747 dout(10) << __func__
<< " send " << *m
4748 << " to " << m
->get_source_inst() << dendl
;
4749 m
->get_connection()->send_message(reply
);
4752 void Monitor::handle_timecheck(MonOpRequestRef op
)
4754 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4755 dout(10) << __func__
<< " " << *m
<< dendl
;
4758 if (m
->op
!= MTimeCheck::OP_PONG
) {
4759 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
4761 handle_timecheck_leader(op
);
4763 } else if (is_peon()) {
4764 if (m
->op
!= MTimeCheck::OP_PING
&& m
->op
!= MTimeCheck::OP_REPORT
) {
4765 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
4767 handle_timecheck_peon(op
);
4770 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
4774 void Monitor::handle_subscribe(MonOpRequestRef op
)
4776 MMonSubscribe
*m
= static_cast<MMonSubscribe
*>(op
->get_req());
4777 dout(10) << "handle_subscribe " << *m
<< dendl
;
4781 MonSession
*s
= op
->get_session();
4784 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
4787 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4788 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
4791 // remove conflicting subscribes
4792 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4793 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
4794 it
!= s
->sub_map
.end(); ) {
4795 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
4796 Mutex::Locker
l(session_map_lock
);
4797 session_map
.remove_sub((it
++)->second
);
4805 Mutex::Locker
l(session_map_lock
);
4806 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
4807 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
4808 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
4811 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
4812 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
4813 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
4814 Subscription
*sub
= s
->sub_map
[p
->first
];
4815 assert(sub
!= nullptr);
4816 mdsmon()->check_sub(sub
);
4818 } else if (p
->first
== "osdmap") {
4819 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
4820 if (s
->osd_epoch
> p
->second
.start
) {
4821 // client needs earlier osdmaps on purpose, so reset the sent epoch
4824 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
4826 } else if (p
->first
== "osd_pg_creates") {
4827 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
4828 if (monmap
->get_required_features().contains_all(
4829 ceph::features::mon::FEATURE_LUMINOUS
)) {
4830 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
4832 pgmon()->check_sub(s
->sub_map
["osd_pg_creates"]);
4835 } else if (p
->first
== "monmap") {
4836 monmon()->check_sub(s
->sub_map
[p
->first
]);
4837 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4838 logmon()->check_sub(s
->sub_map
[p
->first
]);
4839 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
4840 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
4841 } else if (p
->first
== "servicemap") {
4842 mgrstatmon()->check_sub(s
->sub_map
[p
->first
]);
4847 // we only need to reply if the client is old enough to think it
4848 // has to send renewals.
4849 ConnectionRef con
= m
->get_connection();
4850 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
4851 m
->get_connection()->send_message(new MMonSubscribeAck(
4852 monmap
->get_fsid(), (int)g_conf
->mon_subscribe_interval
));
4857 void Monitor::handle_get_version(MonOpRequestRef op
)
4859 MMonGetVersion
*m
= static_cast<MMonGetVersion
*>(op
->get_req());
4860 dout(10) << "handle_get_version " << *m
<< dendl
;
4861 PaxosService
*svc
= NULL
;
4863 MonSession
*s
= op
->get_session();
4866 if (!is_leader() && !is_peon()) {
4867 dout(10) << " waiting for quorum" << dendl
;
4868 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
4872 if (m
->what
== "mdsmap") {
4874 } else if (m
->what
== "fsmap") {
4876 } else if (m
->what
== "osdmap") {
4878 } else if (m
->what
== "monmap") {
4881 derr
<< "invalid map type " << m
->what
<< dendl
;
4885 if (!svc
->is_readable()) {
4886 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
4890 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
4891 reply
->handle
= m
->handle
;
4892 reply
->version
= svc
->get_last_committed();
4893 reply
->oldest_version
= svc
->get_first_committed();
4894 reply
->set_tid(m
->get_tid());
4896 m
->get_connection()->send_message(reply
);
4902 bool Monitor::ms_handle_reset(Connection
*con
)
4904 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
4906 // ignore lossless monitor sessions
4907 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
4910 MonSession
*s
= static_cast<MonSession
*>(con
->get_priv());
4914 // break any con <-> session ref cycle
4915 s
->con
->set_priv(NULL
);
4920 Mutex::Locker
l(lock
);
4922 dout(10) << "reset/close on session " << s
->inst
<< dendl
;
4924 Mutex::Locker
l(session_map_lock
);
4931 bool Monitor::ms_handle_refused(Connection
*con
)
4933 // just log for now...
4934 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
4940 void Monitor::send_latest_monmap(Connection
*con
)
4943 monmap
->encode(bl
, con
->get_features());
4944 con
->send_message(new MMonMap(bl
));
4947 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
4949 MMonGetMap
*m
= static_cast<MMonGetMap
*>(op
->get_req());
4950 dout(10) << "handle_mon_get_map" << dendl
;
4951 send_latest_monmap(m
->get_connection().get());
4954 void Monitor::handle_mon_metadata(MonOpRequestRef op
)
4956 MMonMetadata
*m
= static_cast<MMonMetadata
*>(op
->get_req());
4958 dout(10) << __func__
<< dendl
;
4959 update_mon_metadata(m
->get_source().num(), std::move(m
->data
));
4963 void Monitor::update_mon_metadata(int from
, Metadata
&& m
)
4965 // NOTE: this is now for legacy (kraken or jewel) mons only.
4966 pending_metadata
[from
] = std::move(m
);
4968 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
4970 ::encode(pending_metadata
, bl
);
4971 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
4972 paxos
->trigger_propose();
4975 int Monitor::load_metadata()
4978 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
4981 bufferlist::iterator it
= bl
.begin();
4982 ::decode(mon_metadata
, it
);
4984 pending_metadata
= mon_metadata
;
4988 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
4991 if (!mon_metadata
.count(mon
)) {
4992 err
<< "mon." << mon
<< " not found";
4995 const Metadata
& m
= mon_metadata
[mon
];
4996 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
4997 f
->dump_string(p
->first
.c_str(), p
->second
);
5002 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
5004 map
<string
,int> by_val
;
5005 for (auto& p
: mon_metadata
) {
5006 auto q
= p
.second
.find(field
);
5007 if (q
== p
.second
.end()) {
5008 by_val
["unknown"]++;
5010 by_val
[q
->second
]++;
5013 f
->open_object_section(field
.c_str());
5014 for (auto& p
: by_val
) {
5015 f
->dump_int(p
.first
.c_str(), p
.second
);
5020 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
5022 map
<string
, list
<int> > mons
; // hostname => mon
5023 for (map
<int, Metadata
>::iterator it
= mon_metadata
.begin();
5024 it
!= mon_metadata
.end(); ++it
) {
5025 const Metadata
& m
= it
->second
;
5026 Metadata::const_iterator hostname
= m
.find("hostname");
5027 if (hostname
== m
.end()) {
5028 // not likely though
5031 mons
[hostname
->second
].push_back(it
->first
);
5034 dump_services(f
, mons
, "mon");
5038 // ----------------------------------------------
5041 int Monitor::scrub_start()
5043 dout(10) << __func__
<< dendl
;
5044 assert(is_leader());
5046 if (!scrub_result
.empty()) {
5047 clog
->info() << "scrub already in progress";
5051 scrub_event_cancel();
5052 scrub_result
.clear();
5053 scrub_state
.reset(new ScrubState
);
5059 int Monitor::scrub()
5061 assert(is_leader());
5062 assert(scrub_state
);
5064 scrub_cancel_timeout();
5065 wait_for_paxos_write();
5066 scrub_version
= paxos
->get_version();
5069 // scrub all keys if we're the only monitor in the quorum
5071 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
5073 for (set
<int>::iterator p
= quorum
.begin();
5078 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
5080 r
->key
= scrub_state
->last_key
;
5081 messenger
->send_message(r
, monmap
->get_inst(*p
));
5085 bool r
= _scrub(&scrub_result
[rank
],
5086 &scrub_state
->last_key
,
5089 scrub_state
->finished
= !r
;
5091 // only after we got our scrub results do we really care whether the
5092 // other monitors are late on their results. Also, this way we avoid
5093 // triggering the timeout if we end up getting stuck in _scrub() for
5094 // longer than the duration of the timeout.
5095 scrub_reset_timeout();
5097 if (quorum
.size() == 1) {
5098 assert(scrub_state
->finished
== true);
5104 void Monitor::handle_scrub(MonOpRequestRef op
)
5106 MMonScrub
*m
= static_cast<MMonScrub
*>(op
->get_req());
5107 dout(10) << __func__
<< " " << *m
<< dendl
;
5109 case MMonScrub::OP_SCRUB
:
5114 wait_for_paxos_write();
5116 if (m
->version
!= paxos
->get_version())
5119 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
5123 reply
->key
= m
->key
;
5124 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
5125 m
->get_connection()->send_message(reply
);
5129 case MMonScrub::OP_RESULT
:
5133 if (m
->version
!= scrub_version
)
5135 // reset the timeout each time we get a result
5136 scrub_reset_timeout();
5138 int from
= m
->get_source().num();
5139 assert(scrub_result
.count(from
) == 0);
5140 scrub_result
[from
] = m
->result
;
5142 if (scrub_result
.size() == quorum
.size()) {
5143 scrub_check_results();
5144 scrub_result
.clear();
5145 if (scrub_state
->finished
)
5155 bool Monitor::_scrub(ScrubResult
*r
,
5156 pair
<string
,string
> *start
,
5160 assert(start
!= NULL
);
5161 assert(num_keys
!= NULL
);
5163 set
<string
> prefixes
= get_sync_targets_names();
5164 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5166 dout(10) << __func__
<< " start (" << *start
<< ")"
5167 << " num_keys " << *num_keys
<< dendl
;
5169 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
5171 int scrubbed_keys
= 0;
5172 pair
<string
,string
> last_key
;
5174 while (it
->has_next_chunk()) {
5176 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
5179 pair
<string
,string
> k
= it
->get_next_key();
5180 if (prefixes
.count(k
.first
) == 0)
5183 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
5184 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
5185 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
5191 int err
= store
->get(k
.first
, k
.second
, bl
);
5194 uint32_t key_crc
= bl
.crc32c(0);
5195 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
5196 << " crc " << key_crc
<< dendl
;
5197 r
->prefix_keys
[k
.first
]++;
5198 if (r
->prefix_crc
.count(k
.first
) == 0) {
5199 r
->prefix_crc
[k
.first
] = 0;
5201 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
5203 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
5204 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
5205 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
5206 r
->prefix_crc
[k
.first
] += 1;
5213 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
5214 << " scrubbed_keys " << scrubbed_keys
5215 << " has_next " << it
->has_next_chunk() << dendl
;
5218 *num_keys
= scrubbed_keys
;
5220 return it
->has_next_chunk();
5223 void Monitor::scrub_check_results()
5225 dout(10) << __func__
<< dendl
;
5229 ScrubResult
& mine
= scrub_result
[rank
];
5230 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
5231 p
!= scrub_result
.end();
5233 if (p
->first
== rank
)
5235 if (p
->second
!= mine
) {
5237 clog
->error() << "scrub mismatch";
5238 clog
->error() << " mon." << rank
<< " " << mine
;
5239 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5243 clog
->info() << "scrub ok on " << quorum
<< ": " << mine
;
5246 inline void Monitor::scrub_timeout()
5248 dout(1) << __func__
<< " restarting scrub" << dendl
;
5253 void Monitor::scrub_finish()
5255 dout(10) << __func__
<< dendl
;
5257 scrub_event_start();
5260 void Monitor::scrub_reset()
5262 dout(10) << __func__
<< dendl
;
5263 scrub_cancel_timeout();
5265 scrub_result
.clear();
5266 scrub_state
.reset();
5269 inline void Monitor::scrub_update_interval(int secs
)
5271 // we don't care about changes if we are not the leader.
5272 // changes will be visible if we become the leader.
5276 dout(1) << __func__
<< " new interval = " << secs
<< dendl
;
5278 // if scrub already in progress, all changes will already be visible during
5279 // the next round. Nothing to do.
5280 if (scrub_state
!= NULL
)
5283 scrub_event_cancel();
5284 scrub_event_start();
5287 void Monitor::scrub_event_start()
5289 dout(10) << __func__
<< dendl
;
5292 scrub_event_cancel();
5294 if (cct
->_conf
->mon_scrub_interval
<= 0) {
5295 dout(1) << __func__
<< " scrub event is disabled"
5296 << " (mon_scrub_interval = " << cct
->_conf
->mon_scrub_interval
5301 scrub_event
= new C_MonContext(this, [this](int) {
5304 timer
.add_event_after(cct
->_conf
->mon_scrub_interval
, scrub_event
);
5307 void Monitor::scrub_event_cancel()
5309 dout(10) << __func__
<< dendl
;
5311 timer
.cancel_event(scrub_event
);
5316 inline void Monitor::scrub_cancel_timeout()
5318 if (scrub_timeout_event
) {
5319 timer
.cancel_event(scrub_timeout_event
);
5320 scrub_timeout_event
= NULL
;
5324 void Monitor::scrub_reset_timeout()
5326 dout(15) << __func__
<< " reset timeout event" << dendl
;
5327 scrub_cancel_timeout();
5329 scrub_timeout_event
= new C_MonContext(this, [this](int) {
5332 timer
.add_event_after(g_conf
->mon_scrub_timeout
, scrub_timeout_event
);
5335 /************ TICK ***************/
5336 void Monitor::new_tick()
5338 timer
.add_event_after(g_conf
->mon_tick_interval
, new C_MonContext(this, [this](int) {
5343 void Monitor::tick()
5346 dout(11) << "tick" << dendl
;
5348 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
) {
5354 utime_t now
= ceph_clock_now();
5356 Mutex::Locker
l(session_map_lock
);
5357 auto p
= session_map
.sessions
.begin();
5359 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5360 now
> (exited_quorum
+ 2*g_conf
->mon_lease
));
5366 // don't trim monitors
5367 if (s
->inst
.name
.is_mon())
5370 if (s
->session_timeout
< now
&& s
->con
) {
5371 // check keepalive, too
5372 s
->session_timeout
= s
->con
->get_last_keepalive();
5373 s
->session_timeout
+= g_conf
->mon_session_timeout
;
5375 if (s
->session_timeout
< now
) {
5376 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5377 << " (timeout " << s
->session_timeout
5378 << " < now " << now
<< ")" << dendl
;
5379 } else if (out_for_too_long
) {
5380 // boot the client Session because we've taken too long getting back in
5381 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5382 << " because we've been out of quorum too long" << dendl
;
5387 s
->con
->mark_down();
5389 logger
->inc(l_mon_session_trim
);
5392 sync_trim_providers();
5394 if (!maybe_wait_for_quorum
.empty()) {
5395 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5398 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5399 // this is only necessary on upgraded clusters.
5400 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5401 prepare_new_fingerprint(t
);
5402 paxos
->trigger_propose();
5408 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5411 nf
.generate_random();
5412 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5416 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
5419 int Monitor::check_fsid()
5422 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
5427 string
es(ebl
.c_str(), ebl
.length());
5429 // only keep the first line
5430 size_t pos
= es
.find_first_of('\n');
5431 if (pos
!= string::npos
)
5434 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
5436 if (!ondisk
.parse(es
.c_str())) {
5437 derr
<< "error: unable to parse uuid" << dendl
;
5441 if (monmap
->get_fsid() != ondisk
) {
5442 derr
<< "error: cluster_uuid file exists with value " << ondisk
5443 << ", != our uuid " << monmap
->get_fsid() << dendl
;
5450 int Monitor::write_fsid()
5452 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5454 int r
= store
->apply_transaction(t
);
5458 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
5461 ss
<< monmap
->get_fsid() << "\n";
5462 string us
= ss
.str();
5467 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
5472 * this is the closest thing to a traditional 'mkfs' for ceph.
5473 * initialize the monitor state machines to their initial values.
5475 int Monitor::mkfs(bufferlist
& osdmapbl
)
5477 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5479 // verify cluster fsid
5480 int r
= check_fsid();
5481 if (r
< 0 && r
!= -ENOENT
)
5485 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
5486 magicbl
.append("\n");
5487 t
->put(MONITOR_NAME
, "magic", magicbl
);
5490 features
= get_initial_supported_features();
5493 // save monmap, osdmap, keyring.
5494 bufferlist monmapbl
;
5495 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
5496 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5497 t
->put("mkfs", "monmap", monmapbl
);
5499 if (osdmapbl
.length()) {
5500 // make sure it's a valid osdmap
5503 om
.decode(osdmapbl
);
5505 catch (buffer::error
& e
) {
5506 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
5509 t
->put("mkfs", "osdmap", osdmapbl
);
5512 if (is_keyring_required()) {
5514 string keyring_filename
;
5516 r
= ceph_resolve_file_search(g_conf
->keyring
, keyring_filename
);
5518 derr
<< "unable to find a keyring file on " << g_conf
->keyring
5519 << ": " << cpp_strerror(r
) << dendl
;
5520 if (g_conf
->key
!= "") {
5521 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf
->key
+
5522 "\n\tcaps mon = \"allow *\"\n";
5524 bl
.append(keyring_plaintext
);
5526 bufferlist::iterator i
= bl
.begin();
5527 keyring
.decode_plaintext(i
);
5529 catch (const buffer::error
& e
) {
5530 derr
<< "error decoding keyring " << keyring_plaintext
5531 << ": " << e
.what() << dendl
;
5538 r
= keyring
.load(g_ceph_context
, keyring_filename
);
5540 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
5545 // put mon. key in external keyring; seed with everything else.
5546 extract_save_mon_key(keyring
);
5548 bufferlist keyringbl
;
5549 keyring
.encode_plaintext(keyringbl
);
5550 t
->put("mkfs", "keyring", keyringbl
);
5553 store
->apply_transaction(t
);
5558 int Monitor::write_default_keyring(bufferlist
& bl
)
5561 os
<< g_conf
->mon_data
<< "/keyring";
5564 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
, 0600);
5567 dout(0) << __func__
<< " failed to open " << os
.str()
5568 << ": " << cpp_strerror(err
) << dendl
;
5572 err
= bl
.write_fd(fd
);
5575 VOID_TEMP_FAILURE_RETRY(::close(fd
));
5580 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
5582 EntityName mon_name
;
5583 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
5585 if (keyring
.get_auth(mon_name
, mon_key
)) {
5586 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
5588 pkey
.add(mon_name
, mon_key
);
5590 pkey
.encode_plaintext(bl
);
5591 write_default_keyring(bl
);
5592 keyring
.remove(mon_name
);
5596 bool Monitor::ms_get_authorizer(int service_id
, AuthAuthorizer
**authorizer
,
5599 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id
)
5605 // we only connect to other monitors and mgr; every else connects to us.
5606 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
5607 service_id
!= CEPH_ENTITY_TYPE_MGR
)
5610 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5612 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
5613 AuthNoneClientHandler
handler(g_ceph_context
, nullptr);
5614 handler
.set_global_id(0);
5615 *authorizer
= handler
.build_authorizer(service_id
);
5619 CephXServiceTicketInfo auth_ticket_info
;
5620 CephXSessionAuthInfo info
;
5624 name
.set_type(CEPH_ENTITY_TYPE_MON
);
5625 auth_ticket_info
.ticket
.name
= name
;
5626 auth_ticket_info
.ticket
.global_id
= 0;
5628 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
5629 // mon to mon authentication uses the private monitor shared key and not the
5632 if (!keyring
.get_secret(name
, secret
) &&
5633 !key_server
.get_secret(name
, secret
)) {
5634 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5636 stringstream ss
, ds
;
5637 int err
= key_server
.list_secrets(ds
);
5639 ss
<< "no installed auth entries!";
5641 ss
<< "installed auth entries:";
5642 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
5646 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
,
5647 secret
, (uint64_t)-1);
5649 dout(0) << __func__
<< " failed to build mon session_auth_info "
5650 << cpp_strerror(ret
) << dendl
;
5653 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
5655 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
);
5657 derr
<< __func__
<< " failed to build mgr service session_auth_info "
5658 << cpp_strerror(ret
) << dendl
;
5662 ceph_abort(); // see check at top of fn
5665 CephXTicketBlob blob
;
5666 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
5667 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl
;
5670 bufferlist ticket_data
;
5671 ::encode(blob
, ticket_data
);
5673 bufferlist::iterator iter
= ticket_data
.begin();
5674 CephXTicketHandler
handler(g_ceph_context
, service_id
);
5675 ::decode(handler
.ticket
, iter
);
5677 handler
.session_key
= info
.session_key
;
5679 *authorizer
= handler
.build_authorizer(0);
5684 bool Monitor::ms_verify_authorizer(Connection
*con
, int peer_type
,
5685 int protocol
, bufferlist
& authorizer_data
,
5686 bufferlist
& authorizer_reply
,
5687 bool& isvalid
, CryptoKey
& session_key
)
5689 dout(10) << "ms_verify_authorizer " << con
->get_peer_addr()
5690 << " " << ceph_entity_type_name(peer_type
)
5691 << " protocol " << protocol
<< dendl
;
5696 if (peer_type
== CEPH_ENTITY_TYPE_MON
&&
5697 auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5698 // monitor, and cephx is enabled
5700 if (protocol
== CEPH_AUTH_CEPHX
) {
5701 bufferlist::iterator iter
= authorizer_data
.begin();
5702 CephXServiceTicketInfo auth_ticket_info
;
5704 if (authorizer_data
.length()) {
5705 bool ret
= cephx_verify_authorizer(g_ceph_context
, &keyring
, iter
,
5706 auth_ticket_info
, authorizer_reply
);
5708 session_key
= auth_ticket_info
.session_key
;
5711 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con
->get_peer_addr() << dendl
;
5715 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl
;