1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
25 #include "common/version.h"
27 #include "osd/OSDMap.h"
29 #include "MonitorDBStore.h"
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
49 #include "messages/MMonSubscribe.h"
50 #include "messages/MMonSubscribeAck.h"
52 #include "messages/MAuthReply.h"
54 #include "messages/MTimeCheck.h"
55 #include "messages/MPing.h"
57 #include "common/strtol.h"
58 #include "common/ceph_argparse.h"
59 #include "common/Timer.h"
60 #include "common/Clock.h"
61 #include "common/errno.h"
62 #include "common/perf_counters.h"
63 #include "common/admin_socket.h"
64 #include "global/signal_handler.h"
65 #include "common/Formatter.h"
66 #include "include/stringify.h"
67 #include "include/color.h"
68 #include "include/ceph_fs.h"
69 #include "include/str_list.h"
71 #include "OSDMonitor.h"
72 #include "MDSMonitor.h"
73 #include "MonmapMonitor.h"
74 #include "PGMonitor.h"
75 #include "LogMonitor.h"
76 #include "AuthMonitor.h"
77 #include "MgrMonitor.h"
78 #include "MgrStatMonitor.h"
79 #include "mon/QuorumService.h"
80 #include "mon/HealthMonitor.h"
81 #include "mon/ConfigKeyService.h"
82 #include "common/config.h"
83 #include "common/cmdparse.h"
84 #include "include/assert.h"
85 #include "include/compat.h"
86 #include "perfglue/heap_profiler.h"
88 #include "auth/none/AuthNoneClientHandler.h"
90 #define dout_subsys ceph_subsys_mon
92 #define dout_prefix _prefix(_dout, this)
93 static ostream
& _prefix(std::ostream
*_dout
, const Monitor
*mon
) {
94 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
95 << "(" << mon
->get_state_name() << ") e" << mon
->monmap
->get_epoch() << " ";
98 const string
Monitor::MONITOR_NAME
= "monitor";
99 const string
Monitor::MONITOR_STORE_PREFIX
= "monitor_store";
104 #undef COMMAND_WITH_FLAG
105 MonCommand mon_commands
[] = {
106 #define FLAG(f) (MonCommand::FLAG_##f)
107 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
108 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
109 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
110 {parsesig, helptext, modulename, req_perms, avail, flags},
111 #include <mon/MonCommands.h>
113 #undef COMMAND_WITH_FLAG
115 // FIXME: slurp up the Mgr commands too
117 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
118 {parsesig, helptext, modulename, req_perms, avail, FLAG(MGR)},
119 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
120 {parsesig, helptext, modulename, req_perms, avail, flags | FLAG(MGR)},
121 #include <mgr/MgrCommands.h>
123 #undef COMMAND_WITH_FLAG
128 void C_MonContext::finish(int r
) {
129 if (mon
->is_shutdown())
131 FunctionContext::finish(r
);
134 Monitor::Monitor(CephContext
* cct_
, string nm
, MonitorDBStore
*s
,
135 Messenger
*m
, Messenger
*mgr_m
, MonMap
*map
) :
140 con_self(m
? m
->get_loopback_connection() : NULL
),
141 lock("Monitor::lock"),
143 finisher(cct_
, "mon_finisher", "fin"),
144 cpu_tp(cct
, "Monitor::cpu_tp", "cpu_tp", g_conf
->mon_cpu_threads
),
145 has_ever_joined(false),
146 logger(NULL
), cluster_logger(NULL
), cluster_logger_registered(false),
148 log_client(cct_
, messenger
, monmap
, LogClient::FLAG_MON
),
149 key_server(cct
, &keyring
),
150 auth_cluster_required(cct
,
151 cct
->_conf
->auth_supported
.empty() ?
152 cct
->_conf
->auth_cluster_required
: cct
->_conf
->auth_supported
),
153 auth_service_required(cct
,
154 cct
->_conf
->auth_supported
.empty() ?
155 cct
->_conf
->auth_service_required
: cct
->_conf
->auth_supported
),
156 leader_supported_mon_commands(NULL
),
157 leader_supported_mon_commands_size(0),
158 mgr_messenger(mgr_m
),
159 mgr_client(cct_
, mgr_m
),
163 state(STATE_PROBING
),
166 required_features(0),
168 quorum_con_features(0),
172 scrub_timeout_event(NULL
),
175 sync_provider_count(0),
178 sync_start_version(0),
179 sync_timeout_event(NULL
),
180 sync_last_committed_floor(0),
184 timecheck_rounds_since_clean(0),
185 timecheck_event(NULL
),
187 paxos_service(PAXOS_NUM
),
189 routed_request_tid(0),
190 op_tracker(cct
, true, 1)
192 clog
= log_client
.create_channel(CLOG_CHANNEL_CLUSTER
);
193 audit_clog
= log_client
.create_channel(CLOG_CHANNEL_AUDIT
);
195 update_log_clients();
197 paxos
= new Paxos(this, "paxos");
199 paxos_service
[PAXOS_MDSMAP
] = new MDSMonitor(this, paxos
, "mdsmap");
200 paxos_service
[PAXOS_MONMAP
] = new MonmapMonitor(this, paxos
, "monmap");
201 paxos_service
[PAXOS_OSDMAP
] = new OSDMonitor(cct
, this, paxos
, "osdmap");
202 paxos_service
[PAXOS_PGMAP
] = new PGMonitor(this, paxos
, "pgmap");
203 paxos_service
[PAXOS_LOG
] = new LogMonitor(this, paxos
, "logm");
204 paxos_service
[PAXOS_AUTH
] = new AuthMonitor(this, paxos
, "auth");
205 paxos_service
[PAXOS_MGR
] = new MgrMonitor(this, paxos
, "mgr");
206 paxos_service
[PAXOS_MGRSTAT
] = new MgrStatMonitor(this, paxos
, "mgrstat");
208 health_monitor
= new HealthMonitor(this);
209 config_key_service
= new ConfigKeyService(this, paxos
);
211 mon_caps
= new MonCap();
212 bool r
= mon_caps
->parse("allow *", NULL
);
215 exited_quorum
= ceph_clock_now();
217 // assume our commands until we have an election. this only means
218 // we won't reply with EINVAL before the election; any command that
219 // actually matters will wait until we have quorum etc and then
220 // retry (and revalidate).
221 const MonCommand
*cmds
;
223 get_locally_supported_monitor_commands(&cmds
, &cmdsize
);
224 set_leader_supported_commands(cmds
, cmdsize
);
226 // note: OSDMonitor may update this based on the luminous flag.
227 pgservice
= mgrstatmon()->get_pg_stat_service();
230 PaxosService
*Monitor::get_paxos_service_by_name(const string
& name
)
232 if (name
== "mdsmap")
233 return paxos_service
[PAXOS_MDSMAP
];
234 if (name
== "monmap")
235 return paxos_service
[PAXOS_MONMAP
];
236 if (name
== "osdmap")
237 return paxos_service
[PAXOS_OSDMAP
];
239 return paxos_service
[PAXOS_PGMAP
];
241 return paxos_service
[PAXOS_LOG
];
243 return paxos_service
[PAXOS_AUTH
];
245 return paxos_service
[PAXOS_MGR
];
247 assert(0 == "given name does not match known paxos service");
253 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
255 delete health_monitor
;
256 delete config_key_service
;
258 assert(session_map
.sessions
.empty());
260 if (leader_supported_mon_commands
!= mon_commands
)
261 delete[] leader_supported_mon_commands
;
265 class AdminHook
: public AdminSocketHook
{
268 explicit AdminHook(Monitor
*m
) : mon(m
) {}
269 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
270 bufferlist
& out
) override
{
272 mon
->do_admin_command(command
, cmdmap
, format
, ss
);
278 void Monitor::do_admin_command(string command
, cmdmap_t
& cmdmap
, string format
,
281 Mutex::Locker
l(lock
);
283 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
286 for (cmdmap_t::iterator p
= cmdmap
.begin();
287 p
!= cmdmap
.end(); ++p
) {
288 if (p
->first
== "prefix")
292 args
+= cmd_vartype_stringify(p
->second
);
294 args
= "[" + args
+ "]";
296 bool read_only
= (command
== "mon_status" ||
297 command
== "mon metadata" ||
298 command
== "quorum_status" ||
301 (read_only
? audit_clog
->debug() : audit_clog
->info())
302 << "from='admin socket' entity='admin socket' "
303 << "cmd='" << command
<< "' args=" << args
<< ": dispatch";
305 if (command
== "mon_status") {
306 get_mon_status(f
.get(), ss
);
309 } else if (command
== "quorum_status") {
310 _quorum_status(f
.get(), ss
);
311 } else if (command
== "sync_force") {
313 if ((!cmd_getval(g_ceph_context
, cmdmap
, "validate", validate
)) ||
314 (validate
!= "--yes-i-really-mean-it")) {
315 ss
<< "are you SURE? this will mean the monitor store will be erased "
316 "the next time the monitor is restarted. pass "
317 "'--yes-i-really-mean-it' if you really do.";
320 sync_force(f
.get(), ss
);
321 } else if (command
.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
322 if (!_add_bootstrap_peer_hint(command
, cmdmap
, ss
))
324 } else if (command
== "quorum enter") {
325 elector
.start_participating();
327 ss
<< "started responding to quorum, initiated new election";
328 } else if (command
== "quorum exit") {
330 elector
.stop_participating();
331 ss
<< "stopped responding to quorum, initiated new election";
332 } else if (command
== "ops") {
333 (void)op_tracker
.dump_ops_in_flight(f
.get());
338 assert(0 == "bad AdminSocket command binding");
340 (read_only
? audit_clog
->debug() : audit_clog
->info())
341 << "from='admin socket' "
342 << "entity='admin socket' "
343 << "cmd=" << command
<< " "
344 << "args=" << args
<< ": finished";
348 (read_only
? audit_clog
->debug() : audit_clog
->info())
349 << "from='admin socket' "
350 << "entity='admin socket' "
351 << "cmd=" << command
<< " "
352 << "args=" << args
<< ": aborted";
355 void Monitor::handle_signal(int signum
)
357 assert(signum
== SIGINT
|| signum
== SIGTERM
);
358 derr
<< "*** Got Signal " << sig_str(signum
) << " ***" << dendl
;
362 CompatSet
Monitor::get_initial_supported_features()
364 CompatSet::FeatureSet ceph_mon_feature_compat
;
365 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
366 CompatSet::FeatureSet ceph_mon_feature_incompat
;
367 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
368 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS
);
369 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
370 ceph_mon_feature_incompat
);
373 CompatSet
Monitor::get_supported_features()
375 CompatSet compat
= get_initial_supported_features();
376 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
377 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
378 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
379 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
380 compat
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
384 CompatSet
Monitor::get_legacy_features()
386 CompatSet::FeatureSet ceph_mon_feature_compat
;
387 CompatSet::FeatureSet ceph_mon_feature_ro_compat
;
388 CompatSet::FeatureSet ceph_mon_feature_incompat
;
389 ceph_mon_feature_incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_BASE
);
390 return CompatSet(ceph_mon_feature_compat
, ceph_mon_feature_ro_compat
,
391 ceph_mon_feature_incompat
);
394 int Monitor::check_features(MonitorDBStore
*store
)
396 CompatSet required
= get_supported_features();
399 read_features_off_disk(store
, &ondisk
);
401 if (!required
.writeable(ondisk
)) {
402 CompatSet diff
= required
.unsupported(ondisk
);
403 generic_derr
<< "ERROR: on disk data includes unsupported features: " << diff
<< dendl
;
410 void Monitor::read_features_off_disk(MonitorDBStore
*store
, CompatSet
*features
)
412 bufferlist featuresbl
;
413 store
->get(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
414 if (featuresbl
.length() == 0) {
415 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
416 << "Assuming it is old-style and introducing one." << dendl
;
417 //we only want the baseline ~v.18 features assumed to be on disk.
418 //If new features are introduced this code needs to disappear or
420 *features
= get_legacy_features();
422 features
->encode(featuresbl
);
423 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
424 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, featuresbl
);
425 store
->apply_transaction(t
);
427 bufferlist::iterator it
= featuresbl
.begin();
428 features
->decode(it
);
432 void Monitor::read_features()
434 read_features_off_disk(store
, &features
);
435 dout(10) << "features " << features
<< dendl
;
437 calc_quorum_requirements();
438 dout(10) << "required_features " << required_features
<< dendl
;
441 void Monitor::write_features(MonitorDBStore::TransactionRef t
)
445 t
->put(MONITOR_NAME
, COMPAT_SET_LOC
, bl
);
448 const char** Monitor::get_tracked_conf_keys() const
450 static const char* KEYS
[] = {
451 "crushtool", // helpful for testing
452 "mon_election_timeout",
454 "mon_lease_renew_interval_factor",
455 "mon_lease_ack_timeout_factor",
456 "mon_accept_timeout_factor",
460 "clog_to_syslog_facility",
461 "clog_to_syslog_level",
463 "clog_to_graylog_host",
464 "clog_to_graylog_port",
467 // periodic health to clog
468 "mon_health_to_clog",
469 "mon_health_to_clog_interval",
470 "mon_health_to_clog_tick_interval",
472 "mon_scrub_interval",
478 void Monitor::handle_conf_change(const struct md_config_t
*conf
,
479 const std::set
<std::string
> &changed
)
483 dout(10) << __func__
<< " " << changed
<< dendl
;
485 if (changed
.count("clog_to_monitors") ||
486 changed
.count("clog_to_syslog") ||
487 changed
.count("clog_to_syslog_level") ||
488 changed
.count("clog_to_syslog_facility") ||
489 changed
.count("clog_to_graylog") ||
490 changed
.count("clog_to_graylog_host") ||
491 changed
.count("clog_to_graylog_port") ||
492 changed
.count("host") ||
493 changed
.count("fsid")) {
494 update_log_clients();
497 if (changed
.count("mon_health_to_clog") ||
498 changed
.count("mon_health_to_clog_interval") ||
499 changed
.count("mon_health_to_clog_tick_interval")) {
500 health_to_clog_update_conf(changed
);
503 if (changed
.count("mon_scrub_interval")) {
504 scrub_update_interval(conf
->mon_scrub_interval
);
508 void Monitor::update_log_clients()
510 map
<string
,string
> log_to_monitors
;
511 map
<string
,string
> log_to_syslog
;
512 map
<string
,string
> log_channel
;
513 map
<string
,string
> log_prio
;
514 map
<string
,string
> log_to_graylog
;
515 map
<string
,string
> log_to_graylog_host
;
516 map
<string
,string
> log_to_graylog_port
;
520 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
521 log_channel
, log_prio
, log_to_graylog
,
522 log_to_graylog_host
, log_to_graylog_port
,
526 clog
->update_config(log_to_monitors
, log_to_syslog
,
527 log_channel
, log_prio
, log_to_graylog
,
528 log_to_graylog_host
, log_to_graylog_port
,
531 audit_clog
->update_config(log_to_monitors
, log_to_syslog
,
532 log_channel
, log_prio
, log_to_graylog
,
533 log_to_graylog_host
, log_to_graylog_port
,
537 int Monitor::sanitize_options()
541 // mon_lease must be greater than mon_lease_renewal; otherwise we
542 // may incur in leases expiring before they are renewed.
543 if (g_conf
->mon_lease_renew_interval_factor
>= 1.0) {
544 clog
->error() << "mon_lease_renew_interval_factor ("
545 << g_conf
->mon_lease_renew_interval_factor
546 << ") must be less than 1.0";
550 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
551 // got time to renew the lease and get an ack for it. Having both options
552 // with the same value, for a given small vale, could mean timing out if
553 // the monitors happened to be overloaded -- or even under normal load for
554 // a small enough value.
555 if (g_conf
->mon_lease_ack_timeout_factor
<= 1.0) {
556 clog
->error() << "mon_lease_ack_timeout_factor ("
557 << g_conf
->mon_lease_ack_timeout_factor
558 << ") must be greater than 1.0";
565 int Monitor::preinit()
569 dout(1) << "preinit fsid " << monmap
->fsid
<< dendl
;
571 int r
= sanitize_options();
573 derr
<< "option sanitization failed!" << dendl
;
580 PerfCountersBuilder
pcb(g_ceph_context
, "mon", l_mon_first
, l_mon_last
);
581 pcb
.add_u64(l_mon_num_sessions
, "num_sessions", "Open sessions", "sess");
582 pcb
.add_u64_counter(l_mon_session_add
, "session_add", "Created sessions", "sadd");
583 pcb
.add_u64_counter(l_mon_session_rm
, "session_rm", "Removed sessions", "srm");
584 pcb
.add_u64_counter(l_mon_session_trim
, "session_trim", "Trimmed sessions");
585 pcb
.add_u64_counter(l_mon_num_elections
, "num_elections", "Elections participated in");
586 pcb
.add_u64_counter(l_mon_election_call
, "election_call", "Elections started");
587 pcb
.add_u64_counter(l_mon_election_win
, "election_win", "Elections won");
588 pcb
.add_u64_counter(l_mon_election_lose
, "election_lose", "Elections lost");
589 logger
= pcb
.create_perf_counters();
590 cct
->get_perfcounters_collection()->add(logger
);
593 assert(!cluster_logger
);
595 PerfCountersBuilder
pcb(g_ceph_context
, "cluster", l_cluster_first
, l_cluster_last
);
596 pcb
.add_u64(l_cluster_num_mon
, "num_mon", "Monitors");
597 pcb
.add_u64(l_cluster_num_mon_quorum
, "num_mon_quorum", "Monitors in quorum");
598 pcb
.add_u64(l_cluster_num_osd
, "num_osd", "OSDs");
599 pcb
.add_u64(l_cluster_num_osd_up
, "num_osd_up", "OSDs that are up");
600 pcb
.add_u64(l_cluster_num_osd_in
, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
601 pcb
.add_u64(l_cluster_osd_epoch
, "osd_epoch", "Current epoch of OSD map");
602 pcb
.add_u64(l_cluster_osd_bytes
, "osd_bytes", "Total capacity of cluster");
603 pcb
.add_u64(l_cluster_osd_bytes_used
, "osd_bytes_used", "Used space");
604 pcb
.add_u64(l_cluster_osd_bytes_avail
, "osd_bytes_avail", "Available space");
605 pcb
.add_u64(l_cluster_num_pool
, "num_pool", "Pools");
606 pcb
.add_u64(l_cluster_num_pg
, "num_pg", "Placement groups");
607 pcb
.add_u64(l_cluster_num_pg_active_clean
, "num_pg_active_clean", "Placement groups in active+clean state");
608 pcb
.add_u64(l_cluster_num_pg_active
, "num_pg_active", "Placement groups in active state");
609 pcb
.add_u64(l_cluster_num_pg_peering
, "num_pg_peering", "Placement groups in peering state");
610 pcb
.add_u64(l_cluster_num_object
, "num_object", "Objects");
611 pcb
.add_u64(l_cluster_num_object_degraded
, "num_object_degraded", "Degraded (missing replicas) objects");
612 pcb
.add_u64(l_cluster_num_object_misplaced
, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
613 pcb
.add_u64(l_cluster_num_object_unfound
, "num_object_unfound", "Unfound objects");
614 pcb
.add_u64(l_cluster_num_bytes
, "num_bytes", "Size of all objects");
615 pcb
.add_u64(l_cluster_num_mds_up
, "num_mds_up", "MDSs that are up");
616 pcb
.add_u64(l_cluster_num_mds_in
, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
617 pcb
.add_u64(l_cluster_num_mds_failed
, "num_mds_failed", "Failed MDS");
618 pcb
.add_u64(l_cluster_mds_epoch
, "mds_epoch", "Current epoch of MDS map");
619 cluster_logger
= pcb
.create_perf_counters();
622 paxos
->init_logger();
624 // verify cluster_uuid
626 int r
= check_fsid();
638 // have we ever joined a quorum?
639 has_ever_joined
= (store
->get(MONITOR_NAME
, "joined") != 0);
640 dout(10) << "has_ever_joined = " << (int)has_ever_joined
<< dendl
;
642 if (!has_ever_joined
) {
643 // impose initial quorum restrictions?
644 list
<string
> initial_members
;
645 get_str_list(g_conf
->mon_initial_members
, initial_members
);
647 if (!initial_members
.empty()) {
648 dout(1) << " initial_members " << initial_members
<< ", filtering seed monmap" << dendl
;
650 monmap
->set_initial_members(g_ceph_context
, initial_members
, name
, messenger
->get_myaddr(),
653 dout(10) << " monmap is " << *monmap
<< dendl
;
654 dout(10) << " extra probe peers " << extra_probe_peers
<< dendl
;
656 } else if (!monmap
->contains(name
)) {
657 derr
<< "not in monmap and have been in a quorum before; "
658 << "must have been removed" << dendl
;
659 if (g_conf
->mon_force_quorum_join
) {
660 dout(0) << "we should have died but "
661 << "'mon_force_quorum_join' is set -- allowing boot" << dendl
;
663 derr
<< "commit suicide!" << dendl
;
670 // We have a potentially inconsistent store state in hands. Get rid of it
672 bool clear_store
= false;
673 if (store
->exists("mon_sync", "in_sync")) {
674 dout(1) << __func__
<< " clean up potentially inconsistent store state"
679 if (store
->get("mon_sync", "force_sync") > 0) {
680 dout(1) << __func__
<< " force sync by clearing store state" << dendl
;
685 set
<string
> sync_prefixes
= get_sync_targets_names();
686 store
->clear(sync_prefixes
);
690 sync_last_committed_floor
= store
->get("mon_sync", "last_committed_floor");
691 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor
<< dendl
;
694 health_monitor
->init();
696 if (is_keyring_required()) {
697 // we need to bootstrap authentication keys so we can form an
699 if (authmon()->get_last_committed() == 0) {
700 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl
;
702 int err
= store
->get("mkfs", "keyring", bl
);
703 if (err
== 0 && bl
.length() > 0) {
704 // Attempt to decode and extract keyring only if it is found.
706 bufferlist::iterator p
= bl
.begin();
707 ::decode(keyring
, p
);
708 extract_save_mon_key(keyring
);
712 string keyring_loc
= g_conf
->mon_data
+ "/keyring";
714 r
= keyring
.load(cct
, keyring_loc
);
717 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
719 if (key_server
.get_auth(mon_name
, mon_key
)) {
720 dout(1) << "copying mon. key from old db to external keyring" << dendl
;
721 keyring
.add(mon_name
, mon_key
);
723 keyring
.encode_plaintext(bl
);
724 write_default_keyring(bl
);
726 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
733 admin_hook
= new AdminHook(this);
734 AdminSocket
* admin_socket
= cct
->get_admin_socket();
736 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
738 r
= admin_socket
->register_command("mon_status", "mon_status", admin_hook
,
739 "show current monitor status");
741 r
= admin_socket
->register_command("quorum_status", "quorum_status",
742 admin_hook
, "show current quorum status");
744 r
= admin_socket
->register_command("sync_force",
745 "sync_force name=validate,"
747 "strings=--yes-i-really-mean-it",
749 "force sync of and clear monitor store");
751 r
= admin_socket
->register_command("add_bootstrap_peer_hint",
752 "add_bootstrap_peer_hint name=addr,"
755 "add peer address as potential bootstrap"
756 " peer for cluster bringup");
758 r
= admin_socket
->register_command("quorum enter", "quorum enter",
760 "force monitor back into quorum");
762 r
= admin_socket
->register_command("quorum exit", "quorum exit",
764 "force monitor out of the quorum");
766 r
= admin_socket
->register_command("ops",
769 "show the ops currently in flight");
774 // add ourselves as a conf observer
775 g_conf
->add_observer(this);
783 dout(2) << "init" << dendl
;
784 Mutex::Locker
l(lock
);
795 messenger
->add_dispatcher_tail(this);
798 mgr_messenger
->add_dispatcher_tail(&mgr_client
);
799 mgr_messenger
->add_dispatcher_tail(this); // for auth ms_* calls
803 // encode command sets
804 const MonCommand
*cmds
;
806 get_locally_supported_monitor_commands(&cmds
, &cmdsize
);
807 MonCommand::encode_array(cmds
, cmdsize
, supported_commands_bl
);
812 void Monitor::init_paxos()
814 dout(10) << __func__
<< dendl
;
818 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
819 paxos_service
[i
]->init();
822 refresh_from_paxos(NULL
);
825 void Monitor::refresh_from_paxos(bool *need_bootstrap
)
827 dout(10) << __func__
<< dendl
;
830 int r
= store
->get(MONITOR_NAME
, "cluster_fingerprint", bl
);
833 bufferlist::iterator p
= bl
.begin();
834 ::decode(fingerprint
, p
);
836 catch (buffer::error
& e
) {
837 dout(10) << __func__
<< " failed to decode cluster_fingerprint" << dendl
;
840 dout(10) << __func__
<< " no cluster_fingerprint" << dendl
;
843 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
844 paxos_service
[i
]->refresh(need_bootstrap
);
846 for (int i
= 0; i
< PAXOS_NUM
; ++i
) {
847 paxos_service
[i
]->post_refresh();
851 void Monitor::register_cluster_logger()
853 if (!cluster_logger_registered
) {
854 dout(10) << "register_cluster_logger" << dendl
;
855 cluster_logger_registered
= true;
856 cct
->get_perfcounters_collection()->add(cluster_logger
);
858 dout(10) << "register_cluster_logger - already registered" << dendl
;
862 void Monitor::unregister_cluster_logger()
864 if (cluster_logger_registered
) {
865 dout(10) << "unregister_cluster_logger" << dendl
;
866 cluster_logger_registered
= false;
867 cct
->get_perfcounters_collection()->remove(cluster_logger
);
869 dout(10) << "unregister_cluster_logger - not registered" << dendl
;
873 void Monitor::update_logger()
875 cluster_logger
->set(l_cluster_num_mon
, monmap
->size());
876 cluster_logger
->set(l_cluster_num_mon_quorum
, quorum
.size());
879 void Monitor::shutdown()
881 dout(1) << "shutdown" << dendl
;
885 wait_for_paxos_write();
887 state
= STATE_SHUTDOWN
;
889 g_conf
->remove_observer(this);
892 AdminSocket
* admin_socket
= cct
->get_admin_socket();
893 admin_socket
->unregister_command("mon_status");
894 admin_socket
->unregister_command("quorum_status");
895 admin_socket
->unregister_command("sync_force");
896 admin_socket
->unregister_command("add_bootstrap_peer_hint");
897 admin_socket
->unregister_command("quorum enter");
898 admin_socket
->unregister_command("quorum exit");
899 admin_socket
->unregister_command("ops");
906 mgr_client
.shutdown();
909 finisher
.wait_for_empty();
915 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
917 health_monitor
->shutdown();
919 finish_contexts(g_ceph_context
, waitfor_quorum
, -ECANCELED
);
920 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
, -ECANCELED
);
926 remove_all_sessions();
929 cct
->get_perfcounters_collection()->remove(logger
);
933 if (cluster_logger
) {
934 if (cluster_logger_registered
)
935 cct
->get_perfcounters_collection()->remove(cluster_logger
);
936 delete cluster_logger
;
937 cluster_logger
= NULL
;
940 log_client
.shutdown();
942 // unlock before msgr shutdown...
945 messenger
->shutdown(); // last thing! ceph_mon.cc will delete mon.
946 mgr_messenger
->shutdown();
949 void Monitor::wait_for_paxos_write()
951 if (paxos
->is_writing() || paxos
->is_writing_previous()) {
952 dout(10) << __func__
<< " flushing pending write" << dendl
;
956 dout(10) << __func__
<< " flushed pending write" << dendl
;
960 void Monitor::bootstrap()
962 dout(10) << "bootstrap" << dendl
;
963 wait_for_paxos_write();
965 sync_reset_requester();
966 unregister_cluster_logger();
967 cancel_probe_timeout();
970 int newrank
= monmap
->get_rank(messenger
->get_myaddr());
971 if (newrank
< 0 && rank
>= 0) {
972 // was i ever part of the quorum?
973 if (has_ever_joined
) {
974 dout(0) << " removed from monmap, suicide." << dendl
;
978 if (newrank
!= rank
) {
979 dout(0) << " my rank is now " << newrank
<< " (was " << rank
<< ")" << dendl
;
980 messenger
->set_myname(entity_name_t::MON(newrank
));
983 // reset all connections, or else our peers will think we are someone else.
984 messenger
->mark_down_all();
988 state
= STATE_PROBING
;
993 if (g_conf
->mon_compact_on_bootstrap
) {
994 dout(10) << "bootstrap -- triggering compaction" << dendl
;
996 dout(10) << "bootstrap -- finished compaction" << dendl
;
999 // singleton monitor?
1000 if (monmap
->size() == 1 && rank
== 0) {
1001 win_standalone_election();
1005 reset_probe_timeout();
1007 // i'm outside the quorum
1008 if (monmap
->contains(name
))
1009 outside_quorum
.insert(name
);
1012 dout(10) << "probing other monitors" << dendl
;
1013 for (unsigned i
= 0; i
< monmap
->size(); i
++) {
1015 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
),
1016 monmap
->get_inst(i
));
1018 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
1019 p
!= extra_probe_peers
.end();
1021 if (*p
!= messenger
->get_myaddr()) {
1023 i
.name
= entity_name_t::MON(-1);
1025 messenger
->send_message(new MMonProbe(monmap
->fsid
, MMonProbe::OP_PROBE
, name
, has_ever_joined
), i
);
1030 bool Monitor::_add_bootstrap_peer_hint(string cmd
, cmdmap_t
& cmdmap
, ostream
& ss
)
1033 if (!cmd_getval(g_ceph_context
, cmdmap
, "addr", addrstr
)) {
1034 ss
<< "unable to parse address string value '"
1035 << cmd_vartype_stringify(cmdmap
["addr"]) << "'";
1038 dout(10) << "_add_bootstrap_peer_hint '" << cmd
<< "' '"
1039 << addrstr
<< "'" << dendl
;
1042 const char *end
= 0;
1043 if (!addr
.parse(addrstr
.c_str(), &end
)) {
1044 ss
<< "failed to parse addr '" << addrstr
<< "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1048 if (is_leader() || is_peon()) {
1049 ss
<< "mon already active; ignoring bootstrap hint";
1053 if (addr
.get_port() == 0)
1054 addr
.set_port(CEPH_MON_PORT
);
1056 extra_probe_peers
.insert(addr
);
1057 ss
<< "adding peer " << addr
<< " to list: " << extra_probe_peers
;
1061 // called by bootstrap(), or on leader|peon -> electing
1062 void Monitor::_reset()
1064 dout(10) << __func__
<< dendl
;
1066 cancel_probe_timeout();
1068 health_events_cleanup();
1069 scrub_event_cancel();
1071 leader_since
= utime_t();
1072 if (!quorum
.empty()) {
1073 exited_quorum
= ceph_clock_now();
1076 outside_quorum
.clear();
1077 quorum_feature_map
.clear();
1083 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
)
1085 health_monitor
->finish();
1089 // -----------------------------------------------------------
1092 set
<string
> Monitor::get_sync_targets_names()
1094 set
<string
> targets
;
1095 targets
.insert(paxos
->get_name());
1096 for (int i
= 0; i
< PAXOS_NUM
; ++i
)
1097 paxos_service
[i
]->get_store_prefixes(targets
);
1098 ConfigKeyService
*config_key_service_ptr
= dynamic_cast<ConfigKeyService
*>(config_key_service
);
1099 assert(config_key_service_ptr
);
1100 config_key_service_ptr
->get_store_prefixes(targets
);
1105 void Monitor::sync_timeout()
1107 dout(10) << __func__
<< dendl
;
1108 assert(state
== STATE_SYNCHRONIZING
);
1112 void Monitor::sync_obtain_latest_monmap(bufferlist
&bl
)
1114 dout(1) << __func__
<< dendl
;
1116 MonMap latest_monmap
;
1118 // Grab latest monmap from MonmapMonitor
1119 bufferlist monmon_bl
;
1120 int err
= monmon()->get_monmap(monmon_bl
);
1122 if (err
!= -ENOENT
) {
1124 << " something wrong happened while reading the store: "
1125 << cpp_strerror(err
) << dendl
;
1126 assert(0 == "error reading the store");
1129 latest_monmap
.decode(monmon_bl
);
1132 // Grab last backed up monmap (if any) and compare epochs
1133 if (store
->exists("mon_sync", "latest_monmap")) {
1134 bufferlist backup_bl
;
1135 int err
= store
->get("mon_sync", "latest_monmap", backup_bl
);
1138 << " something wrong happened while reading the store: "
1139 << cpp_strerror(err
) << dendl
;
1140 assert(0 == "error reading the store");
1142 assert(backup_bl
.length() > 0);
1144 MonMap backup_monmap
;
1145 backup_monmap
.decode(backup_bl
);
1147 if (backup_monmap
.epoch
> latest_monmap
.epoch
)
1148 latest_monmap
= backup_monmap
;
1151 // Check if our current monmap's epoch is greater than the one we've
1153 if (monmap
->epoch
> latest_monmap
.epoch
)
1154 latest_monmap
= *monmap
;
1156 dout(1) << __func__
<< " obtained monmap e" << latest_monmap
.epoch
<< dendl
;
1158 latest_monmap
.encode(bl
, CEPH_FEATURES_ALL
);
1161 void Monitor::sync_reset_requester()
1163 dout(10) << __func__
<< dendl
;
1165 if (sync_timeout_event
) {
1166 timer
.cancel_event(sync_timeout_event
);
1167 sync_timeout_event
= NULL
;
1170 sync_provider
= entity_inst_t();
1173 sync_start_version
= 0;
1176 void Monitor::sync_reset_provider()
1178 dout(10) << __func__
<< dendl
;
1179 sync_providers
.clear();
1182 void Monitor::sync_start(entity_inst_t
&other
, bool full
)
1184 dout(10) << __func__
<< " " << other
<< (full
? " full" : " recent") << dendl
;
1186 assert(state
== STATE_PROBING
||
1187 state
== STATE_SYNCHRONIZING
);
1188 state
= STATE_SYNCHRONIZING
;
1190 // make sure are not a provider for anyone!
1191 sync_reset_provider();
1196 // stash key state, and mark that we are syncing
1197 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1198 sync_stash_critical_state(t
);
1199 t
->put("mon_sync", "in_sync", 1);
1201 sync_last_committed_floor
= MAX(sync_last_committed_floor
, paxos
->get_version());
1202 dout(10) << __func__
<< " marking sync in progress, storing sync_last_committed_floor "
1203 << sync_last_committed_floor
<< dendl
;
1204 t
->put("mon_sync", "last_committed_floor", sync_last_committed_floor
);
1206 store
->apply_transaction(t
);
1208 assert(g_conf
->mon_sync_requester_kill_at
!= 1);
1210 // clear the underlying store
1211 set
<string
> targets
= get_sync_targets_names();
1212 dout(10) << __func__
<< " clearing prefixes " << targets
<< dendl
;
1213 store
->clear(targets
);
1215 // make sure paxos knows it has been reset. this prevents a
1216 // bootstrap and then different probe reply order from possibly
1217 // deciding a partial or no sync is needed.
1220 assert(g_conf
->mon_sync_requester_kill_at
!= 2);
1223 // assume 'other' as the leader. We will update the leader once we receive
1224 // a reply to the sync start.
1225 sync_provider
= other
;
1227 sync_reset_timeout();
1229 MMonSync
*m
= new MMonSync(sync_full
? MMonSync::OP_GET_COOKIE_FULL
: MMonSync::OP_GET_COOKIE_RECENT
);
1231 m
->last_committed
= paxos
->get_version();
1232 messenger
->send_message(m
, sync_provider
);
1235 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t
)
1237 dout(10) << __func__
<< dendl
;
1238 bufferlist backup_monmap
;
1239 sync_obtain_latest_monmap(backup_monmap
);
1240 assert(backup_monmap
.length() > 0);
1241 t
->put("mon_sync", "latest_monmap", backup_monmap
);
1244 void Monitor::sync_reset_timeout()
1246 dout(10) << __func__
<< dendl
;
1247 if (sync_timeout_event
)
1248 timer
.cancel_event(sync_timeout_event
);
1249 sync_timeout_event
= new C_MonContext(this, [this](int) {
1252 timer
.add_event_after(g_conf
->mon_sync_timeout
, sync_timeout_event
);
1255 void Monitor::sync_finish(version_t last_committed
)
1257 dout(10) << __func__
<< " lc " << last_committed
<< " from " << sync_provider
<< dendl
;
1259 assert(g_conf
->mon_sync_requester_kill_at
!= 7);
1262 // finalize the paxos commits
1263 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1264 paxos
->read_and_prepare_transactions(tx
, sync_start_version
,
1266 tx
->put(paxos
->get_name(), "last_committed", last_committed
);
1268 dout(30) << __func__
<< " final tx dump:\n";
1269 JSONFormatter
f(true);
1274 store
->apply_transaction(tx
);
1277 assert(g_conf
->mon_sync_requester_kill_at
!= 8);
1279 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1280 t
->erase("mon_sync", "in_sync");
1281 t
->erase("mon_sync", "force_sync");
1282 t
->erase("mon_sync", "last_committed_floor");
1283 store
->apply_transaction(t
);
1285 assert(g_conf
->mon_sync_requester_kill_at
!= 9);
1289 assert(g_conf
->mon_sync_requester_kill_at
!= 10);
1294 void Monitor::handle_sync(MonOpRequestRef op
)
1296 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1297 dout(10) << __func__
<< " " << *m
<< dendl
;
1300 // provider ---------
1302 case MMonSync::OP_GET_COOKIE_FULL
:
1303 case MMonSync::OP_GET_COOKIE_RECENT
:
1304 handle_sync_get_cookie(op
);
1306 case MMonSync::OP_GET_CHUNK
:
1307 handle_sync_get_chunk(op
);
1310 // client -----------
1312 case MMonSync::OP_COOKIE
:
1313 handle_sync_cookie(op
);
1316 case MMonSync::OP_CHUNK
:
1317 case MMonSync::OP_LAST_CHUNK
:
1318 handle_sync_chunk(op
);
1320 case MMonSync::OP_NO_COOKIE
:
1321 handle_sync_no_cookie(op
);
1325 dout(0) << __func__
<< " unknown op " << m
->op
<< dendl
;
1326 assert(0 == "unknown op");
1332 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op
)
1334 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1335 MMonSync
*reply
= new MMonSync(MMonSync::OP_NO_COOKIE
, m
->cookie
);
1336 m
->get_connection()->send_message(reply
);
1339 void Monitor::handle_sync_get_cookie(MonOpRequestRef op
)
1341 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1342 if (is_synchronizing()) {
1343 _sync_reply_no_cookie(op
);
1347 assert(g_conf
->mon_sync_provider_kill_at
!= 1);
1349 // make sure they can understand us.
1350 if ((required_features
^ m
->get_connection()->get_features()) &
1351 required_features
) {
1352 dout(5) << " ignoring peer mon." << m
->get_source().num()
1353 << " has features " << std::hex
1354 << m
->get_connection()->get_features()
1355 << " but we require " << required_features
<< std::dec
<< dendl
;
1359 // make up a unique cookie. include election epoch (which persists
1360 // across restarts for the whole cluster) and a counter for this
1361 // process instance. there is no need to be unique *across*
1362 // monitors, though.
1363 uint64_t cookie
= ((unsigned long long)elector
.get_epoch() << 24) + ++sync_provider_count
;
1364 assert(sync_providers
.count(cookie
) == 0);
1366 dout(10) << __func__
<< " cookie " << cookie
<< " for " << m
->get_source_inst() << dendl
;
1368 SyncProvider
& sp
= sync_providers
[cookie
];
1370 sp
.entity
= m
->get_source_inst();
1371 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1373 set
<string
> sync_targets
;
1374 if (m
->op
== MMonSync::OP_GET_COOKIE_FULL
) {
1376 sync_targets
= get_sync_targets_names();
1377 sp
.last_committed
= paxos
->get_version();
1378 sp
.synchronizer
= store
->get_synchronizer(sp
.last_key
, sync_targets
);
1380 dout(10) << __func__
<< " will sync prefixes " << sync_targets
<< dendl
;
1382 // just catch up paxos
1383 sp
.last_committed
= m
->last_committed
;
1385 dout(10) << __func__
<< " will sync from version " << sp
.last_committed
<< dendl
;
1387 MMonSync
*reply
= new MMonSync(MMonSync::OP_COOKIE
, sp
.cookie
);
1388 reply
->last_committed
= sp
.last_committed
;
1389 m
->get_connection()->send_message(reply
);
1392 void Monitor::handle_sync_get_chunk(MonOpRequestRef op
)
1394 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1395 dout(10) << __func__
<< " " << *m
<< dendl
;
1397 if (sync_providers
.count(m
->cookie
) == 0) {
1398 dout(10) << __func__
<< " no cookie " << m
->cookie
<< dendl
;
1399 _sync_reply_no_cookie(op
);
1403 assert(g_conf
->mon_sync_provider_kill_at
!= 2);
1405 SyncProvider
& sp
= sync_providers
[m
->cookie
];
1406 sp
.reset_timeout(g_ceph_context
, g_conf
->mon_sync_timeout
* 2);
1408 if (sp
.last_committed
< paxos
->get_first_committed() &&
1409 paxos
->get_first_committed() > 1) {
1410 dout(10) << __func__
<< " sync requester fell behind paxos, their lc " << sp
.last_committed
1411 << " < our fc " << paxos
->get_first_committed() << dendl
;
1412 sync_providers
.erase(m
->cookie
);
1413 _sync_reply_no_cookie(op
);
1417 MMonSync
*reply
= new MMonSync(MMonSync::OP_CHUNK
, sp
.cookie
);
1418 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1420 int left
= g_conf
->mon_sync_max_payload_size
;
1421 while (sp
.last_committed
< paxos
->get_version() && left
> 0) {
1423 sp
.last_committed
++;
1424 store
->get(paxos
->get_name(), sp
.last_committed
, bl
);
1425 // TODO: what if store->get returns error or empty bl?
1426 tx
->put(paxos
->get_name(), sp
.last_committed
, bl
);
1427 left
-= bl
.length();
1428 dout(20) << __func__
<< " including paxos state " << sp
.last_committed
1431 reply
->last_committed
= sp
.last_committed
;
1433 if (sp
.full
&& left
> 0) {
1434 sp
.synchronizer
->get_chunk_tx(tx
, left
);
1435 sp
.last_key
= sp
.synchronizer
->get_last_key();
1436 reply
->last_key
= sp
.last_key
;
1439 if ((sp
.full
&& sp
.synchronizer
->has_next_chunk()) ||
1440 sp
.last_committed
< paxos
->get_version()) {
1441 dout(10) << __func__
<< " chunk, through version " << sp
.last_committed
1442 << " key " << sp
.last_key
<< dendl
;
1444 dout(10) << __func__
<< " last chunk, through version " << sp
.last_committed
1445 << " key " << sp
.last_key
<< dendl
;
1446 reply
->op
= MMonSync::OP_LAST_CHUNK
;
1448 assert(g_conf
->mon_sync_provider_kill_at
!= 3);
1450 // clean up our local state
1451 sync_providers
.erase(sp
.cookie
);
1454 ::encode(*tx
, reply
->chunk_bl
);
1456 m
->get_connection()->send_message(reply
);
1461 void Monitor::handle_sync_cookie(MonOpRequestRef op
)
1463 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1464 dout(10) << __func__
<< " " << *m
<< dendl
;
1466 dout(10) << __func__
<< " already have a cookie, ignoring" << dendl
;
1469 if (m
->get_source_inst() != sync_provider
) {
1470 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1473 sync_cookie
= m
->cookie
;
1474 sync_start_version
= m
->last_committed
;
1476 sync_reset_timeout();
1477 sync_get_next_chunk();
1479 assert(g_conf
->mon_sync_requester_kill_at
!= 3);
1482 void Monitor::sync_get_next_chunk()
1484 dout(20) << __func__
<< " cookie " << sync_cookie
<< " provider " << sync_provider
<< dendl
;
1485 if (g_conf
->mon_inject_sync_get_chunk_delay
> 0) {
1486 dout(20) << __func__
<< " injecting delay of " << g_conf
->mon_inject_sync_get_chunk_delay
<< dendl
;
1487 usleep((long long)(g_conf
->mon_inject_sync_get_chunk_delay
* 1000000.0));
1489 MMonSync
*r
= new MMonSync(MMonSync::OP_GET_CHUNK
, sync_cookie
);
1490 messenger
->send_message(r
, sync_provider
);
1492 assert(g_conf
->mon_sync_requester_kill_at
!= 4);
1495 void Monitor::handle_sync_chunk(MonOpRequestRef op
)
1497 MMonSync
*m
= static_cast<MMonSync
*>(op
->get_req());
1498 dout(10) << __func__
<< " " << *m
<< dendl
;
1500 if (m
->cookie
!= sync_cookie
) {
1501 dout(10) << __func__
<< " cookie does not match, discarding" << dendl
;
1504 if (m
->get_source_inst() != sync_provider
) {
1505 dout(10) << __func__
<< " source does not match, discarding" << dendl
;
1509 assert(state
== STATE_SYNCHRONIZING
);
1510 assert(g_conf
->mon_sync_requester_kill_at
!= 5);
1512 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1513 tx
->append_from_encoded(m
->chunk_bl
);
1515 dout(30) << __func__
<< " tx dump:\n";
1516 JSONFormatter
f(true);
1521 store
->apply_transaction(tx
);
1523 assert(g_conf
->mon_sync_requester_kill_at
!= 6);
1526 dout(10) << __func__
<< " applying recent paxos transactions as we go" << dendl
;
1527 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
1528 paxos
->read_and_prepare_transactions(tx
, paxos
->get_version() + 1,
1530 tx
->put(paxos
->get_name(), "last_committed", m
->last_committed
);
1532 dout(30) << __func__
<< " tx dump:\n";
1533 JSONFormatter
f(true);
1538 store
->apply_transaction(tx
);
1539 paxos
->init(); // to refresh what we just wrote
1542 if (m
->op
== MMonSync::OP_CHUNK
) {
1543 sync_reset_timeout();
1544 sync_get_next_chunk();
1545 } else if (m
->op
== MMonSync::OP_LAST_CHUNK
) {
1546 sync_finish(m
->last_committed
);
1550 void Monitor::handle_sync_no_cookie(MonOpRequestRef op
)
1552 dout(10) << __func__
<< dendl
;
1556 void Monitor::sync_trim_providers()
1558 dout(20) << __func__
<< dendl
;
1560 utime_t now
= ceph_clock_now();
1561 map
<uint64_t,SyncProvider
>::iterator p
= sync_providers
.begin();
1562 while (p
!= sync_providers
.end()) {
1563 if (now
> p
->second
.timeout
) {
1564 dout(10) << __func__
<< " expiring cookie " << p
->second
.cookie
<< " for " << p
->second
.entity
<< dendl
;
1565 sync_providers
.erase(p
++);
1572 // ---------------------------------------------------
1575 void Monitor::cancel_probe_timeout()
1577 if (probe_timeout_event
) {
1578 dout(10) << "cancel_probe_timeout " << probe_timeout_event
<< dendl
;
1579 timer
.cancel_event(probe_timeout_event
);
1580 probe_timeout_event
= NULL
;
1582 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl
;
1586 void Monitor::reset_probe_timeout()
1588 cancel_probe_timeout();
1589 probe_timeout_event
= new C_MonContext(this, [this](int r
) {
1592 double t
= g_conf
->mon_probe_timeout
;
1593 timer
.add_event_after(t
, probe_timeout_event
);
1594 dout(10) << "reset_probe_timeout " << probe_timeout_event
<< " after " << t
<< " seconds" << dendl
;
1597 void Monitor::probe_timeout(int r
)
1599 dout(4) << "probe_timeout " << probe_timeout_event
<< dendl
;
1600 assert(is_probing() || is_synchronizing());
1601 assert(probe_timeout_event
);
1602 probe_timeout_event
= NULL
;
1606 void Monitor::handle_probe(MonOpRequestRef op
)
1608 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1609 dout(10) << "handle_probe " << *m
<< dendl
;
1611 if (m
->fsid
!= monmap
->fsid
) {
1612 dout(0) << "handle_probe ignoring fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
1617 case MMonProbe::OP_PROBE
:
1618 handle_probe_probe(op
);
1621 case MMonProbe::OP_REPLY
:
1622 handle_probe_reply(op
);
1625 case MMonProbe::OP_MISSING_FEATURES
:
1626 derr
<< __func__
<< " missing features, have " << CEPH_FEATURES_ALL
1627 << ", required " << m
->required_features
1628 << ", missing " << (m
->required_features
& ~CEPH_FEATURES_ALL
)
1634 void Monitor::handle_probe_probe(MonOpRequestRef op
)
1636 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1638 dout(10) << "handle_probe_probe " << m
->get_source_inst() << *m
1639 << " features " << m
->get_connection()->get_features() << dendl
;
1640 uint64_t missing
= required_features
& ~m
->get_connection()->get_features();
1642 dout(1) << " peer " << m
->get_source_addr() << " missing features "
1643 << missing
<< dendl
;
1644 if (m
->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY
)) {
1645 MMonProbe
*r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_MISSING_FEATURES
,
1646 name
, has_ever_joined
);
1647 m
->required_features
= required_features
;
1648 m
->get_connection()->send_message(r
);
1653 if (!is_probing() && !is_synchronizing()) {
1654 // If the probing mon is way ahead of us, we need to re-bootstrap.
1655 // Normally we capture this case when we initially bootstrap, but
1656 // it is possible we pass those checks (we overlap with
1657 // quorum-to-be) but fail to join a quorum before it moves past
1658 // us. We need to be kicked back to bootstrap so we can
1659 // synchonize, not keep calling elections.
1660 if (paxos
->get_version() + 1 < m
->paxos_first_version
) {
1661 dout(1) << " peer " << m
->get_source_addr() << " has first_committed "
1662 << "ahead of us, re-bootstrapping" << dendl
;
1670 r
= new MMonProbe(monmap
->fsid
, MMonProbe::OP_REPLY
, name
, has_ever_joined
);
1673 monmap
->encode(r
->monmap_bl
, m
->get_connection()->get_features());
1674 r
->paxos_first_version
= paxos
->get_first_committed();
1675 r
->paxos_last_version
= paxos
->get_version();
1676 m
->get_connection()->send_message(r
);
1678 // did we discover a peer here?
1679 if (!monmap
->contains(m
->get_source_addr())) {
1680 dout(1) << " adding peer " << m
->get_source_addr()
1681 << " to list of hints" << dendl
;
1682 extra_probe_peers
.insert(m
->get_source_addr());
1689 void Monitor::handle_probe_reply(MonOpRequestRef op
)
1691 MMonProbe
*m
= static_cast<MMonProbe
*>(op
->get_req());
1692 dout(10) << "handle_probe_reply " << m
->get_source_inst() << *m
<< dendl
;
1693 dout(10) << " monmap is " << *monmap
<< dendl
;
1695 // discover name and addrs during probing or electing states.
1696 if (!is_probing() && !is_electing()) {
1700 // newer map, or they've joined a quorum and we haven't?
1702 monmap
->encode(mybl
, m
->get_connection()->get_features());
1703 // make sure it's actually different; the checks below err toward
1704 // taking the other guy's map, which could cause us to loop.
1705 if (!mybl
.contents_equal(m
->monmap_bl
)) {
1706 MonMap
*newmap
= new MonMap
;
1707 newmap
->decode(m
->monmap_bl
);
1708 if (m
->has_ever_joined
&& (newmap
->get_epoch() > monmap
->get_epoch() ||
1709 !has_ever_joined
)) {
1710 dout(10) << " got newer/committed monmap epoch " << newmap
->get_epoch()
1711 << ", mine was " << monmap
->get_epoch() << dendl
;
1713 monmap
->decode(m
->monmap_bl
);
1722 string peer_name
= monmap
->get_name(m
->get_source_addr());
1723 if (monmap
->get_epoch() == 0 && peer_name
.compare(0, 7, "noname-") == 0) {
1724 dout(10) << " renaming peer " << m
->get_source_addr() << " "
1725 << peer_name
<< " -> " << m
->name
<< " in my monmap"
1727 monmap
->rename(peer_name
, m
->name
);
1729 if (is_electing()) {
1734 dout(10) << " peer name is " << peer_name
<< dendl
;
1737 // new initial peer?
1738 if (monmap
->get_epoch() == 0 &&
1739 monmap
->contains(m
->name
) &&
1740 monmap
->get_addr(m
->name
).is_blank_ip()) {
1741 dout(1) << " learned initial mon " << m
->name
<< " addr " << m
->get_source_addr() << dendl
;
1742 monmap
->set_addr(m
->name
, m
->get_source_addr());
1748 // end discover phase
1749 if (!is_probing()) {
1753 assert(paxos
!= NULL
);
1755 if (is_synchronizing()) {
1756 dout(10) << " currently syncing" << dendl
;
1760 entity_inst_t other
= m
->get_source_inst();
1762 if (m
->paxos_last_version
< sync_last_committed_floor
) {
1763 dout(10) << " peer paxos versions [" << m
->paxos_first_version
1764 << "," << m
->paxos_last_version
<< "] < my sync_last_committed_floor "
1765 << sync_last_committed_floor
<< ", ignoring"
1768 if (paxos
->get_version() < m
->paxos_first_version
&&
1769 m
->paxos_first_version
> 1) { // no need to sync if we're 0 and they start at 1.
1770 dout(10) << " peer paxos first versions [" << m
->paxos_first_version
1771 << "," << m
->paxos_last_version
<< "]"
1772 << " vs my version " << paxos
->get_version()
1773 << " (too far ahead)"
1775 cancel_probe_timeout();
1776 sync_start(other
, true);
1779 if (paxos
->get_version() + g_conf
->paxos_max_join_drift
< m
->paxos_last_version
) {
1780 dout(10) << " peer paxos last version " << m
->paxos_last_version
1781 << " vs my version " << paxos
->get_version()
1782 << " (too far ahead)"
1784 cancel_probe_timeout();
1785 sync_start(other
, false);
1790 // is there an existing quorum?
1791 if (m
->quorum
.size()) {
1792 dout(10) << " existing quorum " << m
->quorum
<< dendl
;
1794 dout(10) << " peer paxos version " << m
->paxos_last_version
1795 << " vs my version " << paxos
->get_version()
1799 if (monmap
->contains(name
) &&
1800 !monmap
->get_addr(name
).is_blank_ip()) {
1801 // i'm part of the cluster; just initiate a new election
1804 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl
;
1805 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
1806 monmap
->get_inst(*m
->quorum
.begin()));
1809 if (monmap
->contains(m
->name
)) {
1810 dout(10) << " mon." << m
->name
<< " is outside the quorum" << dendl
;
1811 outside_quorum
.insert(m
->name
);
1813 dout(10) << " mostly ignoring mon." << m
->name
<< ", not part of monmap" << dendl
;
1817 unsigned need
= monmap
->size() / 2 + 1;
1818 dout(10) << " outside_quorum now " << outside_quorum
<< ", need " << need
<< dendl
;
1819 if (outside_quorum
.size() >= need
) {
1820 if (outside_quorum
.count(name
)) {
1821 dout(10) << " that's enough to form a new quorum, calling election" << dendl
;
1824 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl
;
1827 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl
;
1832 void Monitor::join_election()
1834 dout(10) << __func__
<< dendl
;
1835 wait_for_paxos_write();
1837 state
= STATE_ELECTING
;
1839 logger
->inc(l_mon_num_elections
);
1842 void Monitor::start_election()
1844 dout(10) << "start_election" << dendl
;
1845 wait_for_paxos_write();
1847 state
= STATE_ELECTING
;
1849 logger
->inc(l_mon_num_elections
);
1850 logger
->inc(l_mon_election_call
);
1852 clog
->info() << "mon." << name
<< " calling new monitor election";
1853 elector
.call_election();
1856 void Monitor::win_standalone_election()
1858 dout(1) << "win_standalone_election" << dendl
;
1860 // bump election epoch, in case the previous epoch included other
1861 // monitors; we need to be able to make the distinction.
1863 elector
.advance_epoch();
1865 rank
= monmap
->get_rank(name
);
1870 const MonCommand
*my_cmds
;
1872 get_locally_supported_monitor_commands(&my_cmds
, &cmdsize
);
1873 win_election(elector
.get_epoch(), q
,
1875 ceph::features::mon::get_supported(),
1879 const utime_t
& Monitor::get_leader_since() const
1881 assert(state
== STATE_LEADER
);
1882 return leader_since
;
1885 epoch_t
Monitor::get_epoch()
1887 return elector
.get_epoch();
1890 void Monitor::_finish_svc_election()
1892 assert(state
== STATE_LEADER
|| state
== STATE_PEON
);
1894 for (auto p
: paxos_service
) {
1895 // we already called election_finished() on monmon(); avoid callig twice
1896 if (state
== STATE_LEADER
&& p
== monmon())
1898 p
->election_finished();
1902 void Monitor::win_election(epoch_t epoch
, set
<int>& active
, uint64_t features
,
1903 const mon_feature_t
& mon_features
,
1904 const MonCommand
*cmdset
, int cmdsize
)
1906 dout(10) << __func__
<< " epoch " << epoch
<< " quorum " << active
1907 << " features " << features
1908 << " mon_features " << mon_features
1910 assert(is_electing());
1911 state
= STATE_LEADER
;
1912 leader_since
= ceph_clock_now();
1915 quorum_con_features
= features
;
1916 quorum_mon_features
= mon_features
;
1917 outside_quorum
.clear();
1919 clog
->info() << "mon." << name
<< "@" << rank
1920 << " won leader election with quorum " << quorum
;
1922 set_leader_supported_commands(cmdset
, cmdsize
);
1924 paxos
->leader_init();
1925 // NOTE: tell monmap monitor first. This is important for the
1926 // bootstrap case to ensure that the very first paxos proposal
1927 // codifies the monmap. Otherwise any manner of chaos can ensue
1928 // when monitors are call elections or participating in a paxos
1929 // round without agreeing on who the participants are.
1930 monmon()->election_finished();
1931 _finish_svc_election();
1932 health_monitor
->start(epoch
);
1934 logger
->inc(l_mon_election_win
);
1937 if (monmap
->size() > 1 &&
1938 monmap
->get_epoch() > 0) {
1940 health_tick_start();
1941 do_health_to_clog_interval();
1942 scrub_event_start();
1946 collect_sys_info(&my_meta
, g_ceph_context
);
1947 my_meta
["addr"] = stringify(messenger
->get_myaddr());
1948 update_mon_metadata(rank
, std::move(my_meta
));
1951 void Monitor::lose_election(epoch_t epoch
, set
<int> &q
, int l
,
1953 const mon_feature_t
& mon_features
)
1956 leader_since
= utime_t();
1959 outside_quorum
.clear();
1960 quorum_con_features
= features
;
1961 quorum_mon_features
= mon_features
;
1962 dout(10) << "lose_election, epoch " << epoch
<< " leader is mon" << leader
1963 << " quorum is " << quorum
<< " features are " << quorum_con_features
1964 << " mon_features are " << quorum_mon_features
1968 _finish_svc_election();
1969 health_monitor
->start(epoch
);
1971 logger
->inc(l_mon_election_lose
);
1975 if (quorum_con_features
& CEPH_FEATURE_MON_METADATA
) {
1977 collect_sys_info(&sys_info
, g_ceph_context
);
1978 messenger
->send_message(new MMonMetadata(sys_info
),
1979 monmap
->get_inst(get_leader()));
1983 void Monitor::finish_election()
1985 apply_quorum_to_compatset_features();
1986 apply_monmap_to_compatset_features();
1988 exited_quorum
= utime_t();
1989 finish_contexts(g_ceph_context
, waitfor_quorum
);
1990 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
1991 resend_routed_requests();
1993 register_cluster_logger();
1995 // am i named properly?
1996 string cur_name
= monmap
->get_name(messenger
->get_myaddr());
1997 if (cur_name
!= name
) {
1998 dout(10) << " renaming myself from " << cur_name
<< " -> " << name
<< dendl
;
1999 messenger
->send_message(new MMonJoin(monmap
->fsid
, name
, messenger
->get_myaddr()),
2000 monmap
->get_inst(*quorum
.begin()));
2004 void Monitor::_apply_compatset_features(CompatSet
&new_features
)
2006 if (new_features
.compare(features
) != 0) {
2007 CompatSet diff
= features
.unsupported(new_features
);
2008 dout(1) << __func__
<< " enabling new quorum features: " << diff
<< dendl
;
2009 features
= new_features
;
2011 auto t
= std::make_shared
<MonitorDBStore::Transaction
>();
2013 store
->apply_transaction(t
);
2015 calc_quorum_requirements();
2019 void Monitor::apply_quorum_to_compatset_features()
2021 CompatSet
new_features(features
);
2022 if (quorum_con_features
& CEPH_FEATURE_OSD_ERASURE_CODES
) {
2023 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
);
2025 if (quorum_con_features
& CEPH_FEATURE_OSDMAP_ENC
) {
2026 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
);
2028 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
) {
2029 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
);
2031 if (quorum_con_features
& CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
) {
2032 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
);
2034 dout(5) << __func__
<< dendl
;
2035 _apply_compatset_features(new_features
);
2038 void Monitor::apply_monmap_to_compatset_features()
2040 CompatSet
new_features(features
);
2041 mon_feature_t monmap_features
= monmap
->get_required_features();
2043 /* persistent monmap features may go into the compatset.
2044 * optional monmap features may not - why?
2045 * because optional monmap features may be set/unset by the admin,
2046 * and possibly by other means that haven't yet been thought out,
2047 * so we can't make the monitor enforce them on start - because they
2049 * this, of course, does not invalidate setting a compatset feature
2050 * for an optional feature - as long as you make sure to clean it up
2051 * once you unset it.
2053 if (monmap_features
.contains_all(ceph::features::mon::FEATURE_KRAKEN
)) {
2054 assert(ceph::features::mon::get_persistent().contains_all(
2055 ceph::features::mon::FEATURE_KRAKEN
));
2056 // this feature should only ever be set if the quorum supports it.
2057 assert(HAVE_FEATURE(quorum_con_features
, SERVER_KRAKEN
));
2058 new_features
.incompat
.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
);
2061 dout(5) << __func__
<< dendl
;
2062 _apply_compatset_features(new_features
);
2065 void Monitor::calc_quorum_requirements()
2067 required_features
= 0;
2070 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES
)) {
2071 required_features
|= CEPH_FEATURE_OSD_ERASURE_CODES
;
2073 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC
)) {
2074 required_features
|= CEPH_FEATURE_OSDMAP_ENC
;
2076 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2
)) {
2077 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2
;
2079 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3
)) {
2080 required_features
|= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3
;
2082 if (features
.incompat
.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN
)) {
2083 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2087 if (monmap
->get_required_features().contains_all(
2088 ceph::features::mon::FEATURE_KRAKEN
)) {
2089 required_features
|= CEPH_FEATUREMASK_SERVER_KRAKEN
;
2091 if (monmap
->get_required_features().contains_all(
2092 ceph::features::mon::FEATURE_LUMINOUS
)) {
2093 required_features
|= CEPH_FEATUREMASK_SERVER_LUMINOUS
;
2095 dout(10) << __func__
<< " required_features " << required_features
<< dendl
;
2098 void Monitor::get_combined_feature_map(FeatureMap
*fm
)
2100 *fm
+= session_map
.feature_map
;
2101 for (auto id
: quorum
) {
2103 *fm
+= quorum_feature_map
[id
];
2108 void Monitor::sync_force(Formatter
*f
, ostream
& ss
)
2110 bool free_formatter
= false;
2113 // louzy/lazy hack: default to json if no formatter has been defined
2114 f
= new JSONFormatter();
2115 free_formatter
= true;
2118 auto tx(std::make_shared
<MonitorDBStore::Transaction
>());
2119 sync_stash_critical_state(tx
);
2120 tx
->put("mon_sync", "force_sync", 1);
2121 store
->apply_transaction(tx
);
2123 f
->open_object_section("sync_force");
2124 f
->dump_int("ret", 0);
2125 f
->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2126 f
->close_section(); // sync_force
2132 void Monitor::_quorum_status(Formatter
*f
, ostream
& ss
)
2134 bool free_formatter
= false;
2137 // louzy/lazy hack: default to json if no formatter has been defined
2138 f
= new JSONFormatter();
2139 free_formatter
= true;
2141 f
->open_object_section("quorum_status");
2142 f
->dump_int("election_epoch", get_epoch());
2144 f
->open_array_section("quorum");
2145 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2146 f
->dump_int("mon", *p
);
2147 f
->close_section(); // quorum
2149 list
<string
> quorum_names
= get_quorum_names();
2150 f
->open_array_section("quorum_names");
2151 for (list
<string
>::iterator p
= quorum_names
.begin(); p
!= quorum_names
.end(); ++p
)
2152 f
->dump_string("mon", *p
);
2153 f
->close_section(); // quorum_names
2155 f
->dump_string("quorum_leader_name", quorum
.empty() ? string() : monmap
->get_name(*quorum
.begin()));
2157 f
->open_object_section("monmap");
2159 f
->close_section(); // monmap
2161 f
->close_section(); // quorum_status
2167 void Monitor::get_mon_status(Formatter
*f
, ostream
& ss
)
2169 bool free_formatter
= false;
2172 // louzy/lazy hack: default to json if no formatter has been defined
2173 f
= new JSONFormatter();
2174 free_formatter
= true;
2177 f
->open_object_section("mon_status");
2178 f
->dump_string("name", name
);
2179 f
->dump_int("rank", rank
);
2180 f
->dump_string("state", get_state_name());
2181 f
->dump_int("election_epoch", get_epoch());
2183 f
->open_array_section("quorum");
2184 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
) {
2185 f
->dump_int("mon", *p
);
2188 f
->close_section(); // quorum
2190 f
->open_object_section("features");
2191 f
->dump_stream("required_con") << required_features
;
2192 mon_feature_t req_mon_features
= get_required_mon_features();
2193 req_mon_features
.dump(f
, "required_mon");
2194 f
->dump_stream("quorum_con") << quorum_con_features
;
2195 quorum_mon_features
.dump(f
, "quorum_mon");
2196 f
->close_section(); // features
2198 f
->open_array_section("outside_quorum");
2199 for (set
<string
>::iterator p
= outside_quorum
.begin(); p
!= outside_quorum
.end(); ++p
)
2200 f
->dump_string("mon", *p
);
2201 f
->close_section(); // outside_quorum
2203 f
->open_array_section("extra_probe_peers");
2204 for (set
<entity_addr_t
>::iterator p
= extra_probe_peers
.begin();
2205 p
!= extra_probe_peers
.end();
2207 f
->dump_stream("peer") << *p
;
2208 f
->close_section(); // extra_probe_peers
2210 f
->open_array_section("sync_provider");
2211 for (map
<uint64_t,SyncProvider
>::const_iterator p
= sync_providers
.begin();
2212 p
!= sync_providers
.end();
2214 f
->dump_unsigned("cookie", p
->second
.cookie
);
2215 f
->dump_stream("entity") << p
->second
.entity
;
2216 f
->dump_stream("timeout") << p
->second
.timeout
;
2217 f
->dump_unsigned("last_committed", p
->second
.last_committed
);
2218 f
->dump_stream("last_key") << p
->second
.last_key
;
2222 if (is_synchronizing()) {
2223 f
->open_object_section("sync");
2224 f
->dump_stream("sync_provider") << sync_provider
;
2225 f
->dump_unsigned("sync_cookie", sync_cookie
);
2226 f
->dump_unsigned("sync_start_version", sync_start_version
);
2230 if (g_conf
->mon_sync_provider_kill_at
> 0)
2231 f
->dump_int("provider_kill_at", g_conf
->mon_sync_provider_kill_at
);
2232 if (g_conf
->mon_sync_requester_kill_at
> 0)
2233 f
->dump_int("requester_kill_at", g_conf
->mon_sync_requester_kill_at
);
2235 f
->open_object_section("monmap");
2239 f
->dump_object("feature_map", session_map
.feature_map
);
2240 f
->close_section(); // mon_status
2242 if (free_formatter
) {
2243 // flush formatter to ss and delete it iff we created the formatter
2250 // health status to clog
2252 void Monitor::health_tick_start()
2254 if (!cct
->_conf
->mon_health_to_clog
||
2255 cct
->_conf
->mon_health_to_clog_tick_interval
<= 0)
2258 dout(15) << __func__
<< dendl
;
2261 health_tick_event
= new C_MonContext(this, [this](int r
) {
2264 do_health_to_clog();
2265 health_tick_start();
2267 timer
.add_event_after(cct
->_conf
->mon_health_to_clog_tick_interval
,
2271 void Monitor::health_tick_stop()
2273 dout(15) << __func__
<< dendl
;
2275 if (health_tick_event
) {
2276 timer
.cancel_event(health_tick_event
);
2277 health_tick_event
= NULL
;
2281 utime_t
Monitor::health_interval_calc_next_update()
2283 utime_t now
= ceph_clock_now();
2285 time_t secs
= now
.sec();
2286 int remainder
= secs
% cct
->_conf
->mon_health_to_clog_interval
;
2287 int adjustment
= cct
->_conf
->mon_health_to_clog_interval
- remainder
;
2288 utime_t next
= utime_t(secs
+ adjustment
, 0);
2290 dout(20) << __func__
2291 << " now: " << now
<< ","
2292 << " next: " << next
<< ","
2293 << " interval: " << cct
->_conf
->mon_health_to_clog_interval
2299 void Monitor::health_interval_start()
2301 dout(15) << __func__
<< dendl
;
2303 if (!cct
->_conf
->mon_health_to_clog
||
2304 cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2308 health_interval_stop();
2309 utime_t next
= health_interval_calc_next_update();
2310 health_interval_event
= new C_MonContext(this, [this](int r
) {
2313 do_health_to_clog_interval();
2315 timer
.add_event_at(next
, health_interval_event
);
2318 void Monitor::health_interval_stop()
2320 dout(15) << __func__
<< dendl
;
2321 if (health_interval_event
) {
2322 timer
.cancel_event(health_interval_event
);
2324 health_interval_event
= NULL
;
2327 void Monitor::health_events_cleanup()
2330 health_interval_stop();
2331 health_status_cache
.reset();
2334 void Monitor::health_to_clog_update_conf(const std::set
<std::string
> &changed
)
2336 dout(20) << __func__
<< dendl
;
2338 if (changed
.count("mon_health_to_clog")) {
2339 if (!cct
->_conf
->mon_health_to_clog
) {
2340 health_events_cleanup();
2342 if (!health_tick_event
) {
2343 health_tick_start();
2345 if (!health_interval_event
) {
2346 health_interval_start();
2351 if (changed
.count("mon_health_to_clog_interval")) {
2352 if (cct
->_conf
->mon_health_to_clog_interval
<= 0) {
2353 health_interval_stop();
2355 health_interval_start();
2359 if (changed
.count("mon_health_to_clog_tick_interval")) {
2360 if (cct
->_conf
->mon_health_to_clog_tick_interval
<= 0) {
2363 health_tick_start();
2368 void Monitor::do_health_to_clog_interval()
2370 // outputting to clog may have been disabled in the conf
2371 // since we were scheduled.
2372 if (!cct
->_conf
->mon_health_to_clog
||
2373 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2376 dout(10) << __func__
<< dendl
;
2378 // do we have a cached value for next_clog_update? if not,
2379 // do we know when the last update was?
2381 do_health_to_clog(true);
2382 health_interval_start();
2385 void Monitor::do_health_to_clog(bool force
)
2387 // outputting to clog may have been disabled in the conf
2388 // since we were scheduled.
2389 if (!cct
->_conf
->mon_health_to_clog
||
2390 cct
->_conf
->mon_health_to_clog_interval
<= 0)
2393 dout(10) << __func__
<< (force
? " (force)" : "") << dendl
;
2395 list
<string
> status
;
2396 health_status_t overall
= get_health(status
, NULL
, NULL
);
2398 dout(25) << __func__
2399 << (force
? " (force)" : "")
2402 string summary
= joinify(status
.begin(), status
.end(), string("; "));
2405 overall
== health_status_cache
.overall
&&
2406 !health_status_cache
.summary
.empty() &&
2407 health_status_cache
.summary
== summary
) {
2412 clog
->info() << summary
;
2414 health_status_cache
.overall
= overall
;
2415 health_status_cache
.summary
= summary
;
2418 health_status_t
Monitor::get_health(list
<string
>& status
,
2419 bufferlist
*detailbl
,
2422 list
<pair
<health_status_t
,string
> > summary
;
2423 list
<pair
<health_status_t
,string
> > detail
;
2426 f
->open_object_section("health");
2428 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin();
2429 p
!= paxos_service
.end();
2431 PaxosService
*s
= *p
;
2432 s
->get_health(summary
, detailbl
? &detail
: NULL
, cct
);
2435 health_monitor
->get_health(f
, summary
, (detailbl
? &detail
: NULL
));
2438 f
->open_object_section("timechecks");
2439 f
->dump_unsigned("epoch", get_epoch());
2440 f
->dump_int("round", timecheck_round
);
2441 f
->dump_stream("round_status")
2442 << ((timecheck_round
%2) ? "on-going" : "finished");
2445 health_status_t overall
= HEALTH_OK
;
2446 if (!timecheck_skews
.empty()) {
2449 f
->open_array_section("mons");
2450 for (map
<entity_inst_t
,double>::iterator i
= timecheck_skews
.begin();
2451 i
!= timecheck_skews
.end(); ++i
) {
2452 entity_inst_t inst
= i
->first
;
2453 double skew
= i
->second
;
2454 double latency
= timecheck_latencies
[inst
];
2455 string name
= monmap
->get_name(inst
.addr
);
2458 health_status_t tcstatus
= timecheck_status(tcss
, skew
, latency
);
2459 if (tcstatus
!= HEALTH_OK
) {
2460 if (overall
> tcstatus
)
2462 warns
.push_back(name
);
2464 ostringstream tmp_ss
;
2465 tmp_ss
<< "mon." << name
2466 << " addr " << inst
.addr
<< " " << tcss
.str()
2467 << " (latency " << latency
<< "s)";
2468 detail
.push_back(make_pair(tcstatus
, tmp_ss
.str()));
2472 f
->open_object_section("mon");
2473 f
->dump_string("name", name
.c_str());
2474 f
->dump_float("skew", skew
);
2475 f
->dump_float("latency", latency
);
2476 f
->dump_stream("health") << tcstatus
;
2477 if (tcstatus
!= HEALTH_OK
)
2478 f
->dump_stream("details") << tcss
.str();
2482 if (!warns
.empty()) {
2484 ss
<< "clock skew detected on";
2485 while (!warns
.empty()) {
2486 ss
<< " mon." << warns
.front();
2491 status
.push_back(ss
.str());
2492 summary
.push_back(make_pair(HEALTH_WARN
, "Monitor clock skew detected "));
2501 f
->open_array_section("summary");
2502 if (!summary
.empty()) {
2503 while (!summary
.empty()) {
2504 if (overall
> summary
.front().first
)
2505 overall
= summary
.front().first
;
2506 status
.push_back(summary
.front().second
);
2508 f
->open_object_section("item");
2509 f
->dump_stream("severity") << summary
.front().first
;
2510 f
->dump_string("summary", summary
.front().second
);
2513 summary
.pop_front();
2521 status
.push_front(fss
.str());
2523 f
->dump_stream("overall_status") << overall
;
2526 f
->open_array_section("detail");
2527 while (!detail
.empty()) {
2529 f
->dump_string("item", detail
.front().second
);
2530 else if (detailbl
!= NULL
) {
2531 detailbl
->append(detail
.front().second
);
2532 detailbl
->append('\n');
2545 void Monitor::get_cluster_status(stringstream
&ss
, Formatter
*f
)
2548 f
->open_object_section("status");
2550 // reply with the status for all the components
2551 list
<string
> health
;
2552 get_health(health
, NULL
, f
);
2555 f
->dump_stream("fsid") << monmap
->get_fsid();
2556 f
->dump_unsigned("election_epoch", get_epoch());
2558 f
->open_array_section("quorum");
2559 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2560 f
->dump_int("rank", *p
);
2562 f
->open_array_section("quorum_names");
2563 for (set
<int>::iterator p
= quorum
.begin(); p
!= quorum
.end(); ++p
)
2564 f
->dump_string("id", monmap
->get_name(*p
));
2567 f
->open_object_section("monmap");
2570 f
->open_object_section("osdmap");
2571 osdmon()->osdmap
.print_summary(f
, cout
);
2573 f
->open_object_section("pgmap");
2574 pgservice
->print_summary(f
, NULL
);
2576 f
->open_object_section("fsmap");
2577 mdsmon()->get_fsmap().print_summary(f
, NULL
);
2580 f
->open_object_section("mgrmap");
2581 mgrmon()->get_map().print_summary(f
, nullptr);
2586 ss
<< " cluster:\n";
2587 ss
<< " id: " << monmap
->get_fsid() << "\n";
2588 ss
<< " health: " << joinify(health
.begin(), health
.end(),
2589 string("\n ")) << "\n";
2590 ss
<< "\n \n services:\n";
2591 const auto quorum_names
= get_quorum_names();
2592 const auto mon_count
= monmap
->mon_info
.size();
2593 ss
<< " mon: " << mon_count
<< " daemons, quorum "
2595 if (quorum_names
.size() != mon_count
) {
2596 std::list
<std::string
> out_of_q
;
2597 for (size_t i
= 0; i
< monmap
->ranks
.size(); ++i
) {
2598 if (quorum
.count(i
) == 0) {
2599 out_of_q
.push_back(monmap
->ranks
[i
]);
2602 ss
<< ", out of quorum: " << joinify(out_of_q
.begin(),
2603 out_of_q
.end(), std::string(", "));
2606 if (mgrmon()->in_use()) {
2608 mgrmon()->get_map().print_summary(nullptr, &ss
);
2611 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2612 ss
<< " mds: " << mdsmon()->get_fsmap() << "\n";
2615 osdmon()->osdmap
.print_summary(NULL
, ss
);
2617 ss
<< "\n \n data:\n";
2618 pgservice
->print_summary(NULL
, &ss
);
2623 void Monitor::_generate_command_map(map
<string
,cmd_vartype
>& cmdmap
,
2624 map
<string
,string
> ¶m_str_map
)
2626 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
2627 p
!= cmdmap
.end(); ++p
) {
2628 if (p
->first
== "prefix")
2630 if (p
->first
== "caps") {
2632 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
2633 cv
.size() % 2 == 0) {
2634 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
2635 string k
= string("caps_") + cv
[i
];
2636 param_str_map
[k
] = cv
[i
+ 1];
2641 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
2645 const MonCommand
*Monitor::_get_moncommand(const string
&cmd_prefix
,
2646 MonCommand
*cmds
, int cmds_size
)
2648 MonCommand
*this_cmd
= NULL
;
2649 for (MonCommand
*cp
= cmds
;
2650 cp
< &cmds
[cmds_size
]; cp
++) {
2651 if (cp
->cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
2659 bool Monitor::_allowed_command(MonSession
*s
, string
&module
, string
&prefix
,
2660 const map
<string
,cmd_vartype
>& cmdmap
,
2661 const map
<string
,string
>& param_str_map
,
2662 const MonCommand
*this_cmd
) {
2664 bool cmd_r
= this_cmd
->requires_perm('r');
2665 bool cmd_w
= this_cmd
->requires_perm('w');
2666 bool cmd_x
= this_cmd
->requires_perm('x');
2668 bool capable
= s
->caps
.is_capable(
2670 CEPH_ENTITY_TYPE_MON
,
2672 module
, prefix
, param_str_map
,
2673 cmd_r
, cmd_w
, cmd_x
);
2675 dout(10) << __func__
<< " " << (capable
? "" : "not ") << "capable" << dendl
;
2679 void Monitor::format_command_descriptions(const MonCommand
*commands
,
2680 unsigned commands_size
,
2686 f
->open_object_section("command_descriptions");
2687 for (const MonCommand
*cp
= commands
;
2688 cp
< &commands
[commands_size
]; cp
++) {
2690 unsigned flags
= cp
->flags
;
2691 if (hide_mgr_flag
) {
2692 flags
&= ~MonCommand::FLAG_MGR
;
2694 ostringstream secname
;
2695 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
2696 dump_cmddesc_to_json(f
, secname
.str(),
2697 cp
->cmdstring
, cp
->helpstring
, cp
->module
,
2698 cp
->req_perms
, cp
->availability
, flags
);
2701 f
->close_section(); // command_descriptions
2706 void Monitor::get_locally_supported_monitor_commands(const MonCommand
**cmds
,
2709 *cmds
= mon_commands
;
2710 *count
= ARRAY_SIZE(mon_commands
);
2712 void Monitor::set_leader_supported_commands(const MonCommand
*cmds
, int size
)
2714 if (leader_supported_mon_commands
!= mon_commands
)
2715 delete[] leader_supported_mon_commands
;
2716 leader_supported_mon_commands
= cmds
;
2717 leader_supported_mon_commands_size
= size
;
2720 bool Monitor::is_keyring_required()
2722 string auth_cluster_required
= g_conf
->auth_supported
.empty() ?
2723 g_conf
->auth_cluster_required
: g_conf
->auth_supported
;
2724 string auth_service_required
= g_conf
->auth_supported
.empty() ?
2725 g_conf
->auth_service_required
: g_conf
->auth_supported
;
2727 return auth_service_required
== "cephx" ||
2728 auth_cluster_required
== "cephx";
2731 struct C_MgrProxyCommand
: public Context
{
2737 C_MgrProxyCommand(Monitor
*mon
, MonOpRequestRef op
, uint64_t s
)
2738 : mon(mon
), op(op
), size(s
) { }
2739 void finish(int r
) {
2740 Mutex::Locker
l(mon
->lock
);
2741 mon
->mgr_proxy_bytes
-= size
;
2742 mon
->reply_command(op
, r
, outs
, outbl
, 0);
2746 void Monitor::handle_command(MonOpRequestRef op
)
2748 assert(op
->is_type_command());
2749 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
2750 if (m
->fsid
!= monmap
->fsid
) {
2751 dout(0) << "handle_command on fsid " << m
->fsid
<< " != " << monmap
->fsid
<< dendl
;
2752 reply_command(op
, -EPERM
, "wrong fsid", 0);
2756 MonSession
*session
= static_cast<MonSession
*>(
2757 m
->get_connection()->get_priv());
2759 dout(5) << __func__
<< " dropping stray message " << *m
<< dendl
;
2762 BOOST_SCOPE_EXIT_ALL(=) {
2766 if (m
->cmd
.empty()) {
2767 string rs
= "No command supplied";
2768 reply_command(op
, -EINVAL
, rs
, 0);
2773 vector
<string
> fullcmd
;
2774 map
<string
, cmd_vartype
> cmdmap
;
2775 stringstream ss
, ds
;
2779 rs
= "unrecognized command";
2781 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
2782 // ss has reason for failure
2785 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
2786 reply_command(op
, r
, rs
, 0);
2790 // check return value. If no prefix parameter provided,
2791 // return value will be false, then return error info.
2792 if (!cmd_getval(g_ceph_context
, cmdmap
, "prefix", prefix
)) {
2793 reply_command(op
, -EINVAL
, "command prefix not found", 0);
2797 // check prefix is empty
2798 if (prefix
.empty()) {
2799 reply_command(op
, -EINVAL
, "command prefix must not be empty", 0);
2803 if (prefix
== "get_command_descriptions") {
2805 Formatter
*f
= Formatter::create("json");
2806 // hide mgr commands until luminous upgrade is complete
2807 bool hide_mgr_flag
=
2808 osdmon()->osdmap
.require_osd_release
< CEPH_RELEASE_LUMINOUS
;
2809 format_command_descriptions(leader_supported_mon_commands
,
2810 leader_supported_mon_commands_size
, f
, &rdata
,
2813 reply_command(op
, 0, "", rdata
, 0);
2820 dout(0) << "handle_command " << *m
<< dendl
;
2823 cmd_getval(g_ceph_context
, cmdmap
, "format", format
, string("plain"));
2824 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
2826 get_str_vec(prefix
, fullcmd
);
2828 // make sure fullcmd is not empty.
2829 // invalid prefix will cause empty vector fullcmd.
2830 // such as, prefix=";,,;"
2831 if (fullcmd
.empty()) {
2832 reply_command(op
, -EINVAL
, "command requires a prefix to be valid", 0);
2836 module
= fullcmd
[0];
2838 // validate command is in leader map
2840 const MonCommand
*leader_cmd
;
2841 leader_cmd
= _get_moncommand(prefix
,
2842 // the boost underlying this isn't const for some reason
2843 const_cast<MonCommand
*>(leader_supported_mon_commands
),
2844 leader_supported_mon_commands_size
);
2846 reply_command(op
, -EINVAL
, "command not known", 0);
2849 // validate command is in our map & matches, or forward if it is allowed
2850 const MonCommand
*mon_cmd
= _get_moncommand(prefix
, mon_commands
,
2851 ARRAY_SIZE(mon_commands
));
2854 if (leader_cmd
->is_noforward()) {
2855 reply_command(op
, -EINVAL
,
2856 "command not locally supported and not allowed to forward",
2860 dout(10) << "Command not locally supported, forwarding request "
2862 forward_request_leader(op
);
2864 } else if (!mon_cmd
->is_compat(leader_cmd
)) {
2865 if (mon_cmd
->is_noforward()) {
2866 reply_command(op
, -EINVAL
,
2867 "command not compatible with leader and not allowed to forward",
2871 dout(10) << "Command not compatible with leader, forwarding request "
2873 forward_request_leader(op
);
2878 if (mon_cmd
->is_obsolete() ||
2879 (cct
->_conf
->mon_debug_deprecated_as_obsolete
2880 && mon_cmd
->is_deprecated())) {
2881 reply_command(op
, -ENOTSUP
,
2882 "command is obsolete; please check usage and/or man page",
2887 if (session
->proxy_con
&& mon_cmd
->is_noforward()) {
2888 dout(10) << "Got forward for noforward command " << m
<< dendl
;
2889 reply_command(op
, -EINVAL
, "forward for noforward command", rdata
, 0);
2893 /* what we perceive as being the service the command falls under */
2894 string
service(mon_cmd
->module
);
2896 dout(25) << __func__
<< " prefix='" << prefix
2897 << "' module='" << module
2898 << "' service='" << service
<< "'" << dendl
;
2901 (mon_cmd
->requires_perm('w') || mon_cmd
->requires_perm('x'));
2903 // validate user's permissions for requested command
2904 map
<string
,string
> param_str_map
;
2905 _generate_command_map(cmdmap
, param_str_map
);
2906 if (!_allowed_command(session
, service
, prefix
, cmdmap
,
2907 param_str_map
, mon_cmd
)) {
2908 dout(1) << __func__
<< " access denied" << dendl
;
2909 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
2910 << "from='" << session
->inst
<< "' "
2911 << "entity='" << session
->entity_name
<< "' "
2912 << "cmd=" << m
->cmd
<< ": access denied";
2913 reply_command(op
, -EACCES
, "access denied", 0);
2917 (cmd_is_rw
? audit_clog
->info() : audit_clog
->debug())
2918 << "from='" << session
->inst
<< "' "
2919 << "entity='" << session
->entity_name
<< "' "
2920 << "cmd=" << m
->cmd
<< ": dispatch";
2922 if (mon_cmd
->is_mgr() &&
2923 osdmon()->osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2924 const auto& hdr
= m
->get_header();
2925 uint64_t size
= hdr
.front_len
+ hdr
.middle_len
+ hdr
.data_len
;
2927 g_conf
->mon_client_bytes
* g_conf
->mon_mgr_proxy_client_bytes_ratio
;
2928 if (mgr_proxy_bytes
+ size
> max
) {
2929 dout(10) << __func__
<< " current mgr proxy bytes " << mgr_proxy_bytes
2930 << " + " << size
<< " > max " << max
<< dendl
;
2931 reply_command(op
, -EAGAIN
, "hit limit on proxied mgr commands", rdata
, 0);
2934 mgr_proxy_bytes
+= size
;
2935 dout(10) << __func__
<< " proxying mgr command (+" << size
2936 << " -> " << mgr_proxy_bytes
<< ")" << dendl
;
2937 C_MgrProxyCommand
*fin
= new C_MgrProxyCommand(this, op
, size
);
2938 mgr_client
.start_command(m
->cmd
,
2942 new C_OnFinisher(fin
, &finisher
));
2946 if (module
== "mds" || module
== "fs") {
2947 mdsmon()->dispatch(op
);
2950 if ((module
== "osd" || prefix
== "pg map") &&
2951 prefix
!= "osd last-stat-seq") {
2952 osdmon()->dispatch(op
);
2956 if (module
== "pg") {
2957 pgmon()->dispatch(op
);
2960 if (module
== "mon" &&
2961 /* Let the Monitor class handle the following commands:
2966 prefix
!= "mon compact" &&
2967 prefix
!= "mon scrub" &&
2968 prefix
!= "mon sync force" &&
2969 prefix
!= "mon metadata" &&
2970 prefix
!= "mon versions" &&
2971 prefix
!= "mon count-metadata") {
2972 monmon()->dispatch(op
);
2975 if (module
== "auth") {
2976 authmon()->dispatch(op
);
2979 if (module
== "log") {
2980 logmon()->dispatch(op
);
2984 if (module
== "config-key") {
2985 config_key_service
->dispatch(op
);
2989 if (module
== "mgr") {
2990 mgrmon()->dispatch(op
);
2994 if (prefix
== "fsid") {
2996 f
->open_object_section("fsid");
2997 f
->dump_stream("fsid") << monmap
->fsid
;
3004 reply_command(op
, 0, "", rdata
, 0);
3008 if (prefix
== "scrub" || prefix
== "mon scrub") {
3009 wait_for_paxos_write();
3011 int r
= scrub_start();
3012 reply_command(op
, r
, "", rdata
, 0);
3013 } else if (is_peon()) {
3014 forward_request_leader(op
);
3016 reply_command(op
, -EAGAIN
, "no quorum", rdata
, 0);
3021 if (prefix
== "compact" || prefix
== "mon compact") {
3022 dout(1) << "triggering manual compaction" << dendl
;
3023 utime_t start
= ceph_clock_now();
3025 utime_t end
= ceph_clock_now();
3027 dout(1) << "finished manual compaction in " << end
<< " seconds" << dendl
;
3029 oss
<< "compacted leveldb in " << end
;
3033 else if (prefix
== "injectargs") {
3034 vector
<string
> injected_args
;
3035 cmd_getval(g_ceph_context
, cmdmap
, "injected_args", injected_args
);
3036 if (!injected_args
.empty()) {
3037 dout(0) << "parsing injected options '" << injected_args
<< "'" << dendl
;
3039 r
= g_conf
->injectargs(str_join(injected_args
, " "), &oss
);
3040 ss
<< "injectargs:" << oss
.str();
3044 rs
= "must supply options to be parsed in a single string";
3047 } else if (prefix
== "status" ||
3048 prefix
== "health" ||
3051 cmd_getval(g_ceph_context
, cmdmap
, "detail", detail
);
3053 if (prefix
== "status") {
3054 // get_cluster_status handles f == NULL
3055 get_cluster_status(ds
, f
.get());
3062 } else if (prefix
== "health") {
3063 list
<string
> health_str
;
3064 get_health(health_str
, detail
== "detail" ? &rdata
: NULL
, f
.get());
3069 assert(!health_str
.empty());
3070 ds
<< health_str
.front();
3071 health_str
.pop_front();
3072 if (!health_str
.empty()) {
3074 ds
<< joinify(health_str
.begin(), health_str
.end(), string("; "));
3079 if (detail
== "detail")
3082 } else if (prefix
== "df") {
3083 bool verbose
= (detail
== "detail");
3085 f
->open_object_section("stats");
3087 pgservice
->dump_fs_stats(&ds
, f
.get(), verbose
);
3090 pgservice
->dump_pool_stats(osdmon()->osdmap
, &ds
, f
.get(), verbose
);
3098 assert(0 == "We should never get here!");
3104 } else if (prefix
== "report") {
3106 // this must be formatted, in its current form
3108 f
.reset(Formatter::create("json-pretty"));
3109 f
->open_object_section("report");
3110 f
->dump_stream("cluster_fingerprint") << fingerprint
;
3111 f
->dump_string("version", ceph_version_to_str());
3112 f
->dump_string("commit", git_version_to_str());
3113 f
->dump_stream("timestamp") << ceph_clock_now();
3115 vector
<string
> tagsvec
;
3116 cmd_getval(g_ceph_context
, cmdmap
, "tags", tagsvec
);
3117 string tagstr
= str_join(tagsvec
, " ");
3118 if (!tagstr
.empty())
3119 tagstr
= tagstr
.substr(0, tagstr
.find_last_of(' '));
3120 f
->dump_string("tag", tagstr
);
3123 get_health(hs
, NULL
, f
.get());
3125 monmon()->dump_info(f
.get());
3126 osdmon()->dump_info(f
.get());
3127 mdsmon()->dump_info(f
.get());
3128 authmon()->dump_info(f
.get());
3129 pgservice
->dump_info(f
.get());
3131 paxos
->dump_info(f
.get());
3137 ss2
<< "report " << rdata
.crc32c(CEPH_MON_PORT
);
3140 } else if (prefix
== "osd last-stat-seq") {
3142 cmd_getval(g_ceph_context
, cmdmap
, "id", osd
);
3143 uint64_t seq
= mgrstatmon()->get_last_osd_stat_seq(osd
);
3145 f
->dump_unsigned("seq", seq
);
3153 } else if (prefix
== "node ls") {
3154 string
node_type("all");
3155 cmd_getval(g_ceph_context
, cmdmap
, "type", node_type
);
3157 f
.reset(Formatter::create("json-pretty"));
3158 if (node_type
== "all") {
3159 f
->open_object_section("nodes");
3160 print_nodes(f
.get(), ds
);
3161 osdmon()->print_nodes(f
.get());
3162 mdsmon()->print_nodes(f
.get());
3164 } else if (node_type
== "mon") {
3165 print_nodes(f
.get(), ds
);
3166 } else if (node_type
== "osd") {
3167 osdmon()->print_nodes(f
.get());
3168 } else if (node_type
== "mds") {
3169 mdsmon()->print_nodes(f
.get());
3175 } else if (prefix
== "features") {
3176 if (!is_leader() && !is_peon()) {
3177 dout(10) << " waiting for quorum" << dendl
;
3178 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3182 forward_request_leader(op
);
3186 f
.reset(Formatter::create("json-pretty"));
3188 get_combined_feature_map(&fm
);
3189 f
->dump_object("features", fm
);
3193 } else if (prefix
== "mon metadata") {
3195 f
.reset(Formatter::create("json-pretty"));
3198 bool all
= !cmd_getval(g_ceph_context
, cmdmap
, "id", name
);
3200 // Dump a single mon's metadata
3201 int mon
= monmap
->get_rank(name
);
3203 rs
= "requested mon not found";
3207 f
->open_object_section("mon_metadata");
3208 r
= get_mon_metadata(mon
, f
.get(), ds
);
3211 // Dump all mons' metadata
3213 f
->open_array_section("mon_metadata");
3214 for (unsigned int rank
= 0; rank
< monmap
->size(); ++rank
) {
3215 std::ostringstream get_err
;
3216 f
->open_object_section("mon");
3217 f
->dump_string("name", monmap
->get_name(rank
));
3218 r
= get_mon_metadata(rank
, f
.get(), get_err
);
3220 if (r
== -ENOENT
|| r
== -EINVAL
) {
3221 dout(1) << get_err
.str() << dendl
;
3222 // Drop error, list what metadata we do have
3224 } else if (r
!= 0) {
3225 derr
<< "Unexpected error from get_mon_metadata: "
3226 << cpp_strerror(r
) << dendl
;
3227 ds
<< get_err
.str();
3237 } else if (prefix
== "mon versions") {
3239 f
.reset(Formatter::create("json-pretty"));
3240 count_metadata("ceph_version", f
.get());
3245 } else if (prefix
== "mon count-metadata") {
3247 f
.reset(Formatter::create("json-pretty"));
3249 cmd_getval(g_ceph_context
, cmdmap
, "property", field
);
3250 count_metadata(field
, f
.get());
3255 } else if (prefix
== "quorum_status") {
3256 // make sure our map is readable and up to date
3257 if (!is_leader() && !is_peon()) {
3258 dout(10) << " waiting for quorum" << dendl
;
3259 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
3262 _quorum_status(f
.get(), ds
);
3266 } else if (prefix
== "mon_status") {
3267 get_mon_status(f
.get(), ds
);
3273 } else if (prefix
== "sync force" ||
3274 prefix
== "mon sync force") {
3275 string validate1
, validate2
;
3276 cmd_getval(g_ceph_context
, cmdmap
, "validate1", validate1
);
3277 cmd_getval(g_ceph_context
, cmdmap
, "validate2", validate2
);
3278 if (validate1
!= "--yes-i-really-mean-it" ||
3279 validate2
!= "--i-know-what-i-am-doing") {
3281 rs
= "are you SURE? this will mean the monitor store will be "
3282 "erased. pass '--yes-i-really-mean-it "
3283 "--i-know-what-i-am-doing' if you really do.";
3286 sync_force(f
.get(), ds
);
3289 } else if (prefix
== "heap") {
3290 if (!ceph_using_tcmalloc())
3291 rs
= "tcmalloc not enabled, can't use heap profiler commands\n";
3294 cmd_getval(g_ceph_context
, cmdmap
, "heapcmd", heapcmd
);
3295 // XXX 1-element vector, change at callee or make vector here?
3296 vector
<string
> heapcmd_vec
;
3297 get_str_vec(heapcmd
, heapcmd_vec
);
3298 ceph_heap_profiler_handle_command(heapcmd_vec
, ds
);
3303 } else if (prefix
== "quorum") {
3305 cmd_getval(g_ceph_context
, cmdmap
, "quorumcmd", quorumcmd
);
3306 if (quorumcmd
== "exit") {
3308 elector
.stop_participating();
3309 rs
= "stopped responding to quorum, initiated new election";
3311 } else if (quorumcmd
== "enter") {
3312 elector
.start_participating();
3314 rs
= "started responding to quorum, initiated new election";
3317 rs
= "needs a valid 'quorum' command";
3320 } else if (prefix
== "version") {
3322 f
->open_object_section("version");
3323 f
->dump_string("version", pretty_version_to_str());
3327 ds
<< pretty_version_to_str();
3335 if (!m
->get_source().is_mon()) // don't reply to mon->mon commands
3336 reply_command(op
, r
, rs
, rdata
, 0);
3339 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
, version_t version
)
3342 reply_command(op
, rc
, rs
, rdata
, version
);
3345 void Monitor::reply_command(MonOpRequestRef op
, int rc
, const string
&rs
,
3346 bufferlist
& rdata
, version_t version
)
3348 MMonCommand
*m
= static_cast<MMonCommand
*>(op
->get_req());
3349 assert(m
->get_type() == MSG_MON_COMMAND
);
3350 MMonCommandAck
*reply
= new MMonCommandAck(m
->cmd
, rc
, rs
, version
);
3351 reply
->set_tid(m
->get_tid());
3352 reply
->set_data(rdata
);
3353 send_reply(op
, reply
);
3357 // ------------------------
3358 // request/reply routing
3360 // a client/mds/osd will connect to a random monitor. we need to forward any
3361 // messages requiring state updates to the leader, and then route any replies
3362 // back via the correct monitor and back to them. (the monitor will not
3363 // initiate any connections.)
3365 void Monitor::forward_request_leader(MonOpRequestRef op
)
3367 op
->mark_event(__func__
);
3369 int mon
= get_leader();
3370 MonSession
*session
= op
->get_session();
3371 PaxosServiceMessage
*req
= op
->get_req
<PaxosServiceMessage
>();
3373 if (req
->get_source().is_mon() && req
->get_source_addr() != messenger
->get_myaddr()) {
3374 dout(10) << "forward_request won't forward (non-local) mon request " << *req
<< dendl
;
3375 } else if (session
->proxy_con
) {
3376 dout(10) << "forward_request won't double fwd request " << *req
<< dendl
;
3377 } else if (!session
->closed
) {
3378 RoutedRequest
*rr
= new RoutedRequest
;
3379 rr
->tid
= ++routed_request_tid
;
3380 rr
->client_inst
= req
->get_source_inst();
3381 rr
->con
= req
->get_connection();
3382 rr
->con_features
= rr
->con
->get_features();
3383 encode_message(req
, CEPH_FEATURES_ALL
, rr
->request_bl
); // for my use only; use all features
3384 rr
->session
= static_cast<MonSession
*>(session
->get());
3386 routed_requests
[rr
->tid
] = rr
;
3387 session
->routed_request_tids
.insert(rr
->tid
);
3389 dout(10) << "forward_request " << rr
->tid
<< " request " << *req
3390 << " features " << rr
->con_features
<< dendl
;
3392 MForward
*forward
= new MForward(rr
->tid
,
3396 forward
->set_priority(req
->get_priority());
3397 if (session
->auth_handler
) {
3398 forward
->entity_name
= session
->entity_name
;
3399 } else if (req
->get_source().is_mon()) {
3400 forward
->entity_name
.set_type(CEPH_ENTITY_TYPE_MON
);
3402 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3403 op
->mark_forwarded();
3404 assert(op
->get_req()->get_type() != 0);
3406 dout(10) << "forward_request no session for request " << *req
<< dendl
;
3410 // fake connection attached to forwarded messages
3411 struct AnonConnection
: public Connection
{
3412 explicit AnonConnection(CephContext
*cct
) : Connection(cct
, NULL
) {}
3414 int send_message(Message
*m
) override
{
3415 assert(!"send_message on anonymous connection");
3417 void send_keepalive() override
{
3418 assert(!"send_keepalive on anonymous connection");
3420 void mark_down() override
{
3423 void mark_disposable() override
{
3426 bool is_connected() override
{ return false; }
3429 //extract the original message and put it into the regular dispatch function
3430 void Monitor::handle_forward(MonOpRequestRef op
)
3432 MForward
*m
= static_cast<MForward
*>(op
->get_req());
3433 dout(10) << "received forwarded message from " << m
->client
3434 << " via " << m
->get_source_inst() << dendl
;
3435 MonSession
*session
= op
->get_session();
3438 if (!session
->is_capable("mon", MON_CAP_X
)) {
3439 dout(0) << "forward from entity with insufficient caps! "
3440 << session
->caps
<< dendl
;
3442 // see PaxosService::dispatch(); we rely on this being anon
3443 // (c->msgr == NULL)
3444 PaxosServiceMessage
*req
= m
->claim_message();
3445 assert(req
!= NULL
);
3447 ConnectionRef
c(new AnonConnection(cct
));
3448 MonSession
*s
= new MonSession(req
->get_source_inst(),
3449 static_cast<Connection
*>(c
.get()));
3450 c
->set_priv(s
->get());
3451 c
->set_peer_addr(m
->client
.addr
);
3452 c
->set_peer_type(m
->client
.name
.type());
3453 c
->set_features(m
->con_features
);
3455 s
->caps
= m
->client_caps
;
3456 dout(10) << " caps are " << s
->caps
<< dendl
;
3457 s
->entity_name
= m
->entity_name
;
3458 dout(10) << " entity name '" << s
->entity_name
<< "' type "
3459 << s
->entity_name
.get_type() << dendl
;
3460 s
->proxy_con
= m
->get_connection();
3461 s
->proxy_tid
= m
->tid
;
3463 req
->set_connection(c
);
3465 // not super accurate, but better than nothing.
3466 req
->set_recv_stamp(m
->get_recv_stamp());
3469 * note which election epoch this is; we will drop the message if
3470 * there is a future election since our peers will resend routed
3471 * requests in that case.
3473 req
->rx_election_epoch
= get_epoch();
3475 /* Because this is a special fake connection, we need to break
3476 the ref loop between Connection and MonSession differently
3477 than we normally do. Here, the Message refers to the Connection
3478 which refers to the Session, and nobody else refers to the Connection
3479 or the Session. And due to the special nature of this message,
3480 nobody refers to the Connection via the Session. So, clear out that
3481 half of the ref loop.*/
3484 dout(10) << " mesg " << req
<< " from " << m
->get_source_addr() << dendl
;
3491 void Monitor::try_send_message(Message
*m
, const entity_inst_t
& to
)
3493 dout(10) << "try_send_message " << *m
<< " to " << to
<< dendl
;
3496 encode_message(m
, quorum_con_features
, bl
);
3498 messenger
->send_message(m
, to
);
3500 for (int i
=0; i
<(int)monmap
->size(); i
++) {
3502 messenger
->send_message(new MRoute(bl
, to
), monmap
->get_inst(i
));
3506 void Monitor::send_reply(MonOpRequestRef op
, Message
*reply
)
3508 op
->mark_event(__func__
);
3510 MonSession
*session
= op
->get_session();
3512 Message
*req
= op
->get_req();
3513 ConnectionRef con
= op
->get_connection();
3515 reply
->set_cct(g_ceph_context
);
3516 dout(2) << __func__
<< " " << op
<< " " << reply
<< " " << *reply
<< dendl
;
3519 dout(2) << "send_reply no connection, dropping reply " << *reply
3520 << " to " << req
<< " " << *req
<< dendl
;
3522 op
->mark_event("reply: no connection");
3526 if (!session
->con
&& !session
->proxy_con
) {
3527 dout(2) << "send_reply no connection, dropping reply " << *reply
3528 << " to " << req
<< " " << *req
<< dendl
;
3530 op
->mark_event("reply: no connection");
3534 if (session
->proxy_con
) {
3535 dout(15) << "send_reply routing reply to " << con
->get_peer_addr()
3536 << " via " << session
->proxy_con
->get_peer_addr()
3537 << " for request " << *req
<< dendl
;
3538 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, reply
));
3539 op
->mark_event("reply: send routed request");
3541 session
->con
->send_message(reply
);
3542 op
->mark_event("reply: send");
3546 void Monitor::no_reply(MonOpRequestRef op
)
3548 MonSession
*session
= op
->get_session();
3549 Message
*req
= op
->get_req();
3551 if (session
->proxy_con
) {
3552 dout(10) << "no_reply to " << req
->get_source_inst()
3553 << " via " << session
->proxy_con
->get_peer_addr()
3554 << " for request " << *req
<< dendl
;
3555 session
->proxy_con
->send_message(new MRoute(session
->proxy_tid
, NULL
));
3556 op
->mark_event("no_reply: send routed request");
3558 dout(10) << "no_reply to " << req
->get_source_inst()
3559 << " " << *req
<< dendl
;
3560 op
->mark_event("no_reply");
3564 void Monitor::handle_route(MonOpRequestRef op
)
3566 MRoute
*m
= static_cast<MRoute
*>(op
->get_req());
3567 MonSession
*session
= op
->get_session();
3569 if (!session
->is_capable("mon", MON_CAP_X
)) {
3570 dout(0) << "MRoute received from entity without appropriate perms! "
3575 dout(10) << "handle_route " << *m
->msg
<< " to " << m
->dest
<< dendl
;
3577 dout(10) << "handle_route null to " << m
->dest
<< dendl
;
3580 if (m
->session_mon_tid
) {
3581 if (routed_requests
.count(m
->session_mon_tid
)) {
3582 RoutedRequest
*rr
= routed_requests
[m
->session_mon_tid
];
3584 // reset payload, in case encoding is dependent on target features
3586 m
->msg
->clear_payload();
3587 rr
->con
->send_message(m
->msg
);
3590 if (m
->send_osdmap_first
) {
3591 dout(10) << " sending osdmaps from " << m
->send_osdmap_first
<< dendl
;
3592 osdmon()->send_incremental(m
->send_osdmap_first
, rr
->session
,
3593 true, MonOpRequestRef());
3595 assert(rr
->tid
== m
->session_mon_tid
&& rr
->session
->routed_request_tids
.count(m
->session_mon_tid
));
3596 routed_requests
.erase(m
->session_mon_tid
);
3597 rr
->session
->routed_request_tids
.erase(m
->session_mon_tid
);
3600 dout(10) << " don't have routed request tid " << m
->session_mon_tid
<< dendl
;
3603 dout(10) << " not a routed request, trying to send anyway" << dendl
;
3605 messenger
->send_message(m
->msg
, m
->dest
);
3611 void Monitor::resend_routed_requests()
3613 dout(10) << "resend_routed_requests" << dendl
;
3614 int mon
= get_leader();
3615 list
<Context
*> retry
;
3616 for (map
<uint64_t, RoutedRequest
*>::iterator p
= routed_requests
.begin();
3617 p
!= routed_requests
.end();
3619 RoutedRequest
*rr
= p
->second
;
3622 dout(10) << " requeue for self tid " << rr
->tid
<< dendl
;
3623 rr
->op
->mark_event("retry routed request");
3624 retry
.push_back(new C_RetryMessage(this, rr
->op
));
3626 assert(rr
->session
->routed_request_tids
.count(p
->first
));
3627 rr
->session
->routed_request_tids
.erase(p
->first
);
3631 bufferlist::iterator q
= rr
->request_bl
.begin();
3632 PaxosServiceMessage
*req
= (PaxosServiceMessage
*)decode_message(cct
, 0, q
);
3633 rr
->op
->mark_event("resend forwarded message to leader");
3634 dout(10) << " resend to mon." << mon
<< " tid " << rr
->tid
<< " " << *req
<< dendl
;
3635 MForward
*forward
= new MForward(rr
->tid
, req
, rr
->con_features
,
3637 req
->put(); // forward takes its own ref; drop ours.
3638 forward
->client
= rr
->client_inst
;
3639 forward
->set_priority(req
->get_priority());
3640 messenger
->send_message(forward
, monmap
->get_inst(mon
));
3644 routed_requests
.clear();
3645 finish_contexts(g_ceph_context
, retry
);
3649 void Monitor::remove_session(MonSession
*s
)
3651 dout(10) << "remove_session " << s
<< " " << s
->inst
<< dendl
;
3654 for (set
<uint64_t>::iterator p
= s
->routed_request_tids
.begin();
3655 p
!= s
->routed_request_tids
.end();
3657 assert(routed_requests
.count(*p
));
3658 RoutedRequest
*rr
= routed_requests
[*p
];
3659 dout(10) << " dropping routed request " << rr
->tid
<< dendl
;
3661 routed_requests
.erase(*p
);
3663 s
->routed_request_tids
.clear();
3664 s
->con
->set_priv(NULL
);
3665 session_map
.remove_session(s
);
3666 logger
->set(l_mon_num_sessions
, session_map
.get_size());
3667 logger
->inc(l_mon_session_rm
);
3670 void Monitor::remove_all_sessions()
3672 Mutex::Locker
l(session_map_lock
);
3673 while (!session_map
.sessions
.empty()) {
3674 MonSession
*s
= session_map
.sessions
.front();
3677 logger
->inc(l_mon_session_rm
);
3680 logger
->set(l_mon_num_sessions
, session_map
.get_size());
3683 void Monitor::send_command(const entity_inst_t
& inst
,
3684 const vector
<string
>& com
)
3686 dout(10) << "send_command " << inst
<< "" << com
<< dendl
;
3687 MMonCommand
*c
= new MMonCommand(monmap
->fsid
);
3689 try_send_message(c
, inst
);
3692 void Monitor::waitlist_or_zap_client(MonOpRequestRef op
)
3695 * Wait list the new session until we're in the quorum, assuming it's
3697 * tick() will periodically send them back through so we can send
3698 * the client elsewhere if we don't think we're getting back in.
3700 * But we whitelist a few sorts of messages:
3701 * 1) Monitors can talk to us at any time, of course.
3702 * 2) auth messages. It's unlikely to go through much faster, but
3703 * it's possible we've just lost our quorum status and we want to take...
3704 * 3) command messages. We want to accept these under all possible
3707 Message
*m
= op
->get_req();
3708 MonSession
*s
= op
->get_session();
3709 ConnectionRef con
= op
->get_connection();
3710 utime_t too_old
= ceph_clock_now();
3711 too_old
-= g_ceph_context
->_conf
->mon_lease
;
3712 if (m
->get_recv_stamp() > too_old
&&
3713 con
->is_connected()) {
3714 dout(5) << "waitlisting message " << *m
<< dendl
;
3715 maybe_wait_for_quorum
.push_back(new C_RetryMessage(this, op
));
3716 op
->mark_wait_for_quorum();
3718 dout(5) << "discarding message " << *m
<< " and sending client elsewhere" << dendl
;
3720 // proxied sessions aren't registered and don't have a con; don't remove
3722 if (!s
->proxy_con
) {
3723 Mutex::Locker
l(session_map_lock
);
3730 void Monitor::_ms_dispatch(Message
*m
)
3732 if (is_shutdown()) {
3737 MonOpRequestRef op
= op_tracker
.create_request
<MonOpRequest
>(m
);
3738 bool src_is_mon
= op
->is_src_mon();
3739 op
->mark_event("mon:_ms_dispatch");
3740 MonSession
*s
= op
->get_session();
3741 if (s
&& s
->closed
) {
3745 // if the sender is not a monitor, make sure their first message for a
3746 // session is an MAuth. If it is not, assume it's a stray message,
3747 // and considering that we are creating a new session it is safe to
3748 // assume that the sender hasn't authenticated yet, so we have no way
3749 // of assessing whether we should handle it or not.
3750 if (!src_is_mon
&& (m
->get_type() != CEPH_MSG_AUTH
&&
3751 m
->get_type() != CEPH_MSG_MON_GET_MAP
&&
3752 m
->get_type() != CEPH_MSG_PING
)) {
3753 dout(1) << __func__
<< " dropping stray message " << *m
3754 << " from " << m
->get_source_inst() << dendl
;
3758 ConnectionRef con
= m
->get_connection();
3760 Mutex::Locker
l(session_map_lock
);
3761 s
= session_map
.new_session(m
->get_source_inst(), con
.get());
3764 con
->set_priv(s
->get());
3765 dout(10) << __func__
<< " new session " << s
<< " " << *s
<< dendl
;
3768 logger
->set(l_mon_num_sessions
, session_map
.get_size());
3769 logger
->inc(l_mon_session_add
);
3772 // give it monitor caps; the peer type has been authenticated
3773 dout(5) << __func__
<< " setting monitor caps on this connection" << dendl
;
3774 if (!s
->caps
.is_allow_all()) // but no need to repeatedly copy
3775 s
->caps
= *mon_caps
;
3779 dout(20) << __func__
<< " existing session " << s
<< " for " << s
->inst
3785 s
->session_timeout
= ceph_clock_now();
3786 s
->session_timeout
+= g_conf
->mon_session_timeout
;
3788 if (s
->auth_handler
) {
3789 s
->entity_name
= s
->auth_handler
->get_entity_name();
3791 dout(20) << " caps " << s
->caps
.get_str() << dendl
;
3793 if ((is_synchronizing() ||
3794 (s
->global_id
== 0 && !exited_quorum
.is_zero())) &&
3796 m
->get_type() != CEPH_MSG_PING
) {
3797 waitlist_or_zap_client(op
);
3804 void Monitor::dispatch_op(MonOpRequestRef op
)
3806 op
->mark_event("mon:dispatch_op");
3807 MonSession
*s
= op
->get_session();
3810 dout(10) << " session closed, dropping " << op
->get_req() << dendl
;
3814 /* we will consider the default type as being 'monitor' until proven wrong */
3815 op
->set_type_monitor();
3816 /* deal with all messages that do not necessarily need caps */
3817 bool dealt_with
= true;
3818 switch (op
->get_req()->get_type()) {
3820 case MSG_MON_GLOBAL_ID
:
3822 op
->set_type_service();
3823 /* no need to check caps here */
3824 paxos_service
[PAXOS_AUTH
]->dispatch(op
);
3831 /* MMonGetMap may be used by clients to obtain a monmap *before*
3832 * authenticating with the monitor. We need to handle these without
3833 * checking caps because, even on a cluster without cephx, we only set
3834 * session caps *after* the auth handshake. A good example of this
3835 * is when a client calls MonClient::get_monmap_privately(), which does
3836 * not authenticate when obtaining a monmap.
3838 case CEPH_MSG_MON_GET_MAP
:
3839 handle_mon_get_map(op
);
3842 case CEPH_MSG_MON_METADATA
:
3843 return handle_mon_metadata(op
);
3852 /* well, maybe the op belongs to a service... */
3853 op
->set_type_service();
3854 /* deal with all messages which caps should be checked somewhere else */
3856 switch (op
->get_req()->get_type()) {
3859 case CEPH_MSG_MON_GET_OSDMAP
:
3860 case CEPH_MSG_POOLOP
:
3861 case MSG_OSD_BEACON
:
3862 case MSG_OSD_MARK_ME_DOWN
:
3864 case MSG_OSD_FAILURE
:
3867 case MSG_OSD_PGTEMP
:
3868 case MSG_OSD_PG_CREATED
:
3869 case MSG_REMOVE_SNAPS
:
3870 paxos_service
[PAXOS_OSDMAP
]->dispatch(op
);
3874 case MSG_MDS_BEACON
:
3875 case MSG_MDS_OFFLOAD_TARGETS
:
3876 paxos_service
[PAXOS_MDSMAP
]->dispatch(op
);
3880 case MSG_MGR_BEACON
:
3881 paxos_service
[PAXOS_MGR
]->dispatch(op
);
3885 case MSG_MON_MGR_REPORT
:
3886 case CEPH_MSG_STATFS
:
3887 case MSG_GETPOOLSTATS
:
3888 paxos_service
[PAXOS_MGRSTAT
]->dispatch(op
);
3893 paxos_service
[PAXOS_PGMAP
]->dispatch(op
);
3898 paxos_service
[PAXOS_LOG
]->dispatch(op
);
3901 // handle_command() does its own caps checking
3902 case MSG_MON_COMMAND
:
3903 op
->set_type_command();
3914 /* nop, looks like it's not a service message; revert back to monitor */
3915 op
->set_type_monitor();
3917 /* messages we, the Monitor class, need to deal with
3918 * but may be sent by clients. */
3920 if (!op
->get_session()->is_capable("mon", MON_CAP_R
)) {
3921 dout(5) << __func__
<< " " << op
->get_req()->get_source_inst()
3922 << " not enough caps for " << *(op
->get_req()) << " -- dropping"
3928 switch (op
->get_req()->get_type()) {
3931 case CEPH_MSG_MON_GET_VERSION
:
3932 handle_get_version(op
);
3935 case CEPH_MSG_MON_SUBSCRIBE
:
3936 /* FIXME: check what's being subscribed, filter accordingly */
3937 handle_subscribe(op
);
3947 if (!op
->is_src_mon()) {
3948 dout(1) << __func__
<< " unexpected monitor message from"
3949 << " non-monitor entity " << op
->get_req()->get_source_inst()
3950 << " " << *(op
->get_req()) << " -- dropping" << dendl
;
3954 /* messages that should only be sent by another monitor */
3956 switch (op
->get_req()->get_type()) {
3966 // Sync (i.e., the new slurp, but on steroids)
3974 /* log acks are sent from a monitor we sent the MLog to, and are
3975 never sent by clients to us. */
3977 log_client
.handle_log_ack((MLogAck
*)op
->get_req());
3982 op
->set_type_service();
3983 paxos_service
[PAXOS_MONMAP
]->dispatch(op
);
3989 op
->set_type_paxos();
3990 MMonPaxos
*pm
= static_cast<MMonPaxos
*>(op
->get_req());
3991 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
3996 if (state
== STATE_SYNCHRONIZING
) {
3997 // we are synchronizing. These messages would do us no
3998 // good, thus just drop them and ignore them.
3999 dout(10) << __func__
<< " ignore paxos msg from "
4000 << pm
->get_source_inst() << dendl
;
4005 if (pm
->epoch
> get_epoch()) {
4009 if (pm
->epoch
!= get_epoch()) {
4013 paxos
->dispatch(op
);
4018 case MSG_MON_ELECTION
:
4019 op
->set_type_election();
4020 //check privileges here for simplicity
4021 if (!op
->get_session()->is_capable("mon", MON_CAP_X
)) {
4022 dout(0) << "MMonElection received from entity without enough caps!"
4023 << op
->get_session()->caps
<< dendl
;
4026 if (!is_probing() && !is_synchronizing()) {
4027 elector
.dispatch(op
);
4036 handle_timecheck(op
);
4039 case MSG_MON_HEALTH
:
4040 health_monitor
->dispatch(op
);
4048 dout(1) << "dropping unexpected " << *(op
->get_req()) << dendl
;
4057 void Monitor::handle_ping(MonOpRequestRef op
)
4059 MPing
*m
= static_cast<MPing
*>(op
->get_req());
4060 dout(10) << __func__
<< " " << *m
<< dendl
;
4061 MPing
*reply
= new MPing
;
4062 entity_inst_t inst
= m
->get_source_inst();
4064 boost::scoped_ptr
<Formatter
> f(new JSONFormatter(true));
4065 f
->open_object_section("pong");
4067 list
<string
> health_str
;
4068 get_health(health_str
, NULL
, f
.get());
4071 get_mon_status(f
.get(), ss
);
4077 ::encode(ss
.str(), payload
);
4078 reply
->set_payload(payload
);
4079 dout(10) << __func__
<< " reply payload len " << reply
->get_payload().length() << dendl
;
4080 messenger
->send_message(reply
, inst
);
4083 void Monitor::timecheck_start()
4085 dout(10) << __func__
<< dendl
;
4086 timecheck_cleanup();
4087 timecheck_start_round();
4090 void Monitor::timecheck_finish()
4092 dout(10) << __func__
<< dendl
;
4093 timecheck_cleanup();
4096 void Monitor::timecheck_start_round()
4098 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4099 assert(is_leader());
4101 if (monmap
->size() == 1) {
4102 assert(0 == "We are alone; this shouldn't have been scheduled!");
4106 if (timecheck_round
% 2) {
4107 dout(10) << __func__
<< " there's a timecheck going on" << dendl
;
4108 utime_t curr_time
= ceph_clock_now();
4109 double max
= g_conf
->mon_timecheck_interval
*3;
4110 if (curr_time
- timecheck_round_start
< max
) {
4111 dout(10) << __func__
<< " keep current round going" << dendl
;
4114 dout(10) << __func__
4115 << " finish current timecheck and start new" << dendl
;
4116 timecheck_cancel_round();
4120 assert(timecheck_round
% 2 == 0);
4123 timecheck_round_start
= ceph_clock_now();
4124 dout(10) << __func__
<< " new " << timecheck_round
<< dendl
;
4128 dout(10) << __func__
<< " setting up next event" << dendl
;
4129 timecheck_reset_event();
4132 void Monitor::timecheck_finish_round(bool success
)
4134 dout(10) << __func__
<< " curr " << timecheck_round
<< dendl
;
4135 assert(timecheck_round
% 2);
4137 timecheck_round_start
= utime_t();
4140 assert(timecheck_waiting
.empty());
4141 assert(timecheck_acks
== quorum
.size());
4143 timecheck_check_skews();
4147 dout(10) << __func__
<< " " << timecheck_waiting
.size()
4148 << " peers still waiting:";
4149 for (map
<entity_inst_t
,utime_t
>::iterator p
= timecheck_waiting
.begin();
4150 p
!= timecheck_waiting
.end(); ++p
) {
4151 *_dout
<< " " << p
->first
.name
;
4154 timecheck_waiting
.clear();
4156 dout(10) << __func__
<< " finished to " << timecheck_round
<< dendl
;
4159 void Monitor::timecheck_cancel_round()
4161 timecheck_finish_round(false);
4164 void Monitor::timecheck_cleanup()
4166 timecheck_round
= 0;
4168 timecheck_round_start
= utime_t();
4170 if (timecheck_event
) {
4171 timer
.cancel_event(timecheck_event
);
4172 timecheck_event
= NULL
;
4174 timecheck_waiting
.clear();
4175 timecheck_skews
.clear();
4176 timecheck_latencies
.clear();
4178 timecheck_rounds_since_clean
= 0;
4181 void Monitor::timecheck_reset_event()
4183 if (timecheck_event
) {
4184 timer
.cancel_event(timecheck_event
);
4185 timecheck_event
= NULL
;
4189 cct
->_conf
->mon_timecheck_skew_interval
* timecheck_rounds_since_clean
;
4191 if (delay
<= 0 || delay
> cct
->_conf
->mon_timecheck_interval
) {
4192 delay
= cct
->_conf
->mon_timecheck_interval
;
4195 dout(10) << __func__
<< " delay " << delay
4196 << " rounds_since_clean " << timecheck_rounds_since_clean
4199 timecheck_event
= new C_MonContext(this, [this](int) {
4200 timecheck_start_round();
4202 timer
.add_event_after(delay
, timecheck_event
);
4205 void Monitor::timecheck_check_skews()
4207 dout(10) << __func__
<< dendl
;
4208 assert(is_leader());
4209 assert((timecheck_round
% 2) == 0);
4210 if (monmap
->size() == 1) {
4211 assert(0 == "We are alone; we shouldn't have gotten here!");
4214 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4216 bool found_skew
= false;
4217 for (map
<entity_inst_t
, double>::iterator p
= timecheck_skews
.begin();
4218 p
!= timecheck_skews
.end(); ++p
) {
4221 if (timecheck_has_skew(p
->second
, &abs_skew
)) {
4222 dout(10) << __func__
4223 << " " << p
->first
<< " skew " << abs_skew
<< dendl
;
4229 ++timecheck_rounds_since_clean
;
4230 timecheck_reset_event();
4231 } else if (timecheck_rounds_since_clean
> 0) {
4233 << " no clock skews found after " << timecheck_rounds_since_clean
4234 << " rounds" << dendl
;
4235 // make sure the skews are really gone and not just a transient success
4236 // this will run just once if not in the presence of skews again.
4237 timecheck_rounds_since_clean
= 1;
4238 timecheck_reset_event();
4239 timecheck_rounds_since_clean
= 0;
4244 void Monitor::timecheck_report()
4246 dout(10) << __func__
<< dendl
;
4247 assert(is_leader());
4248 assert((timecheck_round
% 2) == 0);
4249 if (monmap
->size() == 1) {
4250 assert(0 == "We are alone; we shouldn't have gotten here!");
4254 assert(timecheck_latencies
.size() == timecheck_skews
.size());
4255 bool do_output
= true; // only output report once
4256 for (set
<int>::iterator q
= quorum
.begin(); q
!= quorum
.end(); ++q
) {
4257 if (monmap
->get_name(*q
) == name
)
4260 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_REPORT
);
4261 m
->epoch
= get_epoch();
4262 m
->round
= timecheck_round
;
4264 for (map
<entity_inst_t
, double>::iterator it
= timecheck_skews
.begin();
4265 it
!= timecheck_skews
.end(); ++it
) {
4266 double skew
= it
->second
;
4267 double latency
= timecheck_latencies
[it
->first
];
4269 m
->skews
[it
->first
] = skew
;
4270 m
->latencies
[it
->first
] = latency
;
4273 dout(25) << __func__
<< " " << it
->first
4274 << " latency " << latency
4275 << " skew " << skew
<< dendl
;
4279 entity_inst_t inst
= monmap
->get_inst(*q
);
4280 dout(10) << __func__
<< " send report to " << inst
<< dendl
;
4281 messenger
->send_message(m
, inst
);
4285 void Monitor::timecheck()
4287 dout(10) << __func__
<< dendl
;
4288 assert(is_leader());
4289 if (monmap
->size() == 1) {
4290 assert(0 == "We are alone; we shouldn't have gotten here!");
4293 assert(timecheck_round
% 2 != 0);
4295 timecheck_acks
= 1; // we ack ourselves
4297 dout(10) << __func__
<< " start timecheck epoch " << get_epoch()
4298 << " round " << timecheck_round
<< dendl
;
4300 // we are at the eye of the storm; the point of reference
4301 timecheck_skews
[messenger
->get_myinst()] = 0.0;
4302 timecheck_latencies
[messenger
->get_myinst()] = 0.0;
4304 for (set
<int>::iterator it
= quorum
.begin(); it
!= quorum
.end(); ++it
) {
4305 if (monmap
->get_name(*it
) == name
)
4308 entity_inst_t inst
= monmap
->get_inst(*it
);
4309 utime_t curr_time
= ceph_clock_now();
4310 timecheck_waiting
[inst
] = curr_time
;
4311 MTimeCheck
*m
= new MTimeCheck(MTimeCheck::OP_PING
);
4312 m
->epoch
= get_epoch();
4313 m
->round
= timecheck_round
;
4314 dout(10) << __func__
<< " send " << *m
<< " to " << inst
<< dendl
;
4315 messenger
->send_message(m
, inst
);
4319 health_status_t
Monitor::timecheck_status(ostringstream
&ss
,
4320 const double skew_bound
,
4321 const double latency
)
4323 health_status_t status
= HEALTH_OK
;
4324 assert(latency
>= 0);
4327 if (timecheck_has_skew(skew_bound
, &abs_skew
)) {
4328 status
= HEALTH_WARN
;
4329 ss
<< "clock skew " << abs_skew
<< "s"
4330 << " > max " << g_conf
->mon_clock_drift_allowed
<< "s";
4336 void Monitor::handle_timecheck_leader(MonOpRequestRef op
)
4338 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4339 dout(10) << __func__
<< " " << *m
<< dendl
;
4340 /* handles PONG's */
4341 assert(m
->op
== MTimeCheck::OP_PONG
);
4343 entity_inst_t other
= m
->get_source_inst();
4344 if (m
->epoch
< get_epoch()) {
4345 dout(1) << __func__
<< " got old timecheck epoch " << m
->epoch
4346 << " from " << other
4347 << " curr " << get_epoch()
4348 << " -- severely lagged? discard" << dendl
;
4351 assert(m
->epoch
== get_epoch());
4353 if (m
->round
< timecheck_round
) {
4354 dout(1) << __func__
<< " got old round " << m
->round
4355 << " from " << other
4356 << " curr " << timecheck_round
<< " -- discard" << dendl
;
4360 utime_t curr_time
= ceph_clock_now();
4362 assert(timecheck_waiting
.count(other
) > 0);
4363 utime_t timecheck_sent
= timecheck_waiting
[other
];
4364 timecheck_waiting
.erase(other
);
4365 if (curr_time
< timecheck_sent
) {
4366 // our clock was readjusted -- drop everything until it all makes sense.
4367 dout(1) << __func__
<< " our clock was readjusted --"
4368 << " bump round and drop current check"
4370 timecheck_cancel_round();
4374 /* update peer latencies */
4375 double latency
= (double)(curr_time
- timecheck_sent
);
4377 if (timecheck_latencies
.count(other
) == 0)
4378 timecheck_latencies
[other
] = latency
;
4380 double avg_latency
= ((timecheck_latencies
[other
]*0.8)+(latency
*0.2));
4381 timecheck_latencies
[other
] = avg_latency
;
4387 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4388 * and 'a' happens to be lower than 'b'; so we use double instead.
4390 * latency is always expected to be >= 0.
4392 * delta, the difference between theirs timestamp and ours, may either be
4393 * lower or higher than 0; will hardly ever be 0.
4395 * The absolute skew is the absolute delta minus the latency, which is
4396 * taken as a whole instead of an rtt given that there is some queueing
4397 * and dispatch times involved and it's hard to assess how long exactly
4398 * it took for the message to travel to the other side and be handled. So
4399 * we call it a bounded skew, the worst case scenario.
4403 * Given that the latency is always positive, we can establish that the
4404 * bounded skew will be:
4406 * 1. positive if the absolute delta is higher than the latency and
4408 * 2. negative if the absolute delta is higher than the latency and
4409 * delta is negative.
4410 * 3. zero if the absolute delta is lower than the latency.
4412 * On 3. we make a judgement call and treat the skew as non-existent.
4413 * This is because that, if the absolute delta is lower than the
4414 * latency, then the apparently existing skew is nothing more than a
4415 * side-effect of the high latency at work.
4417 * This may not be entirely true though, as a severely skewed clock
4418 * may be masked by an even higher latency, but with high latencies
4419 * we probably have worse issues to deal with than just skewed clocks.
4421 assert(latency
>= 0);
4423 double delta
= ((double) m
->timestamp
) - ((double) curr_time
);
4424 double abs_delta
= (delta
> 0 ? delta
: -delta
);
4425 double skew_bound
= abs_delta
- latency
;
4429 skew_bound
= -skew_bound
;
4432 health_status_t status
= timecheck_status(ss
, skew_bound
, latency
);
4433 if (status
== HEALTH_ERR
)
4434 clog
->error() << other
<< " " << ss
.str();
4435 else if (status
== HEALTH_WARN
)
4436 clog
->warn() << other
<< " " << ss
.str();
4438 dout(10) << __func__
<< " from " << other
<< " ts " << m
->timestamp
4439 << " delta " << delta
<< " skew_bound " << skew_bound
4440 << " latency " << latency
<< dendl
;
4442 timecheck_skews
[other
] = skew_bound
;
4445 if (timecheck_acks
== quorum
.size()) {
4446 dout(10) << __func__
<< " got pongs from everybody ("
4447 << timecheck_acks
<< " total)" << dendl
;
4448 assert(timecheck_skews
.size() == timecheck_acks
);
4449 assert(timecheck_waiting
.empty());
4450 // everyone has acked, so bump the round to finish it.
4451 timecheck_finish_round();
4455 void Monitor::handle_timecheck_peon(MonOpRequestRef op
)
4457 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4458 dout(10) << __func__
<< " " << *m
<< dendl
;
4461 assert(m
->op
== MTimeCheck::OP_PING
|| m
->op
== MTimeCheck::OP_REPORT
);
4463 if (m
->epoch
!= get_epoch()) {
4464 dout(1) << __func__
<< " got wrong epoch "
4465 << "(ours " << get_epoch()
4466 << " theirs: " << m
->epoch
<< ") -- discarding" << dendl
;
4470 if (m
->round
< timecheck_round
) {
4471 dout(1) << __func__
<< " got old round " << m
->round
4472 << " current " << timecheck_round
4473 << " (epoch " << get_epoch() << ") -- discarding" << dendl
;
4477 timecheck_round
= m
->round
;
4479 if (m
->op
== MTimeCheck::OP_REPORT
) {
4480 assert((timecheck_round
% 2) == 0);
4481 timecheck_latencies
.swap(m
->latencies
);
4482 timecheck_skews
.swap(m
->skews
);
4486 assert((timecheck_round
% 2) != 0);
4487 MTimeCheck
*reply
= new MTimeCheck(MTimeCheck::OP_PONG
);
4488 utime_t curr_time
= ceph_clock_now();
4489 reply
->timestamp
= curr_time
;
4490 reply
->epoch
= m
->epoch
;
4491 reply
->round
= m
->round
;
4492 dout(10) << __func__
<< " send " << *m
4493 << " to " << m
->get_source_inst() << dendl
;
4494 m
->get_connection()->send_message(reply
);
4497 void Monitor::handle_timecheck(MonOpRequestRef op
)
4499 MTimeCheck
*m
= static_cast<MTimeCheck
*>(op
->get_req());
4500 dout(10) << __func__
<< " " << *m
<< dendl
;
4503 if (m
->op
!= MTimeCheck::OP_PONG
) {
4504 dout(1) << __func__
<< " drop unexpected msg (not pong)" << dendl
;
4506 handle_timecheck_leader(op
);
4508 } else if (is_peon()) {
4509 if (m
->op
!= MTimeCheck::OP_PING
&& m
->op
!= MTimeCheck::OP_REPORT
) {
4510 dout(1) << __func__
<< " drop unexpected msg (not ping or report)" << dendl
;
4512 handle_timecheck_peon(op
);
4515 dout(1) << __func__
<< " drop unexpected msg" << dendl
;
4519 void Monitor::handle_subscribe(MonOpRequestRef op
)
4521 MMonSubscribe
*m
= static_cast<MMonSubscribe
*>(op
->get_req());
4522 dout(10) << "handle_subscribe " << *m
<< dendl
;
4526 MonSession
*s
= op
->get_session();
4529 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= m
->what
.begin();
4532 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4533 if ((p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
) == 0)
4536 // remove conflicting subscribes
4537 if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4538 for (map
<string
, Subscription
*>::iterator it
= s
->sub_map
.begin();
4539 it
!= s
->sub_map
.end(); ) {
4540 if (it
->first
!= p
->first
&& logmon()->sub_name_to_id(it
->first
) >= 0) {
4541 Mutex::Locker
l(session_map_lock
);
4542 session_map
.remove_sub((it
++)->second
);
4550 Mutex::Locker
l(session_map_lock
);
4551 session_map
.add_update_sub(s
, p
->first
, p
->second
.start
,
4552 p
->second
.flags
& CEPH_SUBSCRIBE_ONETIME
,
4553 m
->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP
));
4556 if (p
->first
.compare(0, 6, "mdsmap") == 0 || p
->first
.compare(0, 5, "fsmap") == 0) {
4557 dout(10) << __func__
<< ": MDS sub '" << p
->first
<< "'" << dendl
;
4558 if ((int)s
->is_capable("mds", MON_CAP_R
)) {
4559 Subscription
*sub
= s
->sub_map
[p
->first
];
4560 assert(sub
!= nullptr);
4561 mdsmon()->check_sub(sub
);
4563 } else if (p
->first
== "osdmap") {
4564 if ((int)s
->is_capable("osd", MON_CAP_R
)) {
4565 if (s
->osd_epoch
> p
->second
.start
) {
4566 // client needs earlier osdmaps on purpose, so reset the sent epoch
4569 osdmon()->check_osdmap_sub(s
->sub_map
["osdmap"]);
4571 } else if (p
->first
== "osd_pg_creates") {
4572 if ((int)s
->is_capable("osd", MON_CAP_W
)) {
4573 if (monmap
->get_required_features().contains_all(
4574 ceph::features::mon::FEATURE_LUMINOUS
)) {
4575 osdmon()->check_pg_creates_sub(s
->sub_map
["osd_pg_creates"]);
4577 pgmon()->check_sub(s
->sub_map
["osd_pg_creates"]);
4580 } else if (p
->first
== "monmap") {
4581 monmon()->check_sub(s
->sub_map
[p
->first
]);
4582 } else if (logmon()->sub_name_to_id(p
->first
) >= 0) {
4583 logmon()->check_sub(s
->sub_map
[p
->first
]);
4584 } else if (p
->first
== "mgrmap" || p
->first
== "mgrdigest") {
4585 mgrmon()->check_sub(s
->sub_map
[p
->first
]);
4590 // we only need to reply if the client is old enough to think it
4591 // has to send renewals.
4592 ConnectionRef con
= m
->get_connection();
4593 if (!con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
))
4594 m
->get_connection()->send_message(new MMonSubscribeAck(
4595 monmap
->get_fsid(), (int)g_conf
->mon_subscribe_interval
));
4600 void Monitor::handle_get_version(MonOpRequestRef op
)
4602 MMonGetVersion
*m
= static_cast<MMonGetVersion
*>(op
->get_req());
4603 dout(10) << "handle_get_version " << *m
<< dendl
;
4604 PaxosService
*svc
= NULL
;
4606 MonSession
*s
= op
->get_session();
4609 if (!is_leader() && !is_peon()) {
4610 dout(10) << " waiting for quorum" << dendl
;
4611 waitfor_quorum
.push_back(new C_RetryMessage(this, op
));
4615 if (m
->what
== "mdsmap") {
4617 } else if (m
->what
== "fsmap") {
4619 } else if (m
->what
== "osdmap") {
4621 } else if (m
->what
== "monmap") {
4624 derr
<< "invalid map type " << m
->what
<< dendl
;
4628 if (!svc
->is_readable()) {
4629 svc
->wait_for_readable(op
, new C_RetryMessage(this, op
));
4633 MMonGetVersionReply
*reply
= new MMonGetVersionReply();
4634 reply
->handle
= m
->handle
;
4635 reply
->version
= svc
->get_last_committed();
4636 reply
->oldest_version
= svc
->get_first_committed();
4637 reply
->set_tid(m
->get_tid());
4639 m
->get_connection()->send_message(reply
);
4645 bool Monitor::ms_handle_reset(Connection
*con
)
4647 dout(10) << "ms_handle_reset " << con
<< " " << con
->get_peer_addr() << dendl
;
4649 // ignore lossless monitor sessions
4650 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
)
4653 MonSession
*s
= static_cast<MonSession
*>(con
->get_priv());
4657 // break any con <-> session ref cycle
4658 s
->con
->set_priv(NULL
);
4663 Mutex::Locker
l(lock
);
4665 dout(10) << "reset/close on session " << s
->inst
<< dendl
;
4667 Mutex::Locker
l(session_map_lock
);
4674 bool Monitor::ms_handle_refused(Connection
*con
)
4676 // just log for now...
4677 dout(10) << "ms_handle_refused " << con
<< " " << con
->get_peer_addr() << dendl
;
4683 void Monitor::send_latest_monmap(Connection
*con
)
4686 monmap
->encode(bl
, con
->get_features());
4687 con
->send_message(new MMonMap(bl
));
4690 void Monitor::handle_mon_get_map(MonOpRequestRef op
)
4692 MMonGetMap
*m
= static_cast<MMonGetMap
*>(op
->get_req());
4693 dout(10) << "handle_mon_get_map" << dendl
;
4694 send_latest_monmap(m
->get_connection().get());
4697 void Monitor::handle_mon_metadata(MonOpRequestRef op
)
4699 MMonMetadata
*m
= static_cast<MMonMetadata
*>(op
->get_req());
4701 dout(10) << __func__
<< dendl
;
4702 update_mon_metadata(m
->get_source().num(), std::move(m
->data
));
4706 void Monitor::update_mon_metadata(int from
, Metadata
&& m
)
4708 pending_metadata
.insert(make_pair(from
, std::move(m
)));
4711 int err
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
4712 map
<int, Metadata
> last_metadata
;
4714 bufferlist::iterator iter
= bl
.begin();
4715 ::decode(last_metadata
, iter
);
4716 pending_metadata
.insert(last_metadata
.begin(), last_metadata
.end());
4719 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
4721 ::encode(pending_metadata
, bl
);
4722 t
->put(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
4723 paxos
->trigger_propose();
4726 int Monitor::load_metadata(map
<int, Metadata
>& metadata
)
4729 int r
= store
->get(MONITOR_STORE_PREFIX
, "last_metadata", bl
);
4732 bufferlist::iterator it
= bl
.begin();
4733 ::decode(metadata
, it
);
4737 int Monitor::get_mon_metadata(int mon
, Formatter
*f
, ostream
& err
)
4740 map
<int, Metadata
> last_metadata
;
4741 if (int r
= load_metadata(last_metadata
)) {
4742 err
<< "Unable to load metadata: " << cpp_strerror(r
);
4745 if (!last_metadata
.count(mon
)) {
4746 err
<< "mon." << mon
<< " not found";
4749 const Metadata
& m
= last_metadata
[mon
];
4750 for (Metadata::const_iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
4751 f
->dump_string(p
->first
.c_str(), p
->second
);
4756 void Monitor::count_metadata(const string
& field
, Formatter
*f
)
4758 map
<int, Metadata
> meta
;
4759 load_metadata(meta
);
4760 map
<string
,int> by_val
;
4761 for (auto& p
: meta
) {
4762 auto q
= p
.second
.find(field
);
4763 if (q
== p
.second
.end()) {
4764 by_val
["unknown"]++;
4766 by_val
[q
->second
]++;
4769 f
->open_object_section(field
.c_str());
4770 for (auto& p
: by_val
) {
4771 f
->dump_int(p
.first
.c_str(), p
.second
);
4776 int Monitor::print_nodes(Formatter
*f
, ostream
& err
)
4778 map
<int, Metadata
> metadata
;
4779 if (int r
= load_metadata(metadata
)) {
4780 err
<< "Unable to load metadata.\n";
4784 map
<string
, list
<int> > mons
; // hostname => mon
4785 for (map
<int, Metadata
>::iterator it
= metadata
.begin();
4786 it
!= metadata
.end(); ++it
) {
4787 const Metadata
& m
= it
->second
;
4788 Metadata::const_iterator hostname
= m
.find("hostname");
4789 if (hostname
== m
.end()) {
4790 // not likely though
4793 mons
[hostname
->second
].push_back(it
->first
);
4796 dump_services(f
, mons
, "mon");
4800 // ----------------------------------------------
4803 int Monitor::scrub_start()
4805 dout(10) << __func__
<< dendl
;
4806 assert(is_leader());
4808 if (!scrub_result
.empty()) {
4809 clog
->info() << "scrub already in progress";
4813 scrub_event_cancel();
4814 scrub_result
.clear();
4815 scrub_state
.reset(new ScrubState
);
4821 int Monitor::scrub()
4823 assert(is_leader());
4824 assert(scrub_state
);
4826 scrub_cancel_timeout();
4827 wait_for_paxos_write();
4828 scrub_version
= paxos
->get_version();
4831 // scrub all keys if we're the only monitor in the quorum
4833 (quorum
.size() == 1 ? -1 : cct
->_conf
->mon_scrub_max_keys
);
4835 for (set
<int>::iterator p
= quorum
.begin();
4840 MMonScrub
*r
= new MMonScrub(MMonScrub::OP_SCRUB
, scrub_version
,
4842 r
->key
= scrub_state
->last_key
;
4843 messenger
->send_message(r
, monmap
->get_inst(*p
));
4847 bool r
= _scrub(&scrub_result
[rank
],
4848 &scrub_state
->last_key
,
4851 scrub_state
->finished
= !r
;
4853 // only after we got our scrub results do we really care whether the
4854 // other monitors are late on their results. Also, this way we avoid
4855 // triggering the timeout if we end up getting stuck in _scrub() for
4856 // longer than the duration of the timeout.
4857 scrub_reset_timeout();
4859 if (quorum
.size() == 1) {
4860 assert(scrub_state
->finished
== true);
4866 void Monitor::handle_scrub(MonOpRequestRef op
)
4868 MMonScrub
*m
= static_cast<MMonScrub
*>(op
->get_req());
4869 dout(10) << __func__
<< " " << *m
<< dendl
;
4871 case MMonScrub::OP_SCRUB
:
4876 wait_for_paxos_write();
4878 if (m
->version
!= paxos
->get_version())
4881 MMonScrub
*reply
= new MMonScrub(MMonScrub::OP_RESULT
,
4885 reply
->key
= m
->key
;
4886 _scrub(&reply
->result
, &reply
->key
, &reply
->num_keys
);
4887 m
->get_connection()->send_message(reply
);
4891 case MMonScrub::OP_RESULT
:
4895 if (m
->version
!= scrub_version
)
4897 // reset the timeout each time we get a result
4898 scrub_reset_timeout();
4900 int from
= m
->get_source().num();
4901 assert(scrub_result
.count(from
) == 0);
4902 scrub_result
[from
] = m
->result
;
4904 if (scrub_result
.size() == quorum
.size()) {
4905 scrub_check_results();
4906 scrub_result
.clear();
4907 if (scrub_state
->finished
)
4917 bool Monitor::_scrub(ScrubResult
*r
,
4918 pair
<string
,string
> *start
,
4922 assert(start
!= NULL
);
4923 assert(num_keys
!= NULL
);
4925 set
<string
> prefixes
= get_sync_targets_names();
4926 prefixes
.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
4928 dout(10) << __func__
<< " start (" << *start
<< ")"
4929 << " num_keys " << *num_keys
<< dendl
;
4931 MonitorDBStore::Synchronizer it
= store
->get_synchronizer(*start
, prefixes
);
4933 int scrubbed_keys
= 0;
4934 pair
<string
,string
> last_key
;
4936 while (it
->has_next_chunk()) {
4938 if (*num_keys
> 0 && scrubbed_keys
== *num_keys
)
4941 pair
<string
,string
> k
= it
->get_next_key();
4942 if (prefixes
.count(k
.first
) == 0)
4945 if (cct
->_conf
->mon_scrub_inject_missing_keys
> 0.0 &&
4946 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_missing_keys
*10000.0)) {
4947 dout(10) << __func__
<< " inject missing key, skipping (" << k
<< ")"
4953 //TODO: what when store->get returns error or empty bl?
4954 store
->get(k
.first
, k
.second
, bl
);
4955 uint32_t key_crc
= bl
.crc32c(0);
4956 dout(30) << __func__
<< " " << k
<< " bl " << bl
.length() << " bytes"
4957 << " crc " << key_crc
<< dendl
;
4958 r
->prefix_keys
[k
.first
]++;
4959 if (r
->prefix_crc
.count(k
.first
) == 0)
4960 r
->prefix_crc
[k
.first
] = 0;
4961 r
->prefix_crc
[k
.first
] = bl
.crc32c(r
->prefix_crc
[k
.first
]);
4963 if (cct
->_conf
->mon_scrub_inject_crc_mismatch
> 0.0 &&
4964 (rand() % 10000 < cct
->_conf
->mon_scrub_inject_crc_mismatch
*10000.0)) {
4965 dout(10) << __func__
<< " inject failure at (" << k
<< ")" << dendl
;
4966 r
->prefix_crc
[k
.first
] += 1;
4973 dout(20) << __func__
<< " last_key (" << last_key
<< ")"
4974 << " scrubbed_keys " << scrubbed_keys
4975 << " has_next " << it
->has_next_chunk() << dendl
;
4978 *num_keys
= scrubbed_keys
;
4980 return it
->has_next_chunk();
4983 void Monitor::scrub_check_results()
4985 dout(10) << __func__
<< dendl
;
4989 ScrubResult
& mine
= scrub_result
[rank
];
4990 for (map
<int,ScrubResult
>::iterator p
= scrub_result
.begin();
4991 p
!= scrub_result
.end();
4993 if (p
->first
== rank
)
4995 if (p
->second
!= mine
) {
4997 clog
->error() << "scrub mismatch";
4998 clog
->error() << " mon." << rank
<< " " << mine
;
4999 clog
->error() << " mon." << p
->first
<< " " << p
->second
;
5003 clog
->info() << "scrub ok on " << quorum
<< ": " << mine
;
5006 inline void Monitor::scrub_timeout()
5008 dout(1) << __func__
<< " restarting scrub" << dendl
;
5013 void Monitor::scrub_finish()
5015 dout(10) << __func__
<< dendl
;
5017 scrub_event_start();
5020 void Monitor::scrub_reset()
5022 dout(10) << __func__
<< dendl
;
5023 scrub_cancel_timeout();
5025 scrub_result
.clear();
5026 scrub_state
.reset();
5029 inline void Monitor::scrub_update_interval(int secs
)
5031 // we don't care about changes if we are not the leader.
5032 // changes will be visible if we become the leader.
5036 dout(1) << __func__
<< " new interval = " << secs
<< dendl
;
5038 // if scrub already in progress, all changes will already be visible during
5039 // the next round. Nothing to do.
5040 if (scrub_state
!= NULL
)
5043 scrub_event_cancel();
5044 scrub_event_start();
5047 void Monitor::scrub_event_start()
5049 dout(10) << __func__
<< dendl
;
5052 scrub_event_cancel();
5054 if (cct
->_conf
->mon_scrub_interval
<= 0) {
5055 dout(1) << __func__
<< " scrub event is disabled"
5056 << " (mon_scrub_interval = " << cct
->_conf
->mon_scrub_interval
5061 scrub_event
= new C_MonContext(this, [this](int) {
5064 timer
.add_event_after(cct
->_conf
->mon_scrub_interval
, scrub_event
);
5067 void Monitor::scrub_event_cancel()
5069 dout(10) << __func__
<< dendl
;
5071 timer
.cancel_event(scrub_event
);
5076 inline void Monitor::scrub_cancel_timeout()
5078 if (scrub_timeout_event
) {
5079 timer
.cancel_event(scrub_timeout_event
);
5080 scrub_timeout_event
= NULL
;
5084 void Monitor::scrub_reset_timeout()
5086 dout(15) << __func__
<< " reset timeout event" << dendl
;
5087 scrub_cancel_timeout();
5089 scrub_timeout_event
= new C_MonContext(this, [this](int) {
5092 timer
.add_event_after(g_conf
->mon_scrub_timeout
, scrub_timeout_event
);
5095 /************ TICK ***************/
5096 void Monitor::new_tick()
5098 timer
.add_event_after(g_conf
->mon_tick_interval
, new C_MonContext(this, [this](int) {
5103 void Monitor::tick()
5106 dout(11) << "tick" << dendl
;
5108 for (vector
<PaxosService
*>::iterator p
= paxos_service
.begin(); p
!= paxos_service
.end(); ++p
) {
5114 utime_t now
= ceph_clock_now();
5116 Mutex::Locker
l(session_map_lock
);
5117 auto p
= session_map
.sessions
.begin();
5119 bool out_for_too_long
= (!exited_quorum
.is_zero() &&
5120 now
> (exited_quorum
+ 2*g_conf
->mon_lease
));
5126 // don't trim monitors
5127 if (s
->inst
.name
.is_mon())
5130 if (s
->session_timeout
< now
&& s
->con
) {
5131 // check keepalive, too
5132 s
->session_timeout
= s
->con
->get_last_keepalive();
5133 s
->session_timeout
+= g_conf
->mon_session_timeout
;
5135 if (s
->session_timeout
< now
) {
5136 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5137 << " (timeout " << s
->session_timeout
5138 << " < now " << now
<< ")" << dendl
;
5139 } else if (out_for_too_long
) {
5140 // boot the client Session because we've taken too long getting back in
5141 dout(10) << " trimming session " << s
->con
<< " " << s
->inst
5142 << " because we've been out of quorum too long" << dendl
;
5147 s
->con
->mark_down();
5149 logger
->inc(l_mon_session_trim
);
5152 sync_trim_providers();
5154 if (!maybe_wait_for_quorum
.empty()) {
5155 finish_contexts(g_ceph_context
, maybe_wait_for_quorum
);
5158 if (is_leader() && paxos
->is_active() && fingerprint
.is_zero()) {
5159 // this is only necessary on upgraded clusters.
5160 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
5161 prepare_new_fingerprint(t
);
5162 paxos
->trigger_propose();
5168 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t
)
5171 nf
.generate_random();
5172 dout(10) << __func__
<< " proposing cluster_fingerprint " << nf
<< dendl
;
5176 t
->put(MONITOR_NAME
, "cluster_fingerprint", bl
);
5179 int Monitor::check_fsid()
5182 int r
= store
->get(MONITOR_NAME
, "cluster_uuid", ebl
);
5187 string
es(ebl
.c_str(), ebl
.length());
5189 // only keep the first line
5190 size_t pos
= es
.find_first_of('\n');
5191 if (pos
!= string::npos
)
5194 dout(10) << "check_fsid cluster_uuid contains '" << es
<< "'" << dendl
;
5196 if (!ondisk
.parse(es
.c_str())) {
5197 derr
<< "error: unable to parse uuid" << dendl
;
5201 if (monmap
->get_fsid() != ondisk
) {
5202 derr
<< "error: cluster_uuid file exists with value " << ondisk
5203 << ", != our uuid " << monmap
->get_fsid() << dendl
;
5210 int Monitor::write_fsid()
5212 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5214 int r
= store
->apply_transaction(t
);
5218 int Monitor::write_fsid(MonitorDBStore::TransactionRef t
)
5221 ss
<< monmap
->get_fsid() << "\n";
5222 string us
= ss
.str();
5227 t
->put(MONITOR_NAME
, "cluster_uuid", b
);
5232 * this is the closest thing to a traditional 'mkfs' for ceph.
5233 * initialize the monitor state machines to their initial values.
5235 int Monitor::mkfs(bufferlist
& osdmapbl
)
5237 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
5239 // verify cluster fsid
5240 int r
= check_fsid();
5241 if (r
< 0 && r
!= -ENOENT
)
5245 magicbl
.append(CEPH_MON_ONDISK_MAGIC
);
5246 magicbl
.append("\n");
5247 t
->put(MONITOR_NAME
, "magic", magicbl
);
5250 features
= get_initial_supported_features();
5253 // save monmap, osdmap, keyring.
5254 bufferlist monmapbl
;
5255 monmap
->encode(monmapbl
, CEPH_FEATURES_ALL
);
5256 monmap
->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5257 t
->put("mkfs", "monmap", monmapbl
);
5259 if (osdmapbl
.length()) {
5260 // make sure it's a valid osdmap
5263 om
.decode(osdmapbl
);
5265 catch (buffer::error
& e
) {
5266 derr
<< "error decoding provided osdmap: " << e
.what() << dendl
;
5269 t
->put("mkfs", "osdmap", osdmapbl
);
5272 if (is_keyring_required()) {
5274 string keyring_filename
;
5276 r
= ceph_resolve_file_search(g_conf
->keyring
, keyring_filename
);
5278 derr
<< "unable to find a keyring file on " << g_conf
->keyring
5279 << ": " << cpp_strerror(r
) << dendl
;
5280 if (g_conf
->key
!= "") {
5281 string keyring_plaintext
= "[mon.]\n\tkey = " + g_conf
->key
+
5282 "\n\tcaps mon = \"allow *\"\n";
5284 bl
.append(keyring_plaintext
);
5286 bufferlist::iterator i
= bl
.begin();
5287 keyring
.decode_plaintext(i
);
5289 catch (const buffer::error
& e
) {
5290 derr
<< "error decoding keyring " << keyring_plaintext
5291 << ": " << e
.what() << dendl
;
5298 r
= keyring
.load(g_ceph_context
, keyring_filename
);
5300 derr
<< "unable to load initial keyring " << g_conf
->keyring
<< dendl
;
5305 // put mon. key in external keyring; seed with everything else.
5306 extract_save_mon_key(keyring
);
5308 bufferlist keyringbl
;
5309 keyring
.encode_plaintext(keyringbl
);
5310 t
->put("mkfs", "keyring", keyringbl
);
5313 store
->apply_transaction(t
);
5318 int Monitor::write_default_keyring(bufferlist
& bl
)
5321 os
<< g_conf
->mon_data
<< "/keyring";
5324 int fd
= ::open(os
.str().c_str(), O_WRONLY
|O_CREAT
, 0600);
5327 dout(0) << __func__
<< " failed to open " << os
.str()
5328 << ": " << cpp_strerror(err
) << dendl
;
5332 err
= bl
.write_fd(fd
);
5335 VOID_TEMP_FAILURE_RETRY(::close(fd
));
5340 void Monitor::extract_save_mon_key(KeyRing
& keyring
)
5342 EntityName mon_name
;
5343 mon_name
.set_type(CEPH_ENTITY_TYPE_MON
);
5345 if (keyring
.get_auth(mon_name
, mon_key
)) {
5346 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl
;
5348 pkey
.add(mon_name
, mon_key
);
5350 pkey
.encode_plaintext(bl
);
5351 write_default_keyring(bl
);
5352 keyring
.remove(mon_name
);
5356 bool Monitor::ms_get_authorizer(int service_id
, AuthAuthorizer
**authorizer
,
5359 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id
)
5365 // we only connect to other monitors and mgr; every else connects to us.
5366 if (service_id
!= CEPH_ENTITY_TYPE_MON
&&
5367 service_id
!= CEPH_ENTITY_TYPE_MGR
)
5370 if (!auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5372 dout(20) << __func__
<< " building auth_none authorizer" << dendl
;
5373 AuthNoneClientHandler
handler(g_ceph_context
, nullptr);
5374 handler
.set_global_id(0);
5375 *authorizer
= handler
.build_authorizer(service_id
);
5379 CephXServiceTicketInfo auth_ticket_info
;
5380 CephXSessionAuthInfo info
;
5384 name
.set_type(CEPH_ENTITY_TYPE_MON
);
5385 auth_ticket_info
.ticket
.name
= name
;
5386 auth_ticket_info
.ticket
.global_id
= 0;
5388 if (service_id
== CEPH_ENTITY_TYPE_MON
) {
5389 // mon to mon authentication uses the private monitor shared key and not the
5392 if (!keyring
.get_secret(name
, secret
) &&
5393 !key_server
.get_secret(name
, secret
)) {
5394 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5396 stringstream ss
, ds
;
5397 int err
= key_server
.list_secrets(ds
);
5399 ss
<< "no installed auth entries!";
5401 ss
<< "installed auth entries:";
5402 dout(0) << ss
.str() << "\n" << ds
.str() << dendl
;
5406 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
,
5407 secret
, (uint64_t)-1);
5409 dout(0) << __func__
<< " failed to build mon session_auth_info "
5410 << cpp_strerror(ret
) << dendl
;
5413 } else if (service_id
== CEPH_ENTITY_TYPE_MGR
) {
5415 ret
= key_server
.build_session_auth_info(service_id
, auth_ticket_info
, info
);
5417 derr
<< __func__
<< " failed to build mgr service session_auth_info "
5418 << cpp_strerror(ret
) << dendl
;
5422 ceph_abort(); // see check at top of fn
5425 CephXTicketBlob blob
;
5426 if (!cephx_build_service_ticket_blob(cct
, info
, blob
)) {
5427 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl
;
5430 bufferlist ticket_data
;
5431 ::encode(blob
, ticket_data
);
5433 bufferlist::iterator iter
= ticket_data
.begin();
5434 CephXTicketHandler
handler(g_ceph_context
, service_id
);
5435 ::decode(handler
.ticket
, iter
);
5437 handler
.session_key
= info
.session_key
;
5439 *authorizer
= handler
.build_authorizer(0);
5444 bool Monitor::ms_verify_authorizer(Connection
*con
, int peer_type
,
5445 int protocol
, bufferlist
& authorizer_data
,
5446 bufferlist
& authorizer_reply
,
5447 bool& isvalid
, CryptoKey
& session_key
)
5449 dout(10) << "ms_verify_authorizer " << con
->get_peer_addr()
5450 << " " << ceph_entity_type_name(peer_type
)
5451 << " protocol " << protocol
<< dendl
;
5456 if (peer_type
== CEPH_ENTITY_TYPE_MON
&&
5457 auth_cluster_required
.is_supported_auth(CEPH_AUTH_CEPHX
)) {
5458 // monitor, and cephx is enabled
5460 if (protocol
== CEPH_AUTH_CEPHX
) {
5461 bufferlist::iterator iter
= authorizer_data
.begin();
5462 CephXServiceTicketInfo auth_ticket_info
;
5464 if (authorizer_data
.length()) {
5465 bool ret
= cephx_verify_authorizer(g_ceph_context
, &keyring
, iter
,
5466 auth_ticket_info
, authorizer_reply
);
5468 session_key
= auth_ticket_info
.session_key
;
5471 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con
->get_peer_addr() << dendl
;
5475 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl
;