1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
25 #include "common/version.h"
27 #include "osd/OSDMap.h"
29 #include "MonitorDBStore.h"
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
49 #include "messages/MMonSubscribe.h"
50 #include "messages/MMonSubscribeAck.h"
52 #include "messages/MAuthReply.h"
54 #include "messages/MTimeCheck.h"
55 #include "messages/MPing.h"
57 #include "common/strtol.h"
58 #include "common/ceph_argparse.h"
59 #include "common/Timer.h"
60 #include "common/Clock.h"
61 #include "common/errno.h"
62 #include "common/perf_counters.h"
63 #include "common/admin_socket.h"
64 #include "global/signal_handler.h"
65 #include "common/Formatter.h"
66 #include "include/stringify.h"
67 #include "include/color.h"
68 #include "include/ceph_fs.h"
69 #include "include/str_list.h"
71 #include "OSDMonitor.h"
72 #include "MDSMonitor.h"
73 #include "MonmapMonitor.h"
74 #include "PGMonitor.h"
75 #include "LogMonitor.h"
76 #include "AuthMonitor.h"
77 #include "MgrMonitor.h"
78 #include "MgrStatMonitor.h"
79 #include "mon/QuorumService.h"
80 #include "mon/OldHealthMonitor.h"
81 #include "mon/HealthMonitor.h"
82 #include "mon/ConfigKeyService.h"
83 #include "common/config.h"
84 #include "common/cmdparse.h"
85 #include "include/assert.h"
86 #include "include/compat.h"
87 #include "perfglue/heap_profiler.h"
89 #include "auth/none/AuthNoneClientHandler.h"
91 #define dout_subsys ceph_subsys_mon
93 #define dout_prefix _prefix(_dout, this)
// Log-prefix helper referenced by the dout_prefix macro above.
// Writes "mon.<name>@<rank>(<state name>) e<monmap epoch> " to *_dout and
// returns that stream so the caller can keep appending to it.
94 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
95 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
96 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
// String constants of Monitor. MONITOR_NAME is used elsewhere in this file
// as the store prefix for monitor-level keys (the compat set, "joined",
// "cluster_fingerprint").
99 const string
Monitor::MONITOR_NAME
= "monitor";
100 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
// COMMAND / COMMAND_WITH_FLAG turn each entry of the included command
// headers into a MonCommand aggregate initializer; COMMAND fills the flags
// field with FLAG(NONE), COMMAND_WITH_FLAG passes the given flags through.
105 #undef COMMAND_WITH_FLAG
106 #define FLAG(f) (MonCommand::FLAG_##f)
107 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
108 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
109 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
110 {parsesig, helptext, modulename, req_perms, avail, flags},
// Command table generated from mon/MonCommands.h.
111 MonCommand mon_commands
[] = {
112 #include <mon/MonCommands.h>
// Command table generated from mon/PGMonitorCommands.h; the constructor
// appends these entries to local_upgrading_mon_commands.
114 MonCommand pgmonitor_commands
[] = {
115 #include <mon/PGMonitorCommands.h>
118 #undef COMMAND_WITH_FLAG
// Completion callback: consults mon->is_shutdown() and forwards r to
// FunctionContext::finish().
// NOTE(review): the statement guarded by the is_shutdown() check is not
// visible in this view -- presumably an early return; confirm.
121 void C_MonContext::finish(int r
) {
122 if (mon
->is_shutdown())
124 FunctionContext::finish(r
);
// Constructs a Monitor.
//   cct_   - CephContext handed to the finisher, log client and mgr client
//   nm     - NOTE(review): presumably the monitor name; its use is not visible here
//   s      - NOTE(review): presumably the backing MonitorDBStore; its use is not visible here
//   m      - messenger; when non-null its loopback connection becomes con_self
//   mgr_m  - messenger stored as mgr_messenger and handed to mgr_client
//   map    - NOTE(review): presumably the initial MonMap; its use is not visible here
// The body creates the cluster/audit log channels, the Paxos instance and
// one PaxosService per PAXOS_* slot, the health and config-key services,
// the monitor's own capabilities, and the local command tables.
127 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
128 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
133 con_self(m
? m
->get_loopback_connection() : NULL
),
134 lock("Monitor::lock"),
136 finisher(cct_
, "mon_finisher", "fin"),
137 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf
->mon_cpu_threads
),
138 has_ever_joined(false),
139 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
141 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
142 key_server(cct
, &keyring
),
// auth_supported, when non-empty, overrides both auth_cluster_required and
// auth_service_required.
143 auth_cluster_required(cct
,
144 cct
->_conf
->auth_supported
.empty() ?
145 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
146 auth_service_required(cct
,
147 cct
->_conf
->auth_supported
.empty() ?
148 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
149 mgr_messenger(mgr_m
),
150 mgr_client(cct_
, mgr_m
),
154 state(STATE_PROBING
),
157 required_features(0),
159 quorum_con_features(0),
163 scrub_timeout_event(NULL
),
166 sync_provider_count(0),
169 sync_start_version(0),
170 sync_timeout_event(NULL
),
171 sync_last_committed_floor(0),
175 timecheck_rounds_since_clean(0),
176 timecheck_event(NULL
),
178 paxos_service(PAXOS_NUM
),
180 routed_request_tid(0),
181 op_tracker(cct
, true, 1)
// cluster and audit log channels, then apply the current log configuration
183 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
184 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
186 update_log_clients();
// one Paxos instance shared by every PaxosService below
188 paxos
= new Paxos(this, "paxos");
190 paxos_service
[PAXOS_MDSMAP
] = new MDSMonitor(this, paxos
, "mdsmap");
191 paxos_service
[PAXOS_MONMAP
] = new MonmapMonitor(this, paxos
, "monmap");
192 paxos_service
[PAXOS_OSDMAP
] = new OSDMonitor(cct
, this, paxos
, "osdmap");
193 paxos_service
[PAXOS_PGMAP
] = new PGMonitor(this, paxos
, "pgmap");
194 paxos_service
[PAXOS_LOG
] = new LogMonitor(this, paxos
, "logm");
195 paxos_service
[PAXOS_AUTH
] = new AuthMonitor(this, paxos
, "auth");
196 paxos_service
[PAXOS_MGR
] = new MgrMonitor(this, paxos
, "mgr");
197 paxos_service
[PAXOS_MGRSTAT
] = new MgrStatMonitor(this, paxos
, "mgrstat");
198 paxos_service
[PAXOS_HEALTH
] = new HealthMonitor(this, paxos
, "health");
200 health_monitor
= new OldHealthMonitor(this);
201 config_key_service
= new ConfigKeyService(this, paxos
);
// the monitor's own capability set is parsed from "allow *"
// NOTE(review): how the parse result `r` is checked is not visible here.
203 mon_caps
= new MonCap();
204 bool r
= mon_caps
->parse("allow *", NULL
);
207 exited_quorum
= ceph_clock_now();
209 // prepare local commands
210 local_mon_commands
.resize(ARRAY_SIZE(mon_commands
));
211 for (unsigned i
= 0; i
< ARRAY_SIZE(mon_commands
); ++i
) {
212 local_mon_commands
[i
] = mon_commands
[i
];
214 MonCommand::encode_vector(local_mon_commands
, local_mon_commands_bl
);
// the "upgrading" command set is the local set plus the pgmonitor commands
216 local_upgrading_mon_commands
= local_mon_commands
;
217 for (unsigned i
= 0; i
< ARRAY_SIZE(pgmonitor_commands
); ++i
) {
218 local_upgrading_mon_commands
.push_back(pgmonitor_commands
[i
]);
220 MonCommand::encode_vector(local_upgrading_mon_commands
,
221 local_upgrading_mon_commands_bl
);
223 // assume our commands until we have an election. this only means
224 // we won't reply with EINVAL before the election; any command that
225 // actually matters will wait until we have quorum etc and then
226 // retry (and revalidate).
227 leader_mon_commands
= local_mon_commands
;
229 // note: OSDMonitor may update this based on the luminous flag.
230 pgservice
= mgrstatmon()->get_pg_stat_service();
// NOTE(review): the signature line of this block is not visible in this
// view; from its contents it is presumably the body of Monitor's
// destructor -- confirm.
// Walks every PaxosService (loop body not visible), deletes health_monitor
// and config_key_service, and asserts that no sessions remain.
235 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
237 delete health_monitor
;
238 delete config_key_service
;
240 assert(session_map
.sessions
.empty());
// AdminSocketHook implementation that forwards admin-socket commands to
// Monitor::do_admin_command() on the Monitor passed to the constructor.
// NOTE(review): the declaration of `ss`, the `mon` member and what is
// written to `out` are not visible in this view.
245 class AdminHook
: public AdminSocketHook
{
248 explicit AdminHook(Monitor
*m
) : mon(m
) {}
249 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
250 bufferlist
& out
) override
{
252 mon
->do_admin_command(command
, cmdmap
, format
, ss
);
// Executes one admin-socket command while holding the monitor lock.
//   command - command name used for dispatch below
//   cmdmap  - parsed arguments; also stringified into `args` for the audit log
//   format  - output format handed to Formatter::create()
// Flow: build `args`, write a "dispatch" audit entry, dispatch on the
// command name, then write a "finished" or "aborted" audit entry.
// Read-only commands are audited at debug level, all others at info level.
// NOTE(review): the trailing parameter(s) of the signature (the stream
// `ss` used below) are not visible in this view.
258 void Monitor::do_admin_command(string command
, cmdmap_t
& cmdmap
, string format
,
261 Mutex::Locker
l(lock
);
263 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
// collect the stringified argument values
// NOTE(review): the action taken for the "prefix" key is not visible --
// presumably it is skipped; confirm.
266 for (cmdmap_t::iterator p
= cmdmap
.begin();
267 p
!= cmdmap
.end(); ++p
) {
268 if (p
->first
== "prefix")
272 args
+= cmd_vartype_stringify(p
->second
);
274 args
= "[" + args
+ "]";
276 bool read_only
= (command
== "mon_status" ||
277 command
== "mon metadata" ||
278 command
== "quorum_status" ||
280 command
== "sessions");
282 (read_only
? audit_clog
->debug() : audit_clog
->info())
283 << "from='admin socket' entity='admin socket' "
284 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
// command dispatch
286 if (command
== "mon_status") {
287 get_mon_status(f
.get(), ss
);
290 } else if (command
== "quorum_status") {
291 _quorum_status(f
.get(), ss
);
292 } else if (command
== "sync_force") {
// sync_force requires the explicit confirmation argument
294 if ((!cmd_getval(g_ceph_context
, cmdmap
, "validate", validate
)) ||
295 (validate
!= "--yes-i-really-mean-it")) {
296 ss
<< "are you SURE? this will mean the monitor store will be erased "
297 "the next time the monitor is restarted. pass "
298 "'--yes-i-really-mean-it' if you really do.";
301 sync_force(f
.get(), ss
);
// prefix match on the first 23 characters, i.e. the length of
// "add_bootstrap_peer_hint"
302 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
303 if (!_add_bootstrap_peer_hint(command
, cmdmap
, ss
))
305 } else if (command
== "quorum enter") {
306 elector
.start_participating();
308 ss
<< "started responding to quorum, initiated new election";
309 } else if (command
== "quorum exit") {
311 elector
.stop_participating();
312 ss
<< "stopped responding to quorum, initiated new election";
313 } else if (command
== "ops") {
314 (void)op_tracker
.dump_ops_in_flight(f
.get());
318 } else if (command
== "sessions") {
321 f
->open_array_section("sessions");
322 for (auto p
: session_map
.sessions
) {
323 f
->dump_stream("session") << *p
;
// any other command name means the admin socket registration is wrong
330 assert(0 == "bad AdminSocket command binding");
332 (read_only
? audit_clog
->debug() : audit_clog
->info())
333 << "from='admin socket' "
334 << "entity='admin socket' "
335 << "cmd=" << command
<< " "
336 << "args=" << args
<< ": finished";
340 (read_only
? audit_clog
->debug() : audit_clog
->info())
341 << "from='admin socket' "
342 << "entity='admin socket' "
343 << "cmd=" << command
<< " "
344 << "args=" << args
<< ": aborted";
// Signal handler entry point: asserts that signum is SIGINT or SIGTERM and
// logs the signal name at error level.
// NOTE(review): the action taken after logging is not visible in this view.
347 void Monitor::handle_signal(int signum
)
349 assert(signum
== SIGINT
|| signum
== SIGTERM
);
350 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
// Returns a CompatSet whose compat and ro_compat sets are empty and whose
// incompat set holds CEPH_MON_FEATURE_INCOMPAT_BASE and
// CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS.
354 CompatSet
Monitor::get_initial_supported_features()
356 CompatSet::FeatureSet ceph_mon_feature_compat
;
357 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
358 CompatSet::FeatureSet ceph_mon_feature_incompat
;
359 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
360 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
361 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
362 ceph_mon_feature_incompat
);
// Starts from get_initial_supported_features() and adds the incompat
// features OSD_ERASURE_CODES, OSDMAP_ENC, ERASURE_CODE_PLUGINS_V2,
// ERASURE_CODE_PLUGINS_V3, KRAKEN and LUMINOUS.
// NOTE(review): the return statement is not visible in this view.
365 CompatSet
Monitor::get_supported_features()
367 CompatSet compat
= get_initial_supported_features();
368 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
369 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
370 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
371 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
372 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
373 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
// Returns a CompatSet whose only feature is
// CEPH_MON_FEATURE_INCOMPAT_BASE in the incompat set; used by
// read_features_off_disk() when the store has no feature list.
377 CompatSet
Monitor::get_legacy_features()
379 CompatSet::FeatureSet ceph_mon_feature_compat
;
380 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
381 CompatSet::FeatureSet ceph_mon_feature_incompat
;
382 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
383 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
384 ceph_mon_feature_incompat
);
// Compares the features this binary supports with the feature set read
// from `store`. When the supported set cannot write the on-disk set, the
// unsupported features are logged as an error.
// NOTE(review): the declaration of `ondisk` and the returned values are
// not visible in this view.
387 int Monitor::check_features(MonitorDBStore
*store
)
389 CompatSet required
= get_supported_features();
392 read_features_off_disk(store
, &ondisk
);
394 if (!required
.writeable(ondisk
)) {
395 CompatSet diff
= required
.unsupported(ondisk
);
396 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
// Loads the CompatSet stored under (MONITOR_NAME, COMPAT_SET_LOC) in
// `store` into *features.
// If the stored buffer is empty, a warning is logged, *features is set to
// get_legacy_features(), and that set is encoded and written back to the
// store in a transaction.
// NOTE(review): the branch structure separating the write-back path from
// the decode path is not visible in this view.
403 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
405 bufferlist featuresbl
;
406 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
407 if (featuresbl
.length() == 0) {
408 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
409 << "Assuming it is old-style and introducing one." << dendl
;
410 //we only want the baseline ~v.18 features assumed to be on disk.
411 //If new features are introduced this code needs to disappear or
413 *features
= get_legacy_features();
415 features
->encode(featuresbl
);
416 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
417 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
418 store
->apply_transaction(t
);
420 bufferlist::iterator it
= featuresbl
.begin();
421 features
->decode(it
);
// Reads the on-disk feature set of this monitor's store into `features`,
// then recomputes the quorum requirements; both results are logged at
// debug level 10.
425 void Monitor::read_features()
427 read_features_off_disk(store
, &features
);
428 dout(10) << "features " << features
<< dendl
;
430 calc_quorum_requirements();
431 dout(10) << "required_features " << required_features
<< dendl
;
// Queues a write of `bl` under (MONITOR_NAME, COMPAT_SET_LOC) on
// transaction t.
// NOTE(review): the construction of `bl` is not visible in this view --
// presumably the encoded `features` set; confirm.
434 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
438 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
// Holds a function-local static table (KEYS) of configuration option names.
// NOTE(review): presumably these are the options for which
// handle_conf_change() is invoked once the monitor is registered through
// g_conf->add_observer(this); the return statement and part of the table
// are not visible in this view -- confirm.
441 const char** Monitor::get_tracked_conf_keys() const
443 static const char* KEYS
[] = {
444 "crushtool", // helpful for testing
445 "mon_election_timeout",
447 "mon_lease_renew_interval_factor",
448 "mon_lease_ack_timeout_factor",
449 "mon_accept_timeout_factor",
453 "clog_to_syslog_facility",
454 "clog_to_syslog_level",
456 "clog_to_graylog_host",
457 "clog_to_graylog_port",
460 // periodic health to clog
461 "mon_health_to_clog",
462 "mon_health_to_clog_interval",
463 "mon_health_to_clog_tick_interval",
465 "mon_scrub_interval",
// Reacts to configuration changes.
//   conf    - current configuration; mon_scrub_interval is read from it
//   changed - names of the options that changed
// Three independent groups are handled: cluster-log options (plus "host"
// and "fsid") refresh the log clients, the mon_health_to_clog* options
// update the periodic health logging, and mon_scrub_interval updates the
// scrub interval.
471 void Monitor::handle_conf_change(const struct md_config_t
*conf
,
472 const std::set
<std::string
> &changed
)
476 dout(10) << __func__
<< " " << changed
<< dendl
;
478 if (changed
.count("clog_to_monitors") ||
479 changed
.count("clog_to_syslog") ||
480 changed
.count("clog_to_syslog_level") ||
481 changed
.count("clog_to_syslog_facility") ||
482 changed
.count("clog_to_graylog") ||
483 changed
.count("clog_to_graylog_host") ||
484 changed
.count("clog_to_graylog_port") ||
485 changed
.count("host") ||
486 changed
.count("fsid")) {
487 update_log_clients();
490 if (changed
.count("mon_health_to_clog") ||
491 changed
.count("mon_health_to_clog_interval") ||
492 changed
.count("mon_health_to_clog_tick_interval")) {
493 health_to_clog_update_conf(changed
);
496 if (changed
.count("mon_scrub_interval")) {
497 scrub_update_interval(conf
->mon_scrub_interval
);
// Parses the log-client options into local string maps via
// parse_log_client_options() and passes the same maps to update_config()
// on both the cluster channel (clog) and the audit channel (audit_clog).
// NOTE(review): the trailing arguments of the three calls and the action
// taken when parsing fails are not visible in this view.
501 void Monitor::update_log_clients()
503 map
<string
,string
> log_to_monitors
;
504 map
<string
,string
> log_to_syslog
;
505 map
<string
,string
> log_channel
;
506 map
<string
,string
> log_prio
;
507 map
<string
,string
> log_to_graylog
;
508 map
<string
,string
> log_to_graylog_host
;
509 map
<string
,string
> log_to_graylog_port
;
513 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
514 log_channel
, log_prio
, log_to_graylog
,
515 log_to_graylog_host
, log_to_graylog_port
,
519 clog
->update_config(log_to_monitors
, log_to_syslog
,
520 log_channel
, log_prio
, log_to_graylog
,
521 log_to_graylog_host
, log_to_graylog_port
,
524 audit_clog
->update_config(log_to_monitors
, log_to_syslog
,
525 log_channel
, log_prio
, log_to_graylog
,
526 log_to_graylog_host
, log_to_graylog_port
,
// Validates lease-related configuration factors and reports violations on
// the cluster log at error level:
//   mon_lease_renew_interval_factor must be < 1.0
//   mon_lease_ack_timeout_factor    must be > 1.0
// NOTE(review): the returned value(s) are not visible in this view;
// preinit() stores the result in `r` before logging a failure.
530 int Monitor::sanitize_options()
534 // mon_lease must be greater than mon_lease_renewal; otherwise we
535 // may end up with leases expiring before they are renewed.
536 if (g_conf
->mon_lease_renew_interval_factor
>= 1.0) {
537 clog
->error() << "mon_lease_renew_interval_factor ("
538 << g_conf
->mon_lease_renew_interval_factor
539 << ") must be less than 1.0";
543 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
544 // got time to renew the lease and get an ack for it. Having both options
545 // with the same value, for a given small value, could mean timing out if
546 // the monitors happened to be overloaded -- or even under normal load for
547 // a small enough value.
548 if (g_conf
->mon_lease_ack_timeout_factor
<= 1.0) {
549 clog
->error() << "mon_lease_ack_timeout_factor ("
550 << g_conf
->mon_lease_ack_timeout_factor
551 << ") must be greater than 1.0";
// Pre-initialization of the monitor. Visible steps, in order:
//   1. sanitize configuration options
//   2. create the "mon" perf counters and the "cluster" perf counters
//   3. initialize the paxos logger and check the cluster fsid
//   4. determine whether this monitor ever joined a quorum and, if not,
//      optionally filter the seed monmap by mon_initial_members
//   5. clean up store state left behind by an interrupted or forced sync
//   6. initialize the health monitor and bootstrap the keyring if required
//   7. register the admin-socket commands and the config observer
// NOTE(review): error returns and several branch bodies are not visible
// in this view.
558 int Monitor::preinit()
562 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
564 int r
= sanitize_options();
566 derr
<< "option sanitization failed!" << dendl
;
// per-monitor perf counters ("mon"): sessions and elections
573 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
574 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess",
575 PerfCountersBuilder::PRIO_USEFUL
);
576 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions",
577 "sadd", PerfCountersBuilder::PRIO_INTERESTING
);
578 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions",
579 "srm", PerfCountersBuilder::PRIO_INTERESTING
);
580 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions",
581 "strm", PerfCountersBuilder::PRIO_USEFUL
);
582 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in",
583 "ecnt", PerfCountersBuilder::PRIO_USEFUL
);
584 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started",
585 "estt", PerfCountersBuilder::PRIO_INTERESTING
);
586 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won",
587 "ewon", PerfCountersBuilder::PRIO_INTERESTING
);
588 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost",
589 "elst", PerfCountersBuilder::PRIO_INTERESTING
);
590 logger
= pcb
.create_perf_counters();
591 cct
->get_perfcounters_collection()->add(logger
);
// cluster-wide perf counters ("cluster"); created here but added to the
// collection later by register_cluster_logger()
594 assert(!cluster_logger
);
596 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
597 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
598 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
599 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
600 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
601 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
602 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
603 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster");
604 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space");
605 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space");
606 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
607 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
608 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
609 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
610 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
611 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
612 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
613 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
614 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
615 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects");
616 pcb
.add_u64(l_cluster_num_mds_up
, "num_mds_up", "MDSs that are up");
617 pcb
.add_u64(l_cluster_num_mds_in
, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
618 pcb
.add_u64(l_cluster_num_mds_failed
, "num_mds_failed", "Failed MDS");
619 pcb
.add_u64(l_cluster_mds_epoch
, "mds_epoch", "Current epoch of MDS map");
620 cluster_logger
= pcb
.create_perf_counters();
623 paxos
->init_logger();
625 // verify cluster_uuid
627 int r
= check_fsid();
639 // have we ever joined a quorum?
640 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
641 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
643 if (!has_ever_joined
) {
644 // impose initial quorum restrictions?
645 list
<string
> initial_members
;
646 get_str_list(g_conf
->mon_initial_members
, initial_members
);
648 if (!initial_members
.empty()) {
649 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
651 monmap
->set_initial_members(g_ceph_context
, initial_members
, name
, messenger
->get_myaddr(),
654 dout(10) << " monmap is " << *monmap
<< dendl
;
655 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
// we joined a quorum before but are no longer listed in the monmap
657 } else if (!monmap
->contains(name
)) {
658 derr
<< "not in monmap and have been in a quorum before; "
659 << "must have been removed" << dendl
;
660 if (g_conf
->mon_force_quorum_join
) {
661 dout(0) << "we should have died but "
662 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
664 derr
<< "commit suicide!" << dendl
;
671 // We have a potentially inconsistent store state on our hands. Get rid of it
673 bool clear_store
= false;
674 if (store
->exists("mon_sync", "in_sync")) {
675 dout(1) << __func__
<< " clean up potentially inconsistent store state"
680 if (store
->get("mon_sync", "force_sync") > 0) {
681 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
686 set
<string
> sync_prefixes
= get_sync_targets_names();
687 store
->clear(sync_prefixes
);
691 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
692 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
695 health_monitor
->init();
697 if (is_keyring_required()) {
698 // we need to bootstrap authentication keys so we can form an
700 if (authmon()->get_last_committed() == 0) {
701 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
703 int err
= store
->get("mkfs", "keyring", bl
);
704 if (err
== 0 && bl
.length() > 0) {
705 // Attempt to decode and extract keyring only if it is found.
707 bufferlist::iterator p
= bl
.begin();
708 ::decode(keyring
, p
);
709 extract_save_mon_key(keyring
);
// load the external keyring kept in the monitor data directory
713 string keyring_loc
= g_conf
->mon_data
+ "/keyring";
715 r
= keyring
.load(cct
, keyring_loc
);
718 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
720 if (key_server
.get_auth(mon_name
, mon_key
)) {
721 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
722 keyring
.add(mon_name
, mon_key
);
724 keyring
.encode_plaintext(bl
);
725 write_default_keyring(bl
);
727 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
// admin-socket commands; shutdown() unregisters the same eight names
734 admin_hook
= new AdminHook(this);
735 AdminSocket
* admin_socket
= cct
->get_admin_socket();
737 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
739 r
= admin_socket
->register_command("mon_status", "mon_status", admin_hook
,
740 "show current monitor status");
742 r
= admin_socket
->register_command("quorum_status", "quorum_status",
743 admin_hook
, "show current quorum status");
745 r
= admin_socket
->register_command("sync_force",
746 "sync_force name=validate,"
748 "strings=--yes-i-really-mean-it",
750 "force sync of and clear monitor store");
752 r
= admin_socket
->register_command("add_bootstrap_peer_hint",
753 "add_bootstrap_peer_hint name=addr,"
756 "add peer address as potential bootstrap"
757 " peer for cluster bringup");
759 r
= admin_socket
->register_command("quorum enter", "quorum enter",
761 "force monitor back into quorum");
763 r
= admin_socket
->register_command("quorum exit", "quorum exit",
765 "force monitor out of the quorum");
767 r
= admin_socket
->register_command("ops",
770 "show the ops currently in flight");
772 r
= admin_socket
->register_command("sessions",
775 "list existing sessions");
780 // add ourselves as a conf observer
781 g_conf
->add_observer(this);
// NOTE(review): the signature line of this block is not visible in this
// view; from the "init" log message it is presumably the body of
// Monitor::init() -- confirm.
// Under the monitor lock: registers this monitor as a dispatcher on
// `messenger`, registers mgr_client and this monitor on `mgr_messenger`,
// and records the features of the loopback connection in the session
// feature map.
789 dout(2) << "init" << dendl
;
790 Mutex::Locker
l(lock
);
801 messenger
->add_dispatcher_tail(this);
804 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
805 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
808 // add features of myself into feature_map
809 session_map
.feature_map
.add_mon(con_self
->get_features());
// Calls init() on each of the PAXOS_NUM paxos services, then refreshes
// state via refresh_from_paxos(NULL).
// NOTE(review): statements between the log line and the loop are not
// visible in this view.
813 void Monitor::init_paxos()
815 dout(10) << __func__
<< dendl
;
819 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
820 paxos_service
[i
]->init();
823 refresh_from_paxos(NULL
);
// Reloads in-memory state from the store.
//   need_bootstrap - passed through unchanged to every service's refresh()
// First reads "cluster_fingerprint" under MONITOR_NAME and decodes it
// into `fingerprint` (a decode failure is only logged), then runs
// refresh() on all paxos services followed by post_refresh() on all of
// them.
// NOTE(review): the conditions around the fingerprint read/decode are not
// visible in this view.
826 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
828 dout(10) << __func__
<< dendl
;
831 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
834 bufferlist::iterator p
= bl
.begin();
835 ::decode(fingerprint
, p
);
837 catch (buffer::error
& e
) {
838 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
841 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
// two passes: every service refreshes before any post_refresh() runs
844 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
845 paxos_service
[i
]->refresh(need_bootstrap
);
847 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
848 paxos_service
[i
]->post_refresh();
// Adds cluster_logger to the perf counters collection unless
// cluster_logger_registered is already set; the flag is set on
// registration and the already-registered case is only logged.
853 void Monitor::register_cluster_logger()
855 if (!cluster_logger_registered
) {
856 dout(10) << "register_cluster_logger" << dendl
;
857 cluster_logger_registered
= true;
858 cct
->get_perfcounters_collection()->add(cluster_logger
);
860 dout(10) << "register_cluster_logger - already registered" << dendl
;
// Removes cluster_logger from the perf counters collection when
// cluster_logger_registered is set and clears the flag; the
// not-registered case is only logged.
864 void Monitor::unregister_cluster_logger()
866 if (cluster_logger_registered
) {
867 dout(10) << "unregister_cluster_logger" << dendl
;
868 cluster_logger_registered
= false;
869 cct
->get_perfcounters_collection()->remove(cluster_logger
);
871 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
// Publishes the monmap size and the current quorum size to the cluster
// perf counters (l_cluster_num_mon, l_cluster_num_mon_quorum).
875 void Monitor::update_logger()
877 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
878 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
// Shuts the monitor down. Visible steps, in order: wait for a pending
// paxos write, switch to STATE_SHUTDOWN, remove the config observer,
// unregister the admin-socket commands, shut down the mgr client, drain
// the finisher, shut down the health monitor, cancel quorum waiters with
// -ECANCELED, drop all sessions, remove and delete perf counters, shut
// down the log client and finally both messengers.
// NOTE(review): lock handling and the paxos-service loop body are not
// visible in this view.
881 void Monitor::shutdown()
883 dout(1) << "shutdown" << dendl
;
887 wait_for_paxos_write();
889 state
= STATE_SHUTDOWN
;
891 g_conf
->remove_observer(this);
// same eight command names that preinit() registered
894 AdminSocket
* admin_socket
= cct
->get_admin_socket();
895 admin_socket
->unregister_command("mon_status");
896 admin_socket
->unregister_command("quorum_status");
897 admin_socket
->unregister_command("sync_force");
898 admin_socket
->unregister_command("add_bootstrap_peer_hint");
899 admin_socket
->unregister_command("quorum enter");
900 admin_socket
->unregister_command("quorum exit");
901 admin_socket
->unregister_command("ops");
902 admin_socket
->unregister_command("sessions");
909 mgr_client
.shutdown();
912 finisher
.wait_for_empty();
918 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
920 health_monitor
->shutdown();
// anything still waiting on quorum is completed with -ECANCELED
922 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
923 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
929 remove_all_sessions();
932 cct
->get_perfcounters_collection()->remove(logger
);
// cluster_logger is only in the collection if it was registered
936 if (cluster_logger
) {
937 if (cluster_logger_registered
)
938 cct
->get_perfcounters_collection()->remove(cluster_logger
);
939 delete cluster_logger
;
940 cluster_logger
= NULL
;
943 log_client
.shutdown();
945 // unlock before msgr shutdown...
948 messenger
->shutdown(); // last thing! ceph_mon.cc will delete mon.
949 mgr_messenger
->shutdown();
// If paxos reports a write in progress (is_writing() or
// is_writing_previous()), logs before and after flushing it.
// NOTE(review): the actual wait/flush call between the two log lines is
// not visible in this view.
952 void Monitor::wait_for_paxos_write()
954 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
955 dout(10) << __func__
<< " flushing pending write" << dendl
;
959 dout(10) << __func__
<< " flushed pending write" << dendl
;
// (Re)starts the probing phase. Visible steps, in order: wait for a
// pending paxos write, reset sync-requester state, unregister the cluster
// logger, cancel the probe timeout, recompute this monitor's rank from
// the monmap (renaming the messenger and dropping all connections when
// the rank changed), enter STATE_PROBING, optionally compact the store,
// short-circuit to a standalone election for a single-monitor map, and
// otherwise send OP_PROBE messages to the monmap members and to the
// extra probe peers.
// NOTE(review): several branch bodies (e.g. the action after "suicide",
// the compaction call, the per-peer filtering in the first probe loop)
// are not visible in this view.
963 void Monitor::bootstrap()
965 dout(10) << "bootstrap" << dendl
;
966 wait_for_paxos_write();
968 sync_reset_requester();
969 unregister_cluster_logger();
970 cancel_probe_timeout();
// rank is derived from our address in the current monmap
973 int newrank
= monmap
->get_rank(messenger
->get_myaddr());
974 if (newrank
< 0 && rank
>= 0) {
975 // was i ever part of the quorum?
976 if (has_ever_joined
) {
977 dout(0) << " removed from monmap, suicide." << dendl
;
981 if (newrank
!= rank
) {
982 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
983 messenger
->set_myname(entity_name_t::MON(newrank
));
986 // reset all connections, or else our peers will think we are someone else.
987 messenger
->mark_down_all();
991 state
= STATE_PROBING
;
996 if (g_conf
->mon_compact_on_bootstrap
) {
997 dout(10) << "bootstrap -- triggering compaction" << dendl
;
999 dout(10) << "bootstrap -- finished compaction" << dendl
;
1002 // singleton monitor?
1003 if (monmap
->size() == 1 && rank
== 0) {
1004 win_standalone_election();
1008 reset_probe_timeout();
1010 // i'm outside the quorum
1011 if (monmap
->contains(name
))
1012 outside_quorum
.insert(name
);
1015 dout(10) << "probing other monitors" << dendl
;
1016 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1018 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
),
1019 monmap
->get_inst(i
));
// also probe the hinted peers, addressed as MON(-1), skipping our own address
1021 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
1022 p
!= extra_probe_peers
.end();
1024 if (*p
!= messenger
->get_myaddr()) {
1026 i
.name
= entity_name_t::MON(-1);
1028 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
), i
);
// Handles the "add_bootstrap_peer_hint" admin command.
//   cmd    - full command string (only logged)
//   cmdmap - parsed arguments; the "addr" value is read from it
//   ss     - receives the human-readable result or error message
// The address is parsed, rejected when this monitor is already leader or
// peon, given CEPH_MON_PORT when no port was supplied, and inserted into
// extra_probe_peers.
// NOTE(review): the returned values are not visible in this view; the
// caller treats a false result as failure.
1033 bool Monitor::_add_bootstrap_peer_hint(string cmd
, cmdmap_t
& cmdmap
, ostream
& ss
)
1036 if (!cmd_getval(g_ceph_context
, cmdmap
, "addr", addrstr
)) {
1037 ss
<< "unable to parse address string value '"
1038 << cmd_vartype_stringify(cmdmap
["addr"]) << "'";
1041 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' '"
1042 << addrstr
<< "'" << dendl
;
1045 const char *end
= 0;
1046 if (!addr
.parse(addrstr
.c_str(), &end
)) {
1047 ss
<< "failed to parse addr '" << addrstr
<< "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
// hints only matter while probing
1051 if (is_leader() || is_peon()) {
1052 ss
<< "mon already active; ignoring bootstrap hint";
// default to the standard monitor port when none was given
1056 if (addr
.get_port() == 0)
1057 addr
.set_port(CEPH_MON_PORT
);
1059 extra_probe_peers
.insert(addr
);
1060 ss
<< "adding peer " << addr
<< " to list: " << extra_probe_peers
;
1064 // called by bootstrap(), or on leader|peon -> electing
// Clears per-quorum state: cancels the probe timeout, health events and
// the scrub event, resets leader_since, stamps exited_quorum when a
// quorum existed, clears outside_quorum and quorum_feature_map, and calls
// finish() on the health monitor.
// NOTE(review): the paxos-service loop body and the statements between
// the visible lines are not visible in this view.
1065 void Monitor::_reset()
1067 dout(10) << __func__
<< dendl
;
1069 cancel_probe_timeout();
1071 health_events_cleanup();
1072 health_check_log_times
.clear();
1073 scrub_event_cancel();
1075 leader_since
= utime_t();
1076 if (!quorum
.empty()) {
1077 exited_quorum
= ceph_clock_now();
1080 outside_quorum
.clear();
1081 quorum_feature_map
.clear();
1087 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
1089 health_monitor
->finish();
1093 // -----------------------------------------------------------
// Collects the store prefixes that take part in a sync: the paxos name,
// the prefixes reported by every paxos service, and those reported by
// the config-key service (which is asserted to be a ConfigKeyService).
// NOTE(review): the return statement is not visible in this view.
1096 set
<string
> Monitor::get_sync_targets_names()
1098 set
<string
> targets
;
1099 targets
.insert(paxos
->get_name());
1100 for (int i
= 0; i
< PAXOS_NUM
; ++i
)
1101 paxos_service
[i
]->get_store_prefixes(targets
);
1102 ConfigKeyService
*config_key_service_ptr
= dynamic_cast<ConfigKeyService
*>(config_key_service
);
1103 assert(config_key_service_ptr
);
1104 config_key_service_ptr
->get_store_prefixes(targets
);
// Sync timeout handler: logs and asserts the monitor is in
// STATE_SYNCHRONIZING.
// NOTE(review): the recovery action that follows is not visible in this
// view.
1109 void Monitor::sync_timeout()
1111 dout(10) << __func__
<< dendl
;
1112 assert(state
== STATE_SYNCHRONIZING
);
// Encodes into `bl` the monmap with the highest epoch among three
// candidates: the one held by the MonmapMonitor, the backup stored under
// ("mon_sync", "latest_monmap") if present, and the in-memory *monmap.
// Store read errors are logged and then asserted on.
// NOTE(review): the exact conditions guarding the error paths and the
// decode of monmon_bl are not visible in this view.
1116 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1118 dout(1) << __func__
<< dendl
;
1120 MonMap latest_monmap
;
1122 // Grab latest monmap from MonmapMonitor
1123 bufferlist monmon_bl
;
1124 int err
= monmon()->get_monmap(monmon_bl
);
1126 if (err
!= -ENOENT
) {
1128 << " something wrong happened while reading the store: "
1129 << cpp_strerror(err
) << dendl
;
1130 assert(0 == "error reading the store");
1133 latest_monmap
.decode(monmon_bl
);
1136 // Grab last backed up monmap (if any) and compare epochs
1137 if (store
->exists("mon_sync", "latest_monmap")) {
1138 bufferlist backup_bl
;
1139 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1142 << " something wrong happened while reading the store: "
1143 << cpp_strerror(err
) << dendl
;
1144 assert(0 == "error reading the store");
1146 assert(backup_bl
.length() > 0);
1148 MonMap backup_monmap
;
1149 backup_monmap
.decode(backup_bl
);
1151 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1152 latest_monmap
= backup_monmap
;
1155 // Check if our current monmap's epoch is greater than the one we've
1157 if (monmap
->epoch
> latest_monmap
.epoch
)
1158 latest_monmap
= *monmap
;
1160 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1162 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
// Resets the requester side of a sync: cancels a pending sync timeout
// event, clears sync_provider and resets sync_start_version to 0.
// NOTE(review): other fields reset between the visible lines are not
// visible in this view.
1165 void Monitor::sync_reset_requester()
1167 dout(10) << __func__
<< dendl
;
1169 if (sync_timeout_event
) {
1170 timer
.cancel_event(sync_timeout_event
);
1171 sync_timeout_event
= NULL
;
1174 sync_provider
= entity_inst_t();
1177 sync_start_version
= 0;
1180 void Monitor::sync_reset_provider()
1182 dout(10) << __func__
<< dendl
;
1183 sync_providers
.clear();
// Start a store sync against `other`.
//
// Visible steps: assert we are probing or already synchronizing and switch
// to STATE_SYNCHRONIZING; reset provider-side state; stash critical state
// and mark the sync as in progress in the store (together with
// sync_last_committed_floor); clear the store prefixes that will be synced;
// remember `other` as sync_provider, re-arm the sync timeout and send an
// OP_GET_COOKIE_FULL / OP_GET_COOKIE_RECENT request carrying our paxos
// version.
//
// The asserts on g_conf->mon_sync_requester_kill_at abort when the option
// equals the given step number (presumably a fault-injection hook for
// tests -- confirm).
// NOTE(review): some original lines are not visible in this excerpt; in
// particular the assignment of `sync_full`, which selects the op below,
// is not shown.
1186 void Monitor::sync_start(entity_inst_t
&other
, bool full
)
1188 dout(10) << __func__
<< " " << other
<< (full
? " full" : " recent") << dendl
;
1190 assert(state
== STATE_PROBING
||
1191 state
== STATE_SYNCHRONIZING
);
1192 state
= STATE_SYNCHRONIZING
;
1194 // make sure we are not a provider for anyone!
1195 sync_reset_provider();
1200 // stash key state, and mark that we are syncing
1201 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1202 sync_stash_critical_state(t
);
1203 t
->put("mon_sync", "in_sync", 1);
// the floor never moves backwards: keep the larger of the stored floor and
// our current paxos version
1205 sync_last_committed_floor
= MAX(sync_last_committed_floor
, paxos
->get_version());
1206 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1207 << sync_last_committed_floor
<< dendl
;
1208 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1210 store
->apply_transaction(t
);
1212 assert(g_conf
->mon_sync_requester_kill_at
!= 1);
1214 // clear the underlying store
1215 set
<string
> targets
= get_sync_targets_names();
1216 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1217 store
->clear(targets
);
1219 // make sure paxos knows it has been reset. this prevents a
1220 // bootstrap and then different probe reply order from possibly
1221 // deciding a partial or no sync is needed.
1224 assert(g_conf
->mon_sync_requester_kill_at
!= 2);
1227 // assume 'other' as the leader. We will update the leader once we receive
1228 // a reply to the sync start.
1229 sync_provider
= other
;
1231 sync_reset_timeout();
1233 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1235 m
->last_committed
= paxos
->get_version();
1236 messenger
->send_message(m
, sync_provider
);
1239 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1241 dout(10) << __func__
<< dendl
;
1242 bufferlist backup_monmap
;
1243 sync_obtain_latest_monmap(backup_monmap
);
1244 assert(backup_monmap
.length() > 0);
1245 t
->put("mon_sync", "latest_monmap", backup_monmap
);
// (Re)arm the sync timeout: cancel the current sync_timeout_event, if any,
// and schedule a new one g_conf->mon_sync_timeout after now.
// NOTE(review): the body of the scheduled callback is not visible in this
// excerpt.
1248 void Monitor::sync_reset_timeout()
1250 dout(10) << __func__
<< dendl
;
1251 if (sync_timeout_event
)
1252 timer
.cancel_event(sync_timeout_event
);
1253 sync_timeout_event
= timer
.add_event_after(
1254 g_conf
->mon_sync_timeout
,
1255 new C_MonContext(this, [this](int) {
// Finish a sync on the requester side.
//
// Visible steps: prepare the paxos transactions starting at
// sync_start_version and record `last_committed` under the paxos prefix,
// apply that transaction, then erase the "in_sync", "force_sync" and
// "last_committed_floor" markers from the "mon_sync" prefix.
//
// The asserts on g_conf->mon_sync_requester_kill_at abort when the option
// equals the given step number (presumably test fault injection -- confirm).
// NOTE(review): several original lines (including the end of the
// read_and_prepare_transactions() call and everything after the last store
// update) are not visible in this excerpt.
1260 void Monitor::sync_finish(version_t last_committed
)
1262 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
1264 assert(g_conf
->mon_sync_requester_kill_at
!= 7);
1267 // finalize the paxos commits
1268 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1269 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1271 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
1273 dout(30) << __func__
<< " final tx dump:\n";
1274 JSONFormatter
f(true);
1279 store
->apply_transaction(tx
);
1282 assert(g_conf
->mon_sync_requester_kill_at
!= 8);
// second transaction: remove the markers that flagged the sync as pending
1284 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1285 t
->erase("mon_sync", "in_sync");
1286 t
->erase("mon_sync", "force_sync");
1287 t
->erase("mon_sync", "last_committed_floor");
1288 store
->apply_transaction(t
);
1290 assert(g_conf
->mon_sync_requester_kill_at
!= 9);
1294 assert(g_conf
->mon_sync_requester_kill_at
!= 10);
// Dispatch an incoming MMonSync message to the handler for its op.
// Provider-side ops: OP_GET_COOKIE_FULL / OP_GET_COOKIE_RECENT and
// OP_GET_CHUNK. Requester-side ops: OP_COOKIE, OP_CHUNK / OP_LAST_CHUNK
// and OP_NO_COOKIE. An unknown op is logged and asserts.
// NOTE(review): the switch header and the lines between cases are not
// visible in this excerpt.
1299 void Monitor::handle_sync(MonOpRequestRef op
)
1301 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1302 dout(10) << __func__
<< " " << *m
<< dendl
;
1305 // provider ---------
1307 case MMonSync::OP_GET_COOKIE_FULL
:
1308 case MMonSync::OP_GET_COOKIE_RECENT
:
1309 handle_sync_get_cookie(op
);
1311 case MMonSync::OP_GET_CHUNK
:
1312 handle_sync_get_chunk(op
);
1315 // client -----------
1317 case MMonSync::OP_COOKIE
:
1318 handle_sync_cookie(op
);
1321 case MMonSync::OP_CHUNK
:
1322 case MMonSync::OP_LAST_CHUNK
:
1323 handle_sync_chunk(op
);
1325 case MMonSync::OP_NO_COOKIE
:
1326 handle_sync_no_cookie(op
);
1330 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1331 assert(0 == "unknown op");
1337 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1339 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1340 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1341 m
->get_connection()->send_message(reply
);
// Provider side: handle OP_GET_COOKIE_FULL / OP_GET_COOKIE_RECENT.
//
// Visible behavior: if this monitor is itself synchronizing it answers with
// OP_NO_COOKIE. A peer whose connection lacks any bit of required_features
// is logged as ignored. Otherwise a new cookie is generated, a SyncProvider
// entry is created for it with a timeout of 2 * mon_sync_timeout, and an
// OP_COOKIE reply carrying sp.last_committed is sent. For a full sync,
// last_committed is our paxos version and a store synchronizer over the
// sync target prefixes is created; otherwise last_committed is taken from
// the request.
//
// The assert on g_conf->mon_sync_provider_kill_at aborts when the option
// equals 1 (presumably test fault injection -- confirm).
// NOTE(review): several original lines (early returns, else branch, some
// SyncProvider field assignments) are not visible in this excerpt.
1344 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1346 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1347 if (is_synchronizing()) {
1348 _sync_reply_no_cookie(op
);
1352 assert(g_conf
->mon_sync_provider_kill_at
!= 1);
1354 // make sure they can understand us.
// (required ^ peer) & required == required bits the peer does not have
1355 if ((required_features
^ m
->get_connection()->get_features()) &
1356 required_features
) {
1357 dout(5) << " ignoring peer mon." << m
->get_source().num()
1358 << " has features " << std::hex
1359 << m
->get_connection()->get_features()
1360 << " but we require " << required_features
<< std::dec
<< dendl
;
1364 // make up a unique cookie. include election epoch (which persists
1365 // across restarts for the whole cluster) and a counter for this
1366 // process instance. there is no need to be unique *across*
1367 // monitors, though.
1368 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1369 assert(sync_providers
.count(cookie
) == 0);
1371 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
// operator[] creates the SyncProvider entry for this cookie
1373 SyncProvider
& sp
= sync_providers
[cookie
];
1375 sp
.entity
= m
->get_source_inst();
1376 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1378 set
<string
> sync_targets
;
1379 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1381 sync_targets
= get_sync_targets_names();
1382 sp
.last_committed
= paxos
->get_version();
1383 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1385 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1387 // just catch up paxos
1388 sp
.last_committed
= m
->last_committed
;
1390 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1392 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1393 reply
->last_committed
= sp
.last_committed
;
1394 m
->get_connection()->send_message(reply
);
// Provider side: handle OP_GET_CHUNK.
//
// Visible behavior: an unknown cookie is answered with OP_NO_COOKIE. For a
// known cookie the provider timeout is refreshed. If the requester's
// last_committed is below our first committed paxos version (and that
// version is > 1) the session is dropped and OP_NO_COOKIE is sent.
// Otherwise a reply transaction is filled with paxos versions after
// sp.last_committed, budgeted by g_conf->mon_sync_max_payload_size, and,
// for a full sync with budget left, with a chunk from the store
// synchronizer. The reply stays an OP_CHUNK while the synchronizer has more
// data or paxos versions remain; otherwise it becomes OP_LAST_CHUNK and the
// provider entry is erased.
//
// The asserts on g_conf->mon_sync_provider_kill_at abort when the option
// equals the given step (presumably test fault injection -- confirm).
// NOTE(review): several original lines (early returns, the declaration of
// `bl`, error handling after store->get, else branch) are not visible in
// this excerpt.
1397 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1399 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1400 dout(10) << __func__
<< " " << *m
<< dendl
;
1402 if (sync_providers
.count(m
->cookie
) == 0) {
1403 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1404 _sync_reply_no_cookie(op
);
1408 assert(g_conf
->mon_sync_provider_kill_at
!= 2);
1410 SyncProvider
& sp
= sync_providers
[m
->cookie
];
1411 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1413 if (sp
.last_committed
< paxos
->get_first_committed() &&
1414 paxos
->get_first_committed() > 1) {
1415 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1416 << " < our fc " << paxos
->get_first_committed() << dendl
;
1417 sync_providers
.erase(m
->cookie
);
1418 _sync_reply_no_cookie(op
);
1422 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1423 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
// `left` is the remaining payload budget; it is reduced by the length of
// each paxos blob added below
1425 int left
= g_conf
->mon_sync_max_payload_size
;
1426 while (sp
.last_committed
< paxos
->get_version() && left
> 0) {
1428 sp
.last_committed
++;
1430 int err
= store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1433 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1434 left
-= bl
.length();
1435 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1438 reply
->last_committed
= sp
.last_committed
;
1440 if (sp
.full
&& left
> 0) {
1441 sp
.synchronizer
->get_chunk_tx(tx
, left
);
1442 sp
.last_key
= sp
.synchronizer
->get_last_key();
1443 reply
->last_key
= sp
.last_key
;
1446 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1447 sp
.last_committed
< paxos
->get_version()) {
1448 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1449 << " key " << sp
.last_key
<< dendl
;
1451 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1452 << " key " << sp
.last_key
<< dendl
;
1453 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1455 assert(g_conf
->mon_sync_provider_kill_at
!= 3);
1457 // clean up our local state
1458 sync_providers
.erase(sp
.cookie
);
1461 ::encode(*tx
, reply
->chunk_bl
);
1463 m
->get_connection()->send_message(reply
);
// Requester side: handle OP_COOKIE from the provider.
//
// Visible behavior: a reply that arrives when a cookie is already held, or
// whose source differs from sync_provider, is logged and ignored/discarded.
// Otherwise the cookie and the provider's last_committed (as
// sync_start_version) are recorded, the sync timeout is re-armed and the
// first chunk is requested.
// NOTE(review): the conditions/returns guarding the first log line are not
// visible in this excerpt.
1468 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1470 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1471 dout(10) << __func__
<< " " << *m
<< dendl
;
1473 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
1476 if (m
->get_source_inst() != sync_provider
) {
1477 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1480 sync_cookie
= m
->cookie
;
1481 sync_start_version
= m
->last_committed
;
1483 sync_reset_timeout();
1484 sync_get_next_chunk();
// aborts when the option equals 3 (presumably test fault injection -- confirm)
1486 assert(g_conf
->mon_sync_requester_kill_at
!= 3);
// Requester side: ask sync_provider for the next chunk by sending an
// OP_GET_CHUNK message carrying sync_cookie.
// When g_conf->mon_inject_sync_get_chunk_delay is positive, the call first
// sleeps for that many seconds (the value is multiplied by 1e6 for usleep).
// NOTE(review): some original lines (braces) are not visible in this
// excerpt.
1489 void Monitor::sync_get_next_chunk()
1491 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1492 if (g_conf
->mon_inject_sync_get_chunk_delay
> 0) {
1493 dout(20) << __func__
<< " injecting delay of " << g_conf
->mon_inject_sync_get_chunk_delay
<< dendl
;
1494 usleep((long long)(g_conf
->mon_inject_sync_get_chunk_delay
* 1000000.0));
1496 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1497 messenger
->send_message(r
, sync_provider
);
// aborts when the option equals 4 (presumably test fault injection -- confirm)
1499 assert(g_conf
->mon_sync_requester_kill_at
!= 4);
// Requester side: handle OP_CHUNK / OP_LAST_CHUNK.
//
// Visible behavior: chunks whose cookie or source does not match the
// current sync session are logged and discarded. Otherwise the encoded
// transaction in m->chunk_bl is applied to the store. A second transaction
// prepares paxos transactions from paxos->get_version() + 1 onward, records
// m->last_committed, is applied, and paxos->init() refreshes paxos from the
// store. For OP_CHUNK the timeout is re-armed and the next chunk requested;
// for OP_LAST_CHUNK sync_finish() is called.
//
// The asserts on g_conf->mon_sync_requester_kill_at abort when the option
// equals the given step (presumably test fault injection -- confirm).
// NOTE(review): several original lines (returns, the condition guarding the
// "as we go" section, parts of calls) are not visible in this excerpt.
1502 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1504 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1505 dout(10) << __func__
<< " " << *m
<< dendl
;
1507 if (m
->cookie
!= sync_cookie
) {
1508 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1511 if (m
->get_source_inst() != sync_provider
) {
1512 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1516 assert(state
== STATE_SYNCHRONIZING
);
1517 assert(g_conf
->mon_sync_requester_kill_at
!= 5);
1519 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1520 tx
->append_from_encoded(m
->chunk_bl
);
1522 dout(30) << __func__
<< " tx dump:\n";
1523 JSONFormatter
f(true);
1528 store
->apply_transaction(tx
);
1530 assert(g_conf
->mon_sync_requester_kill_at
!= 6);
1533 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
1534 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1535 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1537 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1539 dout(30) << __func__
<< " tx dump:\n";
1540 JSONFormatter
f(true);
1545 store
->apply_transaction(tx
);
1546 paxos
->init(); // to refresh what we just wrote
1549 if (m
->op
== MMonSync::OP_CHUNK
) {
1550 sync_reset_timeout();
1551 sync_get_next_chunk();
1552 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1553 sync_finish(m
->last_committed
);
// Requester side: handle OP_NO_COOKIE from the provider.
// NOTE(review): only the log statement is visible in this excerpt; the
// action taken afterwards is not shown.
1557 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1559 dout(10) << __func__
<< dendl
;
// Expire provider-side sync sessions: walk sync_providers and erase every
// entry whose timeout lies before the current time, logging each one.
// NOTE(review): the branch that advances the iterator for entries that are
// kept is not visible in this excerpt.
1563 void Monitor::sync_trim_providers()
1565 dout(20) << __func__
<< dendl
;
1567 utime_t now
= ceph_clock_now();
1568 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1569 while (p
!= sync_providers
.end()) {
1570 if (now
> p
->second
.timeout
) {
1571 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
<< " for " << p
->second
.entity
<< dendl
;
// post-increment hands erase() the current position while p already
// points at the next element
1572 sync_providers
.erase(p
++);
1579 // ---------------------------------------------------
// Cancel the pending probe timeout event, if one is scheduled, and clear
// probe_timeout_event; a separate message is logged for the case where
// none is scheduled.
// NOTE(review): the line joining the two branches (presumably an `else`)
// is not visible in this excerpt.
1582 void Monitor::cancel_probe_timeout()
1584 if (probe_timeout_event
) {
1585 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1586 timer
.cancel_event(probe_timeout_event
);
1587 probe_timeout_event
= NULL
;
1589 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
// Re-arm the probe timeout: cancel any pending event, create a new
// callback context and schedule it g_conf->mon_probe_timeout after now.
// When add_event_after() reports success the new event is logged; the
// assignment of nullptr at the end presumably belongs to the failure
// branch -- confirm.
// NOTE(review): the callback body and the line joining the two branches
// are not visible in this excerpt.
1593 void Monitor::reset_probe_timeout()
1595 cancel_probe_timeout();
1596 probe_timeout_event
= new C_MonContext(this, [this](int r
) {
1599 double t
= g_conf
->mon_probe_timeout
;
1600 if (timer
.add_event_after(t
, probe_timeout_event
)) {
1601 dout(10) << "reset_probe_timeout " << probe_timeout_event
1602 << " after " << t
<< " seconds" << dendl
;
1604 probe_timeout_event
= nullptr;
// Probe timeout handler: asserts that the monitor is probing or
// synchronizing and that a timeout event was pending, then clears
// probe_timeout_event.
// NOTE(review): whatever follows in the original is not visible in this
// excerpt.
1608 void Monitor::probe_timeout(int r
)
1610 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1611 assert(is_probing() || is_synchronizing());
1612 assert(probe_timeout_event
);
1613 probe_timeout_event
= NULL
;
// Dispatch an incoming MMonProbe message.
// A probe whose fsid differs from our monmap's fsid is logged as ignored.
// OP_PROBE goes to handle_probe_probe(), OP_REPLY to handle_probe_reply(),
// and OP_MISSING_FEATURES is reported through derr together with the
// required features and those not covered by CEPH_FEATURES_ALL.
// NOTE(review): the switch header, the lines between cases and the end of
// the derr statement are not visible in this excerpt.
1617 void Monitor::handle_probe(MonOpRequestRef op
)
1619 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1620 dout(10) << "handle_probe " << *m
<< dendl
;
1622 if (m
->fsid
!= monmap
->fsid
) {
1623 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1628 case MMonProbe::OP_PROBE
:
1629 handle_probe_probe(op
);
1632 case MMonProbe::OP_REPLY
:
1633 handle_probe_reply(op
);
1636 case MMonProbe::OP_MISSING_FEATURES
:
1637 derr
<< __func__
<< " missing features, have " << CEPH_FEATURES_ALL
1638 << ", required " << m
->required_features
1639 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
)
// Handle an OP_PROBE from a peer monitor.
//
// Visible behavior: compute which of our required_features the peer's
// connection lacks and log them; if the peer has
// CEPH_FEATURE_OSD_PRIMARY_AFFINITY, send it an OP_MISSING_FEATURES message.
// When we are neither probing nor synchronizing and the peer's first paxos
// version is more than one ahead of our version, a re-bootstrap is logged.
// Otherwise an OP_REPLY is built with our monmap (encoded for the peer's
// features) and our first/last paxos versions, and sent back. A peer whose
// address is not in the monmap is added to extra_probe_peers.
// NOTE(review): several original lines (the condition on `missing`, early
// exits, the declaration of `r`, some reply fields) are not visible in
// this excerpt.
1645 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1647 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1649 dout(10) << "handle_probe_probe " << m
->get_source_inst() << *m
1650 << " features " << m
->get_connection()->get_features() << dendl
;
1651 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1653 dout(1) << " peer " << m
->get_source_addr() << " missing features "
1654 << missing
<< dendl
;
1655 if (m
->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY
)) {
1656 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1657 name
, has_ever_joined
);
// NOTE(review): this stores required_features on the incoming message `m`,
// while the message actually sent on the next statement is `r`. It looks
// like `r->required_features` was intended; as written the reply appears to
// go out without this field set here -- confirm and fix.
1658 m
->required_features
= required_features
;
1659 m
->get_connection()->send_message(r
);
1664 if (!is_probing() && !is_synchronizing()) {
1665 // If the probing mon is way ahead of us, we need to re-bootstrap.
1666 // Normally we capture this case when we initially bootstrap, but
1667 // it is possible we pass those checks (we overlap with
1668 // quorum-to-be) but fail to join a quorum before it moves past
1669 // us. We need to be kicked back to bootstrap so we can
1670 // synchronize, not keep calling elections.
1671 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1672 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1673 << "ahead of us, re-bootstrapping" << dendl
;
1681 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
);
1684 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1685 r
->paxos_first_version
= paxos
->get_first_committed();
1686 r
->paxos_last_version
= paxos
->get_version();
1687 m
->get_connection()->send_message(r
);
1689 // did we discover a peer here?
1690 if (!monmap
->contains(m
->get_source_addr())) {
1691 dout(1) << " adding peer " << m
->get_source_addr()
1692 << " to list of hints" << dendl
;
1693 extra_probe_peers
.insert(m
->get_source_addr());
// Handle an OP_REPLY to one of our probes.
//
// Visible phases:
//  1. Discovery (only while probing or electing): adopt the peer's monmap
//     when it differs from ours and the peer has joined a quorum with a
//     newer epoch (or we have never joined); rename "noname-" peers in an
//     epoch-0 monmap; learn the address of an initial monitor whose address
//     is still blank.
//  2. Sync decision (only while probing): ignore peers whose last paxos
//     version is below sync_last_committed_floor; start a full sync when our
//     version is below the peer's first version, or a recent sync when we
//     trail the peer's last version by more than paxos_max_join_drift.
//  3. Quorum: if the peer reports a quorum, either start an election (we
//     are in the monmap with a non-blank address) or send an MMonJoin to
//     the first quorum member; otherwise track the peer in outside_quorum
//     and check whether a majority (monmap->size() / 2 + 1) is reached.
// NOTE(review): many original lines (early returns, else branches, the ends
// of several log statements, the actions taken after some log lines) are
// not visible in this excerpt.
1700 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1702 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1703 dout(10) << "handle_probe_reply " << m
->get_source_inst() << *m
<< dendl
;
1704 dout(10) << " monmap is " << *monmap
<< dendl
;
1706 // discover name and addrs during probing or electing states.
1707 if (!is_probing() && !is_electing()) {
1711 // newer map, or they've joined a quorum and we haven't?
1713 monmap
->encode(mybl
, m
->get_connection()->get_features());
1714 // make sure it's actually different; the checks below err toward
1715 // taking the other guy's map, which could cause us to loop.
1716 if (!mybl
.contents_equal(m
->monmap_bl
)) {
1717 MonMap
*newmap
= new MonMap
;
1718 newmap
->decode(m
->monmap_bl
);
1719 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
1720 !has_ever_joined
)) {
1721 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
1722 << ", mine was " << monmap
->get_epoch() << dendl
;
1724 monmap
->decode(m
->monmap_bl
);
1733 string peer_name
= monmap
->get_name(m
->get_source_addr());
1734 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
1735 dout(10) << " renaming peer " << m
->get_source_addr() << " "
1736 << peer_name
<< " -> " << m
->name
<< " in my monmap"
1738 monmap
->rename(peer_name
, m
->name
);
1740 if (is_electing()) {
1745 dout(10) << " peer name is " << peer_name
<< dendl
;
1748 // new initial peer?
1749 if (monmap
->get_epoch() == 0 &&
1750 monmap
->contains(m
->name
) &&
1751 monmap
->get_addr(m
->name
).is_blank_ip()) {
1752 dout(1) << " learned initial mon " << m
->name
<< " addr " << m
->get_source_addr() << dendl
;
1753 monmap
->set_addr(m
->name
, m
->get_source_addr());
1759 // end discover phase
1760 if (!is_probing()) {
1764 assert(paxos
!= NULL
);
1766 if (is_synchronizing()) {
1767 dout(10) << " currently syncing" << dendl
;
1771 entity_inst_t other
= m
->get_source_inst();
1773 if (m
->paxos_last_version
< sync_last_committed_floor
) {
1774 dout(10) << " peer paxos versions [" << m
->paxos_first_version
1775 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
1776 << sync_last_committed_floor
<< ", ignoring"
1779 if (paxos
->get_version() < m
->paxos_first_version
&&
1780 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
1781 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
1782 << "," << m
->paxos_last_version
<< "]"
1783 << " vs my version " << paxos
->get_version()
1784 << " (too far ahead)"
1786 cancel_probe_timeout();
1787 sync_start(other
, true);
1790 if (paxos
->get_version() + g_conf
->paxos_max_join_drift
< m
->paxos_last_version
) {
1791 dout(10) << " peer paxos last version " << m
->paxos_last_version
1792 << " vs my version " << paxos
->get_version()
1793 << " (too far ahead)"
1795 cancel_probe_timeout();
1796 sync_start(other
, false);
1801 // is there an existing quorum?
1802 if (m
->quorum
.size()) {
1803 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
1805 dout(10) << " peer paxos version " << m
->paxos_last_version
1806 << " vs my version " << paxos
->get_version()
1810 if (monmap
->contains(name
) &&
1811 !monmap
->get_addr(name
).is_blank_ip()) {
1812 // i'm part of the cluster; just initiate a new election
1815 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl
;
1816 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
1817 monmap
->get_inst(*m
->quorum
.begin()));
1820 if (monmap
->contains(m
->name
)) {
1821 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
1822 outside_quorum
.insert(m
->name
);
1824 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
// strict majority of the monmap
1828 unsigned need
= monmap
->size() / 2 + 1;
1829 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
1830 if (outside_quorum
.size() >= need
) {
1831 if (outside_quorum
.count(name
)) {
1832 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
1835 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
1838 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
// Enter the electing state: wait for pending paxos writes, set state to
// STATE_ELECTING and bump the l_mon_num_elections counter.
// NOTE(review): one original line between wait_for_paxos_write() and the
// state change is not visible in this excerpt.
1843 void Monitor::join_election()
1845 dout(10) << __func__
<< dendl
;
1846 wait_for_paxos_write();
1848 state
= STATE_ELECTING
;
1850 logger
->inc(l_mon_num_elections
);
// Call a new election: wait for pending paxos writes, set state to
// STATE_ELECTING, bump the election counters, announce the call in the
// cluster log and ask the elector to start the election.
// NOTE(review): one original line between wait_for_paxos_write() and the
// state change is not visible in this excerpt.
1853 void Monitor::start_election()
1855 dout(10) << "start_election" << dendl
;
1856 wait_for_paxos_write();
1858 state
= STATE_ELECTING
;
1860 logger
->inc(l_mon_num_elections
);
1861 logger
->inc(l_mon_election_call
);
1863 clog
->info() << "mon." << name
<< " calling new monitor election";
1864 elector
.call_election();
// Declare this monitor the winner of an election it ran alone: advance the
// election epoch, look up our rank in the monmap, collect our own metadata
// under key 0 and call win_election().
// NOTE(review): the construction of `q` and part of the win_election()
// argument list are not visible in this excerpt.
1867 void Monitor::win_standalone_election()
1869 dout(1) << "win_standalone_election" << dendl
;
1871 // bump election epoch, in case the previous epoch included other
1872 // monitors; we need to be able to make the distinction.
1874 elector
.advance_epoch();
1876 rank
= monmap
->get_rank(name
);
1881 map
<int,Metadata
> metadata
;
1882 collect_metadata(&metadata
[0]);
1884 win_election(elector
.get_epoch(), q
,
1886 ceph::features::mon::get_supported(),
1890 const utime_t
& Monitor::get_leader_since() const
1892 assert(state
== STATE_LEADER
);
1893 return leader_since
;
1896 epoch_t
Monitor::get_epoch()
1898 return elector
.get_epoch();
// Notify the paxos services that an election has finished by calling
// election_finished() on them. Requires state to be STATE_LEADER or
// STATE_PEON.
// NOTE(review): the statement controlled by the `if` below (presumably a
// skip of monmon() on the leader, per the comment) is not visible in this
// excerpt.
1901 void Monitor::_finish_svc_election()
1903 assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
1905 for (auto p
: paxos_service
) {
1906 // we already called election_finished() on monmon(); avoid calling twice
1907 if (state
== STATE_LEADER
&& p
== monmon())
1909 p
->election_finished();
// Become leader after winning an election.
//
// Parameters: `epoch` is the election epoch, `active` the quorum set (as
// logged), `features` and `mon_features` the quorum's connection and
// monitor features, `metadata` the per-rank metadata collected during the
// election.
//
// Visible steps: assert we were electing, switch to STATE_LEADER and stamp
// leader_since; record the quorum features and pending metadata; clear
// outside_quorum; announce the win in the cluster log; set the leader
// command set; initialise paxos as leader; notify the monmap monitor first
// and then the other services; start the health monitor; merge previously
// known metadata for ranks missing from `metadata` and store it as
// "last_metadata" in the pending paxos transaction; finally start the
// health tick, health-to-clog interval and scrub event.
// NOTE(review): several original lines (assignments of epoch/quorum, the
// encoding of `bl`, the body of the final `if`) are not visible in this
// excerpt.
1913 void Monitor::win_election(epoch_t epoch
, set
<int>& active
, uint64_t features
,
1914 const mon_feature_t
& mon_features
,
1915 const map
<int,Metadata
>& metadata
)
1917 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
1918 << " features " << features
1919 << " mon_features " << mon_features
1921 assert(is_electing());
1922 state
= STATE_LEADER
;
1923 leader_since
= ceph_clock_now();
1926 quorum_con_features
= features
;
1927 quorum_mon_features
= mon_features
;
1928 pending_metadata
= metadata
;
1929 outside_quorum
.clear();
1931 clog
->info() << "mon." << name
<< "@" << rank
1932 << " won leader election with quorum " << quorum
;
1934 set_leader_commands(get_local_commands(mon_features
));
1936 paxos
->leader_init();
1937 // NOTE: tell monmap monitor first. This is important for the
1938 // bootstrap case to ensure that the very first paxos proposal
1939 // codifies the monmap. Otherwise any manner of chaos can ensue
1940 // when monitors are calling elections or participating in a paxos
1941 // round without agreeing on who the participants are.
1942 monmon()->election_finished();
1943 _finish_svc_election();
1944 health_monitor
->start(epoch
);
1946 logger
->inc(l_mon_election_win
);
1948 // inject new metadata in first transaction.
1950 // include previous metadata for missing mons (that aren't part of
1951 // the current quorum).
1952 map
<int,Metadata
> m
= metadata
;
// NOTE(review): the loop variable is named `rank`, the same name used
// unqualified in the clog statement above; it presumably shadows a member
// inside this loop -- confirm.
1953 for (unsigned rank
= 0; rank
< monmap
->size(); ++rank
) {
1954 if (m
.count(rank
) == 0 &&
1955 mon_metadata
.count(rank
)) {
1956 m
[rank
] = mon_metadata
[rank
];
1960 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1961 // a new transaction immediately after the election finishes. We should
1962 // do that anyway for other reasons, though.
1963 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
1966 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
1970 if (monmap
->size() > 1 &&
1971 monmap
->get_epoch() > 0) {
1973 health_tick_start();
1974 do_health_to_clog_interval();
1975 scrub_event_start();
// Handle losing an election (this monitor is not the leader).
//
// Visible steps: reset leader_since; clear outside_quorum; record the
// quorum's connection and monitor features; notify the paxos services and
// start the health monitor; bump the l_mon_election_lose counter. When the
// quorum has CEPH_FEATURE_MON_METADATA but not SERVER_LUMINOUS, this
// monitor's metadata is collected and sent to the leader in an MMonMetadata
// message.
// NOTE(review): one line of the parameter list (presumably declaring
// `features`, which is used below) and several body lines (state change,
// assignments of epoch/leader/quorum, declaration of `sys_info`) are not
// visible in this excerpt.
1979 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
1981 const mon_feature_t
& mon_features
)
1984 leader_since
= utime_t();
1987 outside_quorum
.clear();
1988 quorum_con_features
= features
;
1989 quorum_mon_features
= mon_features
;
1990 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
1991 << " quorum is " << quorum
<< " features are " << quorum_con_features
1992 << " mon_features are " << quorum_mon_features
1996 _finish_svc_election();
1997 health_monitor
->start(epoch
);
1999 logger
->inc(l_mon_election_lose
);
2003 if ((quorum_con_features
& CEPH_FEATURE_MON_METADATA
) &&
2004 !HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
)) {
2005 // for pre-luminous mons only
2007 collect_metadata(&sys_info
);
2008 messenger
->send_message(new MMonMetadata(sys_info
),
2009 monmap
->get_inst(get_leader()));
2013 void Monitor::collect_metadata(Metadata
*m
)
2015 collect_sys_info(m
, g_ceph_context
);
2016 (*m
)["addr"] = stringify(messenger
->get_myaddr());
// Common post-election work.
//
// Visible steps: fold quorum and monmap features into the compat set; reset
// exited_quorum; complete the contexts waiting on waitfor_quorum and
// maybe_wait_for_quorum; resend routed requests; register the cluster
// logger. If the monmap knows our address under a different name, send an
// MMonJoin with our real name to the first quorum member.
// NOTE(review): a few original lines between these statements are not
// visible in this excerpt.
2019 void Monitor::finish_election()
2021 apply_quorum_to_compatset_features();
2022 apply_monmap_to_compatset_features();
2024 exited_quorum
= utime_t();
2025 finish_contexts(g_ceph_context
, waitfor_quorum
);
2026 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
2027 resend_routed_requests();
2029 register_cluster_logger();
2031 // am i named properly?
2032 string cur_name
= monmap
->get_name(messenger
->get_myaddr());
2033 if (cur_name
!= name
) {
2034 dout(10) << " renaming myself from " << cur_name
<< " -> " << name
<< dendl
;
2035 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
2036 monmap
->get_inst(*quorum
.begin()));
// Adopt `new_features` as the monitor's compat set when it differs from the
// current one: log the difference, replace `features`, apply a store
// transaction and recompute the quorum requirements.
// NOTE(review): the line that fills the transaction before it is applied is
// not visible in this excerpt, and it cannot be told from here whether
// calc_quorum_requirements() runs inside or after the `if`.
2040 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2042 if (new_features
.compare(features
) != 0) {
2043 CompatSet diff
= features
.unsupported(new_features
);
2044 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2045 features
= new_features
;
2047 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2049 store
->apply_transaction(t
);
2051 calc_quorum_requirements();
2055 void Monitor::apply_quorum_to_compatset_features()
2057 CompatSet
new_features(features
);
2058 if (quorum_con_features
& CEPH_FEATURE_OSD_ERASURE_CODES
) {
2059 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2061 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2062 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2064 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
) {
2065 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2067 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
) {
2068 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2070 dout(5) << __func__
<< dendl
;
2071 _apply_compatset_features(new_features
);
// Derive incompat compat-set entries from the monmap's required features.
// For FEATURE_KRAKEN and FEATURE_LUMINOUS: when the monmap requires the
// feature, assert that it is a persistent feature and that the quorum's
// connection features include the matching SERVER_* bit, then insert the
// corresponding CEPH_MON_FEATURE_INCOMPAT_* entry. The result is handed to
// _apply_compatset_features().
// NOTE(review): a few original lines (braces, one line of the block comment
// and its terminator) are not visible in this excerpt.
2074 void Monitor::apply_monmap_to_compatset_features()
2076 CompatSet
new_features(features
);
2077 mon_feature_t monmap_features
= monmap
->get_required_features();
2079 /* persistent monmap features may go into the compatset.
2080 * optional monmap features may not - why?
2081 * because optional monmap features may be set/unset by the admin,
2082 * and possibly by other means that haven't yet been thought out,
2083 * so we can't make the monitor enforce them on start - because they
2085 * this, of course, does not invalidate setting a compatset feature
2086 * for an optional feature - as long as you make sure to clean it up
2087 * once you unset it.
2089 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2090 assert(ceph::features::mon::get_persistent().contains_all(
2091 ceph::features::mon::FEATURE_KRAKEN
));
2092 // this feature should only ever be set if the quorum supports it.
2093 assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2094 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2096 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
2097 assert(ceph::features::mon::get_persistent().contains_all(
2098 ceph::features::mon::FEATURE_LUMINOUS
));
2099 // this feature should only ever be set if the quorum supports it.
2100 assert(HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
));
2101 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
);
2104 dout(5) << __func__
<< dendl
;
2105 _apply_compatset_features(new_features
);
// Recompute required_features from scratch.
// First pass: each CEPH_MON_FEATURE_INCOMPAT_* entry present in
// features.incompat sets the matching connection feature bit/mask.
// Second pass: FEATURE_KRAKEN / FEATURE_LUMINOUS required by the monmap set
// the matching SERVER_* feature mask.
// NOTE(review): a few original lines between the statements (braces and
// possibly comments) are not visible in this excerpt.
2108 void Monitor::calc_quorum_requirements()
2110 required_features
= 0;
2113 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
)) {
2114 required_features
|= CEPH_FEATURE_OSD_ERASURE_CODES
;
2116 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2117 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2119 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
)) {
2120 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
;
2122 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
)) {
2123 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
;
2125 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2126 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2128 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS
)) {
2129 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2133 if (monmap
->get_required_features().contains_all(
2134 ceph::features::mon::FEATURE_KRAKEN
)) {
2135 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2137 if (monmap
->get_required_features().contains_all(
2138 ceph::features::mon::FEATURE_LUMINOUS
)) {
2139 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2141 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
// Accumulate into `*fm` the feature map of our own sessions plus the
// per-monitor feature maps stored in quorum_feature_map for quorum members.
// NOTE(review): a line inside the loop (possibly a condition on `id`) is not
// visible in this excerpt, so it cannot be told from here whether every
// quorum member is included.
2144 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2146 *fm
+= session_map
.feature_map
;
2147 for (auto id
: quorum
) {
2149 *fm
+= quorum_feature_map
[id
];
// Mark the store so that a sync is forced later: stash critical state and
// write ("mon_sync", "force_sync") = 1 in one transaction, then report the
// result through the formatter as a "sync_force" object with "ret" and
// "msg" fields.
// A JSONFormatter is created locally when needed and `free_formatter`
// records that it was.
// NOTE(review): the condition guarding the formatter creation and the
// flush/cleanup at the end (where `ss` would be used) are not visible in
// this excerpt.
2154 void Monitor::sync_force(Formatter
*f
, ostream
& ss
)
2156 bool free_formatter
= false;
2159 // lousy/lazy hack: default to json if no formatter has been defined
2160 f
= new JSONFormatter();
2161 free_formatter
= true;
2164 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2165 sync_stash_critical_state(tx
);
2166 tx
->put("mon_sync", "force_sync", 1);
2167 store
->apply_transaction(tx
);
2169 f
->open_object_section("sync_force");
2170 f
->dump_int("ret", 0);
2171 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2172 f
->close_section(); // sync_force
// Dump the quorum status through the formatter as a "quorum_status" object:
// election epoch, quorum ranks, quorum member names, the name of the first
// quorum member as "quorum_leader_name" (empty when there is no quorum) and
// a "monmap" section.
// A JSONFormatter is created locally when needed and `free_formatter`
// records that it was.
// NOTE(review): the condition guarding the formatter creation, the content
// of the "monmap" section and the flush/cleanup at the end are not visible
// in this excerpt.
2178 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
2180 bool free_formatter
= false;
2183 // lousy/lazy hack: default to json if no formatter has been defined
2184 f
= new JSONFormatter();
2185 free_formatter
= true;
2187 f
->open_object_section("quorum_status");
2188 f
->dump_int("election_epoch", get_epoch());
2190 f
->open_array_section("quorum");
2191 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2192 f
->dump_int("mon", *p
);
2193 f
->close_section(); // quorum
2195 list
<string
> quorum_names
= get_quorum_names();
2196 f
->open_array_section("quorum_names");
2197 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2198 f
->dump_string("mon", *p
);
2199 f
->close_section(); // quorum_names
2201 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(*quorum
.begin()));
2203 f
->open_object_section("monmap");
2205 f
->close_section(); // monmap
2207 f
->close_section(); // quorum_status
// Dump this monitor's status through the formatter as a "mon_status"
// object: name, rank, state, election epoch, quorum ranks, required and
// quorum features, outside_quorum names, extra probe peers, provider-side
// sync sessions, requester-side sync state (only while synchronizing), the
// sync kill_at options when they are positive, a "monmap" section and the
// session feature map.
// A JSONFormatter is created locally when needed; per the final comment it
// is flushed to `ss` and deleted only in that case.
// NOTE(review): the condition guarding the formatter creation, parts of the
// loop headers, some section open/close calls, the content of the "monmap"
// section and the flush/cleanup code are not visible in this excerpt.
2213 void Monitor::get_mon_status(Formatter
*f
, ostream
& ss
)
2215 bool free_formatter
= false;
2218 // lousy/lazy hack: default to json if no formatter has been defined
2219 f
= new JSONFormatter();
2220 free_formatter
= true;
2223 f
->open_object_section("mon_status");
2224 f
->dump_string("name", name
);
2225 f
->dump_int("rank", rank
);
2226 f
->dump_string("state", get_state_name());
2227 f
->dump_int("election_epoch", get_epoch());
2229 f
->open_array_section("quorum");
2230 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2231 f
->dump_int("mon", *p
);
2234 f
->close_section(); // quorum
2236 f
->open_object_section("features");
2237 f
->dump_stream("required_con") << required_features
;
2238 mon_feature_t req_mon_features
= get_required_mon_features();
2239 req_mon_features
.dump(f
, "required_mon");
2240 f
->dump_stream("quorum_con") << quorum_con_features
;
2241 quorum_mon_features
.dump(f
, "quorum_mon");
2242 f
->close_section(); // features
2244 f
->open_array_section("outside_quorum");
2245 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2246 f
->dump_string("mon", *p
);
2247 f
->close_section(); // outside_quorum
2249 f
->open_array_section("extra_probe_peers");
2250 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
2251 p
!= extra_probe_peers
.end();
2253 f
->dump_stream("peer") << *p
;
2254 f
->close_section(); // extra_probe_peers
// one entry per sync session this monitor is serving as provider
2256 f
->open_array_section("sync_provider");
2257 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2258 p
!= sync_providers
.end();
2260 f
->dump_unsigned("cookie", p
->second
.cookie
);
2261 f
->dump_stream("entity") << p
->second
.entity
;
2262 f
->dump_stream("timeout") << p
->second
.timeout
;
2263 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2264 f
->dump_stream("last_key") << p
->second
.last_key
;
2268 if (is_synchronizing()) {
2269 f
->open_object_section("sync");
2270 f
->dump_stream("sync_provider") << sync_provider
;
2271 f
->dump_unsigned("sync_cookie", sync_cookie
);
2272 f
->dump_unsigned("sync_start_version", sync_start_version
);
2276 if (g_conf
->mon_sync_provider_kill_at
> 0)
2277 f
->dump_int("provider_kill_at", g_conf
->mon_sync_provider_kill_at
);
2278 if (g_conf
->mon_sync_requester_kill_at
> 0)
2279 f
->dump_int("requester_kill_at", g_conf
->mon_sync_requester_kill_at
);
2281 f
->open_object_section("monmap");
2285 f
->dump_object("feature_map", session_map
.feature_map
);
2286 f
->close_section(); // mon_status
2288 if (free_formatter
) {
2289 // flush formatter to ss and delete it iff we created the formatter
2296 // health status to clog
// Arm the periodic health-to-clog tick: an event is added to `timer` to fire
// after mon_health_to_clog_tick_interval, and its callback calls
// do_health_to_clog() followed by health_tick_start() again, so the tick
// re-arms itself.
// The leading check covers the case where mon_health_to_clog is off or the
// tick interval is not positive.
// NOTE(review): presumably that check returns early -- confirm.
2298 void Monitor::health_tick_start()
2300 if (!cct
->_conf
->mon_health_to_clog
||
2301 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2304 dout(15) << __func__
<< dendl
;
// remember the event so health_tick_stop() can cancel it
2307 health_tick_event
= timer
.add_event_after(
2308 cct
->_conf
->mon_health_to_clog_tick_interval
,
2309 new C_MonContext(this, [this](int r
) {
2312 do_health_to_clog();
2313 health_tick_start();
2317 void Monitor::health_tick_stop()
2319 dout(15) << __func__
<< dendl
;
2321 if (health_tick_event
) {
2322 timer
.cancel_event(health_tick_event
);
2323 health_tick_event
= NULL
;
// Compute the next time that is a whole multiple of
// mon_health_to_clog_interval seconds after now.
// The current time is truncated to whole seconds, and
// (interval - secs % interval) is added; the result has a zero sub-second
// part. When `secs` is already aligned the remainder is 0, so the result is
// one full interval ahead.
// NOTE(review): the modulo requires a non-zero interval; the caller visible
// in this file tests the interval for <= 0 first -- confirm it bails out.
2327 utime_t
Monitor::health_interval_calc_next_update()
2329 utime_t now
= ceph_clock_now();
2331 time_t secs
= now
.sec();
// seconds elapsed since the previous interval boundary
2332 int remainder
= secs
% cct
->_conf
->mon_health_to_clog_interval
;
// seconds remaining until the next interval boundary
2333 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2334 utime_t next
= utime_t(secs
+ adjustment
, 0);
2336 dout(20) << __func__
2337 << " now: " << now
<< ","
2338 << " next: " << next
<< ","
2339 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
// (Re)schedule the interval-aligned health-to-clog event.
// Any previously scheduled interval event is stopped first, the next
// boundary is taken from health_interval_calc_next_update(), and a new
// C_MonContext whose callback calls do_health_to_clog_interval() is added to
// `timer` at that time.
// The leading check covers the case where mon_health_to_clog is off or the
// interval is not positive.
// NOTE(review): presumably that check returns early -- confirm.
2345 void Monitor::health_interval_start()
2347 dout(15) << __func__
<< dendl
;
2349 if (!cct
->_conf
->mon_health_to_clog
||
2350 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
// drop any earlier event before creating a new one
2354 health_interval_stop();
2355 utime_t next
= health_interval_calc_next_update();
2356 health_interval_event
= new C_MonContext(this, [this](int r
) {
2359 do_health_to_clog_interval();
// if the timer refuses the event, do not keep a pointer to it
2361 if (!timer
.add_event_at(next
, health_interval_event
)) {
2362 health_interval_event
= nullptr;
2366 void Monitor::health_interval_stop()
2368 dout(15) << __func__
<< dendl
;
2369 if (health_interval_event
) {
2370 timer
.cancel_event(health_interval_event
);
2372 health_interval_event
= NULL
;
// Tear down health-to-clog state: stops the interval event and resets the
// cached health status.
2375 void Monitor::health_events_cleanup()
2378 health_interval_stop();
2379 health_status_cache
.reset();
// React to configuration changes affecting health-to-clog reporting.
// `changed` is the set of option names that changed; three keys are
// examined: "mon_health_to_clog", "mon_health_to_clog_interval" and
// "mon_health_to_clog_tick_interval". Depending on the new values, the
// tick/interval events are cleaned up, stopped or (re)started.
2382 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2384 dout(20) << __func__
<< dendl
;
// master switch toggled: clean up when disabled, otherwise start whichever
// event is not currently scheduled
2386 if (changed
.count("mon_health_to_clog")) {
2387 if (!cct
->_conf
->mon_health_to_clog
) {
2388 health_events_cleanup();
2390 if (!health_tick_event
) {
2391 health_tick_start();
2393 if (!health_interval_event
) {
2394 health_interval_start();
// interval changed: a non-positive value stops the interval event
2399 if (changed
.count("mon_health_to_clog_interval")) {
2400 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2401 health_interval_stop();
2403 health_interval_start();
// tick interval changed
2407 if (changed
.count("mon_health_to_clog_tick_interval")) {
2408 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2411 health_tick_start();
// Callback body for the interval event: forces a health report to the
// cluster log via do_health_to_clog(true) and then schedules the next
// interval event.
2416 void Monitor::do_health_to_clog_interval()
2418 // outputting to clog may have been disabled in the conf
2419 // since we were scheduled.
2420 if (!cct
->_conf
->mon_health_to_clog
||
2421 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2424 dout(10) << __func__
<< dendl
;
2426 // do we have a cached value for next_clog_update? if not,
2427 // do we know when the last update was?
2429 do_health_to_clog(true);
// re-arm for the following interval boundary
2430 health_interval_start();
// Write the current overall health to the cluster log and refresh
// health_status_cache (summary + overall level).
// Two paths exist, selected by the osdmap's require_osd_release:
//  - >= CEPH_RELEASE_LUMINOUS: uses get_health_status() and logs
//    "overall <summary>" at the computed health level;
//  - otherwise: uses the legacy get_health() and logs the joined status
//    strings at info level.
// Both paths compare the new values against health_status_cache.
// NOTE(review): presumably an unchanged status is only logged when `force`
// is set -- confirm.
2433 void Monitor::do_health_to_clog(bool force
)
2435 // outputting to clog may have been disabled in the conf
2436 // since we were scheduled.
2437 if (!cct
->_conf
->mon_health_to_clog
||
2438 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2441 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
// luminous and later: one-line summary from the health checks
2443 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2445 health_status_t level
= get_health_status(false, nullptr, &summary
);
// comparison against the previously logged summary/level
2447 summary
== health_status_cache
.summary
&&
2448 level
== health_status_cache
.overall
)
2450 clog
->health(level
) << "overall " << summary
;
2451 health_status_cache
.summary
= summary
;
2452 health_status_cache
.overall
= level
;
// legacy path: status strings collected by get_health()
2455 list
<string
> status
;
2456 health_status_t overall
= get_health(status
, NULL
, NULL
);
2457 dout(25) << __func__
2458 << (force
? " (force)" : "")
2461 string summary
= joinify(status
.begin(), status
.end(), string("; "));
// comparison against the cache; an empty cached summary never matches
2464 overall
== health_status_cache
.overall
&&
2465 !health_status_cache
.summary
.empty() &&
2466 health_status_cache
.summary
== summary
) {
2471 clog
->info() << summary
;
2473 health_status_cache
.overall
= overall
;
2474 health_status_cache
.summary
= summary
;
// Compute the cluster health status from the health checks of every
// PaxosService and report it either through Formatter `f` (a "health"
// object with a "checks" section and a "status" field) or as plain text in
// `*plain` ("HEALTH_FOO[ thing1[; thing2 ...]]").
// The worst status seen is accumulated with std::min over the per-service
// results, starting from HEALTH_OK.
// When mon_health_preluminous_compat or
// mon_health_preluminous_compat_warning is set and a formatter is used, the
// pre-luminous "summary", "overall_status" and "detail" fields are emitted
// as well; in compat-warning mode the overall status is capped at
// HEALTH_WARN and `old_fields_message` is included.
// NOTE(review): the parameter list is not shown in this excerpt; callers in
// this file pass (bool want_detail, Formatter *f, string *plain, ...) --
// confirm against the header.
2478 health_status_t
Monitor::get_health_status(
2485 health_status_t r
= HEALTH_OK
;
2486 bool compat
= g_conf
->mon_health_preluminous_compat
;
2487 bool compat_warn
= g_conf
->get_val
<bool>("mon_health_preluminous_compat_warning");
2489 f
->open_object_section("health");
2490 f
->open_object_section("checks");
// the text summary is only collected when no formatter is in use
2494 string
*psummary
= f
? nullptr : &summary
;
2495 for (auto& svc
: paxos_service
) {
2496 r
= std::min(r
, svc
->get_health_checks().dump_summary(
2497 f
, psummary
, sep2
, want_detail
));
2502 f
->dump_stream("status") << r
;
2504 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2505 *plain
= stringify(r
);
2506 if (summary
.size()) {
2513 const std::string old_fields_message
= "'ceph health' JSON format has "
2514 "changed in luminous. If you see this your monitoring system is "
2515 "scraping the wrong fields. Disable this with 'mon health preluminous "
2516 "compat warning = false'";
// pre-luminous compatible "summary" / "overall_status" fields
2518 if (f
&& (compat
|| compat_warn
)) {
2519 health_status_t cr
= compat_warn
? min(HEALTH_WARN
, r
) : r
;
2520 f
->open_array_section("summary");
2522 f
->open_object_section("item");
2523 f
->dump_stream("severity") << HEALTH_WARN
;
2524 f
->dump_string("summary", old_fields_message
);
2528 for (auto& svc
: paxos_service
) {
2529 svc
->get_health_checks().dump_summary_compat(f
);
2533 f
->dump_stream("overall_status") << cr
;
// pre-luminous compatible "detail" array
2537 if (f
&& (compat
|| compat_warn
)) {
2538 f
->open_array_section("detail");
2540 f
->dump_string("item", old_fields_message
);
2544 for (auto& svc
: paxos_service
) {
2545 svc
->get_health_checks().dump_detail(f
, plain
, compat
);
2548 if (f
&& (compat
|| compat_warn
)) {
// Log to the cluster log how a set of health checks changed.
//   updated  - the new health checks
//   previous - the health checks being replaced
//   t        - transaction handle; not used by the code shown here (see the
//              FIXME below)
// Messages produced: "Health check failed: ..." for checks that are new,
// "Health check update: ..." when summary or severity changed,
// "Health check cleared: ..." (or a dedicated message for DEGRADED_OBJECTS /
// OSD_FLAGS) for checks that disappeared, and "Cluster is now healthy".
// health_check_log_times records when each check was last logged so that
// summary-only updates can be rate limited by mon_health_log_update_period.
2558 void Monitor::log_health(
2559 const health_check_map_t
& updated
,
2560 const health_check_map_t
& previous
,
2561 MonitorDBStore::TransactionRef t
)
2563 if (!g_conf
->mon_health_to_clog
) {
2567 const utime_t now
= ceph_clock_now();
2569 // FIXME: log atomically as part of @t instead of using clog.
2570 dout(10) << __func__
<< " updated " << updated
.checks
.size()
2571 << " previous " << previous
.checks
.size()
2573 const auto min_log_period
= g_conf
->get_val
<int64_t>(
2574 "mon_health_log_update_period");
// pass 1: checks present in `updated` -- new or changed
2575 for (auto& p
: updated
.checks
) {
2576 auto q
= previous
.checks
.find(p
.first
);
2577 bool logged
= false;
// not in `previous`: this check is new
2578 if (q
== previous
.checks
.end()) {
2581 ss
<< "Health check failed: " << p
.second
.summary
<< " ("
2583 clog
->health(p
.second
.severity
) << ss
.str();
2587 if (p
.second
.summary
!= q
->second
.summary
||
2588 p
.second
.severity
!= q
->second
.severity
) {
2590 auto status_iter
= health_check_log_times
.find(p
.first
);
2591 if (status_iter
!= health_check_log_times
.end()) {
2592 if (p
.second
.severity
== q
->second
.severity
&&
2593 now
- status_iter
->second
.updated_at
< min_log_period
) {
2594 // We already logged this recently and the severity is unchanged,
2595 // so skip emitting an update of the summary string.
2596 // We'll get an update out of tick() later if the check
2597 // is still failing.
2602 // summary or severity changed (ignore detail changes at this level)
2604 ss
<< "Health check update: " << p
.second
.summary
<< " (" << p
.first
<< ")";
2605 clog
->health(p
.second
.severity
) << ss
.str();
2610 // Record the time at which we last logged, so that we can check this
2611 // when considering whether/when to print update messages.
2613 auto iter
= health_check_log_times
.find(p
.first
);
2614 if (iter
== health_check_log_times
.end()) {
2615 health_check_log_times
.emplace(p
.first
, HealthCheckLogStatus(
2616 p
.second
.severity
, p
.second
.summary
, now
));
2618 iter
->second
= HealthCheckLogStatus(
2619 p
.second
.severity
, p
.second
.summary
, now
);
// pass 2: checks that were in `previous` but are gone from `updated`
2623 for (auto& p
: previous
.checks
) {
2624 if (!updated
.checks
.count(p
.first
)) {
2627 if (p
.first
== "DEGRADED_OBJECTS") {
2628 clog
->info() << "All degraded objects recovered";
2629 } else if (p
.first
== "OSD_FLAGS") {
2630 clog
->info() << "OSD flags cleared";
2632 clog
->info() << "Health check cleared: " << p
.first
<< " (was: "
2633 << p
.second
.summary
<< ")";
// a cleared check no longer needs a last-logged record
2636 if (health_check_log_times
.count(p
.first
)) {
2637 health_check_log_times
.erase(p
.first
);
2642 if (previous
.checks
.size() && updated
.checks
.size() == 0) {
2643 // We might be going into a fully healthy state, check
2645 bool any_checks
= false;
2646 for (auto& svc
: paxos_service
) {
// identity comparison: is this service's check map the very object passed
// in as `previous`?
2647 if (&(svc
->get_health_checks()) == &(previous
)) {
2648 // Ignore the ones we're clearing right now
2652 if (svc
->get_health_checks().checks
.size() > 0) {
2658 clog
->info() << "Cluster is now healthy";
// Legacy health aggregation.
// Collects (severity, text) summary and detail entries from every
// PaxosService and from health_monitor, adds clock-skew findings from
// timecheck_skews / timecheck_latencies, and reports them:
//   status   - receives the summary strings; a leading entry is pushed to
//              the front near the end
//   detailbl - when non-NULL, detail collection is requested and each detail
//              line is appended followed by '\n'
//   f        - formatter receiving a "health" object with "summary",
//              "overall_status" and "detail"
// `overall` starts at HEALTH_OK and is lowered to the smallest severity
// value found among the summary entries.
// NOTE(review): the third parameter and the return statement are not shown
// in this excerpt; callers in this file pass a Formatter* third -- confirm.
2663 health_status_t
Monitor::get_health(list
<string
>& status
,
2664 bufferlist
*detailbl
,
2667 list
<pair
<health_status_t
,string
> > summary
;
2668 list
<pair
<health_status_t
,string
> > detail
;
2671 f
->open_object_section("health");
// gather per-service health; detail is only requested when detailbl is set
2673 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin();
2674 p
!= paxos_service
.end();
2676 PaxosService
*s
= *p
;
2677 s
->get_health(summary
, detailbl
? &detail
: NULL
, cct
);
2680 health_monitor
->get_health(summary
, (detailbl
? &detail
: NULL
));
2682 health_status_t overall
= HEALTH_OK
;
// clock skew: evaluate each recorded skew/latency pair
2683 if (!timecheck_skews
.empty()) {
2685 for (map
<entity_inst_t
,double>::iterator i
= timecheck_skews
.begin();
2686 i
!= timecheck_skews
.end(); ++i
) {
2687 entity_inst_t inst
= i
->first
;
2688 double skew
= i
->second
;
2689 double latency
= timecheck_latencies
[inst
];
2690 string name
= monmap
->get_name(inst
.addr
);
2692 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
2693 if (tcstatus
!= HEALTH_OK
) {
2694 if (overall
> tcstatus
)
2696 warns
.push_back(name
);
2697 ostringstream tmp_ss
;
2698 tmp_ss
<< "mon." << name
2699 << " addr " << inst
.addr
<< " " << tcss
.str()
2700 << " (latency " << latency
<< "s)";
2701 detail
.push_back(make_pair(tcstatus
, tmp_ss
.str()));
// build one status line naming every monitor with a skew warning
2704 if (!warns
.empty()) {
2706 ss
<< "clock skew detected on";
2707 while (!warns
.empty()) {
2708 ss
<< " mon." << warns
.front();
2713 status
.push_back(ss
.str());
2714 summary
.push_back(make_pair(HEALTH_WARN
, "Monitor clock skew detected "));
// drain the summary list: track the lowest severity value and emit items
2719 f
->open_array_section("summary");
2720 if (!summary
.empty()) {
2721 while (!summary
.empty()) {
2722 if (overall
> summary
.front().first
)
2723 overall
= summary
.front().first
;
2724 status
.push_back(summary
.front().second
);
2726 f
->open_object_section("item");
2727 f
->dump_stream("severity") << summary
.front().first
;
2728 f
->dump_string("summary", summary
.front().second
);
2731 summary
.pop_front();
2739 status
.push_front(fss
.str());
2741 f
->dump_stream("overall_status") << overall
;
// drain the detail list into the formatter or into detailbl
2744 f
->open_array_section("detail");
2745 while (!detail
.empty()) {
2747 f
->dump_string("item", detail
.front().second
);
2748 else if (detailbl
!= NULL
) {
2749 detailbl
->append(detail
.front().second
);
2750 detailbl
->append('\n');
// Produce the cluster status report (the `status` command output).
//   ss - receives the human-readable text form ("cluster:", "services:",
//        "data:" sections)
//   f  - formatter receiving a "status" object with fsid, health,
//        election_epoch, quorum, quorum_names, monmap, osdmap, pgmap, fsmap,
//        mgrmap and servicemap
// Health comes from get_health_status() when the osdmap's
// require_osd_release is >= CEPH_RELEASE_LUMINOUS and from the legacy
// get_health() otherwise.
// NOTE(review): the branch choosing between the formatter and text outputs
// is not shown in this excerpt -- presumably it tests `f`; confirm.
2763 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
)
2766 f
->open_object_section("status");
2769 f
->dump_stream("fsid") << monmap
->get_fsid();
2770 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2771 get_health_status(false, f
, nullptr);
2773 list
<string
> health_str
;
2774 get_health(health_str
, nullptr, f
);
2776 f
->dump_unsigned("election_epoch", get_epoch());
2778 f
->open_array_section("quorum");
2779 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2780 f
->dump_int("rank", *p
);
// same ranks again, translated to monitor names through the monmap
2782 f
->open_array_section("quorum_names");
2783 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2784 f
->dump_string("id", monmap
->get_name(*p
));
2787 f
->open_object_section("monmap");
2790 f
->open_object_section("osdmap");
2791 osdmon()->osdmap
.print_summary(f
, cout
, string(12, ' '));
2793 f
->open_object_section("pgmap");
2794 pgservice
->print_summary(f
, NULL
);
2796 f
->open_object_section("fsmap");
2797 mdsmon()->get_fsmap().print_summary(f
, NULL
);
2799 f
->open_object_section("mgrmap");
2800 mgrmon()->get_map().print_summary(f
, nullptr);
2803 f
->dump_object("servicemap", mgrstatmon()->get_service_map());
// text output starts here
2806 ss
<< " cluster:\n";
2807 ss
<< " id: " << monmap
->get_fsid() << "\n";
2810 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2811 get_health_status(false, nullptr, &health
,
2815 get_health(ls
, NULL
, f
);
2816 health
= joinify(ls
.begin(), ls
.end(),
2819 ss
<< " health: " << health
<< "\n";
2821 ss
<< "\n \n services:\n";
// maxlen tracks the longest service name so the value column lines up
2824 auto& service_map
= mgrstatmon()->get_service_map();
2825 for (auto& p
: service_map
.services
) {
2826 maxlen
= std::max(maxlen
, p
.first
.size());
// padding for the built-in three-letter service labels (mon/mgr/mds/osd)
// NOTE(review): requires maxlen >= 3; its initial value is not shown here
2828 string
spacing(maxlen
- 3, ' ');
2829 const auto quorum_names
= get_quorum_names();
2830 const auto mon_count
= monmap
->mon_info
.size();
2831 ss
<< " mon: " << spacing
<< mon_count
<< " daemons, quorum "
// not every monitor is in quorum: list the ranks that are missing
2833 if (quorum_names
.size() != mon_count
) {
2834 std::list
<std::string
> out_of_q
;
2835 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
2836 if (quorum
.count(i
) == 0) {
2837 out_of_q
.push_back(monmap
->ranks
[i
]);
2840 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
2841 out_of_q
.end(), std::string(", "));
2844 if (mgrmon()->in_use()) {
2845 ss
<< " mgr: " << spacing
;
2846 mgrmon()->get_map().print_summary(nullptr, &ss
);
// the mds line is only printed when at least one filesystem exists
2849 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2850 ss
<< " mds: " << spacing
<< mdsmon()->get_fsmap() << "\n";
2852 ss
<< " osd: " << spacing
;
2853 osdmon()->osdmap
.print_summary(NULL
, ss
, string(maxlen
+ 6, ' '));
// additional services reported through the service map
2855 for (auto& p
: service_map
.services
) {
2856 ss
<< " " << p
.first
<< ": " << string(maxlen
- p
.first
.size(), ' ')
2857 << p
.second
.get_summary() << "\n";
2861 ss
<< "\n \n data:\n";
2862 pgservice
->print_summary(NULL
, &ss
);
// Flatten a parsed command map into a string-to-string map.
//   cmdmap        - parsed command arguments
//   param_str_map - output; receives one entry per argument
// The "caps" argument is expanded: when it holds an even number of
// elements, each (name, value) pair becomes param_str_map["caps_<name>"] =
// value. Other arguments are stored as cmd_vartype_stringify(value).
// The "prefix" key is tested separately.
// NOTE(review): presumably "prefix" is skipped -- confirm.
2867 void Monitor::_generate_command_map(map
<string
,cmd_vartype
>& cmdmap
,
2868 map
<string
,string
> ¶m_str_map
)
2870 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
2871 p
!= cmdmap
.end(); ++p
) {
2872 if (p
->first
== "prefix")
2874 if (p
->first
== "caps") {
// only a well-formed list of pairs (even length) is expanded
2876 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
2877 cv
.size() % 2 == 0) {
2878 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
2879 string k
= string("caps_") + cv
[i
];
2880 param_str_map
[k
] = cv
[i
+ 1];
2885 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
// Look up a command description by prefix.
// Scans `cmds` in order and tests whether the first cmd_prefix.size()
// characters of each entry's cmdstring equal `cmd_prefix`.
// NOTE(review): the return statements are not shown in this excerpt;
// presumably the first match is returned and nullptr otherwise -- confirm.
// NOTE(review): this is a leading-substring match, so a prefix that is
// itself the start of a longer command word would also match -- confirm
// that callers rely on ordering or on later validation.
2889 const MonCommand
*Monitor::_get_moncommand(
2890 const string
&cmd_prefix
,
2891 const vector
<MonCommand
>& cmds
)
2893 for (auto& c
: cmds
) {
2894 if (c
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
// Check whether session `s` holds the capabilities needed to run a command.
//   module, prefix - service/module name and command prefix being checked
//   cmdmap         - parsed command arguments (not used by the code shown)
//   param_str_map  - stringified arguments passed to the capability check
//   this_cmd       - command description; supplies the required r/w/x perms
// The decision is delegated to s->caps.is_capable() and logged at level 10.
// NOTE(review): the return statement is not shown in this excerpt;
// presumably `capable` is returned -- confirm.
2901 bool Monitor::_allowed_command(MonSession
*s
, string
&module
, string
&prefix
,
2902 const map
<string
,cmd_vartype
>& cmdmap
,
2903 const map
<string
,string
>& param_str_map
,
2904 const MonCommand
*this_cmd
) {
// which permission bits the command declares it needs
2906 bool cmd_r
= this_cmd
->requires_perm('r');
2907 bool cmd_w
= this_cmd
->requires_perm('w');
2908 bool cmd_x
= this_cmd
->requires_perm('x');
2910 bool capable
= s
->caps
.is_capable(
2912 CEPH_ENTITY_TYPE_MON
,
2914 module
, prefix
, param_str_map
,
2915 cmd_r
, cmd_w
, cmd_x
);
2917 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
// Serialize a list of command descriptions through a formatter as a
// "command_descriptions" object. Each command is written by
// dump_cmddesc_to_json() under a section named "cmd" followed by a
// zero-padded, width-3 counter (cmdnum).
// When hide_mgr_flag is set, MonCommand::FLAG_MGR is cleared from the flags
// that are dumped (the command's own flags are not modified).
// NOTE(review): the remaining parameters are not shown in this excerpt; a
// caller in this file passes (commands, Formatter*, bufferlist*, bool
// hide_mgr_flag) -- confirm against the header.
2921 void Monitor::format_command_descriptions(const std::vector
<MonCommand
> &commands
,
2927 f
->open_object_section("command_descriptions");
2928 for (const auto &cmd
: commands
) {
// work on a copy of the flags so FLAG_MGR can be masked out
2929 unsigned flags
= cmd
.flags
;
2930 if (hide_mgr_flag
) {
2931 flags
&= ~MonCommand::FLAG_MGR
;
2933 ostringstream secname
;
2934 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
2935 dump_cmddesc_to_json(f
, secname
.str(),
2936 cmd
.cmdstring
, cmd
.helpstring
, cmd
.module
,
2937 cmd
.req_perms
, cmd
.availability
, flags
);
2940 f
->close_section(); // command_descriptions
2945 bool Monitor::is_keyring_required()
2947 string auth_cluster_required
= g_conf
->auth_supported
.empty() ?
2948 g_conf
->auth_cluster_required
: g_conf
->auth_supported
;
2949 string auth_service_required
= g_conf
->auth_supported
.empty() ?
2950 g_conf
->auth_service_required
: g_conf
->auth_supported
;
2952 return auth_service_required
== "cephx" ||
2953 auth_cluster_required
== "cephx";
// Completion context for a command proxied to the mgr.
// Holds the monitor, the originating op and the byte size that was added to
// the monitor's mgr_proxy_bytes counter for this command.
// finish() runs with mon->lock held, subtracts `size` from
// mon->mgr_proxy_bytes and replies to the op with the result code `r` and
// the `outs` / `outbl` output.
// NOTE(review): the member declarations are not shown in this excerpt.
2956 struct C_MgrProxyCommand
: public Context
{
2962 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
2963 : mon(mon
), op(op
), size(s
) { }
2964 void finish(int r
) {
// take the monitor lock for the accounting update and the reply
2965 Mutex::Locker
l(mon
->lock
);
2966 mon
->mgr_proxy_bytes
-= size
;
2967 mon
->reply_command(op
, r
, outs
, outbl
, 0);
2971 void Monitor::handle_command(MonOpRequestRef op
)
2973 assert(op
->is_type_command());
2974 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
2975 if (m
->fsid
!= monmap
->fsid
) {
2976 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
2977 reply_command(op
, -EPERM
, "wrong fsid", 0);
2981 MonSession
*session
= static_cast<MonSession
*>(
2982 m
->get_connection()->get_priv());
2984 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
2987 BOOST_SCOPE_EXIT_ALL(=) {
2991 if (m
->cmd
.empty()) {
2992 string rs
= "No command supplied";
2993 reply_command(op
, -EINVAL
, rs
, 0);
2998 vector
<string
> fullcmd
;
2999 map
<string
, cmd_vartype
> cmdmap
;
3000 stringstream ss
, ds
;
3004 rs
= "unrecognized command";
3006 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
3007 // ss has reason for failure
3010 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3011 reply_command(op
, r
, rs
, 0);
3015 // check return value. If no prefix parameter provided,
3016 // return value will be false, then return error info.
3017 if (!cmd_getval(g_ceph_context
, cmdmap
, "prefix", prefix
)) {
3018 reply_command(op
, -EINVAL
, "command prefix not found", 0);
3022 // check prefix is empty
3023 if (prefix
.empty()) {
3024 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
3028 if (prefix
== "get_command_descriptions") {
3030 Formatter
*f
= Formatter::create("json");
3031 // hide mgr commands until luminous upgrade is complete
3032 bool hide_mgr_flag
=
3033 osdmon()->osdmap
.require_osd_release
< CEPH_RELEASE_LUMINOUS
;
3035 std::vector
<MonCommand
> commands
;
3037 // only include mgr commands once all mons are upgrade (and we've dropped
3038 // the hard-coded PGMonitor commands)
3039 if (quorum_mon_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
3040 commands
= static_cast<MgrMonitor
*>(
3041 paxos_service
[PAXOS_MGR
])->get_command_descs();
3044 for (auto& c
: leader_mon_commands
) {
3045 commands
.push_back(c
);
3048 format_command_descriptions(commands
, f
, &rdata
, hide_mgr_flag
);
3050 reply_command(op
, 0, "", rdata
, 0);
3057 dout(0) << "handle_command " << *m
<< dendl
;
3060 cmd_getval(g_ceph_context
, cmdmap
, "format", format
, string("plain"));
3061 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
3063 get_str_vec(prefix
, fullcmd
);
3065 // make sure fullcmd is not empty.
3066 // invalid prefix will cause empty vector fullcmd.
3067 // such as, prefix=";,,;"
3068 if (fullcmd
.empty()) {
3069 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
3073 module
= fullcmd
[0];
3075 // validate command is in leader map
3077 const MonCommand
*leader_cmd
;
3078 const auto& mgr_cmds
= mgrmon()->get_command_descs();
3079 const MonCommand
*mgr_cmd
= nullptr;
3080 if (!mgr_cmds
.empty()) {
3081 mgr_cmd
= _get_moncommand(prefix
, mgr_cmds
);
3083 leader_cmd
= _get_moncommand(prefix
, leader_mon_commands
);
3085 leader_cmd
= mgr_cmd
;
3087 reply_command(op
, -EINVAL
, "command not known", 0);
3091 // validate command is in our map & matches, or forward if it is allowed
3092 const MonCommand
*mon_cmd
= _get_moncommand(
3094 get_local_commands(quorum_mon_features
));
3100 if (leader_cmd
->is_noforward()) {
3101 reply_command(op
, -EINVAL
,
3102 "command not locally supported and not allowed to forward",
3106 dout(10) << "Command not locally supported, forwarding request "
3108 forward_request_leader(op
);
3110 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
3111 if (mon_cmd
->is_noforward()) {
3112 reply_command(op
, -EINVAL
,
3113 "command not compatible with leader and not allowed to forward",
3117 dout(10) << "Command not compatible with leader, forwarding request "
3119 forward_request_leader(op
);
3124 if (mon_cmd
->is_obsolete() ||
3125 (cct
->_conf
->mon_debug_deprecated_as_obsolete
3126 && mon_cmd
->is_deprecated())) {
3127 reply_command(op
, -ENOTSUP
,
3128 "command is obsolete; please check usage and/or man page",
3133 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
3134 dout(10) << "Got forward for noforward command " << m
<< dendl
;
3135 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
3139 /* what we perceive as being the service the command falls under */
3140 string
service(mon_cmd
->module
);
3142 dout(25) << __func__
<< " prefix='" << prefix
3143 << "' module='" << module
3144 << "' service='" << service
<< "'" << dendl
;
3147 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
3149 // validate user's permissions for requested command
3150 map
<string
,string
> param_str_map
;
3151 _generate_command_map(cmdmap
, param_str_map
);
3152 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
3153 param_str_map
, mon_cmd
)) {
3154 dout(1) << __func__
<< " access denied" << dendl
;
3155 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3156 << "from='" << session
->inst
<< "' "
3157 << "entity='" << session
->entity_name
<< "' "
3158 << "cmd=" << m
->cmd
<< ": access denied";
3159 reply_command(op
, -EACCES
, "access denied", 0);
3163 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3164 << "from='" << session
->inst
<< "' "
3165 << "entity='" << session
->entity_name
<< "' "
3166 << "cmd=" << m
->cmd
<< ": dispatch";
3168 if (mon_cmd
->is_mgr() &&
3169 osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3170 const auto& hdr
= m
->get_header();
3171 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
3172 uint64_t max
= g_conf
->get_val
<uint64_t>("mon_client_bytes")
3173 * g_conf
->get_val
<double>("mon_mgr_proxy_client_bytes_ratio");
3174 if (mgr_proxy_bytes
+ size
> max
) {
3175 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
3176 << " + " << size
<< " > max " << max
<< dendl
;
3177 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
3180 mgr_proxy_bytes
+= size
;
3181 dout(10) << __func__
<< " proxying mgr command (+" << size
3182 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
3183 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
3184 mgr_client
.start_command(m
->cmd
,
3188 new C_OnFinisher(fin
, &finisher
));
3192 if ((module
== "mds" || module
== "fs") &&
3193 prefix
!= "fs authorize") {
3194 mdsmon()->dispatch(op
);
3197 if ((module
== "osd" || prefix
== "pg map") &&
3198 prefix
!= "osd last-stat-seq") {
3199 osdmon()->dispatch(op
);
3203 if (module
== "pg") {
3204 pgmon()->dispatch(op
);
3207 if (module
== "mon" &&
3208 /* Let the Monitor class handle the following commands:
3213 prefix
!= "mon compact" &&
3214 prefix
!= "mon scrub" &&
3215 prefix
!= "mon sync force" &&
3216 prefix
!= "mon metadata" &&
3217 prefix
!= "mon versions" &&
3218 prefix
!= "mon count-metadata") {
3219 monmon()->dispatch(op
);
3222 if (module
== "auth" || prefix
== "fs authorize") {
3223 authmon()->dispatch(op
);
3226 if (module
== "log") {
3227 logmon()->dispatch(op
);
3231 if (module
== "config-key") {
3232 config_key_service
->dispatch(op
);
3236 if (module
== "mgr") {
3237 mgrmon()->dispatch(op
);
3241 if (prefix
== "fsid") {
3243 f
->open_object_section("fsid");
3244 f
->dump_stream("fsid") << monmap
->fsid
;
3251 reply_command(op
, 0, "", rdata
, 0);
3255 if (prefix
== "scrub" || prefix
== "mon scrub") {
3256 wait_for_paxos_write();
3258 int r
= scrub_start();
3259 reply_command(op
, r
, "", rdata
, 0);
3260 } else if (is_peon()) {
3261 forward_request_leader(op
);
3263 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
3268 if (prefix
== "compact" || prefix
== "mon compact") {
3269 dout(1) << "triggering manual compaction" << dendl
;
3270 utime_t start
= ceph_clock_now();
3272 utime_t end
= ceph_clock_now();
3274 dout(1) << "finished manual compaction in " << end
<< " seconds" << dendl
;
3276 oss
<< "compacted " << g_conf
->get_val
<std::string
>("mon_keyvaluedb") << " in " << end
<< " seconds";
3280 else if (prefix
== "injectargs") {
3281 vector
<string
> injected_args
;
3282 cmd_getval(g_ceph_context
, cmdmap
, "injected_args", injected_args
);
3283 if (!injected_args
.empty()) {
3284 dout(0) << "parsing injected options '" << injected_args
<< "'" << dendl
;
3286 r
= g_conf
->injectargs(str_join(injected_args
, " "), &oss
);
3287 ss
<< "injectargs:" << oss
.str();
3291 rs
= "must supply options to be parsed in a single string";
3294 } else if (prefix
== "time-sync-status") {
3296 f
.reset(Formatter::create("json-pretty"));
3297 f
->open_object_section("time_sync");
3298 if (!timecheck_skews
.empty()) {
3299 f
->open_object_section("time_skew_status");
3300 for (auto& i
: timecheck_skews
) {
3301 entity_inst_t inst
= i
.first
;
3302 double skew
= i
.second
;
3303 double latency
= timecheck_latencies
[inst
];
3304 string name
= monmap
->get_name(inst
.addr
);
3306 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
3307 f
->open_object_section(name
.c_str());
3308 f
->dump_float("skew", skew
);
3309 f
->dump_float("latency", latency
);
3310 f
->dump_stream("health") << tcstatus
;
3311 if (tcstatus
!= HEALTH_OK
) {
3312 f
->dump_stream("details") << tcss
.str();
3318 f
->open_object_section("timechecks");
3319 f
->dump_unsigned("epoch", get_epoch());
3320 f
->dump_int("round", timecheck_round
);
3321 f
->dump_stream("round_status") << ((timecheck_round
%2) ?
3322 "on-going" : "finished");
3328 } else if (prefix
== "config set") {
3330 cmd_getval(cct
, cmdmap
, "key", key
);
3332 cmd_getval(cct
, cmdmap
, "value", val
);
3333 r
= g_conf
->set_val(key
, val
, true, &ss
);
3335 g_conf
->apply_changes(nullptr);
3339 } else if (prefix
== "status" ||
3340 prefix
== "health" ||
3343 cmd_getval(g_ceph_context
, cmdmap
, "detail", detail
);
3345 if (prefix
== "status") {
3346 // get_cluster_status handles f == NULL
3347 get_cluster_status(ds
, f
.get());
3354 } else if (prefix
== "health") {
3355 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3357 get_health_status(detail
== "detail", f
.get(), f
? nullptr : &plain
);
3361 rdata
.append(plain
);
3364 list
<string
> health_str
;
3365 get_health(health_str
, detail
== "detail" ? &rdata
: NULL
, f
.get());
3370 assert(!health_str
.empty());
3371 ds
<< health_str
.front();
3372 health_str
.pop_front();
3373 if (!health_str
.empty()) {
3375 ds
<< joinify(health_str
.begin(), health_str
.end(), string("; "));
3380 if (detail
== "detail")
3384 } else if (prefix
== "df") {
3385 bool verbose
= (detail
== "detail");
3387 f
->open_object_section("stats");
3389 pgservice
->dump_fs_stats(&ds
, f
.get(), verbose
);
3392 pgservice
->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3400 assert(0 == "We should never get here!");
3406 } else if (prefix
== "report") {
3408 // this must be formatted, in its current form
3410 f
.reset(Formatter::create("json-pretty"));
3411 f
->open_object_section("report");
3412 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3413 f
->dump_string("version", ceph_version_to_str());
3414 f
->dump_string("commit", git_version_to_str());
3415 f
->dump_stream("timestamp") << ceph_clock_now();
3417 vector
<string
> tagsvec
;
3418 cmd_getval(g_ceph_context
, cmdmap
, "tags", tagsvec
);
3419 string tagstr
= str_join(tagsvec
, " ");
3420 if (!tagstr
.empty())
3421 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3422 f
->dump_string("tag", tagstr
);
3424 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3425 get_health_status(true, f
.get(), nullptr);
3427 list
<string
> health_str
;
3428 get_health(health_str
, nullptr, f
.get());
3431 monmon()->dump_info(f
.get());
3432 osdmon()->dump_info(f
.get());
3433 mdsmon()->dump_info(f
.get());
3434 authmon()->dump_info(f
.get());
3435 pgservice
->dump_info(f
.get());
3437 paxos
->dump_info(f
.get());
3443 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT
);
3446 } else if (prefix
== "osd last-stat-seq") {
3448 cmd_getval(g_ceph_context
, cmdmap
, "id", osd
);
3449 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3451 f
->dump_unsigned("seq", seq
);
3459 } else if (prefix
== "node ls") {
3460 string
node_type("all");
3461 cmd_getval(g_ceph_context
, cmdmap
, "type", node_type
);
3463 f
.reset(Formatter::create("json-pretty"));
3464 if (node_type
== "all") {
3465 f
->open_object_section("nodes");
3466 print_nodes(f
.get(), ds
);
3467 osdmon()->print_nodes(f
.get());
3468 mdsmon()->print_nodes(f
.get());
3470 } else if (node_type
== "mon") {
3471 print_nodes(f
.get(), ds
);
3472 } else if (node_type
== "osd") {
3473 osdmon()->print_nodes(f
.get());
3474 } else if (node_type
== "mds") {
3475 mdsmon()->print_nodes(f
.get());
3481 } else if (prefix
== "features") {
3482 if (!is_leader() && !is_peon()) {
3483 dout(10) << " waiting for quorum" << dendl
;
3484 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3488 forward_request_leader(op
);
3492 f
.reset(Formatter::create("json-pretty"));
3494 get_combined_feature_map(&fm
);
3495 f
->dump_object("features", fm
);
3499 } else if (prefix
== "mon metadata") {
3501 f
.reset(Formatter::create("json-pretty"));
3504 bool all
= !cmd_getval(g_ceph_context
, cmdmap
, "id", name
);
3506 // Dump a single mon's metadata
3507 int mon
= monmap
->get_rank(name
);
3509 rs
= "requested mon not found";
3513 f
->open_object_section("mon_metadata");
3514 r
= get_mon_metadata(mon
, f
.get(), ds
);
3517 // Dump all mons' metadata
3519 f
->open_array_section("mon_metadata");
3520 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3521 std::ostringstream get_err
;
3522 f
->open_object_section("mon");
3523 f
->dump_string("name", monmap
->get_name(rank
));
3524 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3526 if (r
== -ENOENT
|| r
== -EINVAL
) {
3527 dout(1) << get_err
.str() << dendl
;
3528 // Drop error, list what metadata we do have
3530 } else if (r
!= 0) {
3531 derr
<< "Unexpected error from get_mon_metadata: "
3532 << cpp_strerror(r
) << dendl
;
3533 ds
<< get_err
.str();
3543 } else if (prefix
== "mon versions") {
3545 f
.reset(Formatter::create("json-pretty"));
3546 count_metadata("ceph_version", f
.get());
3551 } else if (prefix
== "mon count-metadata") {
3553 f
.reset(Formatter::create("json-pretty"));
3555 cmd_getval(g_ceph_context
, cmdmap
, "property", field
);
3556 count_metadata(field
, f
.get());
3561 } else if (prefix
== "quorum_status") {
3562 // make sure our map is readable and up to date
3563 if (!is_leader() && !is_peon()) {
3564 dout(10) << " waiting for quorum" << dendl
;
3565 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3568 _quorum_status(f
.get(), ds
);
3572 } else if (prefix
== "mon_status") {
3573 get_mon_status(f
.get(), ds
);
3579 } else if (prefix
== "sync force" ||
3580 prefix
== "mon sync force") {
3581 string validate1
, validate2
;
3582 cmd_getval(g_ceph_context
, cmdmap
, "validate1", validate1
);
3583 cmd_getval(g_ceph_context
, cmdmap
, "validate2", validate2
);
3584 if (validate1
!= "--yes-i-really-mean-it" ||
3585 validate2
!= "--i-know-what-i-am-doing") {
3587 rs
= "are you SURE? this will mean the monitor store will be "
3588 "erased. pass '--yes-i-really-mean-it "
3589 "--i-know-what-i-am-doing' if you really do.";
3592 sync_force(f
.get(), ds
);
3595 } else if (prefix
== "heap") {
3596 if (!ceph_using_tcmalloc())
3597 rs
= "tcmalloc not enabled, can't use heap profiler commands\n";
3600 cmd_getval(g_ceph_context
, cmdmap
, "heapcmd", heapcmd
);
3601 // XXX 1-element vector, change at callee or make vector here?
3602 vector
<string
> heapcmd_vec
;
3603 get_str_vec(heapcmd
, heapcmd_vec
);
3604 ceph_heap_profiler_handle_command(heapcmd_vec
, ds
);
3609 } else if (prefix
== "quorum") {
3611 cmd_getval(g_ceph_context
, cmdmap
, "quorumcmd", quorumcmd
);
3612 if (quorumcmd
== "exit") {
3614 elector
.stop_participating();
3615 rs
= "stopped responding to quorum, initiated new election";
3617 } else if (quorumcmd
== "enter") {
3618 elector
.start_participating();
3620 rs
= "started responding to quorum, initiated new election";
3623 rs
= "needs a valid 'quorum' command";
3626 } else if (prefix
== "version") {
3628 f
->open_object_section("version");
3629 f
->dump_string("version", pretty_version_to_str());
3633 ds
<< pretty_version_to_str();
3638 } else if (prefix
== "versions") {
3640 f
.reset(Formatter::create("json-pretty"));
3641 map
<string
,int> overall
;
3642 f
->open_object_section("version");
3643 map
<string
,int> mon
, mgr
, osd
, mds
;
3645 count_metadata("ceph_version", &mon
);
3646 f
->open_object_section("mon");
3647 for (auto& p
: mon
) {
3648 f
->dump_int(p
.first
.c_str(), p
.second
);
3649 overall
[p
.first
] += p
.second
;
3653 mgrmon()->count_metadata("ceph_version", &mgr
);
3654 f
->open_object_section("mgr");
3655 for (auto& p
: mgr
) {
3656 f
->dump_int(p
.first
.c_str(), p
.second
);
3657 overall
[p
.first
] += p
.second
;
3661 osdmon()->count_metadata("ceph_version", &osd
);
3662 f
->open_object_section("osd");
3663 for (auto& p
: osd
) {
3664 f
->dump_int(p
.first
.c_str(), p
.second
);
3665 overall
[p
.first
] += p
.second
;
3669 mdsmon()->count_metadata("ceph_version", &mds
);
3670 f
->open_object_section("mds");
3671 for (auto& p
: mds
) {
3672 f
->dump_int(p
.first
.c_str(), p
.second
);
3673 overall
[p
.first
] += p
.second
;
3677 for (auto& p
: mgrstatmon()->get_service_map().services
) {
3678 f
->open_object_section(p
.first
.c_str());
3680 p
.second
.count_metadata("ceph_version", &m
);
3682 f
->dump_int(q
.first
.c_str(), q
.second
);
3683 overall
[q
.first
] += q
.second
;
3688 f
->open_object_section("overall");
3689 for (auto& p
: overall
) {
3690 f
->dump_int(p
.first
.c_str(), p
.second
);
3700 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3701 reply_command(op
, r
, rs
, rdata
, 0);
3704 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
3707 reply_command(op
, rc
, rs
, rdata
, version
);
3710 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
3711 bufferlist
& rdata
, version_t version
)
3713 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
3714 assert(m
->get_type() == MSG_MON_COMMAND
);
3715 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
3716 reply
->set_tid(m
->get_tid());
3717 reply
->set_data(rdata
);
3718 send_reply(op
, reply
);
3722 // ------------------------
3723 // request/reply routing
3725 // a client/mds/osd will connect to a random monitor. we need to forward any
3726 // messages requiring state updates to the leader, and then route any replies
3727 // back via the correct monitor and back to them. (the monitor will not
3728 // initiate any connections.)
// Forward the request held by `op` to the monitor returned by get_leader().
// Visible decision chain:
//   - a request whose source is a monitor with an address different from
//     our messenger address is only logged, not forwarded;
//   - a request whose session already has a proxy_con is only logged
//     (no double forwarding);
//   - otherwise, if the session is not closed, a RoutedRequest is filled in
//     under a freshly incremented routed_request_tid, registered in
//     routed_requests and in the session, and an MForward is sent to the
//     leader;
//   - the final log line reports "no session for request".
// NOTE(review): this listing skips several source lines (see the embedded
// line numbers), including 3750 and the remaining MForward constructor
// arguments on 3758-3760 -- confirm against the full file.
3730 void Monitor::forward_request_leader(MonOpRequestRef op
)
3732 op
->mark_event(__func__
);
3734 int mon
= get_leader();
3735 MonSession
*session
= op
->get_session();
3736 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
3738 if (req
->get_source().is_mon() && req
->get_source_addr() != messenger
->get_myaddr()) {
3739 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
3740 } else if (session
->proxy_con
) {
3741 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
3742 } else if (!session
->closed
) {
3743 RoutedRequest
*rr
= new RoutedRequest
;
3744 rr
->tid
= ++routed_request_tid
;
3745 rr
->client_inst
= req
->get_source_inst();
3746 rr
->con
= req
->get_connection();
3747 rr
->con_features
= rr
->con
->get_features();
3748 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
3749 rr
->session
= static_cast<MonSession
*>(session
->get());
3751 routed_requests
[rr
->tid
] = rr
;
3752 session
->routed_request_tids
.insert(rr
->tid
);
3754 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
3755 << " features " << rr
->con_features
<< dendl
;
3757 MForward
*forward
= new MForward(rr
->tid
,
3761 forward
->set_priority(req
->get_priority());
3762 if (session
->auth_handler
) {
3763 forward
->entity_name
= session
->entity_name
;
3764 } else if (req
->get_source().is_mon()) {
3765 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
3767 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3768 op
->mark_forwarded();
3769 assert(op
->get_req()->get_type() != 0);
3771 dout(10) << "forward_request no session for request " << *req
<< dendl
;
3775 // fake connection attached to forwarded messages
// Connection subclass whose constructor passes NULL as the second argument
// of the Connection base. send_message() and send_keepalive() assert when
// called, and is_connected() always returns false.
// NOTE(review): the bodies of mark_down() and mark_disposable(), and any
// statement following the assert in send_message(), are not visible in
// this listing.
3776 struct AnonConnection
: public Connection
{
3777 explicit AnonConnection(CephContext
*cct
) : Connection(cct
, NULL
) {}
3779 int send_message(Message
*m
) override
{
3780 assert(!"send_message on anonymous connection");
3782 void send_keepalive() override
{
3783 assert(!"send_keepalive on anonymous connection");
3785 void mark_down() override
{
3788 void mark_disposable() override
{
3791 bool is_connected() override
{ return false; }
3794 //extract the original message and put it into the regular dispatch function
// Handle an MForward. Visible steps:
//   - log the forwarded client and the forwarding source;
//   - if the sending session lacks MON_CAP_X on "mon", log it at level 0;
//   - claim the wrapped PaxosServiceMessage, create an AnonConnection and a
//     new MonSession for it, and copy the peer address, peer type, features,
//     caps and entity name from the MForward onto them;
//   - record the connection the MForward arrived on as proxy_con and its tid
//     as proxy_tid so replies can be routed back;
//   - attach the anonymous connection to the request, copy the receive stamp
//     and store the current election epoch in rx_election_epoch.
// NOTE(review): lines after 3849 (and a few earlier ones) are absent from
// this listing, so the actual dispatch of the extracted request and the
// session cleanup are not visible here.
3795 void Monitor::handle_forward(MonOpRequestRef op
)
3797 MForward
*m
= static_cast<MForward
*>(op
->get_req());
3798 dout(10) << "received forwarded message from " << m
->client
3799 << " via " << m
->get_source_inst() << dendl
;
3800 MonSession
*session
= op
->get_session();
3803 if (!session
->is_capable("mon", MON_CAP_X
)) {
3804 dout(0) << "forward from entity with insufficient caps! "
3805 << session
->caps
<< dendl
;
3807 // see PaxosService::dispatch(); we rely on this being anon
3808 // (c->msgr == NULL)
3809 PaxosServiceMessage
*req
= m
->claim_message();
3810 assert(req
!= NULL
);
3812 ConnectionRef
c(new AnonConnection(cct
));
3813 MonSession
*s
= new MonSession(req
->get_source_inst(),
3814 static_cast<Connection
*>(c
.get()));
3815 c
->set_priv(s
->get());
3816 c
->set_peer_addr(m
->client
.addr
);
3817 c
->set_peer_type(m
->client
.name
.type());
3818 c
->set_features(m
->con_features
);
3820 s
->caps
= m
->client_caps
;
3821 dout(10) << " caps are " << s
->caps
<< dendl
;
3822 s
->entity_name
= m
->entity_name
;
3823 dout(10) << " entity name '" << s
->entity_name
<< "' type "
3824 << s
->entity_name
.get_type() << dendl
;
3825 s
->proxy_con
= m
->get_connection();
3826 s
->proxy_tid
= m
->tid
;
3828 req
->set_connection(c
);
3830 // not super accurate, but better than nothing.
3831 req
->set_recv_stamp(m
->get_recv_stamp());
3834 * note which election epoch this is; we will drop the message if
3835 * there is a future election since our peers will resend routed
3836 * requests in that case.
3838 req
->rx_election_epoch
= get_epoch();
3840 /* Because this is a special fake connection, we need to break
3841 the ref loop between Connection and MonSession differently
3842 than we normally do. Here, the Message refers to the Connection
3843 which refers to the Session, and nobody else refers to the Connection
3844 or the Session. And due to the special nature of this message,
3845 nobody refers to the Connection via the Session. So, clear out that
3846 half of the ref loop.*/
3849 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
// Send message m to entity `to`. Visible steps: log the attempt, encode m
// with quorum_con_features into bl, send m directly through the messenger,
// then loop over every rank in the monmap and send that monitor an MRoute
// built from the encoded copy and the destination.
// NOTE(review): source line 3866 is absent from this listing; it may
// restrict which ranks receive the MRoute (for example skipping our own
// rank) -- confirm against the full file.
3856 void Monitor::try_send_message(Message
*m
, const entity_inst_t
& to
)
3858 dout(10) << "try_send_message " << *m
<< " to " << to
<< dendl
;
3861 encode_message(m
, quorum_con_features
, bl
);
3863 messenger
->send_message(m
, to
);
3865 for (int i
=0; i
<(int)monmap
->size(); i
++) {
3867 messenger
->send_message(new MRoute(bl
, to
), monmap
->get_inst(i
));
// Deliver `reply` for the request held by `op`. Visible behaviour:
//   - two "no connection, dropping reply" paths log the drop and mark the
//     op with "reply: no connection"; the second one is taken when the
//     session has neither con nor proxy_con;
//   - when the session has a proxy_con, the reply is wrapped in an MRoute
//     tagged with proxy_tid and sent over the proxy connection;
//   - the remaining path sends the reply directly on session->con.
// NOTE(review): the guard of the first drop path (line 3883) and the
// statements that end each drop path are absent from this listing, so
// whether the reply is released there cannot be confirmed from here.
3871 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
3873 op
->mark_event(__func__
);
3875 MonSession
*session
= op
->get_session();
3877 Message
*req
= op
->get_req();
3878 ConnectionRef con
= op
->get_connection();
3880 reply
->set_cct(g_ceph_context
);
3881 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
3884 dout(2) << "send_reply no connection, dropping reply " << *reply
3885 << " to " << req
<< " " << *req
<< dendl
;
3887 op
->mark_event("reply: no connection");
3891 if (!session
->con
&& !session
->proxy_con
) {
3892 dout(2) << "send_reply no connection, dropping reply " << *reply
3893 << " to " << req
<< " " << *req
<< dendl
;
3895 op
->mark_event("reply: no connection");
3899 if (session
->proxy_con
) {
3900 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
3901 << " via " << session
->proxy_con
->get_peer_addr()
3902 << " for request " << *req
<< dendl
;
3903 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
3904 op
->mark_event("reply: send routed request");
3906 session
->con
->send_message(reply
);
3907 op
->mark_event("reply: send");
3911 void Monitor::no_reply(MonOpRequestRef op
)
3913 MonSession
*session
= op
->get_session();
3914 Message
*req
= op
->get_req();
3916 if (session
->proxy_con
) {
3917 dout(10) << "no_reply to " << req
->get_source_inst()
3918 << " via " << session
->proxy_con
->get_peer_addr()
3919 << " for request " << *req
<< dendl
;
3920 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
3921 op
->mark_event("no_reply: send routed request");
3923 dout(10) << "no_reply to " << req
->get_source_inst()
3924 << " " << *req
<< dendl
;
3925 op
->mark_event("no_reply");
// Handle an MRoute. Visible behaviour:
//   - a sender without MON_CAP_X on "mon" is logged at level 0;
//   - when session_mon_tid is set and present in routed_requests, the
//     wrapped message has its payload cleared and is sent on the recorded
//     client connection, optionally followed by incremental osdmaps starting
//     at send_osdmap_first; the tid is then erased from routed_requests and
//     from the session;
//   - an unknown tid is only logged;
//   - without a tid the message is sent straight to m->dest.
// NOTE(review): several lines are absent from this listing (for example
// 3936-3939, 3948, 3950 and 3963), so the early-return path and the release
// of the RoutedRequest are not visible here.
3929 void Monitor::handle_route(MonOpRequestRef op
)
3931 MRoute
*m
= static_cast<MRoute
*>(op
->get_req());
3932 MonSession
*session
= op
->get_session();
3934 if (!session
->is_capable("mon", MON_CAP_X
)) {
3935 dout(0) << "MRoute received from entity without appropriate perms! "
3940 dout(10) << "handle_route " << *m
->msg
<< " to " << m
->dest
<< dendl
;
3942 dout(10) << "handle_route null to " << m
->dest
<< dendl
;
3945 if (m
->session_mon_tid
) {
3946 if (routed_requests
.count(m
->session_mon_tid
)) {
3947 RoutedRequest
*rr
= routed_requests
[m
->session_mon_tid
];
3949 // reset payload, in case encoding is dependent on target features
3951 m
->msg
->clear_payload();
3952 rr
->con
->send_message(m
->msg
);
3955 if (m
->send_osdmap_first
) {
3956 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
3957 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
3958 true, MonOpRequestRef());
3960 assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
3961 routed_requests
.erase(m
->session_mon_tid
);
3962 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
3965 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
3968 dout(10) << " not a routed request, trying to send anyway" << dendl
;
3970 messenger
->send_message(m
->msg
, m
->dest
);
// Walk every entry of routed_requests and either requeue it locally or
// send it again to the monitor returned by get_leader(). Visible paths:
//   - "requeue for self": a C_RetryMessage for the stored op is collected in
//     `retry` and the tid is erased from the owning session;
//   - "resend": the stored request_bl is decoded back into a message,
//     wrapped in a new MForward with the original tid, client instance and
//     priority, and sent to the leader.
// Afterwards routed_requests is cleared and the collected retry contexts
// are finished.
// NOTE(review): the condition choosing between the two paths, the loop
// increment and the release of requeued entries sit on lines absent from
// this listing -- confirm against the full file.
3976 void Monitor::resend_routed_requests()
3978 dout(10) << "resend_routed_requests" << dendl
;
3979 int mon
= get_leader();
3980 list
<Context
*> retry
;
3981 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
3982 p
!= routed_requests
.end();
3984 RoutedRequest
*rr
= p
->second
;
3987 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
3988 rr
->op
->mark_event("retry routed request");
3989 retry
.push_back(new C_RetryMessage(this, rr
->op
));
3991 assert(rr
->session
->routed_request_tids
.count(p
->first
));
3992 rr
->session
->routed_request_tids
.erase(p
->first
);
3996 bufferlist::iterator q
= rr
->request_bl
.begin();
3997 PaxosServiceMessage
*req
= (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
3998 rr
->op
->mark_event("resend forwarded message to leader");
3999 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
<< dendl
;
4000 MForward
*forward
= new MForward(rr
->tid
, req
, rr
->con_features
,
4002 req
->put(); // forward takes its own ref; drop ours.
4003 forward
->client
= rr
->client_inst
;
4004 forward
->set_priority(req
->get_priority());
4005 messenger
->send_message(forward
, monmap
->get_inst(mon
));
4009 routed_requests
.clear();
4010 finish_contexts(g_ceph_context
, retry
);
// Tear down session s. Visible steps: log the session, drop every routed
// request whose tid is listed in s->routed_request_tids from
// routed_requests, clear that tid set, detach the session from its
// connection with set_priv(NULL), remove it from session_map and update the
// session perf counters.
// NOTE(review): lines 4018-4019, 4022 and 4026 are absent from this
// listing; they may hold preconditions and the release of each dropped
// RoutedRequest -- confirm against the full file.
4014 void Monitor::remove_session(MonSession
*s
)
4016 dout(10) << "remove_session " << s
<< " " << s
->inst
4017 << " features 0x" << std::hex
<< s
->con_features
<< std::dec
<< dendl
;
4020 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
4021 p
!= s
->routed_request_tids
.end();
4023 assert(routed_requests
.count(*p
));
4024 RoutedRequest
*rr
= routed_requests
[*p
];
4025 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
4027 routed_requests
.erase(*p
);
4029 s
->routed_request_tids
.clear();
4030 s
->con
->set_priv(NULL
);
4031 session_map
.remove_session(s
);
4032 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4033 logger
->inc(l_mon_session_rm
);
// Empty the session map while holding session_map_lock: loop until
// session_map.sessions is empty, taking the front session each time and
// bumping the l_mon_session_rm counter, then publish the resulting session
// count.
// NOTE(review): lines 4041-4042, which presumably perform the actual
// removal of `s`, are absent from this listing -- confirm.
4036 void Monitor::remove_all_sessions()
4038 Mutex::Locker
l(session_map_lock
);
4039 while (!session_map
.sessions
.empty()) {
4040 MonSession
*s
= session_map
.sessions
.front();
4043 logger
->inc(l_mon_session_rm
);
4046 logger
->set(l_mon_num_sessions
, session_map
.get_size());
// Send a monitor command to `inst`: log it, allocate an MMonCommand stamped
// with the monmap fsid and hand it to try_send_message().
// NOTE(review): line 4054 is absent from this listing; presumably it stores
// `com` into the new message -- confirm.
4049 void Monitor::send_command(const entity_inst_t
& inst
,
4050 const vector
<string
>& com
)
4052 dout(10) << "send_command " << inst
<< "" << com
<< dendl
;
4053 MMonCommand
*c
= new MMonCommand(monmap
->fsid
);
4055 try_send_message(c
, inst
);
// Decide what to do with a client op that cannot be served right now.
// Visible logic: a message received within the last mon_lease seconds on a
// connection that is still connected is queued on maybe_wait_for_quorum as
// a C_RetryMessage and marked as waiting for quorum; otherwise it is logged
// as discarded, and for sessions without a proxy_con session_map_lock is
// taken.
// NOTE(review): lines 4085 and 4090-4093 are absent from this listing, so
// the statements that actually drop the connection and session are not
// visible here.
4058 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
4061 * Wait list the new session until we're in the quorum, assuming it's
4063 * tick() will periodically send them back through so we can send
4064 * the client elsewhere if we don't think we're getting back in.
4066 * But we whitelist a few sorts of messages:
4067 * 1) Monitors can talk to us at any time, of course.
4068 * 2) auth messages. It's unlikely to go through much faster, but
4069 * it's possible we've just lost our quorum status and we want to take...
4070 * 3) command messages. We want to accept these under all possible
4073 Message
*m
= op
->get_req();
4074 MonSession
*s
= op
->get_session();
4075 ConnectionRef con
= op
->get_connection();
4076 utime_t too_old
= ceph_clock_now();
4077 too_old
-= g_ceph_context
->_conf
->mon_lease
;
4078 if (m
->get_recv_stamp() > too_old
&&
4079 con
->is_connected()) {
4080 dout(5) << "waitlisting message " << *m
<< dendl
;
4081 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
4082 op
->mark_wait_for_quorum();
4084 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
4086 // proxied sessions aren't registered and don't have a con; don't remove
4088 if (!s
->proxy_con
) {
4089 Mutex::Locker
l(session_map_lock
);
// Entry point for an incoming message. Visible steps:
//   - check is_shutdown() first;
//   - wrap the message in a tracked MonOpRequest and fetch its session;
//   - for a monitor source with an existing session, detect a change of
//     connection features on a non-anonymous connection and, if the session
//     is bound to a different connection, mark that old connection down;
//   - for a non-monitor source, drop any message other than CEPH_MSG_AUTH,
//     CEPH_MSG_MON_GET_MAP or CEPH_MSG_PING as stray;
//   - create a new session under session_map_lock, attach it to the
//     connection and update the session perf counters;
//   - give the session the monitor caps unless it already allows all;
//   - refresh session_timeout from mon_session_timeout and copy the entity
//     name from the auth handler when one is set;
//   - hand the op to waitlist_or_zap_client() when the visible
//     synchronizing / exited-quorum condition holds and the message is not
//     a ping.
// NOTE(review): many lines are absent from this listing (the guards around
// session creation, the early returns and the final hand-off of the op), so
// the exact branch structure must be confirmed against the full file.
4096 void Monitor::_ms_dispatch(Message
*m
)
4098 if (is_shutdown()) {
4103 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
4104 bool src_is_mon
= op
->is_src_mon();
4105 op
->mark_event("mon:_ms_dispatch");
4106 MonSession
*s
= op
->get_session();
4107 if (s
&& s
->closed
) {
4111 if (src_is_mon
&& s
) {
4112 ConnectionRef con
= m
->get_connection();
4113 if (con
->get_messenger() && con
->get_features() != s
->con_features
) {
4114 // only update features if this is a non-anonymous connection
4115 dout(10) << __func__
<< " feature change for " << m
->get_source_inst()
4116 << " (was " << s
->con_features
4117 << ", now " << con
->get_features() << ")" << dendl
;
4118 // connection features changed - recreate session.
4119 if (s
->con
&& s
->con
!= con
) {
4120 dout(10) << __func__
<< " connection for " << m
->get_source_inst()
4121 << " changed from session; mark down and replace" << dendl
;
4122 s
->con
->mark_down();
4124 if (s
->item
.is_on_list()) {
4125 // forwarded messages' sessions are not in the sessions map and
4126 // exist only while the op is being handled.
4135 // if the sender is not a monitor, make sure their first message for a
4136 // session is an MAuth. If it is not, assume it's a stray message,
4137 // and considering that we are creating a new session it is safe to
4138 // assume that the sender hasn't authenticated yet, so we have no way
4139 // of assessing whether we should handle it or not.
4140 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
4141 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
4142 m
->get_type() != CEPH_MSG_PING
)) {
4143 dout(1) << __func__
<< " dropping stray message " << *m
4144 << " from " << m
->get_source_inst() << dendl
;
4148 ConnectionRef con
= m
->get_connection();
4150 Mutex::Locker
l(session_map_lock
);
4151 s
= session_map
.new_session(m
->get_source_inst(), con
.get());
4154 con
->set_priv(s
->get());
4155 dout(10) << __func__
<< " new session " << s
<< " " << *s
4156 << " features 0x" << std::hex
4157 << s
->con_features
<< std::dec
<< dendl
;
4160 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4161 logger
->inc(l_mon_session_add
);
4164 // give it monitor caps; the peer type has been authenticated
4165 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
4166 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
4167 s
->caps
= *mon_caps
;
4171 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->inst
4177 s
->session_timeout
= ceph_clock_now();
4178 s
->session_timeout
+= g_conf
->mon_session_timeout
;
4180 if (s
->auth_handler
) {
4181 s
->entity_name
= s
->auth_handler
->get_entity_name();
4183 dout(20) << " caps " << s
->caps
.get_str() << dendl
;
4185 if ((is_synchronizing() ||
4186 (s
->global_id
== 0 && !exited_quorum
.is_zero())) &&
4188 m
->get_type() != CEPH_MSG_PING
) {
4189 waitlist_or_zap_client(op
);
// Route a tracked op to its handler based on the message type. The visible
// code works in stages, each a switch on op->get_req()->get_type():
//   1. messages that do not necessarily need caps (for example
//      MSG_MON_GLOBAL_ID to the PAXOS_AUTH service, CEPH_MSG_MON_GET_MAP to
//      handle_mon_get_map, CEPH_MSG_MON_METADATA to handle_mon_metadata);
//   2. service messages whose caps are checked elsewhere, dispatched to the
//      matching paxos_service entry (OSDMAP, MDSMAP, MGR, MGRSTAT, PGMAP,
//      LOG), plus MSG_MON_COMMAND;
//   3. messages handled by the Monitor class itself that clients may send,
//      guarded by a MON_CAP_R check on "mon" (CEPH_MSG_MON_GET_VERSION,
//      CEPH_MSG_MON_SUBSCRIBE);
//   4. monitor-only messages, reached only when op->is_src_mon() holds;
//      paxos and election messages additionally require MON_CAP_X.
// Anything left over is logged as "dropping unexpected".
// NOTE(review): a large number of lines (many case labels, break and
// return statements, and the dealt_with bookkeeping) are absent from this
// listing, so the mapping above covers only the labels that are visible.
4196 void Monitor::dispatch_op(MonOpRequestRef op
)
4198 op
->mark_event("mon:dispatch_op");
4199 MonSession
*s
= op
->get_session();
4202 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
4206 /* we will consider the default type as being 'monitor' until proven wrong */
4207 op
->set_type_monitor();
4208 /* deal with all messages that do not necessarily need caps */
4209 bool dealt_with
= true;
4210 switch (op
->get_req()->get_type()) {
4212 case MSG_MON_GLOBAL_ID
:
4214 op
->set_type_service();
4215 /* no need to check caps here */
4216 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
4223 /* MMonGetMap may be used by clients to obtain a monmap *before*
4224 * authenticating with the monitor. We need to handle these without
4225 * checking caps because, even on a cluster without cephx, we only set
4226 * session caps *after* the auth handshake. A good example of this
4227 * is when a client calls MonClient::get_monmap_privately(), which does
4228 * not authenticate when obtaining a monmap.
4230 case CEPH_MSG_MON_GET_MAP
:
4231 handle_mon_get_map(op
);
4234 case CEPH_MSG_MON_METADATA
:
4235 return handle_mon_metadata(op
);
4244 /* well, maybe the op belongs to a service... */
4245 op
->set_type_service();
4246 /* deal with all messages which caps should be checked somewhere else */
4248 switch (op
->get_req()->get_type()) {
4251 case CEPH_MSG_MON_GET_OSDMAP
:
4252 case CEPH_MSG_POOLOP
:
4253 case MSG_OSD_BEACON
:
4254 case MSG_OSD_MARK_ME_DOWN
:
4256 case MSG_OSD_FAILURE
:
4259 case MSG_OSD_PGTEMP
:
4260 case MSG_OSD_PG_CREATED
:
4261 case MSG_REMOVE_SNAPS
:
4262 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
4266 case MSG_MDS_BEACON
:
4267 case MSG_MDS_OFFLOAD_TARGETS
:
4268 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
4272 case MSG_MGR_BEACON
:
4273 paxos_service
[PAXOS_MGR
]->dispatch(op
);
4277 case MSG_MON_MGR_REPORT
:
4278 case CEPH_MSG_STATFS
:
4279 case MSG_GETPOOLSTATS
:
4280 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
4285 paxos_service
[PAXOS_PGMAP
]->dispatch(op
);
4290 paxos_service
[PAXOS_LOG
]->dispatch(op
);
4293 // handle_command() does its own caps checking
4294 case MSG_MON_COMMAND
:
4295 op
->set_type_command();
4306 /* nop, looks like it's not a service message; revert back to monitor */
4307 op
->set_type_monitor();
4309 /* messages we, the Monitor class, need to deal with
4310 * but may be sent by clients. */
4312 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
4313 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4314 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
4320 switch (op
->get_req()->get_type()) {
4323 case CEPH_MSG_MON_GET_VERSION
:
4324 handle_get_version(op
);
4327 case CEPH_MSG_MON_SUBSCRIBE
:
4328 /* FIXME: check what's being subscribed, filter accordingly */
4329 handle_subscribe(op
);
4339 if (!op
->is_src_mon()) {
4340 dout(1) << __func__
<< " unexpected monitor message from"
4341 << " non-monitor entity " << op
->get_req()->get_source_inst()
4342 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
4346 /* messages that should only be sent by another monitor */
4348 switch (op
->get_req()->get_type()) {
4358 // Sync (i.e., the new slurp, but on steroids)
4366 /* log acks are sent from a monitor we sent the MLog to, and are
4367 never sent by clients to us. */
4369 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
4374 op
->set_type_service();
4375 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
4381 op
->set_type_paxos();
4382 MMonPaxos
*pm
= static_cast<MMonPaxos
*>(op
->get_req());
4383 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4388 if (state
== STATE_SYNCHRONIZING
) {
4389 // we are synchronizing. These messages would do us no
4390 // good, thus just drop them and ignore them.
4391 dout(10) << __func__
<< " ignore paxos msg from "
4392 << pm
->get_source_inst() << dendl
;
4397 if (pm
->epoch
> get_epoch()) {
4401 if (pm
->epoch
!= get_epoch()) {
4405 paxos
->dispatch(op
);
4410 case MSG_MON_ELECTION
:
4411 op
->set_type_election();
4412 //check privileges here for simplicity
4413 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4414 dout(0) << "MMonElection received from entity without enough caps!"
4415 << op
->get_session()->caps
<< dendl
;
4418 if (!is_probing() && !is_synchronizing()) {
4419 elector
.dispatch(op
);
4428 handle_timecheck(op
);
4431 case MSG_MON_HEALTH
:
4432 health_monitor
->dispatch(op
);
4435 case MSG_MON_HEALTH_CHECKS
:
4436 op
->set_type_service();
4437 paxos_service
[PAXOS_HEALTH
]->dispatch(op
);
4445 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
// Answer an MPing. Visible steps: allocate a reply MPing, open a "pong"
// section in a JSONFormatter, add health information (get_health_status()
// when the osdmap require_osd_release is at least CEPH_RELEASE_LUMINOUS,
// get_health() on the other visible path), add the monitor status via
// get_mon_status(), encode the string held in `ss` into the reply payload
// and send the reply to the source of the ping.
// NOTE(review): lines 4463, 4466, 4469-4472 and 4474-4478 are absent from
// this listing, including the declarations of `ss` and `payload` and the
// step that moves the formatter output into `ss` -- confirm against the
// full file.
4454 void Monitor::handle_ping(MonOpRequestRef op
)
4456 MPing
*m
= static_cast<MPing
*>(op
->get_req());
4457 dout(10) << __func__
<< " " << *m
<< dendl
;
4458 MPing
*reply
= new MPing
;
4459 entity_inst_t inst
= m
->get_source_inst();
4461 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4462 f
->open_object_section("pong");
4464 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
4465 get_health_status(false, f
.get(), nullptr);
4467 list
<string
> health_str
;
4468 get_health(health_str
, nullptr, f
.get());
4473 get_mon_status(f
.get(), ss
);
4479 ::encode(ss
.str(), payload
);
4480 reply
->set_payload(payload
);
4481 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
4482 messenger
->send_message(reply
, inst
);
4485 void Monitor::timecheck_start()
4487 dout(10) << __func__
<< dendl
;
4488 timecheck_cleanup();
4489 timecheck_start_round();
4492 void Monitor::timecheck_finish()
4494 dout(10) << __func__
<< dendl
;
4495 timecheck_cleanup();
// Start a new timecheck round; asserts that we are the leader. Visible
// logic:
//   - a monmap of size 1 trips an assert (nothing to check against);
//   - an odd timecheck_round means a round is still in flight: it is kept
//     while younger than three times mon_timecheck_interval, otherwise it is
//     cancelled through timecheck_cancel_round();
//   - with an even round number the round start time is stamped;
//   - finally the next timer event is set up via timecheck_reset_event().
// NOTE(review): lines 4505-4507, 4514-4515, 4519-4521, 4523-4524 and
// 4527-4529 are absent from this listing, so the round counter increment,
// the call that sends the pings and the control flow joining these pieces
// are not visible here.
4498 void Monitor::timecheck_start_round()
4500 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4501 assert(is_leader());
4503 if (monmap
->size() == 1) {
4504 assert(0 == "We are alone; this shouldn't have been scheduled!");
4508 if (timecheck_round
% 2) {
4509 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4510 utime_t curr_time
= ceph_clock_now();
4511 double max
= g_conf
->mon_timecheck_interval
*3;
4512 if (curr_time
- timecheck_round_start
< max
) {
4513 dout(10) << __func__
<< " keep current round going" << dendl
;
4516 dout(10) << __func__
4517 << " finish current timecheck and start new" << dendl
;
4518 timecheck_cancel_round();
4522 assert(timecheck_round
% 2 == 0);
4525 timecheck_round_start
= ceph_clock_now();
4526 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4530 dout(10) << __func__
<< " setting up next event" << dendl
;
4531 timecheck_reset_event();
// Close the round currently in flight; asserts that timecheck_round is odd.
// Visible logic: the round start time is reset, one path asserts that
// nobody is still waiting and that every quorum member acked before calling
// timecheck_check_skews(), and another path logs the peers still waiting.
// The waiting set is then cleared and the resulting round number logged.
// NOTE(review): the lines selecting between the two paths on `success` and
// the round counter update (4538, 4540-4541, 4544, 4546-4548, 4554-4555,
// 4557) are absent from this listing -- confirm against the full file.
4534 void Monitor::timecheck_finish_round(bool success
)
4536 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4537 assert(timecheck_round
% 2);
4539 timecheck_round_start
= utime_t();
4542 assert(timecheck_waiting
.empty());
4543 assert(timecheck_acks
== quorum
.size());
4545 timecheck_check_skews();
4549 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4550 << " peers still waiting:";
4551 for (map
<entity_inst_t
,utime_t
>::iterator p
= timecheck_waiting
.begin();
4552 p
!= timecheck_waiting
.end(); ++p
) {
4553 *_dout
<< " " << p
->first
.name
;
4556 timecheck_waiting
.clear();
4558 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
4561 void Monitor::timecheck_cancel_round()
4563 timecheck_finish_round(false);
// Reset the timecheck state. Visible steps: zero the round counter and the
// round start time, cancel and forget any pending timecheck_event, clear
// the waiting, skew and latency maps, and zero
// timecheck_rounds_since_clean.
// NOTE(review): line 4569 is absent from this listing; it may reset a
// further counter -- confirm against the full file.
4566 void Monitor::timecheck_cleanup()
4568 timecheck_round
= 0;
4570 timecheck_round_start
= utime_t();
4572 if (timecheck_event
) {
4573 timer
.cancel_event(timecheck_event
);
4574 timecheck_event
= NULL
;
4576 timecheck_waiting
.clear();
4577 timecheck_skews
.clear();
4578 timecheck_latencies
.clear();
4580 timecheck_rounds_since_clean
= 0;
// Schedule the next timecheck round. Visible steps: cancel any pending
// timecheck_event; derive a delay from mon_timecheck_skew_interval times
// timecheck_rounds_since_clean, falling back to mon_timecheck_interval when
// that delay is non-positive or larger than the regular interval; then
// register a timer event whose callback calls timecheck_start_round().
// NOTE(review): the declaration of `delay` (line 4590) and the delay
// argument passed to add_event_after (line 4602) are absent from this
// listing.
4583 void Monitor::timecheck_reset_event()
4585 if (timecheck_event
) {
4586 timer
.cancel_event(timecheck_event
);
4587 timecheck_event
= NULL
;
4591 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
4593 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4594 delay
= cct
->_conf
->mon_timecheck_interval
;
4597 dout(10) << __func__
<< " delay " << delay
4598 << " rounds_since_clean " << timecheck_rounds_since_clean
4601 timecheck_event
= timer
.add_event_after(
4603 new C_MonContext(this, [this](int) {
4604 timecheck_start_round();
// Inspect the skews collected in the last round; asserts that we are the
// leader and that no round is in flight (even round number). Visible logic:
//   - every entry of timecheck_skews is tested with timecheck_has_skew() and
//     offending peers are logged;
//   - one path increments timecheck_rounds_since_clean and reschedules the
//     timer event;
//   - when no skew is found after earlier skewed rounds, the counter is set
//     to 1 for a single rescheduling and then reset to 0.
// NOTE(review): the lines that set found_skew and the condition opening the
// first path (4622-4623, 4627-4631) are absent from this listing.
4608 void Monitor::timecheck_check_skews()
4610 dout(10) << __func__
<< dendl
;
4611 assert(is_leader());
4612 assert((timecheck_round
% 2) == 0);
4613 if (monmap
->size() == 1) {
4614 assert(0 == "We are alone; we shouldn't have gotten here!");
4617 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4619 bool found_skew
= false;
4620 for (map
<entity_inst_t
, double>::iterator p
= timecheck_skews
.begin();
4621 p
!= timecheck_skews
.end(); ++p
) {
4624 if (timecheck_has_skew(p
->second
, &abs_skew
)) {
4625 dout(10) << __func__
4626 << " " << p
->first
<< " skew " << abs_skew
<< dendl
;
4632 ++timecheck_rounds_since_clean
;
4633 timecheck_reset_event();
4634 } else if (timecheck_rounds_since_clean
> 0) {
4636 << " no clock skews found after " << timecheck_rounds_since_clean
4637 << " rounds" << dendl
;
4638 // make sure the skews are really gone and not just a transient success
4639 // this will run just once if not in the presence of skews again.
4640 timecheck_rounds_since_clean
= 1;
4641 timecheck_reset_event();
4642 timecheck_rounds_since_clean
= 0;
// Send the collected skews and latencies to quorum members; asserts that we
// are the leader and that no round is in flight. For each quorum rank the
// visible code compares the rank name with our own name, builds an
// MTimeCheck OP_REPORT stamped with the current epoch and round, copies
// every skew and its matching latency into it, and sends it to that monitor
// instance.
// NOTE(review): the statement following the own-name test (line 4661) and
// the lines using do_output (4674-4675, 4679-4681) are absent from this
// listing.
4647 void Monitor::timecheck_report()
4649 dout(10) << __func__
<< dendl
;
4650 assert(is_leader());
4651 assert((timecheck_round
% 2) == 0);
4652 if (monmap
->size() == 1) {
4653 assert(0 == "We are alone; we shouldn't have gotten here!");
4657 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4658 bool do_output
= true; // only output report once
4659 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
4660 if (monmap
->get_name(*q
) == name
)
4663 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_REPORT
);
4664 m
->epoch
= get_epoch();
4665 m
->round
= timecheck_round
;
4667 for (map
<entity_inst_t
, double>::iterator it
= timecheck_skews
.begin();
4668 it
!= timecheck_skews
.end(); ++it
) {
4669 double skew
= it
->second
;
4670 double latency
= timecheck_latencies
[it
->first
];
4672 m
->skews
[it
->first
] = skew
;
4673 m
->latencies
[it
->first
] = latency
;
4676 dout(25) << __func__
<< " " << it
->first
4677 << " latency " << latency
4678 << " skew " << skew
<< dendl
;
4682 entity_inst_t inst
= monmap
->get_inst(*q
);
4683 dout(10) << __func__
<< " send report to " << inst
<< dendl
;
4684 messenger
->send_message(m
, inst
);
// Send the pings of the current round; asserts that we are the leader and
// that a round is in flight (odd round number). Visible steps: count our
// own ack, record zero skew and zero latency for our own instance, then for
// each quorum rank compare its name with our own, remember the send time in
// timecheck_waiting and send an MTimeCheck OP_PING stamped with the current
// epoch and round.
// NOTE(review): the statement following the own-name test (line 4709) is
// absent from this listing; presumably it skips ourselves -- confirm.
4688 void Monitor::timecheck()
4690 dout(10) << __func__
<< dendl
;
4691 assert(is_leader());
4692 if (monmap
->size() == 1) {
4693 assert(0 == "We are alone; we shouldn't have gotten here!");
4696 assert(timecheck_round
% 2 != 0);
4698 timecheck_acks
= 1; // we ack ourselves
4700 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
4701 << " round " << timecheck_round
<< dendl
;
4703 // we are at the eye of the storm; the point of reference
4704 timecheck_skews
[messenger
->get_myinst()] = 0.0;
4705 timecheck_latencies
[messenger
->get_myinst()] = 0.0;
4707 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
4708 if (monmap
->get_name(*it
) == name
)
4711 entity_inst_t inst
= monmap
->get_inst(*it
);
4712 utime_t curr_time
= ceph_clock_now();
4713 timecheck_waiting
[inst
] = curr_time
;
4714 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_PING
);
4715 m
->epoch
= get_epoch();
4716 m
->round
= timecheck_round
;
4717 dout(10) << __func__
<< " send " << *m
<< " to " << inst
<< dendl
;
4718 messenger
->send_message(m
, inst
);
// Turn a bounded skew into a health status. The status starts as HEALTH_OK;
// when timecheck_has_skew() reports a skew for skew_bound it becomes
// HEALTH_WARN and a "clock skew ... > max ..." message quoting
// mon_clock_drift_allowed is written to ss. `latency` is only asserted to
// be non-negative in the visible code.
// NOTE(review): the declaration of abs_skew (line 4729) and the return
// statement (lines 4734-4737) are absent from this listing.
4722 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
4723 const double skew_bound
,
4724 const double latency
)
4726 health_status_t status
= HEALTH_OK
;
4727 assert(latency
>= 0);
4730 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
4731 status
= HEALTH_WARN
;
4732 ss
<< "clock skew " << abs_skew
<< "s"
4733 << " > max " << g_conf
->mon_clock_drift_allowed
<< "s";
4739 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
4741 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4742 dout(10) << __func__
<< " " << *m
<< dendl
;
4743 /* handles PONG's */
4744 assert(m
->op
== MTimeCheck::OP_PONG
);
4746 entity_inst_t other
= m
->get_source_inst();
4747 if (m
->epoch
< get_epoch()) {
4748 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
4749 << " from " << other
4750 << " curr " << get_epoch()
4751 << " -- severely lagged? discard" << dendl
;
4754 assert(m
->epoch
== get_epoch());
4756 if (m
->round
< timecheck_round
) {
4757 dout(1) << __func__
<< " got old round " << m
->round
4758 << " from " << other
4759 << " curr " << timecheck_round
<< " -- discard" << dendl
;
4763 utime_t curr_time
= ceph_clock_now();
4765 assert(timecheck_waiting
.count(other
) > 0);
4766 utime_t timecheck_sent
= timecheck_waiting
[other
];
4767 timecheck_waiting
.erase(other
);
4768 if (curr_time
< timecheck_sent
) {
4769 // our clock was readjusted -- drop everything until it all makes sense.
4770 dout(1) << __func__
<< " our clock was readjusted --"
4771 << " bump round and drop current check"
4773 timecheck_cancel_round();
4777 /* update peer latencies */
4778 double latency
= (double)(curr_time
- timecheck_sent
);
4780 if (timecheck_latencies
.count(other
) == 0)
4781 timecheck_latencies
[other
] = latency
;
4783 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
4784 timecheck_latencies
[other
] = avg_latency
;
4790 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4791 * and 'a' happens to be lower than 'b'; so we use double instead.
4793 * latency is always expected to be >= 0.
4795 * delta, the difference between theirs timestamp and ours, may either be
4796 * lower or higher than 0; will hardly ever be 0.
4798 * The absolute skew is the absolute delta minus the latency, which is
4799 * taken as a whole instead of an rtt given that there is some queueing
4800 * and dispatch times involved and it's hard to assess how long exactly
4801 * it took for the message to travel to the other side and be handled. So
4802 * we call it a bounded skew, the worst case scenario.
4806 * Given that the latency is always positive, we can establish that the
4807 * bounded skew will be:
4809 * 1. positive if the absolute delta is higher than the latency and
4811 * 2. negative if the absolute delta is higher than the latency and
4812 * delta is negative.
4813 * 3. zero if the absolute delta is lower than the latency.
4815 * On 3. we make a judgement call and treat the skew as non-existent.
4816 * This is because that, if the absolute delta is lower than the
4817 * latency, then the apparently existing skew is nothing more than a
4818 * side-effect of the high latency at work.
4820 * This may not be entirely true though, as a severely skewed clock
4821 * may be masked by an even higher latency, but with high latencies
4822 * we probably have worse issues to deal with than just skewed clocks.
4824 assert(latency
>= 0);
4826 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
4827 double abs_delta
= (delta
> 0 ? delta
: -delta
);
4828 double skew_bound
= abs_delta
- latency
;
4832 skew_bound
= -skew_bound
;
4835 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
4836 clog
->health(status
) << other
<< " " << ss
.str();
4838 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
4839 << " delta " << delta
<< " skew_bound " << skew_bound
4840 << " latency " << latency
<< dendl
;
4842 timecheck_skews
[other
] = skew_bound
;
4845 if (timecheck_acks
== quorum
.size()) {
4846 dout(10) << __func__
<< " got pongs from everybody ("
4847 << timecheck_acks
<< " total)" << dendl
;
4848 assert(timecheck_skews
.size() == timecheck_acks
);
4849 assert(timecheck_waiting
.empty());
4850 // everyone has acked, so bump the round to finish it.
4851 timecheck_finish_round();
4855 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
4857 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4858 dout(10) << __func__
<< " " << *m
<< dendl
;
4861 assert(m
->op
== MTimeCheck::OP_PING
|| m
->op
== MTimeCheck::OP_REPORT
);
4863 if (m
->epoch
!= get_epoch()) {
4864 dout(1) << __func__
<< " got wrong epoch "
4865 << "(ours " << get_epoch()
4866 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
4870 if (m
->round
< timecheck_round
) {
4871 dout(1) << __func__
<< " got old round " << m
->round
4872 << " current " << timecheck_round
4873 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
4877 timecheck_round
= m
->round
;
4879 if (m
->op
== MTimeCheck::OP_REPORT
) {
4880 assert((timecheck_round
% 2) == 0);
4881 timecheck_latencies
.swap(m
->latencies
);
4882 timecheck_skews
.swap(m
->skews
);
4886 assert((timecheck_round
% 2) != 0);
4887 MTimeCheck
*reply
= new MTimeCheck(MTimeCheck::OP_PONG
);
4888 utime_t curr_time
= ceph_clock_now();
4889 reply
->timestamp
= curr_time
;
4890 reply
->epoch
= m
->epoch
;
4891 reply
->round
= m
->round
;
4892 dout(10) << __func__
<< " send " << *m
4893 << " to " << m
->get_source_inst() << dendl
;
4894 m
->get_connection()->send_message(reply
);
4897 void Monitor::handle_timecheck(MonOpRequestRef op
)
4899 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4900 dout(10) << __func__
<< " " << *m
<< dendl
;
4903 if (m
->op
!= MTimeCheck::OP_PONG
) {
4904 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
4906 handle_timecheck_leader(op
);
4908 } else if (is_peon()) {
4909 if (m
->op
!= MTimeCheck::OP_PING
&& m
->op
!= MTimeCheck::OP_REPORT
) {
4910 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
4912 handle_timecheck_peon(op
);
4915 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
4919 void Monitor::handle_subscribe(MonOpRequestRef op
)
4921 MMonSubscribe
*m
= static_cast<MMonSubscribe
*>(op
->get_req());
4922 dout(10) << "handle_subscribe " << *m
<< dendl
;
4926 MonSession
*s
= op
->get_session();
4929 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
4932 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4933 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
4936 // remove conflicting subscribes
4937 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4938 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
4939 it
!= s
->sub_map
.end(); ) {
4940 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
4941 Mutex::Locker
l(session_map_lock
);
4942 session_map
.remove_sub((it
++)->second
);
4950 Mutex::Locker
l(session_map_lock
);
4951 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
4952 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
4953 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
4956 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
4957 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
4958 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
4959 Subscription
*sub
= s
->sub_map
[p
->first
];
4960 assert(sub
!= nullptr);
4961 mdsmon()->check_sub(sub
);
4963 } else if (p
->first
== "osdmap") {
4964 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
4965 if (s
->osd_epoch
> p
->second
.start
) {
4966 // client needs earlier osdmaps on purpose, so reset the sent epoch
4969 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
4971 } else if (p
->first
== "osd_pg_creates") {
4972 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
4973 if (monmap
->get_required_features().contains_all(
4974 ceph::features::mon::FEATURE_LUMINOUS
)) {
4975 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
4977 pgmon()->check_sub(s
->sub_map
["osd_pg_creates"]);
4980 } else if (p
->first
== "monmap") {
4981 monmon()->check_sub(s
->sub_map
[p
->first
]);
4982 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4983 logmon()->check_sub(s
->sub_map
[p
->first
]);
4984 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
4985 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
4986 } else if (p
->first
== "servicemap") {
4987 mgrstatmon()->check_sub(s
->sub_map
[p
->first
]);
4992 // we only need to reply if the client is old enough to think it
4993 // has to send renewals.
4994 ConnectionRef con
= m
->get_connection();
4995 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
4996 m
->get_connection()->send_message(new MMonSubscribeAck(
4997 monmap
->get_fsid(), (int)g_conf
->mon_subscribe_interval
));
5002 void Monitor::handle_get_version(MonOpRequestRef op
)
5004 MMonGetVersion
*m
= static_cast<MMonGetVersion
*>(op
->get_req());
5005 dout(10) << "handle_get_version " << *m
<< dendl
;
5006 PaxosService
*svc
= NULL
;
5008 MonSession
*s
= op
->get_session();
5011 if (!is_leader() && !is_peon()) {
5012 dout(10) << " waiting for quorum" << dendl
;
5013 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
5017 if (m
->what
== "mdsmap") {
5019 } else if (m
->what
== "fsmap") {
5021 } else if (m
->what
== "osdmap") {
5023 } else if (m
->what
== "monmap") {
5026 derr
<< "invalid map type " << m
->what
<< dendl
;
5030 if (!svc
->is_readable()) {
5031 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
5035 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
5036 reply
->handle
= m
->handle
;
5037 reply
->version
= svc
->get_last_committed();
5038 reply
->oldest_version
= svc
->get_first_committed();
5039 reply
->set_tid(m
->get_tid());
5041 m
->get_connection()->send_message(reply
);
5047 bool Monitor::ms_handle_reset(Connection
*con
)
5049 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
5051 // ignore lossless monitor sessions
5052 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
5055 MonSession
*s
= static_cast<MonSession
*>(con
->get_priv());
5059 // break any con <-> session ref cycle
5060 s
->con
->set_priv(NULL
);
5065 Mutex::Locker
l(lock
);
5067 dout(10) << "reset/close on session " << s
->inst
<< dendl
;
5069 Mutex::Locker
l(session_map_lock
);
5076 bool Monitor::ms_handle_refused(Connection
*con
)
5078 // just log for now...
5079 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
5085 void Monitor::send_latest_monmap(Connection
*con
)
5088 monmap
->encode(bl
, con
->get_features());
5089 con
->send_message(new MMonMap(bl
));
5092 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
5094 MMonGetMap
*m
= static_cast<MMonGetMap
*>(op
->get_req());
5095 dout(10) << "handle_mon_get_map" << dendl
;
5096 send_latest_monmap(m
->get_connection().get());
5099 void Monitor::handle_mon_metadata(MonOpRequestRef op
)
5101 MMonMetadata
*m
= static_cast<MMonMetadata
*>(op
->get_req());
5103 dout(10) << __func__
<< dendl
;
5104 update_mon_metadata(m
->get_source().num(), std::move(m
->data
));
5108 void Monitor::update_mon_metadata(int from
, Metadata
&& m
)
5110 // NOTE: this is now for legacy (kraken or jewel) mons only.
5111 pending_metadata
[from
] = std::move(m
);
5113 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5115 ::encode(pending_metadata
, bl
);
5116 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5117 paxos
->trigger_propose();
5120 int Monitor::load_metadata()
5123 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5126 bufferlist::iterator it
= bl
.begin();
5127 ::decode(mon_metadata
, it
);
5129 pending_metadata
= mon_metadata
;
5133 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
5136 if (!mon_metadata
.count(mon
)) {
5137 err
<< "mon." << mon
<< " not found";
5140 const Metadata
& m
= mon_metadata
[mon
];
5141 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
5142 f
->dump_string(p
->first
.c_str(), p
->second
);
5147 void Monitor::count_metadata(const string
& field
, map
<string
,int> *out
)
5149 for (auto& p
: mon_metadata
) {
5150 auto q
= p
.second
.find(field
);
5151 if (q
== p
.second
.end()) {
5152 (*out
)["unknown"]++;
5154 (*out
)[q
->second
]++;
5159 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
5161 map
<string
,int> by_val
;
5162 count_metadata(field
, &by_val
);
5163 f
->open_object_section(field
.c_str());
5164 for (auto& p
: by_val
) {
5165 f
->dump_int(p
.first
.c_str(), p
.second
);
5170 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
5172 map
<string
, list
<int> > mons
; // hostname => mon
5173 for (map
<int, Metadata
>::iterator it
= mon_metadata
.begin();
5174 it
!= mon_metadata
.end(); ++it
) {
5175 const Metadata
& m
= it
->second
;
5176 Metadata::const_iterator hostname
= m
.find("hostname");
5177 if (hostname
== m
.end()) {
5178 // not likely though
5181 mons
[hostname
->second
].push_back(it
->first
);
5184 dump_services(f
, mons
, "mon");
5188 // ----------------------------------------------
5191 int Monitor::scrub_start()
5193 dout(10) << __func__
<< dendl
;
5194 assert(is_leader());
5196 if (!scrub_result
.empty()) {
5197 clog
->info() << "scrub already in progress";
5201 scrub_event_cancel();
5202 scrub_result
.clear();
5203 scrub_state
.reset(new ScrubState
);
5209 int Monitor::scrub()
5211 assert(is_leader());
5212 assert(scrub_state
);
5214 scrub_cancel_timeout();
5215 wait_for_paxos_write();
5216 scrub_version
= paxos
->get_version();
5219 // scrub all keys if we're the only monitor in the quorum
5221 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
5223 for (set
<int>::iterator p
= quorum
.begin();
5228 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
5230 r
->key
= scrub_state
->last_key
;
5231 messenger
->send_message(r
, monmap
->get_inst(*p
));
5235 bool r
= _scrub(&scrub_result
[rank
],
5236 &scrub_state
->last_key
,
5239 scrub_state
->finished
= !r
;
5241 // only after we got our scrub results do we really care whether the
5242 // other monitors are late on their results. Also, this way we avoid
5243 // triggering the timeout if we end up getting stuck in _scrub() for
5244 // longer than the duration of the timeout.
5245 scrub_reset_timeout();
5247 if (quorum
.size() == 1) {
5248 assert(scrub_state
->finished
== true);
5254 void Monitor::handle_scrub(MonOpRequestRef op
)
5256 MMonScrub
*m
= static_cast<MMonScrub
*>(op
->get_req());
5257 dout(10) << __func__
<< " " << *m
<< dendl
;
5259 case MMonScrub::OP_SCRUB
:
5264 wait_for_paxos_write();
5266 if (m
->version
!= paxos
->get_version())
5269 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
5273 reply
->key
= m
->key
;
5274 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
5275 m
->get_connection()->send_message(reply
);
5279 case MMonScrub::OP_RESULT
:
5283 if (m
->version
!= scrub_version
)
5285 // reset the timeout each time we get a result
5286 scrub_reset_timeout();
5288 int from
= m
->get_source().num();
5289 assert(scrub_result
.count(from
) == 0);
5290 scrub_result
[from
] = m
->result
;
5292 if (scrub_result
.size() == quorum
.size()) {
5293 scrub_check_results();
5294 scrub_result
.clear();
5295 if (scrub_state
->finished
)
5305 bool Monitor::_scrub(ScrubResult
*r
,
5306 pair
<string
,string
> *start
,
5310 assert(start
!= NULL
);
5311 assert(num_keys
!= NULL
);
5313 set
<string
> prefixes
= get_sync_targets_names();
5314 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5316 dout(10) << __func__
<< " start (" << *start
<< ")"
5317 << " num_keys " << *num_keys
<< dendl
;
5319 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
5321 int scrubbed_keys
= 0;
5322 pair
<string
,string
> last_key
;
5324 while (it
->has_next_chunk()) {
5326 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
5329 pair
<string
,string
> k
= it
->get_next_key();
5330 if (prefixes
.count(k
.first
) == 0)
5333 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
5334 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
5335 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
5341 int err
= store
->get(k
.first
, k
.second
, bl
);
5344 uint32_t key_crc
= bl
.crc32c(0);
5345 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
5346 << " crc " << key_crc
<< dendl
;
5347 r
->prefix_keys
[k
.first
]++;
5348 if (r
->prefix_crc
.count(k
.first
) == 0) {
5349 r
->prefix_crc
[k
.first
] = 0;
5351 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
5353 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
5354 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
5355 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
5356 r
->prefix_crc
[k
.first
] += 1;
5363 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
5364 << " scrubbed_keys " << scrubbed_keys
5365 << " has_next " << it
->has_next_chunk() << dendl
;
5368 *num_keys
= scrubbed_keys
;
5370 return it
->has_next_chunk();
5373 void Monitor::scrub_check_results()
5375 dout(10) << __func__
<< dendl
;
5379 ScrubResult
& mine
= scrub_result
[rank
];
5380 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
5381 p
!= scrub_result
.end();
5383 if (p
->first
== rank
)
5385 if (p
->second
!= mine
) {
5387 clog
->error() << "scrub mismatch";
5388 clog
->error() << " mon." << rank
<< " " << mine
;
5389 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5393 clog
->debug() << "scrub ok on " << quorum
<< ": " << mine
;
5396 inline void Monitor::scrub_timeout()
5398 dout(1) << __func__
<< " restarting scrub" << dendl
;
5403 void Monitor::scrub_finish()
5405 dout(10) << __func__
<< dendl
;
5407 scrub_event_start();
5410 void Monitor::scrub_reset()
5412 dout(10) << __func__
<< dendl
;
5413 scrub_cancel_timeout();
5415 scrub_result
.clear();
5416 scrub_state
.reset();
5419 inline void Monitor::scrub_update_interval(int secs
)
5421 // we don't care about changes if we are not the leader.
5422 // changes will be visible if we become the leader.
5426 dout(1) << __func__
<< " new interval = " << secs
<< dendl
;
5428 // if scrub already in progress, all changes will already be visible during
5429 // the next round. Nothing to do.
5430 if (scrub_state
!= NULL
)
5433 scrub_event_cancel();
5434 scrub_event_start();
5437 void Monitor::scrub_event_start()
5439 dout(10) << __func__
<< dendl
;
5442 scrub_event_cancel();
5444 if (cct
->_conf
->mon_scrub_interval
<= 0) {
5445 dout(1) << __func__
<< " scrub event is disabled"
5446 << " (mon_scrub_interval = " << cct
->_conf
->mon_scrub_interval
5451 scrub_event
= timer
.add_event_after(
5452 cct
->_conf
->mon_scrub_interval
,
5453 new C_MonContext(this, [this](int) {
5458 void Monitor::scrub_event_cancel()
5460 dout(10) << __func__
<< dendl
;
5462 timer
.cancel_event(scrub_event
);
5467 inline void Monitor::scrub_cancel_timeout()
5469 if (scrub_timeout_event
) {
5470 timer
.cancel_event(scrub_timeout_event
);
5471 scrub_timeout_event
= NULL
;
5475 void Monitor::scrub_reset_timeout()
5477 dout(15) << __func__
<< " reset timeout event" << dendl
;
5478 scrub_cancel_timeout();
5479 scrub_timeout_event
= timer
.add_event_after(
5480 g_conf
->mon_scrub_timeout
,
5481 new C_MonContext(this, [this](int) {
5486 /************ TICK ***************/
5487 void Monitor::new_tick()
5489 timer
.add_event_after(g_conf
->mon_tick_interval
, new C_MonContext(this, [this](int) {
5494 void Monitor::tick()
5497 dout(11) << "tick" << dendl
;
5498 const utime_t now
= ceph_clock_now();
5500 // Check if we need to emit any delayed health check updated messages
5502 const auto min_period
= g_conf
->get_val
<int64_t>(
5503 "mon_health_log_update_period");
5504 for (auto& svc
: paxos_service
) {
5505 auto health
= svc
->get_health_checks();
5507 for (const auto &i
: health
.checks
) {
5508 const std::string
&code
= i
.first
;
5509 const std::string
&summary
= i
.second
.summary
;
5510 const health_status_t severity
= i
.second
.severity
;
5512 auto status_iter
= health_check_log_times
.find(code
);
5513 if (status_iter
== health_check_log_times
.end()) {
5517 auto &log_status
= status_iter
->second
;
5518 bool const changed
= log_status
.last_message
!= summary
5519 || log_status
.severity
!= severity
;
5521 if (changed
&& now
- log_status
.updated_at
> min_period
) {
5522 log_status
.last_message
= summary
;
5523 log_status
.updated_at
= now
;
5524 log_status
.severity
= severity
;
5527 ss
<< "Health check update: " << summary
<< " (" << code
<< ")";
5528 clog
->health(severity
) << ss
.str();
5535 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
) {
5542 Mutex::Locker
l(session_map_lock
);
5543 auto p
= session_map
.sessions
.begin();
5545 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5546 now
> (exited_quorum
+ 2*g_conf
->mon_lease
));
5552 // don't trim monitors
5553 if (s
->inst
.name
.is_mon())
5556 if (s
->session_timeout
< now
&& s
->con
) {
5557 // check keepalive, too
5558 s
->session_timeout
= s
->con
->get_last_keepalive();
5559 s
->session_timeout
+= g_conf
->mon_session_timeout
;
5561 if (s
->session_timeout
< now
) {
5562 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5563 << " (timeout " << s
->session_timeout
5564 << " < now " << now
<< ")" << dendl
;
5565 } else if (out_for_too_long
) {
5566 // boot the client Session because we've taken too long getting back in
5567 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5568 << " because we've been out of quorum too long" << dendl
;
5573 s
->con
->mark_down();
5575 logger
->inc(l_mon_session_trim
);
5578 sync_trim_providers();
5580 if (!maybe_wait_for_quorum
.empty()) {
5581 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5584 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5585 // this is only necessary on upgraded clusters.
5586 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5587 prepare_new_fingerprint(t
);
5588 paxos
->trigger_propose();
5594 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5597 nf
.generate_random();
5598 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5602 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
5605 int Monitor::check_fsid()
5608 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
5613 string
es(ebl
.c_str(), ebl
.length());
5615 // only keep the first line
5616 size_t pos
= es
.find_first_of('\n');
5617 if (pos
!= string::npos
)
5620 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
5622 if (!ondisk
.parse(es
.c_str())) {
5623 derr
<< "error: unable to parse uuid" << dendl
;
5627 if (monmap
->get_fsid() != ondisk
) {
5628 derr
<< "error: cluster_uuid file exists with value " << ondisk
5629 << ", != our uuid " << monmap
->get_fsid() << dendl
;
5636 int Monitor::write_fsid()
5638 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5640 int r
= store
->apply_transaction(t
);
5644 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
5647 ss
<< monmap
->get_fsid() << "\n";
5648 string us
= ss
.str();
5653 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
5658 * this is the closest thing to a traditional 'mkfs' for ceph.
5659 * initialize the monitor state machines to their initial values.
5661 int Monitor::mkfs(bufferlist
& osdmapbl
)
5663 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5665 // verify cluster fsid
5666 int r
= check_fsid();
5667 if (r
< 0 && r
!= -ENOENT
)
5671 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
5672 magicbl
.append("\n");
5673 t
->put(MONITOR_NAME
, "magic", magicbl
);
5676 features
= get_initial_supported_features();
5679 // save monmap, osdmap, keyring.
5680 bufferlist monmapbl
;
5681 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
5682 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5683 t
->put("mkfs", "monmap", monmapbl
);
5685 if (osdmapbl
.length()) {
5686 // make sure it's a valid osdmap
5689 om
.decode(osdmapbl
);
5691 catch (buffer::error
& e
) {
5692 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
5695 t
->put("mkfs", "osdmap", osdmapbl
);
5698 if (is_keyring_required()) {
5700 string keyring_filename
;
5702 r
= ceph_resolve_file_search(g_conf
->keyring
, keyring_filename
);
5704 derr
<< "unable to find a keyring file on " << g_conf
->keyring
5705 << ": " << cpp_strerror(r
) << dendl
;
5706 if (g_conf
->key
!= "") {
5707 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf
->key
+
5708 "\n\tcaps mon = \"allow *\"\n";
5710 bl
.append(keyring_plaintext
);
5712 bufferlist::iterator i
= bl
.begin();
5713 keyring
.decode_plaintext(i
);
5715 catch (const buffer::error
& e
) {
5716 derr
<< "error decoding keyring " << keyring_plaintext
5717 << ": " << e
.what() << dendl
;
5724 r
= keyring
.load(g_ceph_context
, keyring_filename
);
5726 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
5731 // put mon. key in external keyring; seed with everything else.
5732 extract_save_mon_key(keyring
);
5734 bufferlist keyringbl
;
5735 keyring
.encode_plaintext(keyringbl
);
5736 t
->put("mkfs", "keyring", keyringbl
);
5739 store
->apply_transaction(t
);
5744 int Monitor::write_default_keyring(bufferlist
& bl
)
5747 os
<< g_conf
->mon_data
<< "/keyring";
5750 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
, 0600);
5753 dout(0) << __func__
<< " failed to open " << os
.str()
5754 << ": " << cpp_strerror(err
) << dendl
;
5758 err
= bl
.write_fd(fd
);
5761 VOID_TEMP_FAILURE_RETRY(::close(fd
));
5766 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
5768 EntityName mon_name
;
5769 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
5771 if (keyring
.get_auth(mon_name
, mon_key
)) {
5772 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
5774 pkey
.add(mon_name
, mon_key
);
5776 pkey
.encode_plaintext(bl
);
5777 write_default_keyring(bl
);
5778 keyring
.remove(mon_name
);
5782 bool Monitor::ms_get_authorizer(int service_id
, AuthAuthorizer
**authorizer
,
5785 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id
)
5791 // we only connect to other monitors and mgr; every else connects to us.
5792 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
5793 service_id
!= CEPH_ENTITY_TYPE_MGR
)
5796 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5798 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
5799 AuthNoneClientHandler
handler(g_ceph_context
, nullptr);
5800 handler
.set_global_id(0);
5801 *authorizer
= handler
.build_authorizer(service_id
);
5805 CephXServiceTicketInfo auth_ticket_info
;
5806 CephXSessionAuthInfo info
;
5810 name
.set_type(CEPH_ENTITY_TYPE_MON
);
5811 auth_ticket_info
.ticket
.name
= name
;
5812 auth_ticket_info
.ticket
.global_id
= 0;
5814 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
5815 // mon to mon authentication uses the private monitor shared key and not the
5818 if (!keyring
.get_secret(name
, secret
) &&
5819 !key_server
.get_secret(name
, secret
)) {
5820 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5822 stringstream ss
, ds
;
5823 int err
= key_server
.list_secrets(ds
);
5825 ss
<< "no installed auth entries!";
5827 ss
<< "installed auth entries:";
5828 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
5832 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
,
5833 secret
, (uint64_t)-1);
5835 dout(0) << __func__
<< " failed to build mon session_auth_info "
5836 << cpp_strerror(ret
) << dendl
;
5839 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
5841 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
);
5843 derr
<< __func__
<< " failed to build mgr service session_auth_info "
5844 << cpp_strerror(ret
) << dendl
;
5848 ceph_abort(); // see check at top of fn
5851 CephXTicketBlob blob
;
5852 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
5853 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl
;
5856 bufferlist ticket_data
;
5857 ::encode(blob
, ticket_data
);
5859 bufferlist::iterator iter
= ticket_data
.begin();
5860 CephXTicketHandler
handler(g_ceph_context
, service_id
);
5861 ::decode(handler
.ticket
, iter
);
5863 handler
.session_key
= info
.session_key
;
5865 *authorizer
= handler
.build_authorizer(0);
5870 bool Monitor::ms_verify_authorizer(Connection
*con
, int peer_type
,
5871 int protocol
, bufferlist
& authorizer_data
,
5872 bufferlist
& authorizer_reply
,
5873 bool& isvalid
, CryptoKey
& session_key
)
5875 dout(10) << "ms_verify_authorizer " << con
->get_peer_addr()
5876 << " " << ceph_entity_type_name(peer_type
)
5877 << " protocol " << protocol
<< dendl
;
5882 if (peer_type
== CEPH_ENTITY_TYPE_MON
&&
5883 auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5884 // monitor, and cephx is enabled
5886 if (protocol
== CEPH_AUTH_CEPHX
) {
5887 bufferlist::iterator iter
= authorizer_data
.begin();
5888 CephXServiceTicketInfo auth_ticket_info
;
5890 if (authorizer_data
.length()) {
5891 bool ret
= cephx_verify_authorizer(g_ceph_context
, &keyring
, iter
,
5892 auth_ticket_info
, authorizer_reply
);
5894 session_key
= auth_ticket_info
.session_key
;
5897 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con
->get_peer_addr() << dendl
;
5901 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl
;