1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
25 #include "common/version.h"
27 #include "osd/OSDMap.h"
29 #include "MonitorDBStore.h"
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
49 #include "messages/MMonSubscribe.h"
50 #include "messages/MMonSubscribeAck.h"
52 #include "messages/MAuthReply.h"
54 #include "messages/MTimeCheck.h"
55 #include "messages/MPing.h"
57 #include "common/strtol.h"
58 #include "common/ceph_argparse.h"
59 #include "common/Timer.h"
60 #include "common/Clock.h"
61 #include "common/errno.h"
62 #include "common/perf_counters.h"
63 #include "common/admin_socket.h"
64 #include "global/signal_handler.h"
65 #include "common/Formatter.h"
66 #include "include/stringify.h"
67 #include "include/color.h"
68 #include "include/ceph_fs.h"
69 #include "include/str_list.h"
71 #include "OSDMonitor.h"
72 #include "MDSMonitor.h"
73 #include "MonmapMonitor.h"
74 #include "PGMonitor.h"
75 #include "LogMonitor.h"
76 #include "AuthMonitor.h"
77 #include "MgrMonitor.h"
78 #include "MgrStatMonitor.h"
79 #include "mon/QuorumService.h"
80 #include "mon/OldHealthMonitor.h"
81 #include "mon/HealthMonitor.h"
82 #include "mon/ConfigKeyService.h"
83 #include "common/config.h"
84 #include "common/cmdparse.h"
85 #include "include/assert.h"
86 #include "include/compat.h"
87 #include "perfglue/heap_profiler.h"
89 #include "auth/none/AuthNoneClientHandler.h"
91 #define dout_subsys ceph_subsys_mon
93 #define dout_prefix _prefix(_dout, this)
94 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
95 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
96 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
99 const string
Monitor::MONITOR_NAME
= "monitor";
100 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
105 #undef COMMAND_WITH_FLAG
106 #define FLAG(f) (MonCommand::FLAG_##f)
107 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
108 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
109 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
110 {parsesig, helptext, modulename, req_perms, avail, flags},
111 MonCommand mon_commands
[] = {
112 #include <mon/MonCommands.h>
114 MonCommand pgmonitor_commands
[] = {
115 #include <mon/PGMonitorCommands.h>
118 #undef COMMAND_WITH_FLAG
121 void C_MonContext::finish(int r
) {
122 if (mon
->is_shutdown())
124 FunctionContext::finish(r
);
127 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
128 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
133 con_self(m
? m
->get_loopback_connection() : NULL
),
134 lock("Monitor::lock"),
136 finisher(cct_
, "mon_finisher", "fin"),
137 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf
->mon_cpu_threads
),
138 has_ever_joined(false),
139 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
141 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
142 key_server(cct
, &keyring
),
143 auth_cluster_required(cct
,
144 cct
->_conf
->auth_supported
.empty() ?
145 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
146 auth_service_required(cct
,
147 cct
->_conf
->auth_supported
.empty() ?
148 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
149 mgr_messenger(mgr_m
),
150 mgr_client(cct_
, mgr_m
),
154 state(STATE_PROBING
),
157 required_features(0),
159 quorum_con_features(0),
163 scrub_timeout_event(NULL
),
166 sync_provider_count(0),
169 sync_start_version(0),
170 sync_timeout_event(NULL
),
171 sync_last_committed_floor(0),
175 timecheck_rounds_since_clean(0),
176 timecheck_event(NULL
),
178 paxos_service(PAXOS_NUM
),
180 routed_request_tid(0),
181 op_tracker(cct
, true, 1)
183 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
184 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
186 update_log_clients();
188 paxos
= new Paxos(this, "paxos");
190 paxos_service
[PAXOS_MDSMAP
] = new MDSMonitor(this, paxos
, "mdsmap");
191 paxos_service
[PAXOS_MONMAP
] = new MonmapMonitor(this, paxos
, "monmap");
192 paxos_service
[PAXOS_OSDMAP
] = new OSDMonitor(cct
, this, paxos
, "osdmap");
193 paxos_service
[PAXOS_PGMAP
] = new PGMonitor(this, paxos
, "pgmap");
194 paxos_service
[PAXOS_LOG
] = new LogMonitor(this, paxos
, "logm");
195 paxos_service
[PAXOS_AUTH
] = new AuthMonitor(this, paxos
, "auth");
196 paxos_service
[PAXOS_MGR
] = new MgrMonitor(this, paxos
, "mgr");
197 paxos_service
[PAXOS_MGRSTAT
] = new MgrStatMonitor(this, paxos
, "mgrstat");
198 paxos_service
[PAXOS_HEALTH
] = new HealthMonitor(this, paxos
, "health");
200 health_monitor
= new OldHealthMonitor(this);
201 config_key_service
= new ConfigKeyService(this, paxos
);
203 mon_caps
= new MonCap();
204 bool r
= mon_caps
->parse("allow *", NULL
);
207 exited_quorum
= ceph_clock_now();
209 // prepare local commands
210 local_mon_commands
.resize(ARRAY_SIZE(mon_commands
));
211 for (unsigned i
= 0; i
< ARRAY_SIZE(mon_commands
); ++i
) {
212 local_mon_commands
[i
] = mon_commands
[i
];
214 MonCommand::encode_vector(local_mon_commands
, local_mon_commands_bl
);
216 local_upgrading_mon_commands
= local_mon_commands
;
217 for (unsigned i
= 0; i
< ARRAY_SIZE(pgmonitor_commands
); ++i
) {
218 local_upgrading_mon_commands
.push_back(pgmonitor_commands
[i
]);
220 MonCommand::encode_vector(local_upgrading_mon_commands
,
221 local_upgrading_mon_commands_bl
);
223 // assume our commands until we have an election. this only means
224 // we won't reply with EINVAL before the election; any command that
225 // actually matters will wait until we have quorum etc and then
226 // retry (and revalidate).
227 leader_mon_commands
= local_mon_commands
;
229 // note: OSDMonitor may update this based on the luminous flag.
230 pgservice
= mgrstatmon()->get_pg_stat_service();
235 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
237 delete health_monitor
;
238 delete config_key_service
;
240 assert(session_map
.sessions
.empty());
245 class AdminHook
: public AdminSocketHook
{
248 explicit AdminHook(Monitor
*m
) : mon(m
) {}
249 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
250 bufferlist
& out
) override
{
252 mon
->do_admin_command(command
, cmdmap
, format
, ss
);
258 void Monitor::do_admin_command(string command
, cmdmap_t
& cmdmap
, string format
,
261 Mutex::Locker
l(lock
);
263 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
266 for (cmdmap_t::iterator p
= cmdmap
.begin();
267 p
!= cmdmap
.end(); ++p
) {
268 if (p
->first
== "prefix")
272 args
+= cmd_vartype_stringify(p
->second
);
274 args
= "[" + args
+ "]";
276 bool read_only
= (command
== "mon_status" ||
277 command
== "mon metadata" ||
278 command
== "quorum_status" ||
280 command
== "sessions");
282 (read_only
? audit_clog
->debug() : audit_clog
->info())
283 << "from='admin socket' entity='admin socket' "
284 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
286 if (command
== "mon_status") {
287 get_mon_status(f
.get(), ss
);
290 } else if (command
== "quorum_status") {
291 _quorum_status(f
.get(), ss
);
292 } else if (command
== "sync_force") {
294 if ((!cmd_getval(g_ceph_context
, cmdmap
, "validate", validate
)) ||
295 (validate
!= "--yes-i-really-mean-it")) {
296 ss
<< "are you SURE? this will mean the monitor store will be erased "
297 "the next time the monitor is restarted. pass "
298 "'--yes-i-really-mean-it' if you really do.";
301 sync_force(f
.get(), ss
);
302 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
303 if (!_add_bootstrap_peer_hint(command
, cmdmap
, ss
))
305 } else if (command
== "quorum enter") {
306 elector
.start_participating();
308 ss
<< "started responding to quorum, initiated new election";
309 } else if (command
== "quorum exit") {
311 elector
.stop_participating();
312 ss
<< "stopped responding to quorum, initiated new election";
313 } else if (command
== "ops") {
314 (void)op_tracker
.dump_ops_in_flight(f
.get());
318 } else if (command
== "sessions") {
321 f
->open_array_section("sessions");
322 for (auto p
: session_map
.sessions
) {
323 f
->dump_stream("session") << *p
;
330 assert(0 == "bad AdminSocket command binding");
332 (read_only
? audit_clog
->debug() : audit_clog
->info())
333 << "from='admin socket' "
334 << "entity='admin socket' "
335 << "cmd=" << command
<< " "
336 << "args=" << args
<< ": finished";
340 (read_only
? audit_clog
->debug() : audit_clog
->info())
341 << "from='admin socket' "
342 << "entity='admin socket' "
343 << "cmd=" << command
<< " "
344 << "args=" << args
<< ": aborted";
// Handles a process signal delivered to the monitor.
// Asserts that signum is SIGINT or SIGTERM (no other signal is expected
// here) and logs the signal's name at error level via derr.
347 void Monitor::handle_signal(int signum
)
349 assert(signum
== SIGINT
|| signum
== SIGTERM
);
350 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
354 CompatSet
Monitor::get_initial_supported_features()
356 CompatSet::FeatureSet ceph_mon_feature_compat
;
357 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
358 CompatSet::FeatureSet ceph_mon_feature_incompat
;
359 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
360 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
361 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
362 ceph_mon_feature_incompat
);
365 CompatSet
Monitor::get_supported_features()
367 CompatSet compat
= get_initial_supported_features();
368 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
369 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
370 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
371 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
372 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
376 CompatSet
Monitor::get_legacy_features()
378 CompatSet::FeatureSet ceph_mon_feature_compat
;
379 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
380 CompatSet::FeatureSet ceph_mon_feature_incompat
;
381 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
382 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
383 ceph_mon_feature_incompat
);
386 int Monitor::check_features(MonitorDBStore
*store
)
388 CompatSet required
= get_supported_features();
391 read_features_off_disk(store
, &ondisk
);
393 if (!required
.writeable(ondisk
)) {
394 CompatSet diff
= required
.unsupported(ondisk
);
395 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
402 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
404 bufferlist featuresbl
;
405 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
406 if (featuresbl
.length() == 0) {
407 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
408 << "Assuming it is old-style and introducing one." << dendl
;
409 //we only want the baseline ~v.18 features assumed to be on disk.
410 //If new features are introduced this code needs to disappear or
412 *features
= get_legacy_features();
414 features
->encode(featuresbl
);
415 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
416 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
417 store
->apply_transaction(t
);
419 bufferlist::iterator it
= featuresbl
.begin();
420 features
->decode(it
);
424 void Monitor::read_features()
426 read_features_off_disk(store
, &features
);
427 dout(10) << "features " << features
<< dendl
;
429 calc_quorum_requirements();
430 dout(10) << "required_features " << required_features
<< dendl
;
433 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
437 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
440 const char** Monitor::get_tracked_conf_keys() const
442 static const char* KEYS
[] = {
443 "crushtool", // helpful for testing
444 "mon_election_timeout",
446 "mon_lease_renew_interval_factor",
447 "mon_lease_ack_timeout_factor",
448 "mon_accept_timeout_factor",
452 "clog_to_syslog_facility",
453 "clog_to_syslog_level",
455 "clog_to_graylog_host",
456 "clog_to_graylog_port",
459 // periodic health to clog
460 "mon_health_to_clog",
461 "mon_health_to_clog_interval",
462 "mon_health_to_clog_tick_interval",
464 "mon_scrub_interval",
470 void Monitor::handle_conf_change(const struct md_config_t
*conf
,
471 const std::set
<std::string
> &changed
)
475 dout(10) << __func__
<< " " << changed
<< dendl
;
477 if (changed
.count("clog_to_monitors") ||
478 changed
.count("clog_to_syslog") ||
479 changed
.count("clog_to_syslog_level") ||
480 changed
.count("clog_to_syslog_facility") ||
481 changed
.count("clog_to_graylog") ||
482 changed
.count("clog_to_graylog_host") ||
483 changed
.count("clog_to_graylog_port") ||
484 changed
.count("host") ||
485 changed
.count("fsid")) {
486 update_log_clients();
489 if (changed
.count("mon_health_to_clog") ||
490 changed
.count("mon_health_to_clog_interval") ||
491 changed
.count("mon_health_to_clog_tick_interval")) {
492 health_to_clog_update_conf(changed
);
495 if (changed
.count("mon_scrub_interval")) {
496 scrub_update_interval(conf
->mon_scrub_interval
);
500 void Monitor::update_log_clients()
502 map
<string
,string
> log_to_monitors
;
503 map
<string
,string
> log_to_syslog
;
504 map
<string
,string
> log_channel
;
505 map
<string
,string
> log_prio
;
506 map
<string
,string
> log_to_graylog
;
507 map
<string
,string
> log_to_graylog_host
;
508 map
<string
,string
> log_to_graylog_port
;
512 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
513 log_channel
, log_prio
, log_to_graylog
,
514 log_to_graylog_host
, log_to_graylog_port
,
518 clog
->update_config(log_to_monitors
, log_to_syslog
,
519 log_channel
, log_prio
, log_to_graylog
,
520 log_to_graylog_host
, log_to_graylog_port
,
523 audit_clog
->update_config(log_to_monitors
, log_to_syslog
,
524 log_channel
, log_prio
, log_to_graylog
,
525 log_to_graylog_host
, log_to_graylog_port
,
529 int Monitor::sanitize_options()
533 // mon_lease must be greater than mon_lease_renewal; otherwise
534 // leases may expire before they are renewed.
535 if (g_conf
->mon_lease_renew_interval_factor
>= 1.0) {
536 clog
->error() << "mon_lease_renew_interval_factor ("
537 << g_conf
->mon_lease_renew_interval_factor
538 << ") must be less than 1.0";
542 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
543 // got time to renew the lease and get an ack for it. Having both options
544 // with the same value, for a given small value, could mean timing out if
545 // the monitors happened to be overloaded -- or even under normal load for
546 // a small enough value.
547 if (g_conf
->mon_lease_ack_timeout_factor
<= 1.0) {
548 clog
->error() << "mon_lease_ack_timeout_factor ("
549 << g_conf
->mon_lease_ack_timeout_factor
550 << ") must be greater than 1.0";
557 int Monitor::preinit()
561 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
563 int r
= sanitize_options();
565 derr
<< "option sanitization failed!" << dendl
;
572 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
573 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess");
574 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions", "sadd");
575 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions", "srm");
576 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions");
577 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in");
578 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started");
579 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won");
580 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost");
581 logger
= pcb
.create_perf_counters();
582 cct
->get_perfcounters_collection()->add(logger
);
585 assert(!cluster_logger
);
587 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
588 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
589 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
590 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
591 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
592 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
593 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
594 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster");
595 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space");
596 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space");
597 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
598 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
599 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
600 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
601 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
602 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
603 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
604 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
605 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
606 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects");
607 pcb
.add_u64(l_cluster_num_mds_up
, "num_mds_up", "MDSs that are up");
608 pcb
.add_u64(l_cluster_num_mds_in
, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
609 pcb
.add_u64(l_cluster_num_mds_failed
, "num_mds_failed", "Failed MDS");
610 pcb
.add_u64(l_cluster_mds_epoch
, "mds_epoch", "Current epoch of MDS map");
611 cluster_logger
= pcb
.create_perf_counters();
614 paxos
->init_logger();
616 // verify cluster_uuid
618 int r
= check_fsid();
630 // have we ever joined a quorum?
631 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
632 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
634 if (!has_ever_joined
) {
635 // impose initial quorum restrictions?
636 list
<string
> initial_members
;
637 get_str_list(g_conf
->mon_initial_members
, initial_members
);
639 if (!initial_members
.empty()) {
640 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
642 monmap
->set_initial_members(g_ceph_context
, initial_members
, name
, messenger
->get_myaddr(),
645 dout(10) << " monmap is " << *monmap
<< dendl
;
646 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
648 } else if (!monmap
->contains(name
)) {
649 derr
<< "not in monmap and have been in a quorum before; "
650 << "must have been removed" << dendl
;
651 if (g_conf
->mon_force_quorum_join
) {
652 dout(0) << "we should have died but "
653 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
655 derr
<< "commit suicide!" << dendl
;
662 // We have a potentially inconsistent store state on our hands. Get rid of it
664 bool clear_store
= false;
665 if (store
->exists("mon_sync", "in_sync")) {
666 dout(1) << __func__
<< " clean up potentially inconsistent store state"
671 if (store
->get("mon_sync", "force_sync") > 0) {
672 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
677 set
<string
> sync_prefixes
= get_sync_targets_names();
678 store
->clear(sync_prefixes
);
682 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
683 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
686 health_monitor
->init();
688 if (is_keyring_required()) {
689 // we need to bootstrap authentication keys so we can form an
691 if (authmon()->get_last_committed() == 0) {
692 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
694 int err
= store
->get("mkfs", "keyring", bl
);
695 if (err
== 0 && bl
.length() > 0) {
696 // Attempt to decode and extract keyring only if it is found.
698 bufferlist::iterator p
= bl
.begin();
699 ::decode(keyring
, p
);
700 extract_save_mon_key(keyring
);
704 string keyring_loc
= g_conf
->mon_data
+ "/keyring";
706 r
= keyring
.load(cct
, keyring_loc
);
709 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
711 if (key_server
.get_auth(mon_name
, mon_key
)) {
712 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
713 keyring
.add(mon_name
, mon_key
);
715 keyring
.encode_plaintext(bl
);
716 write_default_keyring(bl
);
718 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
725 admin_hook
= new AdminHook(this);
726 AdminSocket
* admin_socket
= cct
->get_admin_socket();
728 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
730 r
= admin_socket
->register_command("mon_status", "mon_status", admin_hook
,
731 "show current monitor status");
733 r
= admin_socket
->register_command("quorum_status", "quorum_status",
734 admin_hook
, "show current quorum status");
736 r
= admin_socket
->register_command("sync_force",
737 "sync_force name=validate,"
739 "strings=--yes-i-really-mean-it",
741 "force sync of and clear monitor store");
743 r
= admin_socket
->register_command("add_bootstrap_peer_hint",
744 "add_bootstrap_peer_hint name=addr,"
747 "add peer address as potential bootstrap"
748 " peer for cluster bringup");
750 r
= admin_socket
->register_command("quorum enter", "quorum enter",
752 "force monitor back into quorum");
754 r
= admin_socket
->register_command("quorum exit", "quorum exit",
756 "force monitor out of the quorum");
758 r
= admin_socket
->register_command("ops",
761 "show the ops currently in flight");
763 r
= admin_socket
->register_command("sessions",
766 "list existing sessions");
771 // add ourselves as a conf observer
772 g_conf
->add_observer(this);
780 dout(2) << "init" << dendl
;
781 Mutex::Locker
l(lock
);
792 messenger
->add_dispatcher_tail(this);
795 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
796 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
802 void Monitor::init_paxos()
804 dout(10) << __func__
<< dendl
;
808 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
809 paxos_service
[i
]->init();
812 refresh_from_paxos(NULL
);
815 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
817 dout(10) << __func__
<< dendl
;
820 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
823 bufferlist::iterator p
= bl
.begin();
824 ::decode(fingerprint
, p
);
826 catch (buffer::error
& e
) {
827 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
830 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
833 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
834 paxos_service
[i
]->refresh(need_bootstrap
);
836 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
837 paxos_service
[i
]->post_refresh();
// Registers the cluster-wide perf counters with the context's collection.
// When cluster_logger_registered is false, the flag is set to true and
// cluster_logger is added to cct's perf-counter collection.
// NOTE(review): the "already registered" message presumably belongs to the
// opposite branch (flag already true) — confirm against the full function.
842 void Monitor::register_cluster_logger()
844 if (!cluster_logger_registered
) {
845 dout(10) << "register_cluster_logger" << dendl
;
846 cluster_logger_registered
= true;
847 cct
->get_perfcounters_collection()->add(cluster_logger
);
849 dout(10) << "register_cluster_logger - already registered" << dendl
;
// Removes the cluster-wide perf counters from the context's collection.
// When cluster_logger_registered is true, the flag is cleared and
// cluster_logger is removed from cct's perf-counter collection; the
// counters object itself is not deleted here.
// NOTE(review): the "not registered" message presumably belongs to the
// opposite branch (flag already false) — confirm against the full function.
853 void Monitor::unregister_cluster_logger()
855 if (cluster_logger_registered
) {
856 dout(10) << "unregister_cluster_logger" << dendl
;
857 cluster_logger_registered
= false;
858 cct
->get_perfcounters_collection()->remove(cluster_logger
);
860 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
864 void Monitor::update_logger()
866 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
867 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
870 void Monitor::shutdown()
872 dout(1) << "shutdown" << dendl
;
876 wait_for_paxos_write();
878 state
= STATE_SHUTDOWN
;
880 g_conf
->remove_observer(this);
883 AdminSocket
* admin_socket
= cct
->get_admin_socket();
884 admin_socket
->unregister_command("mon_status");
885 admin_socket
->unregister_command("quorum_status");
886 admin_socket
->unregister_command("sync_force");
887 admin_socket
->unregister_command("add_bootstrap_peer_hint");
888 admin_socket
->unregister_command("quorum enter");
889 admin_socket
->unregister_command("quorum exit");
890 admin_socket
->unregister_command("ops");
891 admin_socket
->unregister_command("sessions");
898 mgr_client
.shutdown();
901 finisher
.wait_for_empty();
907 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
909 health_monitor
->shutdown();
911 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
912 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
918 remove_all_sessions();
921 cct
->get_perfcounters_collection()->remove(logger
);
925 if (cluster_logger
) {
926 if (cluster_logger_registered
)
927 cct
->get_perfcounters_collection()->remove(cluster_logger
);
928 delete cluster_logger
;
929 cluster_logger
= NULL
;
932 log_client
.shutdown();
934 // unlock before msgr shutdown...
937 messenger
->shutdown(); // last thing! ceph_mon.cc will delete mon.
938 mgr_messenger
->shutdown();
941 void Monitor::wait_for_paxos_write()
943 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
944 dout(10) << __func__
<< " flushing pending write" << dendl
;
948 dout(10) << __func__
<< " flushed pending write" << dendl
;
952 void Monitor::bootstrap()
954 dout(10) << "bootstrap" << dendl
;
955 wait_for_paxos_write();
957 sync_reset_requester();
958 unregister_cluster_logger();
959 cancel_probe_timeout();
962 int newrank
= monmap
->get_rank(messenger
->get_myaddr());
963 if (newrank
< 0 && rank
>= 0) {
964 // was i ever part of the quorum?
965 if (has_ever_joined
) {
966 dout(0) << " removed from monmap, suicide." << dendl
;
970 if (newrank
!= rank
) {
971 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
972 messenger
->set_myname(entity_name_t::MON(newrank
));
975 // reset all connections, or else our peers will think we are someone else.
976 messenger
->mark_down_all();
980 state
= STATE_PROBING
;
985 if (g_conf
->mon_compact_on_bootstrap
) {
986 dout(10) << "bootstrap -- triggering compaction" << dendl
;
988 dout(10) << "bootstrap -- finished compaction" << dendl
;
991 // singleton monitor?
992 if (monmap
->size() == 1 && rank
== 0) {
993 win_standalone_election();
997 reset_probe_timeout();
999 // i'm outside the quorum
1000 if (monmap
->contains(name
))
1001 outside_quorum
.insert(name
);
1004 dout(10) << "probing other monitors" << dendl
;
1005 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1007 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
),
1008 monmap
->get_inst(i
));
1010 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
1011 p
!= extra_probe_peers
.end();
1013 if (*p
!= messenger
->get_myaddr()) {
1015 i
.name
= entity_name_t::MON(-1);
1017 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
), i
);
1022 bool Monitor::_add_bootstrap_peer_hint(string cmd
, cmdmap_t
& cmdmap
, ostream
& ss
)
1025 if (!cmd_getval(g_ceph_context
, cmdmap
, "addr", addrstr
)) {
1026 ss
<< "unable to parse address string value '"
1027 << cmd_vartype_stringify(cmdmap
["addr"]) << "'";
1030 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' '"
1031 << addrstr
<< "'" << dendl
;
1034 const char *end
= 0;
1035 if (!addr
.parse(addrstr
.c_str(), &end
)) {
1036 ss
<< "failed to parse addr '" << addrstr
<< "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1040 if (is_leader() || is_peon()) {
1041 ss
<< "mon already active; ignoring bootstrap hint";
1045 if (addr
.get_port() == 0)
1046 addr
.set_port(CEPH_MON_PORT
);
1048 extra_probe_peers
.insert(addr
);
1049 ss
<< "adding peer " << addr
<< " to list: " << extra_probe_peers
;
1053 // called by bootstrap(), or on leader|peon -> electing
1054 void Monitor::_reset()
1056 dout(10) << __func__
<< dendl
;
1058 cancel_probe_timeout();
1060 health_events_cleanup();
1061 scrub_event_cancel();
1063 leader_since
= utime_t();
1064 if (!quorum
.empty()) {
1065 exited_quorum
= ceph_clock_now();
1068 outside_quorum
.clear();
1069 quorum_feature_map
.clear();
1075 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
1077 health_monitor
->finish();
1081 // -----------------------------------------------------------
1084 set
<string
> Monitor::get_sync_targets_names()
1086 set
<string
> targets
;
1087 targets
.insert(paxos
->get_name());
1088 for (int i
= 0; i
< PAXOS_NUM
; ++i
)
1089 paxos_service
[i
]->get_store_prefixes(targets
);
1090 ConfigKeyService
*config_key_service_ptr
= dynamic_cast<ConfigKeyService
*>(config_key_service
);
1091 assert(config_key_service_ptr
);
1092 config_key_service_ptr
->get_store_prefixes(targets
);
1097 void Monitor::sync_timeout()
1099 dout(10) << __func__
<< dendl
;
1100 assert(state
== STATE_SYNCHRONIZING
);
1104 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1106 dout(1) << __func__
<< dendl
;
1108 MonMap latest_monmap
;
1110 // Grab latest monmap from MonmapMonitor
1111 bufferlist monmon_bl
;
1112 int err
= monmon()->get_monmap(monmon_bl
);
1114 if (err
!= -ENOENT
) {
1116 << " something wrong happened while reading the store: "
1117 << cpp_strerror(err
) << dendl
;
1118 assert(0 == "error reading the store");
1121 latest_monmap
.decode(monmon_bl
);
1124 // Grab last backed up monmap (if any) and compare epochs
1125 if (store
->exists("mon_sync", "latest_monmap")) {
1126 bufferlist backup_bl
;
1127 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1130 << " something wrong happened while reading the store: "
1131 << cpp_strerror(err
) << dendl
;
1132 assert(0 == "error reading the store");
1134 assert(backup_bl
.length() > 0);
1136 MonMap backup_monmap
;
1137 backup_monmap
.decode(backup_bl
);
1139 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1140 latest_monmap
= backup_monmap
;
1143 // Check if our current monmap's epoch is greater than the one we've
1145 if (monmap
->epoch
> latest_monmap
.epoch
)
1146 latest_monmap
= *monmap
;
1148 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1150 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
1153 void Monitor::sync_reset_requester()
1155 dout(10) << __func__
<< dendl
;
1157 if (sync_timeout_event
) {
1158 timer
.cancel_event(sync_timeout_event
);
1159 sync_timeout_event
= NULL
;
1162 sync_provider
= entity_inst_t();
1165 sync_start_version
= 0;
1168 void Monitor::sync_reset_provider()
1170 dout(10) << __func__
<< dendl
;
1171 sync_providers
.clear();
1174 void Monitor::sync_start(entity_inst_t
&other
, bool full
)
1176 dout(10) << __func__
<< " " << other
<< (full
? " full" : " recent") << dendl
;
1178 assert(state
== STATE_PROBING
||
1179 state
== STATE_SYNCHRONIZING
);
1180 state
= STATE_SYNCHRONIZING
;
1182 // make sure we are not a provider for anyone!
1183 sync_reset_provider();
1188 // stash key state, and mark that we are syncing
1189 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1190 sync_stash_critical_state(t
);
1191 t
->put("mon_sync", "in_sync", 1);
1193 sync_last_committed_floor
= MAX(sync_last_committed_floor
, paxos
->get_version());
1194 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1195 << sync_last_committed_floor
<< dendl
;
1196 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1198 store
->apply_transaction(t
);
1200 assert(g_conf
->mon_sync_requester_kill_at
!= 1);
1202 // clear the underlying store
1203 set
<string
> targets
= get_sync_targets_names();
1204 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1205 store
->clear(targets
);
1207 // make sure paxos knows it has been reset. this prevents a
1208 // bootstrap and then different probe reply order from possibly
1209 // deciding a partial or no sync is needed.
1212 assert(g_conf
->mon_sync_requester_kill_at
!= 2);
1215 // assume 'other' as the leader. We will update the leader once we receive
1216 // a reply to the sync start.
1217 sync_provider
= other
;
1219 sync_reset_timeout();
1221 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1223 m
->last_committed
= paxos
->get_version();
1224 messenger
->send_message(m
, sync_provider
);
1227 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1229 dout(10) << __func__
<< dendl
;
1230 bufferlist backup_monmap
;
1231 sync_obtain_latest_monmap(backup_monmap
);
1232 assert(backup_monmap
.length() > 0);
1233 t
->put("mon_sync", "latest_monmap", backup_monmap
);
// (Re)arm the sync timeout: cancel the currently registered event, if any,
// create a fresh C_MonContext and schedule it mon_sync_timeout from now.
// NOTE(review): the lambda body (embedded lines 1242-1243) is absent from
// this listing, so what runs on expiry cannot be seen here.
1236 void Monitor::sync_reset_timeout()
1238 dout(10) << __func__
<< dendl
;
1239 if (sync_timeout_event
)
1240 timer
.cancel_event(sync_timeout_event
);
// The int argument of the callback is unnamed and therefore unused.
1241 sync_timeout_event
= new C_MonContext(this, [this](int) {
1244 timer
.add_event_after(g_conf
->mon_sync_timeout
, sync_timeout_event
);
// Finish a sync on the requester side.
//
// @param last_committed  version written as paxos "last_committed"
//
// Visible steps: prepare the paxos transactions starting at
// sync_start_version, record last_committed, apply that transaction, then
// erase the "in_sync", "force_sync" and "last_committed_floor" markers.
// NOTE(review): embedded lines 1252-1253, 1257, 1259, 1262-1265, 1267-1268,
// 1278-1280 and 1282-1284 are absent from this listing; any conditions or
// follow-up calls on them are not visible here.
1247 void Monitor::sync_finish(version_t last_committed
)
1249 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
// Fault-injection point: asserts when mon_sync_requester_kill_at is 7.
1251 assert(g_conf
->mon_sync_requester_kill_at
!= 7);
1254 // finalize the paxos commits
1255 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
// The remaining arguments of this call sit on the skipped line 1257.
1256 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1258 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
1260 dout(30) << __func__
<< " final tx dump:\n";
1261 JSONFormatter
f(true);
1266 store
->apply_transaction(tx
);
1269 assert(g_conf
->mon_sync_requester_kill_at
!= 8);
// Second transaction: remove the markers that flag an in-progress or forced
// sync.
1271 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1272 t
->erase("mon_sync", "in_sync");
1273 t
->erase("mon_sync", "force_sync");
1274 t
->erase("mon_sync", "last_committed_floor");
1275 store
->apply_transaction(t
);
1277 assert(g_conf
->mon_sync_requester_kill_at
!= 9);
1281 assert(g_conf
->mon_sync_requester_kill_at
!= 10);
// Dispatch an incoming MMonSync message to the matching handler.
// Provider-side ops: OP_GET_COOKIE_FULL / OP_GET_COOKIE_RECENT / OP_GET_CHUNK.
// Requester-side ops: OP_COOKIE / OP_CHUNK / OP_LAST_CHUNK / OP_NO_COOKIE.
// An unrecognised op is logged at level 0 and trips an assert.
// NOTE(review): the switch header and the statements that end each case are
// on lines absent from this listing (e.g. 1290-1291, 1297, 1300-1301).
1286 void Monitor::handle_sync(MonOpRequestRef op
)
1288 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1289 dout(10) << __func__
<< " " << *m
<< dendl
;
1292 // provider ---------
1294 case MMonSync::OP_GET_COOKIE_FULL
:
1295 case MMonSync::OP_GET_COOKIE_RECENT
:
1296 handle_sync_get_cookie(op
);
1298 case MMonSync::OP_GET_CHUNK
:
1299 handle_sync_get_chunk(op
);
1302 // client -----------
1304 case MMonSync::OP_COOKIE
:
1305 handle_sync_cookie(op
);
1308 case MMonSync::OP_CHUNK
:
1309 case MMonSync::OP_LAST_CHUNK
:
1310 handle_sync_chunk(op
);
1312 case MMonSync::OP_NO_COOKIE
:
1313 handle_sync_no_cookie(op
);
1317 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1318 assert(0 == "unknown op");
1324 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1326 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1327 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1328 m
->get_connection()->send_message(reply
);
// Provider side: handle OP_GET_COOKIE_FULL / OP_GET_COOKIE_RECENT.
// Creates a SyncProvider entry keyed by a new cookie, records where the
// sync starts from and replies with OP_COOKIE.
// NOTE(review): embedded lines 1336-1338, 1348-1350, 1361, 1367, 1373, 1376
// and 1378 are absent from this listing. Early exits, the else-branch
// header and some sp.* assignments (e.g. of sp.cookie, which is read below)
// presumably sit there -- confirm against the full file.
1331 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1333 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
// While this monitor is itself synchronizing, the requester gets OP_NO_COOKIE.
1334 if (is_synchronizing()) {
1335 _sync_reply_no_cookie(op
);
// Fault-injection point: asserts when mon_sync_provider_kill_at is 1.
1339 assert(g_conf
->mon_sync_provider_kill_at
!= 1);
1341 // make sure they can understand us.
// (required ^ peer) & required is non-zero exactly when the peer lacks at
// least one required feature bit.
1342 if ((required_features
^ m
->get_connection()->get_features()) &
1343 required_features
) {
1344 dout(5) << " ignoring peer mon." << m
->get_source().num()
1345 << " has features " << std::hex
1346 << m
->get_connection()->get_features()
1347 << " but we require " << required_features
<< std::dec
<< dendl
;
1351 // make up a unique cookie. include election epoch (which persists
1352 // across restarts for the whole cluster) and a counter for this
1353 // process instance. there is no need to be unique *across*
1354 // monitors, though.
1355 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1356 assert(sync_providers
.count(cookie
) == 0);
1358 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
// operator[] creates the SyncProvider entry for this cookie.
1360 SyncProvider
& sp
= sync_providers
[cookie
];
1362 sp
.entity
= m
->get_source_inst();
// Provider-side sessions get twice the requester's sync timeout.
1363 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1365 set
<string
> sync_targets
;
1366 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1368 sync_targets
= get_sync_targets_names();
1369 sp
.last_committed
= paxos
->get_version();
1370 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1372 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1374 // just catch up paxos
1375 sp
.last_committed
= m
->last_committed
;
1377 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1379 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1380 reply
->last_committed
= sp
.last_committed
;
1381 m
->get_connection()->send_message(reply
);
// Provider side: handle OP_GET_CHUNK.
// Packs paxos versions after sp.last_committed (and, when sp.full is set,
// store data from the synchronizer) into a transaction, up to a payload
// budget of mon_sync_max_payload_size, and sends it as OP_CHUNK or
// OP_LAST_CHUNK.
// NOTE(review): embedded lines 1392-1394, 1406-1408, 1414, 1416, 1418-1419,
// 1423-1424, 1426, 1431-1432, 1437, 1441, 1443 and 1446-1447 are absent
// from this listing. Early returns, the declaration of 'bl', the handling
// of 'err' and the else-branch header presumably sit there.
1384 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1386 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1387 dout(10) << __func__
<< " " << *m
<< dendl
;
// An unknown cookie is answered with OP_NO_COOKIE.
1389 if (sync_providers
.count(m
->cookie
) == 0) {
1390 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1391 _sync_reply_no_cookie(op
);
// Fault-injection point: asserts when mon_sync_provider_kill_at is 2.
1395 assert(g_conf
->mon_sync_provider_kill_at
!= 2);
1397 SyncProvider
& sp
= sync_providers
[m
->cookie
];
1398 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
// The requester's position is older than our first committed version (and
// that version is past 1): drop the session and reply OP_NO_COOKIE.
1400 if (sp
.last_committed
< paxos
->get_first_committed() &&
1401 paxos
->get_first_committed() > 1) {
1402 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1403 << " < our fc " << paxos
->get_first_committed() << dendl
;
1404 sync_providers
.erase(m
->cookie
);
1405 _sync_reply_no_cookie(op
);
1409 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1410 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
// 'left' is the remaining payload budget; it may go negative after the last
// put, which ends the loop.
1412 int left
= g_conf
->mon_sync_max_payload_size
;
1413 while (sp
.last_committed
< paxos
->get_version() && left
> 0) {
1415 sp
.last_committed
++;
1417 int err
= store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1420 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1421 left
-= bl
.length();
1422 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1425 reply
->last_committed
= sp
.last_committed
;
// Fill whatever budget is left with store data for full syncs.
1427 if (sp
.full
&& left
> 0) {
1428 sp
.synchronizer
->get_chunk_tx(tx
, left
);
1429 sp
.last_key
= sp
.synchronizer
->get_last_key();
1430 reply
->last_key
= sp
.last_key
;
// More to send if the synchronizer has further chunks or paxos versions
// remain; otherwise the reply is turned into OP_LAST_CHUNK below.
1433 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1434 sp
.last_committed
< paxos
->get_version()) {
1435 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1436 << " key " << sp
.last_key
<< dendl
;
1438 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1439 << " key " << sp
.last_key
<< dendl
;
1440 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1442 assert(g_conf
->mon_sync_provider_kill_at
!= 3);
1444 // clean up our local state
1445 sync_providers
.erase(sp
.cookie
);
1448 ::encode(*tx
, reply
->chunk_bl
);
1450 m
->get_connection()->send_message(reply
);
// Requester side: handle OP_COOKIE from the provider.
// Stores the cookie and the version the sync starts from, re-arms the sync
// timeout and asks for the first chunk.
// NOTE(review): embedded lines 1459, 1461-1462 and 1465-1466 are absent
// from this listing. The condition guarding the "already have a cookie"
// message and the exits after both "ignoring"/"discarding" messages
// presumably sit there.
1455 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1457 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1458 dout(10) << __func__
<< " " << *m
<< dendl
;
1460 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
// Only the monitor recorded as sync_provider may hand us a cookie.
1463 if (m
->get_source_inst() != sync_provider
) {
1464 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1467 sync_cookie
= m
->cookie
;
1468 sync_start_version
= m
->last_committed
;
1470 sync_reset_timeout();
1471 sync_get_next_chunk();
// Fault-injection point: asserts when mon_sync_requester_kill_at is 3.
1473 assert(g_conf
->mon_sync_requester_kill_at
!= 3);
1476 void Monitor::sync_get_next_chunk()
1478 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1479 if (g_conf
->mon_inject_sync_get_chunk_delay
> 0) {
1480 dout(20) << __func__
<< " injecting delay of " << g_conf
->mon_inject_sync_get_chunk_delay
<< dendl
;
1481 usleep((long long)(g_conf
->mon_inject_sync_get_chunk_delay
* 1000000.0));
1483 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1484 messenger
->send_message(r
, sync_provider
);
1486 assert(g_conf
->mon_sync_requester_kill_at
!= 4);
// Requester side: handle OP_CHUNK / OP_LAST_CHUNK.
// Applies the received chunk to the local store, then either requests the
// next chunk (OP_CHUNK) or finishes the sync (OP_LAST_CHUNK).
// NOTE(review): embedded lines 1496-1497, 1500-1502, 1511-1514, 1516,
// 1518-1519, 1523, 1525, 1528-1531 and 1534-1535 are absent from this
// listing. The early exits after the "discarding" messages and the scope
// that holds the second 'tx' / 'f' declarations presumably sit there.
1489 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1491 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1492 dout(10) << __func__
<< " " << *m
<< dendl
;
// The chunk must carry our cookie and come from our provider.
1494 if (m
->cookie
!= sync_cookie
) {
1495 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1498 if (m
->get_source_inst() != sync_provider
) {
1499 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1503 assert(state
== STATE_SYNCHRONIZING
);
// Fault-injection point: asserts when mon_sync_requester_kill_at is 5.
1504 assert(g_conf
->mon_sync_requester_kill_at
!= 5);
// Decode the provider's transaction from the message payload and apply it.
1506 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1507 tx
->append_from_encoded(m
->chunk_bl
);
1509 dout(30) << __func__
<< " tx dump:\n";
1510 JSONFormatter
f(true);
1515 store
->apply_transaction(tx
);
1517 assert(g_conf
->mon_sync_requester_kill_at
!= 6);
1520 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
// A second, separately named-alike transaction: it redeclares 'tx', so it
// must live in a nested scope whose opening line is not visible here.
1521 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1522 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1524 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1526 dout(30) << __func__
<< " tx dump:\n";
1527 JSONFormatter
f(true);
1532 store
->apply_transaction(tx
);
1533 paxos
->init(); // to refresh what we just wrote
1536 if (m
->op
== MMonSync::OP_CHUNK
) {
1537 sync_reset_timeout();
1538 sync_get_next_chunk();
1539 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1540 sync_finish(m
->last_committed
);
// Requester side: handle OP_NO_COOKIE. The only visible statement logs the
// function name at level 10; 'op' is not used on the visible lines.
// NOTE(review): embedded line 1547 is absent from this listing, so any
// reaction to the refusal is not visible here.
1544 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1546 dout(10) << __func__
<< dendl
;
// Expire provider-side sync sessions whose timeout lies before the current
// clock reading.
// NOTE(review): embedded lines 1560-1564 are absent from this listing; the
// iterator advance for entries that are NOT expired presumably sits there.
1550 void Monitor::sync_trim_providers()
1552 dout(20) << __func__
<< dendl
;
// Read the clock once so every entry is compared against the same instant.
1554 utime_t now
= ceph_clock_now();
1555 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1556 while (p
!= sync_providers
.end()) {
1557 if (now
> p
->second
.timeout
) {
1558 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
<< " for " << p
->second
.entity
<< dendl
;
// Post-increment hands erase() the old position while p already points at
// the next element, so p stays valid after the erase.
1559 sync_providers
.erase(p
++);
1566 // ---------------------------------------------------
// Cancel the probe timeout event if one is registered and clear the
// pointer; a second log message covers the case where none is scheduled.
// NOTE(review): embedded line 1575 is absent from this listing; it
// presumably holds the branch separating the two log messages -- confirm.
1569 void Monitor::cancel_probe_timeout()
1571 if (probe_timeout_event
) {
1572 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1573 timer
.cancel_event(probe_timeout_event
);
1574 probe_timeout_event
= NULL
;
1576 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
// Replace the probe timeout: cancel the current one, create a new
// C_MonContext and schedule it mon_probe_timeout seconds from now.
// NOTE(review): the lambda body (embedded lines 1584-1585) is absent from
// this listing, so what runs on expiry cannot be seen here.
1580 void Monitor::reset_probe_timeout()
1582 cancel_probe_timeout();
1583 probe_timeout_event
= new C_MonContext(this, [this](int r
) {
// The delay is read once so the scheduled value and the logged value agree.
1586 double t
= g_conf
->mon_probe_timeout
;
1587 timer
.add_event_after(t
, probe_timeout_event
);
1588 dout(10) << "reset_probe_timeout " << probe_timeout_event
<< " after " << t
<< " seconds" << dendl
;
// Handler for probe timeout expiry.
// Requires the monitor to be probing or synchronizing and an event to be
// registered; clears the event pointer. 'r' is not used on the visible lines.
// NOTE(review): embedded lines 1597-1598 are absent from this listing, so
// any follow-up action after clearing the pointer is not visible here.
1591 void Monitor::probe_timeout(int r
)
1593 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1594 assert(is_probing() || is_synchronizing());
1595 assert(probe_timeout_event
);
// The pointer is only cleared here, not cancelled through the timer.
1596 probe_timeout_event
= NULL
;
// Dispatch an incoming MMonProbe message.
// Messages whose fsid differs from our monmap's fsid are logged at level 0.
// OP_PROBE and OP_REPLY go to their handlers; OP_MISSING_FEATURES is
// reported through derr together with the missing feature bits.
// NOTE(review): embedded lines 1607-1610, 1613-1614, 1617-1618 and
// 1623-1626 are absent from this listing. The exit after the fsid mismatch,
// the switch header and the ends of the cases presumably sit there.
1600 void Monitor::handle_probe(MonOpRequestRef op
)
1602 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1603 dout(10) << "handle_probe " << *m
<< dendl
;
1605 if (m
->fsid
!= monmap
->fsid
) {
1606 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1611 case MMonProbe::OP_PROBE
:
1612 handle_probe_probe(op
);
1615 case MMonProbe::OP_REPLY
:
1616 handle_probe_reply(op
);
1619 case MMonProbe::OP_MISSING_FEATURES
:
// "missing" is the set of required bits that CEPH_FEATURES_ALL lacks.
1620 derr
<< __func__
<< " missing features, have " << CEPH_FEATURES_ALL
1621 << ", required " << m
->required_features
1622 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
)
// Handle OP_PROBE from a peer monitor.
// Visible behaviour: report required features the peer's connection lacks
// (sending OP_MISSING_FEATURES to peers that understand it), detect a peer
// whose first committed version is ahead of us, reply with OP_REPLY carrying
// our monmap and paxos version range, and remember unknown peers as hints.
// NOTE(review): embedded lines 1635, 1643-1646, 1657-1663, 1665-1666 and
// 1677 onwards are absent from this listing. The test on 'missing', the
// exits, the re-bootstrap action and the declaration of the second 'r'
// presumably sit there.
1628 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1630 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1632 dout(10) << "handle_probe_probe " << m
->get_source_inst() << *m
1633 << " features " << m
->get_connection()->get_features() << dendl
;
// Required bits that the peer's connection does not advertise.
1634 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1636 dout(1) << " peer " << m
->get_source_addr() << " missing features "
1637 << missing
<< dendl
;
1638 if (m
->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY
)) {
1639 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1640 name
, has_ever_joined
);
// NOTE(review): this writes required_features into the incoming message 'm',
// while the message actually sent is 'r'. It looks like 'r' was intended,
// since handle_probe prints required_features of the received
// OP_MISSING_FEATURES message -- confirm.
1641 m
->required_features
= required_features
;
1642 m
->get_connection()->send_message(r
);
1647 if (!is_probing() && !is_synchronizing()) {
1648 // If the probing mon is way ahead of us, we need to re-bootstrap.
1649 // Normally we capture this case when we initially bootstrap, but
1650 // it is possible we pass those checks (we overlap with
1651 // quorum-to-be) but fail to join a quorum before it moves past
1652 // us. We need to be kicked back to bootstrap so we can
1653 // synchronize, not keep calling elections.
1654 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1655 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1656 << "ahead of us, re-bootstrapping" << dendl
;
1664 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
);
// The monmap is encoded with the feature set of the peer's connection.
1667 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1668 r
->paxos_first_version
= paxos
->get_first_committed();
1669 r
->paxos_last_version
= paxos
->get_version();
1670 m
->get_connection()->send_message(r
);
1672 // did we discover a peer here?
1673 if (!monmap
->contains(m
->get_source_addr())) {
1674 dout(1) << " adding peer " << m
->get_source_addr()
1675 << " to list of hints" << dendl
;
1676 extra_probe_peers
.insert(m
->get_source_addr());
// Handle OP_REPLY to one of our probes.
// Visible phases:
//   1. discovery: adopt a newer/committed monmap from the peer, rename
//      "noname-" placeholder peers, learn blank peer addresses;
//   2. sync decision: start a full or recent sync when the peer's paxos
//      versions are too far ahead of ours;
//   3. quorum handling: join an existing quorum, or collect peers that are
//      outside any quorum until a majority can hold an election.
// NOTE(review): many embedded lines are absent from this listing (e.g.
// 1691-1693, 1695, 1706, 1708-1715, 1720, 1722, 1724-1727, 1737-1741,
// 1744-1746, 1751-1753, 1760-1761, 1768, 1771-1772, 1777, 1780-1783,
// 1790-1792, 1796-1797, 1801-1802, 1806, 1808-1810, 1816-1817, 1819-1820,
// 1822 onwards). Exits, else branches, the declaration of 'mybl', the ends
// of several log statements and the election/bootstrap calls presumably sit
// there -- confirm against the full file before relying on control flow.
1683 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1685 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1686 dout(10) << "handle_probe_reply " << m
->get_source_inst() << *m
<< dendl
;
1687 dout(10) << " monmap is " << *monmap
<< dendl
;
1689 // discover name and addrs during probing or electing states.
1690 if (!is_probing() && !is_electing()) {
1694 // newer map, or they've joined a quorum and we haven't?
1696 monmap
->encode(mybl
, m
->get_connection()->get_features());
1697 // make sure it's actually different; the checks below err toward
1698 // taking the other guy's map, which could cause us to loop.
1699 if (!mybl
.contents_equal(m
->monmap_bl
)) {
// NOTE(review): newmap is heap-allocated and no matching delete is visible
// in this listing -- confirm it is released on the skipped lines.
1700 MonMap
*newmap
= new MonMap
;
1701 newmap
->decode(m
->monmap_bl
);
1702 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
1703 !has_ever_joined
)) {
1704 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
1705 << ", mine was " << monmap
->get_epoch() << dendl
;
1707 monmap
->decode(m
->monmap_bl
);
// In an epoch-0 monmap, peers named with the "noname-" prefix are renamed to
// the name the peer reports for itself.
1716 string peer_name
= monmap
->get_name(m
->get_source_addr());
1717 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
1718 dout(10) << " renaming peer " << m
->get_source_addr() << " "
1719 << peer_name
<< " -> " << m
->name
<< " in my monmap"
1721 monmap
->rename(peer_name
, m
->name
);
1723 if (is_electing()) {
1728 dout(10) << " peer name is " << peer_name
<< dendl
;
1731 // new initial peer?
1732 if (monmap
->get_epoch() == 0 &&
1733 monmap
->contains(m
->name
) &&
1734 monmap
->get_addr(m
->name
).is_blank_ip()) {
1735 dout(1) << " learned initial mon " << m
->name
<< " addr " << m
->get_source_addr() << dendl
;
1736 monmap
->set_addr(m
->name
, m
->get_source_addr());
1742 // end discover phase
1743 if (!is_probing()) {
1747 assert(paxos
!= NULL
);
1749 if (is_synchronizing()) {
1750 dout(10) << " currently syncing" << dendl
;
1754 entity_inst_t other
= m
->get_source_inst();
// Peers whose last version is below our recorded sync floor are not used.
1756 if (m
->paxos_last_version
< sync_last_committed_floor
) {
1757 dout(10) << " peer paxos versions [" << m
->paxos_first_version
1758 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
1759 << sync_last_committed_floor
<< ", ignoring"
// Our version is older than the peer's first version: full sync.
1762 if (paxos
->get_version() < m
->paxos_first_version
&&
1763 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
1764 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
1765 << "," << m
->paxos_last_version
<< "]"
1766 << " vs my version " << paxos
->get_version()
1767 << " (too far ahead)"
1769 cancel_probe_timeout();
1770 sync_start(other
, true);
// We trail the peer by more than paxos_max_join_drift: recent sync.
1773 if (paxos
->get_version() + g_conf
->paxos_max_join_drift
< m
->paxos_last_version
) {
1774 dout(10) << " peer paxos last version " << m
->paxos_last_version
1775 << " vs my version " << paxos
->get_version()
1776 << " (too far ahead)"
1778 cancel_probe_timeout();
1779 sync_start(other
, false);
1784 // is there an existing quorum?
1785 if (m
->quorum
.size()) {
1786 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
1788 dout(10) << " peer paxos version " << m
->paxos_last_version
1789 << " vs my version " << paxos
->get_version()
1793 if (monmap
->contains(name
) &&
1794 !monmap
->get_addr(name
).is_blank_ip()) {
1795 // i'm part of the cluster; just initiate a new election
1798 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl
;
// The join request goes to the first member of the peer's quorum set.
1799 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
1800 monmap
->get_inst(*m
->quorum
.begin()));
1803 if (monmap
->contains(m
->name
)) {
1804 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
1805 outside_quorum
.insert(m
->name
);
1807 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
// Strict majority of the monmap.
1811 unsigned need
= monmap
->size() / 2 + 1;
1812 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
1813 if (outside_quorum
.size() >= need
) {
1814 if (outside_quorum
.count(name
)) {
1815 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
1818 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
1821 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
// Enter the electing state: wait for pending paxos writes, set
// STATE_ELECTING and bump the l_mon_num_elections counter.
// NOTE(review): embedded line 1830 is absent from this listing; a call
// between the wait and the state change may exist there.
1826 void Monitor::join_election()
1828 dout(10) << __func__
<< dendl
;
1829 wait_for_paxos_write();
1831 state
= STATE_ELECTING
;
1833 logger
->inc(l_mon_num_elections
);
// Call a new election: wait for pending paxos writes, set STATE_ELECTING,
// bump the election counters, announce it on the cluster log and ask the
// elector to call the election.
// NOTE(review): embedded line 1840 is absent from this listing; a call
// between the wait and the state change may exist there.
1836 void Monitor::start_election()
1838 dout(10) << "start_election" << dendl
;
1839 wait_for_paxos_write();
1841 state
= STATE_ELECTING
;
// Unlike join_election, this also counts as a call made by this monitor.
1843 logger
->inc(l_mon_num_elections
);
1844 logger
->inc(l_mon_election_call
);
1846 clog
->info() << "mon." << name
<< " calling new monitor election";
1847 elector
.call_election();
// Declare this monitor the winner without running an election round:
// advance the election epoch, look up our rank, collect our own metadata
// and call win_election().
// NOTE(review): embedded lines 1858, 1860-1863, 1866, 1868 and 1870 onwards
// are absent from this listing. The declaration of 'q' and the remaining
// win_election arguments sit there.
1850 void Monitor::win_standalone_election()
1852 dout(1) << "win_standalone_election" << dendl
;
1854 // bump election epoch, in case the previous epoch included other
1855 // monitors; we need to be able to make the distinction.
1857 elector
.advance_epoch();
1859 rank
= monmap
->get_rank(name
);
// Metadata is collected into slot 0 of the map.
1864 map
<int,Metadata
> metadata
;
1865 collect_metadata(&metadata
[0]);
1867 win_election(elector
.get_epoch(), q
,
1869 ceph::features::mon::get_supported(),
1873 const utime_t
& Monitor::get_leader_since() const
1875 assert(state
== STATE_LEADER
);
1876 return leader_since
;
1879 epoch_t
Monitor::get_epoch()
1881 return elector
.get_epoch();
// Tell every paxos service that the election has finished.
// Must be called as leader or peon (asserted).
// NOTE(review): embedded line 1891 is absent from this listing. It
// presumably holds the statement that skips monmon() on the leader, as the
// comment below describes -- confirm.
1884 void Monitor::_finish_svc_election()
1886 assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
1888 for (auto p
: paxos_service
) {
1889 // we already called election_finished() on monmon(); avoid calling twice
1890 if (state
== STATE_LEADER
&& p
== monmon())
1892 p
->election_finished();
// Take over as leader after winning an election.
//
// @param epoch         election epoch; passed to health_monitor->start()
// @param active        quorum members (logged as " quorum ")
// @param features      connection features, stored as quorum_con_features
// @param mon_features  monitor features, stored as quorum_mon_features
// @param metadata      per-rank metadata, stored as pending_metadata
//
// NOTE(review): embedded lines 1903, 1907-1908, 1913, 1916, 1918, 1928,
// 1930, 1932, 1940-1942, 1947-1948, 1950-1952 and 1955 are absent from this
// listing. The assignment of the quorum/leader members and the declaration
// and encoding of 'bl' presumably sit there.
1896 void Monitor::win_election(epoch_t epoch
, set
<int>& active
, uint64_t features
,
1897 const mon_feature_t
& mon_features
,
1898 const map
<int,Metadata
>& metadata
)
1900 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
1901 << " features " << features
1902 << " mon_features " << mon_features
// Winning is only valid from the electing state.
1904 assert(is_electing());
1905 state
= STATE_LEADER
;
1906 leader_since
= ceph_clock_now();
1909 quorum_con_features
= features
;
1910 quorum_mon_features
= mon_features
;
1911 pending_metadata
= metadata
;
1912 outside_quorum
.clear();
1914 clog
->info() << "mon." << name
<< "@" << rank
1915 << " won leader election with quorum " << quorum
;
1917 set_leader_commands(get_local_commands(mon_features
));
1919 paxos
->leader_init();
1920 // NOTE: tell monmap monitor first. This is important for the
1921 // bootstrap case to ensure that the very first paxos proposal
1922 // codifies the monmap. Otherwise any manner of chaos can ensue
1923 // when monitors are calling elections or participating in a paxos
1924 // round without agreeing on who the participants are.
1925 monmon()->election_finished();
1926 _finish_svc_election();
1927 health_monitor
->start(epoch
);
1929 logger
->inc(l_mon_election_win
);
1931 // inject new metadata in first transaction.
1933 // include previous metadata for missing mons (that aren't part of
1934 // the current quorum).
1935 map
<int,Metadata
> m
= metadata
;
// NOTE(review): the loop variable 'rank' hides the 'rank' used in the clog
// line above for the duration of the loop.
1936 for (unsigned rank
= 0; rank
< monmap
->size(); ++rank
) {
1937 if (m
.count(rank
) == 0 &&
1938 mon_metadata
.count(rank
)) {
1939 m
[rank
] = mon_metadata
[rank
];
1943 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1944 // a new transaction immediately after the election finishes. We should
1945 // do that anyway for other reasons, though.
1946 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
1949 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
1953 if (monmap
->size() > 1 &&
1954 monmap
->get_epoch() > 0) {
1956 health_tick_start();
1957 do_health_to_clog_interval();
1958 scrub_event_start();
// Become a peon after losing an election.
//
// @param epoch         election epoch; passed to health_monitor->start()
// @param q             quorum members
// @param l             presumably the leader's rank -- TODO confirm
// @param mon_features  monitor features, stored as quorum_mon_features
//
// NOTE(review): embedded line 1963 (part of the parameter list; 'features'
// used below is presumably declared there) and lines 1965-1966, 1968-1969,
// 1976-1978, 1981, 1983-1985 and 1989 are absent from this listing. The
// state change and the declaration of 'sys_info' presumably sit there.
1962 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
1964 const mon_feature_t
& mon_features
)
// A default-constructed utime_t is stored: we are not leader.
1967 leader_since
= utime_t();
1970 outside_quorum
.clear();
1971 quorum_con_features
= features
;
1972 quorum_mon_features
= mon_features
;
1973 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
1974 << " quorum is " << quorum
<< " features are " << quorum_con_features
1975 << " mon_features are " << quorum_mon_features
1979 _finish_svc_election();
1980 health_monitor
->start(epoch
);
1982 logger
->inc(l_mon_election_lose
);
// Metadata is pushed to the leader only when the quorum has MON_METADATA but
// not yet SERVER_LUMINOUS.
1986 if ((quorum_con_features
& CEPH_FEATURE_MON_METADATA
) &&
1987 !HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
)) {
1988 // for pre-luminous mons only
1990 collect_metadata(&sys_info
);
1991 messenger
->send_message(new MMonMetadata(sys_info
),
1992 monmap
->get_inst(get_leader()));
1996 void Monitor::collect_metadata(Metadata
*m
)
1998 collect_sys_info(m
, g_ceph_context
);
1999 (*m
)["addr"] = stringify(messenger
->get_myaddr());
// Common post-election work: refresh the compatset from quorum and monmap
// features, clear exited_quorum, complete contexts waiting for a quorum,
// resend routed requests, register the cluster logger and, when the monmap
// knows us under a different name, ask the first quorum member to rename us.
// NOTE(review): embedded lines 2006, 2011, 2013 and 2020-2021 are absent
// from this listing; further calls may exist on them.
2002 void Monitor::finish_election()
2004 apply_quorum_to_compatset_features();
2005 apply_monmap_to_compatset_features();
2007 exited_quorum
= utime_t();
2008 finish_contexts(g_ceph_context
, waitfor_quorum
);
2009 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
2010 resend_routed_requests();
2012 register_cluster_logger();
2014 // am i named properly?
2015 string cur_name
= monmap
->get_name(messenger
->get_myaddr());
2016 if (cur_name
!= name
) {
2017 dout(10) << " renaming myself from " << cur_name
<< " -> " << name
<< dendl
;
// The join request goes to the first member of the quorum set.
2018 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
2019 monmap
->get_inst(*quorum
.begin()));
// Adopt a new CompatSet when it differs from the current one, then
// recompute the required connection features.
//
// @param new_features  candidate feature set
//
// NOTE(review): embedded lines 2029, 2031, 2033 and 2035 are absent from
// this listing. What gets written into transaction 't' before it is applied
// is on the skipped line 2031, and whether calc_quorum_requirements() is
// inside or after the if-body cannot be told from here.
2023 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2025 if (new_features
.compare(features
) != 0) {
// 'diff' is only used for the log message.
2026 CompatSet diff
= features
.unsupported(new_features
);
2027 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2028 features
= new_features
;
2030 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2032 store
->apply_transaction(t
);
2034 calc_quorum_requirements();
2038 void Monitor::apply_quorum_to_compatset_features()
2040 CompatSet
new_features(features
);
2041 if (quorum_con_features
& CEPH_FEATURE_OSD_ERASURE_CODES
) {
2042 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2044 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2045 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2047 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
) {
2048 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2050 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
) {
2051 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2053 dout(5) << __func__
<< dendl
;
2054 _apply_compatset_features(new_features
);
2057 void Monitor::apply_monmap_to_compatset_features()
2059 CompatSet
new_features(features
);
2060 mon_feature_t monmap_features
= monmap
->get_required_features();
2062 /* persistent monmap features may go into the compatset.
2063 * optional monmap features may not - why?
2064 * because optional monmap features may be set/unset by the admin,
2065 * and possibly by other means that haven't yet been thought out,
2066 * so we can't make the monitor enforce them on start - because they
2068 * this, of course, does not invalidate setting a compatset feature
2069 * for an optional feature - as long as you make sure to clean it up
2070 * once you unset it.
2072 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2073 assert(ceph::features::mon::get_persistent().contains_all(
2074 ceph::features::mon::FEATURE_KRAKEN
));
2075 // this feature should only ever be set if the quorum supports it.
2076 assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2077 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2080 dout(5) << __func__
<< dendl
;
2081 _apply_compatset_features(new_features
);
// Recompute required_features (the connection feature bits peers must
// have) from scratch: first from the incompat bits in the compatset, then
// from the monmap's required features.
// NOTE(review): embedded lines 2087-2088 and 2104-2105 are two-line gaps in
// this listing; they may hold more than a blank line and a closing brace.
2084 void Monitor::calc_quorum_requirements()
2086 required_features
= 0;
// Each compatset incompat bit maps to one connection feature bit.
2089 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
)) {
2090 required_features
|= CEPH_FEATURE_OSD_ERASURE_CODES
;
2092 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2093 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2095 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
)) {
2096 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
;
2098 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
)) {
2099 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
;
2101 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2102 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
// Monmap-required release features add the matching server feature mask.
2106 if (monmap
->get_required_features().contains_all(
2107 ceph::features::mon::FEATURE_KRAKEN
)) {
2108 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2110 if (monmap
->get_required_features().contains_all(
2111 ceph::features::mon::FEATURE_LUMINOUS
)) {
2112 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2114 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
// Accumulate feature maps into *fm: this monitor's session feature map
// plus the stored feature maps of quorum members.
//
// @param fm  accumulator; it is added to, not reset
//
// NOTE(review): embedded line 2121 is absent from this listing; a condition
// selecting which quorum members are added may sit there.
2117 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2119 *fm
+= session_map
.feature_map
;
2120 for (auto id
: quorum
) {
2122 *fm
+= quorum_feature_map
[id
];
// Flag the store so that a sync is forced: stash the critical state, write
// ("mon_sync", "force_sync") = 1 and report the outcome through a formatter.
//
// @param f   formatter for the result; a JSONFormatter is created on the
//            visible fallback path
// @param ss  not used on the visible lines
//
// NOTE(review): embedded lines 2130-2131, 2135-2136 and 2146 onwards are
// absent from this listing. The condition guarding the fallback formatter
// and the flush/cleanup that uses free_formatter presumably sit there.
2127 void Monitor::sync_force(Formatter
*f
, ostream
& ss
)
// Remembers whether the formatter was created in this function.
2129 bool free_formatter
= false;
2132 // lousy/lazy hack: default to json if no formatter has been defined
2133 f
= new JSONFormatter();
2134 free_formatter
= true;
2137 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2138 sync_stash_critical_state(tx
);
2139 tx
->put("mon_sync", "force_sync", 1);
2140 store
->apply_transaction(tx
);
2142 f
->open_object_section("sync_force");
2143 f
->dump_int("ret", 0);
2144 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2145 f
->close_section(); // sync_force
// Dump the quorum status: election epoch, quorum ranks, quorum names, the
// leader's name and a "monmap" section.
//
// @param f   formatter to dump into; a JSONFormatter is created on the
//            visible fallback path
// @param ss  not used on the visible lines
//
// NOTE(review): embedded lines 2154-2155, 2159, 2177 and 2181 onwards are
// absent from this listing. The condition guarding the fallback formatter,
// the contents of the "monmap" section and the flush/cleanup presumably sit
// there.
2151 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
2153 bool free_formatter
= false;
2156 // lousy/lazy hack: default to json if no formatter has been defined
2157 f
= new JSONFormatter();
2158 free_formatter
= true;
2160 f
->open_object_section("quorum_status");
2161 f
->dump_int("election_epoch", get_epoch());
2163 f
->open_array_section("quorum");
2164 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2165 f
->dump_int("mon", *p
);
2166 f
->close_section(); // quorum
2168 list
<string
> quorum_names
= get_quorum_names();
2169 f
->open_array_section("quorum_names");
2170 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2171 f
->dump_string("mon", *p
);
2172 f
->close_section(); // quorum_names
// The leader name is that of the first quorum member; empty when there is
// no quorum.
2174 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(*quorum
.begin()));
2176 f
->open_object_section("monmap");
2178 f
->close_section(); // monmap
2180 f
->close_section(); // quorum_status
// Dump this monitor's status: name, rank, state, election epoch, quorum,
// feature sets, monitors outside the quorum, extra probe peers, provider
// sync sessions, requester sync state (while synchronizing), fault-injection
// settings, a "monmap" section and the session feature map.
//
// @param f   formatter to dump into; a JSONFormatter is created on the
//            visible fallback path
// @param ss  per the comment near the end, receives the flushed output when
//            the formatter was created here
//
// NOTE(review): embedded lines 2189-2190, 2194-2195, 2205-2206, 2225, 2232,
// 2238-2240, 2246-2248, 2255-2257 and 2263 onwards are absent from this
// listing. Loop increments, per-entry section open/close calls, the monmap
// dump and the flush/delete code presumably sit there.
2186 void Monitor::get_mon_status(Formatter
*f
, ostream
& ss
)
2188 bool free_formatter
= false;
2191 // lousy/lazy hack: default to json if no formatter has been defined
2192 f
= new JSONFormatter();
2193 free_formatter
= true;
2196 f
->open_object_section("mon_status");
2197 f
->dump_string("name", name
);
2198 f
->dump_int("rank", rank
);
2199 f
->dump_string("state", get_state_name());
2200 f
->dump_int("election_epoch", get_epoch());
2202 f
->open_array_section("quorum");
2203 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2204 f
->dump_int("mon", *p
);
2207 f
->close_section(); // quorum
// Required vs. quorum-wide features, for connections and for monitors.
2209 f
->open_object_section("features");
2210 f
->dump_stream("required_con") << required_features
;
2211 mon_feature_t req_mon_features
= get_required_mon_features();
2212 req_mon_features
.dump(f
, "required_mon");
2213 f
->dump_stream("quorum_con") << quorum_con_features
;
2214 quorum_mon_features
.dump(f
, "quorum_mon");
2215 f
->close_section(); // features
2217 f
->open_array_section("outside_quorum");
2218 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2219 f
->dump_string("mon", *p
);
2220 f
->close_section(); // outside_quorum
2222 f
->open_array_section("extra_probe_peers");
2223 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
2224 p
!= extra_probe_peers
.end();
2226 f
->dump_stream("peer") << *p
;
2227 f
->close_section(); // extra_probe_peers
// One entry per provider-side sync session.
2229 f
->open_array_section("sync_provider");
2230 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2231 p
!= sync_providers
.end();
2233 f
->dump_unsigned("cookie", p
->second
.cookie
);
2234 f
->dump_stream("entity") << p
->second
.entity
;
2235 f
->dump_stream("timeout") << p
->second
.timeout
;
2236 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2237 f
->dump_stream("last_key") << p
->second
.last_key
;
// Requester-side sync state is only reported while synchronizing.
2241 if (is_synchronizing()) {
2242 f
->open_object_section("sync");
2243 f
->dump_stream("sync_provider") << sync_provider
;
2244 f
->dump_unsigned("sync_cookie", sync_cookie
);
2245 f
->dump_unsigned("sync_start_version", sync_start_version
);
// Fault-injection settings are reported only when set to a positive value.
2249 if (g_conf
->mon_sync_provider_kill_at
> 0)
2250 f
->dump_int("provider_kill_at", g_conf
->mon_sync_provider_kill_at
);
2251 if (g_conf
->mon_sync_requester_kill_at
> 0)
2252 f
->dump_int("requester_kill_at", g_conf
->mon_sync_requester_kill_at
);
2254 f
->open_object_section("monmap");
2258 f
->dump_object("feature_map", session_map
.feature_map
);
2259 f
->close_section(); // mon_status
2261 if (free_formatter
) {
2262 // flush formatter to ss and delete it iff we created the formatter
2269 // health status to clog
// Arm the recurring health "tick".  A C_MonContext is created whose callback
// runs do_health_to_clog() and then calls health_tick_start() again, and
// timer.add_event_after() is invoked with mon_health_to_clog_tick_interval.
2271 void Monitor::health_tick_start()
// Guard on the health-to-clog switch and on a non-positive tick interval.
// NOTE(review): the statement this condition guards is not visible in this
// excerpt -- presumably an early return; confirm against the full file.
2273 if (!cct
->_conf
->mon_health_to_clog
||
2274 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2277 dout(15) << __func__
<< dendl
;
// The callback captures `this` and receives a result code `r`; how `r` is
// used is not visible here.
2280 health_tick_event
= new C_MonContext(this, [this](int r
) {
2283 do_health_to_clog();
2284 health_tick_start();
// NOTE(review): the remaining add_event_after() arguments fall outside this
// excerpt; presumably health_tick_event is what gets scheduled.
2286 timer
.add_event_after(cct
->_conf
->mon_health_to_clog_tick_interval
,
2290 void Monitor::health_tick_stop()
2292 dout(15) << __func__
<< dendl
;
2294 if (health_tick_event
) {
2295 timer
.cancel_event(health_tick_event
);
2296 health_tick_event
= NULL
;
2300 utime_t
Monitor::health_interval_calc_next_update()
2302 utime_t now
= ceph_clock_now();
2304 time_t secs
= now
.sec();
2305 int remainder
= secs
% cct
->_conf
->mon_health_to_clog_interval
;
2306 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2307 utime_t next
= utime_t(secs
+ adjustment
, 0);
2309 dout(20) << __func__
2310 << " now: " << now
<< ","
2311 << " next: " << next
<< ","
2312 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
// Schedule the next interval-aligned health report: any previous interval
// event is stopped, the next update time is computed, and a new C_MonContext
// that calls do_health_to_clog_interval() is added to `timer` at that time.
2318 void Monitor::health_interval_start()
2320 dout(15) << __func__
<< dendl
;
// Guard on the health-to-clog switch and on a non-positive interval.
// NOTE(review): the body of this `if` is not visible in this excerpt --
// presumably an early return.
2322 if (!cct
->_conf
->mon_health_to_clog
||
2323 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
// Drop any previously scheduled interval event before creating a new one.
2327 health_interval_stop();
2328 utime_t next
= health_interval_calc_next_update();
// The callback captures `this` and receives a result code `r`; how `r` is
// used is not visible here.
2329 health_interval_event
= new C_MonContext(this, [this](int r
) {
2332 do_health_to_clog_interval();
2334 timer
.add_event_at(next
, health_interval_event
);
2337 void Monitor::health_interval_stop()
2339 dout(15) << __func__
<< dendl
;
2340 if (health_interval_event
) {
2341 timer
.cancel_event(health_interval_event
);
2343 health_interval_event
= NULL
;
// Tear down health-to-clog state: stops the interval event and resets the
// cached health status.
// NOTE(review): one original line between the signature and the call below
// is missing from this excerpt; confirm whether the tick event is stopped
// here as well.
2346 void Monitor::health_events_cleanup()
2349 health_interval_stop();
2350 health_status_cache
.reset();
// React to configuration changes that affect health-to-clog reporting.
// `changed` holds the names of the options that changed; each of the three
// options checked below starts or stops the matching timer events.
2353 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2355 dout(20) << __func__
<< dendl
;
// Master switch toggled: clean everything up when it is now off; the calls
// after that start whichever of the two events is not currently scheduled.
2357 if (changed
.count("mon_health_to_clog")) {
2358 if (!cct
->_conf
->mon_health_to_clog
) {
2359 health_events_cleanup();
2361 if (!health_tick_event
) {
2362 health_tick_start();
2364 if (!health_interval_event
) {
2365 health_interval_start();
// Interval changed: a non-positive value stops the interval event; the
// following call (re)starts it.
2370 if (changed
.count("mon_health_to_clog_interval")) {
2371 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2372 health_interval_stop();
2374 health_interval_start();
// Tick interval changed.
// NOTE(review): the statement taken for a non-positive tick interval is not
// visible in this excerpt -- presumably it stops the tick event.
2378 if (changed
.count("mon_health_to_clog_tick_interval")) {
2379 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2382 health_tick_start();
2387 void Monitor::do_health_to_clog_interval()
2389 // outputting to clog may have been disabled in the conf
2390 // since we were scheduled.
2391 if (!cct
->_conf
->mon_health_to_clog
||
2392 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2395 dout(10) << __func__
<< dendl
;
2397 // do we have a cached value for next_clog_update? if not,
2398 // do we know when the last update was?
2400 do_health_to_clog(true);
2401 health_interval_start();
// Write the current cluster health summary to the cluster log and remember
// it in health_status_cache.  Two code paths are visible: one taken when the
// OSD map requires at least the luminous release (uses get_health_status),
// and a second one built on get_health().
// `force` is visibly used in the debug output.
// NOTE(review): the heads of the two "unchanged since last time" conditions
// are missing from this excerpt; presumably `force` bypasses them.
2404 void Monitor::do_health_to_clog(bool force
)
2406 // outputting to clog may have been disabled in the conf
2407 // since we were scheduled.
// NOTE(review): the guarded statement is not visible -- presumably a return.
2408 if (!cct
->_conf
->mon_health_to_clog
||
2409 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2412 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
2414 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
// Plain-text summary and overall level from the per-service health checks.
2416 health_status_t level
= get_health_status(false, nullptr, &summary
);
// Tail of a comparison against the cached summary and level.
2418 summary
== health_status_cache
.summary
&&
2419 level
== health_status_cache
.overall
)
// Pick the cluster-log severity that matches the health level.
2421 if (level
== HEALTH_OK
)
2422 clog
->info() << "overall " << summary
;
2423 else if (level
== HEALTH_WARN
)
2424 clog
->warn() << "overall " << summary
;
2425 else if (level
== HEALTH_ERR
)
2426 clog
->error() << "overall " << summary
;
// Remember what was just reported.
2429 health_status_cache
.summary
= summary
;
2430 health_status_cache
.overall
= level
;
// Second path: collect status strings with get_health() and join them with
// "; " into a single summary line.
2433 list
<string
> status
;
2434 health_status_t overall
= get_health(status
, NULL
, NULL
);
2435 dout(25) << __func__
2436 << (force
? " (force)" : "")
2439 string summary
= joinify(status
.begin(), status
.end(), string("; "));
// Tail of a comparison against a non-empty cached summary and the cached
// overall status.
2442 overall
== health_status_cache
.overall
&&
2443 !health_status_cache
.summary
.empty() &&
2444 health_status_cache
.summary
== summary
) {
// This path always logs at info level.
2449 clog
->info() << summary
;
2451 health_status_cache
.overall
= overall
;
2452 health_status_cache
.summary
= summary
;
// Aggregate the health checks of every paxos service into one overall
// status.  The visible code writes either to a Formatter `f` (sections
// "health" and "checks", then "status") or to a plain string `*plain`, and
// can add pre-luminous compatible "summary" / "overall_status" / "detail"
// output depending on two configuration options.
// NOTE(review): the parameter list is missing from this excerpt; the body
// uses f, plain, sep2 and want_detail.
2456 health_status_t
Monitor::get_health_status(
2463 health_status_t r
= HEALTH_OK
;
2464 bool compat
= g_conf
->mon_health_preluminous_compat
;
2465 bool compat_warn
= g_conf
->get_val
<bool>("mon_health_preluminous_compat_warning");
2467 f
->open_object_section("health");
2468 f
->open_object_section("checks");
// With a formatter the summary goes into it; otherwise it is collected in
// the local string `summary`.
2472 string
*psummary
= f
? nullptr : &summary
;
// Fold in each service's status with std::min.
// NOTE(review): presumably more severe statuses compare lower -- confirm
// against the health_status_t definition.
2473 for (auto& svc
: paxos_service
) {
2474 r
= std::min(r
, svc
->get_health_checks().dump_summary(
2475 f
, psummary
, sep2
, want_detail
));
2480 f
->dump_stream("status") << r
;
2482 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2483 *plain
= stringify(r
);
2484 if (summary
.size()) {
// Compatibility output for formatted callers.  With compat_warn set, the
// reported overall status is min(HEALTH_WARN, r).
2491 if (f
&& (compat
|| compat_warn
)) {
2492 health_status_t cr
= compat_warn
? min(HEALTH_WARN
, r
) : r
;
2494 f
->open_array_section("summary");
2496 f
->open_object_section("item");
2497 f
->dump_stream("severity") << HEALTH_WARN
;
2498 f
->dump_string("summary", "'ceph health' JSON format has changed in luminous; update your health monitoring scripts");
2501 for (auto& svc
: paxos_service
) {
2502 svc
->get_health_checks().dump_summary_compat(f
);
2506 f
->dump_stream("overall_status") << cr
;
// Detail output.
2510 if (f
&& (compat
|| compat_warn
)) {
2511 f
->open_array_section("detail");
2513 f
->dump_string("item", "'ceph health' JSON format has changed in luminous. If you see this your monitoring system is scraping the wrong fields. Disable this with 'mon health preluminous compat warning = false'");
2517 for (auto& svc
: paxos_service
) {
2518 svc
->get_health_checks().dump_detail(f
, plain
, compat
);
2521 if (f
&& (compat
|| compat_warn
)) {
// Log the difference between two health check maps to the cluster log:
//  - checks present in `updated` but not in `previous`: "Health check failed"
//  - checks whose summary or severity changed: "Health check update"
//  - checks present in `previous` but not in `updated`: cleared messages
//  - a final "Cluster is now healthy" message (see the note near the end).
// The transaction `t` is not used by any visible line (see the FIXME).
2531 void Monitor::log_health(
2532 const health_check_map_t
& updated
,
2533 const health_check_map_t
& previous
,
2534 MonitorDBStore::TransactionRef t
)
// NOTE(review): the body of this guard is not visible -- presumably a
// return when health-to-clog is disabled.
2536 if (!g_conf
->mon_health_to_clog
) {
2539 // FIXME: log atomically as part of @t instead of using clog.
2540 dout(10) << __func__
<< " updated " << updated
.checks
.size()
2541 << " previous " << previous
.checks
.size()
// Pass 1: new and changed checks.
2543 for (auto& p
: updated
.checks
) {
2544 auto q
= previous
.checks
.find(p
.first
);
// Not present before: report as a newly failed check.
2545 if (q
== previous
.checks
.end()) {
2548 ss
<< "Health check failed: " << p
.second
.summary
<< " ("
// HEALTH_WARN checks go to clog->warn(); the other visible call is
// clog->error().
2550 if (p
.second
.severity
== HEALTH_WARN
)
2551 clog
->warn() << ss
.str();
2553 clog
->error() << ss
.str();
2555 if (p
.second
.summary
!= q
->second
.summary
||
2556 p
.second
.severity
!= q
->second
.severity
) {
2557 // summary or severity changed (ignore detail changes at this level)
2559 ss
<< "Health check update: " << p
.second
.summary
<< " (" << p
.first
<< ")";
2560 if (p
.second
.severity
== HEALTH_WARN
)
2561 clog
->warn() << ss
.str();
2563 clog
->error() << ss
.str();
// Pass 2: checks that disappeared.  Two check names get a dedicated message;
// the generic "cleared" message is the remaining visible call.
2567 for (auto& p
: previous
.checks
) {
2568 if (!updated
.checks
.count(p
.first
)) {
2571 if (p
.first
== "DEGRADED_OBJECTS") {
2572 clog
->info() << "All degraded objects recovered";
2573 } else if (p
.first
== "OSD_FLAGS") {
2574 clog
->info() << "OSD flags cleared";
2576 clog
->info() << "Health check cleared: " << p
.first
<< " (was: "
2577 << p
.second
.summary
<< ")";
// Pass 3: all of this caller's checks are gone.
2582 if (previous
.checks
.size() && updated
.checks
.size() == 0) {
2583 // We might be going into a fully healthy state, check
2585 bool any_checks
= false;
2586 for (auto& svc
: paxos_service
) {
// Identify the service that owns `previous` by address comparison.
2587 if (&(svc
->get_health_checks()) == &(previous
)) {
2588 // Ignore the ones we're clearing right now
2592 if (svc
->get_health_checks().checks
.size() > 0) {
// NOTE(review): the condition guarding this message is not visible;
// presumably it is only emitted when no other service has checks.
2598 clog
->info() << "Cluster is now healthy";
// Health collection built on (severity, text) pairs: gathers summary and
// detail entries from every paxos service and from health_monitor, adds
// monitor clock-skew findings, derives an overall status, and writes the
// result to `status` and, where the corresponding lines are visible, to a
// Formatter `f` and to `detailbl`.
// NOTE(review): the last parameter of the signature is missing from this
// excerpt; the body uses `f` with the Formatter interface.
2603 health_status_t
Monitor::get_health(list
<string
>& status
,
2604 bufferlist
*detailbl
,
2607 list
<pair
<health_status_t
,string
> > summary
;
2608 list
<pair
<health_status_t
,string
> > detail
;
2611 f
->open_object_section("health");
// Detail entries are only requested from the services when the caller
// passed a detail buffer.
2613 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin();
2614 p
!= paxos_service
.end();
2616 PaxosService
*s
= *p
;
2617 s
->get_health(summary
, detailbl
? &detail
: NULL
, cct
);
2620 health_monitor
->get_health(summary
, (detailbl
? &detail
: NULL
));
2622 health_status_t overall
= HEALTH_OK
;
// Clock skew: evaluate every recorded skew with its latency.
2623 if (!timecheck_skews
.empty()) {
2625 for (map
<entity_inst_t
,double>::iterator i
= timecheck_skews
.begin();
2626 i
!= timecheck_skews
.end(); ++i
) {
2627 entity_inst_t inst
= i
->first
;
2628 double skew
= i
->second
;
// operator[] on the latency map inserts a default entry when none exists.
2629 double latency
= timecheck_latencies
[inst
];
2630 string name
= monmap
->get_name(inst
.addr
);
2632 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
// Non-OK monitors are remembered by name and get a detail line.
2633 if (tcstatus
!= HEALTH_OK
) {
2634 if (overall
> tcstatus
)
2636 warns
.push_back(name
);
2637 ostringstream tmp_ss
;
2638 tmp_ss
<< "mon." << name
2639 << " addr " << inst
.addr
<< " " << tcss
.str()
2640 << " (latency " << latency
<< "s)";
2641 detail
.push_back(make_pair(tcstatus
, tmp_ss
.str()));
// One combined status line naming every skewed monitor, plus a summary item.
2644 if (!warns
.empty()) {
2646 ss
<< "clock skew detected on";
2647 while (!warns
.empty()) {
2648 ss
<< " mon." << warns
.front();
2653 status
.push_back(ss
.str());
2654 summary
.push_back(make_pair(HEALTH_WARN
, "Monitor clock skew detected "));
// Drain the summary list: lower `overall` to the lowest severity value seen
// and copy each text into `status` (and into the formatter).
2659 f
->open_array_section("summary");
2660 if (!summary
.empty()) {
2661 while (!summary
.empty()) {
2662 if (overall
> summary
.front().first
)
2663 overall
= summary
.front().first
;
2664 status
.push_back(summary
.front().second
);
2666 f
->open_object_section("item");
2667 f
->dump_stream("severity") << summary
.front().first
;
2668 f
->dump_string("summary", summary
.front().second
);
2671 summary
.pop_front();
// A string built in `fss` (construction not visible) becomes the first
// status entry.
2679 status
.push_front(fss
.str());
2681 f
->dump_stream("overall_status") << overall
;
// Drain the detail list into the formatter or, failing that, into the
// caller's buffer one line per entry.
2684 f
->open_array_section("detail");
2685 while (!detail
.empty()) {
2687 f
->dump_string("item", detail
.front().second
);
2688 else if (detailbl
!= NULL
) {
2689 detailbl
->append(detail
.front().second
);
2690 detailbl
->append('\n');
// Produce the cluster status report.  Two renderings are visible: a
// structured one written through Formatter `f` (section "status"), and a
// plain-text one streamed into `ss` with "cluster", "services" and "data"
// headings.
// NOTE(review): the branch that selects between the two renderings is not
// visible in this excerpt; presumably it tests whether `f` is non-null.
2703 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
)
2706 f
->open_object_section("status");
// Structured rendering: fsid, health, election epoch and quorum membership.
2709 f
->dump_stream("fsid") << monmap
->get_fsid();
2710 get_health_status(false, f
, nullptr);
2711 f
->dump_unsigned("election_epoch", get_epoch());
2713 f
->open_array_section("quorum");
2714 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2715 f
->dump_int("rank", *p
);
2717 f
->open_array_section("quorum_names");
2718 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2719 f
->dump_string("id", monmap
->get_name(*p
));
// One section per map; most delegate to the map's own print_summary().
2722 f
->open_object_section("monmap");
2725 f
->open_object_section("osdmap");
2726 osdmon()->osdmap
.print_summary(f
, cout
, string(12, ' '));
2728 f
->open_object_section("pgmap");
2729 pgservice
->print_summary(f
, NULL
);
2731 f
->open_object_section("fsmap");
2732 mdsmon()->get_fsmap().print_summary(f
, NULL
);
2734 f
->open_object_section("mgrmap");
2735 mgrmon()->get_map().print_summary(f
, nullptr);
2738 f
->dump_object("servicemap", mgrstatmon()->get_service_map());
// Plain-text rendering starts here.
2741 ss
<< " cluster:\n";
2742 ss
<< " id: " << monmap
->get_fsid() << "\n";
// Health line: built with get_health_status() when the OSD map requires at
// least luminous, otherwise from get_health() joined into one string.
2745 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2746 get_health_status(false, nullptr, &health
,
2750 get_health(ls
, NULL
, f
);
2751 health
= joinify(ls
.begin(), ls
.end(),
2754 ss
<< " health: " << health
<< "\n";
2756 ss
<< "\n \n services:\n";
// `maxlen` is raised to the longest service name and used to pad the
// per-daemon lines below.
2759 auto& service_map
= mgrstatmon()->get_service_map();
2760 for (auto& p
: service_map
.services
) {
2761 maxlen
= std::max(maxlen
, p
.first
.size());
// NOTE(review): `maxlen - 3` assumes maxlen is at least 3; its initial value
// is not visible in this excerpt.
2763 string
spacing(maxlen
- 3, ' ');
2764 const auto quorum_names
= get_quorum_names();
2765 const auto mon_count
= monmap
->mon_info
.size();
2766 ss
<< " mon: " << spacing
<< mon_count
<< " daemons, quorum "
// When some monitors are not in quorum, list them by name.
2768 if (quorum_names
.size() != mon_count
) {
2769 std::list
<std::string
> out_of_q
;
2770 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
2771 if (quorum
.count(i
) == 0) {
2772 out_of_q
.push_back(monmap
->ranks
[i
]);
2775 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
2776 out_of_q
.end(), std::string(", "));
// The mgr and mds lines are printed only when those services are in use.
2779 if (mgrmon()->in_use()) {
2780 ss
<< " mgr: " << spacing
;
2781 mgrmon()->get_map().print_summary(nullptr, &ss
);
2784 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2785 ss
<< " mds: " << spacing
<< mdsmon()->get_fsmap() << "\n";
2787 ss
<< " osd: " << spacing
;
2788 osdmon()->osdmap
.print_summary(NULL
, ss
, string(maxlen
+ 6, ' '));
// Every entry of the service map gets its own padded summary line.
2790 for (auto& p
: service_map
.services
) {
2791 ss
<< " " << p
.first
<< ": " << string(maxlen
- p
.first
.size(), ' ')
2792 << p
.second
.get_summary() << "\n";
2796 ss
<< "\n \n data:\n";
2797 pgservice
->print_summary(NULL
, &ss
);
// Flatten a parsed command map into a string-to-string map used for
// capability checks.  The "prefix" entry is special-cased, a "caps" entry
// with an even number of elements is expanded into caps_<name> = <value>
// pairs, and entries are otherwise stored as
// param_str_map[key] = cmd_vartype_stringify(value).
//
// The second parameter is the output map.  Its declarator had been mangled
// into a pilcrow character by an HTML-entity decoding accident; it is
// restored to a reference named param_str_map, which is the name the body
// uses.
2802 void Monitor::_generate_command_map(map
<string
,cmd_vartype
>& cmdmap
,
2803 map
<string
,string
> &param_str_map
)
2805 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
2806 p
!= cmdmap
.end(); ++p
) {
// NOTE(review): the statement guarded by this test is not visible in this
// excerpt; presumably the prefix entry is skipped.
2807 if (p
->first
== "prefix")
2809 if (p
->first
== "caps") {
// `cv` (declaration not visible) receives the "caps" value; it is only
// expanded when it holds an even number of elements, i.e. whole pairs.
2811 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
2812 cv
.size() % 2 == 0) {
2813 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
2814 string k
= string("caps_") + cv
[i
];
2815 param_str_map
[k
] = cv
[i
+ 1];
// Generic case: store the stringified value under its own key.
2820 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
2824 const MonCommand
*Monitor::_get_moncommand(
2825 const string
&cmd_prefix
,
2826 const vector
<MonCommand
>& cmds
)
2828 for (auto& c
: cmds
) {
2829 if (c
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
// Check whether session `s` is allowed to run a command.  The read, write
// and execute requirements are taken from `this_cmd` and passed, together
// with the module, prefix and flattened parameters, to the session's
// capability check.  The result is logged at debug level 10.
// `cmdmap` is not referenced by any visible line.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `capable` is returned.
2836 bool Monitor::_allowed_command(MonSession
*s
, string
&module
, string
&prefix
,
2837 const map
<string
,cmd_vartype
>& cmdmap
,
2838 const map
<string
,string
>& param_str_map
,
2839 const MonCommand
*this_cmd
) {
2841 bool cmd_r
= this_cmd
->requires_perm('r');
2842 bool cmd_w
= this_cmd
->requires_perm('w');
2843 bool cmd_x
= this_cmd
->requires_perm('x');
// NOTE(review): two is_capable() arguments (before and after the entity
// type) are missing from this excerpt.
2845 bool capable
= s
->caps
.is_capable(
2847 CEPH_ENTITY_TYPE_MON
,
2849 module
, prefix
, param_str_map
,
2850 cmd_r
, cmd_w
, cmd_x
);
2852 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
// Dump the descriptions of `commands` into a formatter section named
// "command_descriptions", one cmdNNN entry per command (NNN zero-padded to
// three digits).
// NOTE(review): the remaining parameters are missing from this excerpt; the
// body uses `f` (formatter) and `hide_mgr_flag`, and the caller in this file
// also passes a buffer for the rendered output.
2856 void Monitor::format_command_descriptions(const std::vector
<MonCommand
> &commands
,
2862 f
->open_object_section("command_descriptions");
2863 for (const auto &cmd
: commands
) {
// Work on a copy of the flags so the MGR flag can be masked out on request
// without touching the command table.
2864 unsigned flags
= cmd
.flags
;
2865 if (hide_mgr_flag
) {
2866 flags
&= ~MonCommand::FLAG_MGR
;
// NOTE(review): the declaration and increment of `cmdnum` are not visible.
2868 ostringstream secname
;
2869 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
2870 dump_cmddesc_to_json(f
, secname
.str(),
2871 cmd
.cmdstring
, cmd
.helpstring
, cmd
.module
,
2872 cmd
.req_perms
, cmd
.availability
, flags
);
2875 f
->close_section(); // command_descriptions
2880 bool Monitor::is_keyring_required()
2882 string auth_cluster_required
= g_conf
->auth_supported
.empty() ?
2883 g_conf
->auth_cluster_required
: g_conf
->auth_supported
;
2884 string auth_service_required
= g_conf
->auth_supported
.empty() ?
2885 g_conf
->auth_service_required
: g_conf
->auth_supported
;
2887 return auth_service_required
== "cephx" ||
2888 auth_cluster_required
== "cephx";
// Completion context for a command that was proxied to the manager.  It
// remembers the monitor, the originating request and the byte count that was
// charged against the monitor's proxy budget.
// NOTE(review): the member declarations are missing from this excerpt;
// finish() also uses `outs` and `outbl`, presumably the output string and
// buffer filled in by the proxied command.
2891 struct C_MgrProxyCommand
: public Context
{
2897 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
2898 : mon(mon
), op(op
), size(s
) { }
// Runs when the proxied command completes with result `r`.
2899 void finish(int r
) {
// Hold the monitor lock for the bookkeeping and the reply.
2900 Mutex::Locker
l(mon
->lock
);
// Give back the bytes charged for this request.
2901 mon
->mgr_proxy_bytes
-= size
;
2902 mon
->reply_command(op
, r
, outs
, outbl
, 0);
// Entry point for MMonCommand requests.  In order, the visible code:
//   1. validates the request (fsid, session, non-empty command, parsable
//      JSON, non-empty prefix);
//   2. answers "get_command_descriptions" directly;
//   3. looks the command up in the leader, mgr and local command tables and
//      forwards to the leader when the local monitor cannot serve it;
//   4. rejects obsolete commands and forwarded no-forward commands;
//   5. checks the session's capabilities and writes an audit log entry;
//   6. proxies mgr commands to the manager, subject to a byte budget;
//   7. dispatches by module to the owning paxos service;
//   8. handles the remaining commands (fsid, scrub, compact, injectargs,
//      status, health, df, report, versions, ...) inline.
//
// Changes relative to the previous revision of this block:
//   - In the "versions" command the "mds" section iterated the `mon` map
//     instead of the `mds` map that mdsmon()->count_metadata() had just
//     filled, so MDS versions were never reported and monitor versions were
//     added to "overall" twice.  The loop now iterates `mds`.
//   - The block comment in the "mon" module test had lost its terminator and
//     is closed again.
//
// NOTE(review): this excerpt omits many original lines (braces, returns and
// some declarations), so all other visible lines are reproduced unchanged
// rather than rewritten.
2906 void Monitor::handle_command(MonOpRequestRef op
)
2908 assert(op
->is_type_command());
2909 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
// Commands addressed to a different cluster are refused.
2910 if (m
->fsid
!= monmap
->fsid
) {
2911 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
2912 reply_command(op
, -EPERM
, "wrong fsid", 0);
// The session is taken from the connection's private data.
2916 MonSession
*session
= static_cast<MonSession
*>(
2917 m
->get_connection()->get_priv());
2919 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
// Scope-exit handler; its body is not visible in this excerpt.
2922 BOOST_SCOPE_EXIT_ALL(=) {
2926 if (m
->cmd
.empty()) {
2927 string rs
= "No command supplied";
2928 reply_command(op
, -EINVAL
, rs
, 0);
2933 vector
<string
> fullcmd
;
2934 map
<string
, cmd_vartype
> cmdmap
;
2935 stringstream ss
, ds
;
2939 rs
= "unrecognized command";
// Parse the JSON command into cmdmap.
2941 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
2942 // ss has reason for failure
2945 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
2946 reply_command(op
, r
, rs
, 0);
2950 // check return value. If no prefix parameter provided,
2951 // return value will be false, then return error info.
2952 if (!cmd_getval(g_ceph_context
, cmdmap
, "prefix", prefix
)) {
2953 reply_command(op
, -EINVAL
, "command prefix not found", 0);
2957 // check prefix is empty
2958 if (prefix
.empty()) {
2959 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
// Command table request: always rendered as JSON with its own formatter.
2963 if (prefix
== "get_command_descriptions") {
2965 Formatter
*f
= Formatter::create("json");
2966 // hide mgr commands until luminous upgrade is complete
2967 bool hide_mgr_flag
=
2968 osdmon()->osdmap
.require_osd_release
< CEPH_RELEASE_LUMINOUS
;
2970 std::vector
<MonCommand
> commands
;
2972 // only include mgr commands once all mons are upgrade (and we've dropped
2973 // the hard-coded PGMonitor commands)
2974 if (quorum_mon_features
.contains_all(ceph::features::mon::FEATURE_LUMINOUS
)) {
2975 commands
= static_cast<MgrMonitor
*>(
2976 paxos_service
[PAXOS_MGR
])->get_command_descs();
2979 for (auto& c
: leader_mon_commands
) {
2980 commands
.push_back(c
);
2983 format_command_descriptions(commands
, f
, &rdata
, hide_mgr_flag
);
2985 reply_command(op
, 0, "", rdata
, 0);
2992 dout(0) << "handle_command " << *m
<< dendl
;
// Output formatter for the remaining commands; "plain" is the default
// format name.
2995 cmd_getval(g_ceph_context
, cmdmap
, "format", format
, string("plain"));
2996 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
2998 get_str_vec(prefix
, fullcmd
);
3000 // make sure fullcmd is not empty.
3001 // invalid prefix will cause empty vector fullcmd.
3002 // such as, prefix=";,,;"
3003 if (fullcmd
.empty()) {
3004 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
// The first word of the prefix names the module.
3008 module
= fullcmd
[0];
3010 // validate command is in leader map
3012 const MonCommand
*leader_cmd
;
3013 const auto& mgr_cmds
= mgrmon()->get_command_descs();
3014 const MonCommand
*mgr_cmd
= nullptr;
3015 if (!mgr_cmds
.empty()) {
3016 mgr_cmd
= _get_moncommand(prefix
, mgr_cmds
);
3018 leader_cmd
= _get_moncommand(prefix
, leader_mon_commands
);
// The mgr command table serves as a fallback for the leader lookup.
3020 leader_cmd
= mgr_cmd
;
3022 reply_command(op
, -EINVAL
, "command not known", 0);
3026 // validate command is in our map & matches, or forward if it is allowed
3027 const MonCommand
*mon_cmd
= _get_moncommand(
3029 get_local_commands(quorum_mon_features
));
// Not supported locally: forward unless the leader marks it no-forward.
3035 if (leader_cmd
->is_noforward()) {
3036 reply_command(op
, -EINVAL
,
3037 "command not locally supported and not allowed to forward",
3041 dout(10) << "Command not locally supported, forwarding request "
3043 forward_request_leader(op
);
// Supported locally but with a definition the leader does not agree with.
3045 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
3046 if (mon_cmd
->is_noforward()) {
3047 reply_command(op
, -EINVAL
,
3048 "command not compatible with leader and not allowed to forward",
3052 dout(10) << "Command not compatible with leader, forwarding request "
3054 forward_request_leader(op
);
// Obsolete commands are refused; deprecated ones too when the debug option
// mon_debug_deprecated_as_obsolete is set.
3059 if (mon_cmd
->is_obsolete() ||
3060 (cct
->_conf
->mon_debug_deprecated_as_obsolete
3061 && mon_cmd
->is_deprecated())) {
3062 reply_command(op
, -ENOTSUP
,
3063 "command is obsolete; please check usage and/or man page",
// A no-forward command must not arrive through a proxy connection.
3068 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
3069 dout(10) << "Got forward for noforward command " << m
<< dendl
;
3070 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
3074 /* what we perceive as being the service the command falls under */
3075 string
service(mon_cmd
->module
);
3077 dout(25) << __func__
<< " prefix='" << prefix
3078 << "' module='" << module
3079 << "' service='" << service
<< "'" << dendl
;
// Commands needing 'w' or 'x' are audited at info level, others at debug.
3082 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
3084 // validate user's permissions for requested command
3085 map
<string
,string
> param_str_map
;
3086 _generate_command_map(cmdmap
, param_str_map
);
3087 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
3088 param_str_map
, mon_cmd
)) {
3089 dout(1) << __func__
<< " access denied" << dendl
;
3090 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3091 << "from='" << session
->inst
<< "' "
3092 << "entity='" << session
->entity_name
<< "' "
3093 << "cmd=" << m
->cmd
<< ": access denied";
3094 reply_command(op
, -EACCES
, "access denied", 0);
3098 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3099 << "from='" << session
->inst
<< "' "
3100 << "entity='" << session
->entity_name
<< "' "
3101 << "cmd=" << m
->cmd
<< ": dispatch";
// Manager commands are proxied once the OSD map requires luminous.  The
// message size is charged against a budget computed as
// mon_client_bytes * mon_mgr_proxy_client_bytes_ratio.
3103 if (mon_cmd
->is_mgr() &&
3104 osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3105 const auto& hdr
= m
->get_header();
3106 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
3108 g_conf
->mon_client_bytes
* g_conf
->mon_mgr_proxy_client_bytes_ratio
;
3109 if (mgr_proxy_bytes
+ size
> max
) {
3110 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
3111 << " + " << size
<< " > max " << max
<< dendl
;
3112 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
3115 mgr_proxy_bytes
+= size
;
3116 dout(10) << __func__
<< " proxying mgr command (+" << size
3117 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
// C_MgrProxyCommand::finish() subtracts `size` again and sends the reply.
3118 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
3119 mgr_client
.start_command(m
->cmd
,
3123 new C_OnFinisher(fin
, &finisher
));
// Module-based dispatch to the owning service.  "fs authorize" is excluded
// here because it is routed to the auth monitor further down.
3127 if ((module
== "mds" || module
== "fs") &&
3128 prefix
!= "fs authorize") {
3129 mdsmon()->dispatch(op
);
// "osd last-stat-seq" is excluded here and handled inline further down.
3132 if ((module
== "osd" || prefix
== "pg map") &&
3133 prefix
!= "osd last-stat-seq") {
3134 osdmon()->dispatch(op
);
3138 if (module
== "pg") {
3139 pgmon()->dispatch(op
);
3142 if (module
== "mon" &&
3143 /* Let the Monitor class handle the following commands:
 * (the prefixes excluded by the comparisons that follow) */
3148 prefix
!= "mon compact" &&
3149 prefix
!= "mon scrub" &&
3150 prefix
!= "mon sync force" &&
3151 prefix
!= "mon metadata" &&
3152 prefix
!= "mon versions" &&
3153 prefix
!= "mon count-metadata") {
3154 monmon()->dispatch(op
);
3157 if (module
== "auth" || prefix
== "fs authorize") {
3158 authmon()->dispatch(op
);
3161 if (module
== "log") {
3162 logmon()->dispatch(op
);
3166 if (module
== "config-key") {
3167 config_key_service
->dispatch(op
);
3171 if (module
== "mgr") {
3172 mgrmon()->dispatch(op
);
// From here on the commands are answered by this function itself.
3176 if (prefix
== "fsid") {
3178 f
->open_object_section("fsid");
3179 f
->dump_stream("fsid") << monmap
->fsid
;
3186 reply_command(op
, 0, "", rdata
, 0);
// Scrub: a peon forwards the request to the leader, and "no quorum" is the
// reply when neither of the visible branches applies.
3190 if (prefix
== "scrub" || prefix
== "mon scrub") {
3191 wait_for_paxos_write();
3193 int r
= scrub_start();
3194 reply_command(op
, r
, "", rdata
, 0);
3195 } else if (is_peon()) {
3196 forward_request_leader(op
);
3198 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
// Manual compaction; start and end timestamps are taken around it.
3203 if (prefix
== "compact" || prefix
== "mon compact") {
3204 dout(1) << "triggering manual compaction" << dendl
;
3205 utime_t start
= ceph_clock_now();
3207 utime_t end
= ceph_clock_now();
3209 dout(1) << "finished manual compaction in " << end
<< " seconds" << dendl
;
3211 oss
<< "compacted " << g_conf
->get_val
<std::string
>("mon_keyvaluedb") << " in " << end
<< " seconds";
3215 else if (prefix
== "injectargs") {
3216 vector
<string
> injected_args
;
3217 cmd_getval(g_ceph_context
, cmdmap
, "injected_args", injected_args
);
3218 if (!injected_args
.empty()) {
3219 dout(0) << "parsing injected options '" << injected_args
<< "'" << dendl
;
3221 r
= g_conf
->injectargs(str_join(injected_args
, " "), &oss
);
3222 ss
<< "injectargs:" << oss
.str();
3226 rs
= "must supply options to be parsed in a single string";
// Time sync status: one object per monitor with a recorded skew, then the
// state of the timecheck rounds.  The formatter is replaced with a
// json-pretty one (the guard for that is not visible).
3229 } else if (prefix
== "time-sync-status") {
3231 f
.reset(Formatter::create("json-pretty"));
3232 f
->open_object_section("time_sync");
3233 if (!timecheck_skews
.empty()) {
3234 f
->open_object_section("time_skew_status");
3235 for (auto& i
: timecheck_skews
) {
3236 entity_inst_t inst
= i
.first
;
3237 double skew
= i
.second
;
3238 double latency
= timecheck_latencies
[inst
];
3239 string name
= monmap
->get_name(inst
.addr
);
3241 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
3242 f
->open_object_section(name
.c_str());
3243 f
->dump_float("skew", skew
);
3244 f
->dump_float("latency", latency
);
3245 f
->dump_stream("health") << tcstatus
;
3246 if (tcstatus
!= HEALTH_OK
) {
3247 f
->dump_stream("details") << tcss
.str();
3253 f
->open_object_section("timechecks");
3254 f
->dump_unsigned("epoch", get_epoch());
3255 f
->dump_int("round", timecheck_round
);
// An odd round number is reported as "on-going", an even one as "finished".
3256 f
->dump_stream("round_status") << ((timecheck_round
%2) ?
3257 "on-going" : "finished");
3263 } else if (prefix
== "config set") {
3265 cmd_getval(cct
, cmdmap
, "key", key
);
3267 cmd_getval(cct
, cmdmap
, "value", val
);
3268 r
= g_conf
->set_val(key
, val
, true, &ss
);
3270 g_conf
->apply_changes(nullptr);
// status / health / (a third prefix not visible here) share the optional
// "detail" argument.
3274 } else if (prefix
== "status" ||
3275 prefix
== "health" ||
3278 cmd_getval(g_ceph_context
, cmdmap
, "detail", detail
);
3280 if (prefix
== "status") {
3281 // get_cluster_status handles f == NULL
3282 get_cluster_status(ds
, f
.get());
3289 } else if (prefix
== "health") {
3290 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3292 get_health_status(detail
== "detail", f
.get(), f
? nullptr : &plain
);
3296 rdata
.append(plain
);
3299 list
<string
> health_str
;
3300 get_health(health_str
, detail
== "detail" ? &rdata
: NULL
, f
.get());
// Plain output: the first status entry, then the rest joined with "; ".
3305 assert(!health_str
.empty());
3306 ds
<< health_str
.front();
3307 health_str
.pop_front();
3308 if (!health_str
.empty()) {
3310 ds
<< joinify(health_str
.begin(), health_str
.end(), string("; "));
3315 if (detail
== "detail")
3319 } else if (prefix
== "df") {
3320 bool verbose
= (detail
== "detail");
3322 f
->open_object_section("stats");
3324 pgservice
->dump_fs_stats(&ds
, f
.get(), verbose
);
3327 pgservice
->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3335 assert(0 == "We should never get here!");
3341 } else if (prefix
== "report") {
3343 // this must be formatted, in its current form
3345 f
.reset(Formatter::create("json-pretty"));
3346 f
->open_object_section("report");
3347 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3348 f
->dump_string("version", ceph_version_to_str());
3349 f
->dump_string("commit", git_version_to_str());
3350 f
->dump_stream("timestamp") << ceph_clock_now();
3352 vector
<string
> tagsvec
;
3353 cmd_getval(g_ceph_context
, cmdmap
, "tags", tagsvec
);
3354 string tagstr
= str_join(tagsvec
, " ");
// Everything from the last space onwards is cut off; a tag string without
// any space is kept whole, since find_last_of then returns npos.
3355 if (!tagstr
.empty())
3356 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3357 f
->dump_string("tag", tagstr
);
3360 get_health(hs
, NULL
, f
.get());
3362 monmon()->dump_info(f
.get());
3363 osdmon()->dump_info(f
.get());
3364 mdsmon()->dump_info(f
.get());
3365 authmon()->dump_info(f
.get());
3366 pgservice
->dump_info(f
.get());
3368 paxos
->dump_info(f
.get());
// The status string carries a CRC of the rendered report.
3374 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT
);
3377 } else if (prefix
== "osd last-stat-seq") {
3379 cmd_getval(g_ceph_context
, cmdmap
, "id", osd
);
3380 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3382 f
->dump_unsigned("seq", seq
);
// Node listing, by daemon type; "all" is the default type.
3390 } else if (prefix
== "node ls") {
3391 string
node_type("all");
3392 cmd_getval(g_ceph_context
, cmdmap
, "type", node_type
);
3394 f
.reset(Formatter::create("json-pretty"));
3395 if (node_type
== "all") {
3396 f
->open_object_section("nodes");
3397 print_nodes(f
.get(), ds
);
3398 osdmon()->print_nodes(f
.get());
3399 mdsmon()->print_nodes(f
.get());
3401 } else if (node_type
== "mon") {
3402 print_nodes(f
.get(), ds
);
3403 } else if (node_type
== "osd") {
3404 osdmon()->print_nodes(f
.get());
3405 } else if (node_type
== "mds") {
3406 mdsmon()->print_nodes(f
.get());
// Feature map: retried once a quorum exists when this monitor is neither
// leader nor peon.
3412 } else if (prefix
== "features") {
3413 if (!is_leader() && !is_peon()) {
3414 dout(10) << " waiting for quorum" << dendl
;
3415 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3419 forward_request_leader(op
);
3423 f
.reset(Formatter::create("json-pretty"));
3425 get_combined_feature_map(&fm
);
3426 f
->dump_object("features", fm
);
3430 } else if (prefix
== "mon metadata") {
3432 f
.reset(Formatter::create("json-pretty"));
// No "id" argument means: dump every monitor.
3435 bool all
= !cmd_getval(g_ceph_context
, cmdmap
, "id", name
);
3437 // Dump a single mon's metadata
3438 int mon
= monmap
->get_rank(name
);
3440 rs
= "requested mon not found";
3444 f
->open_object_section("mon_metadata");
3445 r
= get_mon_metadata(mon
, f
.get(), ds
);
3448 // Dump all mons' metadata
3450 f
->open_array_section("mon_metadata");
3451 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3452 std::ostringstream get_err
;
3453 f
->open_object_section("mon");
3454 f
->dump_string("name", monmap
->get_name(rank
));
3455 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3457 if (r
== -ENOENT
|| r
== -EINVAL
) {
3458 dout(1) << get_err
.str() << dendl
;
3459 // Drop error, list what metadata we do have
3461 } else if (r
!= 0) {
3462 derr
<< "Unexpected error from get_mon_metadata: "
3463 << cpp_strerror(r
) << dendl
;
3464 ds
<< get_err
.str();
3474 } else if (prefix
== "mon versions") {
3476 f
.reset(Formatter::create("json-pretty"));
3477 count_metadata("ceph_version", f
.get());
3482 } else if (prefix
== "mon count-metadata") {
3484 f
.reset(Formatter::create("json-pretty"));
3486 cmd_getval(g_ceph_context
, cmdmap
, "property", field
);
3487 count_metadata(field
, f
.get());
3492 } else if (prefix
== "quorum_status") {
3493 // make sure our map is readable and up to date
3494 if (!is_leader() && !is_peon()) {
3495 dout(10) << " waiting for quorum" << dendl
;
3496 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3499 _quorum_status(f
.get(), ds
);
3503 } else if (prefix
== "mon_status") {
3504 get_mon_status(f
.get(), ds
);
// Forced sync requires both confirmation flags.
3510 } else if (prefix
== "sync force" ||
3511 prefix
== "mon sync force") {
3512 string validate1
, validate2
;
3513 cmd_getval(g_ceph_context
, cmdmap
, "validate1", validate1
);
3514 cmd_getval(g_ceph_context
, cmdmap
, "validate2", validate2
);
3515 if (validate1
!= "--yes-i-really-mean-it" ||
3516 validate2
!= "--i-know-what-i-am-doing") {
3518 rs
= "are you SURE? this will mean the monitor store will be "
3519 "erased. pass '--yes-i-really-mean-it "
3520 "--i-know-what-i-am-doing' if you really do.";
3523 sync_force(f
.get(), ds
);
3526 } else if (prefix
== "heap") {
3527 if (!ceph_using_tcmalloc())
3528 rs
= "tcmalloc not enabled, can't use heap profiler commands\n";
3531 cmd_getval(g_ceph_context
, cmdmap
, "heapcmd", heapcmd
);
3532 // XXX 1-element vector, change at callee or make vector here?
3533 vector
<string
> heapcmd_vec
;
3534 get_str_vec(heapcmd
, heapcmd_vec
);
3535 ceph_heap_profiler_handle_command(heapcmd_vec
, ds
);
// Quorum participation control: "exit" or "enter".
3540 } else if (prefix
== "quorum") {
3542 cmd_getval(g_ceph_context
, cmdmap
, "quorumcmd", quorumcmd
);
3543 if (quorumcmd
== "exit") {
3545 elector
.stop_participating();
3546 rs
= "stopped responding to quorum, initiated new election";
3548 } else if (quorumcmd
== "enter") {
3549 elector
.start_participating();
3551 rs
= "started responding to quorum, initiated new election";
3554 rs
= "needs a valid 'quorum' command";
3557 } else if (prefix
== "version") {
3559 f
->open_object_section("version");
3560 f
->dump_string("version", pretty_version_to_str());
3564 ds
<< pretty_version_to_str();
// Version census: one section per daemon type, each mapping a version
// string to a daemon count, plus an "overall" section summing them all.
3569 } else if (prefix
== "versions") {
3571 f
.reset(Formatter::create("json-pretty"));
3572 map
<string
,int> overall
;
3573 f
->open_object_section("version");
3574 map
<string
,int> mon
, mgr
, osd
, mds
;
3576 count_metadata("ceph_version", &mon
);
3577 f
->open_object_section("mon");
3578 for (auto& p
: mon
) {
3579 f
->dump_int(p
.first
.c_str(), p
.second
);
3580 overall
[p
.first
] += p
.second
;
3584 mgrmon()->count_metadata("ceph_version", &mgr
);
3585 f
->open_object_section("mgr");
3586 for (auto& p
: mgr
) {
3587 f
->dump_int(p
.first
.c_str(), p
.second
);
3588 overall
[p
.first
] += p
.second
;
3592 osdmon()->count_metadata("ceph_version", &osd
);
3593 f
->open_object_section("osd");
3594 for (auto& p
: osd
) {
3595 f
->dump_int(p
.first
.c_str(), p
.second
);
3596 overall
[p
.first
] += p
.second
;
3600 mdsmon()->count_metadata("ceph_version", &mds
);
3601 f
->open_object_section("mds");
// Iterate the MDS counts gathered just above.  This loop used to walk the
// `mon` map, which hid MDS versions and double-counted monitors in "overall".
3602 for (auto& p
: mds
) {
3603 f
->dump_int(p
.first
.c_str(), p
.second
);
3604 overall
[p
.first
] += p
.second
;
// Services registered in the mgr service map get a section each.
3608 for (auto& p
: mgrstatmon()->get_service_map().services
) {
3609 f
->open_object_section(p
.first
.c_str());
3611 p
.second
.count_metadata("ceph_version", &m
);
3613 f
->dump_int(q
.first
.c_str(), q
.second
);
3614 overall
[q
.first
] += q
.second
;
3619 f
->open_object_section("overall");
3620 for (auto& p
: overall
) {
3621 f
->dump_int(p
.first
.c_str(), p
.second
);
// Common exit for the inline commands.
3631 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3632 reply_command(op
, r
, rs
, rdata
, 0);
// Convenience overload of reply_command() for replies that carry no explicit
// data payload: passes op, rc, rs and version straight through to the overload
// that additionally takes a bufferlist.
// NOTE(review): rdata is not declared in the visible lines (embedded line 3637
// is absent from this listing); presumably a local, empty bufferlist -- confirm.
3635 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
3638 reply_command(op
, rc
, rs
, rdata
, version
);
3641 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
3642 bufferlist
& rdata
, version_t version
)
3644 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
3645 assert(m
->get_type() == MSG_MON_COMMAND
);
3646 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
3647 reply
->set_tid(m
->get_tid());
3648 reply
->set_data(rdata
);
3649 send_reply(op
, reply
);
3653 // ------------------------
3654 // request/reply routing
3656 // a client/mds/osd will connect to a random monitor. we need to forward any
3657 // messages requiring state updates to the leader, and then route any replies
3658 // back via the correct monitor and back to them. (the monitor will not
3659 // initiate any connections.)
// Forward the request carried by op to the current leader (get_leader()).
// Visible behaviour:
//  - a request whose source is a monitor with an address other than our own
//    messenger address is only logged, not forwarded;
//  - a request on a session that already has a proxy_con is only logged
//    (no double forwarding);
//  - otherwise, when the session is not closed, a RoutedRequest is created
//    under a fresh tid, registered in routed_requests and in the session
//    routed_request_tids set, the request is encoded with CEPH_FEATURES_ALL
//    into request_bl, and an MForward with the request priority is sent to
//    the leader instance taken from the monmap; the op is marked forwarded;
//  - the last dout ("no session") presumably belongs to the remaining else
//    branch -- the branch line itself is not visible.
// NOTE(review): this listing omits several original lines, for example the
// remaining MForward constructor arguments (embedded 3689-3691) and embedded
// line 3681; their content cannot be confirmed from here.
3661 void Monitor::forward_request_leader(MonOpRequestRef op
)
3663 op
->mark_event(__func__
);
3665 int mon
= get_leader();
3666 MonSession
*session
= op
->get_session();
3667 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
3669 if (req
->get_source().is_mon() && req
->get_source_addr() != messenger
->get_myaddr()) {
3670 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
3671 } else if (session
->proxy_con
) {
3672 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
3673 } else if (!session
->closed
) {
3674 RoutedRequest
*rr
= new RoutedRequest
;
3675 rr
->tid
= ++routed_request_tid
;
3676 rr
->client_inst
= req
->get_source_inst();
3677 rr
->con
= req
->get_connection();
3678 rr
->con_features
= rr
->con
->get_features();
3679 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
3680 rr
->session
= static_cast<MonSession
*>(session
->get());
3682 routed_requests
[rr
->tid
] = rr
;
3683 session
->routed_request_tids
.insert(rr
->tid
);
3685 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
3686 << " features " << rr
->con_features
<< dendl
;
3688 MForward
*forward
= new MForward(rr
->tid
,
3692 forward
->set_priority(req
->get_priority());
3693 if (session
->auth_handler
) {
3694 forward
->entity_name
= session
->entity_name
;
3695 } else if (req
->get_source().is_mon()) {
3696 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
3698 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3699 op
->mark_forwarded();
3700 assert(op
->get_req()->get_type() != 0);
3702 dout(10) << "forward_request no session for request " << *req
<< dendl
;
// Stand-in Connection type; handle_forward() attaches an instance of it to the
// request it unwraps from an MForward.
//  - constructed with a NULL second argument passed to the Connection base;
//  - send_message() and send_keepalive() assert when called;
//  - is_connected() always returns false.
// NOTE(review): the bodies of mark_down() and mark_disposable(), and whatever
// send_message() returns after the assert, are not visible in this listing.
3706 // fake connection attached to forwarded messages
3707 struct AnonConnection
: public Connection
{
3708 explicit AnonConnection(CephContext
*cct
) : Connection(cct
, NULL
) {}
3710 int send_message(Message
*m
) override
{
3711 assert(!"send_message on anonymous connection");
3713 void send_keepalive() override
{
3714 assert(!"send_keepalive on anonymous connection");
3716 void mark_down() override
{
3719 void mark_disposable() override
{
3722 bool is_connected() override
{ return false; }
// Handle an MForward: unwrap the request it carries and prepare it for the
// regular dispatch path.
//  - The session the MForward arrived on must be capable of MON_CAP_X on
//    "mon"; if not, the caps are logged at level 0.
//  - The inner PaxosServiceMessage is claimed from the MForward (must be
//    non-NULL) and attached to a new AnonConnection.
//  - A new MonSession is created for the original source, stored as the
//    connection priv, and filled with the forwarded client caps, entity name,
//    the connection the MForward came in on (proxy_con) and its tid
//    (proxy_tid).
//  - The request inherits the MForward receive stamp and records the current
//    election epoch in rx_election_epoch.
// NOTE(review): the statements that break the Connection/MonSession ref loop
// and that hand the request on (embedded 3778-3779 and 3781-3786), as well as
// the branch structure around the caps check, are absent from this listing.
3725 //extract the original message and put it into the regular dispatch function
3726 void Monitor::handle_forward(MonOpRequestRef op
)
3728 MForward
*m
= static_cast<MForward
*>(op
->get_req());
3729 dout(10) << "received forwarded message from " << m
->client
3730 << " via " << m
->get_source_inst() << dendl
;
3731 MonSession
*session
= op
->get_session();
3734 if (!session
->is_capable("mon", MON_CAP_X
)) {
3735 dout(0) << "forward from entity with insufficient caps! "
3736 << session
->caps
<< dendl
;
3738 // see PaxosService::dispatch(); we rely on this being anon
3739 // (c->msgr == NULL)
3740 PaxosServiceMessage
*req
= m
->claim_message();
3741 assert(req
!= NULL
);
3743 ConnectionRef
c(new AnonConnection(cct
));
3744 MonSession
*s
= new MonSession(req
->get_source_inst(),
3745 static_cast<Connection
*>(c
.get()));
3746 c
->set_priv(s
->get());
3747 c
->set_peer_addr(m
->client
.addr
);
3748 c
->set_peer_type(m
->client
.name
.type());
3749 c
->set_features(m
->con_features
);
3751 s
->caps
= m
->client_caps
;
3752 dout(10) << " caps are " << s
->caps
<< dendl
;
3753 s
->entity_name
= m
->entity_name
;
3754 dout(10) << " entity name '" << s
->entity_name
<< "' type "
3755 << s
->entity_name
.get_type() << dendl
;
3756 s
->proxy_con
= m
->get_connection();
3757 s
->proxy_tid
= m
->tid
;
3759 req
->set_connection(c
);
3761 // not super accurate, but better than nothing.
3762 req
->set_recv_stamp(m
->get_recv_stamp());
3765 * note which election epoch this is; we will drop the message if
3766 * there is a future election since our peers will resend routed
3767 * requests in that case.
3769 req
->rx_election_epoch
= get_epoch();
3771 /* Because this is a special fake connection, we need to break
3772 the ref loop between Connection and MonSession differently
3773 than we normally do. Here, the Message refers to the Connection
3774 which refers to the Session, and nobody else refers to the Connection
3775 or the Session. And due to the special nature of this message,
3776 nobody refers to the Connection via the Session. So, clear out that
3777 half of the ref loop.*/
3780 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
// Send m to the entity `to`, both directly and by way of the monitors in the
// monmap.
// Visible steps: log the attempt, encode m with quorum_con_features into bl,
// send m itself through the messenger, then loop over monmap ranks sending an
// MRoute(bl, to) to the monmap instance of the rank.
// NOTE(review): the declaration of bl and any per-rank filter inside the loop
// (embedded 3790-3791 and 3797) are not visible in this listing -- confirm
// whether our own rank is skipped.
3787 void Monitor::try_send_message(Message
*m
, const entity_inst_t
& to
)
3789 dout(10) << "try_send_message " << *m
<< " to " << to
<< dendl
;
3792 encode_message(m
, quorum_con_features
, bl
);
3794 messenger
->send_message(m
, to
);
3796 for (int i
=0; i
<(int)monmap
->size(); i
++) {
3798 messenger
->send_message(new MRoute(bl
, to
), monmap
->get_inst(i
));
// Deliver reply for the request tracked by op.
//  - Sets g_ceph_context on the reply and logs it.
//  - Two visible drop paths log "no connection, dropping reply" and mark the
//    op with "reply: no connection"; the second one fires when the session
//    has neither con nor proxy_con. The condition guarding the first one is
//    not visible in this listing.
//  - When the session has a proxy_con the reply is wrapped in an
//    MRoute(proxy_tid, reply) and sent over the proxy connection; the direct
//    send on session->con presumably sits in the matching else branch.
// NOTE(review): returns and any release of the dropped reply are on lines
// absent from this listing.
3802 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
3804 op
->mark_event(__func__
);
3806 MonSession
*session
= op
->get_session();
3808 Message
*req
= op
->get_req();
3809 ConnectionRef con
= op
->get_connection();
3811 reply
->set_cct(g_ceph_context
);
3812 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
3815 dout(2) << "send_reply no connection, dropping reply " << *reply
3816 << " to " << req
<< " " << *req
<< dendl
;
3818 op
->mark_event("reply: no connection");
3822 if (!session
->con
&& !session
->proxy_con
) {
3823 dout(2) << "send_reply no connection, dropping reply " << *reply
3824 << " to " << req
<< " " << *req
<< dendl
;
3826 op
->mark_event("reply: no connection");
3830 if (session
->proxy_con
) {
3831 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
3832 << " via " << session
->proxy_con
->get_peer_addr()
3833 << " for request " << *req
<< dendl
;
3834 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
3835 op
->mark_event("reply: send routed request");
3837 session
->con
->send_message(reply
);
3838 op
->mark_event("reply: send");
// Record that no reply will be sent for the request tracked by op.
// For a session with a proxy_con, an MRoute carrying the proxy tid and a NULL
// message is sent over the proxy connection and the op is marked
// "no_reply: send routed request". The second log/mark pair ("no_reply")
// presumably forms the else branch for sessions without a proxy_con; the
// branch line itself is not visible in this listing.
3842 void Monitor::no_reply(MonOpRequestRef op
)
3844 MonSession
*session
= op
->get_session();
3845 Message
*req
= op
->get_req();
3847 if (session
->proxy_con
) {
3848 dout(10) << "no_reply to " << req
->get_source_inst()
3849 << " via " << session
->proxy_con
->get_peer_addr()
3850 << " for request " << *req
<< dendl
;
3851 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
3852 op
->mark_event("no_reply: send routed request");
3854 dout(10) << "no_reply to " << req
->get_source_inst()
3855 << " " << *req
<< dendl
;
3856 op
->mark_event("no_reply");
// Handle an MRoute, i.e. a reply that has to be passed on to its destination.
//  - The sending session must be capable of MON_CAP_X on "mon"; otherwise a
//    level 0 message is logged.
//  - With a non-zero session_mon_tid that is present in routed_requests: the
//    payload of the carried message is cleared (encoding may depend on target
//    features), the message is sent on the connection stored in the
//    RoutedRequest, osdmaps are optionally sent first via
//    osdmon()->send_incremental() when send_osdmap_first is set, and the tid
//    is erased from routed_requests and from the session tid set.
//  - An unknown tid is only logged.
//  - With a zero session_mon_tid the message is sent to m->dest through the
//    messenger ("trying to send anyway").
// NOTE(review): several lines are absent here (for example embedded 3867-3870,
// 3872, 3881, 3884-3885, 3890, 3894-3895), including the guards that select
// between the two handle_route log lines and any release of the
// RoutedRequest.
3860 void Monitor::handle_route(MonOpRequestRef op
)
3862 MRoute
*m
= static_cast<MRoute
*>(op
->get_req());
3863 MonSession
*session
= op
->get_session();
3865 if (!session
->is_capable("mon", MON_CAP_X
)) {
3866 dout(0) << "MRoute received from entity without appropriate perms! "
3871 dout(10) << "handle_route " << *m
->msg
<< " to " << m
->dest
<< dendl
;
3873 dout(10) << "handle_route null to " << m
->dest
<< dendl
;
3876 if (m
->session_mon_tid
) {
3877 if (routed_requests
.count(m
->session_mon_tid
)) {
3878 RoutedRequest
*rr
= routed_requests
[m
->session_mon_tid
];
3880 // reset payload, in case encoding is dependent on target features
3882 m
->msg
->clear_payload();
3883 rr
->con
->send_message(m
->msg
);
3886 if (m
->send_osdmap_first
) {
3887 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
3888 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
3889 true, MonOpRequestRef());
3891 assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
3892 routed_requests
.erase(m
->session_mon_tid
);
3893 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
3896 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
3899 dout(10) << " not a routed request, trying to send anyway" << dendl
;
3901 messenger
->send_message(m
->msg
, m
->dest
);
// Walk every entry of routed_requests and push it towards the current leader
// again, then empty the map.
// Two per-request paths are visible:
//  - "requeue for self": a C_RetryMessage for the stored op is queued on the
//    local retry list and the tid is removed from the owning session;
//  - "resend": the stored request_bl is decoded back into a
//    PaxosServiceMessage, wrapped in a new MForward (same tid, stored
//    connection features and client inst, request priority) and sent to the
//    leader instance from the monmap.
// After the loop routed_requests is cleared and the queued retry contexts are
// completed with finish_contexts().
// NOTE(review): the condition choosing between the two paths, the loop
// increment, the remaining MForward constructor argument (embedded 3932) and
// any release of the RoutedRequest are on lines absent from this listing.
3907 void Monitor::resend_routed_requests()
3909 dout(10) << "resend_routed_requests" << dendl
;
3910 int mon
= get_leader();
3911 list
<Context
*> retry
;
3912 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
3913 p
!= routed_requests
.end();
3915 RoutedRequest
*rr
= p
->second
;
3918 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
3919 rr
->op
->mark_event("retry routed request");
3920 retry
.push_back(new C_RetryMessage(this, rr
->op
));
3922 assert(rr
->session
->routed_request_tids
.count(p
->first
));
3923 rr
->session
->routed_request_tids
.erase(p
->first
);
3927 bufferlist::iterator q
= rr
->request_bl
.begin();
3928 PaxosServiceMessage
*req
= (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
3929 rr
->op
->mark_event("resend forwarded message to leader");
3930 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
<< dendl
;
3931 MForward
*forward
= new MForward(rr
->tid
, req
, rr
->con_features
,
3933 req
->put(); // forward takes its own ref; drop ours.
3934 forward
->client
= rr
->client_inst
;
3935 forward
->set_priority(req
->get_priority());
3936 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3940 routed_requests
.clear();
3941 finish_contexts(g_ceph_context
, retry
);
// Tear down session s.
// Logs the session, then for every tid in s->routed_request_tids asserts the
// tid exists in routed_requests, logs it and erases it from that map. After
// the loop the session tid set is cleared, the session is detached from its
// connection (set_priv(NULL)), removed from session_map, and the session perf
// counters (l_mon_num_sessions, l_mon_session_rm) are updated.
// NOTE(review): the loop increment and any release of the RoutedRequest
// (embedded 3953 and 3957) are not visible in this listing.
3945 void Monitor::remove_session(MonSession
*s
)
3947 dout(10) << "remove_session " << s
<< " " << s
->inst
3948 << " features 0x" << std::hex
<< s
->con_features
<< std::dec
<< dendl
;
3951 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
3952 p
!= s
->routed_request_tids
.end();
3954 assert(routed_requests
.count(*p
));
3955 RoutedRequest
*rr
= routed_requests
[*p
];
3956 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
3958 routed_requests
.erase(*p
);
3960 s
->routed_request_tids
.clear();
3961 s
->con
->set_priv(NULL
);
3962 session_map
.remove_session(s
);
3963 logger
->set(l_mon_num_sessions
, session_map
.get_size());
3964 logger
->inc(l_mon_session_rm
);
// Drain session_map while holding session_map_lock: as long as the session
// list is not empty, take its front entry and bump l_mon_session_rm; when the
// list is empty publish session_map.get_size() as l_mon_num_sessions.
// NOTE(review): the statements that actually remove the front session
// (embedded 3972-3973) are absent from this listing.
3967 void Monitor::remove_all_sessions()
3969 Mutex::Locker
l(session_map_lock
);
3970 while (!session_map
.sessions
.empty()) {
3971 MonSession
*s
= session_map
.sessions
.front();
3974 logger
->inc(l_mon_session_rm
);
3977 logger
->set(l_mon_num_sessions
, session_map
.get_size());
// Send a monitor command to inst: logs the target and the command vector,
// creates an MMonCommand for the monmap fsid and passes it to
// try_send_message().
// NOTE(review): no visible line stores com in the message; presumably that
// happens on embedded line 3985, which is absent from this listing -- confirm.
3980 void Monitor::send_command(const entity_inst_t
& inst
,
3981 const vector
<string
>& com
)
3983 dout(10) << "send_command " << inst
<< "" << com
<< dendl
;
3984 MMonCommand
*c
= new MMonCommand(monmap
->fsid
);
3986 try_send_message(c
, inst
);
// Decide what to do with an op that cannot be served right now (the original
// explanatory comment follows the signature; parts of it are missing here).
//  - too_old is the current time minus the mon_lease setting.
//  - A message received after too_old on a connection that is still connected
//    is waitlisted: a C_RetryMessage is queued on maybe_wait_for_quorum and the
//    op is marked as waiting for quorum.
//  - Otherwise the message is logged as discarded. For sessions without a
//    proxy_con the code then takes session_map_lock.
// NOTE(review): the else line and the statements executed in the discard path
// and under the lock (embedded 4014, 4016, 4018, 4021-4025) are not visible.
3989 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
3992 * Wait list the new session until we're in the quorum, assuming it's
3994 * tick() will periodically send them back through so we can send
3995 * the client elsewhere if we don't think we're getting back in.
3997 * But we whitelist a few sorts of messages:
3998 * 1) Monitors can talk to us at any time, of course.
3999 * 2) auth messages. It's unlikely to go through much faster, but
4000 * it's possible we've just lost our quorum status and we want to take...
4001 * 3) command messages. We want to accept these under all possible
4004 Message
*m
= op
->get_req();
4005 MonSession
*s
= op
->get_session();
4006 ConnectionRef con
= op
->get_connection();
4007 utime_t too_old
= ceph_clock_now();
4008 too_old
-= g_ceph_context
->_conf
->mon_lease
;
4009 if (m
->get_recv_stamp() > too_old
&&
4010 con
->is_connected()) {
4011 dout(5) << "waitlisting message " << *m
<< dendl
;
4012 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
4013 op
->mark_wait_for_quorum();
4015 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
4017 // proxied sessions aren't registered and don't have a con; don't remove
4019 if (!s
->proxy_con
) {
4020 Mutex::Locker
l(session_map_lock
);
// Entry point for an incoming message m.
// Visible flow:
//  - checks is_shutdown() first (the action taken is not visible);
//  - wraps m in a tracked MonOpRequest and looks up the MonSession of the op;
//  - for a monitor source with an existing session, compares the connection
//    features with the ones stored in the session (only for connections that
//    have a messenger) and, on a change, marks down a differing old session
//    connection as part of recreating the session;
//  - when a new session is being created for a non-monitor sender, any message
//    other than CEPH_MSG_AUTH, CEPH_MSG_MON_GET_MAP or CEPH_MSG_PING is logged
//    as a stray message;
//  - a new session is created through session_map.new_session() under
//    session_map_lock, stored as the connection priv, and counted in the
//    session perf counters; monitor caps (*mon_caps) are assigned unless the
//    session caps already allow everything;
//  - the session timeout is set to now plus mon_session_timeout, and the
//    entity name is refreshed from the auth handler when there is one;
//  - while synchronizing, or for sessions with global_id 0 after we exited
//    quorum, and subject to further message-type conditions, the op goes to
//    waitlist_or_zap_client().
// NOTE(review): many lines are absent from this listing (returns, session
// removal, the branch that decides between new and existing session, one of
// the message-type conditions at embedded 4118, and the final dispatch call);
// the summary above covers only what is visible.
4027 void Monitor::_ms_dispatch(Message
*m
)
4029 if (is_shutdown()) {
4034 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
4035 bool src_is_mon
= op
->is_src_mon();
4036 op
->mark_event("mon:_ms_dispatch");
4037 MonSession
*s
= op
->get_session();
4038 if (s
&& s
->closed
) {
4042 if (src_is_mon
&& s
) {
4043 ConnectionRef con
= m
->get_connection();
4044 if (con
->get_messenger() && con
->get_features() != s
->con_features
) {
4045 // only update features if this is a non-anonymous connection
4046 dout(10) << __func__
<< " feature change for " << m
->get_source_inst()
4047 << " (was " << s
->con_features
4048 << ", now " << con
->get_features() << ")" << dendl
;
4049 // connection features changed - recreate session.
4050 if (s
->con
&& s
->con
!= con
) {
4051 dout(10) << __func__
<< " connection for " << m
->get_source_inst()
4052 << " changed from session; mark down and replace" << dendl
;
4053 s
->con
->mark_down();
4055 if (s
->item
.is_on_list()) {
4056 // forwarded messages' sessions are not in the sessions map and
4057 // exist only while the op is being handled.
4066 // if the sender is not a monitor, make sure their first message for a
4067 // session is an MAuth. If it is not, assume it's a stray message,
4068 // and considering that we are creating a new session it is safe to
4069 // assume that the sender hasn't authenticated yet, so we have no way
4070 // of assessing whether we should handle it or not.
4071 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
4072 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
4073 m
->get_type() != CEPH_MSG_PING
)) {
4074 dout(1) << __func__
<< " dropping stray message " << *m
4075 << " from " << m
->get_source_inst() << dendl
;
4079 ConnectionRef con
= m
->get_connection();
4081 Mutex::Locker
l(session_map_lock
);
4082 s
= session_map
.new_session(m
->get_source_inst(), con
.get());
4085 con
->set_priv(s
->get());
4086 dout(10) << __func__
<< " new session " << s
<< " " << *s
4087 << " features 0x" << std::hex
4088 << s
->con_features
<< std::dec
<< dendl
;
4091 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4092 logger
->inc(l_mon_session_add
);
4095 // give it monitor caps; the peer type has been authenticated
4096 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
4097 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
4098 s
->caps
= *mon_caps
;
4102 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->inst
4108 s
->session_timeout
= ceph_clock_now();
4109 s
->session_timeout
+= g_conf
->mon_session_timeout
;
4111 if (s
->auth_handler
) {
4112 s
->entity_name
= s
->auth_handler
->get_entity_name();
4114 dout(20) << " caps " << s
->caps
.get_str() << dendl
;
4116 if ((is_synchronizing() ||
4117 (s
->global_id
== 0 && !exited_quorum
.is_zero())) &&
4119 m
->get_type() != CEPH_MSG_PING
) {
4120 waitlist_or_zap_client(op
);
// Route a tracked op to the component responsible for its message type.
// The visible code is organised in stages, each a switch on the message type:
//  1. messages that do not necessarily need caps, e.g. MSG_MON_GLOBAL_ID
//     (dispatched to the PAXOS_AUTH service), CEPH_MSG_MON_GET_MAP and
//     CEPH_MSG_MON_METADATA;
//  2. service messages whose caps are checked elsewhere: dispatched to the
//     PAXOS_OSDMAP, PAXOS_MDSMAP, PAXOS_MGR, PAXOS_MGRSTAT, PAXOS_PGMAP or
//     PAXOS_LOG service; MSG_MON_COMMAND is typed as a command here;
//  3. messages handled by the Monitor itself that clients may send; these
//     require MON_CAP_R on "mon" (e.g. CEPH_MSG_MON_GET_VERSION,
//     CEPH_MSG_MON_SUBSCRIBE);
//  4. messages that only another monitor may send (a non-monitor source is
//     logged as unexpected): log acks, the PAXOS_MONMAP service, paxos
//     messages (MON_CAP_X required, ignored while synchronizing, epoch
//     compared with get_epoch()), MSG_MON_ELECTION (MON_CAP_X required, passed
//     to the elector unless probing or synchronizing), timecheck, health and
//     PAXOS_HEALTH messages.
// Anything left over is logged as "dropping unexpected".
// NOTE(review): a large share of this function is absent from this listing
// (most case labels, every break/return, default labels and the bodies of
// several branches); the stage summary is limited to the visible lines.
4127 void Monitor::dispatch_op(MonOpRequestRef op
)
4129 op
->mark_event("mon:dispatch_op");
4130 MonSession
*s
= op
->get_session();
4133 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
4137 /* we will consider the default type as being 'monitor' until proven wrong */
4138 op
->set_type_monitor();
4139 /* deal with all messages that do not necessarily need caps */
4140 bool dealt_with
= true;
4141 switch (op
->get_req()->get_type()) {
4143 case MSG_MON_GLOBAL_ID
:
4145 op
->set_type_service();
4146 /* no need to check caps here */
4147 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
4154 /* MMonGetMap may be used by clients to obtain a monmap *before*
4155 * authenticating with the monitor. We need to handle these without
4156 * checking caps because, even on a cluster without cephx, we only set
4157 * session caps *after* the auth handshake. A good example of this
4158 * is when a client calls MonClient::get_monmap_privately(), which does
4159 * not authenticate when obtaining a monmap.
4161 case CEPH_MSG_MON_GET_MAP
:
4162 handle_mon_get_map(op
);
4165 case CEPH_MSG_MON_METADATA
:
4166 return handle_mon_metadata(op
);
4175 /* well, maybe the op belongs to a service... */
4176 op
->set_type_service();
4177 /* deal with all messages which caps should be checked somewhere else */
4179 switch (op
->get_req()->get_type()) {
4182 case CEPH_MSG_MON_GET_OSDMAP
:
4183 case CEPH_MSG_POOLOP
:
4184 case MSG_OSD_BEACON
:
4185 case MSG_OSD_MARK_ME_DOWN
:
4187 case MSG_OSD_FAILURE
:
4190 case MSG_OSD_PGTEMP
:
4191 case MSG_OSD_PG_CREATED
:
4192 case MSG_REMOVE_SNAPS
:
4193 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
4197 case MSG_MDS_BEACON
:
4198 case MSG_MDS_OFFLOAD_TARGETS
:
4199 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
4203 case MSG_MGR_BEACON
:
4204 paxos_service
[PAXOS_MGR
]->dispatch(op
);
4208 case MSG_MON_MGR_REPORT
:
4209 case CEPH_MSG_STATFS
:
4210 case MSG_GETPOOLSTATS
:
4211 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
4216 paxos_service
[PAXOS_PGMAP
]->dispatch(op
);
4221 paxos_service
[PAXOS_LOG
]->dispatch(op
);
4224 // handle_command() does its own caps checking
4225 case MSG_MON_COMMAND
:
4226 op
->set_type_command();
4237 /* nop, looks like it's not a service message; revert back to monitor */
4238 op
->set_type_monitor();
4240 /* messages we, the Monitor class, need to deal with
4241 * but may be sent by clients. */
4243 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
4244 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4245 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
4251 switch (op
->get_req()->get_type()) {
4254 case CEPH_MSG_MON_GET_VERSION
:
4255 handle_get_version(op
);
4258 case CEPH_MSG_MON_SUBSCRIBE
:
4259 /* FIXME: check what's being subscribed, filter accordingly */
4260 handle_subscribe(op
);
4270 if (!op
->is_src_mon()) {
4271 dout(1) << __func__
<< " unexpected monitor message from"
4272 << " non-monitor entity " << op
->get_req()->get_source_inst()
4273 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
4277 /* messages that should only be sent by another monitor */
4279 switch (op
->get_req()->get_type()) {
4289 // Sync (i.e., the new slurp, but on steroids)
4297 /* log acks are sent from a monitor we sent the MLog to, and are
4298 never sent by clients to us. */
4300 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
4305 op
->set_type_service();
4306 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
4312 op
->set_type_paxos();
4313 MMonPaxos
*pm
= static_cast<MMonPaxos
*>(op
->get_req());
4314 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4319 if (state
== STATE_SYNCHRONIZING
) {
4320 // we are synchronizing. These messages would do us no
4321 // good, thus just drop them and ignore them.
4322 dout(10) << __func__
<< " ignore paxos msg from "
4323 << pm
->get_source_inst() << dendl
;
4328 if (pm
->epoch
> get_epoch()) {
4332 if (pm
->epoch
!= get_epoch()) {
4336 paxos
->dispatch(op
);
4341 case MSG_MON_ELECTION
:
4342 op
->set_type_election();
4343 //check privileges here for simplicity
4344 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4345 dout(0) << "MMonElection received from entity without enough caps!"
4346 << op
->get_session()->caps
<< dendl
;
4349 if (!is_probing() && !is_synchronizing()) {
4350 elector
.dispatch(op
);
4359 handle_timecheck(op
);
4362 case MSG_MON_HEALTH
:
4363 health_monitor
->dispatch(op
);
4366 case MSG_MON_HEALTH_CHECKS
:
4367 op
->set_type_service();
4368 paxos_service
[PAXOS_HEALTH
]->dispatch(op
);
4376 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
// Answer an MPing with a new MPing sent back to the source instance.
// A JSONFormatter section named "pong" is opened, get_health() and
// get_mon_status() are called with that formatter, and the string held by ss
// is encoded into the payload of the reply before it is sent through the
// messenger.
// NOTE(review): the declarations of ss and payload and the lines that close
// or flush the formatter sections (embedded 4394, 4397-4398, 4400-4404) are
// absent from this listing, so how the formatter output reaches ss cannot be
// confirmed from here.
4385 void Monitor::handle_ping(MonOpRequestRef op
)
4387 MPing
*m
= static_cast<MPing
*>(op
->get_req());
4388 dout(10) << __func__
<< " " << *m
<< dendl
;
4389 MPing
*reply
= new MPing
;
4390 entity_inst_t inst
= m
->get_source_inst();
4392 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4393 f
->open_object_section("pong");
4395 list
<string
> health_str
;
4396 get_health(health_str
, NULL
, f
.get());
4399 get_mon_status(f
.get(), ss
);
4405 ::encode(ss
.str(), payload
);
4406 reply
->set_payload(payload
);
4407 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
4408 messenger
->send_message(reply
, inst
);
4411 void Monitor::timecheck_start()
4413 dout(10) << __func__
<< dendl
;
4414 timecheck_cleanup();
4415 timecheck_start_round();
4418 void Monitor::timecheck_finish()
4420 dout(10) << __func__
<< dendl
;
4421 timecheck_cleanup();
// Start a new timecheck round; must run on the leader (asserted).
//  - A monmap of size 1 trips an assert: the round should never have been
//    scheduled when we are alone.
//  - An odd timecheck_round means a round is in progress. If it started less
//    than three times mon_timecheck_interval ago it is kept going; otherwise
//    it is cancelled through timecheck_cancel_round().
//  - With an even round number the start time is recorded, the new round is
//    logged, and the next event is scheduled via timecheck_reset_event().
// NOTE(review): the lines that bump timecheck_round and invoke the actual
// check (embedded 4449-4450, 4453-4455), plus the early-exit handling of the
// "keep current round going" path, are not visible in this listing.
4424 void Monitor::timecheck_start_round()
4426 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4427 assert(is_leader());
4429 if (monmap
->size() == 1) {
4430 assert(0 == "We are alone; this shouldn't have been scheduled!");
4434 if (timecheck_round
% 2) {
4435 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4436 utime_t curr_time
= ceph_clock_now();
4437 double max
= g_conf
->mon_timecheck_interval
*3;
4438 if (curr_time
- timecheck_round_start
< max
) {
4439 dout(10) << __func__
<< " keep current round going" << dendl
;
4442 dout(10) << __func__
4443 << " finish current timecheck and start new" << dendl
;
4444 timecheck_cancel_round();
4448 assert(timecheck_round
% 2 == 0);
4451 timecheck_round_start
= ceph_clock_now();
4452 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4456 dout(10) << __func__
<< " setting up next event" << dendl
;
4457 timecheck_reset_event();
// Finish the round in progress (timecheck_round must be odd, asserted) and
// reset the round start time.
// Two paths are visible; presumably they are selected by the success flag,
// but the branch lines themselves are absent from this listing:
//  - one asserts that nobody is still waiting and that timecheck_acks equals
//    the quorum size, then runs timecheck_check_skews();
//  - the other logs the names of the peers still in timecheck_waiting and
//    clears that map.
// The final log line reports the round number the function finished on.
// NOTE(review): the statement that advances timecheck_round is not visible
// (embedded 4464, 4466-4467, 4470, 4472-4474, 4480-4481, 4483 are missing).
4460 void Monitor::timecheck_finish_round(bool success
)
4462 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4463 assert(timecheck_round
% 2);
4465 timecheck_round_start
= utime_t();
4468 assert(timecheck_waiting
.empty());
4469 assert(timecheck_acks
== quorum
.size());
4471 timecheck_check_skews();
4475 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4476 << " peers still waiting:";
4477 for (map
<entity_inst_t
,utime_t
>::iterator p
= timecheck_waiting
.begin();
4478 p
!= timecheck_waiting
.end(); ++p
) {
4479 *_dout
<< " " << p
->first
.name
;
4482 timecheck_waiting
.clear();
4484 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
4487 void Monitor::timecheck_cancel_round()
4489 timecheck_finish_round(false);
// Reset the timecheck state: round number back to 0, round start back to a
// default utime_t, any pending timecheck event cancelled on the timer and
// forgotten, the waiting/skews/latencies maps emptied, and
// timecheck_rounds_since_clean back to 0.
// NOTE(review): embedded line 4495 is absent from this listing; whether it
// resets further state (for example the ack counter) cannot be confirmed here.
4492 void Monitor::timecheck_cleanup()
4494 timecheck_round
= 0;
4496 timecheck_round_start
= utime_t();
4498 if (timecheck_event
) {
4499 timer
.cancel_event(timecheck_event
);
4500 timecheck_event
= NULL
;
4502 timecheck_waiting
.clear();
4503 timecheck_skews
.clear();
4504 timecheck_latencies
.clear();
4506 timecheck_rounds_since_clean
= 0;
// (Re)schedule the timer event that starts the next timecheck round.
//  - A pending event is cancelled and forgotten first.
//  - The delay is based on mon_timecheck_skew_interval multiplied by
//    timecheck_rounds_since_clean; when that is not positive or exceeds
//    mon_timecheck_interval, mon_timecheck_interval is used instead.
//  - A C_MonContext whose callback calls timecheck_start_round() is stored in
//    timecheck_event and added to the timer with that delay.
// NOTE(review): the declaration of delay (embedded 4516) and the end of the
// log statement are absent from this listing.
4509 void Monitor::timecheck_reset_event()
4511 if (timecheck_event
) {
4512 timer
.cancel_event(timecheck_event
);
4513 timecheck_event
= NULL
;
4517 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
4519 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4520 delay
= cct
->_conf
->mon_timecheck_interval
;
4523 dout(10) << __func__
<< " delay " << delay
4524 << " rounds_since_clean " << timecheck_rounds_since_clean
4527 timecheck_event
= new C_MonContext(this, [this](int) {
4528 timecheck_start_round();
4530 timer
.add_event_after(delay
, timecheck_event
);
// Inspect the skews collected in the last round and adapt the scheduling of
// the next one. Leader only, between rounds (even round number), never with a
// single-monitor monmap (all asserted); the latency and skew maps must have
// the same size.
//  - Every entry of timecheck_skews is tested with timecheck_has_skew(); a hit
//    is logged together with its absolute skew.
//  - In the skew path timecheck_rounds_since_clean is incremented and the
//    event is rescheduled.
//  - Otherwise, if earlier rounds had skews, one more check is scheduled with
//    the counter temporarily set to 1 (to make sure the skew is really gone)
//    and the counter is then reset to 0.
// NOTE(review): the lines that set found_skew and the if that opens the skew
// path (embedded 4547-4548, 4552-4556), as well as the declaration of
// abs_skew, are not visible in this listing.
4533 void Monitor::timecheck_check_skews()
4535 dout(10) << __func__
<< dendl
;
4536 assert(is_leader());
4537 assert((timecheck_round
% 2) == 0);
4538 if (monmap
->size() == 1) {
4539 assert(0 == "We are alone; we shouldn't have gotten here!");
4542 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4544 bool found_skew
= false;
4545 for (map
<entity_inst_t
, double>::iterator p
= timecheck_skews
.begin();
4546 p
!= timecheck_skews
.end(); ++p
) {
4549 if (timecheck_has_skew(p
->second
, &abs_skew
)) {
4550 dout(10) << __func__
4551 << " " << p
->first
<< " skew " << abs_skew
<< dendl
;
4557 ++timecheck_rounds_since_clean
;
4558 timecheck_reset_event();
4559 } else if (timecheck_rounds_since_clean
> 0) {
4561 << " no clock skews found after " << timecheck_rounds_since_clean
4562 << " rounds" << dendl
;
4563 // make sure the skews are really gone and not just a transient success
4564 // this will run just once if not in the presence of skews again.
4565 timecheck_rounds_since_clean
= 1;
4566 timecheck_reset_event();
4567 timecheck_rounds_since_clean
= 0;
// Send the collected skews and latencies to the quorum members as
// MTimeCheck OP_REPORT messages. Leader only, between rounds, never with a
// single-monitor monmap (all asserted).
// For each quorum rank a report is built with the current epoch and round and
// a copy of every entry in timecheck_skews plus the matching latency, then
// sent to the monmap instance of that rank. The rank whose monmap name equals
// our own name is tested explicitly; presumably it is skipped, but the
// statement under that test is not visible.
// NOTE(review): the uses of do_output (embedded 4599-4600, 4604-4606) are
// absent from this listing; per the inline comment it limits the per-entry
// log output to a single report.
4572 void Monitor::timecheck_report()
4574 dout(10) << __func__
<< dendl
;
4575 assert(is_leader());
4576 assert((timecheck_round
% 2) == 0);
4577 if (monmap
->size() == 1) {
4578 assert(0 == "We are alone; we shouldn't have gotten here!");
4582 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4583 bool do_output
= true; // only output report once
4584 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
4585 if (monmap
->get_name(*q
) == name
)
4588 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_REPORT
);
4589 m
->epoch
= get_epoch();
4590 m
->round
= timecheck_round
;
4592 for (map
<entity_inst_t
, double>::iterator it
= timecheck_skews
.begin();
4593 it
!= timecheck_skews
.end(); ++it
) {
4594 double skew
= it
->second
;
4595 double latency
= timecheck_latencies
[it
->first
];
4597 m
->skews
[it
->first
] = skew
;
4598 m
->latencies
[it
->first
] = latency
;
4601 dout(25) << __func__
<< " " << it
->first
4602 << " latency " << latency
4603 << " skew " << skew
<< dendl
;
4607 entity_inst_t inst
= monmap
->get_inst(*q
);
4608 dout(10) << __func__
<< " send report to " << inst
<< dendl
;
4609 messenger
->send_message(m
, inst
);
// Run one timecheck round from the leader: ping every quorum member.
// Preconditions (asserted): we are the leader, the monmap holds more than one
// monitor, and timecheck_round is odd (a round is open).
//  - timecheck_acks starts at 1 because the leader acks itself, and the
//    leader records skew 0.0 and latency 0.0 for its own instance.
//  - For each quorum rank the send time is stored in timecheck_waiting under
//    the peer instance and an MTimeCheck OP_PING carrying the current epoch
//    and round is sent to it. The rank whose monmap name equals our own name
//    is tested explicitly; presumably it is skipped, but the statement under
//    that test (embedded 4634) is not visible in this listing.
4613 void Monitor::timecheck()
4615 dout(10) << __func__
<< dendl
;
4616 assert(is_leader());
4617 if (monmap
->size() == 1) {
4618 assert(0 == "We are alone; we shouldn't have gotten here!");
4621 assert(timecheck_round
% 2 != 0);
4623 timecheck_acks
= 1; // we ack ourselves
4625 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
4626 << " round " << timecheck_round
<< dendl
;
4628 // we are at the eye of the storm; the point of reference
4629 timecheck_skews
[messenger
->get_myinst()] = 0.0;
4630 timecheck_latencies
[messenger
->get_myinst()] = 0.0;
4632 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
4633 if (monmap
->get_name(*it
) == name
)
4636 entity_inst_t inst
= monmap
->get_inst(*it
);
4637 utime_t curr_time
= ceph_clock_now();
4638 timecheck_waiting
[inst
] = curr_time
;
4639 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_PING
);
4640 m
->epoch
= get_epoch();
4641 m
->round
= timecheck_round
;
4642 dout(10) << __func__
<< " send " << *m
<< " to " << inst
<< dendl
;
4643 messenger
->send_message(m
, inst
);
// Classify a measured clock skew.
// Starts from HEALTH_OK and asserts that latency is not negative. When
// timecheck_has_skew() reports a skew for skew_bound, the status becomes
// HEALTH_WARN and ss receives a message comparing the absolute skew with the
// mon_clock_drift_allowed setting.
// NOTE(review): the declaration of abs_skew and the return statement
// (embedded 4653-4654, 4659-4662) are absent from this listing; presumably
// the function returns status -- confirm.
4647 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
4648 const double skew_bound
,
4649 const double latency
)
4651 health_status_t status
= HEALTH_OK
;
4652 assert(latency
>= 0);
4655 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
4656 status
= HEALTH_WARN
;
4657 ss
<< "clock skew " << abs_skew
<< "s"
4658 << " > max " << g_conf
->mon_clock_drift_allowed
<< "s";
// Leader-side handling of a timecheck reply; the message op must be OP_PONG
// (asserted).
//  - Pongs from an older epoch or an older round are logged and discarded;
//    past the epoch check the epoch must equal get_epoch() (asserted).
//  - The send time stored for the peer in timecheck_waiting is taken and the
//    entry erased. If the current time lies before that send time, the local
//    clock is assumed to have been readjusted and the round is cancelled.
//  - latency is the current time minus the send time. The first sample for a
//    peer is stored as is; the weighted form (0.8 of the stored value plus
//    0.2 of the new sample) is presumably the else branch for later samples.
//  - delta is the peer timestamp minus our current time; skew_bound starts as
//    the absolute delta minus the latency (see the original explanation
//    below) and may be negated under conditions not visible here.
//  - timecheck_status() decides whether the result is reported through
//    clog as an error or a warning; the skew is stored in timecheck_skews.
//  - Once timecheck_acks equals the quorum size the round is finished.
// NOTE(review): several lines are absent from this listing, among them the
// returns of the discard paths, the declaration of ss, the conditions around
// the skew_bound negation (embedded 4754-4756) and the increment of
// timecheck_acks (embedded 4771-4772).
4664 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
4666 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4667 dout(10) << __func__
<< " " << *m
<< dendl
;
4668 /* handles PONG's */
4669 assert(m
->op
== MTimeCheck::OP_PONG
);
4671 entity_inst_t other
= m
->get_source_inst();
4672 if (m
->epoch
< get_epoch()) {
4673 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
4674 << " from " << other
4675 << " curr " << get_epoch()
4676 << " -- severely lagged? discard" << dendl
;
4679 assert(m
->epoch
== get_epoch());
4681 if (m
->round
< timecheck_round
) {
4682 dout(1) << __func__
<< " got old round " << m
->round
4683 << " from " << other
4684 << " curr " << timecheck_round
<< " -- discard" << dendl
;
4688 utime_t curr_time
= ceph_clock_now();
4690 assert(timecheck_waiting
.count(other
) > 0);
4691 utime_t timecheck_sent
= timecheck_waiting
[other
];
4692 timecheck_waiting
.erase(other
);
4693 if (curr_time
< timecheck_sent
) {
4694 // our clock was readjusted -- drop everything until it all makes sense.
4695 dout(1) << __func__
<< " our clock was readjusted --"
4696 << " bump round and drop current check"
4698 timecheck_cancel_round();
4702 /* update peer latencies */
4703 double latency
= (double)(curr_time
- timecheck_sent
);
4705 if (timecheck_latencies
.count(other
) == 0)
4706 timecheck_latencies
[other
] = latency
;
4708 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
4709 timecheck_latencies
[other
] = avg_latency
;
4715 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4716 * and 'a' happens to be lower than 'b'; so we use double instead.
4718 * latency is always expected to be >= 0.
4720 * delta, the difference between theirs timestamp and ours, may either be
4721 * lower or higher than 0; will hardly ever be 0.
4723 * The absolute skew is the absolute delta minus the latency, which is
4724 * taken as a whole instead of an rtt given that there is some queueing
4725 * and dispatch times involved and it's hard to assess how long exactly
4726 * it took for the message to travel to the other side and be handled. So
4727 * we call it a bounded skew, the worst case scenario.
4731 * Given that the latency is always positive, we can establish that the
4732 * bounded skew will be:
4734 * 1. positive if the absolute delta is higher than the latency and
4736 * 2. negative if the absolute delta is higher than the latency and
4737 * delta is negative.
4738 * 3. zero if the absolute delta is lower than the latency.
4740 * On 3. we make a judgement call and treat the skew as non-existent.
4741 * This is because that, if the absolute delta is lower than the
4742 * latency, then the apparently existing skew is nothing more than a
4743 * side-effect of the high latency at work.
4745 * This may not be entirely true though, as a severely skewed clock
4746 * may be masked by an even higher latency, but with high latencies
4747 * we probably have worse issues to deal with than just skewed clocks.
4749 assert(latency
>= 0);
4751 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
4752 double abs_delta
= (delta
> 0 ? delta
: -delta
);
4753 double skew_bound
= abs_delta
- latency
;
4757 skew_bound
= -skew_bound
;
4760 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
4761 if (status
== HEALTH_ERR
)
4762 clog
->error() << other
<< " " << ss
.str();
4763 else if (status
== HEALTH_WARN
)
4764 clog
->warn() << other
<< " " << ss
.str();
4766 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
4767 << " delta " << delta
<< " skew_bound " << skew_bound
4768 << " latency " << latency
<< dendl
;
4770 timecheck_skews
[other
] = skew_bound
;
4773 if (timecheck_acks
== quorum
.size()) {
4774 dout(10) << __func__
<< " got pongs from everybody ("
4775 << timecheck_acks
<< " total)" << dendl
;
4776 assert(timecheck_skews
.size() == timecheck_acks
);
4777 assert(timecheck_waiting
.empty());
4778 // everyone has acked, so bump the round to finish it.
4779 timecheck_finish_round();
// Peon-side handler for an MTimeCheck message.
// The op must be OP_PING or OP_REPORT (asserted). A message whose epoch
// differs from get_epoch(), or whose round is older than timecheck_round, is
// logged as being discarded. Otherwise the message's round is adopted. For
// OP_REPORT the latencies/skews maps carried by the message are taken over
// via swap(). The reply built at the end is an OP_PONG stamped with the local
// clock and echoing the sender's epoch and round; it is sent back on the
// connection the message arrived on.
4783 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
4785 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4786 dout(10) << __func__
<< " " << *m
<< dendl
;
4789 assert(m
->op
== MTimeCheck::OP_PING
|| m
->op
== MTimeCheck::OP_REPORT
);
4791 if (m
->epoch
!= get_epoch()) {
4792 dout(1) << __func__
<< " got wrong epoch "
4793 << "(ours " << get_epoch()
4794 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
4798 if (m
->round
< timecheck_round
) {
4799 dout(1) << __func__
<< " got old round " << m
->round
4800 << " current " << timecheck_round
4801 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
4805 timecheck_round
= m
->round
;
4807 if (m
->op
== MTimeCheck::OP_REPORT
) {
4808 assert((timecheck_round
% 2) == 0);
4809 timecheck_latencies
.swap(m
->latencies
);
4810 timecheck_skews
.swap(m
->skews
);
4814 assert((timecheck_round
% 2) != 0);
4815 MTimeCheck
*reply
= new MTimeCheck(MTimeCheck::OP_PONG
);
4816 utime_t curr_time
= ceph_clock_now();
4817 reply
->timestamp
= curr_time
;
4818 reply
->epoch
= m
->epoch
;
4819 reply
->round
= m
->round
;
// NOTE(review): the "send" log line below prints the incoming *m rather than
// the *reply that is actually sent -- confirm whether that is intended.
4820 dout(10) << __func__
<< " send " << *m
4821 << " to " << m
->get_source_inst() << dendl
;
4822 m
->get_connection()->send_message(reply
);
// Entry point for MTimeCheck messages; routes the op to the leader- or
// peon-side handler. On the path that ends in handle_timecheck_leader() only
// OP_PONG is accepted; when is_peon() holds only OP_PING / OP_REPORT are
// accepted. Anything else is logged at level 1 as an unexpected message.
// NOTE(review): the first branch presumably tests for the leader role --
// confirm against the full function.
4825 void Monitor::handle_timecheck(MonOpRequestRef op
)
4827 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4828 dout(10) << __func__
<< " " << *m
<< dendl
;
4831 if (m
->op
!= MTimeCheck::OP_PONG
) {
4832 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
4834 handle_timecheck_leader(op
);
4836 } else if (is_peon()) {
4837 if (m
->op
!= MTimeCheck::OP_PING
&& m
->op
!= MTimeCheck::OP_REPORT
) {
4838 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
4840 handle_timecheck_peon(op
);
4843 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
// Process an MMonSubscribe request for the session attached to `op`.
// For each entry in m->what:
//  - when the requested name is a log channel (logmon()->sub_name_to_id() >= 0),
//    other log-channel subscriptions already held by the session are removed
//    from session_map under session_map_lock;
//  - the subscription is added/updated in session_map (again under
//    session_map_lock) with its start value, the ONETIME flag and whether the
//    connection supports CEPH_FEATURE_INCSUBOSDMAP;
//  - the service owning the named map is asked to check the subscription.
//    The mdsmap/fsmap, osdmap and osd_pg_creates cases are gated on the
//    session's "mds"/"osd" capabilities.
// A connection without CEPH_FEATURE_MON_STATEFUL_SUB may be sent an
// MMonSubscribeAck carrying the fsid and mon_subscribe_interval.
4847 void Monitor::handle_subscribe(MonOpRequestRef op
)
4849 MMonSubscribe
*m
= static_cast<MMonSubscribe
*>(op
->get_req());
4850 dout(10) << "handle_subscribe " << *m
<< dendl
;
4854 MonSession
*s
= op
->get_session();
4857 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
4860 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4861 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
4864 // remove conflicting subscribes
4865 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4866 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
4867 it
!= s
->sub_map
.end(); ) {
4868 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
4869 Mutex::Locker
l(session_map_lock
);
4870 session_map
.remove_sub((it
++)->second
);
4878 Mutex::Locker
l(session_map_lock
);
4879 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
4880 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
4881 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
4884 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
4885 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
4886 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
4887 Subscription
*sub
= s
->sub_map
[p
->first
];
4888 assert(sub
!= nullptr);
4889 mdsmon()->check_sub(sub
);
4891 } else if (p
->first
== "osdmap") {
4892 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
4893 if (s
->osd_epoch
> p
->second
.start
) {
4894 // client needs earlier osdmaps on purpose, so reset the sent epoch
4897 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
4899 } else if (p
->first
== "osd_pg_creates") {
4900 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
// Which service handles osd_pg_creates depends on whether the monmap
// requires the LUMINOUS mon feature.
4901 if (monmap
->get_required_features().contains_all(
4902 ceph::features::mon::FEATURE_LUMINOUS
)) {
4903 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
4905 pgmon()->check_sub(s
->sub_map
["osd_pg_creates"]);
4908 } else if (p
->first
== "monmap") {
4909 monmon()->check_sub(s
->sub_map
[p
->first
]);
4910 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4911 logmon()->check_sub(s
->sub_map
[p
->first
]);
4912 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
4913 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
4914 } else if (p
->first
== "servicemap") {
4915 mgrstatmon()->check_sub(s
->sub_map
[p
->first
]);
4920 // we only need to reply if the client is old enough to think it
4921 // has to send renewals.
4922 ConnectionRef con
= m
->get_connection();
4923 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
4924 m
->get_connection()->send_message(new MMonSubscribeAck(
4925 monmap
->get_fsid(), (int)g_conf
->mon_subscribe_interval
));
// Answer an MMonGetVersion request for one of the maps named in m->what
// ("mdsmap", "fsmap", "osdmap" or "monmap"; any other name is logged via derr
// as an invalid map type).
// While this monitor is neither leader nor peon the request is queued on
// waitfor_quorum for a retry. If the selected PaxosService is not readable,
// a retry is registered through wait_for_readable(). The reply carries the
// request's handle and tid plus the service's last and first committed
// versions, and is sent on the requesting connection.
4930 void Monitor::handle_get_version(MonOpRequestRef op
)
4932 MMonGetVersion
*m
= static_cast<MMonGetVersion
*>(op
->get_req());
4933 dout(10) << "handle_get_version " << *m
<< dendl
;
4934 PaxosService
*svc
= NULL
;
4936 MonSession
*s
= op
->get_session();
4939 if (!is_leader() && !is_peon()) {
4940 dout(10) << " waiting for quorum" << dendl
;
4941 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
4945 if (m
->what
== "mdsmap") {
4947 } else if (m
->what
== "fsmap") {
4949 } else if (m
->what
== "osdmap") {
4951 } else if (m
->what
== "monmap") {
4954 derr
<< "invalid map type " << m
->what
<< dendl
;
4958 if (!svc
->is_readable()) {
4959 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
4963 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
4964 reply
->handle
= m
->handle
;
4965 reply
->version
= svc
->get_last_committed();
4966 reply
->oldest_version
= svc
->get_first_committed();
4967 reply
->set_tid(m
->get_tid());
4969 m
->get_connection()->send_message(reply
);
// Messenger callback for a reset connection.
// Logs the connection and its peer address, singles out monitor peers (see
// "ignore lossless monitor sessions"), and otherwise fetches the MonSession
// stored as the connection's priv data. The session's connection has its priv
// pointer cleared to break the con <-> session reference cycle, after which
// the session is handled while holding `lock` and then session_map_lock.
4975 bool Monitor::ms_handle_reset(Connection
*con
)
4977 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
4979 // ignore lossless monitor sessions
4980 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
4983 MonSession
*s
= static_cast<MonSession
*>(con
->get_priv());
4987 // break any con <-> session ref cycle
4988 s
->con
->set_priv(NULL
);
4993 Mutex::Locker
l(lock
);
4995 dout(10) << "reset/close on session " << s
->inst
<< dendl
;
4997 Mutex::Locker
l(session_map_lock
);
// Messenger callback for a refused connection. The only visible action is
// logging the connection and its peer address at debug level 10.
5004 bool Monitor::ms_handle_refused(Connection
*con
)
5006 // just log for now...
5007 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
5013 void Monitor::send_latest_monmap(Connection
*con
)
5016 monmap
->encode(bl
, con
->get_features());
5017 con
->send_message(new MMonMap(bl
));
5020 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
5022 MMonGetMap
*m
= static_cast<MMonGetMap
*>(op
->get_req());
5023 dout(10) << "handle_mon_get_map" << dendl
;
5024 send_latest_monmap(m
->get_connection().get());
5027 void Monitor::handle_mon_metadata(MonOpRequestRef op
)
5029 MMonMetadata
*m
= static_cast<MMonMetadata
*>(op
->get_req());
5031 dout(10) << __func__
<< dendl
;
5032 update_mon_metadata(m
->get_source().num(), std::move(m
->data
));
5036 void Monitor::update_mon_metadata(int from
, Metadata
&& m
)
5038 // NOTE: this is now for legacy (kraken or jewel) mons only.
5039 pending_metadata
[from
] = std::move(m
);
5041 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5043 ::encode(pending_metadata
, bl
);
5044 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5045 paxos
->trigger_propose();
// Load monitor metadata from the store: reads the "last_metadata" value kept
// under MONITOR_STORE_PREFIX, decodes it into mon_metadata and then copies
// mon_metadata into pending_metadata.
5048 int Monitor::load_metadata()
5051 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5054 bufferlist::iterator it
= bl
.begin();
5055 ::decode(mon_metadata
, it
);
5057 pending_metadata
= mon_metadata
;
// Dump the metadata recorded for monitor `mon` into formatter `f`, one string
// field per key/value pair. If mon_metadata has no entry for `mon`,
// "mon.<id> not found" is written to `err`.
5061 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
5064 if (!mon_metadata
.count(mon
)) {
5065 err
<< "mon." << mon
<< " not found";
5068 const Metadata
& m
= mon_metadata
[mon
];
5069 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
5070 f
->dump_string(p
->first
.c_str(), p
->second
);
5075 void Monitor::count_metadata(const string
& field
, map
<string
,int> *out
)
5077 for (auto& p
: mon_metadata
) {
5078 auto q
= p
.second
.find(field
);
5079 if (q
== p
.second
.end()) {
5080 (*out
)["unknown"]++;
5082 (*out
)[q
->second
]++;
5087 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
5089 map
<string
,int> by_val
;
5090 count_metadata(field
, &by_val
);
5091 f
->open_object_section(field
.c_str());
5092 for (auto& p
: by_val
) {
5093 f
->dump_int(p
.first
.c_str(), p
.second
);
// Group monitor ids by the "hostname" value found in their metadata and pass
// the resulting hostname => ids map to dump_services() under the "mon" label.
// Metadata entries without a "hostname" key are special-cased (see the
// "not likely though" branch).
5098 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
5100 map
<string
, list
<int> > mons
; // hostname => mon
5101 for (map
<int, Metadata
>::iterator it
= mon_metadata
.begin();
5102 it
!= mon_metadata
.end(); ++it
) {
5103 const Metadata
& m
= it
->second
;
5104 Metadata::const_iterator hostname
= m
.find("hostname");
5105 if (hostname
== m
.end()) {
5106 // not likely though
5109 mons
[hostname
->second
].push_back(it
->first
);
5112 dump_services(f
, mons
, "mon");
5116 // ----------------------------------------------
// Begin a scrub. Must be called on the leader (asserted).
// A non-empty scrub_result is reported to the cluster log as "scrub already
// in progress". The visible setup cancels the periodic scrub event, clears
// old results and allocates a fresh ScrubState.
5119 int Monitor::scrub_start()
5121 dout(10) << __func__
<< dendl
;
5122 assert(is_leader());
5124 if (!scrub_result
.empty()) {
5125 clog
->info() << "scrub already in progress";
5129 scrub_event_cancel();
5130 scrub_result
.clear();
5131 scrub_state
.reset(new ScrubState
);
// Run one scrub step on the leader (asserted) with an active scrub_state
// (asserted).
// Cancels the scrub timeout, waits for pending paxos writes and records the
// paxos version being scrubbed. MMonScrub OP_SCRUB requests starting at
// scrub_state->last_key are sent to quorum members, the local store is
// scrubbed into scrub_result[rank], and scrub_state->finished is set when
// _scrub() reports nothing is left. The timeout is re-armed only after the
// local scrub. With a quorum of one the key limit is -1 and the scrub is
// asserted to have finished.
5137 int Monitor::scrub()
5139 assert(is_leader());
5140 assert(scrub_state
);
5142 scrub_cancel_timeout();
5143 wait_for_paxos_write();
5144 scrub_version
= paxos
->get_version();
5147 // scrub all keys if we're the only monitor in the quorum
5149 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
5151 for (set
<int>::iterator p
= quorum
.begin();
5156 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
5158 r
->key
= scrub_state
->last_key
;
5159 messenger
->send_message(r
, monmap
->get_inst(*p
));
5163 bool r
= _scrub(&scrub_result
[rank
],
5164 &scrub_state
->last_key
,
5167 scrub_state
->finished
= !r
;
5169 // only after we got our scrub results do we really care whether the
5170 // other monitors are late on their results. Also, this way we avoid
5171 // triggering the timeout if we end up getting stuck in _scrub() for
5172 // longer than the duration of the timeout.
5173 scrub_reset_timeout();
5175 if (quorum
.size() == 1) {
5176 assert(scrub_state
->finished
== true);
// Handle an MMonScrub message.
// OP_SCRUB: wait for pending paxos writes, compare the requested version with
// paxos->get_version(), scrub locally starting from m->key and send the
// outcome back as an OP_RESULT message.
// OP_RESULT: compare m->version with scrub_version, reset the scrub timeout,
// store the sender's result (one per sender, asserted) and, once results from
// the whole quorum are in, check them and clear the collected results.
5182 void Monitor::handle_scrub(MonOpRequestRef op
)
5184 MMonScrub
*m
= static_cast<MMonScrub
*>(op
->get_req());
5185 dout(10) << __func__
<< " " << *m
<< dendl
;
5187 case MMonScrub::OP_SCRUB
:
5192 wait_for_paxos_write();
5194 if (m
->version
!= paxos
->get_version())
5197 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
5201 reply
->key
= m
->key
;
5202 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
5203 m
->get_connection()->send_message(reply
);
5207 case MMonScrub::OP_RESULT
:
5211 if (m
->version
!= scrub_version
)
5213 // reset the timeout each time we get a result
5214 scrub_reset_timeout();
5216 int from
= m
->get_source().num();
5217 assert(scrub_result
.count(from
) == 0);
5218 scrub_result
[from
] = m
->result
;
5220 if (scrub_result
.size() == quorum
.size()) {
5221 scrub_check_results();
5222 scrub_result
.clear();
5223 if (scrub_state
->finished
)
// Scrub one chunk of the store into *r.
// Keys are walked from *start across every sync-target prefix except "paxos".
// For each key read, the per-prefix key count is bumped and the value is
// folded into that prefix's running crc32c. A positive *num_keys caps the
// chunk on input; on output *num_keys holds the number of keys scrubbed.
// Both `start` and `num_keys` must be non-null (asserted).
// The mon_scrub_inject_missing_keys / mon_scrub_inject_crc_mismatch options
// randomly skip a key or perturb a crc (fault injection, presumably for
// testing).
// Returns whether the store iterator still has more keys.
5233 bool Monitor::_scrub(ScrubResult
*r
,
5234 pair
<string
,string
> *start
,
5238 assert(start
!= NULL
);
5239 assert(num_keys
!= NULL
);
5241 set
<string
> prefixes
= get_sync_targets_names();
5242 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5244 dout(10) << __func__
<< " start (" << *start
<< ")"
5245 << " num_keys " << *num_keys
<< dendl
;
5247 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
5249 int scrubbed_keys
= 0;
5250 pair
<string
,string
> last_key
;
5252 while (it
->has_next_chunk()) {
5254 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
5257 pair
<string
,string
> k
= it
->get_next_key();
5258 if (prefixes
.count(k
.first
) == 0)
5261 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
5262 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
5263 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
5269 int err
= store
->get(k
.first
, k
.second
, bl
);
5272 uint32_t key_crc
= bl
.crc32c(0);
5273 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
5274 << " crc " << key_crc
<< dendl
;
5275 r
->prefix_keys
[k
.first
]++;
5276 if (r
->prefix_crc
.count(k
.first
) == 0) {
5277 r
->prefix_crc
[k
.first
] = 0;
// Chain this value's crc onto the prefix's running crc.
5279 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
5281 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
5282 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
5283 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
5284 r
->prefix_crc
[k
.first
] += 1;
5291 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
5292 << " scrubbed_keys " << scrubbed_keys
5293 << " has_next " << it
->has_next_chunk() << dendl
;
5296 *num_keys
= scrubbed_keys
;
5298 return it
->has_next_chunk();
// Compare every collected ScrubResult against our own (scrub_result[rank]).
// Our own entry gets separate treatment in the loop. A differing result is
// reported to the cluster log as an error, followed by both results. A
// "scrub ok on <quorum>" debug entry is also emitted (presumably only when
// nothing mismatched -- confirm).
5301 void Monitor::scrub_check_results()
5303 dout(10) << __func__
<< dendl
;
5307 ScrubResult
& mine
= scrub_result
[rank
];
5308 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
5309 p
!= scrub_result
.end();
5311 if (p
->first
== rank
)
5313 if (p
->second
!= mine
) {
5315 clog
->error() << "scrub mismatch";
5316 clog
->error() << " mon." << rank
<< " " << mine
;
5317 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5321 clog
->debug() << "scrub ok on " << quorum
<< ": " << mine
;
// Scrub timeout handler; logs at level 1 that the scrub is being restarted.
5324 inline void Monitor::scrub_timeout()
5326 dout(1) << __func__
<< " restarting scrub" << dendl
;
// Wrap up a scrub; the periodic scrub event is re-armed through
// scrub_event_start().
5331 void Monitor::scrub_finish()
5333 dout(10) << __func__
<< dendl
;
5335 scrub_event_start();
// Drop scrub state: cancel the scrub timeout, discard collected results and
// release scrub_state.
5338 void Monitor::scrub_reset()
5340 dout(10) << __func__
<< dendl
;
5341 scrub_cancel_timeout();
5343 scrub_result
.clear();
5344 scrub_state
.reset();
// React to a change of the scrub interval to `secs`.
// Logs the new interval. The in-code notes say that non-leaders ignore the
// change and that nothing is done while a scrub is in progress (scrub_state
// set). Otherwise the periodic scrub event is cancelled and re-armed.
5347 inline void Monitor::scrub_update_interval(int secs
)
5349 // we don't care about changes if we are not the leader.
5350 // changes will be visible if we become the leader.
5354 dout(1) << __func__
<< " new interval = " << secs
<< dendl
;
5356 // if scrub already in progress, all changes will already be visible during
5357 // the next round. Nothing to do.
5358 if (scrub_state
!= NULL
)
5361 scrub_event_cancel();
5362 scrub_event_start();
// (Re)arm the periodic scrub event.
// Any existing event is cancelled first. When mon_scrub_interval is <= 0 a
// "scrub event is disabled" message is logged. Otherwise a C_MonContext
// callback is created and registered on the timer to fire
// mon_scrub_interval after now.
5365 void Monitor::scrub_event_start()
5367 dout(10) << __func__
<< dendl
;
5370 scrub_event_cancel();
5372 if (cct
->_conf
->mon_scrub_interval
<= 0) {
5373 dout(1) << __func__
<< " scrub event is disabled"
5374 << " (mon_scrub_interval = " << cct
->_conf
->mon_scrub_interval
5379 scrub_event
= new C_MonContext(this, [this](int) {
5382 timer
.add_event_after(cct
->_conf
->mon_scrub_interval
, scrub_event
);
// Cancel the periodic scrub event on the timer.
5385 void Monitor::scrub_event_cancel()
5387 dout(10) << __func__
<< dendl
;
5389 timer
.cancel_event(scrub_event
);
5394 inline void Monitor::scrub_cancel_timeout()
5396 if (scrub_timeout_event
) {
5397 timer
.cancel_event(scrub_timeout_event
);
5398 scrub_timeout_event
= NULL
;
// Restart the scrub timeout: cancel any pending timeout event, then create a
// new C_MonContext callback and schedule it mon_scrub_timeout after now.
5402 void Monitor::scrub_reset_timeout()
5404 dout(15) << __func__
<< " reset timeout event" << dendl
;
5405 scrub_cancel_timeout();
5407 scrub_timeout_event
= new C_MonContext(this, [this](int) {
5410 timer
.add_event_after(g_conf
->mon_scrub_timeout
, scrub_timeout_event
);
5413 /************ TICK ***************/
// Schedule a C_MonContext callback on the timer, mon_tick_interval from now.
5414 void Monitor::new_tick()
5416 timer
.add_event_after(g_conf
->mon_tick_interval
, new C_MonContext(this, [this](int) {
// Periodic housekeeping.
//  - iterates over the paxos services;
//  - under session_map_lock, walks the sessions and trims non-monitor ones:
//    a session whose session_timeout has passed is first re-evaluated against
//    the connection's last keepalive plus mon_session_timeout and trimmed if
//    still expired; sessions are also trimmed when this monitor has been out
//    of quorum for more than 2 * mon_lease. Trimming marks the connection
//    down and bumps the l_mon_session_trim counter;
//  - trims sync providers;
//  - completes contexts queued on maybe_wait_for_quorum;
//  - on an active leader whose fingerprint is zero, proposes a new cluster
//    fingerprint.
5421 void Monitor::tick()
5424 dout(11) << "tick" << dendl
;
5426 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
) {
5432 utime_t now
= ceph_clock_now();
5434 Mutex::Locker
l(session_map_lock
);
5435 auto p
= session_map
.sessions
.begin();
5437 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5438 now
> (exited_quorum
+ 2*g_conf
->mon_lease
));
5444 // don't trim monitors
5445 if (s
->inst
.name
.is_mon())
5448 if (s
->session_timeout
< now
&& s
->con
) {
5449 // check keepalive, too
5450 s
->session_timeout
= s
->con
->get_last_keepalive();
5451 s
->session_timeout
+= g_conf
->mon_session_timeout
;
5453 if (s
->session_timeout
< now
) {
5454 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5455 << " (timeout " << s
->session_timeout
5456 << " < now " << now
<< ")" << dendl
;
5457 } else if (out_for_too_long
) {
5458 // boot the client Session because we've taken too long getting back in
5459 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5460 << " because we've been out of quorum too long" << dendl
;
5465 s
->con
->mark_down();
5467 logger
->inc(l_mon_session_trim
);
5470 sync_trim_providers();
5472 if (!maybe_wait_for_quorum
.empty()) {
5473 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5476 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5477 // this is only necessary on upgraded clusters.
5478 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5479 prepare_new_fingerprint(t
);
5480 paxos
->trigger_propose();
// Generate a random fingerprint, log it as the proposed cluster_fingerprint
// and stage a value under MONITOR_NAME / "cluster_fingerprint" in `t`.
5486 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5489 nf
.generate_random();
5490 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5494 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
// Compare the "cluster_uuid" value stored under MONITOR_NAME with the
// monmap's fsid. Only the first line of the stored value is considered.
// An error is logged via derr when the stored text cannot be parsed as a
// uuid, or when it differs from monmap->get_fsid().
5497 int Monitor::check_fsid()
5500 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
5505 string
es(ebl
.c_str(), ebl
.length());
5507 // only keep the first line
5508 size_t pos
= es
.find_first_of('\n');
5509 if (pos
!= string::npos
)
5512 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
5514 if (!ondisk
.parse(es
.c_str())) {
5515 derr
<< "error: unable to parse uuid" << dendl
;
5519 if (monmap
->get_fsid() != ondisk
) {
5520 derr
<< "error: cluster_uuid file exists with value " << ondisk
5521 << ", != our uuid " << monmap
->get_fsid() << dendl
;
// Overload without arguments: creates a fresh MonitorDBStore::Transaction and
// applies it to the store, keeping the result of apply_transaction() in `r`.
5528 int Monitor::write_fsid()
5530 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5532 int r
= store
->apply_transaction(t
);
// Stage the cluster fsid in transaction `t` under MONITOR_NAME /
// "cluster_uuid". The fsid is rendered as text followed by a newline
// (presumably that text is what ends up in the stored buffer -- confirm).
5536 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
5539 ss
<< monmap
->get_fsid() << "\n";
5540 string us
= ss
.str();
5545 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
5550 * this is the closest thing to a traditional 'mkfs' for ceph.
5551 * initialize the monitor state machines to their initial values.
// Initialize a new monitor store ("mkfs").
// Steps visible here, all staged in one transaction:
//  - verify the cluster fsid via check_fsid(); a negative result other than
//    -ENOENT is treated specially;
//  - write the on-disk magic (CEPH_MON_ONDISK_MAGIC plus newline) under
//    MONITOR_NAME / "magic";
//  - take the initial supported feature set;
//  - store the encoded monmap under "mkfs"/"monmap";
//  - if `osdmapbl` is non-empty, check that it decodes as an osdmap and store
//    it under "mkfs"/"osdmap";
//  - if a keyring is required, obtain one either from the file located via
//    g_conf->keyring or, when g_conf->key is set, from a synthesized "[mon.]"
//    plaintext entry; move the mon. key to a separate keyring and store the
//    rest under "mkfs"/"keyring";
//  - apply the transaction to the store.
5553 int Monitor::mkfs(bufferlist
& osdmapbl
)
5555 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5557 // verify cluster fsid
5558 int r
= check_fsid();
5559 if (r
< 0 && r
!= -ENOENT
)
5563 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
5564 magicbl
.append("\n");
5565 t
->put(MONITOR_NAME
, "magic", magicbl
);
5568 features
= get_initial_supported_features();
5571 // save monmap, osdmap, keyring.
5572 bufferlist monmapbl
;
// NOTE(review): the monmap is encoded before its epoch is reset to 0 on the
// following statement -- confirm this ordering is intended.
5573 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
5574 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5575 t
->put("mkfs", "monmap", monmapbl
);
5577 if (osdmapbl
.length()) {
5578 // make sure it's a valid osdmap
5581 om
.decode(osdmapbl
);
5583 catch (buffer::error
& e
) {
5584 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
5587 t
->put("mkfs", "osdmap", osdmapbl
);
5590 if (is_keyring_required()) {
5592 string keyring_filename
;
5594 r
= ceph_resolve_file_search(g_conf
->keyring
, keyring_filename
);
5596 derr
<< "unable to find a keyring file on " << g_conf
->keyring
5597 << ": " << cpp_strerror(r
) << dendl
;
5598 if (g_conf
->key
!= "") {
5599 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf
->key
+
5600 "\n\tcaps mon = \"allow *\"\n";
5602 bl
.append(keyring_plaintext
);
5604 bufferlist::iterator i
= bl
.begin();
5605 keyring
.decode_plaintext(i
);
5607 catch (const buffer::error
& e
) {
5608 derr
<< "error decoding keyring " << keyring_plaintext
5609 << ": " << e
.what() << dendl
;
5616 r
= keyring
.load(g_ceph_context
, keyring_filename
);
5618 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
5623 // put mon. key in external keyring; seed with everything else.
5624 extract_save_mon_key(keyring
);
5626 bufferlist keyringbl
;
5627 keyring
.encode_plaintext(keyringbl
);
5628 t
->put("mkfs", "keyring", keyringbl
);
// NOTE(review): the result of apply_transaction() is not examined here --
// confirm a failed write is detected elsewhere.
5631 store
->apply_transaction(t
);
// Write `bl` to "<mon_data>/keyring". The file is opened write-only, created
// with mode 0600 if missing; an open failure is logged at level 0. The buffer
// is written with write_fd() and the descriptor is closed afterwards.
// NOTE(review): the open flags are O_WRONLY|O_CREAT without O_TRUNC, so an
// existing longer file would presumably keep its trailing bytes -- confirm
// whether truncation is wanted.
5636 int Monitor::write_default_keyring(bufferlist
& bl
)
5639 os
<< g_conf
->mon_data
<< "/keyring";
5642 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
, 0600);
5645 dout(0) << __func__
<< " failed to open " << os
.str()
5646 << ": " << cpp_strerror(err
) << dendl
;
5650 err
= bl
.write_fd(fd
);
5653 VOID_TEMP_FAILURE_RETRY(::close(fd
));
// If `keyring` holds an entry for the mon entity (an EntityName whose type is
// CEPH_ENTITY_TYPE_MON), move it out: the entry is added to a separate
// keyring, encoded as plaintext, written with write_default_keyring(), and
// then removed from `keyring`.
// NOTE(review): the int result of write_default_keyring() is ignored here, so
// the key is removed from `keyring` even if the write failed -- confirm.
5658 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
5660 EntityName mon_name
;
5661 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
5663 if (keyring
.get_auth(mon_name
, mon_key
)) {
5664 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
5666 pkey
.add(mon_name
, mon_key
);
5668 pkey
.encode_plaintext(bl
);
5669 write_default_keyring(bl
);
5670 keyring
.remove(mon_name
);
// Build an authorizer for an outgoing connection to `service_id`.
// Only CEPH_ENTITY_TYPE_MON and CEPH_ENTITY_TYPE_MGR are handled ("we only
// connect to other monitors and mgr").
// When cephx is not among the supported cluster auth methods, an auth_none
// authorizer with global id 0 is built.
// Otherwise a ticket is prepared for the mon entity with global id 0:
//  - MON: the secret is looked up in the local keyring, then in the key
//    server; if neither has it, the installed auth entries are logged. The
//    session auth info is built with that secret and (uint64_t)-1.
//  - MGR: the session auth info is built by the key server without an
//    explicit secret.
// The resulting info is turned into a service ticket blob, round-tripped
// through encode/decode into a CephXTicketHandler, which receives the session
// key and builds the authorizer returned via *authorizer.
5674 bool Monitor::ms_get_authorizer(int service_id
, AuthAuthorizer
**authorizer
,
5677 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id
)
5683 // we only connect to other monitors and mgr; every else connects to us.
5684 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
5685 service_id
!= CEPH_ENTITY_TYPE_MGR
)
5688 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5690 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
5691 AuthNoneClientHandler
handler(g_ceph_context
, nullptr);
5692 handler
.set_global_id(0);
5693 *authorizer
= handler
.build_authorizer(service_id
);
5697 CephXServiceTicketInfo auth_ticket_info
;
5698 CephXSessionAuthInfo info
;
5702 name
.set_type(CEPH_ENTITY_TYPE_MON
);
5703 auth_ticket_info
.ticket
.name
= name
;
5704 auth_ticket_info
.ticket
.global_id
= 0;
5706 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
5707 // mon to mon authentication uses the private monitor shared key and not the
5710 if (!keyring
.get_secret(name
, secret
) &&
5711 !key_server
.get_secret(name
, secret
)) {
5712 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5714 stringstream ss
, ds
;
5715 int err
= key_server
.list_secrets(ds
);
5717 ss
<< "no installed auth entries!";
5719 ss
<< "installed auth entries:";
5720 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
5724 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
,
5725 secret
, (uint64_t)-1);
5727 dout(0) << __func__
<< " failed to build mon session_auth_info "
5728 << cpp_strerror(ret
) << dendl
;
5731 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
5733 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
);
5735 derr
<< __func__
<< " failed to build mgr service session_auth_info "
5736 << cpp_strerror(ret
) << dendl
;
5740 ceph_abort(); // see check at top of fn
5743 CephXTicketBlob blob
;
5744 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
5745 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl
;
5748 bufferlist ticket_data
;
5749 ::encode(blob
, ticket_data
);
5751 bufferlist::iterator iter
= ticket_data
.begin();
5752 CephXTicketHandler
handler(g_ceph_context
, service_id
);
5753 ::decode(handler
.ticket
, iter
);
5755 handler
.session_key
= info
.session_key
;
5757 *authorizer
= handler
.build_authorizer(0);
5762 bool Monitor::ms_verify_authorizer(Connection
*con
, int peer_type
,
5763 int protocol
, bufferlist
& authorizer_data
,
5764 bufferlist
& authorizer_reply
,
5765 bool& isvalid
, CryptoKey
& session_key
)
5767 dout(10) << "ms_verify_authorizer " << con
->get_peer_addr()
5768 << " " << ceph_entity_type_name(peer_type
)
5769 << " protocol " << protocol
<< dendl
;
5774 if (peer_type
== CEPH_ENTITY_TYPE_MON
&&
5775 auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5776 // monitor, and cephx is enabled
5778 if (protocol
== CEPH_AUTH_CEPHX
) {
5779 bufferlist::iterator iter
= authorizer_data
.begin();
5780 CephXServiceTicketInfo auth_ticket_info
;
5782 if (authorizer_data
.length()) {
5783 bool ret
= cephx_verify_authorizer(g_ceph_context
, &keyring
, iter
,
5784 auth_ticket_info
, authorizer_reply
);
5786 session_key
= auth_ticket_info
.session_key
;
5789 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con
->get_peer_addr() << dendl
;
5793 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl
;