1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
25 #include "common/version.h"
27 #include "osd/OSDMap.h"
29 #include "MonitorDBStore.h"
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
49 #include "messages/MMonSubscribe.h"
50 #include "messages/MMonSubscribeAck.h"
52 #include "messages/MAuthReply.h"
54 #include "messages/MTimeCheck.h"
55 #include "messages/MPing.h"
57 #include "common/strtol.h"
58 #include "common/ceph_argparse.h"
59 #include "common/Timer.h"
60 #include "common/Clock.h"
61 #include "common/errno.h"
62 #include "common/perf_counters.h"
63 #include "common/admin_socket.h"
64 #include "global/signal_handler.h"
65 #include "common/Formatter.h"
66 #include "include/stringify.h"
67 #include "include/color.h"
68 #include "include/ceph_fs.h"
69 #include "include/str_list.h"
71 #include "OSDMonitor.h"
72 #include "MDSMonitor.h"
73 #include "MonmapMonitor.h"
74 #include "PGMonitor.h"
75 #include "LogMonitor.h"
76 #include "AuthMonitor.h"
77 #include "MgrMonitor.h"
78 #include "MgrStatMonitor.h"
79 #include "mon/QuorumService.h"
80 #include "mon/OldHealthMonitor.h"
81 #include "mon/HealthMonitor.h"
82 #include "mon/ConfigKeyService.h"
83 #include "common/config.h"
84 #include "common/cmdparse.h"
85 #include "include/assert.h"
86 #include "include/compat.h"
87 #include "perfglue/heap_profiler.h"
89 #include "auth/none/AuthNoneClientHandler.h"
91 #define dout_subsys ceph_subsys_mon
93 #define dout_prefix _prefix(_dout, this)
94 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
95 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
96 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
99 const string
Monitor::MONITOR_NAME
= "monitor";
100 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
105 #undef COMMAND_WITH_FLAG
106 MonCommand mon_commands
[] = {
107 #define FLAG(f) (MonCommand::FLAG_##f)
108 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
109 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
110 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
111 {parsesig, helptext, modulename, req_perms, avail, flags},
112 #include <mon/MonCommands.h>
114 #undef COMMAND_WITH_FLAG
118 void C_MonContext::finish(int r
) {
119 if (mon
->is_shutdown())
121 FunctionContext::finish(r
);
124 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
125 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
130 con_self(m
? m
->get_loopback_connection() : NULL
),
131 lock("Monitor::lock"),
133 finisher(cct_
, "mon_finisher", "fin"),
134 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf
->mon_cpu_threads
),
135 has_ever_joined(false),
136 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
138 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
139 key_server(cct
, &keyring
),
140 auth_cluster_required(cct
,
141 cct
->_conf
->auth_supported
.empty() ?
142 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
143 auth_service_required(cct
,
144 cct
->_conf
->auth_supported
.empty() ?
145 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
146 leader_supported_mon_commands(NULL
),
147 leader_supported_mon_commands_size(0),
148 mgr_messenger(mgr_m
),
149 mgr_client(cct_
, mgr_m
),
153 state(STATE_PROBING
),
156 required_features(0),
158 quorum_con_features(0),
162 scrub_timeout_event(NULL
),
165 sync_provider_count(0),
168 sync_start_version(0),
169 sync_timeout_event(NULL
),
170 sync_last_committed_floor(0),
174 timecheck_rounds_since_clean(0),
175 timecheck_event(NULL
),
177 paxos_service(PAXOS_NUM
),
179 routed_request_tid(0),
180 op_tracker(cct
, true, 1)
182 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
183 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
185 update_log_clients();
187 paxos
= new Paxos(this, "paxos");
189 paxos_service
[PAXOS_MDSMAP
] = new MDSMonitor(this, paxos
, "mdsmap");
190 paxos_service
[PAXOS_MONMAP
] = new MonmapMonitor(this, paxos
, "monmap");
191 paxos_service
[PAXOS_OSDMAP
] = new OSDMonitor(cct
, this, paxos
, "osdmap");
192 paxos_service
[PAXOS_PGMAP
] = new PGMonitor(this, paxos
, "pgmap");
193 paxos_service
[PAXOS_LOG
] = new LogMonitor(this, paxos
, "logm");
194 paxos_service
[PAXOS_AUTH
] = new AuthMonitor(this, paxos
, "auth");
195 paxos_service
[PAXOS_MGR
] = new MgrMonitor(this, paxos
, "mgr");
196 paxos_service
[PAXOS_MGRSTAT
] = new MgrStatMonitor(this, paxos
, "mgrstat");
197 paxos_service
[PAXOS_HEALTH
] = new HealthMonitor(this, paxos
, "health");
199 health_monitor
= new OldHealthMonitor(this);
200 config_key_service
= new ConfigKeyService(this, paxos
);
202 mon_caps
= new MonCap();
203 bool r
= mon_caps
->parse("allow *", NULL
);
206 exited_quorum
= ceph_clock_now();
208 // assume our commands until we have an election. this only means
209 // we won't reply with EINVAL before the election; any command that
210 // actually matters will wait until we have quorum etc and then
211 // retry (and revalidate).
212 const MonCommand
*cmds
;
214 get_locally_supported_monitor_commands(&cmds
, &cmdsize
);
215 set_leader_supported_commands(cmds
, cmdsize
);
217 // note: OSDMonitor may update this based on the luminous flag.
218 pgservice
= mgrstatmon()->get_pg_stat_service();
223 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
225 delete health_monitor
;
226 delete config_key_service
;
228 assert(session_map
.sessions
.empty());
230 if (leader_supported_mon_commands
!= mon_commands
)
231 delete[] leader_supported_mon_commands
;
235 class AdminHook
: public AdminSocketHook
{
238 explicit AdminHook(Monitor
*m
) : mon(m
) {}
239 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
240 bufferlist
& out
) override
{
242 mon
->do_admin_command(command
, cmdmap
, format
, ss
);
248 void Monitor::do_admin_command(string command
, cmdmap_t
& cmdmap
, string format
,
251 Mutex::Locker
l(lock
);
253 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
256 for (cmdmap_t::iterator p
= cmdmap
.begin();
257 p
!= cmdmap
.end(); ++p
) {
258 if (p
->first
== "prefix")
262 args
+= cmd_vartype_stringify(p
->second
);
264 args
= "[" + args
+ "]";
266 bool read_only
= (command
== "mon_status" ||
267 command
== "mon metadata" ||
268 command
== "quorum_status" ||
270 command
== "sessions");
272 (read_only
? audit_clog
->debug() : audit_clog
->info())
273 << "from='admin socket' entity='admin socket' "
274 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
276 if (command
== "mon_status") {
277 get_mon_status(f
.get(), ss
);
280 } else if (command
== "quorum_status") {
281 _quorum_status(f
.get(), ss
);
282 } else if (command
== "sync_force") {
284 if ((!cmd_getval(g_ceph_context
, cmdmap
, "validate", validate
)) ||
285 (validate
!= "--yes-i-really-mean-it")) {
286 ss
<< "are you SURE? this will mean the monitor store will be erased "
287 "the next time the monitor is restarted. pass "
288 "'--yes-i-really-mean-it' if you really do.";
291 sync_force(f
.get(), ss
);
292 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
293 if (!_add_bootstrap_peer_hint(command
, cmdmap
, ss
))
295 } else if (command
== "quorum enter") {
296 elector
.start_participating();
298 ss
<< "started responding to quorum, initiated new election";
299 } else if (command
== "quorum exit") {
301 elector
.stop_participating();
302 ss
<< "stopped responding to quorum, initiated new election";
303 } else if (command
== "ops") {
304 (void)op_tracker
.dump_ops_in_flight(f
.get());
308 } else if (command
== "sessions") {
311 f
->open_array_section("sessions");
312 for (auto p
: session_map
.sessions
) {
313 f
->dump_stream("session") << *p
;
320 assert(0 == "bad AdminSocket command binding");
322 (read_only
? audit_clog
->debug() : audit_clog
->info())
323 << "from='admin socket' "
324 << "entity='admin socket' "
325 << "cmd=" << command
<< " "
326 << "args=" << args
<< ": finished";
330 (read_only
? audit_clog
->debug() : audit_clog
->info())
331 << "from='admin socket' "
332 << "entity='admin socket' "
333 << "cmd=" << command
<< " "
334 << "args=" << args
<< ": aborted";
337 void Monitor::handle_signal(int signum
)
339 assert(signum
== SIGINT
|| signum
== SIGTERM
);
340 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
344 CompatSet
Monitor::get_initial_supported_features()
346 CompatSet::FeatureSet ceph_mon_feature_compat
;
347 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
348 CompatSet::FeatureSet ceph_mon_feature_incompat
;
349 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
350 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
351 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
352 ceph_mon_feature_incompat
);
355 CompatSet
Monitor::get_supported_features()
357 CompatSet compat
= get_initial_supported_features();
358 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
359 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
360 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
361 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
362 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
366 CompatSet
Monitor::get_legacy_features()
368 CompatSet::FeatureSet ceph_mon_feature_compat
;
369 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
370 CompatSet::FeatureSet ceph_mon_feature_incompat
;
371 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
372 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
373 ceph_mon_feature_incompat
);
376 int Monitor::check_features(MonitorDBStore
*store
)
378 CompatSet required
= get_supported_features();
381 read_features_off_disk(store
, &ondisk
);
383 if (!required
.writeable(ondisk
)) {
384 CompatSet diff
= required
.unsupported(ondisk
);
385 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
392 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
394 bufferlist featuresbl
;
395 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
396 if (featuresbl
.length() == 0) {
397 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
398 << "Assuming it is old-style and introducing one." << dendl
;
399 //we only want the baseline ~v.18 features assumed to be on disk.
400 //If new features are introduced this code needs to disappear or
402 *features
= get_legacy_features();
404 features
->encode(featuresbl
);
405 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
406 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
407 store
->apply_transaction(t
);
409 bufferlist::iterator it
= featuresbl
.begin();
410 features
->decode(it
);
414 void Monitor::read_features()
416 read_features_off_disk(store
, &features
);
417 dout(10) << "features " << features
<< dendl
;
419 calc_quorum_requirements();
420 dout(10) << "required_features " << required_features
<< dendl
;
423 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
427 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
430 const char** Monitor::get_tracked_conf_keys() const
432 static const char* KEYS
[] = {
433 "crushtool", // helpful for testing
434 "mon_election_timeout",
436 "mon_lease_renew_interval_factor",
437 "mon_lease_ack_timeout_factor",
438 "mon_accept_timeout_factor",
442 "clog_to_syslog_facility",
443 "clog_to_syslog_level",
445 "clog_to_graylog_host",
446 "clog_to_graylog_port",
449 // periodic health to clog
450 "mon_health_to_clog",
451 "mon_health_to_clog_interval",
452 "mon_health_to_clog_tick_interval",
454 "mon_scrub_interval",
460 void Monitor::handle_conf_change(const struct md_config_t
*conf
,
461 const std::set
<std::string
> &changed
)
465 dout(10) << __func__
<< " " << changed
<< dendl
;
467 if (changed
.count("clog_to_monitors") ||
468 changed
.count("clog_to_syslog") ||
469 changed
.count("clog_to_syslog_level") ||
470 changed
.count("clog_to_syslog_facility") ||
471 changed
.count("clog_to_graylog") ||
472 changed
.count("clog_to_graylog_host") ||
473 changed
.count("clog_to_graylog_port") ||
474 changed
.count("host") ||
475 changed
.count("fsid")) {
476 update_log_clients();
479 if (changed
.count("mon_health_to_clog") ||
480 changed
.count("mon_health_to_clog_interval") ||
481 changed
.count("mon_health_to_clog_tick_interval")) {
482 health_to_clog_update_conf(changed
);
485 if (changed
.count("mon_scrub_interval")) {
486 scrub_update_interval(conf
->mon_scrub_interval
);
490 void Monitor::update_log_clients()
492 map
<string
,string
> log_to_monitors
;
493 map
<string
,string
> log_to_syslog
;
494 map
<string
,string
> log_channel
;
495 map
<string
,string
> log_prio
;
496 map
<string
,string
> log_to_graylog
;
497 map
<string
,string
> log_to_graylog_host
;
498 map
<string
,string
> log_to_graylog_port
;
502 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
503 log_channel
, log_prio
, log_to_graylog
,
504 log_to_graylog_host
, log_to_graylog_port
,
508 clog
->update_config(log_to_monitors
, log_to_syslog
,
509 log_channel
, log_prio
, log_to_graylog
,
510 log_to_graylog_host
, log_to_graylog_port
,
513 audit_clog
->update_config(log_to_monitors
, log_to_syslog
,
514 log_channel
, log_prio
, log_to_graylog
,
515 log_to_graylog_host
, log_to_graylog_port
,
519 int Monitor::sanitize_options()
523 // mon_lease must be greater than mon_lease_renewal; otherwise we
524 // may incur in leases expiring before they are renewed.
525 if (g_conf
->mon_lease_renew_interval_factor
>= 1.0) {
526 clog
->error() << "mon_lease_renew_interval_factor ("
527 << g_conf
->mon_lease_renew_interval_factor
528 << ") must be less than 1.0";
532 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
533 // got time to renew the lease and get an ack for it. Having both options
534 // with the same value, for a given small vale, could mean timing out if
535 // the monitors happened to be overloaded -- or even under normal load for
536 // a small enough value.
537 if (g_conf
->mon_lease_ack_timeout_factor
<= 1.0) {
538 clog
->error() << "mon_lease_ack_timeout_factor ("
539 << g_conf
->mon_lease_ack_timeout_factor
540 << ") must be greater than 1.0";
547 int Monitor::preinit()
551 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
553 int r
= sanitize_options();
555 derr
<< "option sanitization failed!" << dendl
;
562 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
563 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess");
564 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions", "sadd");
565 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions", "srm");
566 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions");
567 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in");
568 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started");
569 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won");
570 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost");
571 logger
= pcb
.create_perf_counters();
572 cct
->get_perfcounters_collection()->add(logger
);
575 assert(!cluster_logger
);
577 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
578 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
579 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
580 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
581 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
582 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
583 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
584 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster");
585 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space");
586 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space");
587 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
588 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
589 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
590 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
591 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
592 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
593 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
594 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
595 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
596 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects");
597 pcb
.add_u64(l_cluster_num_mds_up
, "num_mds_up", "MDSs that are up");
598 pcb
.add_u64(l_cluster_num_mds_in
, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
599 pcb
.add_u64(l_cluster_num_mds_failed
, "num_mds_failed", "Failed MDS");
600 pcb
.add_u64(l_cluster_mds_epoch
, "mds_epoch", "Current epoch of MDS map");
601 cluster_logger
= pcb
.create_perf_counters();
604 paxos
->init_logger();
606 // verify cluster_uuid
608 int r
= check_fsid();
620 // have we ever joined a quorum?
621 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
622 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
624 if (!has_ever_joined
) {
625 // impose initial quorum restrictions?
626 list
<string
> initial_members
;
627 get_str_list(g_conf
->mon_initial_members
, initial_members
);
629 if (!initial_members
.empty()) {
630 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
632 monmap
->set_initial_members(g_ceph_context
, initial_members
, name
, messenger
->get_myaddr(),
635 dout(10) << " monmap is " << *monmap
<< dendl
;
636 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
638 } else if (!monmap
->contains(name
)) {
639 derr
<< "not in monmap and have been in a quorum before; "
640 << "must have been removed" << dendl
;
641 if (g_conf
->mon_force_quorum_join
) {
642 dout(0) << "we should have died but "
643 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
645 derr
<< "commit suicide!" << dendl
;
652 // We have a potentially inconsistent store state in hands. Get rid of it
654 bool clear_store
= false;
655 if (store
->exists("mon_sync", "in_sync")) {
656 dout(1) << __func__
<< " clean up potentially inconsistent store state"
661 if (store
->get("mon_sync", "force_sync") > 0) {
662 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
667 set
<string
> sync_prefixes
= get_sync_targets_names();
668 store
->clear(sync_prefixes
);
672 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
673 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
676 health_monitor
->init();
678 if (is_keyring_required()) {
679 // we need to bootstrap authentication keys so we can form an
681 if (authmon()->get_last_committed() == 0) {
682 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
684 int err
= store
->get("mkfs", "keyring", bl
);
685 if (err
== 0 && bl
.length() > 0) {
686 // Attempt to decode and extract keyring only if it is found.
688 bufferlist::iterator p
= bl
.begin();
689 ::decode(keyring
, p
);
690 extract_save_mon_key(keyring
);
694 string keyring_loc
= g_conf
->mon_data
+ "/keyring";
696 r
= keyring
.load(cct
, keyring_loc
);
699 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
701 if (key_server
.get_auth(mon_name
, mon_key
)) {
702 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
703 keyring
.add(mon_name
, mon_key
);
705 keyring
.encode_plaintext(bl
);
706 write_default_keyring(bl
);
708 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
715 admin_hook
= new AdminHook(this);
716 AdminSocket
* admin_socket
= cct
->get_admin_socket();
718 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
720 r
= admin_socket
->register_command("mon_status", "mon_status", admin_hook
,
721 "show current monitor status");
723 r
= admin_socket
->register_command("quorum_status", "quorum_status",
724 admin_hook
, "show current quorum status");
726 r
= admin_socket
->register_command("sync_force",
727 "sync_force name=validate,"
729 "strings=--yes-i-really-mean-it",
731 "force sync of and clear monitor store");
733 r
= admin_socket
->register_command("add_bootstrap_peer_hint",
734 "add_bootstrap_peer_hint name=addr,"
737 "add peer address as potential bootstrap"
738 " peer for cluster bringup");
740 r
= admin_socket
->register_command("quorum enter", "quorum enter",
742 "force monitor back into quorum");
744 r
= admin_socket
->register_command("quorum exit", "quorum exit",
746 "force monitor out of the quorum");
748 r
= admin_socket
->register_command("ops",
751 "show the ops currently in flight");
753 r
= admin_socket
->register_command("sessions",
756 "list existing sessions");
761 // add ourselves as a conf observer
762 g_conf
->add_observer(this);
770 dout(2) << "init" << dendl
;
771 Mutex::Locker
l(lock
);
782 messenger
->add_dispatcher_tail(this);
785 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
786 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
790 // encode command sets
791 const MonCommand
*cmds
;
793 get_locally_supported_monitor_commands(&cmds
, &cmdsize
);
794 MonCommand::encode_array(cmds
, cmdsize
, supported_commands_bl
);
799 void Monitor::init_paxos()
801 dout(10) << __func__
<< dendl
;
805 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
806 paxos_service
[i
]->init();
809 refresh_from_paxos(NULL
);
812 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
814 dout(10) << __func__
<< dendl
;
817 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
820 bufferlist::iterator p
= bl
.begin();
821 ::decode(fingerprint
, p
);
823 catch (buffer::error
& e
) {
824 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
827 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
830 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
831 paxos_service
[i
]->refresh(need_bootstrap
);
833 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
834 paxos_service
[i
]->post_refresh();
839 void Monitor::register_cluster_logger()
841 if (!cluster_logger_registered
) {
842 dout(10) << "register_cluster_logger" << dendl
;
843 cluster_logger_registered
= true;
844 cct
->get_perfcounters_collection()->add(cluster_logger
);
846 dout(10) << "register_cluster_logger - already registered" << dendl
;
850 void Monitor::unregister_cluster_logger()
852 if (cluster_logger_registered
) {
853 dout(10) << "unregister_cluster_logger" << dendl
;
854 cluster_logger_registered
= false;
855 cct
->get_perfcounters_collection()->remove(cluster_logger
);
857 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
861 void Monitor::update_logger()
863 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
864 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
867 void Monitor::shutdown()
869 dout(1) << "shutdown" << dendl
;
873 wait_for_paxos_write();
875 state
= STATE_SHUTDOWN
;
877 g_conf
->remove_observer(this);
880 AdminSocket
* admin_socket
= cct
->get_admin_socket();
881 admin_socket
->unregister_command("mon_status");
882 admin_socket
->unregister_command("quorum_status");
883 admin_socket
->unregister_command("sync_force");
884 admin_socket
->unregister_command("add_bootstrap_peer_hint");
885 admin_socket
->unregister_command("quorum enter");
886 admin_socket
->unregister_command("quorum exit");
887 admin_socket
->unregister_command("ops");
888 admin_socket
->unregister_command("sessions");
895 mgr_client
.shutdown();
898 finisher
.wait_for_empty();
904 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
906 health_monitor
->shutdown();
908 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
909 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
915 remove_all_sessions();
918 cct
->get_perfcounters_collection()->remove(logger
);
922 if (cluster_logger
) {
923 if (cluster_logger_registered
)
924 cct
->get_perfcounters_collection()->remove(cluster_logger
);
925 delete cluster_logger
;
926 cluster_logger
= NULL
;
929 log_client
.shutdown();
931 // unlock before msgr shutdown...
934 messenger
->shutdown(); // last thing! ceph_mon.cc will delete mon.
935 mgr_messenger
->shutdown();
938 void Monitor::wait_for_paxos_write()
940 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
941 dout(10) << __func__
<< " flushing pending write" << dendl
;
945 dout(10) << __func__
<< " flushed pending write" << dendl
;
949 void Monitor::bootstrap()
951 dout(10) << "bootstrap" << dendl
;
952 wait_for_paxos_write();
954 sync_reset_requester();
955 unregister_cluster_logger();
956 cancel_probe_timeout();
959 int newrank
= monmap
->get_rank(messenger
->get_myaddr());
960 if (newrank
< 0 && rank
>= 0) {
961 // was i ever part of the quorum?
962 if (has_ever_joined
) {
963 dout(0) << " removed from monmap, suicide." << dendl
;
967 if (newrank
!= rank
) {
968 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
969 messenger
->set_myname(entity_name_t::MON(newrank
));
972 // reset all connections, or else our peers will think we are someone else.
973 messenger
->mark_down_all();
977 state
= STATE_PROBING
;
982 if (g_conf
->mon_compact_on_bootstrap
) {
983 dout(10) << "bootstrap -- triggering compaction" << dendl
;
985 dout(10) << "bootstrap -- finished compaction" << dendl
;
988 // singleton monitor?
989 if (monmap
->size() == 1 && rank
== 0) {
990 win_standalone_election();
994 reset_probe_timeout();
996 // i'm outside the quorum
997 if (monmap
->contains(name
))
998 outside_quorum
.insert(name
);
1001 dout(10) << "probing other monitors" << dendl
;
1002 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1004 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
),
1005 monmap
->get_inst(i
));
1007 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
1008 p
!= extra_probe_peers
.end();
1010 if (*p
!= messenger
->get_myaddr()) {
1012 i
.name
= entity_name_t::MON(-1);
1014 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
), i
);
1019 bool Monitor::_add_bootstrap_peer_hint(string cmd
, cmdmap_t
& cmdmap
, ostream
& ss
)
1022 if (!cmd_getval(g_ceph_context
, cmdmap
, "addr", addrstr
)) {
1023 ss
<< "unable to parse address string value '"
1024 << cmd_vartype_stringify(cmdmap
["addr"]) << "'";
1027 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' '"
1028 << addrstr
<< "'" << dendl
;
1031 const char *end
= 0;
1032 if (!addr
.parse(addrstr
.c_str(), &end
)) {
1033 ss
<< "failed to parse addr '" << addrstr
<< "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1037 if (is_leader() || is_peon()) {
1038 ss
<< "mon already active; ignoring bootstrap hint";
1042 if (addr
.get_port() == 0)
1043 addr
.set_port(CEPH_MON_PORT
);
1045 extra_probe_peers
.insert(addr
);
1046 ss
<< "adding peer " << addr
<< " to list: " << extra_probe_peers
;
1050 // called by bootstrap(), or on leader|peon -> electing
1051 void Monitor::_reset()
1053 dout(10) << __func__
<< dendl
;
1055 cancel_probe_timeout();
1057 health_events_cleanup();
1058 scrub_event_cancel();
1060 leader_since
= utime_t();
1061 if (!quorum
.empty()) {
1062 exited_quorum
= ceph_clock_now();
1065 outside_quorum
.clear();
1066 quorum_feature_map
.clear();
1072 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
1074 health_monitor
->finish();
1078 // -----------------------------------------------------------
1081 set
<string
> Monitor::get_sync_targets_names()
1083 set
<string
> targets
;
1084 targets
.insert(paxos
->get_name());
1085 for (int i
= 0; i
< PAXOS_NUM
; ++i
)
1086 paxos_service
[i
]->get_store_prefixes(targets
);
1087 ConfigKeyService
*config_key_service_ptr
= dynamic_cast<ConfigKeyService
*>(config_key_service
);
1088 assert(config_key_service_ptr
);
1089 config_key_service_ptr
->get_store_prefixes(targets
);
1094 void Monitor::sync_timeout()
1096 dout(10) << __func__
<< dendl
;
1097 assert(state
== STATE_SYNCHRONIZING
);
1101 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1103 dout(1) << __func__
<< dendl
;
1105 MonMap latest_monmap
;
1107 // Grab latest monmap from MonmapMonitor
1108 bufferlist monmon_bl
;
1109 int err
= monmon()->get_monmap(monmon_bl
);
1111 if (err
!= -ENOENT
) {
1113 << " something wrong happened while reading the store: "
1114 << cpp_strerror(err
) << dendl
;
1115 assert(0 == "error reading the store");
1118 latest_monmap
.decode(monmon_bl
);
1121 // Grab last backed up monmap (if any) and compare epochs
1122 if (store
->exists("mon_sync", "latest_monmap")) {
1123 bufferlist backup_bl
;
1124 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1127 << " something wrong happened while reading the store: "
1128 << cpp_strerror(err
) << dendl
;
1129 assert(0 == "error reading the store");
1131 assert(backup_bl
.length() > 0);
1133 MonMap backup_monmap
;
1134 backup_monmap
.decode(backup_bl
);
1136 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1137 latest_monmap
= backup_monmap
;
1140 // Check if our current monmap's epoch is greater than the one we've
1142 if (monmap
->epoch
> latest_monmap
.epoch
)
1143 latest_monmap
= *monmap
;
1145 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1147 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
1150 void Monitor::sync_reset_requester()
1152 dout(10) << __func__
<< dendl
;
1154 if (sync_timeout_event
) {
1155 timer
.cancel_event(sync_timeout_event
);
1156 sync_timeout_event
= NULL
;
1159 sync_provider
= entity_inst_t();
1162 sync_start_version
= 0;
1165 void Monitor::sync_reset_provider()
1167 dout(10) << __func__
<< dendl
;
1168 sync_providers
.clear();
1171 void Monitor::sync_start(entity_inst_t
&other
, bool full
)
1173 dout(10) << __func__
<< " " << other
<< (full
? " full" : " recent") << dendl
;
1175 assert(state
== STATE_PROBING
||
1176 state
== STATE_SYNCHRONIZING
);
1177 state
= STATE_SYNCHRONIZING
;
1179 // make sure are not a provider for anyone!
1180 sync_reset_provider();
1185 // stash key state, and mark that we are syncing
1186 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1187 sync_stash_critical_state(t
);
1188 t
->put("mon_sync", "in_sync", 1);
1190 sync_last_committed_floor
= MAX(sync_last_committed_floor
, paxos
->get_version());
1191 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1192 << sync_last_committed_floor
<< dendl
;
1193 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1195 store
->apply_transaction(t
);
1197 assert(g_conf
->mon_sync_requester_kill_at
!= 1);
1199 // clear the underlying store
1200 set
<string
> targets
= get_sync_targets_names();
1201 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1202 store
->clear(targets
);
1204 // make sure paxos knows it has been reset. this prevents a
1205 // bootstrap and then different probe reply order from possibly
1206 // deciding a partial or no sync is needed.
1209 assert(g_conf
->mon_sync_requester_kill_at
!= 2);
1212 // assume 'other' as the leader. We will update the leader once we receive
1213 // a reply to the sync start.
1214 sync_provider
= other
;
1216 sync_reset_timeout();
1218 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1220 m
->last_committed
= paxos
->get_version();
1221 messenger
->send_message(m
, sync_provider
);
1224 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1226 dout(10) << __func__
<< dendl
;
1227 bufferlist backup_monmap
;
1228 sync_obtain_latest_monmap(backup_monmap
);
1229 assert(backup_monmap
.length() > 0);
1230 t
->put("mon_sync", "latest_monmap", backup_monmap
);
1233 void Monitor::sync_reset_timeout()
1235 dout(10) << __func__
<< dendl
;
1236 if (sync_timeout_event
)
1237 timer
.cancel_event(sync_timeout_event
);
1238 sync_timeout_event
= new C_MonContext(this, [this](int) {
1241 timer
.add_event_after(g_conf
->mon_sync_timeout
, sync_timeout_event
);
1244 void Monitor::sync_finish(version_t last_committed
)
1246 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
1248 assert(g_conf
->mon_sync_requester_kill_at
!= 7);
1251 // finalize the paxos commits
1252 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1253 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1255 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
1257 dout(30) << __func__
<< " final tx dump:\n";
1258 JSONFormatter
f(true);
1263 store
->apply_transaction(tx
);
1266 assert(g_conf
->mon_sync_requester_kill_at
!= 8);
1268 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1269 t
->erase("mon_sync", "in_sync");
1270 t
->erase("mon_sync", "force_sync");
1271 t
->erase("mon_sync", "last_committed_floor");
1272 store
->apply_transaction(t
);
1274 assert(g_conf
->mon_sync_requester_kill_at
!= 9);
1278 assert(g_conf
->mon_sync_requester_kill_at
!= 10);
1283 void Monitor::handle_sync(MonOpRequestRef op
)
1285 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1286 dout(10) << __func__
<< " " << *m
<< dendl
;
1289 // provider ---------
1291 case MMonSync::OP_GET_COOKIE_FULL
:
1292 case MMonSync::OP_GET_COOKIE_RECENT
:
1293 handle_sync_get_cookie(op
);
1295 case MMonSync::OP_GET_CHUNK
:
1296 handle_sync_get_chunk(op
);
1299 // client -----------
1301 case MMonSync::OP_COOKIE
:
1302 handle_sync_cookie(op
);
1305 case MMonSync::OP_CHUNK
:
1306 case MMonSync::OP_LAST_CHUNK
:
1307 handle_sync_chunk(op
);
1309 case MMonSync::OP_NO_COOKIE
:
1310 handle_sync_no_cookie(op
);
1314 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1315 assert(0 == "unknown op");
1321 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1323 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1324 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1325 m
->get_connection()->send_message(reply
);
1328 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1330 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1331 if (is_synchronizing()) {
1332 _sync_reply_no_cookie(op
);
1336 assert(g_conf
->mon_sync_provider_kill_at
!= 1);
1338 // make sure they can understand us.
1339 if ((required_features
^ m
->get_connection()->get_features()) &
1340 required_features
) {
1341 dout(5) << " ignoring peer mon." << m
->get_source().num()
1342 << " has features " << std::hex
1343 << m
->get_connection()->get_features()
1344 << " but we require " << required_features
<< std::dec
<< dendl
;
1348 // make up a unique cookie. include election epoch (which persists
1349 // across restarts for the whole cluster) and a counter for this
1350 // process instance. there is no need to be unique *across*
1351 // monitors, though.
1352 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1353 assert(sync_providers
.count(cookie
) == 0);
1355 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
1357 SyncProvider
& sp
= sync_providers
[cookie
];
1359 sp
.entity
= m
->get_source_inst();
1360 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1362 set
<string
> sync_targets
;
1363 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1365 sync_targets
= get_sync_targets_names();
1366 sp
.last_committed
= paxos
->get_version();
1367 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1369 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1371 // just catch up paxos
1372 sp
.last_committed
= m
->last_committed
;
1374 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1376 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1377 reply
->last_committed
= sp
.last_committed
;
1378 m
->get_connection()->send_message(reply
);
1381 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1383 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1384 dout(10) << __func__
<< " " << *m
<< dendl
;
1386 if (sync_providers
.count(m
->cookie
) == 0) {
1387 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1388 _sync_reply_no_cookie(op
);
1392 assert(g_conf
->mon_sync_provider_kill_at
!= 2);
1394 SyncProvider
& sp
= sync_providers
[m
->cookie
];
1395 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1397 if (sp
.last_committed
< paxos
->get_first_committed() &&
1398 paxos
->get_first_committed() > 1) {
1399 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1400 << " < our fc " << paxos
->get_first_committed() << dendl
;
1401 sync_providers
.erase(m
->cookie
);
1402 _sync_reply_no_cookie(op
);
1406 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1407 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1409 int left
= g_conf
->mon_sync_max_payload_size
;
1410 while (sp
.last_committed
< paxos
->get_version() && left
> 0) {
1412 sp
.last_committed
++;
1414 int err
= store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1417 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1418 left
-= bl
.length();
1419 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1422 reply
->last_committed
= sp
.last_committed
;
1424 if (sp
.full
&& left
> 0) {
1425 sp
.synchronizer
->get_chunk_tx(tx
, left
);
1426 sp
.last_key
= sp
.synchronizer
->get_last_key();
1427 reply
->last_key
= sp
.last_key
;
1430 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1431 sp
.last_committed
< paxos
->get_version()) {
1432 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1433 << " key " << sp
.last_key
<< dendl
;
1435 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1436 << " key " << sp
.last_key
<< dendl
;
1437 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1439 assert(g_conf
->mon_sync_provider_kill_at
!= 3);
1441 // clean up our local state
1442 sync_providers
.erase(sp
.cookie
);
1445 ::encode(*tx
, reply
->chunk_bl
);
1447 m
->get_connection()->send_message(reply
);
1452 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1454 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1455 dout(10) << __func__
<< " " << *m
<< dendl
;
1457 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
1460 if (m
->get_source_inst() != sync_provider
) {
1461 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1464 sync_cookie
= m
->cookie
;
1465 sync_start_version
= m
->last_committed
;
1467 sync_reset_timeout();
1468 sync_get_next_chunk();
1470 assert(g_conf
->mon_sync_requester_kill_at
!= 3);
1473 void Monitor::sync_get_next_chunk()
1475 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1476 if (g_conf
->mon_inject_sync_get_chunk_delay
> 0) {
1477 dout(20) << __func__
<< " injecting delay of " << g_conf
->mon_inject_sync_get_chunk_delay
<< dendl
;
1478 usleep((long long)(g_conf
->mon_inject_sync_get_chunk_delay
* 1000000.0));
1480 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1481 messenger
->send_message(r
, sync_provider
);
1483 assert(g_conf
->mon_sync_requester_kill_at
!= 4);
1486 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1488 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1489 dout(10) << __func__
<< " " << *m
<< dendl
;
1491 if (m
->cookie
!= sync_cookie
) {
1492 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1495 if (m
->get_source_inst() != sync_provider
) {
1496 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1500 assert(state
== STATE_SYNCHRONIZING
);
1501 assert(g_conf
->mon_sync_requester_kill_at
!= 5);
1503 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1504 tx
->append_from_encoded(m
->chunk_bl
);
1506 dout(30) << __func__
<< " tx dump:\n";
1507 JSONFormatter
f(true);
1512 store
->apply_transaction(tx
);
1514 assert(g_conf
->mon_sync_requester_kill_at
!= 6);
1517 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
1518 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1519 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1521 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1523 dout(30) << __func__
<< " tx dump:\n";
1524 JSONFormatter
f(true);
1529 store
->apply_transaction(tx
);
1530 paxos
->init(); // to refresh what we just wrote
1533 if (m
->op
== MMonSync::OP_CHUNK
) {
1534 sync_reset_timeout();
1535 sync_get_next_chunk();
1536 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1537 sync_finish(m
->last_committed
);
1541 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1543 dout(10) << __func__
<< dendl
;
1547 void Monitor::sync_trim_providers()
1549 dout(20) << __func__
<< dendl
;
1551 utime_t now
= ceph_clock_now();
1552 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1553 while (p
!= sync_providers
.end()) {
1554 if (now
> p
->second
.timeout
) {
1555 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
<< " for " << p
->second
.entity
<< dendl
;
1556 sync_providers
.erase(p
++);
1563 // ---------------------------------------------------
1566 void Monitor::cancel_probe_timeout()
1568 if (probe_timeout_event
) {
1569 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1570 timer
.cancel_event(probe_timeout_event
);
1571 probe_timeout_event
= NULL
;
1573 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
1577 void Monitor::reset_probe_timeout()
1579 cancel_probe_timeout();
1580 probe_timeout_event
= new C_MonContext(this, [this](int r
) {
1583 double t
= g_conf
->mon_probe_timeout
;
1584 timer
.add_event_after(t
, probe_timeout_event
);
1585 dout(10) << "reset_probe_timeout " << probe_timeout_event
<< " after " << t
<< " seconds" << dendl
;
1588 void Monitor::probe_timeout(int r
)
1590 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1591 assert(is_probing() || is_synchronizing());
1592 assert(probe_timeout_event
);
1593 probe_timeout_event
= NULL
;
1597 void Monitor::handle_probe(MonOpRequestRef op
)
1599 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1600 dout(10) << "handle_probe " << *m
<< dendl
;
1602 if (m
->fsid
!= monmap
->fsid
) {
1603 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1608 case MMonProbe::OP_PROBE
:
1609 handle_probe_probe(op
);
1612 case MMonProbe::OP_REPLY
:
1613 handle_probe_reply(op
);
1616 case MMonProbe::OP_MISSING_FEATURES
:
1617 derr
<< __func__
<< " missing features, have " << CEPH_FEATURES_ALL
1618 << ", required " << m
->required_features
1619 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
)
1625 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1627 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1629 dout(10) << "handle_probe_probe " << m
->get_source_inst() << *m
1630 << " features " << m
->get_connection()->get_features() << dendl
;
1631 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1633 dout(1) << " peer " << m
->get_source_addr() << " missing features "
1634 << missing
<< dendl
;
1635 if (m
->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY
)) {
1636 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1637 name
, has_ever_joined
);
1638 m
->required_features
= required_features
;
1639 m
->get_connection()->send_message(r
);
1644 if (!is_probing() && !is_synchronizing()) {
1645 // If the probing mon is way ahead of us, we need to re-bootstrap.
1646 // Normally we capture this case when we initially bootstrap, but
1647 // it is possible we pass those checks (we overlap with
1648 // quorum-to-be) but fail to join a quorum before it moves past
1649 // us. We need to be kicked back to bootstrap so we can
1650 // synchonize, not keep calling elections.
1651 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1652 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1653 << "ahead of us, re-bootstrapping" << dendl
;
1661 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
);
1664 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1665 r
->paxos_first_version
= paxos
->get_first_committed();
1666 r
->paxos_last_version
= paxos
->get_version();
1667 m
->get_connection()->send_message(r
);
1669 // did we discover a peer here?
1670 if (!monmap
->contains(m
->get_source_addr())) {
1671 dout(1) << " adding peer " << m
->get_source_addr()
1672 << " to list of hints" << dendl
;
1673 extra_probe_peers
.insert(m
->get_source_addr());
1680 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1682 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1683 dout(10) << "handle_probe_reply " << m
->get_source_inst() << *m
<< dendl
;
1684 dout(10) << " monmap is " << *monmap
<< dendl
;
1686 // discover name and addrs during probing or electing states.
1687 if (!is_probing() && !is_electing()) {
1691 // newer map, or they've joined a quorum and we haven't?
1693 monmap
->encode(mybl
, m
->get_connection()->get_features());
1694 // make sure it's actually different; the checks below err toward
1695 // taking the other guy's map, which could cause us to loop.
1696 if (!mybl
.contents_equal(m
->monmap_bl
)) {
1697 MonMap
*newmap
= new MonMap
;
1698 newmap
->decode(m
->monmap_bl
);
1699 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
1700 !has_ever_joined
)) {
1701 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
1702 << ", mine was " << monmap
->get_epoch() << dendl
;
1704 monmap
->decode(m
->monmap_bl
);
1713 string peer_name
= monmap
->get_name(m
->get_source_addr());
1714 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
1715 dout(10) << " renaming peer " << m
->get_source_addr() << " "
1716 << peer_name
<< " -> " << m
->name
<< " in my monmap"
1718 monmap
->rename(peer_name
, m
->name
);
1720 if (is_electing()) {
1725 dout(10) << " peer name is " << peer_name
<< dendl
;
1728 // new initial peer?
1729 if (monmap
->get_epoch() == 0 &&
1730 monmap
->contains(m
->name
) &&
1731 monmap
->get_addr(m
->name
).is_blank_ip()) {
1732 dout(1) << " learned initial mon " << m
->name
<< " addr " << m
->get_source_addr() << dendl
;
1733 monmap
->set_addr(m
->name
, m
->get_source_addr());
1739 // end discover phase
1740 if (!is_probing()) {
1744 assert(paxos
!= NULL
);
1746 if (is_synchronizing()) {
1747 dout(10) << " currently syncing" << dendl
;
1751 entity_inst_t other
= m
->get_source_inst();
1753 if (m
->paxos_last_version
< sync_last_committed_floor
) {
1754 dout(10) << " peer paxos versions [" << m
->paxos_first_version
1755 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
1756 << sync_last_committed_floor
<< ", ignoring"
1759 if (paxos
->get_version() < m
->paxos_first_version
&&
1760 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
1761 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
1762 << "," << m
->paxos_last_version
<< "]"
1763 << " vs my version " << paxos
->get_version()
1764 << " (too far ahead)"
1766 cancel_probe_timeout();
1767 sync_start(other
, true);
1770 if (paxos
->get_version() + g_conf
->paxos_max_join_drift
< m
->paxos_last_version
) {
1771 dout(10) << " peer paxos last version " << m
->paxos_last_version
1772 << " vs my version " << paxos
->get_version()
1773 << " (too far ahead)"
1775 cancel_probe_timeout();
1776 sync_start(other
, false);
1781 // is there an existing quorum?
1782 if (m
->quorum
.size()) {
1783 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
1785 dout(10) << " peer paxos version " << m
->paxos_last_version
1786 << " vs my version " << paxos
->get_version()
1790 if (monmap
->contains(name
) &&
1791 !monmap
->get_addr(name
).is_blank_ip()) {
1792 // i'm part of the cluster; just initiate a new election
1795 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl
;
1796 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
1797 monmap
->get_inst(*m
->quorum
.begin()));
1800 if (monmap
->contains(m
->name
)) {
1801 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
1802 outside_quorum
.insert(m
->name
);
1804 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
1808 unsigned need
= monmap
->size() / 2 + 1;
1809 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
1810 if (outside_quorum
.size() >= need
) {
1811 if (outside_quorum
.count(name
)) {
1812 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
1815 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
1818 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
1823 void Monitor::join_election()
1825 dout(10) << __func__
<< dendl
;
1826 wait_for_paxos_write();
1828 state
= STATE_ELECTING
;
1830 logger
->inc(l_mon_num_elections
);
1833 void Monitor::start_election()
1835 dout(10) << "start_election" << dendl
;
1836 wait_for_paxos_write();
1838 state
= STATE_ELECTING
;
1840 logger
->inc(l_mon_num_elections
);
1841 logger
->inc(l_mon_election_call
);
1843 clog
->info() << "mon." << name
<< " calling new monitor election";
1844 elector
.call_election();
1847 void Monitor::win_standalone_election()
1849 dout(1) << "win_standalone_election" << dendl
;
1851 // bump election epoch, in case the previous epoch included other
1852 // monitors; we need to be able to make the distinction.
1854 elector
.advance_epoch();
1856 rank
= monmap
->get_rank(name
);
1861 map
<int,Metadata
> metadata
;
1862 collect_metadata(&metadata
[0]);
1864 const MonCommand
*my_cmds
= nullptr;
1866 get_locally_supported_monitor_commands(&my_cmds
, &cmdsize
);
1867 win_election(elector
.get_epoch(), q
,
1869 ceph::features::mon::get_supported(),
1874 const utime_t
& Monitor::get_leader_since() const
1876 assert(state
== STATE_LEADER
);
1877 return leader_since
;
1880 epoch_t
Monitor::get_epoch()
1882 return elector
.get_epoch();
1885 void Monitor::_finish_svc_election()
1887 assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
1889 for (auto p
: paxos_service
) {
1890 // we already called election_finished() on monmon(); avoid callig twice
1891 if (state
== STATE_LEADER
&& p
== monmon())
1893 p
->election_finished();
1897 void Monitor::win_election(epoch_t epoch
, set
<int>& active
, uint64_t features
,
1898 const mon_feature_t
& mon_features
,
1899 const map
<int,Metadata
>& metadata
,
1900 const MonCommand
*cmdset
, int cmdsize
)
1902 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
1903 << " features " << features
1904 << " mon_features " << mon_features
1906 assert(is_electing());
1907 state
= STATE_LEADER
;
1908 leader_since
= ceph_clock_now();
1911 quorum_con_features
= features
;
1912 quorum_mon_features
= mon_features
;
1913 pending_metadata
= metadata
;
1914 outside_quorum
.clear();
1916 clog
->info() << "mon." << name
<< "@" << rank
1917 << " won leader election with quorum " << quorum
;
1919 set_leader_supported_commands(cmdset
, cmdsize
);
1921 paxos
->leader_init();
1922 // NOTE: tell monmap monitor first. This is important for the
1923 // bootstrap case to ensure that the very first paxos proposal
1924 // codifies the monmap. Otherwise any manner of chaos can ensue
1925 // when monitors are call elections or participating in a paxos
1926 // round without agreeing on who the participants are.
1927 monmon()->election_finished();
1928 _finish_svc_election();
1929 health_monitor
->start(epoch
);
1931 logger
->inc(l_mon_election_win
);
1933 // inject new metadata in first transaction.
1935 // include previous metadata for missing mons (that aren't part of
1936 // the current quorum).
1937 map
<int,Metadata
> m
= metadata
;
1938 for (unsigned rank
= 0; rank
< monmap
->size(); ++rank
) {
1939 if (m
.count(rank
) == 0 &&
1940 mon_metadata
.count(rank
)) {
1941 m
[rank
] = mon_metadata
[rank
];
1945 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1946 // a new transaction immediately after the election finishes. We should
1947 // do that anyway for other reasons, though.
1948 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
1951 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
1955 if (monmap
->size() > 1 &&
1956 monmap
->get_epoch() > 0) {
1958 health_tick_start();
1959 do_health_to_clog_interval();
1960 scrub_event_start();
1964 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
1966 const mon_feature_t
& mon_features
)
1969 leader_since
= utime_t();
1972 outside_quorum
.clear();
1973 quorum_con_features
= features
;
1974 quorum_mon_features
= mon_features
;
1975 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
1976 << " quorum is " << quorum
<< " features are " << quorum_con_features
1977 << " mon_features are " << quorum_mon_features
1981 _finish_svc_election();
1982 health_monitor
->start(epoch
);
1984 logger
->inc(l_mon_election_lose
);
1988 if ((quorum_con_features
& CEPH_FEATURE_MON_METADATA
) &&
1989 !HAVE_FEATURE(quorum_con_features
, SERVER_LUMINOUS
)) {
1990 // for pre-luminous mons only
1992 collect_metadata(&sys_info
);
1993 messenger
->send_message(new MMonMetadata(sys_info
),
1994 monmap
->get_inst(get_leader()));
1998 void Monitor::collect_metadata(Metadata
*m
)
2000 collect_sys_info(m
, g_ceph_context
);
2001 (*m
)["addr"] = stringify(messenger
->get_myaddr());
2004 void Monitor::finish_election()
2006 apply_quorum_to_compatset_features();
2007 apply_monmap_to_compatset_features();
2009 exited_quorum
= utime_t();
2010 finish_contexts(g_ceph_context
, waitfor_quorum
);
2011 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
2012 resend_routed_requests();
2014 register_cluster_logger();
2016 // am i named properly?
2017 string cur_name
= monmap
->get_name(messenger
->get_myaddr());
2018 if (cur_name
!= name
) {
2019 dout(10) << " renaming myself from " << cur_name
<< " -> " << name
<< dendl
;
2020 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
2021 monmap
->get_inst(*quorum
.begin()));
2025 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2027 if (new_features
.compare(features
) != 0) {
2028 CompatSet diff
= features
.unsupported(new_features
);
2029 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2030 features
= new_features
;
2032 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2034 store
->apply_transaction(t
);
2036 calc_quorum_requirements();
2040 void Monitor::apply_quorum_to_compatset_features()
2042 CompatSet
new_features(features
);
2043 if (quorum_con_features
& CEPH_FEATURE_OSD_ERASURE_CODES
) {
2044 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2046 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2047 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2049 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
) {
2050 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2052 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
) {
2053 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2055 dout(5) << __func__
<< dendl
;
2056 _apply_compatset_features(new_features
);
2059 void Monitor::apply_monmap_to_compatset_features()
2061 CompatSet
new_features(features
);
2062 mon_feature_t monmap_features
= monmap
->get_required_features();
2064 /* persistent monmap features may go into the compatset.
2065 * optional monmap features may not - why?
2066 * because optional monmap features may be set/unset by the admin,
2067 * and possibly by other means that haven't yet been thought out,
2068 * so we can't make the monitor enforce them on start - because they
2070 * this, of course, does not invalidate setting a compatset feature
2071 * for an optional feature - as long as you make sure to clean it up
2072 * once you unset it.
2074 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2075 assert(ceph::features::mon::get_persistent().contains_all(
2076 ceph::features::mon::FEATURE_KRAKEN
));
2077 // this feature should only ever be set if the quorum supports it.
2078 assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2079 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2082 dout(5) << __func__
<< dendl
;
2083 _apply_compatset_features(new_features
);
2086 void Monitor::calc_quorum_requirements()
2088 required_features
= 0;
2091 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
)) {
2092 required_features
|= CEPH_FEATURE_OSD_ERASURE_CODES
;
2094 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2095 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2097 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
)) {
2098 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
;
2100 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
)) {
2101 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
;
2103 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2104 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2108 if (monmap
->get_required_features().contains_all(
2109 ceph::features::mon::FEATURE_KRAKEN
)) {
2110 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2112 if (monmap
->get_required_features().contains_all(
2113 ceph::features::mon::FEATURE_LUMINOUS
)) {
2114 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2116 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
2119 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2121 *fm
+= session_map
.feature_map
;
2122 for (auto id
: quorum
) {
2124 *fm
+= quorum_feature_map
[id
];
2129 void Monitor::sync_force(Formatter
*f
, ostream
& ss
)
2131 bool free_formatter
= false;
2134 // louzy/lazy hack: default to json if no formatter has been defined
2135 f
= new JSONFormatter();
2136 free_formatter
= true;
2139 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2140 sync_stash_critical_state(tx
);
2141 tx
->put("mon_sync", "force_sync", 1);
2142 store
->apply_transaction(tx
);
2144 f
->open_object_section("sync_force");
2145 f
->dump_int("ret", 0);
2146 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2147 f
->close_section(); // sync_force
2153 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
2155 bool free_formatter
= false;
2158 // louzy/lazy hack: default to json if no formatter has been defined
2159 f
= new JSONFormatter();
2160 free_formatter
= true;
2162 f
->open_object_section("quorum_status");
2163 f
->dump_int("election_epoch", get_epoch());
2165 f
->open_array_section("quorum");
2166 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2167 f
->dump_int("mon", *p
);
2168 f
->close_section(); // quorum
2170 list
<string
> quorum_names
= get_quorum_names();
2171 f
->open_array_section("quorum_names");
2172 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2173 f
->dump_string("mon", *p
);
2174 f
->close_section(); // quorum_names
2176 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(*quorum
.begin()));
2178 f
->open_object_section("monmap");
2180 f
->close_section(); // monmap
2182 f
->close_section(); // quorum_status
2188 void Monitor::get_mon_status(Formatter
*f
, ostream
& ss
)
2190 bool free_formatter
= false;
2193 // louzy/lazy hack: default to json if no formatter has been defined
2194 f
= new JSONFormatter();
2195 free_formatter
= true;
2198 f
->open_object_section("mon_status");
2199 f
->dump_string("name", name
);
2200 f
->dump_int("rank", rank
);
2201 f
->dump_string("state", get_state_name());
2202 f
->dump_int("election_epoch", get_epoch());
2204 f
->open_array_section("quorum");
2205 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2206 f
->dump_int("mon", *p
);
2209 f
->close_section(); // quorum
2211 f
->open_object_section("features");
2212 f
->dump_stream("required_con") << required_features
;
2213 mon_feature_t req_mon_features
= get_required_mon_features();
2214 req_mon_features
.dump(f
, "required_mon");
2215 f
->dump_stream("quorum_con") << quorum_con_features
;
2216 quorum_mon_features
.dump(f
, "quorum_mon");
2217 f
->close_section(); // features
2219 f
->open_array_section("outside_quorum");
2220 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2221 f
->dump_string("mon", *p
);
2222 f
->close_section(); // outside_quorum
2224 f
->open_array_section("extra_probe_peers");
2225 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
2226 p
!= extra_probe_peers
.end();
2228 f
->dump_stream("peer") << *p
;
2229 f
->close_section(); // extra_probe_peers
2231 f
->open_array_section("sync_provider");
2232 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2233 p
!= sync_providers
.end();
2235 f
->dump_unsigned("cookie", p
->second
.cookie
);
2236 f
->dump_stream("entity") << p
->second
.entity
;
2237 f
->dump_stream("timeout") << p
->second
.timeout
;
2238 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2239 f
->dump_stream("last_key") << p
->second
.last_key
;
2243 if (is_synchronizing()) {
2244 f
->open_object_section("sync");
2245 f
->dump_stream("sync_provider") << sync_provider
;
2246 f
->dump_unsigned("sync_cookie", sync_cookie
);
2247 f
->dump_unsigned("sync_start_version", sync_start_version
);
2251 if (g_conf
->mon_sync_provider_kill_at
> 0)
2252 f
->dump_int("provider_kill_at", g_conf
->mon_sync_provider_kill_at
);
2253 if (g_conf
->mon_sync_requester_kill_at
> 0)
2254 f
->dump_int("requester_kill_at", g_conf
->mon_sync_requester_kill_at
);
2256 f
->open_object_section("monmap");
2260 f
->dump_object("feature_map", session_map
.feature_map
);
2261 f
->close_section(); // mon_status
2263 if (free_formatter
) {
2264 // flush formatter to ss and delete it iff we created the formatter
2271 // health status to clog
2273 void Monitor::health_tick_start()
2275 if (!cct
->_conf
->mon_health_to_clog
||
2276 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2279 dout(15) << __func__
<< dendl
;
2282 health_tick_event
= new C_MonContext(this, [this](int r
) {
2285 do_health_to_clog();
2286 health_tick_start();
2288 timer
.add_event_after(cct
->_conf
->mon_health_to_clog_tick_interval
,
2292 void Monitor::health_tick_stop()
2294 dout(15) << __func__
<< dendl
;
2296 if (health_tick_event
) {
2297 timer
.cancel_event(health_tick_event
);
2298 health_tick_event
= NULL
;
2302 utime_t
Monitor::health_interval_calc_next_update()
2304 utime_t now
= ceph_clock_now();
2306 time_t secs
= now
.sec();
2307 int remainder
= secs
% cct
->_conf
->mon_health_to_clog_interval
;
2308 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2309 utime_t next
= utime_t(secs
+ adjustment
, 0);
2311 dout(20) << __func__
2312 << " now: " << now
<< ","
2313 << " next: " << next
<< ","
2314 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
2320 void Monitor::health_interval_start()
2322 dout(15) << __func__
<< dendl
;
2324 if (!cct
->_conf
->mon_health_to_clog
||
2325 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2329 health_interval_stop();
2330 utime_t next
= health_interval_calc_next_update();
2331 health_interval_event
= new C_MonContext(this, [this](int r
) {
2334 do_health_to_clog_interval();
2336 timer
.add_event_at(next
, health_interval_event
);
2339 void Monitor::health_interval_stop()
2341 dout(15) << __func__
<< dendl
;
2342 if (health_interval_event
) {
2343 timer
.cancel_event(health_interval_event
);
2345 health_interval_event
= NULL
;
2348 void Monitor::health_events_cleanup()
2351 health_interval_stop();
2352 health_status_cache
.reset();
2355 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2357 dout(20) << __func__
<< dendl
;
2359 if (changed
.count("mon_health_to_clog")) {
2360 if (!cct
->_conf
->mon_health_to_clog
) {
2361 health_events_cleanup();
2363 if (!health_tick_event
) {
2364 health_tick_start();
2366 if (!health_interval_event
) {
2367 health_interval_start();
2372 if (changed
.count("mon_health_to_clog_interval")) {
2373 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2374 health_interval_stop();
2376 health_interval_start();
2380 if (changed
.count("mon_health_to_clog_tick_interval")) {
2381 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2384 health_tick_start();
2389 void Monitor::do_health_to_clog_interval()
2391 // outputting to clog may have been disabled in the conf
2392 // since we were scheduled.
2393 if (!cct
->_conf
->mon_health_to_clog
||
2394 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2397 dout(10) << __func__
<< dendl
;
2399 // do we have a cached value for next_clog_update? if not,
2400 // do we know when the last update was?
2402 do_health_to_clog(true);
2403 health_interval_start();
2406 void Monitor::do_health_to_clog(bool force
)
2408 // outputting to clog may have been disabled in the conf
2409 // since we were scheduled.
2410 if (!cct
->_conf
->mon_health_to_clog
||
2411 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2414 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
2416 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2418 health_status_t level
= get_health_status(false, nullptr, &summary
);
2420 summary
== health_status_cache
.summary
&&
2421 level
== health_status_cache
.overall
)
2423 if (level
== HEALTH_OK
)
2424 clog
->info() << "overall " << summary
;
2425 else if (level
== HEALTH_WARN
)
2426 clog
->warn() << "overall " << summary
;
2427 else if (level
== HEALTH_ERR
)
2428 clog
->error() << "overall " << summary
;
2431 health_status_cache
.summary
= summary
;
2432 health_status_cache
.overall
= level
;
2435 list
<string
> status
;
2436 health_status_t overall
= get_health(status
, NULL
, NULL
);
2437 dout(25) << __func__
2438 << (force
? " (force)" : "")
2441 string summary
= joinify(status
.begin(), status
.end(), string("; "));
2444 overall
== health_status_cache
.overall
&&
2445 !health_status_cache
.summary
.empty() &&
2446 health_status_cache
.summary
== summary
) {
2451 clog
->info() << summary
;
2453 health_status_cache
.overall
= overall
;
2454 health_status_cache
.summary
= summary
;
2458 health_status_t
Monitor::get_health_status(
2465 health_status_t r
= HEALTH_OK
;
2466 bool compat
= g_conf
->mon_health_preluminous_compat
;
2468 f
->open_object_section("health");
2469 f
->open_object_section("checks");
2473 string
*psummary
= f
? nullptr : &summary
;
2474 for (auto& svc
: paxos_service
) {
2475 r
= std::min(r
, svc
->get_health_checks().dump_summary(
2476 f
, psummary
, sep2
, want_detail
));
2481 f
->dump_stream("status") << r
;
2483 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2484 *plain
= stringify(r
);
2485 if (summary
.size()) {
2493 f
->open_array_section("summary");
2494 for (auto& svc
: paxos_service
) {
2495 svc
->get_health_checks().dump_summary_compat(f
);
2498 f
->dump_stream("overall_status") << r
;
2503 f
->open_array_section("detail");
2506 for (auto& svc
: paxos_service
) {
2507 svc
->get_health_checks().dump_detail(f
, plain
, compat
);
2520 void Monitor::log_health(
2521 const health_check_map_t
& updated
,
2522 const health_check_map_t
& previous
,
2523 MonitorDBStore::TransactionRef t
)
2525 if (!g_conf
->mon_health_to_clog
) {
2528 // FIXME: log atomically as part of @t instead of using clog.
2529 dout(10) << __func__
<< " updated " << updated
.checks
.size()
2530 << " previous " << previous
.checks
.size()
2532 for (auto& p
: updated
.checks
) {
2533 auto q
= previous
.checks
.find(p
.first
);
2534 if (q
== previous
.checks
.end()) {
2537 ss
<< "Health check failed: " << p
.second
.summary
<< " ("
2539 if (p
.second
.severity
== HEALTH_WARN
)
2540 clog
->warn() << ss
.str();
2542 clog
->error() << ss
.str();
2544 if (p
.second
.summary
!= q
->second
.summary
||
2545 p
.second
.severity
!= q
->second
.severity
) {
2546 // summary or severity changed (ignore detail changes at this level)
2548 ss
<< "Health check update: " << p
.second
.summary
<< " (" << p
.first
<< ")";
2549 if (p
.second
.severity
== HEALTH_WARN
)
2550 clog
->warn() << ss
.str();
2552 clog
->error() << ss
.str();
2556 for (auto& p
: previous
.checks
) {
2557 if (!updated
.checks
.count(p
.first
)) {
2560 if (p
.first
== "DEGRADED_OBJECTS") {
2561 clog
->info() << "All degraded objects recovered";
2562 } else if (p
.first
== "OSD_FLAGS") {
2563 clog
->info() << "OSD flags cleared";
2565 clog
->info() << "Health check cleared: " << p
.first
<< " (was: "
2566 << p
.second
.summary
<< ")";
2571 if (previous
.checks
.size() && updated
.checks
.size() == 0) {
2572 // We might be going into a fully healthy state, check
2574 bool any_checks
= false;
2575 for (auto& svc
: paxos_service
) {
2576 if (&(svc
->get_health_checks()) == &(previous
)) {
2577 // Ignore the ones we're clearing right now
2581 if (svc
->get_health_checks().checks
.size() > 0) {
2587 clog
->info() << "Cluster is now healthy";
2592 health_status_t
Monitor::get_health(list
<string
>& status
,
2593 bufferlist
*detailbl
,
2596 list
<pair
<health_status_t
,string
> > summary
;
2597 list
<pair
<health_status_t
,string
> > detail
;
2600 f
->open_object_section("health");
2602 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin();
2603 p
!= paxos_service
.end();
2605 PaxosService
*s
= *p
;
2606 s
->get_health(summary
, detailbl
? &detail
: NULL
, cct
);
2609 health_monitor
->get_health(summary
, (detailbl
? &detail
: NULL
));
2611 health_status_t overall
= HEALTH_OK
;
2612 if (!timecheck_skews
.empty()) {
2614 for (map
<entity_inst_t
,double>::iterator i
= timecheck_skews
.begin();
2615 i
!= timecheck_skews
.end(); ++i
) {
2616 entity_inst_t inst
= i
->first
;
2617 double skew
= i
->second
;
2618 double latency
= timecheck_latencies
[inst
];
2619 string name
= monmap
->get_name(inst
.addr
);
2621 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
2622 if (tcstatus
!= HEALTH_OK
) {
2623 if (overall
> tcstatus
)
2625 warns
.push_back(name
);
2626 ostringstream tmp_ss
;
2627 tmp_ss
<< "mon." << name
2628 << " addr " << inst
.addr
<< " " << tcss
.str()
2629 << " (latency " << latency
<< "s)";
2630 detail
.push_back(make_pair(tcstatus
, tmp_ss
.str()));
2633 if (!warns
.empty()) {
2635 ss
<< "clock skew detected on";
2636 while (!warns
.empty()) {
2637 ss
<< " mon." << warns
.front();
2642 status
.push_back(ss
.str());
2643 summary
.push_back(make_pair(HEALTH_WARN
, "Monitor clock skew detected "));
2648 f
->open_array_section("summary");
2649 if (!summary
.empty()) {
2650 while (!summary
.empty()) {
2651 if (overall
> summary
.front().first
)
2652 overall
= summary
.front().first
;
2653 status
.push_back(summary
.front().second
);
2655 f
->open_object_section("item");
2656 f
->dump_stream("severity") << summary
.front().first
;
2657 f
->dump_string("summary", summary
.front().second
);
2660 summary
.pop_front();
2668 status
.push_front(fss
.str());
2670 f
->dump_stream("overall_status") << overall
;
2673 f
->open_array_section("detail");
2674 while (!detail
.empty()) {
2676 f
->dump_string("item", detail
.front().second
);
2677 else if (detailbl
!= NULL
) {
2678 detailbl
->append(detail
.front().second
);
2679 detailbl
->append('\n');
2692 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
)
2695 f
->open_object_section("status");
2698 f
->dump_stream("fsid") << monmap
->get_fsid();
2699 get_health_status(false, f
, nullptr);
2700 f
->dump_unsigned("election_epoch", get_epoch());
2702 f
->open_array_section("quorum");
2703 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2704 f
->dump_int("rank", *p
);
2706 f
->open_array_section("quorum_names");
2707 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2708 f
->dump_string("id", monmap
->get_name(*p
));
2711 f
->open_object_section("monmap");
2714 f
->open_object_section("osdmap");
2715 osdmon()->osdmap
.print_summary(f
, cout
, string(12, ' '));
2717 f
->open_object_section("pgmap");
2718 pgservice
->print_summary(f
, NULL
);
2720 f
->open_object_section("fsmap");
2721 mdsmon()->get_fsmap().print_summary(f
, NULL
);
2723 f
->open_object_section("mgrmap");
2724 mgrmon()->get_map().print_summary(f
, nullptr);
2727 f
->dump_object("servicemap", mgrstatmon()->get_service_map());
2730 ss
<< " cluster:\n";
2731 ss
<< " id: " << monmap
->get_fsid() << "\n";
2734 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2735 get_health_status(false, nullptr, &health
,
2739 get_health(ls
, NULL
, f
);
2740 health
= joinify(ls
.begin(), ls
.end(),
2743 ss
<< " health: " << health
<< "\n";
2745 ss
<< "\n \n services:\n";
2748 auto& service_map
= mgrstatmon()->get_service_map();
2749 for (auto& p
: service_map
.services
) {
2750 maxlen
= std::max(maxlen
, p
.first
.size());
2752 string
spacing(maxlen
- 3, ' ');
2753 const auto quorum_names
= get_quorum_names();
2754 const auto mon_count
= monmap
->mon_info
.size();
2755 ss
<< " mon: " << spacing
<< mon_count
<< " daemons, quorum "
2757 if (quorum_names
.size() != mon_count
) {
2758 std::list
<std::string
> out_of_q
;
2759 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
2760 if (quorum
.count(i
) == 0) {
2761 out_of_q
.push_back(monmap
->ranks
[i
]);
2764 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
2765 out_of_q
.end(), std::string(", "));
2768 if (mgrmon()->in_use()) {
2769 ss
<< " mgr: " << spacing
;
2770 mgrmon()->get_map().print_summary(nullptr, &ss
);
2773 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2774 ss
<< " mds: " << spacing
<< mdsmon()->get_fsmap() << "\n";
2776 ss
<< " osd: " << spacing
;
2777 osdmon()->osdmap
.print_summary(NULL
, ss
, string(maxlen
+ 6, ' '));
2779 for (auto& p
: service_map
.services
) {
2780 ss
<< " " << p
.first
<< ": " << string(maxlen
- p
.first
.size(), ' ')
2781 << p
.second
.get_summary() << "\n";
2785 ss
<< "\n \n data:\n";
2786 pgservice
->print_summary(NULL
, &ss
);
2791 void Monitor::_generate_command_map(map
<string
,cmd_vartype
>& cmdmap
,
2792 map
<string
,string
> ¶m_str_map
)
2794 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
2795 p
!= cmdmap
.end(); ++p
) {
2796 if (p
->first
== "prefix")
2798 if (p
->first
== "caps") {
2800 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
2801 cv
.size() % 2 == 0) {
2802 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
2803 string k
= string("caps_") + cv
[i
];
2804 param_str_map
[k
] = cv
[i
+ 1];
2809 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
2813 const MonCommand
*Monitor::_get_moncommand(
2814 const string
&cmd_prefix
,
2815 const MonCommand
*cmds
,
2818 const MonCommand
*this_cmd
= NULL
;
2819 for (const MonCommand
*cp
= cmds
;
2820 cp
< &cmds
[cmds_size
]; cp
++) {
2821 if (cp
->cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
2829 bool Monitor::_allowed_command(MonSession
*s
, string
&module
, string
&prefix
,
2830 const map
<string
,cmd_vartype
>& cmdmap
,
2831 const map
<string
,string
>& param_str_map
,
2832 const MonCommand
*this_cmd
) {
2834 bool cmd_r
= this_cmd
->requires_perm('r');
2835 bool cmd_w
= this_cmd
->requires_perm('w');
2836 bool cmd_x
= this_cmd
->requires_perm('x');
2838 bool capable
= s
->caps
.is_capable(
2840 CEPH_ENTITY_TYPE_MON
,
2842 module
, prefix
, param_str_map
,
2843 cmd_r
, cmd_w
, cmd_x
);
2845 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
2849 void Monitor::format_command_descriptions(const std::vector
<MonCommand
> &commands
,
2855 f
->open_object_section("command_descriptions");
2856 for (const auto &cmd
: commands
) {
2857 unsigned flags
= cmd
.flags
;
2858 if (hide_mgr_flag
) {
2859 flags
&= ~MonCommand::FLAG_MGR
;
2861 ostringstream secname
;
2862 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
2863 dump_cmddesc_to_json(f
, secname
.str(),
2864 cmd
.cmdstring
, cmd
.helpstring
, cmd
.module
,
2865 cmd
.req_perms
, cmd
.availability
, flags
);
2868 f
->close_section(); // command_descriptions
2873 void Monitor::get_locally_supported_monitor_commands(const MonCommand
**cmds
,
2876 *cmds
= mon_commands
;
2877 *count
= ARRAY_SIZE(mon_commands
);
2879 void Monitor::set_leader_supported_commands(const MonCommand
*cmds
, int size
)
2881 if (leader_supported_mon_commands
!= mon_commands
)
2882 delete[] leader_supported_mon_commands
;
2883 leader_supported_mon_commands
= cmds
;
2884 leader_supported_mon_commands_size
= size
;
2887 bool Monitor::is_keyring_required()
2889 string auth_cluster_required
= g_conf
->auth_supported
.empty() ?
2890 g_conf
->auth_cluster_required
: g_conf
->auth_supported
;
2891 string auth_service_required
= g_conf
->auth_supported
.empty() ?
2892 g_conf
->auth_service_required
: g_conf
->auth_supported
;
2894 return auth_service_required
== "cephx" ||
2895 auth_cluster_required
== "cephx";
2898 struct C_MgrProxyCommand
: public Context
{
2904 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
2905 : mon(mon
), op(op
), size(s
) { }
2906 void finish(int r
) {
2907 Mutex::Locker
l(mon
->lock
);
2908 mon
->mgr_proxy_bytes
-= size
;
2909 mon
->reply_command(op
, r
, outs
, outbl
, 0);
2913 void Monitor::handle_command(MonOpRequestRef op
)
2915 assert(op
->is_type_command());
2916 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
2917 if (m
->fsid
!= monmap
->fsid
) {
2918 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
2919 reply_command(op
, -EPERM
, "wrong fsid", 0);
2923 MonSession
*session
= static_cast<MonSession
*>(
2924 m
->get_connection()->get_priv());
2926 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
2929 BOOST_SCOPE_EXIT_ALL(=) {
2933 if (m
->cmd
.empty()) {
2934 string rs
= "No command supplied";
2935 reply_command(op
, -EINVAL
, rs
, 0);
2940 vector
<string
> fullcmd
;
2941 map
<string
, cmd_vartype
> cmdmap
;
2942 stringstream ss
, ds
;
2946 rs
= "unrecognized command";
2948 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
2949 // ss has reason for failure
2952 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
2953 reply_command(op
, r
, rs
, 0);
2957 // check return value. If no prefix parameter provided,
2958 // return value will be false, then return error info.
2959 if (!cmd_getval(g_ceph_context
, cmdmap
, "prefix", prefix
)) {
2960 reply_command(op
, -EINVAL
, "command prefix not found", 0);
2964 // check prefix is empty
2965 if (prefix
.empty()) {
2966 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
2970 if (prefix
== "get_command_descriptions") {
2972 Formatter
*f
= Formatter::create("json");
2973 // hide mgr commands until luminous upgrade is complete
2974 bool hide_mgr_flag
=
2975 osdmon()->osdmap
.require_osd_release
< CEPH_RELEASE_LUMINOUS
;
2977 std::vector
<MonCommand
> commands
;
2978 commands
= static_cast<MgrMonitor
*>(
2979 paxos_service
[PAXOS_MGR
])->get_command_descs();
2981 for (int i
= 0 ; i
< leader_supported_mon_commands_size
; ++i
) {
2982 commands
.push_back(leader_supported_mon_commands
[i
]);
2985 format_command_descriptions(commands
, f
, &rdata
, hide_mgr_flag
);
2987 reply_command(op
, 0, "", rdata
, 0);
2994 dout(0) << "handle_command " << *m
<< dendl
;
2997 cmd_getval(g_ceph_context
, cmdmap
, "format", format
, string("plain"));
2998 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
3000 get_str_vec(prefix
, fullcmd
);
3002 // make sure fullcmd is not empty.
3003 // invalid prefix will cause empty vector fullcmd.
3004 // such as, prefix=";,,;"
3005 if (fullcmd
.empty()) {
3006 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
3010 module
= fullcmd
[0];
3012 // validate command is in leader map
3014 const MonCommand
*leader_cmd
;
3015 const auto& mgr_cmds
= mgrmon()->get_command_descs();
3016 const MonCommand
*mgr_cmd
= nullptr;
3017 if (!mgr_cmds
.empty()) {
3018 mgr_cmd
= _get_moncommand(prefix
, &mgr_cmds
.at(0), mgr_cmds
.size());
3020 leader_cmd
= _get_moncommand(prefix
,
3021 // the boost underlying this isn't const for some reason
3022 const_cast<MonCommand
*>(leader_supported_mon_commands
),
3023 leader_supported_mon_commands_size
);
3025 leader_cmd
= mgr_cmd
;
3027 reply_command(op
, -EINVAL
, "command not known", 0);
3031 // validate command is in our map & matches, or forward if it is allowed
3032 const MonCommand
*mon_cmd
= _get_moncommand(prefix
, mon_commands
,
3033 ARRAY_SIZE(mon_commands
));
3039 if (leader_cmd
->is_noforward()) {
3040 reply_command(op
, -EINVAL
,
3041 "command not locally supported and not allowed to forward",
3045 dout(10) << "Command not locally supported, forwarding request "
3047 forward_request_leader(op
);
3049 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
3050 if (mon_cmd
->is_noforward()) {
3051 reply_command(op
, -EINVAL
,
3052 "command not compatible with leader and not allowed to forward",
3056 dout(10) << "Command not compatible with leader, forwarding request "
3058 forward_request_leader(op
);
3063 if (mon_cmd
->is_obsolete() ||
3064 (cct
->_conf
->mon_debug_deprecated_as_obsolete
3065 && mon_cmd
->is_deprecated())) {
3066 reply_command(op
, -ENOTSUP
,
3067 "command is obsolete; please check usage and/or man page",
3072 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
3073 dout(10) << "Got forward for noforward command " << m
<< dendl
;
3074 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
3078 /* what we perceive as being the service the command falls under */
3079 string
service(mon_cmd
->module
);
3081 dout(25) << __func__
<< " prefix='" << prefix
3082 << "' module='" << module
3083 << "' service='" << service
<< "'" << dendl
;
3086 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
3088 // validate user's permissions for requested command
3089 map
<string
,string
> param_str_map
;
3090 _generate_command_map(cmdmap
, param_str_map
);
3091 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
3092 param_str_map
, mon_cmd
)) {
3093 dout(1) << __func__
<< " access denied" << dendl
;
3094 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3095 << "from='" << session
->inst
<< "' "
3096 << "entity='" << session
->entity_name
<< "' "
3097 << "cmd=" << m
->cmd
<< ": access denied";
3098 reply_command(op
, -EACCES
, "access denied", 0);
3102 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
3103 << "from='" << session
->inst
<< "' "
3104 << "entity='" << session
->entity_name
<< "' "
3105 << "cmd=" << m
->cmd
<< ": dispatch";
3107 if (mon_cmd
->is_mgr() &&
3108 osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3109 const auto& hdr
= m
->get_header();
3110 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
3112 g_conf
->mon_client_bytes
* g_conf
->mon_mgr_proxy_client_bytes_ratio
;
3113 if (mgr_proxy_bytes
+ size
> max
) {
3114 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
3115 << " + " << size
<< " > max " << max
<< dendl
;
3116 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
3119 mgr_proxy_bytes
+= size
;
3120 dout(10) << __func__
<< " proxying mgr command (+" << size
3121 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
3122 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
3123 mgr_client
.start_command(m
->cmd
,
3127 new C_OnFinisher(fin
, &finisher
));
3131 if (module
== "mds" || module
== "fs") {
3132 mdsmon()->dispatch(op
);
3135 if ((module
== "osd" || prefix
== "pg map") &&
3136 prefix
!= "osd last-stat-seq") {
3137 osdmon()->dispatch(op
);
3141 if (module
== "pg") {
3142 pgmon()->dispatch(op
);
3145 if (module
== "mon" &&
3146 /* Let the Monitor class handle the following commands:
3151 prefix
!= "mon compact" &&
3152 prefix
!= "mon scrub" &&
3153 prefix
!= "mon sync force" &&
3154 prefix
!= "mon metadata" &&
3155 prefix
!= "mon versions" &&
3156 prefix
!= "mon count-metadata") {
3157 monmon()->dispatch(op
);
3160 if (module
== "auth") {
3161 authmon()->dispatch(op
);
3164 if (module
== "log") {
3165 logmon()->dispatch(op
);
3169 if (module
== "config-key") {
3170 config_key_service
->dispatch(op
);
3174 if (module
== "mgr") {
3175 mgrmon()->dispatch(op
);
3179 if (prefix
== "fsid") {
3181 f
->open_object_section("fsid");
3182 f
->dump_stream("fsid") << monmap
->fsid
;
3189 reply_command(op
, 0, "", rdata
, 0);
3193 if (prefix
== "scrub" || prefix
== "mon scrub") {
3194 wait_for_paxos_write();
3196 int r
= scrub_start();
3197 reply_command(op
, r
, "", rdata
, 0);
3198 } else if (is_peon()) {
3199 forward_request_leader(op
);
3201 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
3206 if (prefix
== "compact" || prefix
== "mon compact") {
3207 dout(1) << "triggering manual compaction" << dendl
;
3208 utime_t start
= ceph_clock_now();
3210 utime_t end
= ceph_clock_now();
3212 dout(1) << "finished manual compaction in " << end
<< " seconds" << dendl
;
3214 oss
<< "compacted " << g_conf
->get_val
<std::string
>("mon_keyvaluedb") << " in " << end
<< " seconds";
3218 else if (prefix
== "injectargs") {
3219 vector
<string
> injected_args
;
3220 cmd_getval(g_ceph_context
, cmdmap
, "injected_args", injected_args
);
3221 if (!injected_args
.empty()) {
3222 dout(0) << "parsing injected options '" << injected_args
<< "'" << dendl
;
3224 r
= g_conf
->injectargs(str_join(injected_args
, " "), &oss
);
3225 ss
<< "injectargs:" << oss
.str();
3229 rs
= "must supply options to be parsed in a single string";
3232 } else if (prefix
== "time-sync-status") {
3234 f
.reset(Formatter::create("json-pretty"));
3235 f
->open_object_section("time_sync");
3236 if (!timecheck_skews
.empty()) {
3237 f
->open_object_section("time_skew_status");
3238 for (auto& i
: timecheck_skews
) {
3239 entity_inst_t inst
= i
.first
;
3240 double skew
= i
.second
;
3241 double latency
= timecheck_latencies
[inst
];
3242 string name
= monmap
->get_name(inst
.addr
);
3244 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
3245 f
->open_object_section(name
.c_str());
3246 f
->dump_float("skew", skew
);
3247 f
->dump_float("latency", latency
);
3248 f
->dump_stream("health") << tcstatus
;
3249 if (tcstatus
!= HEALTH_OK
) {
3250 f
->dump_stream("details") << tcss
.str();
3256 f
->open_object_section("timechecks");
3257 f
->dump_unsigned("epoch", get_epoch());
3258 f
->dump_int("round", timecheck_round
);
3259 f
->dump_stream("round_status") << ((timecheck_round
%2) ?
3260 "on-going" : "finished");
3266 } else if (prefix
== "config set") {
3268 cmd_getval(cct
, cmdmap
, "key", key
);
3270 cmd_getval(cct
, cmdmap
, "value", val
);
3271 r
= g_conf
->set_val(key
, val
, true, &ss
);
3274 } else if (prefix
== "status" ||
3275 prefix
== "health" ||
3278 cmd_getval(g_ceph_context
, cmdmap
, "detail", detail
);
3280 if (prefix
== "status") {
3281 // get_cluster_status handles f == NULL
3282 get_cluster_status(ds
, f
.get());
3289 } else if (prefix
== "health") {
3290 if (osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
3292 get_health_status(detail
== "detail", f
.get(), f
? nullptr : &plain
);
3296 rdata
.append(plain
);
3299 list
<string
> health_str
;
3300 get_health(health_str
, detail
== "detail" ? &rdata
: NULL
, f
.get());
3305 assert(!health_str
.empty());
3306 ds
<< health_str
.front();
3307 health_str
.pop_front();
3308 if (!health_str
.empty()) {
3310 ds
<< joinify(health_str
.begin(), health_str
.end(), string("; "));
3315 if (detail
== "detail")
3319 } else if (prefix
== "df") {
3320 bool verbose
= (detail
== "detail");
3322 f
->open_object_section("stats");
3324 pgservice
->dump_fs_stats(&ds
, f
.get(), verbose
);
3327 pgservice
->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3335 assert(0 == "We should never get here!");
3341 } else if (prefix
== "report") {
3343 // this must be formatted, in its current form
3345 f
.reset(Formatter::create("json-pretty"));
3346 f
->open_object_section("report");
3347 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3348 f
->dump_string("version", ceph_version_to_str());
3349 f
->dump_string("commit", git_version_to_str());
3350 f
->dump_stream("timestamp") << ceph_clock_now();
3352 vector
<string
> tagsvec
;
3353 cmd_getval(g_ceph_context
, cmdmap
, "tags", tagsvec
);
3354 string tagstr
= str_join(tagsvec
, " ");
3355 if (!tagstr
.empty())
3356 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3357 f
->dump_string("tag", tagstr
);
3360 get_health(hs
, NULL
, f
.get());
3362 monmon()->dump_info(f
.get());
3363 osdmon()->dump_info(f
.get());
3364 mdsmon()->dump_info(f
.get());
3365 authmon()->dump_info(f
.get());
3366 pgservice
->dump_info(f
.get());
3368 paxos
->dump_info(f
.get());
3374 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT
);
3377 } else if (prefix
== "osd last-stat-seq") {
3379 cmd_getval(g_ceph_context
, cmdmap
, "id", osd
);
3380 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3382 f
->dump_unsigned("seq", seq
);
3390 } else if (prefix
== "node ls") {
3391 string
node_type("all");
3392 cmd_getval(g_ceph_context
, cmdmap
, "type", node_type
);
3394 f
.reset(Formatter::create("json-pretty"));
3395 if (node_type
== "all") {
3396 f
->open_object_section("nodes");
3397 print_nodes(f
.get(), ds
);
3398 osdmon()->print_nodes(f
.get());
3399 mdsmon()->print_nodes(f
.get());
3401 } else if (node_type
== "mon") {
3402 print_nodes(f
.get(), ds
);
3403 } else if (node_type
== "osd") {
3404 osdmon()->print_nodes(f
.get());
3405 } else if (node_type
== "mds") {
3406 mdsmon()->print_nodes(f
.get());
3412 } else if (prefix
== "features") {
3413 if (!is_leader() && !is_peon()) {
3414 dout(10) << " waiting for quorum" << dendl
;
3415 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3419 forward_request_leader(op
);
3423 f
.reset(Formatter::create("json-pretty"));
3425 get_combined_feature_map(&fm
);
3426 f
->dump_object("features", fm
);
3430 } else if (prefix
== "mon metadata") {
3432 f
.reset(Formatter::create("json-pretty"));
3435 bool all
= !cmd_getval(g_ceph_context
, cmdmap
, "id", name
);
3437 // Dump a single mon's metadata
3438 int mon
= monmap
->get_rank(name
);
3440 rs
= "requested mon not found";
3444 f
->open_object_section("mon_metadata");
3445 r
= get_mon_metadata(mon
, f
.get(), ds
);
3448 // Dump all mons' metadata
3450 f
->open_array_section("mon_metadata");
3451 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3452 std::ostringstream get_err
;
3453 f
->open_object_section("mon");
3454 f
->dump_string("name", monmap
->get_name(rank
));
3455 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3457 if (r
== -ENOENT
|| r
== -EINVAL
) {
3458 dout(1) << get_err
.str() << dendl
;
3459 // Drop error, list what metadata we do have
3461 } else if (r
!= 0) {
3462 derr
<< "Unexpected error from get_mon_metadata: "
3463 << cpp_strerror(r
) << dendl
;
3464 ds
<< get_err
.str();
3474 } else if (prefix
== "mon versions") {
3476 f
.reset(Formatter::create("json-pretty"));
3477 count_metadata("ceph_version", f
.get());
3482 } else if (prefix
== "mon count-metadata") {
3484 f
.reset(Formatter::create("json-pretty"));
3486 cmd_getval(g_ceph_context
, cmdmap
, "property", field
);
3487 count_metadata(field
, f
.get());
3492 } else if (prefix
== "quorum_status") {
3493 // make sure our map is readable and up to date
3494 if (!is_leader() && !is_peon()) {
3495 dout(10) << " waiting for quorum" << dendl
;
3496 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3499 _quorum_status(f
.get(), ds
);
3503 } else if (prefix
== "mon_status") {
3504 get_mon_status(f
.get(), ds
);
3510 } else if (prefix
== "sync force" ||
3511 prefix
== "mon sync force") {
3512 string validate1
, validate2
;
3513 cmd_getval(g_ceph_context
, cmdmap
, "validate1", validate1
);
3514 cmd_getval(g_ceph_context
, cmdmap
, "validate2", validate2
);
3515 if (validate1
!= "--yes-i-really-mean-it" ||
3516 validate2
!= "--i-know-what-i-am-doing") {
3518 rs
= "are you SURE? this will mean the monitor store will be "
3519 "erased. pass '--yes-i-really-mean-it "
3520 "--i-know-what-i-am-doing' if you really do.";
3523 sync_force(f
.get(), ds
);
3526 } else if (prefix
== "heap") {
3527 if (!ceph_using_tcmalloc())
3528 rs
= "tcmalloc not enabled, can't use heap profiler commands\n";
3531 cmd_getval(g_ceph_context
, cmdmap
, "heapcmd", heapcmd
);
3532 // XXX 1-element vector, change at callee or make vector here?
3533 vector
<string
> heapcmd_vec
;
3534 get_str_vec(heapcmd
, heapcmd_vec
);
3535 ceph_heap_profiler_handle_command(heapcmd_vec
, ds
);
3540 } else if (prefix
== "quorum") {
3542 cmd_getval(g_ceph_context
, cmdmap
, "quorumcmd", quorumcmd
);
3543 if (quorumcmd
== "exit") {
3545 elector
.stop_participating();
3546 rs
= "stopped responding to quorum, initiated new election";
3548 } else if (quorumcmd
== "enter") {
3549 elector
.start_participating();
3551 rs
= "started responding to quorum, initiated new election";
3554 rs
= "needs a valid 'quorum' command";
3557 } else if (prefix
== "version") {
3559 f
->open_object_section("version");
3560 f
->dump_string("version", pretty_version_to_str());
3564 ds
<< pretty_version_to_str();
3569 } else if (prefix
== "versions") {
3571 f
.reset(Formatter::create("json-pretty"));
3572 map
<string
,int> overall
;
3573 f
->open_object_section("version");
3574 map
<string
,int> mon
, mgr
, osd
, mds
;
3576 count_metadata("ceph_version", &mon
);
3577 f
->open_object_section("mon");
3578 for (auto& p
: mon
) {
3579 f
->dump_int(p
.first
.c_str(), p
.second
);
3580 overall
[p
.first
] += p
.second
;
3584 mgrmon()->count_metadata("ceph_version", &mgr
);
3585 f
->open_object_section("mgr");
3586 for (auto& p
: mgr
) {
3587 f
->dump_int(p
.first
.c_str(), p
.second
);
3588 overall
[p
.first
] += p
.second
;
3592 osdmon()->count_metadata("ceph_version", &osd
);
3593 f
->open_object_section("osd");
3594 for (auto& p
: osd
) {
3595 f
->dump_int(p
.first
.c_str(), p
.second
);
3596 overall
[p
.first
] += p
.second
;
3600 mdsmon()->count_metadata("ceph_version", &mds
);
3601 f
->open_object_section("mds");
3602 for (auto& p
: mon
) {
3603 f
->dump_int(p
.first
.c_str(), p
.second
);
3604 overall
[p
.first
] += p
.second
;
3608 for (auto& p
: mgrstatmon()->get_service_map().services
) {
3609 f
->open_object_section(p
.first
.c_str());
3611 p
.second
.count_metadata("ceph_version", &m
);
3613 f
->dump_int(q
.first
.c_str(), q
.second
);
3614 overall
[q
.first
] += q
.second
;
3619 f
->open_object_section("overall");
3620 for (auto& p
: overall
) {
3621 f
->dump_int(p
.first
.c_str(), p
.second
);
3631 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3632 reply_command(op
, r
, rs
, rdata
, 0);
3635 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
3638 reply_command(op
, rc
, rs
, rdata
, version
);
3641 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
3642 bufferlist
& rdata
, version_t version
)
3644 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
3645 assert(m
->get_type() == MSG_MON_COMMAND
);
3646 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
3647 reply
->set_tid(m
->get_tid());
3648 reply
->set_data(rdata
);
3649 send_reply(op
, reply
);
3653 // ------------------------
3654 // request/reply routing
3656 // a client/mds/osd will connect to a random monitor. we need to forward any
3657 // messages requiring state updates to the leader, and then route any replies
3658 // back via the correct monitor and back to them. (the monitor will not
3659 // initiate any connections.)
3661 void Monitor::forward_request_leader(MonOpRequestRef op
)
3663 op
->mark_event(__func__
);
3665 int mon
= get_leader();
3666 MonSession
*session
= op
->get_session();
3667 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
3669 if (req
->get_source().is_mon() && req
->get_source_addr() != messenger
->get_myaddr()) {
3670 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
3671 } else if (session
->proxy_con
) {
3672 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
3673 } else if (!session
->closed
) {
3674 RoutedRequest
*rr
= new RoutedRequest
;
3675 rr
->tid
= ++routed_request_tid
;
3676 rr
->client_inst
= req
->get_source_inst();
3677 rr
->con
= req
->get_connection();
3678 rr
->con_features
= rr
->con
->get_features();
3679 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
3680 rr
->session
= static_cast<MonSession
*>(session
->get());
3682 routed_requests
[rr
->tid
] = rr
;
3683 session
->routed_request_tids
.insert(rr
->tid
);
3685 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
3686 << " features " << rr
->con_features
<< dendl
;
3688 MForward
*forward
= new MForward(rr
->tid
,
3692 forward
->set_priority(req
->get_priority());
3693 if (session
->auth_handler
) {
3694 forward
->entity_name
= session
->entity_name
;
3695 } else if (req
->get_source().is_mon()) {
3696 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
3698 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3699 op
->mark_forwarded();
3700 assert(op
->get_req()->get_type() != 0);
3702 dout(10) << "forward_request no session for request " << *req
<< dendl
;
3706 // fake connection attached to forwarded messages
3707 struct AnonConnection
: public Connection
{
3708 explicit AnonConnection(CephContext
*cct
) : Connection(cct
, NULL
) {}
3710 int send_message(Message
*m
) override
{
3711 assert(!"send_message on anonymous connection");
3713 void send_keepalive() override
{
3714 assert(!"send_keepalive on anonymous connection");
3716 void mark_down() override
{
3719 void mark_disposable() override
{
3722 bool is_connected() override
{ return false; }
3725 //extract the original message and put it into the regular dispatch function
3726 void Monitor::handle_forward(MonOpRequestRef op
)
3728 MForward
*m
= static_cast<MForward
*>(op
->get_req());
3729 dout(10) << "received forwarded message from " << m
->client
3730 << " via " << m
->get_source_inst() << dendl
;
3731 MonSession
*session
= op
->get_session();
3734 if (!session
->is_capable("mon", MON_CAP_X
)) {
3735 dout(0) << "forward from entity with insufficient caps! "
3736 << session
->caps
<< dendl
;
3738 // see PaxosService::dispatch(); we rely on this being anon
3739 // (c->msgr == NULL)
3740 PaxosServiceMessage
*req
= m
->claim_message();
3741 assert(req
!= NULL
);
3743 ConnectionRef
c(new AnonConnection(cct
));
3744 MonSession
*s
= new MonSession(req
->get_source_inst(),
3745 static_cast<Connection
*>(c
.get()));
3746 c
->set_priv(s
->get());
3747 c
->set_peer_addr(m
->client
.addr
);
3748 c
->set_peer_type(m
->client
.name
.type());
3749 c
->set_features(m
->con_features
);
3751 s
->caps
= m
->client_caps
;
3752 dout(10) << " caps are " << s
->caps
<< dendl
;
3753 s
->entity_name
= m
->entity_name
;
3754 dout(10) << " entity name '" << s
->entity_name
<< "' type "
3755 << s
->entity_name
.get_type() << dendl
;
3756 s
->proxy_con
= m
->get_connection();
3757 s
->proxy_tid
= m
->tid
;
3759 req
->set_connection(c
);
3761 // not super accurate, but better than nothing.
3762 req
->set_recv_stamp(m
->get_recv_stamp());
3765 * note which election epoch this is; we will drop the message if
3766 * there is a future election since our peers will resend routed
3767 * requests in that case.
3769 req
->rx_election_epoch
= get_epoch();
3771 /* Because this is a special fake connection, we need to break
3772 the ref loop between Connection and MonSession differently
3773 than we normally do. Here, the Message refers to the Connection
3774 which refers to the Session, and nobody else refers to the Connection
3775 or the Session. And due to the special nature of this message,
3776 nobody refers to the Connection via the Session. So, clear out that
3777 half of the ref loop.*/
3780 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
3787 void Monitor::try_send_message(Message
*m
, const entity_inst_t
& to
)
3789 dout(10) << "try_send_message " << *m
<< " to " << to
<< dendl
;
3792 encode_message(m
, quorum_con_features
, bl
);
3794 messenger
->send_message(m
, to
);
3796 for (int i
=0; i
<(int)monmap
->size(); i
++) {
3798 messenger
->send_message(new MRoute(bl
, to
), monmap
->get_inst(i
));
3802 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
3804 op
->mark_event(__func__
);
3806 MonSession
*session
= op
->get_session();
3808 Message
*req
= op
->get_req();
3809 ConnectionRef con
= op
->get_connection();
3811 reply
->set_cct(g_ceph_context
);
3812 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
3815 dout(2) << "send_reply no connection, dropping reply " << *reply
3816 << " to " << req
<< " " << *req
<< dendl
;
3818 op
->mark_event("reply: no connection");
3822 if (!session
->con
&& !session
->proxy_con
) {
3823 dout(2) << "send_reply no connection, dropping reply " << *reply
3824 << " to " << req
<< " " << *req
<< dendl
;
3826 op
->mark_event("reply: no connection");
3830 if (session
->proxy_con
) {
3831 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
3832 << " via " << session
->proxy_con
->get_peer_addr()
3833 << " for request " << *req
<< dendl
;
3834 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
3835 op
->mark_event("reply: send routed request");
3837 session
->con
->send_message(reply
);
3838 op
->mark_event("reply: send");
3842 void Monitor::no_reply(MonOpRequestRef op
)
3844 MonSession
*session
= op
->get_session();
3845 Message
*req
= op
->get_req();
3847 if (session
->proxy_con
) {
3848 dout(10) << "no_reply to " << req
->get_source_inst()
3849 << " via " << session
->proxy_con
->get_peer_addr()
3850 << " for request " << *req
<< dendl
;
3851 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
3852 op
->mark_event("no_reply: send routed request");
3854 dout(10) << "no_reply to " << req
->get_source_inst()
3855 << " " << *req
<< dendl
;
3856 op
->mark_event("no_reply");
3860 void Monitor::handle_route(MonOpRequestRef op
)
3862 MRoute
*m
= static_cast<MRoute
*>(op
->get_req());
3863 MonSession
*session
= op
->get_session();
3865 if (!session
->is_capable("mon", MON_CAP_X
)) {
3866 dout(0) << "MRoute received from entity without appropriate perms! "
3871 dout(10) << "handle_route " << *m
->msg
<< " to " << m
->dest
<< dendl
;
3873 dout(10) << "handle_route null to " << m
->dest
<< dendl
;
3876 if (m
->session_mon_tid
) {
3877 if (routed_requests
.count(m
->session_mon_tid
)) {
3878 RoutedRequest
*rr
= routed_requests
[m
->session_mon_tid
];
3880 // reset payload, in case encoding is dependent on target features
3882 m
->msg
->clear_payload();
3883 rr
->con
->send_message(m
->msg
);
3886 if (m
->send_osdmap_first
) {
3887 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
3888 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
3889 true, MonOpRequestRef());
3891 assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
3892 routed_requests
.erase(m
->session_mon_tid
);
3893 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
3896 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
3899 dout(10) << " not a routed request, trying to send anyway" << dendl
;
3901 messenger
->send_message(m
->msg
, m
->dest
);
3907 void Monitor::resend_routed_requests()
3909 dout(10) << "resend_routed_requests" << dendl
;
3910 int mon
= get_leader();
3911 list
<Context
*> retry
;
3912 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
3913 p
!= routed_requests
.end();
3915 RoutedRequest
*rr
= p
->second
;
3918 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
3919 rr
->op
->mark_event("retry routed request");
3920 retry
.push_back(new C_RetryMessage(this, rr
->op
));
3922 assert(rr
->session
->routed_request_tids
.count(p
->first
));
3923 rr
->session
->routed_request_tids
.erase(p
->first
);
3927 bufferlist::iterator q
= rr
->request_bl
.begin();
3928 PaxosServiceMessage
*req
= (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
3929 rr
->op
->mark_event("resend forwarded message to leader");
3930 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
<< dendl
;
3931 MForward
*forward
= new MForward(rr
->tid
, req
, rr
->con_features
,
3933 req
->put(); // forward takes its own ref; drop ours.
3934 forward
->client
= rr
->client_inst
;
3935 forward
->set_priority(req
->get_priority());
3936 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3940 routed_requests
.clear();
3941 finish_contexts(g_ceph_context
, retry
);
3945 void Monitor::remove_session(MonSession
*s
)
3947 dout(10) << "remove_session " << s
<< " " << s
->inst
3948 << " features 0x" << std::hex
<< s
->con_features
<< std::dec
<< dendl
;
3951 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
3952 p
!= s
->routed_request_tids
.end();
3954 assert(routed_requests
.count(*p
));
3955 RoutedRequest
*rr
= routed_requests
[*p
];
3956 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
3958 routed_requests
.erase(*p
);
3960 s
->routed_request_tids
.clear();
3961 s
->con
->set_priv(NULL
);
3962 session_map
.remove_session(s
);
3963 logger
->set(l_mon_num_sessions
, session_map
.get_size());
3964 logger
->inc(l_mon_session_rm
);
3967 void Monitor::remove_all_sessions()
3969 Mutex::Locker
l(session_map_lock
);
3970 while (!session_map
.sessions
.empty()) {
3971 MonSession
*s
= session_map
.sessions
.front();
3974 logger
->inc(l_mon_session_rm
);
3977 logger
->set(l_mon_num_sessions
, session_map
.get_size());
3980 void Monitor::send_command(const entity_inst_t
& inst
,
3981 const vector
<string
>& com
)
3983 dout(10) << "send_command " << inst
<< "" << com
<< dendl
;
3984 MMonCommand
*c
= new MMonCommand(monmap
->fsid
);
3986 try_send_message(c
, inst
);
3989 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
3992 * Wait list the new session until we're in the quorum, assuming it's
3994 * tick() will periodically send them back through so we can send
3995 * the client elsewhere if we don't think we're getting back in.
3997 * But we whitelist a few sorts of messages:
3998 * 1) Monitors can talk to us at any time, of course.
3999 * 2) auth messages. It's unlikely to go through much faster, but
4000 * it's possible we've just lost our quorum status and we want to take...
4001 * 3) command messages. We want to accept these under all possible
4004 Message
*m
= op
->get_req();
4005 MonSession
*s
= op
->get_session();
4006 ConnectionRef con
= op
->get_connection();
4007 utime_t too_old
= ceph_clock_now();
4008 too_old
-= g_ceph_context
->_conf
->mon_lease
;
4009 if (m
->get_recv_stamp() > too_old
&&
4010 con
->is_connected()) {
4011 dout(5) << "waitlisting message " << *m
<< dendl
;
4012 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
4013 op
->mark_wait_for_quorum();
4015 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
4017 // proxied sessions aren't registered and don't have a con; don't remove
4019 if (!s
->proxy_con
) {
4020 Mutex::Locker
l(session_map_lock
);
4027 void Monitor::_ms_dispatch(Message
*m
)
4029 if (is_shutdown()) {
4034 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
4035 bool src_is_mon
= op
->is_src_mon();
4036 op
->mark_event("mon:_ms_dispatch");
4037 MonSession
*s
= op
->get_session();
4038 if (s
&& s
->closed
) {
4042 if (src_is_mon
&& s
) {
4043 ConnectionRef con
= m
->get_connection();
4044 if (con
->get_messenger() && con
->get_features() != s
->con_features
) {
4045 // only update features if this is a non-anonymous connection
4046 dout(10) << __func__
<< " feature change for " << m
->get_source_inst()
4047 << " (was " << s
->con_features
4048 << ", now " << con
->get_features() << ")" << dendl
;
4049 // connection features changed - recreate session.
4050 if (s
->con
&& s
->con
!= con
) {
4051 dout(10) << __func__
<< " connection for " << m
->get_source_inst()
4052 << " changed from session; mark down and replace" << dendl
;
4053 s
->con
->mark_down();
4055 if (s
->item
.is_on_list()) {
4056 // forwarded messages' sessions are not in the sessions map and
4057 // exist only while the op is being handled.
4066 // if the sender is not a monitor, make sure their first message for a
4067 // session is an MAuth. If it is not, assume it's a stray message,
4068 // and considering that we are creating a new session it is safe to
4069 // assume that the sender hasn't authenticated yet, so we have no way
4070 // of assessing whether we should handle it or not.
4071 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
4072 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
4073 m
->get_type() != CEPH_MSG_PING
)) {
4074 dout(1) << __func__
<< " dropping stray message " << *m
4075 << " from " << m
->get_source_inst() << dendl
;
4079 ConnectionRef con
= m
->get_connection();
4081 Mutex::Locker
l(session_map_lock
);
4082 s
= session_map
.new_session(m
->get_source_inst(), con
.get());
4085 con
->set_priv(s
->get());
4086 dout(10) << __func__
<< " new session " << s
<< " " << *s
4087 << " features 0x" << std::hex
4088 << s
->con_features
<< std::dec
<< dendl
;
4091 logger
->set(l_mon_num_sessions
, session_map
.get_size());
4092 logger
->inc(l_mon_session_add
);
4095 // give it monitor caps; the peer type has been authenticated
4096 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
4097 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
4098 s
->caps
= *mon_caps
;
4102 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->inst
4108 s
->session_timeout
= ceph_clock_now();
4109 s
->session_timeout
+= g_conf
->mon_session_timeout
;
4111 if (s
->auth_handler
) {
4112 s
->entity_name
= s
->auth_handler
->get_entity_name();
4114 dout(20) << " caps " << s
->caps
.get_str() << dendl
;
4116 if ((is_synchronizing() ||
4117 (s
->global_id
== 0 && !exited_quorum
.is_zero())) &&
4119 m
->get_type() != CEPH_MSG_PING
) {
4120 waitlist_or_zap_client(op
);
4127 void Monitor::dispatch_op(MonOpRequestRef op
)
4129 op
->mark_event("mon:dispatch_op");
4130 MonSession
*s
= op
->get_session();
4133 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
4137 /* we will consider the default type as being 'monitor' until proven wrong */
4138 op
->set_type_monitor();
4139 /* deal with all messages that do not necessarily need caps */
4140 bool dealt_with
= true;
4141 switch (op
->get_req()->get_type()) {
4143 case MSG_MON_GLOBAL_ID
:
4145 op
->set_type_service();
4146 /* no need to check caps here */
4147 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
4154 /* MMonGetMap may be used by clients to obtain a monmap *before*
4155 * authenticating with the monitor. We need to handle these without
4156 * checking caps because, even on a cluster without cephx, we only set
4157 * session caps *after* the auth handshake. A good example of this
4158 * is when a client calls MonClient::get_monmap_privately(), which does
4159 * not authenticate when obtaining a monmap.
4161 case CEPH_MSG_MON_GET_MAP
:
4162 handle_mon_get_map(op
);
4165 case CEPH_MSG_MON_METADATA
:
4166 return handle_mon_metadata(op
);
4175 /* well, maybe the op belongs to a service... */
4176 op
->set_type_service();
4177 /* deal with all messages which caps should be checked somewhere else */
4179 switch (op
->get_req()->get_type()) {
4182 case CEPH_MSG_MON_GET_OSDMAP
:
4183 case CEPH_MSG_POOLOP
:
4184 case MSG_OSD_BEACON
:
4185 case MSG_OSD_MARK_ME_DOWN
:
4187 case MSG_OSD_FAILURE
:
4190 case MSG_OSD_PGTEMP
:
4191 case MSG_OSD_PG_CREATED
:
4192 case MSG_REMOVE_SNAPS
:
4193 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
4197 case MSG_MDS_BEACON
:
4198 case MSG_MDS_OFFLOAD_TARGETS
:
4199 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
4203 case MSG_MGR_BEACON
:
4204 paxos_service
[PAXOS_MGR
]->dispatch(op
);
4208 case MSG_MON_MGR_REPORT
:
4209 case CEPH_MSG_STATFS
:
4210 case MSG_GETPOOLSTATS
:
4211 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
4216 paxos_service
[PAXOS_PGMAP
]->dispatch(op
);
4221 paxos_service
[PAXOS_LOG
]->dispatch(op
);
4224 // handle_command() does its own caps checking
4225 case MSG_MON_COMMAND
:
4226 op
->set_type_command();
4237 /* nop, looks like it's not a service message; revert back to monitor */
4238 op
->set_type_monitor();
4240 /* messages we, the Monitor class, need to deal with
4241 * but may be sent by clients. */
4243 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
4244 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
4245 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
4251 switch (op
->get_req()->get_type()) {
4254 case CEPH_MSG_MON_GET_VERSION
:
4255 handle_get_version(op
);
4258 case CEPH_MSG_MON_SUBSCRIBE
:
4259 /* FIXME: check what's being subscribed, filter accordingly */
4260 handle_subscribe(op
);
4270 if (!op
->is_src_mon()) {
4271 dout(1) << __func__
<< " unexpected monitor message from"
4272 << " non-monitor entity " << op
->get_req()->get_source_inst()
4273 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
4277 /* messages that should only be sent by another monitor */
4279 switch (op
->get_req()->get_type()) {
4289 // Sync (i.e., the new slurp, but on steroids)
4297 /* log acks are sent from a monitor we sent the MLog to, and are
4298 never sent by clients to us. */
4300 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
4305 op
->set_type_service();
4306 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
4312 op
->set_type_paxos();
4313 MMonPaxos
*pm
= static_cast<MMonPaxos
*>(op
->get_req());
4314 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4319 if (state
== STATE_SYNCHRONIZING
) {
4320 // we are synchronizing. These messages would do us no
4321 // good, thus just drop them and ignore them.
4322 dout(10) << __func__
<< " ignore paxos msg from "
4323 << pm
->get_source_inst() << dendl
;
4328 if (pm
->epoch
> get_epoch()) {
4332 if (pm
->epoch
!= get_epoch()) {
4336 paxos
->dispatch(op
);
4341 case MSG_MON_ELECTION
:
4342 op
->set_type_election();
4343 //check privileges here for simplicity
4344 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4345 dout(0) << "MMonElection received from entity without enough caps!"
4346 << op
->get_session()->caps
<< dendl
;
4349 if (!is_probing() && !is_synchronizing()) {
4350 elector
.dispatch(op
);
4359 handle_timecheck(op
);
4362 case MSG_MON_HEALTH
:
4363 health_monitor
->dispatch(op
);
4366 case MSG_MON_HEALTH_CHECKS
:
4367 op
->set_type_service();
4368 paxos_service
[PAXOS_HEALTH
]->dispatch(op
);
4376 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
4385 void Monitor::handle_ping(MonOpRequestRef op
)
4387 MPing
*m
= static_cast<MPing
*>(op
->get_req());
4388 dout(10) << __func__
<< " " << *m
<< dendl
;
4389 MPing
*reply
= new MPing
;
4390 entity_inst_t inst
= m
->get_source_inst();
4392 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4393 f
->open_object_section("pong");
4395 list
<string
> health_str
;
4396 get_health(health_str
, NULL
, f
.get());
4399 get_mon_status(f
.get(), ss
);
4405 ::encode(ss
.str(), payload
);
4406 reply
->set_payload(payload
);
4407 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
4408 messenger
->send_message(reply
, inst
);
4411 void Monitor::timecheck_start()
4413 dout(10) << __func__
<< dendl
;
4414 timecheck_cleanup();
4415 timecheck_start_round();
4418 void Monitor::timecheck_finish()
4420 dout(10) << __func__
<< dendl
;
4421 timecheck_cleanup();
4424 void Monitor::timecheck_start_round()
4426 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4427 assert(is_leader());
4429 if (monmap
->size() == 1) {
4430 assert(0 == "We are alone; this shouldn't have been scheduled!");
4434 if (timecheck_round
% 2) {
4435 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4436 utime_t curr_time
= ceph_clock_now();
4437 double max
= g_conf
->mon_timecheck_interval
*3;
4438 if (curr_time
- timecheck_round_start
< max
) {
4439 dout(10) << __func__
<< " keep current round going" << dendl
;
4442 dout(10) << __func__
4443 << " finish current timecheck and start new" << dendl
;
4444 timecheck_cancel_round();
4448 assert(timecheck_round
% 2 == 0);
4451 timecheck_round_start
= ceph_clock_now();
4452 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4456 dout(10) << __func__
<< " setting up next event" << dendl
;
4457 timecheck_reset_event();
4460 void Monitor::timecheck_finish_round(bool success
)
4462 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4463 assert(timecheck_round
% 2);
4465 timecheck_round_start
= utime_t();
4468 assert(timecheck_waiting
.empty());
4469 assert(timecheck_acks
== quorum
.size());
4471 timecheck_check_skews();
4475 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4476 << " peers still waiting:";
4477 for (map
<entity_inst_t
,utime_t
>::iterator p
= timecheck_waiting
.begin();
4478 p
!= timecheck_waiting
.end(); ++p
) {
4479 *_dout
<< " " << p
->first
.name
;
4482 timecheck_waiting
.clear();
4484 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
4487 void Monitor::timecheck_cancel_round()
4489 timecheck_finish_round(false);
4492 void Monitor::timecheck_cleanup()
4494 timecheck_round
= 0;
4496 timecheck_round_start
= utime_t();
4498 if (timecheck_event
) {
4499 timer
.cancel_event(timecheck_event
);
4500 timecheck_event
= NULL
;
4502 timecheck_waiting
.clear();
4503 timecheck_skews
.clear();
4504 timecheck_latencies
.clear();
4506 timecheck_rounds_since_clean
= 0;
4509 void Monitor::timecheck_reset_event()
4511 if (timecheck_event
) {
4512 timer
.cancel_event(timecheck_event
);
4513 timecheck_event
= NULL
;
4517 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
4519 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4520 delay
= cct
->_conf
->mon_timecheck_interval
;
4523 dout(10) << __func__
<< " delay " << delay
4524 << " rounds_since_clean " << timecheck_rounds_since_clean
4527 timecheck_event
= new C_MonContext(this, [this](int) {
4528 timecheck_start_round();
4530 timer
.add_event_after(delay
, timecheck_event
);
4533 void Monitor::timecheck_check_skews()
4535 dout(10) << __func__
<< dendl
;
4536 assert(is_leader());
4537 assert((timecheck_round
% 2) == 0);
4538 if (monmap
->size() == 1) {
4539 assert(0 == "We are alone; we shouldn't have gotten here!");
4542 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4544 bool found_skew
= false;
4545 for (map
<entity_inst_t
, double>::iterator p
= timecheck_skews
.begin();
4546 p
!= timecheck_skews
.end(); ++p
) {
4549 if (timecheck_has_skew(p
->second
, &abs_skew
)) {
4550 dout(10) << __func__
4551 << " " << p
->first
<< " skew " << abs_skew
<< dendl
;
4557 ++timecheck_rounds_since_clean
;
4558 timecheck_reset_event();
4559 } else if (timecheck_rounds_since_clean
> 0) {
4561 << " no clock skews found after " << timecheck_rounds_since_clean
4562 << " rounds" << dendl
;
4563 // make sure the skews are really gone and not just a transient success
4564 // this will run just once if not in the presence of skews again.
4565 timecheck_rounds_since_clean
= 1;
4566 timecheck_reset_event();
4567 timecheck_rounds_since_clean
= 0;
4572 void Monitor::timecheck_report()
4574 dout(10) << __func__
<< dendl
;
4575 assert(is_leader());
4576 assert((timecheck_round
% 2) == 0);
4577 if (monmap
->size() == 1) {
4578 assert(0 == "We are alone; we shouldn't have gotten here!");
4582 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4583 bool do_output
= true; // only output report once
4584 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
4585 if (monmap
->get_name(*q
) == name
)
4588 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_REPORT
);
4589 m
->epoch
= get_epoch();
4590 m
->round
= timecheck_round
;
4592 for (map
<entity_inst_t
, double>::iterator it
= timecheck_skews
.begin();
4593 it
!= timecheck_skews
.end(); ++it
) {
4594 double skew
= it
->second
;
4595 double latency
= timecheck_latencies
[it
->first
];
4597 m
->skews
[it
->first
] = skew
;
4598 m
->latencies
[it
->first
] = latency
;
4601 dout(25) << __func__
<< " " << it
->first
4602 << " latency " << latency
4603 << " skew " << skew
<< dendl
;
4607 entity_inst_t inst
= monmap
->get_inst(*q
);
4608 dout(10) << __func__
<< " send report to " << inst
<< dendl
;
4609 messenger
->send_message(m
, inst
);
4613 void Monitor::timecheck()
4615 dout(10) << __func__
<< dendl
;
4616 assert(is_leader());
4617 if (monmap
->size() == 1) {
4618 assert(0 == "We are alone; we shouldn't have gotten here!");
4621 assert(timecheck_round
% 2 != 0);
4623 timecheck_acks
= 1; // we ack ourselves
4625 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
4626 << " round " << timecheck_round
<< dendl
;
4628 // we are at the eye of the storm; the point of reference
4629 timecheck_skews
[messenger
->get_myinst()] = 0.0;
4630 timecheck_latencies
[messenger
->get_myinst()] = 0.0;
4632 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
4633 if (monmap
->get_name(*it
) == name
)
4636 entity_inst_t inst
= monmap
->get_inst(*it
);
4637 utime_t curr_time
= ceph_clock_now();
4638 timecheck_waiting
[inst
] = curr_time
;
4639 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_PING
);
4640 m
->epoch
= get_epoch();
4641 m
->round
= timecheck_round
;
4642 dout(10) << __func__
<< " send " << *m
<< " to " << inst
<< dendl
;
4643 messenger
->send_message(m
, inst
);
4647 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
4648 const double skew_bound
,
4649 const double latency
)
4651 health_status_t status
= HEALTH_OK
;
4652 assert(latency
>= 0);
4655 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
4656 status
= HEALTH_WARN
;
4657 ss
<< "clock skew " << abs_skew
<< "s"
4658 << " > max " << g_conf
->mon_clock_drift_allowed
<< "s";
4664 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
4666 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4667 dout(10) << __func__
<< " " << *m
<< dendl
;
4668 /* handles PONG's */
4669 assert(m
->op
== MTimeCheck::OP_PONG
);
4671 entity_inst_t other
= m
->get_source_inst();
4672 if (m
->epoch
< get_epoch()) {
4673 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
4674 << " from " << other
4675 << " curr " << get_epoch()
4676 << " -- severely lagged? discard" << dendl
;
4679 assert(m
->epoch
== get_epoch());
4681 if (m
->round
< timecheck_round
) {
4682 dout(1) << __func__
<< " got old round " << m
->round
4683 << " from " << other
4684 << " curr " << timecheck_round
<< " -- discard" << dendl
;
4688 utime_t curr_time
= ceph_clock_now();
4690 assert(timecheck_waiting
.count(other
) > 0);
4691 utime_t timecheck_sent
= timecheck_waiting
[other
];
4692 timecheck_waiting
.erase(other
);
4693 if (curr_time
< timecheck_sent
) {
4694 // our clock was readjusted -- drop everything until it all makes sense.
4695 dout(1) << __func__
<< " our clock was readjusted --"
4696 << " bump round and drop current check"
4698 timecheck_cancel_round();
4702 /* update peer latencies */
4703 double latency
= (double)(curr_time
- timecheck_sent
);
4705 if (timecheck_latencies
.count(other
) == 0)
4706 timecheck_latencies
[other
] = latency
;
4708 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
4709 timecheck_latencies
[other
] = avg_latency
;
4715 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4716 * and 'a' happens to be lower than 'b'; so we use double instead.
4718 * latency is always expected to be >= 0.
4720 * delta, the difference between theirs timestamp and ours, may either be
4721 * lower or higher than 0; will hardly ever be 0.
4723 * The absolute skew is the absolute delta minus the latency, which is
4724 * taken as a whole instead of an rtt given that there is some queueing
4725 * and dispatch times involved and it's hard to assess how long exactly
4726 * it took for the message to travel to the other side and be handled. So
4727 * we call it a bounded skew, the worst case scenario.
4731 * Given that the latency is always positive, we can establish that the
4732 * bounded skew will be:
4734 * 1. positive if the absolute delta is higher than the latency and
4736 * 2. negative if the absolute delta is higher than the latency and
4737 * delta is negative.
4738 * 3. zero if the absolute delta is lower than the latency.
4740 * On 3. we make a judgement call and treat the skew as non-existent.
4741 * This is because that, if the absolute delta is lower than the
4742 * latency, then the apparently existing skew is nothing more than a
4743 * side-effect of the high latency at work.
4745 * This may not be entirely true though, as a severely skewed clock
4746 * may be masked by an even higher latency, but with high latencies
4747 * we probably have worse issues to deal with than just skewed clocks.
4749 assert(latency
>= 0);
4751 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
4752 double abs_delta
= (delta
> 0 ? delta
: -delta
);
4753 double skew_bound
= abs_delta
- latency
;
4757 skew_bound
= -skew_bound
;
4760 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
4761 if (status
== HEALTH_ERR
)
4762 clog
->error() << other
<< " " << ss
.str();
4763 else if (status
== HEALTH_WARN
)
4764 clog
->warn() << other
<< " " << ss
.str();
4766 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
4767 << " delta " << delta
<< " skew_bound " << skew_bound
4768 << " latency " << latency
<< dendl
;
4770 timecheck_skews
[other
] = skew_bound
;
4773 if (timecheck_acks
== quorum
.size()) {
4774 dout(10) << __func__
<< " got pongs from everybody ("
4775 << timecheck_acks
<< " total)" << dendl
;
4776 assert(timecheck_skews
.size() == timecheck_acks
);
4777 assert(timecheck_waiting
.empty());
4778 // everyone has acked, so bump the round to finish it.
4779 timecheck_finish_round();
4783 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
4785 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4786 dout(10) << __func__
<< " " << *m
<< dendl
;
4789 assert(m
->op
== MTimeCheck::OP_PING
|| m
->op
== MTimeCheck::OP_REPORT
);
4791 if (m
->epoch
!= get_epoch()) {
4792 dout(1) << __func__
<< " got wrong epoch "
4793 << "(ours " << get_epoch()
4794 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
4798 if (m
->round
< timecheck_round
) {
4799 dout(1) << __func__
<< " got old round " << m
->round
4800 << " current " << timecheck_round
4801 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
4805 timecheck_round
= m
->round
;
4807 if (m
->op
== MTimeCheck::OP_REPORT
) {
4808 assert((timecheck_round
% 2) == 0);
4809 timecheck_latencies
.swap(m
->latencies
);
4810 timecheck_skews
.swap(m
->skews
);
4814 assert((timecheck_round
% 2) != 0);
4815 MTimeCheck
*reply
= new MTimeCheck(MTimeCheck::OP_PONG
);
4816 utime_t curr_time
= ceph_clock_now();
4817 reply
->timestamp
= curr_time
;
4818 reply
->epoch
= m
->epoch
;
4819 reply
->round
= m
->round
;
4820 dout(10) << __func__
<< " send " << *m
4821 << " to " << m
->get_source_inst() << dendl
;
4822 m
->get_connection()->send_message(reply
);
4825 void Monitor::handle_timecheck(MonOpRequestRef op
)
4827 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4828 dout(10) << __func__
<< " " << *m
<< dendl
;
4831 if (m
->op
!= MTimeCheck::OP_PONG
) {
4832 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
4834 handle_timecheck_leader(op
);
4836 } else if (is_peon()) {
4837 if (m
->op
!= MTimeCheck::OP_PING
&& m
->op
!= MTimeCheck::OP_REPORT
) {
4838 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
4840 handle_timecheck_peon(op
);
4843 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
4847 void Monitor::handle_subscribe(MonOpRequestRef op
)
4849 MMonSubscribe
*m
= static_cast<MMonSubscribe
*>(op
->get_req());
4850 dout(10) << "handle_subscribe " << *m
<< dendl
;
4854 MonSession
*s
= op
->get_session();
4857 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
4860 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4861 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
4864 // remove conflicting subscribes
4865 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4866 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
4867 it
!= s
->sub_map
.end(); ) {
4868 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
4869 Mutex::Locker
l(session_map_lock
);
4870 session_map
.remove_sub((it
++)->second
);
4878 Mutex::Locker
l(session_map_lock
);
4879 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
4880 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
4881 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
4884 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
4885 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
4886 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
4887 Subscription
*sub
= s
->sub_map
[p
->first
];
4888 assert(sub
!= nullptr);
4889 mdsmon()->check_sub(sub
);
4891 } else if (p
->first
== "osdmap") {
4892 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
4893 if (s
->osd_epoch
> p
->second
.start
) {
4894 // client needs earlier osdmaps on purpose, so reset the sent epoch
4897 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
4899 } else if (p
->first
== "osd_pg_creates") {
4900 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
4901 if (monmap
->get_required_features().contains_all(
4902 ceph::features::mon::FEATURE_LUMINOUS
)) {
4903 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
4905 pgmon()->check_sub(s
->sub_map
["osd_pg_creates"]);
4908 } else if (p
->first
== "monmap") {
4909 monmon()->check_sub(s
->sub_map
[p
->first
]);
4910 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4911 logmon()->check_sub(s
->sub_map
[p
->first
]);
4912 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
4913 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
4914 } else if (p
->first
== "servicemap") {
4915 mgrstatmon()->check_sub(s
->sub_map
[p
->first
]);
4920 // we only need to reply if the client is old enough to think it
4921 // has to send renewals.
4922 ConnectionRef con
= m
->get_connection();
4923 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
4924 m
->get_connection()->send_message(new MMonSubscribeAck(
4925 monmap
->get_fsid(), (int)g_conf
->mon_subscribe_interval
));
4930 void Monitor::handle_get_version(MonOpRequestRef op
)
4932 MMonGetVersion
*m
= static_cast<MMonGetVersion
*>(op
->get_req());
4933 dout(10) << "handle_get_version " << *m
<< dendl
;
4934 PaxosService
*svc
= NULL
;
4936 MonSession
*s
= op
->get_session();
4939 if (!is_leader() && !is_peon()) {
4940 dout(10) << " waiting for quorum" << dendl
;
4941 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
4945 if (m
->what
== "mdsmap") {
4947 } else if (m
->what
== "fsmap") {
4949 } else if (m
->what
== "osdmap") {
4951 } else if (m
->what
== "monmap") {
4954 derr
<< "invalid map type " << m
->what
<< dendl
;
4958 if (!svc
->is_readable()) {
4959 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
4963 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
4964 reply
->handle
= m
->handle
;
4965 reply
->version
= svc
->get_last_committed();
4966 reply
->oldest_version
= svc
->get_first_committed();
4967 reply
->set_tid(m
->get_tid());
4969 m
->get_connection()->send_message(reply
);
4975 bool Monitor::ms_handle_reset(Connection
*con
)
4977 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
4979 // ignore lossless monitor sessions
4980 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
4983 MonSession
*s
= static_cast<MonSession
*>(con
->get_priv());
4987 // break any con <-> session ref cycle
4988 s
->con
->set_priv(NULL
);
4993 Mutex::Locker
l(lock
);
4995 dout(10) << "reset/close on session " << s
->inst
<< dendl
;
4997 Mutex::Locker
l(session_map_lock
);
5004 bool Monitor::ms_handle_refused(Connection
*con
)
5006 // just log for now...
5007 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
5013 void Monitor::send_latest_monmap(Connection
*con
)
5016 monmap
->encode(bl
, con
->get_features());
5017 con
->send_message(new MMonMap(bl
));
5020 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
5022 MMonGetMap
*m
= static_cast<MMonGetMap
*>(op
->get_req());
5023 dout(10) << "handle_mon_get_map" << dendl
;
5024 send_latest_monmap(m
->get_connection().get());
5027 void Monitor::handle_mon_metadata(MonOpRequestRef op
)
5029 MMonMetadata
*m
= static_cast<MMonMetadata
*>(op
->get_req());
5031 dout(10) << __func__
<< dendl
;
5032 update_mon_metadata(m
->get_source().num(), std::move(m
->data
));
5036 void Monitor::update_mon_metadata(int from
, Metadata
&& m
)
5038 // NOTE: this is now for legacy (kraken or jewel) mons only.
5039 pending_metadata
[from
] = std::move(m
);
5041 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5043 ::encode(pending_metadata
, bl
);
5044 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5045 paxos
->trigger_propose();
5048 int Monitor::load_metadata()
5051 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
5054 bufferlist::iterator it
= bl
.begin();
5055 ::decode(mon_metadata
, it
);
5057 pending_metadata
= mon_metadata
;
5061 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
5064 if (!mon_metadata
.count(mon
)) {
5065 err
<< "mon." << mon
<< " not found";
5068 const Metadata
& m
= mon_metadata
[mon
];
5069 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
5070 f
->dump_string(p
->first
.c_str(), p
->second
);
5075 void Monitor::count_metadata(const string
& field
, map
<string
,int> *out
)
5077 for (auto& p
: mon_metadata
) {
5078 auto q
= p
.second
.find(field
);
5079 if (q
== p
.second
.end()) {
5080 (*out
)["unknown"]++;
5082 (*out
)[q
->second
]++;
5087 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
5089 map
<string
,int> by_val
;
5090 count_metadata(field
, &by_val
);
5091 f
->open_object_section(field
.c_str());
5092 for (auto& p
: by_val
) {
5093 f
->dump_int(p
.first
.c_str(), p
.second
);
5098 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
5100 map
<string
, list
<int> > mons
; // hostname => mon
5101 for (map
<int, Metadata
>::iterator it
= mon_metadata
.begin();
5102 it
!= mon_metadata
.end(); ++it
) {
5103 const Metadata
& m
= it
->second
;
5104 Metadata::const_iterator hostname
= m
.find("hostname");
5105 if (hostname
== m
.end()) {
5106 // not likely though
5109 mons
[hostname
->second
].push_back(it
->first
);
5112 dump_services(f
, mons
, "mon");
5116 // ----------------------------------------------
5119 int Monitor::scrub_start()
5121 dout(10) << __func__
<< dendl
;
5122 assert(is_leader());
5124 if (!scrub_result
.empty()) {
5125 clog
->info() << "scrub already in progress";
5129 scrub_event_cancel();
5130 scrub_result
.clear();
5131 scrub_state
.reset(new ScrubState
);
5137 int Monitor::scrub()
5139 assert(is_leader());
5140 assert(scrub_state
);
5142 scrub_cancel_timeout();
5143 wait_for_paxos_write();
5144 scrub_version
= paxos
->get_version();
5147 // scrub all keys if we're the only monitor in the quorum
5149 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
5151 for (set
<int>::iterator p
= quorum
.begin();
5156 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
5158 r
->key
= scrub_state
->last_key
;
5159 messenger
->send_message(r
, monmap
->get_inst(*p
));
5163 bool r
= _scrub(&scrub_result
[rank
],
5164 &scrub_state
->last_key
,
5167 scrub_state
->finished
= !r
;
5169 // only after we got our scrub results do we really care whether the
5170 // other monitors are late on their results. Also, this way we avoid
5171 // triggering the timeout if we end up getting stuck in _scrub() for
5172 // longer than the duration of the timeout.
5173 scrub_reset_timeout();
5175 if (quorum
.size() == 1) {
5176 assert(scrub_state
->finished
== true);
5182 void Monitor::handle_scrub(MonOpRequestRef op
)
5184 MMonScrub
*m
= static_cast<MMonScrub
*>(op
->get_req());
5185 dout(10) << __func__
<< " " << *m
<< dendl
;
5187 case MMonScrub::OP_SCRUB
:
5192 wait_for_paxos_write();
5194 if (m
->version
!= paxos
->get_version())
5197 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
5201 reply
->key
= m
->key
;
5202 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
5203 m
->get_connection()->send_message(reply
);
5207 case MMonScrub::OP_RESULT
:
5211 if (m
->version
!= scrub_version
)
5213 // reset the timeout each time we get a result
5214 scrub_reset_timeout();
5216 int from
= m
->get_source().num();
5217 assert(scrub_result
.count(from
) == 0);
5218 scrub_result
[from
] = m
->result
;
5220 if (scrub_result
.size() == quorum
.size()) {
5221 scrub_check_results();
5222 scrub_result
.clear();
5223 if (scrub_state
->finished
)
5233 bool Monitor::_scrub(ScrubResult
*r
,
5234 pair
<string
,string
> *start
,
5238 assert(start
!= NULL
);
5239 assert(num_keys
!= NULL
);
5241 set
<string
> prefixes
= get_sync_targets_names();
5242 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5244 dout(10) << __func__
<< " start (" << *start
<< ")"
5245 << " num_keys " << *num_keys
<< dendl
;
5247 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
5249 int scrubbed_keys
= 0;
5250 pair
<string
,string
> last_key
;
5252 while (it
->has_next_chunk()) {
5254 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
5257 pair
<string
,string
> k
= it
->get_next_key();
5258 if (prefixes
.count(k
.first
) == 0)
5261 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
5262 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
5263 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
5269 int err
= store
->get(k
.first
, k
.second
, bl
);
5272 uint32_t key_crc
= bl
.crc32c(0);
5273 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
5274 << " crc " << key_crc
<< dendl
;
5275 r
->prefix_keys
[k
.first
]++;
5276 if (r
->prefix_crc
.count(k
.first
) == 0) {
5277 r
->prefix_crc
[k
.first
] = 0;
5279 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
5281 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
5282 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
5283 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
5284 r
->prefix_crc
[k
.first
] += 1;
5291 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
5292 << " scrubbed_keys " << scrubbed_keys
5293 << " has_next " << it
->has_next_chunk() << dendl
;
5296 *num_keys
= scrubbed_keys
;
5298 return it
->has_next_chunk();
5301 void Monitor::scrub_check_results()
5303 dout(10) << __func__
<< dendl
;
5307 ScrubResult
& mine
= scrub_result
[rank
];
5308 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
5309 p
!= scrub_result
.end();
5311 if (p
->first
== rank
)
5313 if (p
->second
!= mine
) {
5315 clog
->error() << "scrub mismatch";
5316 clog
->error() << " mon." << rank
<< " " << mine
;
5317 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5321 clog
->info() << "scrub ok on " << quorum
<< ": " << mine
;
5324 inline void Monitor::scrub_timeout()
5326 dout(1) << __func__
<< " restarting scrub" << dendl
;
5331 void Monitor::scrub_finish()
5333 dout(10) << __func__
<< dendl
;
5335 scrub_event_start();
5338 void Monitor::scrub_reset()
5340 dout(10) << __func__
<< dendl
;
5341 scrub_cancel_timeout();
5343 scrub_result
.clear();
5344 scrub_state
.reset();
5347 inline void Monitor::scrub_update_interval(int secs
)
5349 // we don't care about changes if we are not the leader.
5350 // changes will be visible if we become the leader.
5354 dout(1) << __func__
<< " new interval = " << secs
<< dendl
;
5356 // if scrub already in progress, all changes will already be visible during
5357 // the next round. Nothing to do.
5358 if (scrub_state
!= NULL
)
5361 scrub_event_cancel();
5362 scrub_event_start();
5365 void Monitor::scrub_event_start()
5367 dout(10) << __func__
<< dendl
;
5370 scrub_event_cancel();
5372 if (cct
->_conf
->mon_scrub_interval
<= 0) {
5373 dout(1) << __func__
<< " scrub event is disabled"
5374 << " (mon_scrub_interval = " << cct
->_conf
->mon_scrub_interval
5379 scrub_event
= new C_MonContext(this, [this](int) {
5382 timer
.add_event_after(cct
->_conf
->mon_scrub_interval
, scrub_event
);
5385 void Monitor::scrub_event_cancel()
5387 dout(10) << __func__
<< dendl
;
5389 timer
.cancel_event(scrub_event
);
5394 inline void Monitor::scrub_cancel_timeout()
5396 if (scrub_timeout_event
) {
5397 timer
.cancel_event(scrub_timeout_event
);
5398 scrub_timeout_event
= NULL
;
5402 void Monitor::scrub_reset_timeout()
5404 dout(15) << __func__
<< " reset timeout event" << dendl
;
5405 scrub_cancel_timeout();
5407 scrub_timeout_event
= new C_MonContext(this, [this](int) {
5410 timer
.add_event_after(g_conf
->mon_scrub_timeout
, scrub_timeout_event
);
5413 /************ TICK ***************/
5414 void Monitor::new_tick()
5416 timer
.add_event_after(g_conf
->mon_tick_interval
, new C_MonContext(this, [this](int) {
5421 void Monitor::tick()
5424 dout(11) << "tick" << dendl
;
5426 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
) {
5432 utime_t now
= ceph_clock_now();
5434 Mutex::Locker
l(session_map_lock
);
5435 auto p
= session_map
.sessions
.begin();
5437 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5438 now
> (exited_quorum
+ 2*g_conf
->mon_lease
));
5444 // don't trim monitors
5445 if (s
->inst
.name
.is_mon())
5448 if (s
->session_timeout
< now
&& s
->con
) {
5449 // check keepalive, too
5450 s
->session_timeout
= s
->con
->get_last_keepalive();
5451 s
->session_timeout
+= g_conf
->mon_session_timeout
;
5453 if (s
->session_timeout
< now
) {
5454 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5455 << " (timeout " << s
->session_timeout
5456 << " < now " << now
<< ")" << dendl
;
5457 } else if (out_for_too_long
) {
5458 // boot the client Session because we've taken too long getting back in
5459 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5460 << " because we've been out of quorum too long" << dendl
;
5465 s
->con
->mark_down();
5467 logger
->inc(l_mon_session_trim
);
5470 sync_trim_providers();
5472 if (!maybe_wait_for_quorum
.empty()) {
5473 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5476 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5477 // this is only necessary on upgraded clusters.
5478 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5479 prepare_new_fingerprint(t
);
5480 paxos
->trigger_propose();
5486 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5489 nf
.generate_random();
5490 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5494 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
5497 int Monitor::check_fsid()
5500 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
5505 string
es(ebl
.c_str(), ebl
.length());
5507 // only keep the first line
5508 size_t pos
= es
.find_first_of('\n');
5509 if (pos
!= string::npos
)
5512 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
5514 if (!ondisk
.parse(es
.c_str())) {
5515 derr
<< "error: unable to parse uuid" << dendl
;
5519 if (monmap
->get_fsid() != ondisk
) {
5520 derr
<< "error: cluster_uuid file exists with value " << ondisk
5521 << ", != our uuid " << monmap
->get_fsid() << dendl
;
5528 int Monitor::write_fsid()
5530 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5532 int r
= store
->apply_transaction(t
);
5536 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
5539 ss
<< monmap
->get_fsid() << "\n";
5540 string us
= ss
.str();
5545 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
5550 * this is the closest thing to a traditional 'mkfs' for ceph.
5551 * initialize the monitor state machines to their initial values.
5553 int Monitor::mkfs(bufferlist
& osdmapbl
)
5555 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5557 // verify cluster fsid
5558 int r
= check_fsid();
5559 if (r
< 0 && r
!= -ENOENT
)
5563 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
5564 magicbl
.append("\n");
5565 t
->put(MONITOR_NAME
, "magic", magicbl
);
5568 features
= get_initial_supported_features();
5571 // save monmap, osdmap, keyring.
5572 bufferlist monmapbl
;
5573 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
5574 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5575 t
->put("mkfs", "monmap", monmapbl
);
5577 if (osdmapbl
.length()) {
5578 // make sure it's a valid osdmap
5581 om
.decode(osdmapbl
);
5583 catch (buffer::error
& e
) {
5584 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
5587 t
->put("mkfs", "osdmap", osdmapbl
);
5590 if (is_keyring_required()) {
5592 string keyring_filename
;
5594 r
= ceph_resolve_file_search(g_conf
->keyring
, keyring_filename
);
5596 derr
<< "unable to find a keyring file on " << g_conf
->keyring
5597 << ": " << cpp_strerror(r
) << dendl
;
5598 if (g_conf
->key
!= "") {
5599 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf
->key
+
5600 "\n\tcaps mon = \"allow *\"\n";
5602 bl
.append(keyring_plaintext
);
5604 bufferlist::iterator i
= bl
.begin();
5605 keyring
.decode_plaintext(i
);
5607 catch (const buffer::error
& e
) {
5608 derr
<< "error decoding keyring " << keyring_plaintext
5609 << ": " << e
.what() << dendl
;
5616 r
= keyring
.load(g_ceph_context
, keyring_filename
);
5618 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
5623 // put mon. key in external keyring; seed with everything else.
5624 extract_save_mon_key(keyring
);
5626 bufferlist keyringbl
;
5627 keyring
.encode_plaintext(keyringbl
);
5628 t
->put("mkfs", "keyring", keyringbl
);
5631 store
->apply_transaction(t
);
5636 int Monitor::write_default_keyring(bufferlist
& bl
)
5639 os
<< g_conf
->mon_data
<< "/keyring";
5642 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
, 0600);
5645 dout(0) << __func__
<< " failed to open " << os
.str()
5646 << ": " << cpp_strerror(err
) << dendl
;
5650 err
= bl
.write_fd(fd
);
5653 VOID_TEMP_FAILURE_RETRY(::close(fd
));
5658 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
5660 EntityName mon_name
;
5661 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
5663 if (keyring
.get_auth(mon_name
, mon_key
)) {
5664 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
5666 pkey
.add(mon_name
, mon_key
);
5668 pkey
.encode_plaintext(bl
);
5669 write_default_keyring(bl
);
5670 keyring
.remove(mon_name
);
5674 bool Monitor::ms_get_authorizer(int service_id
, AuthAuthorizer
**authorizer
,
5677 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id
)
5683 // we only connect to other monitors and mgr; every else connects to us.
5684 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
5685 service_id
!= CEPH_ENTITY_TYPE_MGR
)
5688 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5690 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
5691 AuthNoneClientHandler
handler(g_ceph_context
, nullptr);
5692 handler
.set_global_id(0);
5693 *authorizer
= handler
.build_authorizer(service_id
);
5697 CephXServiceTicketInfo auth_ticket_info
;
5698 CephXSessionAuthInfo info
;
5702 name
.set_type(CEPH_ENTITY_TYPE_MON
);
5703 auth_ticket_info
.ticket
.name
= name
;
5704 auth_ticket_info
.ticket
.global_id
= 0;
5706 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
5707 // mon to mon authentication uses the private monitor shared key and not the
5710 if (!keyring
.get_secret(name
, secret
) &&
5711 !key_server
.get_secret(name
, secret
)) {
5712 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5714 stringstream ss
, ds
;
5715 int err
= key_server
.list_secrets(ds
);
5717 ss
<< "no installed auth entries!";
5719 ss
<< "installed auth entries:";
5720 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
5724 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
,
5725 secret
, (uint64_t)-1);
5727 dout(0) << __func__
<< " failed to build mon session_auth_info "
5728 << cpp_strerror(ret
) << dendl
;
5731 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
5733 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
);
5735 derr
<< __func__
<< " failed to build mgr service session_auth_info "
5736 << cpp_strerror(ret
) << dendl
;
5740 ceph_abort(); // see check at top of fn
5743 CephXTicketBlob blob
;
5744 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
5745 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl
;
5748 bufferlist ticket_data
;
5749 ::encode(blob
, ticket_data
);
5751 bufferlist::iterator iter
= ticket_data
.begin();
5752 CephXTicketHandler
handler(g_ceph_context
, service_id
);
5753 ::decode(handler
.ticket
, iter
);
5755 handler
.session_key
= info
.session_key
;
5757 *authorizer
= handler
.build_authorizer(0);
5762 bool Monitor::ms_verify_authorizer(Connection
*con
, int peer_type
,
5763 int protocol
, bufferlist
& authorizer_data
,
5764 bufferlist
& authorizer_reply
,
5765 bool& isvalid
, CryptoKey
& session_key
)
5767 dout(10) << "ms_verify_authorizer " << con
->get_peer_addr()
5768 << " " << ceph_entity_type_name(peer_type
)
5769 << " protocol " << protocol
<< dendl
;
5774 if (peer_type
== CEPH_ENTITY_TYPE_MON
&&
5775 auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5776 // monitor, and cephx is enabled
5778 if (protocol
== CEPH_AUTH_CEPHX
) {
5779 bufferlist::iterator iter
= authorizer_data
.begin();
5780 CephXServiceTicketInfo auth_ticket_info
;
5782 if (authorizer_data
.length()) {
5783 bool ret
= cephx_verify_authorizer(g_ceph_context
, &keyring
, iter
,
5784 auth_ticket_info
, authorizer_reply
);
5786 session_key
= auth_ticket_info
.session_key
;
5789 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con
->get_peer_addr() << dendl
;
5793 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl
;