]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Monitor.cc
update source to 12.2.11
[ceph.git] / ceph / src / mon / Monitor.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include <sstream>
17#include <stdlib.h>
18#include <signal.h>
19#include <limits.h>
20#include <cstring>
21#include <boost/scope_exit.hpp>
22#include <boost/algorithm/string/predicate.hpp>
23
24#include "Monitor.h"
25#include "common/version.h"
26
27#include "osd/OSDMap.h"
28
29#include "MonitorDBStore.h"
30
31#include "messages/PaxosServiceMessage.h"
32#include "messages/MMonMap.h"
33#include "messages/MMonGetMap.h"
34#include "messages/MMonGetVersion.h"
35#include "messages/MMonGetVersionReply.h"
36#include "messages/MGenericMessage.h"
37#include "messages/MMonCommand.h"
38#include "messages/MMonCommandAck.h"
31f18b77 39#include "messages/MMonHealth.h"
7c673cae
FG
40#include "messages/MMonMetadata.h"
41#include "messages/MMonSync.h"
42#include "messages/MMonScrub.h"
43#include "messages/MMonProbe.h"
44#include "messages/MMonJoin.h"
45#include "messages/MMonPaxos.h"
46#include "messages/MRoute.h"
47#include "messages/MForward.h"
b32b8144 48#include "messages/MStatfs.h"
7c673cae
FG
49
50#include "messages/MMonSubscribe.h"
51#include "messages/MMonSubscribeAck.h"
52
53#include "messages/MAuthReply.h"
54
55#include "messages/MTimeCheck.h"
7c673cae
FG
56#include "messages/MPing.h"
57
58#include "common/strtol.h"
59#include "common/ceph_argparse.h"
60#include "common/Timer.h"
61#include "common/Clock.h"
62#include "common/errno.h"
63#include "common/perf_counters.h"
64#include "common/admin_socket.h"
65#include "global/signal_handler.h"
66#include "common/Formatter.h"
67#include "include/stringify.h"
68#include "include/color.h"
69#include "include/ceph_fs.h"
70#include "include/str_list.h"
71
72#include "OSDMonitor.h"
73#include "MDSMonitor.h"
74#include "MonmapMonitor.h"
75#include "PGMonitor.h"
76#include "LogMonitor.h"
77#include "AuthMonitor.h"
78#include "MgrMonitor.h"
31f18b77 79#include "MgrStatMonitor.h"
7c673cae 80#include "mon/QuorumService.h"
224ce89b 81#include "mon/OldHealthMonitor.h"
7c673cae
FG
82#include "mon/HealthMonitor.h"
83#include "mon/ConfigKeyService.h"
84#include "common/config.h"
85#include "common/cmdparse.h"
86#include "include/assert.h"
87#include "include/compat.h"
88#include "perfglue/heap_profiler.h"
89
90#include "auth/none/AuthNoneClientHandler.h"
91
92#define dout_subsys ceph_subsys_mon
93#undef dout_prefix
94#define dout_prefix _prefix(_dout, this)
95static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
96 return *_dout << "mon." << mon->name << "@" << mon->rank
97 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
98}
99
// MONITOR_NAME is the store prefix this file uses for monitor-wide keys
// (the compat set, "joined" and "cluster_fingerprint").
100const string Monitor::MONITOR_NAME = "monitor";
// NOTE(review): MONITOR_STORE_PREFIX is not referenced in the visible part of
// this file; its consumers live elsewhere.
101const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
102
103
// Static command tables.  The COMMAND / COMMAND_WITH_FLAG macros are
// (re)defined here so that each entry in the included headers expands to one
// MonCommand aggregate initializer; COMMAND fills the flags field with
// FLAG(NONE).  COMMAND and COMMAND_WITH_FLAG are undefined again afterwards.
104#undef FLAG
105#undef COMMAND
106#undef COMMAND_WITH_FLAG
7c673cae
FG
107#define FLAG(f) (MonCommand::FLAG_##f)
108#define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
109 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
110#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
111 {parsesig, helptext, modulename, req_perms, avail, flags},
// Commands generated from mon/MonCommands.h.
d2e6a577 112MonCommand mon_commands[] = {
7c673cae 113#include <mon/MonCommands.h>
d2e6a577
FG
114};
// Commands generated from mon/PGMonitorCommands.h; the Monitor constructor
// appends these to local_upgrading_mon_commands only.
115MonCommand pgmonitor_commands[] = {
116#include <mon/PGMonitorCommands.h>
117};
7c673cae
FG
118#undef COMMAND
119#undef COMMAND_WITH_FLAG
7c673cae
FG
120
121
7c673cae
FG
122void C_MonContext::finish(int r) {
123 if (mon->is_shutdown())
124 return;
125 FunctionContext::finish(r);
126}
127
// Construct a monitor named `nm` on top of store `s`, messenger `m`, manager
// messenger `mgr_m` and the seed monmap `map`.
//
// The initializer list sets the monitor to rank -1 in STATE_PROBING with all
// scrub/sync/timecheck bookkeeping zeroed.  The body then creates the cluster
// and audit log channels, the Paxos instance, one PaxosService per PAXOS_*
// slot, the legacy health monitor and the config-key service, builds the
// "allow *" MonCap, and encodes the local command tables.
128Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
129 Messenger *m, Messenger *mgr_m, MonMap *map) :
130 Dispatcher(cct_),
131 name(nm),
132 rank(-1),
133 messenger(m),
134 con_self(m ? m->get_loopback_connection() : NULL),
135 lock("Monitor::lock"),
136 timer(cct_, lock),
137 finisher(cct_, "mon_finisher", "fin"),
138 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
139 has_ever_joined(false),
140 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
141 monmap(map),
142 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
143 key_server(cct, &keyring),
144 auth_cluster_required(cct,
145 cct->_conf->auth_supported.empty() ?
146 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
147 auth_service_required(cct,
148 cct->_conf->auth_supported.empty() ?
149 cct->_conf->auth_service_required : cct->_conf->auth_supported ),
7c673cae
FG
150 mgr_messenger(mgr_m),
151 mgr_client(cct_, mgr_m),
31f18b77 152 pgservice(nullptr),
7c673cae
FG
153 store(s),
154
155 state(STATE_PROBING),
156
157 elector(this),
158 required_features(0),
159 leader(0),
160 quorum_con_features(0),
161 // scrub
162 scrub_version(0),
163 scrub_event(NULL),
164 scrub_timeout_event(NULL),
165
166 // sync state
167 sync_provider_count(0),
168 sync_cookie(0),
169 sync_full(false),
170 sync_start_version(0),
171 sync_timeout_event(NULL),
172 sync_last_committed_floor(0),
173
174 timecheck_round(0),
175 timecheck_acks(0),
176 timecheck_rounds_since_clean(0),
177 timecheck_event(NULL),
178
179 paxos_service(PAXOS_NUM),
180 admin_hook(NULL),
181 routed_request_tid(0),
182 op_tracker(cct, true, 1)
183{
184 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
185 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
186
187 update_log_clients();
188
189 paxos = new Paxos(this, "paxos");
190
// One service per PAXOS_* slot; these are deleted in ~Monitor().
191 paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
192 paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
193 paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
194 paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
195 paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
196 paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
197 paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
31f18b77 198 paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
224ce89b 199 paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
7c673cae 200
224ce89b 201 health_monitor = new OldHealthMonitor(this);
7c673cae
FG
202 config_key_service = new ConfigKeyService(this, paxos);
203
204 mon_caps = new MonCap();
205 bool r = mon_caps->parse("allow *", NULL);
206 assert(r);
207
208 exited_quorum = ceph_clock_now();
209
d2e6a577
FG
210 // prepare local commands
211 local_mon_commands.resize(ARRAY_SIZE(mon_commands));
212 for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
213 local_mon_commands[i] = mon_commands[i];
214 }
215 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
216
// The "upgrading" set is the local set plus the PGMonitor commands.
217 local_upgrading_mon_commands = local_mon_commands;
218 for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
219 local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
220 }
221 MonCommand::encode_vector(local_upgrading_mon_commands,
222 local_upgrading_mon_commands_bl);
223
7c673cae
FG
224 // assume our commands until we have an election. this only means
225 // we won't reply with EINVAL before the election; any command that
226 // actually matters will wait until we have quorum etc and then
227 // retry (and revalidate).
d2e6a577 228 leader_mon_commands = local_mon_commands;
31f18b77
FG
229
230 // note: OSDMonitor may update this based on the luminous flag.
231 pgservice = mgrstatmon()->get_pg_stat_service();
7c673cae
FG
232}
233
7c673cae
FG
234Monitor::~Monitor()
235{
236 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
237 delete *p;
238 delete health_monitor;
239 delete config_key_service;
240 delete paxos;
241 assert(session_map.sessions.empty());
242 delete mon_caps;
7c673cae
FG
243}
244
245
246class AdminHook : public AdminSocketHook {
247 Monitor *mon;
248public:
249 explicit AdminHook(Monitor *m) : mon(m) {}
250 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
251 bufferlist& out) override {
252 stringstream ss;
253 mon->do_admin_command(command, cmdmap, format, ss);
254 out.append(ss);
255 return true;
256 }
257};
258
259void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
260 ostream& ss)
261{
262 Mutex::Locker l(lock);
263
264 boost::scoped_ptr<Formatter> f(Formatter::create(format));
265
266 string args;
267 for (cmdmap_t::iterator p = cmdmap.begin();
268 p != cmdmap.end(); ++p) {
269 if (p->first == "prefix")
270 continue;
271 if (!args.empty())
272 args += ", ";
273 args += cmd_vartype_stringify(p->second);
274 }
275 args = "[" + args + "]";
276
277 bool read_only = (command == "mon_status" ||
278 command == "mon metadata" ||
279 command == "quorum_status" ||
224ce89b
WB
280 command == "ops" ||
281 command == "sessions");
7c673cae
FG
282
283 (read_only ? audit_clog->debug() : audit_clog->info())
284 << "from='admin socket' entity='admin socket' "
285 << "cmd='" << command << "' args=" << args << ": dispatch";
286
287 if (command == "mon_status") {
288 get_mon_status(f.get(), ss);
289 if (f)
290 f->flush(ss);
291 } else if (command == "quorum_status") {
292 _quorum_status(f.get(), ss);
293 } else if (command == "sync_force") {
294 string validate;
295 if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
296 (validate != "--yes-i-really-mean-it")) {
297 ss << "are you SURE? this will mean the monitor store will be erased "
298 "the next time the monitor is restarted. pass "
299 "'--yes-i-really-mean-it' if you really do.";
300 goto abort;
301 }
302 sync_force(f.get(), ss);
303 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
304 if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
305 goto abort;
306 } else if (command == "quorum enter") {
307 elector.start_participating();
308 start_election();
309 ss << "started responding to quorum, initiated new election";
310 } else if (command == "quorum exit") {
311 start_election();
312 elector.stop_participating();
313 ss << "stopped responding to quorum, initiated new election";
314 } else if (command == "ops") {
315 (void)op_tracker.dump_ops_in_flight(f.get());
316 if (f) {
317 f->flush(ss);
318 }
224ce89b
WB
319 } else if (command == "sessions") {
320
321 if (f) {
322 f->open_array_section("sessions");
323 for (auto p : session_map.sessions) {
324 f->dump_stream("session") << *p;
325 }
326 f->close_section();
327 f->flush(ss);
328 }
329
7c673cae
FG
330 } else {
331 assert(0 == "bad AdminSocket command binding");
332 }
333 (read_only ? audit_clog->debug() : audit_clog->info())
334 << "from='admin socket' "
335 << "entity='admin socket' "
336 << "cmd=" << command << " "
337 << "args=" << args << ": finished";
338 return;
339
340abort:
341 (read_only ? audit_clog->debug() : audit_clog->info())
342 << "from='admin socket' "
343 << "entity='admin socket' "
344 << "cmd=" << command << " "
345 << "args=" << args << ": aborted";
346}
347
348void Monitor::handle_signal(int signum)
349{
350 assert(signum == SIGINT || signum == SIGTERM);
351 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
352 shutdown();
353}
354
355CompatSet Monitor::get_initial_supported_features()
356{
357 CompatSet::FeatureSet ceph_mon_feature_compat;
358 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
359 CompatSet::FeatureSet ceph_mon_feature_incompat;
360 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
361 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
362 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
363 ceph_mon_feature_incompat);
364}
365
366CompatSet Monitor::get_supported_features()
367{
368 CompatSet compat = get_initial_supported_features();
369 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
370 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
371 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
372 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
373 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
181888fb 374 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
7c673cae
FG
375 return compat;
376}
377
378CompatSet Monitor::get_legacy_features()
379{
380 CompatSet::FeatureSet ceph_mon_feature_compat;
381 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
382 CompatSet::FeatureSet ceph_mon_feature_incompat;
383 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
384 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
385 ceph_mon_feature_incompat);
386}
387
388int Monitor::check_features(MonitorDBStore *store)
389{
390 CompatSet required = get_supported_features();
391 CompatSet ondisk;
392
393 read_features_off_disk(store, &ondisk);
394
395 if (!required.writeable(ondisk)) {
396 CompatSet diff = required.unsupported(ondisk);
397 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
398 return -EPERM;
399 }
400
401 return 0;
402}
403
404void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
405{
406 bufferlist featuresbl;
407 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
408 if (featuresbl.length() == 0) {
409 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
410 << "Assuming it is old-style and introducing one." << dendl;
411 //we only want the baseline ~v.18 features assumed to be on disk.
412 //If new features are introduced this code needs to disappear or
413 //be made smarter.
414 *features = get_legacy_features();
415
416 features->encode(featuresbl);
417 auto t(std::make_shared<MonitorDBStore::Transaction>());
418 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
419 store->apply_transaction(t);
420 } else {
421 bufferlist::iterator it = featuresbl.begin();
422 features->decode(it);
423 }
424}
425
426void Monitor::read_features()
427{
428 read_features_off_disk(store, &features);
429 dout(10) << "features " << features << dendl;
430
431 calc_quorum_requirements();
432 dout(10) << "required_features " << required_features << dendl;
433}
434
435void Monitor::write_features(MonitorDBStore::TransactionRef t)
436{
437 bufferlist bl;
438 features.encode(bl);
439 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
440}
441
442const char** Monitor::get_tracked_conf_keys() const
443{
444 static const char* KEYS[] = {
445 "crushtool", // helpful for testing
446 "mon_election_timeout",
447 "mon_lease",
448 "mon_lease_renew_interval_factor",
449 "mon_lease_ack_timeout_factor",
450 "mon_accept_timeout_factor",
451 // clog & admin clog
452 "clog_to_monitors",
453 "clog_to_syslog",
454 "clog_to_syslog_facility",
455 "clog_to_syslog_level",
456 "clog_to_graylog",
457 "clog_to_graylog_host",
458 "clog_to_graylog_port",
459 "host",
460 "fsid",
461 // periodic health to clog
462 "mon_health_to_clog",
463 "mon_health_to_clog_interval",
464 "mon_health_to_clog_tick_interval",
465 // scrub interval
466 "mon_scrub_interval",
467 NULL
468 };
469 return KEYS;
470}
471
472void Monitor::handle_conf_change(const struct md_config_t *conf,
473 const std::set<std::string> &changed)
474{
475 sanitize_options();
476
477 dout(10) << __func__ << " " << changed << dendl;
478
479 if (changed.count("clog_to_monitors") ||
480 changed.count("clog_to_syslog") ||
481 changed.count("clog_to_syslog_level") ||
482 changed.count("clog_to_syslog_facility") ||
483 changed.count("clog_to_graylog") ||
484 changed.count("clog_to_graylog_host") ||
485 changed.count("clog_to_graylog_port") ||
486 changed.count("host") ||
487 changed.count("fsid")) {
488 update_log_clients();
489 }
490
491 if (changed.count("mon_health_to_clog") ||
492 changed.count("mon_health_to_clog_interval") ||
493 changed.count("mon_health_to_clog_tick_interval")) {
494 health_to_clog_update_conf(changed);
495 }
496
497 if (changed.count("mon_scrub_interval")) {
498 scrub_update_interval(conf->mon_scrub_interval);
499 }
500}
501
502void Monitor::update_log_clients()
503{
504 map<string,string> log_to_monitors;
505 map<string,string> log_to_syslog;
506 map<string,string> log_channel;
507 map<string,string> log_prio;
508 map<string,string> log_to_graylog;
509 map<string,string> log_to_graylog_host;
510 map<string,string> log_to_graylog_port;
511 uuid_d fsid;
512 string host;
513
514 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
515 log_channel, log_prio, log_to_graylog,
516 log_to_graylog_host, log_to_graylog_port,
517 fsid, host))
518 return;
519
520 clog->update_config(log_to_monitors, log_to_syslog,
521 log_channel, log_prio, log_to_graylog,
522 log_to_graylog_host, log_to_graylog_port,
523 fsid, host);
524
525 audit_clog->update_config(log_to_monitors, log_to_syslog,
526 log_channel, log_prio, log_to_graylog,
527 log_to_graylog_host, log_to_graylog_port,
528 fsid, host);
529}
530
531int Monitor::sanitize_options()
532{
533 int r = 0;
534
535 // mon_lease must be greater than mon_lease_renewal; otherwise we
536 // may incur in leases expiring before they are renewed.
537 if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
538 clog->error() << "mon_lease_renew_interval_factor ("
539 << g_conf->mon_lease_renew_interval_factor
540 << ") must be less than 1.0";
541 r = -EINVAL;
542 }
543
544 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
545 // got time to renew the lease and get an ack for it. Having both options
546 // with the same value, for a given small vale, could mean timing out if
547 // the monitors happened to be overloaded -- or even under normal load for
548 // a small enough value.
549 if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
550 clog->error() << "mon_lease_ack_timeout_factor ("
551 << g_conf->mon_lease_ack_timeout_factor
552 << ") must be greater than 1.0";
553 r = -EINVAL;
554 }
555
556 return r;
557}
558
// First initialisation stage.  In order: validates options, builds the perf
// counters, checks (or writes) the fsid, reads the compat set, applies the
// initial-members filter or refuses to boot if this monitor was removed from
// the monmap, clears a possibly inconsistent store, initialises paxos and the
// health monitor, bootstraps the keyring when one is required, registers the
// admin socket commands and finally adds this object as a config observer.
//
// Returns 0 on success or a negative error code.  The monitor lock is taken
// on entry and released before every return.
559int Monitor::preinit()
560{
561 lock.Lock();
562
563 dout(1) << "preinit fsid " << monmap->fsid << dendl;
564
565 int r = sanitize_options();
566 if (r < 0) {
567 derr << "option sanitization failed!" << dendl;
568 lock.Unlock();
569 return r;
570 }
571
// Per-monitor counters; these are added to the perf counters collection
// right away.
572 assert(!logger);
573 {
574 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
3efd9988
FG
575 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
576 PerfCountersBuilder::PRIO_USEFUL);
577 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
578 "sadd", PerfCountersBuilder::PRIO_INTERESTING);
579 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
580 "srm", PerfCountersBuilder::PRIO_INTERESTING);
581 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
582 "strm", PerfCountersBuilder::PRIO_USEFUL);
583 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
584 "ecnt", PerfCountersBuilder::PRIO_USEFUL);
585 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
586 "estt", PerfCountersBuilder::PRIO_INTERESTING);
587 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
588 "ewon", PerfCountersBuilder::PRIO_INTERESTING);
589 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
590 "elst", PerfCountersBuilder::PRIO_INTERESTING);
7c673cae
FG
591 logger = pcb.create_perf_counters();
592 cct->get_perfcounters_collection()->add(logger);
593 }
594
// Cluster-wide counters are only created here; they are added to the
// collection later by register_cluster_logger().
595 assert(!cluster_logger);
596 {
597 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
598 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
599 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
600 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
601 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
602 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
603 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
1adf2230
AA
604 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(BYTES));
605 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(BYTES));
606 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(BYTES));
7c673cae
FG
607 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
608 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
609 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
610 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
611 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
612 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
613 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
614 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
615 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
616 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
617 pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
618 pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
619 pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
620 pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
621 cluster_logger = pcb.create_perf_counters();
622 }
623
624 paxos->init_logger();
625
626 // verify cluster_uuid
// Note: the `r` declared in this scope shadows the function-level `r`.
627 {
628 int r = check_fsid();
629 if (r == -ENOENT)
630 r = write_fsid();
631 if (r < 0) {
632 lock.Unlock();
633 return r;
634 }
635 }
636
637 // open compatset
638 read_features();
639
640 // have we ever joined a quorum?
641 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
642 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
643
644 if (!has_ever_joined) {
645 // impose initial quorum restrictions?
646 list<string> initial_members;
647 get_str_list(g_conf->mon_initial_members, initial_members);
648
649 if (!initial_members.empty()) {
650 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
651
652 monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
653 &extra_probe_peers);
654
655 dout(10) << " monmap is " << *monmap << dendl;
656 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
657 }
658 } else if (!monmap->contains(name)) {
659 derr << "not in monmap and have been in a quorum before; "
660 << "must have been removed" << dendl;
661 if (g_conf->mon_force_quorum_join) {
662 dout(0) << "we should have died but "
663 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
664 } else {
665 derr << "commit suicide!" << dendl;
666 lock.Unlock();
667 return -ENOENT;
668 }
669 }
670
671 {
672 // We have a potentially inconsistent store state in hands. Get rid of it
673 // and start fresh.
674 bool clear_store = false;
675 if (store->exists("mon_sync", "in_sync")) {
676 dout(1) << __func__ << " clean up potentially inconsistent store state"
677 << dendl;
678 clear_store = true;
679 }
680
681 if (store->get("mon_sync", "force_sync") > 0) {
682 dout(1) << __func__ << " force sync by clearing store state" << dendl;
683 clear_store = true;
684 }
685
686 if (clear_store) {
687 set<string> sync_prefixes = get_sync_targets_names();
688 store->clear(sync_prefixes);
689 }
690 }
691
692 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
693 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
694
695 init_paxos();
696 health_monitor->init();
697
698 if (is_keyring_required()) {
699 // we need to bootstrap authentication keys so we can form an
700 // initial quorum.
701 if (authmon()->get_last_committed() == 0) {
702 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
703 bufferlist bl;
704 int err = store->get("mkfs", "keyring", bl);
705 if (err == 0 && bl.length() > 0) {
706 // Attempt to decode and extract keyring only if it is found.
707 KeyRing keyring;
708 bufferlist::iterator p = bl.begin();
709 ::decode(keyring, p);
710 extract_save_mon_key(keyring);
711 }
712 }
713
714 string keyring_loc = g_conf->mon_data + "/keyring";
715
// If the keyring file cannot be loaded, fall back to the mon. key held by
// key_server and write it out as the default keyring; fail if that key is
// not available either.
716 r = keyring.load(cct, keyring_loc);
717 if (r < 0) {
718 EntityName mon_name;
719 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
720 EntityAuth mon_key;
721 if (key_server.get_auth(mon_name, mon_key)) {
722 dout(1) << "copying mon. key from old db to external keyring" << dendl;
723 keyring.add(mon_name, mon_key);
724 bufferlist bl;
725 keyring.encode_plaintext(bl);
726 write_default_keyring(bl);
727 } else {
728 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
729 lock.Unlock();
730 return r;
731 }
732 }
733 }
734
735 admin_hook = new AdminHook(this);
736 AdminSocket* admin_socket = cct->get_admin_socket();
737
738 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
739 lock.Unlock();
// Every command registered below is dispatched by do_admin_command() and
// unregistered again in shutdown().
740 r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
741 "show current monitor status");
742 assert(r == 0);
743 r = admin_socket->register_command("quorum_status", "quorum_status",
744 admin_hook, "show current quorum status");
745 assert(r == 0);
746 r = admin_socket->register_command("sync_force",
747 "sync_force name=validate,"
748 "type=CephChoices,"
749 "strings=--yes-i-really-mean-it",
750 admin_hook,
751 "force sync of and clear monitor store");
752 assert(r == 0);
753 r = admin_socket->register_command("add_bootstrap_peer_hint",
754 "add_bootstrap_peer_hint name=addr,"
755 "type=CephIPAddr",
756 admin_hook,
757 "add peer address as potential bootstrap"
758 " peer for cluster bringup");
759 assert(r == 0);
760 r = admin_socket->register_command("quorum enter", "quorum enter",
761 admin_hook,
762 "force monitor back into quorum");
763 assert(r == 0);
764 r = admin_socket->register_command("quorum exit", "quorum exit",
765 admin_hook,
766 "force monitor out of the quorum");
767 assert(r == 0);
768 r = admin_socket->register_command("ops",
769 "ops",
770 admin_hook,
771 "show the ops currently in flight");
772 assert(r == 0);
224ce89b
WB
773 r = admin_socket->register_command("sessions",
774 "sessions",
775 admin_hook,
776 "list existing sessions");
777 assert(r == 0);
7c673cae
FG
778
779 lock.Lock();
780
781 // add ourselves as a conf observer
782 g_conf->add_observer(this);
783
784 lock.Unlock();
785 return 0;
786}
787
788int Monitor::init()
789{
790 dout(2) << "init" << dendl;
791 Mutex::Locker l(lock);
792
793 finisher.start();
794
795 // start ticker
796 timer.init();
797 new_tick();
798
799 cpu_tp.start();
800
801 // i'm ready!
802 messenger->add_dispatcher_tail(this);
803
804 mgr_client.init();
805 mgr_messenger->add_dispatcher_tail(&mgr_client);
806 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
807
808 bootstrap();
b5b8bbf5
FG
809 // add features of myself into feature_map
810 session_map.feature_map.add_mon(con_self->get_features());
7c673cae
FG
811 return 0;
812}
813
814void Monitor::init_paxos()
815{
816 dout(10) << __func__ << dendl;
817 paxos->init();
818
819 // init services
820 for (int i = 0; i < PAXOS_NUM; ++i) {
821 paxos_service[i]->init();
822 }
823
824 refresh_from_paxos(NULL);
825}
826
827void Monitor::refresh_from_paxos(bool *need_bootstrap)
828{
829 dout(10) << __func__ << dendl;
830
831 bufferlist bl;
832 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
833 if (r >= 0) {
834 try {
835 bufferlist::iterator p = bl.begin();
836 ::decode(fingerprint, p);
837 }
838 catch (buffer::error& e) {
839 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
840 }
841 } else {
842 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
843 }
844
845 for (int i = 0; i < PAXOS_NUM; ++i) {
846 paxos_service[i]->refresh(need_bootstrap);
847 }
848 for (int i = 0; i < PAXOS_NUM; ++i) {
849 paxos_service[i]->post_refresh();
850 }
224ce89b 851 load_metadata();
7c673cae
FG
852}
853
854void Monitor::register_cluster_logger()
855{
856 if (!cluster_logger_registered) {
857 dout(10) << "register_cluster_logger" << dendl;
858 cluster_logger_registered = true;
859 cct->get_perfcounters_collection()->add(cluster_logger);
860 } else {
861 dout(10) << "register_cluster_logger - already registered" << dendl;
862 }
863}
864
865void Monitor::unregister_cluster_logger()
866{
867 if (cluster_logger_registered) {
868 dout(10) << "unregister_cluster_logger" << dendl;
869 cluster_logger_registered = false;
870 cct->get_perfcounters_collection()->remove(cluster_logger);
871 } else {
872 dout(10) << "unregister_cluster_logger - not registered" << dendl;
873 }
874}
875
876void Monitor::update_logger()
877{
878 cluster_logger->set(l_cluster_num_mon, monmap->size());
879 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
880}
881
// Tear the monitor down.  In order: flushes any pending paxos write, marks
// the state as STATE_SHUTDOWN, removes the config observer, unregisters the
// admin socket commands, shuts down the elector, mgr client, finisher, paxos
// and its services, the health monitor, timer and CPU thread pool, drops all
// sessions, shuts down the log client and both messengers, and finally
// removes and deletes the perf counters.
//
// The monitor lock is taken here and is released temporarily around the
// observer removal and the finisher drain, and for good before the
// messengers are shut down.
882void Monitor::shutdown()
883{
884 dout(1) << "shutdown" << dendl;
885
886 lock.Lock();
887
888 wait_for_paxos_write();
889
890 state = STATE_SHUTDOWN;
891
// The config observer is removed without the monitor lock held.
f64942e4 892 lock.Unlock();
7c673cae 893 g_conf->remove_observer(this);
f64942e4 894 lock.Lock();
7c673cae
FG
895
// Mirror of the register_command() calls made in preinit().
896 if (admin_hook) {
897 AdminSocket* admin_socket = cct->get_admin_socket();
898 admin_socket->unregister_command("mon_status");
899 admin_socket->unregister_command("quorum_status");
900 admin_socket->unregister_command("sync_force");
901 admin_socket->unregister_command("add_bootstrap_peer_hint");
902 admin_socket->unregister_command("quorum enter");
903 admin_socket->unregister_command("quorum exit");
904 admin_socket->unregister_command("ops");
224ce89b 905 admin_socket->unregister_command("sessions");
7c673cae
FG
906 delete admin_hook;
907 admin_hook = NULL;
908 }
909
910 elector.shutdown();
911
912 mgr_client.shutdown();
913
// The finisher is drained and stopped without the monitor lock held.
914 lock.Unlock();
915 finisher.wait_for_empty();
916 finisher.stop();
917 lock.Lock();
918
919 // clean up
920 paxos->shutdown();
921 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
922 (*p)->shutdown();
923 health_monitor->shutdown();
924
// Anyone still waiting for quorum is completed with -ECANCELED.
925 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
926 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
927
928 timer.shutdown();
929
930 cpu_tp.stop();
931
932 remove_all_sessions();
933
f64942e4
AA
934 log_client.shutdown();
935
936 // unlock before msgr shutdown...
937 lock.Unlock();
938
939 // shutdown messenger before removing logger from perfcounter collection,
940 // otherwise _ms_dispatch() will try to update deleted logger
941 messenger->shutdown();
942 mgr_messenger->shutdown();
943
7c673cae
FG
944 if (logger) {
945 cct->get_perfcounters_collection()->remove(logger);
946 delete logger;
947 logger = NULL;
948 }
949 if (cluster_logger) {
950 if (cluster_logger_registered)
951 cct->get_perfcounters_collection()->remove(cluster_logger);
952 delete cluster_logger;
953 cluster_logger = NULL;
954 }
7c673cae
FG
955}
956
957void Monitor::wait_for_paxos_write()
958{
959 if (paxos->is_writing() || paxos->is_writing_previous()) {
960 dout(10) << __func__ << " flushing pending write" << dendl;
961 lock.Unlock();
962 store->flush();
963 lock.Lock();
964 dout(10) << __func__ << " flushed pending write" << dendl;
965 }
966}
967
// (Re)start quorum discovery.  Flushes pending paxos writes, resets the sync
// requester, unregisters the cluster logger and cancels the probe timeout,
// recomputes our rank from the monmap, returns to STATE_PROBING and then
// either wins a standalone election (single-monitor map) or probes every
// other monitor in the monmap plus any extra probe peers.
968void Monitor::bootstrap()
969{
970 dout(10) << "bootstrap" << dendl;
971 wait_for_paxos_write();
972
973 sync_reset_requester();
974 unregister_cluster_logger();
975 cancel_probe_timeout();
976
977 // note my rank
978 int newrank = monmap->get_rank(messenger->get_myaddr());
// We had a rank but no longer appear in the monmap: if we ever joined a
// quorum, the process exits here.
979 if (newrank < 0 && rank >= 0) {
980 // was i ever part of the quorum?
981 if (has_ever_joined) {
982 dout(0) << " removed from monmap, suicide." << dendl;
983 exit(0);
984 }
985 }
986 if (newrank != rank) {
987 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
988 messenger->set_myname(entity_name_t::MON(newrank));
989 rank = newrank;
990
991 // reset all connections, or else our peers will think we are someone else.
992 messenger->mark_down_all();
993 }
994
995 // reset
996 state = STATE_PROBING;
997
998 _reset();
999
1000 // sync store
1001 if (g_conf->mon_compact_on_bootstrap) {
1002 dout(10) << "bootstrap -- triggering compaction" << dendl;
1003 store->compact();
1004 dout(10) << "bootstrap -- finished compaction" << dendl;
1005 }
1006
1007 // singleton monitor?
1008 if (monmap->size() == 1 && rank == 0) {
1009 win_standalone_election();
1010 return;
1011 }
1012
1013 reset_probe_timeout();
1014
1015 // i'm outside the quorum
1016 if (monmap->contains(name))
1017 outside_quorum.insert(name);
1018
1019 // probe monitors
1020 dout(10) << "probing other monitors" << dendl;
1021 for (unsigned i = 0; i < monmap->size(); i++) {
1022 if ((int)i != rank)
1023 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
1024 monmap->get_inst(i));
1025 }
// Extra probe peers are addressed as MON(-1); our own address is skipped.
1026 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
1027 p != extra_probe_peers.end();
1028 ++p) {
1029 if (*p != messenger->get_myaddr()) {
1030 entity_inst_t i;
1031 i.name = entity_name_t::MON(-1);
1032 i.addr = *p;
1033 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
1034 }
1035 }
1036}
1037
1038bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1039{
1040 string addrstr;
1041 if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1042 ss << "unable to parse address string value '"
1043 << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1044 return false;
1045 }
1046 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1047 << addrstr << "'" << dendl;
1048
1049 entity_addr_t addr;
1050 const char *end = 0;
1051 if (!addr.parse(addrstr.c_str(), &end)) {
1052 ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1053 return false;
1054 }
1055
1056 if (is_leader() || is_peon()) {
1057 ss << "mon already active; ignoring bootstrap hint";
1058 return true;
1059 }
1060
1061 if (addr.get_port() == 0)
1062 addr.set_port(CEPH_MON_PORT);
1063
1064 extra_probe_peers.insert(addr);
1065 ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1066 return true;
1067}
1068
1069// called by bootstrap(), or on leader|peon -> electing
1070void Monitor::_reset()
1071{
1072 dout(10) << __func__ << dendl;
1073
1074 cancel_probe_timeout();
1075 timecheck_finish();
1076 health_events_cleanup();
181888fb 1077 health_check_log_times.clear();
7c673cae
FG
1078 scrub_event_cancel();
1079
1080 leader_since = utime_t();
1081 if (!quorum.empty()) {
1082 exited_quorum = ceph_clock_now();
1083 }
1084 quorum.clear();
1085 outside_quorum.clear();
31f18b77 1086 quorum_feature_map.clear();
7c673cae
FG
1087
1088 scrub_reset();
1089
1090 paxos->restart();
1091
1092 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1093 (*p)->restart();
1094 health_monitor->finish();
1095}
1096
1097
1098// -----------------------------------------------------------
1099// sync
1100
1101set<string> Monitor::get_sync_targets_names()
1102{
1103 set<string> targets;
1104 targets.insert(paxos->get_name());
1105 for (int i = 0; i < PAXOS_NUM; ++i)
1106 paxos_service[i]->get_store_prefixes(targets);
1107 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1108 assert(config_key_service_ptr);
1109 config_key_service_ptr->get_store_prefixes(targets);
1110 return targets;
1111}
1112
1113
1114void Monitor::sync_timeout()
1115{
1116 dout(10) << __func__ << dendl;
1117 assert(state == STATE_SYNCHRONIZING);
1118 bootstrap();
1119}
1120
1121void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1122{
1123 dout(1) << __func__ << dendl;
1124
1125 MonMap latest_monmap;
1126
1127 // Grab latest monmap from MonmapMonitor
1128 bufferlist monmon_bl;
1129 int err = monmon()->get_monmap(monmon_bl);
1130 if (err < 0) {
1131 if (err != -ENOENT) {
1132 derr << __func__
1133 << " something wrong happened while reading the store: "
1134 << cpp_strerror(err) << dendl;
1135 assert(0 == "error reading the store");
1136 }
1137 } else {
1138 latest_monmap.decode(monmon_bl);
1139 }
1140
1141 // Grab last backed up monmap (if any) and compare epochs
1142 if (store->exists("mon_sync", "latest_monmap")) {
1143 bufferlist backup_bl;
1144 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1145 if (err < 0) {
1146 derr << __func__
1147 << " something wrong happened while reading the store: "
1148 << cpp_strerror(err) << dendl;
1149 assert(0 == "error reading the store");
1150 }
1151 assert(backup_bl.length() > 0);
1152
1153 MonMap backup_monmap;
1154 backup_monmap.decode(backup_bl);
1155
1156 if (backup_monmap.epoch > latest_monmap.epoch)
1157 latest_monmap = backup_monmap;
1158 }
1159
1160 // Check if our current monmap's epoch is greater than the one we've
1161 // got so far.
1162 if (monmap->epoch > latest_monmap.epoch)
1163 latest_monmap = *monmap;
1164
1165 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1166
1167 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1168}
1169
1170void Monitor::sync_reset_requester()
1171{
1172 dout(10) << __func__ << dendl;
1173
1174 if (sync_timeout_event) {
1175 timer.cancel_event(sync_timeout_event);
1176 sync_timeout_event = NULL;
1177 }
1178
1179 sync_provider = entity_inst_t();
1180 sync_cookie = 0;
1181 sync_full = false;
1182 sync_start_version = 0;
1183}
1184
1185void Monitor::sync_reset_provider()
1186{
1187 dout(10) << __func__ << dendl;
1188 sync_providers.clear();
1189}
1190
1191void Monitor::sync_start(entity_inst_t &other, bool full)
1192{
1193 dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
1194
1195 assert(state == STATE_PROBING ||
1196 state == STATE_SYNCHRONIZING);
1197 state = STATE_SYNCHRONIZING;
1198
1199 // make sure are not a provider for anyone!
1200 sync_reset_provider();
1201
1202 sync_full = full;
1203
1204 if (sync_full) {
1205 // stash key state, and mark that we are syncing
1206 auto t(std::make_shared<MonitorDBStore::Transaction>());
1207 sync_stash_critical_state(t);
1208 t->put("mon_sync", "in_sync", 1);
1209
1210 sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
1211 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1212 << sync_last_committed_floor << dendl;
1213 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1214
1215 store->apply_transaction(t);
1216
1217 assert(g_conf->mon_sync_requester_kill_at != 1);
1218
1219 // clear the underlying store
1220 set<string> targets = get_sync_targets_names();
1221 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1222 store->clear(targets);
1223
1224 // make sure paxos knows it has been reset. this prevents a
1225 // bootstrap and then different probe reply order from possibly
1226 // deciding a partial or no sync is needed.
1227 paxos->init();
1228
1229 assert(g_conf->mon_sync_requester_kill_at != 2);
1230 }
1231
1232 // assume 'other' as the leader. We will update the leader once we receive
1233 // a reply to the sync start.
1234 sync_provider = other;
1235
1236 sync_reset_timeout();
1237
1238 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1239 if (!sync_full)
1240 m->last_committed = paxos->get_version();
1241 messenger->send_message(m, sync_provider);
1242}
1243
1244void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1245{
1246 dout(10) << __func__ << dendl;
1247 bufferlist backup_monmap;
1248 sync_obtain_latest_monmap(backup_monmap);
1249 assert(backup_monmap.length() > 0);
1250 t->put("mon_sync", "latest_monmap", backup_monmap);
1251}
1252
1253void Monitor::sync_reset_timeout()
1254{
1255 dout(10) << __func__ << dendl;
1256 if (sync_timeout_event)
1257 timer.cancel_event(sync_timeout_event);
3efd9988
FG
1258 sync_timeout_event = timer.add_event_after(
1259 g_conf->mon_sync_timeout,
1260 new C_MonContext(this, [this](int) {
1261 sync_timeout();
1262 }));
7c673cae
FG
1263}
1264
1265void Monitor::sync_finish(version_t last_committed)
1266{
1267 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1268
1269 assert(g_conf->mon_sync_requester_kill_at != 7);
1270
1271 if (sync_full) {
1272 // finalize the paxos commits
1273 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1274 paxos->read_and_prepare_transactions(tx, sync_start_version,
1275 last_committed);
1276 tx->put(paxos->get_name(), "last_committed", last_committed);
1277
1278 dout(30) << __func__ << " final tx dump:\n";
1279 JSONFormatter f(true);
1280 tx->dump(&f);
1281 f.flush(*_dout);
1282 *_dout << dendl;
1283
1284 store->apply_transaction(tx);
1285 }
1286
1287 assert(g_conf->mon_sync_requester_kill_at != 8);
1288
1289 auto t(std::make_shared<MonitorDBStore::Transaction>());
1290 t->erase("mon_sync", "in_sync");
1291 t->erase("mon_sync", "force_sync");
1292 t->erase("mon_sync", "last_committed_floor");
1293 store->apply_transaction(t);
1294
1295 assert(g_conf->mon_sync_requester_kill_at != 9);
1296
1297 init_paxos();
1298
1299 assert(g_conf->mon_sync_requester_kill_at != 10);
1300
1301 bootstrap();
1302}
1303
1304void Monitor::handle_sync(MonOpRequestRef op)
1305{
1306 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1307 dout(10) << __func__ << " " << *m << dendl;
1308 switch (m->op) {
1309
1310 // provider ---------
1311
1312 case MMonSync::OP_GET_COOKIE_FULL:
1313 case MMonSync::OP_GET_COOKIE_RECENT:
1314 handle_sync_get_cookie(op);
1315 break;
1316 case MMonSync::OP_GET_CHUNK:
1317 handle_sync_get_chunk(op);
1318 break;
1319
1320 // client -----------
1321
1322 case MMonSync::OP_COOKIE:
1323 handle_sync_cookie(op);
1324 break;
1325
1326 case MMonSync::OP_CHUNK:
1327 case MMonSync::OP_LAST_CHUNK:
1328 handle_sync_chunk(op);
1329 break;
1330 case MMonSync::OP_NO_COOKIE:
1331 handle_sync_no_cookie(op);
1332 break;
1333
1334 default:
1335 dout(0) << __func__ << " unknown op " << m->op << dendl;
1336 assert(0 == "unknown op");
1337 }
1338}
1339
1340// leader
1341
1342void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1343{
1344 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1345 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1346 m->get_connection()->send_message(reply);
1347}
1348
1349void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1350{
1351 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1352 if (is_synchronizing()) {
1353 _sync_reply_no_cookie(op);
1354 return;
1355 }
1356
1357 assert(g_conf->mon_sync_provider_kill_at != 1);
1358
1359 // make sure they can understand us.
1360 if ((required_features ^ m->get_connection()->get_features()) &
1361 required_features) {
1362 dout(5) << " ignoring peer mon." << m->get_source().num()
1363 << " has features " << std::hex
1364 << m->get_connection()->get_features()
1365 << " but we require " << required_features << std::dec << dendl;
1366 return;
1367 }
1368
1369 // make up a unique cookie. include election epoch (which persists
1370 // across restarts for the whole cluster) and a counter for this
1371 // process instance. there is no need to be unique *across*
1372 // monitors, though.
1373 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1374 assert(sync_providers.count(cookie) == 0);
1375
1376 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1377
1378 SyncProvider& sp = sync_providers[cookie];
1379 sp.cookie = cookie;
1380 sp.entity = m->get_source_inst();
1381 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1382
1383 set<string> sync_targets;
1384 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1385 // full scan
1386 sync_targets = get_sync_targets_names();
1387 sp.last_committed = paxos->get_version();
1388 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1389 sp.full = true;
1390 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1391 } else {
1392 // just catch up paxos
1393 sp.last_committed = m->last_committed;
1394 }
1395 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1396
1397 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1398 reply->last_committed = sp.last_committed;
1399 m->get_connection()->send_message(reply);
1400}
1401
1402void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1403{
1404 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1405 dout(10) << __func__ << " " << *m << dendl;
1406
1407 if (sync_providers.count(m->cookie) == 0) {
1408 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1409 _sync_reply_no_cookie(op);
1410 return;
1411 }
1412
1413 assert(g_conf->mon_sync_provider_kill_at != 2);
1414
1415 SyncProvider& sp = sync_providers[m->cookie];
1416 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1417
1418 if (sp.last_committed < paxos->get_first_committed() &&
1419 paxos->get_first_committed() > 1) {
1420 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1421 << " < our fc " << paxos->get_first_committed() << dendl;
1422 sync_providers.erase(m->cookie);
1423 _sync_reply_no_cookie(op);
1424 return;
1425 }
1426
1427 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1428 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1429
1430 int left = g_conf->mon_sync_max_payload_size;
1431 while (sp.last_committed < paxos->get_version() && left > 0) {
1432 bufferlist bl;
1433 sp.last_committed++;
224ce89b
WB
1434
1435 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1436 assert(err == 0);
1437
7c673cae
FG
1438 tx->put(paxos->get_name(), sp.last_committed, bl);
1439 left -= bl.length();
1440 dout(20) << __func__ << " including paxos state " << sp.last_committed
1441 << dendl;
1442 }
1443 reply->last_committed = sp.last_committed;
1444
1445 if (sp.full && left > 0) {
1446 sp.synchronizer->get_chunk_tx(tx, left);
1447 sp.last_key = sp.synchronizer->get_last_key();
1448 reply->last_key = sp.last_key;
1449 }
1450
1451 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1452 sp.last_committed < paxos->get_version()) {
1453 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1454 << " key " << sp.last_key << dendl;
1455 } else {
1456 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1457 << " key " << sp.last_key << dendl;
1458 reply->op = MMonSync::OP_LAST_CHUNK;
1459
1460 assert(g_conf->mon_sync_provider_kill_at != 3);
1461
1462 // clean up our local state
1463 sync_providers.erase(sp.cookie);
1464 }
1465
1466 ::encode(*tx, reply->chunk_bl);
1467
1468 m->get_connection()->send_message(reply);
1469}
1470
1471// requester
1472
1473void Monitor::handle_sync_cookie(MonOpRequestRef op)
1474{
1475 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1476 dout(10) << __func__ << " " << *m << dendl;
1477 if (sync_cookie) {
1478 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1479 return;
1480 }
1481 if (m->get_source_inst() != sync_provider) {
1482 dout(10) << __func__ << " source does not match, discarding" << dendl;
1483 return;
1484 }
1485 sync_cookie = m->cookie;
1486 sync_start_version = m->last_committed;
1487
1488 sync_reset_timeout();
1489 sync_get_next_chunk();
1490
1491 assert(g_conf->mon_sync_requester_kill_at != 3);
1492}
1493
1494void Monitor::sync_get_next_chunk()
1495{
1496 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1497 if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1498 dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1499 usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1500 }
1501 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1502 messenger->send_message(r, sync_provider);
1503
1504 assert(g_conf->mon_sync_requester_kill_at != 4);
1505}
1506
1507void Monitor::handle_sync_chunk(MonOpRequestRef op)
1508{
1509 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1510 dout(10) << __func__ << " " << *m << dendl;
1511
1512 if (m->cookie != sync_cookie) {
1513 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1514 return;
1515 }
1516 if (m->get_source_inst() != sync_provider) {
1517 dout(10) << __func__ << " source does not match, discarding" << dendl;
1518 return;
1519 }
1520
1521 assert(state == STATE_SYNCHRONIZING);
1522 assert(g_conf->mon_sync_requester_kill_at != 5);
1523
1524 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1525 tx->append_from_encoded(m->chunk_bl);
1526
1527 dout(30) << __func__ << " tx dump:\n";
1528 JSONFormatter f(true);
1529 tx->dump(&f);
1530 f.flush(*_dout);
1531 *_dout << dendl;
1532
1533 store->apply_transaction(tx);
1534
1535 assert(g_conf->mon_sync_requester_kill_at != 6);
1536
1537 if (!sync_full) {
1538 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1539 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1540 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1541 m->last_committed);
1542 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1543
1544 dout(30) << __func__ << " tx dump:\n";
1545 JSONFormatter f(true);
1546 tx->dump(&f);
1547 f.flush(*_dout);
1548 *_dout << dendl;
1549
1550 store->apply_transaction(tx);
1551 paxos->init(); // to refresh what we just wrote
1552 }
1553
1554 if (m->op == MMonSync::OP_CHUNK) {
1555 sync_reset_timeout();
1556 sync_get_next_chunk();
1557 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1558 sync_finish(m->last_committed);
1559 }
1560}
1561
1562void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1563{
1564 dout(10) << __func__ << dendl;
1565 bootstrap();
1566}
1567
1568void Monitor::sync_trim_providers()
1569{
1570 dout(20) << __func__ << dendl;
1571
1572 utime_t now = ceph_clock_now();
1573 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1574 while (p != sync_providers.end()) {
1575 if (now > p->second.timeout) {
1576 dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
1577 sync_providers.erase(p++);
1578 } else {
1579 ++p;
1580 }
1581 }
1582}
1583
1584// ---------------------------------------------------
1585// probe
1586
1587void Monitor::cancel_probe_timeout()
1588{
1589 if (probe_timeout_event) {
1590 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1591 timer.cancel_event(probe_timeout_event);
1592 probe_timeout_event = NULL;
1593 } else {
1594 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1595 }
1596}
1597
1598void Monitor::reset_probe_timeout()
1599{
1600 cancel_probe_timeout();
1601 probe_timeout_event = new C_MonContext(this, [this](int r) {
1602 probe_timeout(r);
1603 });
1604 double t = g_conf->mon_probe_timeout;
3efd9988
FG
1605 if (timer.add_event_after(t, probe_timeout_event)) {
1606 dout(10) << "reset_probe_timeout " << probe_timeout_event
1607 << " after " << t << " seconds" << dendl;
1608 } else {
1609 probe_timeout_event = nullptr;
1610 }
7c673cae
FG
1611}
1612
1613void Monitor::probe_timeout(int r)
1614{
1615 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1616 assert(is_probing() || is_synchronizing());
1617 assert(probe_timeout_event);
1618 probe_timeout_event = NULL;
1619 bootstrap();
1620}
1621
1622void Monitor::handle_probe(MonOpRequestRef op)
1623{
1624 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1625 dout(10) << "handle_probe " << *m << dendl;
1626
1627 if (m->fsid != monmap->fsid) {
1628 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1629 return;
1630 }
1631
1632 switch (m->op) {
1633 case MMonProbe::OP_PROBE:
1634 handle_probe_probe(op);
1635 break;
1636
1637 case MMonProbe::OP_REPLY:
1638 handle_probe_reply(op);
1639 break;
1640
1641 case MMonProbe::OP_MISSING_FEATURES:
1642 derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1643 << ", required " << m->required_features
1644 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
1645 << dendl;
1646 break;
1647 }
1648}
1649
1650void Monitor::handle_probe_probe(MonOpRequestRef op)
1651{
1652 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1653
1654 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1655 << " features " << m->get_connection()->get_features() << dendl;
1656 uint64_t missing = required_features & ~m->get_connection()->get_features();
1657 if (missing) {
1658 dout(1) << " peer " << m->get_source_addr() << " missing features "
1659 << missing << dendl;
1660 if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1661 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1662 name, has_ever_joined);
1663 m->required_features = required_features;
1664 m->get_connection()->send_message(r);
1665 }
1666 goto out;
1667 }
1668
1669 if (!is_probing() && !is_synchronizing()) {
1670 // If the probing mon is way ahead of us, we need to re-bootstrap.
1671 // Normally we capture this case when we initially bootstrap, but
1672 // it is possible we pass those checks (we overlap with
1673 // quorum-to-be) but fail to join a quorum before it moves past
1674 // us. We need to be kicked back to bootstrap so we can
1675 // synchonize, not keep calling elections.
1676 if (paxos->get_version() + 1 < m->paxos_first_version) {
1677 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1678 << "ahead of us, re-bootstrapping" << dendl;
1679 bootstrap();
1680 goto out;
1681
1682 }
1683 }
1684
1685 MMonProbe *r;
1686 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1687 r->name = name;
1688 r->quorum = quorum;
1689 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1690 r->paxos_first_version = paxos->get_first_committed();
1691 r->paxos_last_version = paxos->get_version();
1692 m->get_connection()->send_message(r);
1693
1694 // did we discover a peer here?
1695 if (!monmap->contains(m->get_source_addr())) {
1696 dout(1) << " adding peer " << m->get_source_addr()
1697 << " to list of hints" << dendl;
1698 extra_probe_peers.insert(m->get_source_addr());
1699 }
1700
1701 out:
1702 return;
1703}
1704
1705void Monitor::handle_probe_reply(MonOpRequestRef op)
1706{
1707 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1708 dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1709 dout(10) << " monmap is " << *monmap << dendl;
1710
1711 // discover name and addrs during probing or electing states.
1712 if (!is_probing() && !is_electing()) {
1713 return;
1714 }
1715
1716 // newer map, or they've joined a quorum and we haven't?
1717 bufferlist mybl;
1718 monmap->encode(mybl, m->get_connection()->get_features());
1719 // make sure it's actually different; the checks below err toward
1720 // taking the other guy's map, which could cause us to loop.
1721 if (!mybl.contents_equal(m->monmap_bl)) {
1722 MonMap *newmap = new MonMap;
1723 newmap->decode(m->monmap_bl);
1724 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1725 !has_ever_joined)) {
1726 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1727 << ", mine was " << monmap->get_epoch() << dendl;
1728 delete newmap;
1729 monmap->decode(m->monmap_bl);
1730
1731 bootstrap();
1732 return;
1733 }
1734 delete newmap;
1735 }
1736
1737 // rename peer?
1738 string peer_name = monmap->get_name(m->get_source_addr());
1739 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1740 dout(10) << " renaming peer " << m->get_source_addr() << " "
1741 << peer_name << " -> " << m->name << " in my monmap"
1742 << dendl;
1743 monmap->rename(peer_name, m->name);
1744
1745 if (is_electing()) {
1746 bootstrap();
1747 return;
1748 }
1749 } else {
1750 dout(10) << " peer name is " << peer_name << dendl;
1751 }
1752
1753 // new initial peer?
1754 if (monmap->get_epoch() == 0 &&
1755 monmap->contains(m->name) &&
1756 monmap->get_addr(m->name).is_blank_ip()) {
1757 dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1758 monmap->set_addr(m->name, m->get_source_addr());
1759
1760 bootstrap();
1761 return;
1762 }
1763
1764 // end discover phase
1765 if (!is_probing()) {
1766 return;
1767 }
1768
1769 assert(paxos != NULL);
1770
1771 if (is_synchronizing()) {
1772 dout(10) << " currently syncing" << dendl;
1773 return;
1774 }
1775
1776 entity_inst_t other = m->get_source_inst();
1777
1778 if (m->paxos_last_version < sync_last_committed_floor) {
1779 dout(10) << " peer paxos versions [" << m->paxos_first_version
1780 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1781 << sync_last_committed_floor << ", ignoring"
1782 << dendl;
1783 } else {
1784 if (paxos->get_version() < m->paxos_first_version &&
1785 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1786 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1787 << "," << m->paxos_last_version << "]"
1788 << " vs my version " << paxos->get_version()
1789 << " (too far ahead)"
1790 << dendl;
1791 cancel_probe_timeout();
1792 sync_start(other, true);
1793 return;
1794 }
1795 if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1796 dout(10) << " peer paxos last version " << m->paxos_last_version
1797 << " vs my version " << paxos->get_version()
1798 << " (too far ahead)"
1799 << dendl;
1800 cancel_probe_timeout();
1801 sync_start(other, false);
1802 return;
1803 }
1804 }
1805
1806 // is there an existing quorum?
1807 if (m->quorum.size()) {
1808 dout(10) << " existing quorum " << m->quorum << dendl;
1809
1810 dout(10) << " peer paxos version " << m->paxos_last_version
1811 << " vs my version " << paxos->get_version()
1812 << " (ok)"
1813 << dendl;
1814
1815 if (monmap->contains(name) &&
1816 !monmap->get_addr(name).is_blank_ip()) {
1817 // i'm part of the cluster; just initiate a new election
1818 start_election();
1819 } else {
1820 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1821 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1822 monmap->get_inst(*m->quorum.begin()));
1823 }
1824 } else {
1825 if (monmap->contains(m->name)) {
1826 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1827 outside_quorum.insert(m->name);
1828 } else {
1829 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
1830 return;
1831 }
1832
1833 unsigned need = monmap->size() / 2 + 1;
1834 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1835 if (outside_quorum.size() >= need) {
1836 if (outside_quorum.count(name)) {
1837 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1838 start_election();
1839 } else {
1840 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1841 }
1842 } else {
1843 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
1844 }
1845 }
1846}
1847
1848void Monitor::join_election()
1849{
1850 dout(10) << __func__ << dendl;
1851 wait_for_paxos_write();
1852 _reset();
1853 state = STATE_ELECTING;
1854
1855 logger->inc(l_mon_num_elections);
1856}
1857
1858void Monitor::start_election()
1859{
1860 dout(10) << "start_election" << dendl;
1861 wait_for_paxos_write();
1862 _reset();
1863 state = STATE_ELECTING;
1864
1865 logger->inc(l_mon_num_elections);
1866 logger->inc(l_mon_election_call);
1867
b32b8144 1868 clog->info() << "mon." << name << " calling monitor election";
7c673cae
FG
1869 elector.call_election();
1870}
1871
1872void Monitor::win_standalone_election()
1873{
1874 dout(1) << "win_standalone_election" << dendl;
1875
1876 // bump election epoch, in case the previous epoch included other
1877 // monitors; we need to be able to make the distinction.
1878 elector.init();
1879 elector.advance_epoch();
1880
1881 rank = monmap->get_rank(name);
1882 assert(rank == 0);
1883 set<int> q;
1884 q.insert(rank);
1885
224ce89b
WB
1886 map<int,Metadata> metadata;
1887 collect_metadata(&metadata[0]);
1888
7c673cae
FG
1889 win_election(elector.get_epoch(), q,
1890 CEPH_FEATURES_ALL,
1891 ceph::features::mon::get_supported(),
d2e6a577 1892 metadata);
7c673cae
FG
1893}
1894
1895const utime_t& Monitor::get_leader_since() const
1896{
1897 assert(state == STATE_LEADER);
1898 return leader_since;
1899}
1900
1901epoch_t Monitor::get_epoch()
1902{
1903 return elector.get_epoch();
1904}
1905
1906void Monitor::_finish_svc_election()
1907{
1908 assert(state == STATE_LEADER || state == STATE_PEON);
1909
1910 for (auto p : paxos_service) {
1911 // we already called election_finished() on monmon(); avoid callig twice
1912 if (state == STATE_LEADER && p == monmon())
1913 continue;
1914 p->election_finished();
1915 }
1916}
1917
1918void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
1919 const mon_feature_t& mon_features,
d2e6a577 1920 const map<int,Metadata>& metadata)
7c673cae
FG
1921{
1922 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
1923 << " features " << features
1924 << " mon_features " << mon_features
1925 << dendl;
1926 assert(is_electing());
1927 state = STATE_LEADER;
1928 leader_since = ceph_clock_now();
1929 leader = rank;
1930 quorum = active;
1931 quorum_con_features = features;
1932 quorum_mon_features = mon_features;
224ce89b 1933 pending_metadata = metadata;
7c673cae
FG
1934 outside_quorum.clear();
1935
b32b8144
FG
1936 clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names()
1937 << " in quorum (ranks " << quorum << ")";
7c673cae 1938
d2e6a577 1939 set_leader_commands(get_local_commands(mon_features));
7c673cae
FG
1940
1941 paxos->leader_init();
1942 // NOTE: tell monmap monitor first. This is important for the
1943 // bootstrap case to ensure that the very first paxos proposal
1944 // codifies the monmap. Otherwise any manner of chaos can ensue
1945 // when monitors are call elections or participating in a paxos
1946 // round without agreeing on who the participants are.
1947 monmon()->election_finished();
1948 _finish_svc_election();
1949 health_monitor->start(epoch);
1950
1951 logger->inc(l_mon_election_win);
1952
224ce89b
WB
1953 // inject new metadata in first transaction.
1954 {
1955 // include previous metadata for missing mons (that aren't part of
1956 // the current quorum).
1957 map<int,Metadata> m = metadata;
1958 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
1959 if (m.count(rank) == 0 &&
1960 mon_metadata.count(rank)) {
1961 m[rank] = mon_metadata[rank];
1962 }
1963 }
1964
1965 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1966 // a new transaction immediately after the election finishes. We should
1967 // do that anyway for other reasons, though.
1968 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1969 bufferlist bl;
1970 ::encode(m, bl);
1971 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
1972 }
1973
7c673cae
FG
1974 finish_election();
1975 if (monmap->size() > 1 &&
1976 monmap->get_epoch() > 0) {
1977 timecheck_start();
1978 health_tick_start();
b32b8144
FG
1979
1980 // Freshen the health status before doing health_to_clog in case
1981 // our just-completed election changed the health
1982 healthmon()->wait_for_active_ctx(new FunctionContext([this](int r){
1983 dout(20) << "healthmon now active" << dendl;
1984 healthmon()->tick();
1985 if (healthmon()->is_proposing()) {
1986 dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
1987 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this,
1988 [this](int r){
1989 assert(lock.is_locked_by_me());
1990 do_health_to_clog_interval();
1991 }));
1992
1993 } else {
1994 do_health_to_clog_interval();
1995 }
1996 }));
1997
7c673cae
FG
1998 scrub_event_start();
1999 }
7c673cae
FG
2000}
2001
2002void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
2003 uint64_t features,
2004 const mon_feature_t& mon_features)
2005{
2006 state = STATE_PEON;
2007 leader_since = utime_t();
2008 leader = l;
2009 quorum = q;
2010 outside_quorum.clear();
2011 quorum_con_features = features;
2012 quorum_mon_features = mon_features;
2013 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
2014 << " quorum is " << quorum << " features are " << quorum_con_features
2015 << " mon_features are " << quorum_mon_features
2016 << dendl;
2017
2018 paxos->peon_init();
2019 _finish_svc_election();
2020 health_monitor->start(epoch);
2021
2022 logger->inc(l_mon_election_lose);
2023
2024 finish_election();
2025
224ce89b
WB
2026 if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
2027 !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
2028 // for pre-luminous mons only
7c673cae 2029 Metadata sys_info;
224ce89b 2030 collect_metadata(&sys_info);
7c673cae
FG
2031 messenger->send_message(new MMonMetadata(sys_info),
2032 monmap->get_inst(get_leader()));
2033 }
2034}
2035
224ce89b
WB
2036void Monitor::collect_metadata(Metadata *m)
2037{
2038 collect_sys_info(m, g_ceph_context);
2039 (*m)["addr"] = stringify(messenger->get_myaddr());
2040}
2041
7c673cae
FG
2042void Monitor::finish_election()
2043{
2044 apply_quorum_to_compatset_features();
2045 apply_monmap_to_compatset_features();
2046 timecheck_finish();
2047 exited_quorum = utime_t();
2048 finish_contexts(g_ceph_context, waitfor_quorum);
2049 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2050 resend_routed_requests();
2051 update_logger();
2052 register_cluster_logger();
2053
2054 // am i named properly?
2055 string cur_name = monmap->get_name(messenger->get_myaddr());
2056 if (cur_name != name) {
2057 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2058 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
2059 monmap->get_inst(*quorum.begin()));
2060 }
2061}
2062
2063void Monitor::_apply_compatset_features(CompatSet &new_features)
2064{
2065 if (new_features.compare(features) != 0) {
2066 CompatSet diff = features.unsupported(new_features);
2067 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2068 features = new_features;
2069
2070 auto t = std::make_shared<MonitorDBStore::Transaction>();
2071 write_features(t);
2072 store->apply_transaction(t);
2073
2074 calc_quorum_requirements();
2075 }
2076}
2077
2078void Monitor::apply_quorum_to_compatset_features()
2079{
2080 CompatSet new_features(features);
2081 if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2082 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2083 }
2084 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2085 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2086 }
2087 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2088 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2089 }
2090 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2091 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2092 }
2093 dout(5) << __func__ << dendl;
2094 _apply_compatset_features(new_features);
2095}
2096
2097void Monitor::apply_monmap_to_compatset_features()
2098{
2099 CompatSet new_features(features);
2100 mon_feature_t monmap_features = monmap->get_required_features();
2101
2102 /* persistent monmap features may go into the compatset.
2103 * optional monmap features may not - why?
2104 * because optional monmap features may be set/unset by the admin,
2105 * and possibly by other means that haven't yet been thought out,
2106 * so we can't make the monitor enforce them on start - because they
2107 * may go away.
2108 * this, of course, does not invalidate setting a compatset feature
2109 * for an optional feature - as long as you make sure to clean it up
2110 * once you unset it.
2111 */
2112 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2113 assert(ceph::features::mon::get_persistent().contains_all(
2114 ceph::features::mon::FEATURE_KRAKEN));
2115 // this feature should only ever be set if the quorum supports it.
2116 assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2117 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2118 }
181888fb
FG
2119 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2120 assert(ceph::features::mon::get_persistent().contains_all(
2121 ceph::features::mon::FEATURE_LUMINOUS));
2122 // this feature should only ever be set if the quorum supports it.
2123 assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
2124 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2125 }
7c673cae
FG
2126
2127 dout(5) << __func__ << dendl;
2128 _apply_compatset_features(new_features);
2129}
2130
2131void Monitor::calc_quorum_requirements()
2132{
2133 required_features = 0;
2134
2135 // compatset
2136 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2137 required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2138 }
2139 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2140 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2141 }
2142 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2143 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2144 }
2145 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2146 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2147 }
2148 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2149 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2150 }
181888fb
FG
2151 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2152 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2153 }
7c673cae
FG
2154
2155 // monmap
2156 if (monmap->get_required_features().contains_all(
2157 ceph::features::mon::FEATURE_KRAKEN)) {
2158 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2159 }
2160 if (monmap->get_required_features().contains_all(
2161 ceph::features::mon::FEATURE_LUMINOUS)) {
2162 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2163 }
2164 dout(10) << __func__ << " required_features " << required_features << dendl;
2165}
2166
31f18b77
FG
2167void Monitor::get_combined_feature_map(FeatureMap *fm)
2168{
2169 *fm += session_map.feature_map;
2170 for (auto id : quorum) {
2171 if (id != rank) {
2172 *fm += quorum_feature_map[id];
2173 }
2174 }
2175}
2176
7c673cae
FG
2177void Monitor::sync_force(Formatter *f, ostream& ss)
2178{
2179 bool free_formatter = false;
2180
2181 if (!f) {
2182 // louzy/lazy hack: default to json if no formatter has been defined
2183 f = new JSONFormatter();
2184 free_formatter = true;
2185 }
2186
2187 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2188 sync_stash_critical_state(tx);
2189 tx->put("mon_sync", "force_sync", 1);
2190 store->apply_transaction(tx);
2191
2192 f->open_object_section("sync_force");
2193 f->dump_int("ret", 0);
2194 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2195 f->close_section(); // sync_force
2196 f->flush(ss);
2197 if (free_formatter)
2198 delete f;
2199}
2200
2201void Monitor::_quorum_status(Formatter *f, ostream& ss)
2202{
2203 bool free_formatter = false;
2204
2205 if (!f) {
2206 // louzy/lazy hack: default to json if no formatter has been defined
2207 f = new JSONFormatter();
2208 free_formatter = true;
2209 }
2210 f->open_object_section("quorum_status");
2211 f->dump_int("election_epoch", get_epoch());
2212
2213 f->open_array_section("quorum");
2214 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2215 f->dump_int("mon", *p);
2216 f->close_section(); // quorum
2217
2218 list<string> quorum_names = get_quorum_names();
2219 f->open_array_section("quorum_names");
2220 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2221 f->dump_string("mon", *p);
2222 f->close_section(); // quorum_names
2223
2224 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2225
2226 f->open_object_section("monmap");
2227 monmap->dump(f);
2228 f->close_section(); // monmap
2229
2230 f->close_section(); // quorum_status
2231 f->flush(ss);
2232 if (free_formatter)
2233 delete f;
2234}
2235
2236void Monitor::get_mon_status(Formatter *f, ostream& ss)
2237{
2238 bool free_formatter = false;
2239
2240 if (!f) {
2241 // louzy/lazy hack: default to json if no formatter has been defined
2242 f = new JSONFormatter();
2243 free_formatter = true;
2244 }
2245
2246 f->open_object_section("mon_status");
2247 f->dump_string("name", name);
2248 f->dump_int("rank", rank);
2249 f->dump_string("state", get_state_name());
2250 f->dump_int("election_epoch", get_epoch());
2251
2252 f->open_array_section("quorum");
2253 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2254 f->dump_int("mon", *p);
2255 }
2256
2257 f->close_section(); // quorum
2258
2259 f->open_object_section("features");
2260 f->dump_stream("required_con") << required_features;
2261 mon_feature_t req_mon_features = get_required_mon_features();
2262 req_mon_features.dump(f, "required_mon");
2263 f->dump_stream("quorum_con") << quorum_con_features;
2264 quorum_mon_features.dump(f, "quorum_mon");
2265 f->close_section(); // features
2266
2267 f->open_array_section("outside_quorum");
2268 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2269 f->dump_string("mon", *p);
2270 f->close_section(); // outside_quorum
2271
2272 f->open_array_section("extra_probe_peers");
2273 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2274 p != extra_probe_peers.end();
2275 ++p)
2276 f->dump_stream("peer") << *p;
2277 f->close_section(); // extra_probe_peers
2278
2279 f->open_array_section("sync_provider");
2280 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2281 p != sync_providers.end();
2282 ++p) {
2283 f->dump_unsigned("cookie", p->second.cookie);
2284 f->dump_stream("entity") << p->second.entity;
2285 f->dump_stream("timeout") << p->second.timeout;
2286 f->dump_unsigned("last_committed", p->second.last_committed);
2287 f->dump_stream("last_key") << p->second.last_key;
2288 }
2289 f->close_section();
2290
2291 if (is_synchronizing()) {
2292 f->open_object_section("sync");
2293 f->dump_stream("sync_provider") << sync_provider;
2294 f->dump_unsigned("sync_cookie", sync_cookie);
2295 f->dump_unsigned("sync_start_version", sync_start_version);
2296 f->close_section();
2297 }
2298
2299 if (g_conf->mon_sync_provider_kill_at > 0)
2300 f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2301 if (g_conf->mon_sync_requester_kill_at > 0)
2302 f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2303
2304 f->open_object_section("monmap");
2305 monmap->dump(f);
2306 f->close_section();
2307
31f18b77 2308 f->dump_object("feature_map", session_map.feature_map);
7c673cae
FG
2309 f->close_section(); // mon_status
2310
2311 if (free_formatter) {
2312 // flush formatter to ss and delete it iff we created the formatter
2313 f->flush(ss);
2314 delete f;
2315 }
2316}
2317
2318
2319// health status to clog
2320
2321void Monitor::health_tick_start()
2322{
2323 if (!cct->_conf->mon_health_to_clog ||
2324 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2325 return;
2326
2327 dout(15) << __func__ << dendl;
2328
2329 health_tick_stop();
3efd9988
FG
2330 health_tick_event = timer.add_event_after(
2331 cct->_conf->mon_health_to_clog_tick_interval,
2332 new C_MonContext(this, [this](int r) {
2333 if (r < 0)
2334 return;
3efd9988
FG
2335 health_tick_start();
2336 }));
7c673cae
FG
2337}
2338
2339void Monitor::health_tick_stop()
2340{
2341 dout(15) << __func__ << dendl;
2342
2343 if (health_tick_event) {
2344 timer.cancel_event(health_tick_event);
2345 health_tick_event = NULL;
2346 }
2347}
2348
2349utime_t Monitor::health_interval_calc_next_update()
2350{
2351 utime_t now = ceph_clock_now();
2352
2353 time_t secs = now.sec();
2354 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2355 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2356 utime_t next = utime_t(secs + adjustment, 0);
2357
2358 dout(20) << __func__
2359 << " now: " << now << ","
2360 << " next: " << next << ","
2361 << " interval: " << cct->_conf->mon_health_to_clog_interval
2362 << dendl;
2363
2364 return next;
2365}
2366
2367void Monitor::health_interval_start()
2368{
2369 dout(15) << __func__ << dendl;
2370
2371 if (!cct->_conf->mon_health_to_clog ||
2372 cct->_conf->mon_health_to_clog_interval <= 0) {
2373 return;
2374 }
2375
2376 health_interval_stop();
2377 utime_t next = health_interval_calc_next_update();
2378 health_interval_event = new C_MonContext(this, [this](int r) {
2379 if (r < 0)
2380 return;
2381 do_health_to_clog_interval();
2382 });
3efd9988
FG
2383 if (!timer.add_event_at(next, health_interval_event)) {
2384 health_interval_event = nullptr;
2385 }
7c673cae
FG
2386}
2387
2388void Monitor::health_interval_stop()
2389{
2390 dout(15) << __func__ << dendl;
2391 if (health_interval_event) {
2392 timer.cancel_event(health_interval_event);
2393 }
2394 health_interval_event = NULL;
2395}
2396
2397void Monitor::health_events_cleanup()
2398{
2399 health_tick_stop();
2400 health_interval_stop();
2401 health_status_cache.reset();
2402}
2403
2404void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2405{
2406 dout(20) << __func__ << dendl;
2407
2408 if (changed.count("mon_health_to_clog")) {
2409 if (!cct->_conf->mon_health_to_clog) {
2410 health_events_cleanup();
2411 } else {
2412 if (!health_tick_event) {
2413 health_tick_start();
2414 }
2415 if (!health_interval_event) {
2416 health_interval_start();
2417 }
2418 }
2419 }
2420
2421 if (changed.count("mon_health_to_clog_interval")) {
2422 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2423 health_interval_stop();
2424 } else {
2425 health_interval_start();
2426 }
2427 }
2428
2429 if (changed.count("mon_health_to_clog_tick_interval")) {
2430 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2431 health_tick_stop();
2432 } else {
2433 health_tick_start();
2434 }
2435 }
2436}
2437
2438void Monitor::do_health_to_clog_interval()
2439{
2440 // outputting to clog may have been disabled in the conf
2441 // since we were scheduled.
2442 if (!cct->_conf->mon_health_to_clog ||
2443 cct->_conf->mon_health_to_clog_interval <= 0)
2444 return;
2445
2446 dout(10) << __func__ << dendl;
2447
2448 // do we have a cached value for next_clog_update? if not,
2449 // do we know when the last update was?
2450
2451 do_health_to_clog(true);
2452 health_interval_start();
2453}
2454
2455void Monitor::do_health_to_clog(bool force)
2456{
2457 // outputting to clog may have been disabled in the conf
2458 // since we were scheduled.
2459 if (!cct->_conf->mon_health_to_clog ||
2460 cct->_conf->mon_health_to_clog_interval <= 0)
2461 return;
2462
2463 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2464
224ce89b
WB
2465 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2466 string summary;
2467 health_status_t level = get_health_status(false, nullptr, &summary);
2468 if (!force &&
2469 summary == health_status_cache.summary &&
2470 level == health_status_cache.overall)
2471 return;
181888fb 2472 clog->health(level) << "overall " << summary;
224ce89b
WB
2473 health_status_cache.summary = summary;
2474 health_status_cache.overall = level;
2475 } else {
2476 // for jewel only
2477 list<string> status;
2478 health_status_t overall = get_health(status, NULL, NULL);
2479 dout(25) << __func__
2480 << (force ? " (force)" : "")
2481 << dendl;
2482
2483 string summary = joinify(status.begin(), status.end(), string("; "));
7c673cae 2484
224ce89b
WB
2485 if (!force &&
2486 overall == health_status_cache.overall &&
2487 !health_status_cache.summary.empty() &&
2488 health_status_cache.summary == summary) {
2489 // we got a dup!
2490 return;
2491 }
2492
2493 clog->info() << summary;
2494
2495 health_status_cache.overall = overall;
2496 health_status_cache.summary = summary;
2497 }
2498}
2499
2500health_status_t Monitor::get_health_status(
2501 bool want_detail,
2502 Formatter *f,
2503 std::string *plain,
2504 const char *sep1,
2505 const char *sep2)
2506{
2507 health_status_t r = HEALTH_OK;
2508 bool compat = g_conf->mon_health_preluminous_compat;
d2e6a577 2509 bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
224ce89b
WB
2510 if (f) {
2511 f->open_object_section("health");
2512 f->open_object_section("checks");
2513 }
7c673cae 2514
224ce89b
WB
2515 string summary;
2516 string *psummary = f ? nullptr : &summary;
2517 for (auto& svc : paxos_service) {
2518 r = std::min(r, svc->get_health_checks().dump_summary(
2519 f, psummary, sep2, want_detail));
2520 }
7c673cae 2521
224ce89b
WB
2522 if (f) {
2523 f->close_section();
2524 f->dump_stream("status") << r;
2525 } else {
2526 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2527 *plain = stringify(r);
2528 if (summary.size()) {
2529 *plain += sep1;
2530 *plain += summary;
2531 }
2532 *plain += "\n";
2533 }
2534
3efd9988
FG
2535 const std::string old_fields_message = "'ceph health' JSON format has "
2536 "changed in luminous. If you see this your monitoring system is "
2537 "scraping the wrong fields. Disable this with 'mon health preluminous "
2538 "compat warning = false'";
2539
d2e6a577
FG
2540 if (f && (compat || compat_warn)) {
2541 health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
3efd9988
FG
2542 f->open_array_section("summary");
2543 if (compat_warn) {
2544 f->open_object_section("item");
2545 f->dump_stream("severity") << HEALTH_WARN;
2546 f->dump_string("summary", old_fields_message);
2547 f->close_section();
2548 }
d2e6a577 2549 if (compat) {
d2e6a577 2550 for (auto& svc : paxos_service) {
3efd9988 2551 svc->get_health_checks().dump_summary_compat(f);
d2e6a577 2552 }
224ce89b 2553 }
3efd9988 2554 f->close_section();
d2e6a577 2555 f->dump_stream("overall_status") << cr;
224ce89b
WB
2556 }
2557
2558 if (want_detail) {
d2e6a577 2559 if (f && (compat || compat_warn)) {
224ce89b 2560 f->open_array_section("detail");
d2e6a577 2561 if (compat_warn) {
3efd9988 2562 f->dump_string("item", old_fields_message);
d2e6a577 2563 }
224ce89b
WB
2564 }
2565
2566 for (auto& svc : paxos_service) {
2567 svc->get_health_checks().dump_detail(f, plain, compat);
2568 }
2569
d2e6a577 2570 if (f && (compat || compat_warn)) {
224ce89b
WB
2571 f->close_section();
2572 }
2573 }
2574 if (f) {
2575 f->close_section();
2576 }
2577 return r;
2578}
2579
2580void Monitor::log_health(
2581 const health_check_map_t& updated,
2582 const health_check_map_t& previous,
2583 MonitorDBStore::TransactionRef t)
2584{
2585 if (!g_conf->mon_health_to_clog) {
7c673cae
FG
2586 return;
2587 }
181888fb
FG
2588
2589 const utime_t now = ceph_clock_now();
2590
224ce89b
WB
2591 // FIXME: log atomically as part of @t instead of using clog.
2592 dout(10) << __func__ << " updated " << updated.checks.size()
2593 << " previous " << previous.checks.size()
2594 << dendl;
181888fb
FG
2595 const auto min_log_period = g_conf->get_val<int64_t>(
2596 "mon_health_log_update_period");
224ce89b
WB
2597 for (auto& p : updated.checks) {
2598 auto q = previous.checks.find(p.first);
181888fb 2599 bool logged = false;
224ce89b
WB
2600 if (q == previous.checks.end()) {
2601 // new
2602 ostringstream ss;
2603 ss << "Health check failed: " << p.second.summary << " ("
2604 << p.first << ")";
181888fb
FG
2605 clog->health(p.second.severity) << ss.str();
2606
2607 logged = true;
224ce89b
WB
2608 } else {
2609 if (p.second.summary != q->second.summary ||
2610 p.second.severity != q->second.severity) {
181888fb
FG
2611
2612 auto status_iter = health_check_log_times.find(p.first);
2613 if (status_iter != health_check_log_times.end()) {
2614 if (p.second.severity == q->second.severity &&
2615 now - status_iter->second.updated_at < min_log_period) {
2616 // We already logged this recently and the severity is unchanged,
2617 // so skip emitting an update of the summary string.
2618 // We'll get an update out of tick() later if the check
2619 // is still failing.
2620 continue;
2621 }
2622 }
2623
2624 // summary or severity changed (ignore detail changes at this level)
2625 ostringstream ss;
224ce89b 2626 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
181888fb
FG
2627 clog->health(p.second.severity) << ss.str();
2628
2629 logged = true;
2630 }
2631 }
2632 // Record the time at which we last logged, so that we can check this
2633 // when considering whether/when to print update messages.
2634 if (logged) {
2635 auto iter = health_check_log_times.find(p.first);
2636 if (iter == health_check_log_times.end()) {
2637 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2638 p.second.severity, p.second.summary, now));
2639 } else {
2640 iter->second = HealthCheckLogStatus(
2641 p.second.severity, p.second.summary, now);
224ce89b
WB
2642 }
2643 }
2644 }
2645 for (auto& p : previous.checks) {
2646 if (!updated.checks.count(p.first)) {
2647 // cleared
2648 ostringstream ss;
2649 if (p.first == "DEGRADED_OBJECTS") {
2650 clog->info() << "All degraded objects recovered";
2651 } else if (p.first == "OSD_FLAGS") {
2652 clog->info() << "OSD flags cleared";
2653 } else {
2654 clog->info() << "Health check cleared: " << p.first << " (was: "
2655 << p.second.summary << ")";
2656 }
181888fb
FG
2657
2658 if (health_check_log_times.count(p.first)) {
2659 health_check_log_times.erase(p.first);
2660 }
224ce89b
WB
2661 }
2662 }
7c673cae 2663
224ce89b
WB
2664 if (previous.checks.size() && updated.checks.size() == 0) {
2665 // We might be going into a fully healthy state, check
2666 // other subsystems
2667 bool any_checks = false;
2668 for (auto& svc : paxos_service) {
2669 if (&(svc->get_health_checks()) == &(previous)) {
2670 // Ignore the ones we're clearing right now
2671 continue;
2672 }
7c673cae 2673
224ce89b
WB
2674 if (svc->get_health_checks().checks.size() > 0) {
2675 any_checks = true;
2676 break;
2677 }
2678 }
2679 if (!any_checks) {
2680 clog->info() << "Cluster is now healthy";
2681 }
2682 }
7c673cae
FG
2683}
2684
2685health_status_t Monitor::get_health(list<string>& status,
2686 bufferlist *detailbl,
2687 Formatter *f)
2688{
2689 list<pair<health_status_t,string> > summary;
2690 list<pair<health_status_t,string> > detail;
2691
2692 if (f)
2693 f->open_object_section("health");
2694
2695 for (vector<PaxosService*>::iterator p = paxos_service.begin();
2696 p != paxos_service.end();
2697 ++p) {
2698 PaxosService *s = *p;
2699 s->get_health(summary, detailbl ? &detail : NULL, cct);
2700 }
2701
224ce89b 2702 health_monitor->get_health(summary, (detailbl ? &detail : NULL));
7c673cae
FG
2703
2704 health_status_t overall = HEALTH_OK;
2705 if (!timecheck_skews.empty()) {
2706 list<string> warns;
7c673cae
FG
2707 for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2708 i != timecheck_skews.end(); ++i) {
2709 entity_inst_t inst = i->first;
2710 double skew = i->second;
2711 double latency = timecheck_latencies[inst];
2712 string name = monmap->get_name(inst.addr);
7c673cae
FG
2713 ostringstream tcss;
2714 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2715 if (tcstatus != HEALTH_OK) {
2716 if (overall > tcstatus)
2717 overall = tcstatus;
2718 warns.push_back(name);
7c673cae
FG
2719 ostringstream tmp_ss;
2720 tmp_ss << "mon." << name
2721 << " addr " << inst.addr << " " << tcss.str()
2722 << " (latency " << latency << "s)";
2723 detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2724 }
7c673cae
FG
2725 }
2726 if (!warns.empty()) {
2727 ostringstream ss;
2728 ss << "clock skew detected on";
2729 while (!warns.empty()) {
2730 ss << " mon." << warns.front();
2731 warns.pop_front();
2732 if (!warns.empty())
2733 ss << ",";
2734 }
2735 status.push_back(ss.str());
2736 summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2737 }
7c673cae 2738 }
7c673cae
FG
2739
2740 if (f)
2741 f->open_array_section("summary");
2742 if (!summary.empty()) {
2743 while (!summary.empty()) {
2744 if (overall > summary.front().first)
2745 overall = summary.front().first;
2746 status.push_back(summary.front().second);
2747 if (f) {
2748 f->open_object_section("item");
2749 f->dump_stream("severity") << summary.front().first;
2750 f->dump_string("summary", summary.front().second);
2751 f->close_section();
2752 }
2753 summary.pop_front();
2754 }
2755 }
2756 if (f)
2757 f->close_section();
2758
2759 stringstream fss;
2760 fss << overall;
2761 status.push_front(fss.str());
2762 if (f)
2763 f->dump_stream("overall_status") << overall;
2764
2765 if (f)
2766 f->open_array_section("detail");
2767 while (!detail.empty()) {
2768 if (f)
2769 f->dump_string("item", detail.front().second);
2770 else if (detailbl != NULL) {
2771 detailbl->append(detail.front().second);
2772 detailbl->append('\n');
2773 }
2774 detail.pop_front();
2775 }
2776 if (f)
2777 f->close_section();
2778
2779 if (f)
2780 f->close_section();
2781
2782 return overall;
2783}
2784
2785void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2786{
2787 if (f)
2788 f->open_object_section("status");
2789
7c673cae
FG
2790 if (f) {
2791 f->dump_stream("fsid") << monmap->get_fsid();
b5b8bbf5
FG
2792 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2793 get_health_status(false, f, nullptr);
2794 } else {
2795 list<string> health_str;
2796 get_health(health_str, nullptr, f);
2797 }
7c673cae
FG
2798 f->dump_unsigned("election_epoch", get_epoch());
2799 {
2800 f->open_array_section("quorum");
2801 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2802 f->dump_int("rank", *p);
2803 f->close_section();
2804 f->open_array_section("quorum_names");
2805 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2806 f->dump_string("id", monmap->get_name(*p));
2807 f->close_section();
2808 }
2809 f->open_object_section("monmap");
2810 monmap->dump(f);
2811 f->close_section();
2812 f->open_object_section("osdmap");
224ce89b 2813 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
7c673cae
FG
2814 f->close_section();
2815 f->open_object_section("pgmap");
31f18b77 2816 pgservice->print_summary(f, NULL);
7c673cae
FG
2817 f->close_section();
2818 f->open_object_section("fsmap");
2819 mdsmon()->get_fsmap().print_summary(f, NULL);
2820 f->close_section();
7c673cae
FG
2821 f->open_object_section("mgrmap");
2822 mgrmon()->get_map().print_summary(f, nullptr);
2823 f->close_section();
224ce89b
WB
2824
2825 f->dump_object("servicemap", mgrstatmon()->get_service_map());
7c673cae
FG
2826 f->close_section();
2827 } else {
31f18b77
FG
2828 ss << " cluster:\n";
2829 ss << " id: " << monmap->get_fsid() << "\n";
224ce89b
WB
2830
2831 string health;
2832 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2833 get_health_status(false, nullptr, &health,
2834 "\n ", "\n ");
2835 } else {
2836 list<string> ls;
2837 get_health(ls, NULL, f);
2838 health = joinify(ls.begin(), ls.end(),
2839 string("\n "));
2840 }
2841 ss << " health: " << health << "\n";
2842
31f18b77 2843 ss << "\n \n services:\n";
224ce89b
WB
2844 {
2845 size_t maxlen = 3;
2846 auto& service_map = mgrstatmon()->get_service_map();
2847 for (auto& p : service_map.services) {
2848 maxlen = std::max(maxlen, p.first.size());
2849 }
2850 string spacing(maxlen - 3, ' ');
2851 const auto quorum_names = get_quorum_names();
2852 const auto mon_count = monmap->mon_info.size();
2853 ss << " mon: " << spacing << mon_count << " daemons, quorum "
2854 << quorum_names;
2855 if (quorum_names.size() != mon_count) {
2856 std::list<std::string> out_of_q;
2857 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2858 if (quorum.count(i) == 0) {
2859 out_of_q.push_back(monmap->ranks[i]);
2860 }
2861 }
2862 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2863 out_of_q.end(), std::string(", "));
31f18b77 2864 }
7c673cae 2865 ss << "\n";
224ce89b
WB
2866 if (mgrmon()->in_use()) {
2867 ss << " mgr: " << spacing;
2868 mgrmon()->get_map().print_summary(nullptr, &ss);
2869 ss << "\n";
2870 }
2871 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2872 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2873 }
2874 ss << " osd: " << spacing;
2875 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2876 ss << "\n";
2877 for (auto& p : service_map.services) {
2878 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2879 << p.second.get_summary() << "\n";
2880 }
7c673cae 2881 }
31f18b77
FG
2882
2883 ss << "\n \n data:\n";
2884 pgservice->print_summary(NULL, &ss);
2885 ss << "\n ";
7c673cae
FG
2886 }
2887}
2888
2889void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2890 map<string,string> &param_str_map)
2891{
2892 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2893 p != cmdmap.end(); ++p) {
2894 if (p->first == "prefix")
2895 continue;
2896 if (p->first == "caps") {
2897 vector<string> cv;
2898 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2899 cv.size() % 2 == 0) {
2900 for (unsigned i = 0; i < cv.size(); i += 2) {
2901 string k = string("caps_") + cv[i];
2902 param_str_map[k] = cv[i + 1];
2903 }
2904 continue;
2905 }
2906 }
2907 param_str_map[p->first] = cmd_vartype_stringify(p->second);
2908 }
2909}
2910
c07f9fc5
FG
2911const MonCommand *Monitor::_get_moncommand(
2912 const string &cmd_prefix,
d2e6a577
FG
2913 const vector<MonCommand>& cmds)
2914{
2915 for (auto& c : cmds) {
2916 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2917 return &c;
7c673cae
FG
2918 }
2919 }
d2e6a577 2920 return nullptr;
7c673cae
FG
2921}
2922
2923bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2924 const map<string,cmd_vartype>& cmdmap,
2925 const map<string,string>& param_str_map,
2926 const MonCommand *this_cmd) {
2927
2928 bool cmd_r = this_cmd->requires_perm('r');
2929 bool cmd_w = this_cmd->requires_perm('w');
2930 bool cmd_x = this_cmd->requires_perm('x');
2931
2932 bool capable = s->caps.is_capable(
2933 g_ceph_context,
2934 CEPH_ENTITY_TYPE_MON,
2935 s->entity_name,
2936 module, prefix, param_str_map,
2937 cmd_r, cmd_w, cmd_x);
2938
2939 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2940 return capable;
2941}
2942
c07f9fc5 2943void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
7c673cae
FG
2944 Formatter *f,
2945 bufferlist *rdata,
2946 bool hide_mgr_flag)
2947{
2948 int cmdnum = 0;
2949 f->open_object_section("command_descriptions");
c07f9fc5
FG
2950 for (const auto &cmd : commands) {
2951 unsigned flags = cmd.flags;
7c673cae
FG
2952 if (hide_mgr_flag) {
2953 flags &= ~MonCommand::FLAG_MGR;
2954 }
2955 ostringstream secname;
2956 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2957 dump_cmddesc_to_json(f, secname.str(),
c07f9fc5
FG
2958 cmd.cmdstring, cmd.helpstring, cmd.module,
2959 cmd.req_perms, cmd.availability, flags);
7c673cae
FG
2960 cmdnum++;
2961 }
2962 f->close_section(); // command_descriptions
2963
2964 f->flush(*rdata);
2965}
2966
7c673cae
FG
2967bool Monitor::is_keyring_required()
2968{
2969 string auth_cluster_required = g_conf->auth_supported.empty() ?
2970 g_conf->auth_cluster_required : g_conf->auth_supported;
2971 string auth_service_required = g_conf->auth_supported.empty() ?
2972 g_conf->auth_service_required : g_conf->auth_supported;
2973
2974 return auth_service_required == "cephx" ||
2975 auth_cluster_required == "cephx";
2976}
2977
2978struct C_MgrProxyCommand : public Context {
2979 Monitor *mon;
2980 MonOpRequestRef op;
2981 uint64_t size;
2982 bufferlist outbl;
2983 string outs;
2984 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2985 : mon(mon), op(op), size(s) { }
2986 void finish(int r) {
2987 Mutex::Locker l(mon->lock);
2988 mon->mgr_proxy_bytes -= size;
2989 mon->reply_command(op, r, outs, outbl, 0);
2990 }
2991};
2992
2993void Monitor::handle_command(MonOpRequestRef op)
2994{
2995 assert(op->is_type_command());
2996 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
2997 if (m->fsid != monmap->fsid) {
2998 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2999 reply_command(op, -EPERM, "wrong fsid", 0);
3000 return;
3001 }
3002
3003 MonSession *session = static_cast<MonSession *>(
3004 m->get_connection()->get_priv());
3005 if (!session) {
3006 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3007 return;
3008 }
3009 BOOST_SCOPE_EXIT_ALL(=) {
3010 session->put();
3011 };
3012
3013 if (m->cmd.empty()) {
3014 string rs = "No command supplied";
3015 reply_command(op, -EINVAL, rs, 0);
3016 return;
3017 }
3018
3019 string prefix;
3020 vector<string> fullcmd;
3021 map<string, cmd_vartype> cmdmap;
3022 stringstream ss, ds;
3023 bufferlist rdata;
3024 string rs;
3025 int r = -EINVAL;
3026 rs = "unrecognized command";
3027
3028 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3029 // ss has reason for failure
3030 r = -EINVAL;
3031 rs = ss.str();
3032 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3033 reply_command(op, r, rs, 0);
3034 return;
3035 }
3036
3037 // check return value. If no prefix parameter provided,
3038 // return value will be false, then return error info.
3039 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
3040 reply_command(op, -EINVAL, "command prefix not found", 0);
3041 return;
3042 }
3043
3044 // check prefix is empty
3045 if (prefix.empty()) {
3046 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3047 return;
3048 }
3049
3050 if (prefix == "get_command_descriptions") {
3051 bufferlist rdata;
3052 Formatter *f = Formatter::create("json");
3053 // hide mgr commands until luminous upgrade is complete
3054 bool hide_mgr_flag =
31f18b77 3055 osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
c07f9fc5
FG
3056
3057 std::vector<MonCommand> commands;
d2e6a577
FG
3058
3059 // only include mgr commands once all mons are upgrade (and we've dropped
3060 // the hard-coded PGMonitor commands)
3061 if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
3062 commands = static_cast<MgrMonitor*>(
c07f9fc5 3063 paxos_service[PAXOS_MGR])->get_command_descs();
d2e6a577 3064 }
c07f9fc5 3065
d2e6a577
FG
3066 for (auto& c : leader_mon_commands) {
3067 commands.push_back(c);
c07f9fc5
FG
3068 }
3069
3070 format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
7c673cae
FG
3071 delete f;
3072 reply_command(op, 0, "", rdata, 0);
3073 return;
3074 }
3075
3076 string module;
3077 string err;
3078
3079 dout(0) << "handle_command " << *m << dendl;
3080
3081 string format;
3082 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
3083 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3084
3085 get_str_vec(prefix, fullcmd);
3086
3087 // make sure fullcmd is not empty.
3088 // invalid prefix will cause empty vector fullcmd.
3089 // such as, prefix=";,,;"
3090 if (fullcmd.empty()) {
3091 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3092 return;
3093 }
3094
3095 module = fullcmd[0];
3096
3097 // validate command is in leader map
3098
3099 const MonCommand *leader_cmd;
c07f9fc5
FG
3100 const auto& mgr_cmds = mgrmon()->get_command_descs();
3101 const MonCommand *mgr_cmd = nullptr;
3102 if (!mgr_cmds.empty()) {
d2e6a577 3103 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
c07f9fc5 3104 }
d2e6a577 3105 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
7c673cae 3106 if (!leader_cmd) {
c07f9fc5
FG
3107 leader_cmd = mgr_cmd;
3108 if (!leader_cmd) {
3109 reply_command(op, -EINVAL, "command not known", 0);
3110 return;
3111 }
7c673cae
FG
3112 }
3113 // validate command is in our map & matches, or forward if it is allowed
d2e6a577
FG
3114 const MonCommand *mon_cmd = _get_moncommand(
3115 prefix,
3116 get_local_commands(quorum_mon_features));
c07f9fc5
FG
3117 if (!mon_cmd) {
3118 mon_cmd = mgr_cmd;
3119 }
7c673cae
FG
3120 if (!is_leader()) {
3121 if (!mon_cmd) {
3122 if (leader_cmd->is_noforward()) {
3123 reply_command(op, -EINVAL,
3124 "command not locally supported and not allowed to forward",
3125 0);
3126 return;
3127 }
3128 dout(10) << "Command not locally supported, forwarding request "
3129 << m << dendl;
3130 forward_request_leader(op);
3131 return;
3132 } else if (!mon_cmd->is_compat(leader_cmd)) {
3133 if (mon_cmd->is_noforward()) {
3134 reply_command(op, -EINVAL,
3135 "command not compatible with leader and not allowed to forward",
3136 0);
3137 return;
3138 }
3139 dout(10) << "Command not compatible with leader, forwarding request "
3140 << m << dendl;
3141 forward_request_leader(op);
3142 return;
3143 }
3144 }
3145
3146 if (mon_cmd->is_obsolete() ||
3147 (cct->_conf->mon_debug_deprecated_as_obsolete
3148 && mon_cmd->is_deprecated())) {
3149 reply_command(op, -ENOTSUP,
3150 "command is obsolete; please check usage and/or man page",
3151 0);
3152 return;
3153 }
3154
3155 if (session->proxy_con && mon_cmd->is_noforward()) {
3156 dout(10) << "Got forward for noforward command " << m << dendl;
3157 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3158 return;
3159 }
3160
3161 /* what we perceive as being the service the command falls under */
3162 string service(mon_cmd->module);
3163
3164 dout(25) << __func__ << " prefix='" << prefix
3165 << "' module='" << module
3166 << "' service='" << service << "'" << dendl;
3167
3168 bool cmd_is_rw =
3169 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3170
3171 // validate user's permissions for requested command
3172 map<string,string> param_str_map;
3173 _generate_command_map(cmdmap, param_str_map);
3174 if (!_allowed_command(session, service, prefix, cmdmap,
3175 param_str_map, mon_cmd)) {
3176 dout(1) << __func__ << " access denied" << dendl;
3177 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3178 << "from='" << session->inst << "' "
3179 << "entity='" << session->entity_name << "' "
3180 << "cmd=" << m->cmd << ": access denied";
3181 reply_command(op, -EACCES, "access denied", 0);
3182 return;
3183 }
3184
3185 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3186 << "from='" << session->inst << "' "
3187 << "entity='" << session->entity_name << "' "
3188 << "cmd=" << m->cmd << ": dispatch";
3189
3190 if (mon_cmd->is_mgr() &&
31f18b77 3191 osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
7c673cae
FG
3192 const auto& hdr = m->get_header();
3193 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3efd9988
FG
3194 uint64_t max = g_conf->get_val<uint64_t>("mon_client_bytes")
3195 * g_conf->get_val<double>("mon_mgr_proxy_client_bytes_ratio");
7c673cae
FG
3196 if (mgr_proxy_bytes + size > max) {
3197 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3198 << " + " << size << " > max " << max << dendl;
3199 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3200 return;
3201 }
3202 mgr_proxy_bytes += size;
3203 dout(10) << __func__ << " proxying mgr command (+" << size
3204 << " -> " << mgr_proxy_bytes << ")" << dendl;
3205 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3206 mgr_client.start_command(m->cmd,
3207 m->get_data(),
3208 &fin->outbl,
3209 &fin->outs,
3210 new C_OnFinisher(fin, &finisher));
3211 return;
3212 }
3213
d2e6a577
FG
3214 if ((module == "mds" || module == "fs") &&
3215 prefix != "fs authorize") {
7c673cae
FG
3216 mdsmon()->dispatch(op);
3217 return;
3218 }
31f18b77
FG
3219 if ((module == "osd" || prefix == "pg map") &&
3220 prefix != "osd last-stat-seq") {
7c673cae
FG
3221 osdmon()->dispatch(op);
3222 return;
3223 }
3224
3225 if (module == "pg") {
3226 pgmon()->dispatch(op);
3227 return;
3228 }
3229 if (module == "mon" &&
3230 /* Let the Monitor class handle the following commands:
3231 * 'mon compact'
3232 * 'mon scrub'
3233 * 'mon sync force'
3234 */
3235 prefix != "mon compact" &&
3236 prefix != "mon scrub" &&
3237 prefix != "mon sync force" &&
31f18b77
FG
3238 prefix != "mon metadata" &&
3239 prefix != "mon versions" &&
3240 prefix != "mon count-metadata") {
7c673cae
FG
3241 monmon()->dispatch(op);
3242 return;
3243 }
d2e6a577 3244 if (module == "auth" || prefix == "fs authorize") {
7c673cae
FG
3245 authmon()->dispatch(op);
3246 return;
3247 }
3248 if (module == "log") {
3249 logmon()->dispatch(op);
3250 return;
3251 }
3252
3253 if (module == "config-key") {
3254 config_key_service->dispatch(op);
3255 return;
3256 }
3257
3258 if (module == "mgr") {
3259 mgrmon()->dispatch(op);
3260 return;
3261 }
3262
3263 if (prefix == "fsid") {
3264 if (f) {
3265 f->open_object_section("fsid");
3266 f->dump_stream("fsid") << monmap->fsid;
3267 f->close_section();
3268 f->flush(rdata);
3269 } else {
3270 ds << monmap->fsid;
3271 rdata.append(ds);
3272 }
3273 reply_command(op, 0, "", rdata, 0);
3274 return;
3275 }
3276
3277 if (prefix == "scrub" || prefix == "mon scrub") {
3278 wait_for_paxos_write();
3279 if (is_leader()) {
3280 int r = scrub_start();
3281 reply_command(op, r, "", rdata, 0);
3282 } else if (is_peon()) {
3283 forward_request_leader(op);
3284 } else {
3285 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3286 }
3287 return;
3288 }
3289
3290 if (prefix == "compact" || prefix == "mon compact") {
3291 dout(1) << "triggering manual compaction" << dendl;
3292 utime_t start = ceph_clock_now();
3293 store->compact();
3294 utime_t end = ceph_clock_now();
3295 end -= start;
3296 dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3297 ostringstream oss;
224ce89b 3298 oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
7c673cae
FG
3299 rs = oss.str();
3300 r = 0;
3301 }
3302 else if (prefix == "injectargs") {
3303 vector<string> injected_args;
3304 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3305 if (!injected_args.empty()) {
3306 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3307 ostringstream oss;
3308 r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3309 ss << "injectargs:" << oss.str();
3310 rs = ss.str();
3311 goto out;
3312 } else {
3313 rs = "must supply options to be parsed in a single string";
3314 r = -EINVAL;
3315 }
224ce89b
WB
3316 } else if (prefix == "time-sync-status") {
3317 if (!f)
3318 f.reset(Formatter::create("json-pretty"));
3319 f->open_object_section("time_sync");
3320 if (!timecheck_skews.empty()) {
3321 f->open_object_section("time_skew_status");
3322 for (auto& i : timecheck_skews) {
3323 entity_inst_t inst = i.first;
3324 double skew = i.second;
3325 double latency = timecheck_latencies[inst];
3326 string name = monmap->get_name(inst.addr);
3327 ostringstream tcss;
3328 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3329 f->open_object_section(name.c_str());
3330 f->dump_float("skew", skew);
3331 f->dump_float("latency", latency);
3332 f->dump_stream("health") << tcstatus;
3333 if (tcstatus != HEALTH_OK) {
3334 f->dump_stream("details") << tcss.str();
3335 }
3336 f->close_section();
3337 }
3338 f->close_section();
3339 }
3340 f->open_object_section("timechecks");
3341 f->dump_unsigned("epoch", get_epoch());
3342 f->dump_int("round", timecheck_round);
3343 f->dump_stream("round_status") << ((timecheck_round%2) ?
3344 "on-going" : "finished");
3345 f->close_section();
3346 f->close_section();
3347 f->flush(rdata);
3348 r = 0;
3349 rs = "";
c07f9fc5
FG
3350 } else if (prefix == "config set") {
3351 std::string key;
3352 cmd_getval(cct, cmdmap, "key", key);
3353 std::string val;
3354 cmd_getval(cct, cmdmap, "value", val);
3355 r = g_conf->set_val(key, val, true, &ss);
d2e6a577
FG
3356 if (r == 0) {
3357 g_conf->apply_changes(nullptr);
3358 }
c07f9fc5
FG
3359 rs = ss.str();
3360 goto out;
7c673cae
FG
3361 } else if (prefix == "status" ||
3362 prefix == "health" ||
3363 prefix == "df") {
3364 string detail;
3365 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3366
3367 if (prefix == "status") {
3368 // get_cluster_status handles f == NULL
3369 get_cluster_status(ds, f.get());
3370
3371 if (f) {
3372 f->flush(ds);
3373 ds << '\n';
3374 }
3375 rdata.append(ds);
3376 } else if (prefix == "health") {
224ce89b
WB
3377 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3378 string plain;
3379 get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3380 if (f) {
3381 f->flush(rdata);
3382 } else {
3383 rdata.append(plain);
3384 }
7c673cae 3385 } else {
224ce89b
WB
3386 list<string> health_str;
3387 get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
3388 if (f) {
3389 f->flush(ds);
3390 ds << '\n';
3391 } else {
3392 assert(!health_str.empty());
3393 ds << health_str.front();
3394 health_str.pop_front();
3395 if (!health_str.empty()) {
3396 ds << ' ';
3397 ds << joinify(health_str.begin(), health_str.end(), string("; "));
3398 }
7c673cae 3399 }
224ce89b
WB
3400 bufferlist comb;
3401 comb.append(ds);
3402 if (detail == "detail")
3403 comb.append(rdata);
3404 rdata = comb;
7c673cae 3405 }
7c673cae
FG
3406 } else if (prefix == "df") {
3407 bool verbose = (detail == "detail");
3408 if (f)
3409 f->open_object_section("stats");
3410
31f18b77 3411 pgservice->dump_fs_stats(&ds, f.get(), verbose);
7c673cae
FG
3412 if (!f)
3413 ds << '\n';
31f18b77 3414 pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
7c673cae
FG
3415
3416 if (f) {
3417 f->close_section();
3418 f->flush(ds);
3419 ds << '\n';
3420 }
3421 } else {
3422 assert(0 == "We should never get here!");
3423 return;
3424 }
3425 rdata.append(ds);
3426 rs = "";
3427 r = 0;
3428 } else if (prefix == "report") {
3429
3430 // this must be formatted, in its current form
3431 if (!f)
3432 f.reset(Formatter::create("json-pretty"));
3433 f->open_object_section("report");
3434 f->dump_stream("cluster_fingerprint") << fingerprint;
3435 f->dump_string("version", ceph_version_to_str());
3436 f->dump_string("commit", git_version_to_str());
3437 f->dump_stream("timestamp") << ceph_clock_now();
3438
3439 vector<string> tagsvec;
3440 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3441 string tagstr = str_join(tagsvec, " ");
3442 if (!tagstr.empty())
3443 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3444 f->dump_string("tag", tagstr);
3445
181888fb
FG
3446 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3447 get_health_status(true, f.get(), nullptr);
3448 } else {
3449 list<string> health_str;
3450 get_health(health_str, nullptr, f.get());
3451 }
7c673cae
FG
3452
3453 monmon()->dump_info(f.get());
3454 osdmon()->dump_info(f.get());
3455 mdsmon()->dump_info(f.get());
7c673cae 3456 authmon()->dump_info(f.get());
31f18b77 3457 pgservice->dump_info(f.get());
7c673cae
FG
3458
3459 paxos->dump_info(f.get());
3460
3461 f->close_section();
3462 f->flush(rdata);
3463
3464 ostringstream ss2;
3465 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
3466 rs = ss2.str();
3467 r = 0;
31f18b77
FG
3468 } else if (prefix == "osd last-stat-seq") {
3469 int64_t osd;
3470 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3471 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3472 if (f) {
3473 f->dump_unsigned("seq", seq);
3474 f->flush(ds);
3475 } else {
3476 ds << seq;
3477 rdata.append(ds);
3478 }
3479 rs = "";
3480 r = 0;
7c673cae
FG
3481 } else if (prefix == "node ls") {
3482 string node_type("all");
3483 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3484 if (!f)
3485 f.reset(Formatter::create("json-pretty"));
3486 if (node_type == "all") {
3487 f->open_object_section("nodes");
3488 print_nodes(f.get(), ds);
3489 osdmon()->print_nodes(f.get());
3490 mdsmon()->print_nodes(f.get());
3491 f->close_section();
3492 } else if (node_type == "mon") {
3493 print_nodes(f.get(), ds);
3494 } else if (node_type == "osd") {
3495 osdmon()->print_nodes(f.get());
3496 } else if (node_type == "mds") {
3497 mdsmon()->print_nodes(f.get());
3498 }
3499 f->flush(ds);
3500 rdata.append(ds);
3501 rs = "";
3502 r = 0;
31f18b77
FG
3503 } else if (prefix == "features") {
3504 if (!is_leader() && !is_peon()) {
3505 dout(10) << " waiting for quorum" << dendl;
3506 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3507 return;
3508 }
3509 if (!is_leader()) {
3510 forward_request_leader(op);
3511 return;
3512 }
3513 if (!f)
3514 f.reset(Formatter::create("json-pretty"));
3515 FeatureMap fm;
3516 get_combined_feature_map(&fm);
3517 f->dump_object("features", fm);
3518 f->flush(rdata);
3519 rs = "";
3520 r = 0;
7c673cae
FG
3521 } else if (prefix == "mon metadata") {
3522 if (!f)
3523 f.reset(Formatter::create("json-pretty"));
3524
3525 string name;
3526 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3527 if (!all) {
3528 // Dump a single mon's metadata
3529 int mon = monmap->get_rank(name);
3530 if (mon < 0) {
3531 rs = "requested mon not found";
3532 r = -ENOENT;
3533 goto out;
3534 }
3535 f->open_object_section("mon_metadata");
3536 r = get_mon_metadata(mon, f.get(), ds);
3537 f->close_section();
3538 } else {
3539 // Dump all mons' metadata
3540 r = 0;
3541 f->open_array_section("mon_metadata");
3542 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3543 std::ostringstream get_err;
3544 f->open_object_section("mon");
3545 f->dump_string("name", monmap->get_name(rank));
3546 r = get_mon_metadata(rank, f.get(), get_err);
3547 f->close_section();
3548 if (r == -ENOENT || r == -EINVAL) {
3549 dout(1) << get_err.str() << dendl;
3550 // Drop error, list what metadata we do have
3551 r = 0;
3552 } else if (r != 0) {
3553 derr << "Unexpected error from get_mon_metadata: "
3554 << cpp_strerror(r) << dendl;
3555 ds << get_err.str();
3556 break;
3557 }
3558 }
3559 f->close_section();
3560 }
3561
3562 f->flush(ds);
3563 rdata.append(ds);
3564 rs = "";
31f18b77
FG
3565 } else if (prefix == "mon versions") {
3566 if (!f)
3567 f.reset(Formatter::create("json-pretty"));
3568 count_metadata("ceph_version", f.get());
3569 f->flush(ds);
3570 rdata.append(ds);
3571 rs = "";
3572 r = 0;
3573 } else if (prefix == "mon count-metadata") {
3574 if (!f)
3575 f.reset(Formatter::create("json-pretty"));
3576 string field;
3577 cmd_getval(g_ceph_context, cmdmap, "property", field);
3578 count_metadata(field, f.get());
3579 f->flush(ds);
3580 rdata.append(ds);
3581 rs = "";
3582 r = 0;
7c673cae
FG
3583 } else if (prefix == "quorum_status") {
3584 // make sure our map is readable and up to date
3585 if (!is_leader() && !is_peon()) {
3586 dout(10) << " waiting for quorum" << dendl;
3587 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3588 return;
3589 }
3590 _quorum_status(f.get(), ds);
3591 rdata.append(ds);
3592 rs = "";
3593 r = 0;
3594 } else if (prefix == "mon_status") {
3595 get_mon_status(f.get(), ds);
3596 if (f)
3597 f->flush(ds);
3598 rdata.append(ds);
3599 rs = "";
3600 r = 0;
3601 } else if (prefix == "sync force" ||
3602 prefix == "mon sync force") {
3603 string validate1, validate2;
3604 cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3605 cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3606 if (validate1 != "--yes-i-really-mean-it" ||
3607 validate2 != "--i-know-what-i-am-doing") {
3608 r = -EINVAL;
3609 rs = "are you SURE? this will mean the monitor store will be "
3610 "erased. pass '--yes-i-really-mean-it "
3611 "--i-know-what-i-am-doing' if you really do.";
3612 goto out;
3613 }
3614 sync_force(f.get(), ds);
3615 rs = ds.str();
3616 r = 0;
3617 } else if (prefix == "heap") {
3618 if (!ceph_using_tcmalloc())
3619 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3620 else {
3621 string heapcmd;
3622 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3623 // XXX 1-element vector, change at callee or make vector here?
3624 vector<string> heapcmd_vec;
3625 get_str_vec(heapcmd, heapcmd_vec);
3626 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3627 rdata.append(ds);
3628 rs = "";
3629 r = 0;
3630 }
3631 } else if (prefix == "quorum") {
3632 string quorumcmd;
3633 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3634 if (quorumcmd == "exit") {
3635 start_election();
3636 elector.stop_participating();
3637 rs = "stopped responding to quorum, initiated new election";
3638 r = 0;
3639 } else if (quorumcmd == "enter") {
3640 elector.start_participating();
3641 start_election();
3642 rs = "started responding to quorum, initiated new election";
3643 r = 0;
3644 } else {
3645 rs = "needs a valid 'quorum' command";
3646 r = -EINVAL;
3647 }
3648 } else if (prefix == "version") {
3649 if (f) {
3650 f->open_object_section("version");
3651 f->dump_string("version", pretty_version_to_str());
3652 f->close_section();
3653 f->flush(ds);
3654 } else {
3655 ds << pretty_version_to_str();
3656 }
3657 rdata.append(ds);
3658 rs = "";
3659 r = 0;
c07f9fc5
FG
3660 } else if (prefix == "versions") {
3661 if (!f)
3662 f.reset(Formatter::create("json-pretty"));
3663 map<string,int> overall;
3664 f->open_object_section("version");
3665 map<string,int> mon, mgr, osd, mds;
3666
3667 count_metadata("ceph_version", &mon);
3668 f->open_object_section("mon");
3669 for (auto& p : mon) {
3670 f->dump_int(p.first.c_str(), p.second);
3671 overall[p.first] += p.second;
3672 }
3673 f->close_section();
3674
3675 mgrmon()->count_metadata("ceph_version", &mgr);
3676 f->open_object_section("mgr");
3677 for (auto& p : mgr) {
3678 f->dump_int(p.first.c_str(), p.second);
3679 overall[p.first] += p.second;
3680 }
3681 f->close_section();
3682
3683 osdmon()->count_metadata("ceph_version", &osd);
3684 f->open_object_section("osd");
3685 for (auto& p : osd) {
3686 f->dump_int(p.first.c_str(), p.second);
3687 overall[p.first] += p.second;
3688 }
3689 f->close_section();
3690
3691 mdsmon()->count_metadata("ceph_version", &mds);
3692 f->open_object_section("mds");
35e4c445 3693 for (auto& p : mds) {
c07f9fc5
FG
3694 f->dump_int(p.first.c_str(), p.second);
3695 overall[p.first] += p.second;
3696 }
3697 f->close_section();
3698
3699 for (auto& p : mgrstatmon()->get_service_map().services) {
3700 f->open_object_section(p.first.c_str());
3701 map<string,int> m;
3702 p.second.count_metadata("ceph_version", &m);
3703 for (auto& q : m) {
3704 f->dump_int(q.first.c_str(), q.second);
3705 overall[q.first] += q.second;
3706 }
3707 f->close_section();
3708 }
3709
3710 f->open_object_section("overall");
3711 for (auto& p : overall) {
3712 f->dump_int(p.first.c_str(), p.second);
3713 }
3714 f->close_section();
3715 f->close_section();
3716 f->flush(rdata);
3717 rs = "";
3718 r = 0;
7c673cae
FG
3719 }
3720
3721 out:
3722 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3723 reply_command(op, r, rs, rdata, 0);
3724}
3725
3726void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3727{
3728 bufferlist rdata;
3729 reply_command(op, rc, rs, rdata, version);
3730}
3731
3732void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3733 bufferlist& rdata, version_t version)
3734{
3735 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3736 assert(m->get_type() == MSG_MON_COMMAND);
3737 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3738 reply->set_tid(m->get_tid());
3739 reply->set_data(rdata);
3740 send_reply(op, reply);
3741}
3742
3743
3744// ------------------------
3745// request/reply routing
3746//
3747// a client/mds/osd will connect to a random monitor. we need to forward any
3748// messages requiring state updates to the leader, and then route any replies
3749// back via the correct monitor and back to them. (the monitor will not
3750// initiate any connections.)
3751
3752void Monitor::forward_request_leader(MonOpRequestRef op)
3753{
3754 op->mark_event(__func__);
3755
3756 int mon = get_leader();
3757 MonSession *session = op->get_session();
3758 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3759
3760 if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3761 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3762 } else if (session->proxy_con) {
3763 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3764 } else if (!session->closed) {
3765 RoutedRequest *rr = new RoutedRequest;
3766 rr->tid = ++routed_request_tid;
3767 rr->client_inst = req->get_source_inst();
3768 rr->con = req->get_connection();
3769 rr->con_features = rr->con->get_features();
3770 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3771 rr->session = static_cast<MonSession *>(session->get());
3772 rr->op = op;
3773 routed_requests[rr->tid] = rr;
3774 session->routed_request_tids.insert(rr->tid);
3775
3776 dout(10) << "forward_request " << rr->tid << " request " << *req
3777 << " features " << rr->con_features << dendl;
3778
3779 MForward *forward = new MForward(rr->tid,
3780 req,
3781 rr->con_features,
3782 rr->session->caps);
3783 forward->set_priority(req->get_priority());
3784 if (session->auth_handler) {
3785 forward->entity_name = session->entity_name;
3786 } else if (req->get_source().is_mon()) {
3787 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3788 }
3789 messenger->send_message(forward, monmap->get_inst(mon));
3790 op->mark_forwarded();
3791 assert(op->get_req()->get_type() != 0);
3792 } else {
3793 dout(10) << "forward_request no session for request " << *req << dendl;
3794 }
3795}
3796
3797// fake connection attached to forwarded messages
3798struct AnonConnection : public Connection {
3799 explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
3800
3801 int send_message(Message *m) override {
3802 assert(!"send_message on anonymous connection");
3803 }
3804 void send_keepalive() override {
3805 assert(!"send_keepalive on anonymous connection");
3806 }
3807 void mark_down() override {
3808 // silently ignore
3809 }
3810 void mark_disposable() override {
3811 // silengtly ignore
3812 }
3813 bool is_connected() override { return false; }
3814};
3815
3816//extract the original message and put it into the regular dispatch function
3817void Monitor::handle_forward(MonOpRequestRef op)
3818{
3819 MForward *m = static_cast<MForward*>(op->get_req());
3820 dout(10) << "received forwarded message from " << m->client
3821 << " via " << m->get_source_inst() << dendl;
3822 MonSession *session = op->get_session();
3823 assert(session);
3824
3825 if (!session->is_capable("mon", MON_CAP_X)) {
3826 dout(0) << "forward from entity with insufficient caps! "
3827 << session->caps << dendl;
3828 } else {
3829 // see PaxosService::dispatch(); we rely on this being anon
3830 // (c->msgr == NULL)
3831 PaxosServiceMessage *req = m->claim_message();
3832 assert(req != NULL);
3833
3834 ConnectionRef c(new AnonConnection(cct));
3835 MonSession *s = new MonSession(req->get_source_inst(),
3836 static_cast<Connection*>(c.get()));
3837 c->set_priv(s->get());
3838 c->set_peer_addr(m->client.addr);
3839 c->set_peer_type(m->client.name.type());
3840 c->set_features(m->con_features);
3841
3842 s->caps = m->client_caps;
3843 dout(10) << " caps are " << s->caps << dendl;
3844 s->entity_name = m->entity_name;
3845 dout(10) << " entity name '" << s->entity_name << "' type "
3846 << s->entity_name.get_type() << dendl;
3847 s->proxy_con = m->get_connection();
3848 s->proxy_tid = m->tid;
3849
3850 req->set_connection(c);
3851
3852 // not super accurate, but better than nothing.
3853 req->set_recv_stamp(m->get_recv_stamp());
3854
3855 /*
3856 * note which election epoch this is; we will drop the message if
3857 * there is a future election since our peers will resend routed
3858 * requests in that case.
3859 */
3860 req->rx_election_epoch = get_epoch();
3861
3862 /* Because this is a special fake connection, we need to break
3863 the ref loop between Connection and MonSession differently
3864 than we normally do. Here, the Message refers to the Connection
3865 which refers to the Session, and nobody else refers to the Connection
3866 or the Session. And due to the special nature of this message,
3867 nobody refers to the Connection via the Session. So, clear out that
3868 half of the ref loop.*/
3869 s->con.reset(NULL);
3870
3871 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
3872
3873 _ms_dispatch(req);
3874 s->put();
3875 }
3876}
3877
3878void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3879{
3880 dout(10) << "try_send_message " << *m << " to " << to << dendl;
3881
3882 bufferlist bl;
3883 encode_message(m, quorum_con_features, bl);
3884
3885 messenger->send_message(m, to);
3886
3887 for (int i=0; i<(int)monmap->size(); i++) {
3888 if (i != rank)
3889 messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
3890 }
3891}
3892
3893void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3894{
3895 op->mark_event(__func__);
3896
3897 MonSession *session = op->get_session();
3898 assert(session);
3899 Message *req = op->get_req();
3900 ConnectionRef con = op->get_connection();
3901
3902 reply->set_cct(g_ceph_context);
3903 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3904
3905 if (!con) {
3906 dout(2) << "send_reply no connection, dropping reply " << *reply
3907 << " to " << req << " " << *req << dendl;
3908 reply->put();
3909 op->mark_event("reply: no connection");
3910 return;
3911 }
3912
3913 if (!session->con && !session->proxy_con) {
3914 dout(2) << "send_reply no connection, dropping reply " << *reply
3915 << " to " << req << " " << *req << dendl;
3916 reply->put();
3917 op->mark_event("reply: no connection");
3918 return;
3919 }
3920
3921 if (session->proxy_con) {
3922 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3923 << " via " << session->proxy_con->get_peer_addr()
3924 << " for request " << *req << dendl;
3925 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3926 op->mark_event("reply: send routed request");
3927 } else {
3928 session->con->send_message(reply);
3929 op->mark_event("reply: send");
3930 }
3931}
3932
3933void Monitor::no_reply(MonOpRequestRef op)
3934{
3935 MonSession *session = op->get_session();
3936 Message *req = op->get_req();
3937
3938 if (session->proxy_con) {
3939 dout(10) << "no_reply to " << req->get_source_inst()
3940 << " via " << session->proxy_con->get_peer_addr()
3941 << " for request " << *req << dendl;
3942 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3943 op->mark_event("no_reply: send routed request");
3944 } else {
3945 dout(10) << "no_reply to " << req->get_source_inst()
3946 << " " << *req << dendl;
3947 op->mark_event("no_reply");
3948 }
3949}
3950
3951void Monitor::handle_route(MonOpRequestRef op)
3952{
3953 MRoute *m = static_cast<MRoute*>(op->get_req());
3954 MonSession *session = op->get_session();
3955 //check privileges
3956 if (!session->is_capable("mon", MON_CAP_X)) {
3957 dout(0) << "MRoute received from entity without appropriate perms! "
3958 << dendl;
3959 return;
3960 }
3961 if (m->msg)
3962 dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3963 else
3964 dout(10) << "handle_route null to " << m->dest << dendl;
3965
3966 // look it up
3967 if (m->session_mon_tid) {
3968 if (routed_requests.count(m->session_mon_tid)) {
3969 RoutedRequest *rr = routed_requests[m->session_mon_tid];
3970
3971 // reset payload, in case encoding is dependent on target features
3972 if (m->msg) {
3973 m->msg->clear_payload();
3974 rr->con->send_message(m->msg);
3975 m->msg = NULL;
3976 }
3977 if (m->send_osdmap_first) {
3978 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3979 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3980 true, MonOpRequestRef());
3981 }
3982 assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3983 routed_requests.erase(m->session_mon_tid);
3984 rr->session->routed_request_tids.erase(m->session_mon_tid);
3985 delete rr;
3986 } else {
3987 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3988 }
3989 } else {
3990 dout(10) << " not a routed request, trying to send anyway" << dendl;
3991 if (m->msg) {
3992 messenger->send_message(m->msg, m->dest);
3993 m->msg = NULL;
3994 }
3995 }
3996}
3997
3998void Monitor::resend_routed_requests()
3999{
4000 dout(10) << "resend_routed_requests" << dendl;
4001 int mon = get_leader();
4002 list<Context*> retry;
4003 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
4004 p != routed_requests.end();
4005 ++p) {
4006 RoutedRequest *rr = p->second;
4007
4008 if (mon == rank) {
4009 dout(10) << " requeue for self tid " << rr->tid << dendl;
4010 rr->op->mark_event("retry routed request");
4011 retry.push_back(new C_RetryMessage(this, rr->op));
4012 if (rr->session) {
4013 assert(rr->session->routed_request_tids.count(p->first));
4014 rr->session->routed_request_tids.erase(p->first);
4015 }
4016 delete rr;
4017 } else {
4018 bufferlist::iterator q = rr->request_bl.begin();
4019 PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
4020 rr->op->mark_event("resend forwarded message to leader");
4021 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
4022 MForward *forward = new MForward(rr->tid, req, rr->con_features,
4023 rr->session->caps);
4024 req->put(); // forward takes its own ref; drop ours.
4025 forward->client = rr->client_inst;
4026 forward->set_priority(req->get_priority());
4027 messenger->send_message(forward, monmap->get_inst(mon));
4028 }
4029 }
4030 if (mon == rank) {
4031 routed_requests.clear();
4032 finish_contexts(g_ceph_context, retry);
4033 }
4034}
4035
4036void Monitor::remove_session(MonSession *s)
4037{
224ce89b
WB
4038 dout(10) << "remove_session " << s << " " << s->inst
4039 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
7c673cae
FG
4040 assert(s->con);
4041 assert(!s->closed);
4042 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4043 p != s->routed_request_tids.end();
4044 ++p) {
4045 assert(routed_requests.count(*p));
4046 RoutedRequest *rr = routed_requests[*p];
4047 dout(10) << " dropping routed request " << rr->tid << dendl;
4048 delete rr;
4049 routed_requests.erase(*p);
4050 }
4051 s->routed_request_tids.clear();
4052 s->con->set_priv(NULL);
4053 session_map.remove_session(s);
4054 logger->set(l_mon_num_sessions, session_map.get_size());
4055 logger->inc(l_mon_session_rm);
4056}
4057
4058void Monitor::remove_all_sessions()
4059{
4060 Mutex::Locker l(session_map_lock);
4061 while (!session_map.sessions.empty()) {
4062 MonSession *s = session_map.sessions.front();
4063 remove_session(s);
4064 if (logger)
4065 logger->inc(l_mon_session_rm);
4066 }
4067 if (logger)
4068 logger->set(l_mon_num_sessions, session_map.get_size());
4069}
4070
4071void Monitor::send_command(const entity_inst_t& inst,
4072 const vector<string>& com)
4073{
4074 dout(10) << "send_command " << inst << "" << com << dendl;
4075 MMonCommand *c = new MMonCommand(monmap->fsid);
4076 c->cmd = com;
4077 try_send_message(c, inst);
4078}
4079
4080void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4081{
4082 /**
4083 * Wait list the new session until we're in the quorum, assuming it's
4084 * sufficiently new.
4085 * tick() will periodically send them back through so we can send
4086 * the client elsewhere if we don't think we're getting back in.
4087 *
4088 * But we whitelist a few sorts of messages:
4089 * 1) Monitors can talk to us at any time, of course.
4090 * 2) auth messages. It's unlikely to go through much faster, but
4091 * it's possible we've just lost our quorum status and we want to take...
4092 * 3) command messages. We want to accept these under all possible
4093 * circumstances.
4094 */
4095 Message *m = op->get_req();
4096 MonSession *s = op->get_session();
4097 ConnectionRef con = op->get_connection();
4098 utime_t too_old = ceph_clock_now();
4099 too_old -= g_ceph_context->_conf->mon_lease;
4100 if (m->get_recv_stamp() > too_old &&
4101 con->is_connected()) {
4102 dout(5) << "waitlisting message " << *m << dendl;
4103 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4104 op->mark_wait_for_quorum();
4105 } else {
4106 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4107 con->mark_down();
4108 // proxied sessions aren't registered and don't have a con; don't remove
4109 // those.
4110 if (!s->proxy_con) {
4111 Mutex::Locker l(session_map_lock);
4112 remove_session(s);
4113 }
4114 op->mark_zap();
4115 }
4116}
4117
4118void Monitor::_ms_dispatch(Message *m)
4119{
4120 if (is_shutdown()) {
4121 m->put();
4122 return;
4123 }
4124
4125 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4126 bool src_is_mon = op->is_src_mon();
4127 op->mark_event("mon:_ms_dispatch");
4128 MonSession *s = op->get_session();
4129 if (s && s->closed) {
4130 return;
4131 }
224ce89b
WB
4132
4133 if (src_is_mon && s) {
4134 ConnectionRef con = m->get_connection();
4135 if (con->get_messenger() && con->get_features() != s->con_features) {
4136 // only update features if this is a non-anonymous connection
4137 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4138 << " (was " << s->con_features
4139 << ", now " << con->get_features() << ")" << dendl;
4140 // connection features changed - recreate session.
4141 if (s->con && s->con != con) {
4142 dout(10) << __func__ << " connection for " << m->get_source_inst()
4143 << " changed from session; mark down and replace" << dendl;
4144 s->con->mark_down();
4145 }
4146 if (s->item.is_on_list()) {
4147 // forwarded messages' sessions are not in the sessions map and
4148 // exist only while the op is being handled.
4149 remove_session(s);
4150 }
4151 s->put();
4152 s = nullptr;
4153 }
4154 }
4155
7c673cae
FG
4156 if (!s) {
4157 // if the sender is not a monitor, make sure their first message for a
4158 // session is an MAuth. If it is not, assume it's a stray message,
4159 // and considering that we are creating a new session it is safe to
4160 // assume that the sender hasn't authenticated yet, so we have no way
4161 // of assessing whether we should handle it or not.
4162 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4163 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4164 m->get_type() != CEPH_MSG_PING)) {
4165 dout(1) << __func__ << " dropping stray message " << *m
4166 << " from " << m->get_source_inst() << dendl;
4167 return;
4168 }
4169
4170 ConnectionRef con = m->get_connection();
4171 {
4172 Mutex::Locker l(session_map_lock);
4173 s = session_map.new_session(m->get_source_inst(), con.get());
4174 }
4175 assert(s);
4176 con->set_priv(s->get());
224ce89b
WB
4177 dout(10) << __func__ << " new session " << s << " " << *s
4178 << " features 0x" << std::hex
4179 << s->con_features << std::dec << dendl;
7c673cae
FG
4180 op->set_session(s);
4181
4182 logger->set(l_mon_num_sessions, session_map.get_size());
4183 logger->inc(l_mon_session_add);
4184
4185 if (src_is_mon) {
4186 // give it monitor caps; the peer type has been authenticated
4187 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4188 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4189 s->caps = *mon_caps;
4190 }
4191 s->put();
4192 } else {
4193 dout(20) << __func__ << " existing session " << s << " for " << s->inst
4194 << dendl;
4195 }
4196
4197 assert(s);
4198
4199 s->session_timeout = ceph_clock_now();
4200 s->session_timeout += g_conf->mon_session_timeout;
4201
4202 if (s->auth_handler) {
4203 s->entity_name = s->auth_handler->get_entity_name();
4204 }
4205 dout(20) << " caps " << s->caps.get_str() << dendl;
4206
4207 if ((is_synchronizing() ||
4208 (s->global_id == 0 && !exited_quorum.is_zero())) &&
4209 !src_is_mon &&
4210 m->get_type() != CEPH_MSG_PING) {
4211 waitlist_or_zap_client(op);
4212 } else {
4213 dispatch_op(op);
4214 }
4215 return;
4216}
4217
4218void Monitor::dispatch_op(MonOpRequestRef op)
4219{
4220 op->mark_event("mon:dispatch_op");
4221 MonSession *s = op->get_session();
4222 assert(s);
4223 if (s->closed) {
4224 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4225 return;
4226 }
4227
4228 /* we will consider the default type as being 'monitor' until proven wrong */
4229 op->set_type_monitor();
4230 /* deal with all messages that do not necessarily need caps */
4231 bool dealt_with = true;
4232 switch (op->get_req()->get_type()) {
4233 // auth
4234 case MSG_MON_GLOBAL_ID:
4235 case CEPH_MSG_AUTH:
4236 op->set_type_service();
4237 /* no need to check caps here */
4238 paxos_service[PAXOS_AUTH]->dispatch(op);
4239 break;
4240
4241 case CEPH_MSG_PING:
4242 handle_ping(op);
4243 break;
4244
4245 /* MMonGetMap may be used by clients to obtain a monmap *before*
4246 * authenticating with the monitor. We need to handle these without
4247 * checking caps because, even on a cluster without cephx, we only set
4248 * session caps *after* the auth handshake. A good example of this
4249 * is when a client calls MonClient::get_monmap_privately(), which does
4250 * not authenticate when obtaining a monmap.
4251 */
4252 case CEPH_MSG_MON_GET_MAP:
4253 handle_mon_get_map(op);
4254 break;
4255
4256 case CEPH_MSG_MON_METADATA:
4257 return handle_mon_metadata(op);
4258
4259 default:
4260 dealt_with = false;
4261 break;
4262 }
4263 if (dealt_with)
4264 return;
4265
4266 /* well, maybe the op belongs to a service... */
4267 op->set_type_service();
4268 /* deal with all messages which caps should be checked somewhere else */
4269 dealt_with = true;
4270 switch (op->get_req()->get_type()) {
4271
4272 // OSDs
4273 case CEPH_MSG_MON_GET_OSDMAP:
4274 case CEPH_MSG_POOLOP:
4275 case MSG_OSD_BEACON:
4276 case MSG_OSD_MARK_ME_DOWN:
4277 case MSG_OSD_FULL:
4278 case MSG_OSD_FAILURE:
4279 case MSG_OSD_BOOT:
4280 case MSG_OSD_ALIVE:
4281 case MSG_OSD_PGTEMP:
4282 case MSG_OSD_PG_CREATED:
4283 case MSG_REMOVE_SNAPS:
4284 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4285 break;
4286
4287 // MDSs
4288 case MSG_MDS_BEACON:
4289 case MSG_MDS_OFFLOAD_TARGETS:
4290 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4291 break;
4292
4293 // Mgrs
4294 case MSG_MGR_BEACON:
4295 paxos_service[PAXOS_MGR]->dispatch(op);
4296 break;
4297
31f18b77 4298 // MgrStat
7c673cae 4299 case CEPH_MSG_STATFS:
b32b8144
FG
4300 // this is an ugly hack, sorry! force the version to 1 so that we do
4301 // not run afoul of the is_readable() paxos check. the client is going
4302 // by the pgmonitor version and the MgrStatMonitor version will lag behind
4303 // that until we complete the upgrade. The paxos ordering crap really
4304 // doesn't matter for statfs results, so just kludge around it here.
4305 if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
4306 ((MStatfs*)op->get_req())->version = 1;
4307 }
4308 case MSG_MON_MGR_REPORT:
7c673cae 4309 case MSG_GETPOOLSTATS:
31f18b77
FG
4310 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4311 break;
4312
4313 // pg
4314 case MSG_PGSTATS:
7c673cae
FG
4315 paxos_service[PAXOS_PGMAP]->dispatch(op);
4316 break;
4317
4318 // log
4319 case MSG_LOG:
4320 paxos_service[PAXOS_LOG]->dispatch(op);
4321 break;
4322
4323 // handle_command() does its own caps checking
4324 case MSG_MON_COMMAND:
4325 op->set_type_command();
4326 handle_command(op);
4327 break;
4328
4329 default:
4330 dealt_with = false;
4331 break;
4332 }
4333 if (dealt_with)
4334 return;
4335
4336 /* nop, looks like it's not a service message; revert back to monitor */
4337 op->set_type_monitor();
4338
4339 /* messages we, the Monitor class, need to deal with
4340 * but may be sent by clients. */
4341
4342 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4343 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4344 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4345 << dendl;
4346 goto drop;
4347 }
4348
4349 dealt_with = true;
4350 switch (op->get_req()->get_type()) {
4351
4352 // misc
4353 case CEPH_MSG_MON_GET_VERSION:
4354 handle_get_version(op);
4355 break;
4356
4357 case CEPH_MSG_MON_SUBSCRIBE:
4358 /* FIXME: check what's being subscribed, filter accordingly */
4359 handle_subscribe(op);
4360 break;
4361
4362 default:
4363 dealt_with = false;
4364 break;
4365 }
4366 if (dealt_with)
4367 return;
4368
4369 if (!op->is_src_mon()) {
4370 dout(1) << __func__ << " unexpected monitor message from"
4371 << " non-monitor entity " << op->get_req()->get_source_inst()
4372 << " " << *(op->get_req()) << " -- dropping" << dendl;
4373 goto drop;
4374 }
4375
4376 /* messages that should only be sent by another monitor */
4377 dealt_with = true;
4378 switch (op->get_req()->get_type()) {
4379
4380 case MSG_ROUTE:
4381 handle_route(op);
4382 break;
4383
4384 case MSG_MON_PROBE:
4385 handle_probe(op);
4386 break;
4387
4388 // Sync (i.e., the new slurp, but on steroids)
4389 case MSG_MON_SYNC:
4390 handle_sync(op);
4391 break;
4392 case MSG_MON_SCRUB:
4393 handle_scrub(op);
4394 break;
4395
4396 /* log acks are sent from a monitor we sent the MLog to, and are
4397 never sent by clients to us. */
4398 case MSG_LOGACK:
4399 log_client.handle_log_ack((MLogAck*)op->get_req());
4400 break;
4401
4402 // monmap
4403 case MSG_MON_JOIN:
4404 op->set_type_service();
4405 paxos_service[PAXOS_MONMAP]->dispatch(op);
4406 break;
4407
4408 // paxos
4409 case MSG_MON_PAXOS:
4410 {
4411 op->set_type_paxos();
4412 MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
4413 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4414 //can't send these!
4415 break;
4416 }
4417
4418 if (state == STATE_SYNCHRONIZING) {
4419 // we are synchronizing. These messages would do us no
4420 // good, thus just drop them and ignore them.
4421 dout(10) << __func__ << " ignore paxos msg from "
4422 << pm->get_source_inst() << dendl;
4423 break;
4424 }
4425
4426 // sanitize
4427 if (pm->epoch > get_epoch()) {
4428 bootstrap();
4429 break;
4430 }
4431 if (pm->epoch != get_epoch()) {
4432 break;
4433 }
4434
4435 paxos->dispatch(op);
4436 }
4437 break;
4438
4439 // elector messages
4440 case MSG_MON_ELECTION:
4441 op->set_type_election();
4442 //check privileges here for simplicity
4443 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4444 dout(0) << "MMonElection received from entity without enough caps!"
4445 << op->get_session()->caps << dendl;
4446 break;
4447 }
4448 if (!is_probing() && !is_synchronizing()) {
4449 elector.dispatch(op);
4450 }
4451 break;
4452
4453 case MSG_FORWARD:
4454 handle_forward(op);
4455 break;
4456
4457 case MSG_TIMECHECK:
4458 handle_timecheck(op);
4459 break;
4460
4461 case MSG_MON_HEALTH:
4462 health_monitor->dispatch(op);
4463 break;
4464
224ce89b
WB
4465 case MSG_MON_HEALTH_CHECKS:
4466 op->set_type_service();
4467 paxos_service[PAXOS_HEALTH]->dispatch(op);
4468 break;
4469
7c673cae
FG
4470 default:
4471 dealt_with = false;
4472 break;
4473 }
4474 if (!dealt_with) {
4475 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4476 goto drop;
4477 }
4478 return;
4479
4480drop:
4481 return;
4482}
4483
4484void Monitor::handle_ping(MonOpRequestRef op)
4485{
4486 MPing *m = static_cast<MPing*>(op->get_req());
4487 dout(10) << __func__ << " " << *m << dendl;
4488 MPing *reply = new MPing;
4489 entity_inst_t inst = m->get_source_inst();
4490 bufferlist payload;
4491 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4492 f->open_object_section("pong");
4493
181888fb
FG
4494 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
4495 get_health_status(false, f.get(), nullptr);
4496 } else {
4497 list<string> health_str;
4498 get_health(health_str, nullptr, f.get());
4499 }
4500
7c673cae
FG
4501 {
4502 stringstream ss;
4503 get_mon_status(f.get(), ss);
4504 }
4505
4506 f->close_section();
4507 stringstream ss;
4508 f->flush(ss);
4509 ::encode(ss.str(), payload);
4510 reply->set_payload(payload);
4511 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4512 messenger->send_message(reply, inst);
4513}
4514
4515void Monitor::timecheck_start()
4516{
4517 dout(10) << __func__ << dendl;
4518 timecheck_cleanup();
4519 timecheck_start_round();
4520}
4521
4522void Monitor::timecheck_finish()
4523{
4524 dout(10) << __func__ << dendl;
4525 timecheck_cleanup();
4526}
4527
4528void Monitor::timecheck_start_round()
4529{
4530 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4531 assert(is_leader());
4532
4533 if (monmap->size() == 1) {
4534 assert(0 == "We are alone; this shouldn't have been scheduled!");
4535 return;
4536 }
4537
4538 if (timecheck_round % 2) {
4539 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4540 utime_t curr_time = ceph_clock_now();
4541 double max = g_conf->mon_timecheck_interval*3;
4542 if (curr_time - timecheck_round_start < max) {
4543 dout(10) << __func__ << " keep current round going" << dendl;
4544 goto out;
4545 } else {
4546 dout(10) << __func__
4547 << " finish current timecheck and start new" << dendl;
4548 timecheck_cancel_round();
4549 }
4550 }
4551
4552 assert(timecheck_round % 2 == 0);
4553 timecheck_acks = 0;
4554 timecheck_round ++;
4555 timecheck_round_start = ceph_clock_now();
4556 dout(10) << __func__ << " new " << timecheck_round << dendl;
4557
4558 timecheck();
4559out:
4560 dout(10) << __func__ << " setting up next event" << dendl;
4561 timecheck_reset_event();
4562}
4563
4564void Monitor::timecheck_finish_round(bool success)
4565{
4566 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4567 assert(timecheck_round % 2);
4568 timecheck_round ++;
4569 timecheck_round_start = utime_t();
4570
4571 if (success) {
4572 assert(timecheck_waiting.empty());
4573 assert(timecheck_acks == quorum.size());
4574 timecheck_report();
4575 timecheck_check_skews();
4576 return;
4577 }
4578
4579 dout(10) << __func__ << " " << timecheck_waiting.size()
4580 << " peers still waiting:";
4581 for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4582 p != timecheck_waiting.end(); ++p) {
4583 *_dout << " " << p->first.name;
4584 }
4585 *_dout << dendl;
4586 timecheck_waiting.clear();
4587
4588 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4589}
4590
4591void Monitor::timecheck_cancel_round()
4592{
4593 timecheck_finish_round(false);
4594}
4595
4596void Monitor::timecheck_cleanup()
4597{
4598 timecheck_round = 0;
4599 timecheck_acks = 0;
4600 timecheck_round_start = utime_t();
4601
4602 if (timecheck_event) {
4603 timer.cancel_event(timecheck_event);
4604 timecheck_event = NULL;
4605 }
4606 timecheck_waiting.clear();
4607 timecheck_skews.clear();
4608 timecheck_latencies.clear();
4609
4610 timecheck_rounds_since_clean = 0;
4611}
4612
4613void Monitor::timecheck_reset_event()
4614{
4615 if (timecheck_event) {
4616 timer.cancel_event(timecheck_event);
4617 timecheck_event = NULL;
4618 }
4619
4620 double delay =
4621 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4622
4623 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4624 delay = cct->_conf->mon_timecheck_interval;
4625 }
4626
4627 dout(10) << __func__ << " delay " << delay
4628 << " rounds_since_clean " << timecheck_rounds_since_clean
4629 << dendl;
4630
3efd9988
FG
4631 timecheck_event = timer.add_event_after(
4632 delay,
4633 new C_MonContext(this, [this](int) {
4634 timecheck_start_round();
4635 }));
7c673cae
FG
4636}
4637
4638void Monitor::timecheck_check_skews()
4639{
4640 dout(10) << __func__ << dendl;
4641 assert(is_leader());
4642 assert((timecheck_round % 2) == 0);
4643 if (monmap->size() == 1) {
4644 assert(0 == "We are alone; we shouldn't have gotten here!");
4645 return;
4646 }
4647 assert(timecheck_latencies.size() == timecheck_skews.size());
4648
4649 bool found_skew = false;
4650 for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4651 p != timecheck_skews.end(); ++p) {
4652
4653 double abs_skew;
4654 if (timecheck_has_skew(p->second, &abs_skew)) {
4655 dout(10) << __func__
4656 << " " << p->first << " skew " << abs_skew << dendl;
4657 found_skew = true;
4658 }
4659 }
4660
4661 if (found_skew) {
4662 ++timecheck_rounds_since_clean;
4663 timecheck_reset_event();
4664 } else if (timecheck_rounds_since_clean > 0) {
4665 dout(1) << __func__
4666 << " no clock skews found after " << timecheck_rounds_since_clean
4667 << " rounds" << dendl;
4668 // make sure the skews are really gone and not just a transient success
4669 // this will run just once if not in the presence of skews again.
4670 timecheck_rounds_since_clean = 1;
4671 timecheck_reset_event();
4672 timecheck_rounds_since_clean = 0;
4673 }
4674
4675}
4676
4677void Monitor::timecheck_report()
4678{
4679 dout(10) << __func__ << dendl;
4680 assert(is_leader());
4681 assert((timecheck_round % 2) == 0);
4682 if (monmap->size() == 1) {
4683 assert(0 == "We are alone; we shouldn't have gotten here!");
4684 return;
4685 }
4686
4687 assert(timecheck_latencies.size() == timecheck_skews.size());
4688 bool do_output = true; // only output report once
4689 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4690 if (monmap->get_name(*q) == name)
4691 continue;
4692
4693 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4694 m->epoch = get_epoch();
4695 m->round = timecheck_round;
4696
4697 for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4698 it != timecheck_skews.end(); ++it) {
4699 double skew = it->second;
4700 double latency = timecheck_latencies[it->first];
4701
4702 m->skews[it->first] = skew;
4703 m->latencies[it->first] = latency;
4704
4705 if (do_output) {
4706 dout(25) << __func__ << " " << it->first
4707 << " latency " << latency
4708 << " skew " << skew << dendl;
4709 }
4710 }
4711 do_output = false;
4712 entity_inst_t inst = monmap->get_inst(*q);
4713 dout(10) << __func__ << " send report to " << inst << dendl;
4714 messenger->send_message(m, inst);
4715 }
4716}
4717
4718void Monitor::timecheck()
4719{
4720 dout(10) << __func__ << dendl;
4721 assert(is_leader());
4722 if (monmap->size() == 1) {
4723 assert(0 == "We are alone; we shouldn't have gotten here!");
4724 return;
4725 }
4726 assert(timecheck_round % 2 != 0);
4727
4728 timecheck_acks = 1; // we ack ourselves
4729
4730 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4731 << " round " << timecheck_round << dendl;
4732
4733 // we are at the eye of the storm; the point of reference
4734 timecheck_skews[messenger->get_myinst()] = 0.0;
4735 timecheck_latencies[messenger->get_myinst()] = 0.0;
4736
4737 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4738 if (monmap->get_name(*it) == name)
4739 continue;
4740
4741 entity_inst_t inst = monmap->get_inst(*it);
4742 utime_t curr_time = ceph_clock_now();
4743 timecheck_waiting[inst] = curr_time;
4744 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4745 m->epoch = get_epoch();
4746 m->round = timecheck_round;
4747 dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4748 messenger->send_message(m, inst);
4749 }
4750}
4751
4752health_status_t Monitor::timecheck_status(ostringstream &ss,
4753 const double skew_bound,
4754 const double latency)
4755{
4756 health_status_t status = HEALTH_OK;
4757 assert(latency >= 0);
4758
4759 double abs_skew;
4760 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4761 status = HEALTH_WARN;
4762 ss << "clock skew " << abs_skew << "s"
4763 << " > max " << g_conf->mon_clock_drift_allowed << "s";
4764 }
4765
4766 return status;
4767}
4768
4769void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4770{
4771 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4772 dout(10) << __func__ << " " << *m << dendl;
4773 /* handles PONG's */
4774 assert(m->op == MTimeCheck::OP_PONG);
4775
4776 entity_inst_t other = m->get_source_inst();
4777 if (m->epoch < get_epoch()) {
4778 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4779 << " from " << other
4780 << " curr " << get_epoch()
4781 << " -- severely lagged? discard" << dendl;
4782 return;
4783 }
4784 assert(m->epoch == get_epoch());
4785
4786 if (m->round < timecheck_round) {
4787 dout(1) << __func__ << " got old round " << m->round
4788 << " from " << other
4789 << " curr " << timecheck_round << " -- discard" << dendl;
4790 return;
4791 }
4792
4793 utime_t curr_time = ceph_clock_now();
4794
4795 assert(timecheck_waiting.count(other) > 0);
4796 utime_t timecheck_sent = timecheck_waiting[other];
4797 timecheck_waiting.erase(other);
4798 if (curr_time < timecheck_sent) {
4799 // our clock was readjusted -- drop everything until it all makes sense.
4800 dout(1) << __func__ << " our clock was readjusted --"
4801 << " bump round and drop current check"
4802 << dendl;
4803 timecheck_cancel_round();
4804 return;
4805 }
4806
4807 /* update peer latencies */
4808 double latency = (double)(curr_time - timecheck_sent);
4809
4810 if (timecheck_latencies.count(other) == 0)
4811 timecheck_latencies[other] = latency;
4812 else {
4813 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4814 timecheck_latencies[other] = avg_latency;
4815 }
4816
4817 /*
4818 * update skews
4819 *
4820 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4821 * and 'a' happens to be lower than 'b'; so we use double instead.
4822 *
4823 * latency is always expected to be >= 0.
4824 *
4825 * delta, the difference between theirs timestamp and ours, may either be
4826 * lower or higher than 0; will hardly ever be 0.
4827 *
4828 * The absolute skew is the absolute delta minus the latency, which is
4829 * taken as a whole instead of an rtt given that there is some queueing
4830 * and dispatch times involved and it's hard to assess how long exactly
4831 * it took for the message to travel to the other side and be handled. So
4832 * we call it a bounded skew, the worst case scenario.
4833 *
4834 * Now, to math!
4835 *
4836 * Given that the latency is always positive, we can establish that the
4837 * bounded skew will be:
4838 *
4839 * 1. positive if the absolute delta is higher than the latency and
4840 * delta is positive
4841 * 2. negative if the absolute delta is higher than the latency and
4842 * delta is negative.
4843 * 3. zero if the absolute delta is lower than the latency.
4844 *
4845 * On 3. we make a judgement call and treat the skew as non-existent.
4846 * This is because that, if the absolute delta is lower than the
4847 * latency, then the apparently existing skew is nothing more than a
4848 * side-effect of the high latency at work.
4849 *
4850 * This may not be entirely true though, as a severely skewed clock
4851 * may be masked by an even higher latency, but with high latencies
4852 * we probably have worse issues to deal with than just skewed clocks.
4853 */
4854 assert(latency >= 0);
4855
4856 double delta = ((double) m->timestamp) - ((double) curr_time);
4857 double abs_delta = (delta > 0 ? delta : -delta);
4858 double skew_bound = abs_delta - latency;
4859 if (skew_bound < 0)
4860 skew_bound = 0;
4861 else if (delta < 0)
4862 skew_bound = -skew_bound;
4863
4864 ostringstream ss;
4865 health_status_t status = timecheck_status(ss, skew_bound, latency);
b32b8144
FG
4866 if (status != HEALTH_OK) {
4867 clog->health(status) << other << " " << ss.str();
4868 }
7c673cae
FG
4869
4870 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4871 << " delta " << delta << " skew_bound " << skew_bound
4872 << " latency " << latency << dendl;
4873
4874 timecheck_skews[other] = skew_bound;
4875
4876 timecheck_acks++;
4877 if (timecheck_acks == quorum.size()) {
4878 dout(10) << __func__ << " got pongs from everybody ("
4879 << timecheck_acks << " total)" << dendl;
4880 assert(timecheck_skews.size() == timecheck_acks);
4881 assert(timecheck_waiting.empty());
4882 // everyone has acked, so bump the round to finish it.
4883 timecheck_finish_round();
4884 }
4885}
4886
4887void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4888{
4889 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4890 dout(10) << __func__ << " " << *m << dendl;
4891
4892 assert(is_peon());
4893 assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4894
4895 if (m->epoch != get_epoch()) {
4896 dout(1) << __func__ << " got wrong epoch "
4897 << "(ours " << get_epoch()
4898 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4899 return;
4900 }
4901
4902 if (m->round < timecheck_round) {
4903 dout(1) << __func__ << " got old round " << m->round
4904 << " current " << timecheck_round
4905 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4906 return;
4907 }
4908
4909 timecheck_round = m->round;
4910
4911 if (m->op == MTimeCheck::OP_REPORT) {
4912 assert((timecheck_round % 2) == 0);
4913 timecheck_latencies.swap(m->latencies);
4914 timecheck_skews.swap(m->skews);
4915 return;
4916 }
4917
4918 assert((timecheck_round % 2) != 0);
4919 MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4920 utime_t curr_time = ceph_clock_now();
4921 reply->timestamp = curr_time;
4922 reply->epoch = m->epoch;
4923 reply->round = m->round;
4924 dout(10) << __func__ << " send " << *m
4925 << " to " << m->get_source_inst() << dendl;
4926 m->get_connection()->send_message(reply);
4927}
4928
4929void Monitor::handle_timecheck(MonOpRequestRef op)
4930{
4931 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4932 dout(10) << __func__ << " " << *m << dendl;
4933
4934 if (is_leader()) {
4935 if (m->op != MTimeCheck::OP_PONG) {
4936 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4937 } else {
4938 handle_timecheck_leader(op);
4939 }
4940 } else if (is_peon()) {
4941 if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4942 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4943 } else {
4944 handle_timecheck_peon(op);
4945 }
4946 } else {
4947 dout(1) << __func__ << " drop unexpected msg" << dendl;
4948 }
4949}
4950
4951void Monitor::handle_subscribe(MonOpRequestRef op)
4952{
4953 MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
4954 dout(10) << "handle_subscribe " << *m << dendl;
4955
4956 bool reply = false;
4957
4958 MonSession *s = op->get_session();
4959 assert(s);
4960
4961 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
4962 p != m->what.end();
4963 ++p) {
4964 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4965 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
4966 reply = true;
4967
4968 // remove conflicting subscribes
4969 if (logmon()->sub_name_to_id(p->first) >= 0) {
4970 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
4971 it != s->sub_map.end(); ) {
4972 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
4973 Mutex::Locker l(session_map_lock);
4974 session_map.remove_sub((it++)->second);
4975 } else {
4976 ++it;
4977 }
4978 }
4979 }
4980
4981 {
4982 Mutex::Locker l(session_map_lock);
4983 session_map.add_update_sub(s, p->first, p->second.start,
4984 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
4985 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
4986 }
4987
4988 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
4989 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
4990 if ((int)s->is_capable("mds", MON_CAP_R)) {
4991 Subscription *sub = s->sub_map[p->first];
4992 assert(sub != nullptr);
4993 mdsmon()->check_sub(sub);
4994 }
4995 } else if (p->first == "osdmap") {
4996 if ((int)s->is_capable("osd", MON_CAP_R)) {
4997 if (s->osd_epoch > p->second.start) {
4998 // client needs earlier osdmaps on purpose, so reset the sent epoch
4999 s->osd_epoch = 0;
5000 }
5001 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
5002 }
5003 } else if (p->first == "osd_pg_creates") {
5004 if ((int)s->is_capable("osd", MON_CAP_W)) {
5005 if (monmap->get_required_features().contains_all(
5006 ceph::features::mon::FEATURE_LUMINOUS)) {
5007 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
5008 } else {
5009 pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
5010 }
5011 }
5012 } else if (p->first == "monmap") {
5013 monmon()->check_sub(s->sub_map[p->first]);
5014 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
5015 logmon()->check_sub(s->sub_map[p->first]);
5016 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
5017 mgrmon()->check_sub(s->sub_map[p->first]);
224ce89b
WB
5018 } else if (p->first == "servicemap") {
5019 mgrstatmon()->check_sub(s->sub_map[p->first]);
7c673cae
FG
5020 }
5021 }
5022
5023 if (reply) {
5024 // we only need to reply if the client is old enough to think it
5025 // has to send renewals.
5026 ConnectionRef con = m->get_connection();
5027 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
5028 m->get_connection()->send_message(new MMonSubscribeAck(
5029 monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
5030 }
5031
5032}
5033
5034void Monitor::handle_get_version(MonOpRequestRef op)
5035{
5036 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
5037 dout(10) << "handle_get_version " << *m << dendl;
5038 PaxosService *svc = NULL;
5039
5040 MonSession *s = op->get_session();
5041 assert(s);
5042
5043 if (!is_leader() && !is_peon()) {
5044 dout(10) << " waiting for quorum" << dendl;
5045 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5046 goto out;
5047 }
5048
5049 if (m->what == "mdsmap") {
5050 svc = mdsmon();
5051 } else if (m->what == "fsmap") {
5052 svc = mdsmon();
5053 } else if (m->what == "osdmap") {
5054 svc = osdmon();
5055 } else if (m->what == "monmap") {
5056 svc = monmon();
5057 } else {
5058 derr << "invalid map type " << m->what << dendl;
5059 }
5060
5061 if (svc) {
5062 if (!svc->is_readable()) {
5063 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5064 goto out;
5065 }
5066
5067 MMonGetVersionReply *reply = new MMonGetVersionReply();
5068 reply->handle = m->handle;
5069 reply->version = svc->get_last_committed();
5070 reply->oldest_version = svc->get_first_committed();
5071 reply->set_tid(m->get_tid());
5072
5073 m->get_connection()->send_message(reply);
5074 }
5075 out:
5076 return;
5077}
5078
5079bool Monitor::ms_handle_reset(Connection *con)
5080{
5081 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5082
5083 // ignore lossless monitor sessions
5084 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5085 return false;
5086
5087 MonSession *s = static_cast<MonSession *>(con->get_priv());
5088 if (!s)
5089 return false;
5090
5091 // break any con <-> session ref cycle
5092 s->con->set_priv(NULL);
5093
5094 if (is_shutdown())
5095 return false;
5096
5097 Mutex::Locker l(lock);
5098
5099 dout(10) << "reset/close on session " << s->inst << dendl;
5100 if (!s->closed) {
5101 Mutex::Locker l(session_map_lock);
5102 remove_session(s);
5103 }
5104 s->put();
5105 return true;
5106}
5107
5108bool Monitor::ms_handle_refused(Connection *con)
5109{
5110 // just log for now...
5111 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5112 return false;
5113}
5114
5115// -----
5116
5117void Monitor::send_latest_monmap(Connection *con)
5118{
5119 bufferlist bl;
5120 monmap->encode(bl, con->get_features());
5121 con->send_message(new MMonMap(bl));
5122}
5123
5124void Monitor::handle_mon_get_map(MonOpRequestRef op)
5125{
5126 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
5127 dout(10) << "handle_mon_get_map" << dendl;
5128 send_latest_monmap(m->get_connection().get());
5129}
5130
5131void Monitor::handle_mon_metadata(MonOpRequestRef op)
5132{
5133 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
5134 if (is_leader()) {
5135 dout(10) << __func__ << dendl;
5136 update_mon_metadata(m->get_source().num(), std::move(m->data));
5137 }
5138}
5139
5140void Monitor::update_mon_metadata(int from, Metadata&& m)
5141{
224ce89b
WB
5142 // NOTE: this is now for legacy (kraken or jewel) mons only.
5143 pending_metadata[from] = std::move(m);
7c673cae
FG
5144
5145 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
224ce89b 5146 bufferlist bl;
7c673cae
FG
5147 ::encode(pending_metadata, bl);
5148 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5149 paxos->trigger_propose();
5150}
5151
224ce89b 5152int Monitor::load_metadata()
7c673cae
FG
5153{
5154 bufferlist bl;
5155 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5156 if (r)
5157 return r;
5158 bufferlist::iterator it = bl.begin();
224ce89b
WB
5159 ::decode(mon_metadata, it);
5160
5161 pending_metadata = mon_metadata;
7c673cae
FG
5162 return 0;
5163}
5164
5165int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5166{
5167 assert(f);
224ce89b 5168 if (!mon_metadata.count(mon)) {
7c673cae
FG
5169 err << "mon." << mon << " not found";
5170 return -EINVAL;
5171 }
224ce89b 5172 const Metadata& m = mon_metadata[mon];
7c673cae
FG
5173 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5174 f->dump_string(p->first.c_str(), p->second);
5175 }
5176 return 0;
5177}
5178
c07f9fc5 5179void Monitor::count_metadata(const string& field, map<string,int> *out)
31f18b77 5180{
224ce89b 5181 for (auto& p : mon_metadata) {
31f18b77
FG
5182 auto q = p.second.find(field);
5183 if (q == p.second.end()) {
c07f9fc5 5184 (*out)["unknown"]++;
31f18b77 5185 } else {
c07f9fc5 5186 (*out)[q->second]++;
31f18b77
FG
5187 }
5188 }
c07f9fc5
FG
5189}
5190
5191void Monitor::count_metadata(const string& field, Formatter *f)
5192{
5193 map<string,int> by_val;
5194 count_metadata(field, &by_val);
31f18b77
FG
5195 f->open_object_section(field.c_str());
5196 for (auto& p : by_val) {
5197 f->dump_int(p.first.c_str(), p.second);
5198 }
5199 f->close_section();
5200}
5201
7c673cae
FG
5202int Monitor::print_nodes(Formatter *f, ostream& err)
5203{
7c673cae 5204 map<string, list<int> > mons; // hostname => mon
224ce89b
WB
5205 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5206 it != mon_metadata.end(); ++it) {
7c673cae
FG
5207 const Metadata& m = it->second;
5208 Metadata::const_iterator hostname = m.find("hostname");
5209 if (hostname == m.end()) {
5210 // not likely though
5211 continue;
5212 }
5213 mons[hostname->second].push_back(it->first);
5214 }
5215
5216 dump_services(f, mons, "mon");
5217 return 0;
5218}
5219
5220// ----------------------------------------------
5221// scrub
5222
5223int Monitor::scrub_start()
5224{
5225 dout(10) << __func__ << dendl;
5226 assert(is_leader());
5227
5228 if (!scrub_result.empty()) {
5229 clog->info() << "scrub already in progress";
5230 return -EBUSY;
5231 }
5232
5233 scrub_event_cancel();
5234 scrub_result.clear();
5235 scrub_state.reset(new ScrubState);
5236
5237 scrub();
5238 return 0;
5239}
5240
5241int Monitor::scrub()
5242{
5243 assert(is_leader());
5244 assert(scrub_state);
5245
5246 scrub_cancel_timeout();
5247 wait_for_paxos_write();
5248 scrub_version = paxos->get_version();
5249
5250
5251 // scrub all keys if we're the only monitor in the quorum
5252 int32_t num_keys =
5253 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5254
5255 for (set<int>::iterator p = quorum.begin();
5256 p != quorum.end();
5257 ++p) {
5258 if (*p == rank)
5259 continue;
5260 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5261 num_keys);
5262 r->key = scrub_state->last_key;
5263 messenger->send_message(r, monmap->get_inst(*p));
5264 }
5265
5266 // scrub my keys
5267 bool r = _scrub(&scrub_result[rank],
5268 &scrub_state->last_key,
5269 &num_keys);
5270
5271 scrub_state->finished = !r;
5272
5273 // only after we got our scrub results do we really care whether the
5274 // other monitors are late on their results. Also, this way we avoid
5275 // triggering the timeout if we end up getting stuck in _scrub() for
5276 // longer than the duration of the timeout.
5277 scrub_reset_timeout();
5278
5279 if (quorum.size() == 1) {
5280 assert(scrub_state->finished == true);
5281 scrub_finish();
5282 }
5283 return 0;
5284}
5285
5286void Monitor::handle_scrub(MonOpRequestRef op)
5287{
5288 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5289 dout(10) << __func__ << " " << *m << dendl;
5290 switch (m->op) {
5291 case MMonScrub::OP_SCRUB:
5292 {
5293 if (!is_peon())
5294 break;
5295
5296 wait_for_paxos_write();
5297
5298 if (m->version != paxos->get_version())
5299 break;
5300
5301 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5302 m->version,
5303 m->num_keys);
5304
5305 reply->key = m->key;
5306 _scrub(&reply->result, &reply->key, &reply->num_keys);
5307 m->get_connection()->send_message(reply);
5308 }
5309 break;
5310
5311 case MMonScrub::OP_RESULT:
5312 {
5313 if (!is_leader())
5314 break;
5315 if (m->version != scrub_version)
5316 break;
5317 // reset the timeout each time we get a result
5318 scrub_reset_timeout();
5319
5320 int from = m->get_source().num();
5321 assert(scrub_result.count(from) == 0);
5322 scrub_result[from] = m->result;
5323
5324 if (scrub_result.size() == quorum.size()) {
5325 scrub_check_results();
5326 scrub_result.clear();
5327 if (scrub_state->finished)
5328 scrub_finish();
5329 else
5330 scrub();
5331 }
5332 }
5333 break;
5334 }
5335}
5336
5337bool Monitor::_scrub(ScrubResult *r,
5338 pair<string,string> *start,
5339 int *num_keys)
5340{
5341 assert(r != NULL);
5342 assert(start != NULL);
5343 assert(num_keys != NULL);
5344
5345 set<string> prefixes = get_sync_targets_names();
5346 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5347
5348 dout(10) << __func__ << " start (" << *start << ")"
5349 << " num_keys " << *num_keys << dendl;
5350
5351 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5352
5353 int scrubbed_keys = 0;
5354 pair<string,string> last_key;
5355
5356 while (it->has_next_chunk()) {
5357
5358 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5359 break;
5360
5361 pair<string,string> k = it->get_next_key();
5362 if (prefixes.count(k.first) == 0)
5363 continue;
5364
5365 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5366 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5367 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5368 << dendl;
5369 continue;
5370 }
5371
5372 bufferlist bl;
224ce89b
WB
5373 int err = store->get(k.first, k.second, bl);
5374 assert(err == 0);
5375
7c673cae
FG
5376 uint32_t key_crc = bl.crc32c(0);
5377 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5378 << " crc " << key_crc << dendl;
5379 r->prefix_keys[k.first]++;
224ce89b 5380 if (r->prefix_crc.count(k.first) == 0) {
7c673cae 5381 r->prefix_crc[k.first] = 0;
224ce89b 5382 }
7c673cae
FG
5383 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5384
5385 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5386 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5387 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5388 r->prefix_crc[k.first] += 1;
5389 }
5390
5391 ++scrubbed_keys;
5392 last_key = k;
5393 }
5394
5395 dout(20) << __func__ << " last_key (" << last_key << ")"
5396 << " scrubbed_keys " << scrubbed_keys
5397 << " has_next " << it->has_next_chunk() << dendl;
5398
5399 *start = last_key;
5400 *num_keys = scrubbed_keys;
5401
5402 return it->has_next_chunk();
5403}
5404
5405void Monitor::scrub_check_results()
5406{
5407 dout(10) << __func__ << dendl;
5408
5409 // compare
5410 int errors = 0;
5411 ScrubResult& mine = scrub_result[rank];
5412 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5413 p != scrub_result.end();
5414 ++p) {
5415 if (p->first == rank)
5416 continue;
5417 if (p->second != mine) {
5418 ++errors;
5419 clog->error() << "scrub mismatch";
5420 clog->error() << " mon." << rank << " " << mine;
5421 clog->error() << " mon." << p->first << " " << p->second;
5422 }
5423 }
5424 if (!errors)
d2e6a577 5425 clog->debug() << "scrub ok on " << quorum << ": " << mine;
7c673cae
FG
5426}
5427
5428inline void Monitor::scrub_timeout()
5429{
5430 dout(1) << __func__ << " restarting scrub" << dendl;
5431 scrub_reset();
5432 scrub_start();
5433}
5434
5435void Monitor::scrub_finish()
5436{
5437 dout(10) << __func__ << dendl;
5438 scrub_reset();
5439 scrub_event_start();
5440}
5441
5442void Monitor::scrub_reset()
5443{
5444 dout(10) << __func__ << dendl;
5445 scrub_cancel_timeout();
5446 scrub_version = 0;
5447 scrub_result.clear();
5448 scrub_state.reset();
5449}
5450
5451inline void Monitor::scrub_update_interval(int secs)
5452{
5453 // we don't care about changes if we are not the leader.
5454 // changes will be visible if we become the leader.
5455 if (!is_leader())
5456 return;
5457
5458 dout(1) << __func__ << " new interval = " << secs << dendl;
5459
5460 // if scrub already in progress, all changes will already be visible during
5461 // the next round. Nothing to do.
5462 if (scrub_state != NULL)
5463 return;
5464
5465 scrub_event_cancel();
5466 scrub_event_start();
5467}
5468
5469void Monitor::scrub_event_start()
5470{
5471 dout(10) << __func__ << dendl;
5472
5473 if (scrub_event)
5474 scrub_event_cancel();
5475
5476 if (cct->_conf->mon_scrub_interval <= 0) {
5477 dout(1) << __func__ << " scrub event is disabled"
5478 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5479 << ")" << dendl;
5480 return;
5481 }
5482
3efd9988
FG
5483 scrub_event = timer.add_event_after(
5484 cct->_conf->mon_scrub_interval,
5485 new C_MonContext(this, [this](int) {
7c673cae 5486 scrub_start();
3efd9988 5487 }));
7c673cae
FG
5488}
5489
5490void Monitor::scrub_event_cancel()
5491{
5492 dout(10) << __func__ << dendl;
5493 if (scrub_event) {
5494 timer.cancel_event(scrub_event);
5495 scrub_event = NULL;
5496 }
5497}
5498
5499inline void Monitor::scrub_cancel_timeout()
5500{
5501 if (scrub_timeout_event) {
5502 timer.cancel_event(scrub_timeout_event);
5503 scrub_timeout_event = NULL;
5504 }
5505}
5506
5507void Monitor::scrub_reset_timeout()
5508{
5509 dout(15) << __func__ << " reset timeout event" << dendl;
5510 scrub_cancel_timeout();
3efd9988
FG
5511 scrub_timeout_event = timer.add_event_after(
5512 g_conf->mon_scrub_timeout,
5513 new C_MonContext(this, [this](int) {
7c673cae 5514 scrub_timeout();
3efd9988 5515 }));
7c673cae
FG
5516}
5517
5518/************ TICK ***************/
5519void Monitor::new_tick()
5520{
5521 timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5522 tick();
5523 }));
5524}
5525
5526void Monitor::tick()
5527{
5528 // ok go.
5529 dout(11) << "tick" << dendl;
181888fb 5530 const utime_t now = ceph_clock_now();
7c673cae 5531
181888fb
FG
5532 // Check if we need to emit any delayed health check updated messages
5533 if (is_leader()) {
5534 const auto min_period = g_conf->get_val<int64_t>(
5535 "mon_health_log_update_period");
5536 for (auto& svc : paxos_service) {
5537 auto health = svc->get_health_checks();
5538
5539 for (const auto &i : health.checks) {
5540 const std::string &code = i.first;
5541 const std::string &summary = i.second.summary;
5542 const health_status_t severity = i.second.severity;
5543
5544 auto status_iter = health_check_log_times.find(code);
5545 if (status_iter == health_check_log_times.end()) {
5546 continue;
5547 }
5548
5549 auto &log_status = status_iter->second;
5550 bool const changed = log_status.last_message != summary
5551 || log_status.severity != severity;
5552
5553 if (changed && now - log_status.updated_at > min_period) {
5554 log_status.last_message = summary;
5555 log_status.updated_at = now;
5556 log_status.severity = severity;
5557
5558 ostringstream ss;
5559 ss << "Health check update: " << summary << " (" << code << ")";
5560 clog->health(severity) << ss.str();
5561 }
5562 }
5563 }
5564 }
5565
5566
7c673cae
FG
5567 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5568 (*p)->tick();
5569 (*p)->maybe_trim();
5570 }
5571
5572 // trim sessions
7c673cae
FG
5573 {
5574 Mutex::Locker l(session_map_lock);
5575 auto p = session_map.sessions.begin();
5576
5577 bool out_for_too_long = (!exited_quorum.is_zero() &&
5578 now > (exited_quorum + 2*g_conf->mon_lease));
5579
5580 while (!p.end()) {
5581 MonSession *s = *p;
5582 ++p;
5583
5584 // don't trim monitors
5585 if (s->inst.name.is_mon())
5586 continue;
5587
5588 if (s->session_timeout < now && s->con) {
5589 // check keepalive, too
5590 s->session_timeout = s->con->get_last_keepalive();
5591 s->session_timeout += g_conf->mon_session_timeout;
5592 }
5593 if (s->session_timeout < now) {
5594 dout(10) << " trimming session " << s->con << " " << s->inst
5595 << " (timeout " << s->session_timeout
5596 << " < now " << now << ")" << dendl;
5597 } else if (out_for_too_long) {
5598 // boot the client Session because we've taken too long getting back in
5599 dout(10) << " trimming session " << s->con << " " << s->inst
5600 << " because we've been out of quorum too long" << dendl;
5601 } else {
5602 continue;
5603 }
5604
5605 s->con->mark_down();
5606 remove_session(s);
5607 logger->inc(l_mon_session_trim);
5608 }
5609 }
5610 sync_trim_providers();
5611
5612 if (!maybe_wait_for_quorum.empty()) {
5613 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5614 }
5615
5616 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5617 // this is only necessary on upgraded clusters.
5618 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5619 prepare_new_fingerprint(t);
5620 paxos->trigger_propose();
5621 }
5622
5623 new_tick();
5624}
5625
5626void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5627{
5628 uuid_d nf;
5629 nf.generate_random();
5630 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5631
5632 bufferlist bl;
5633 ::encode(nf, bl);
5634 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5635}
5636
5637int Monitor::check_fsid()
5638{
5639 bufferlist ebl;
5640 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5641 if (r == -ENOENT)
5642 return r;
5643 assert(r == 0);
5644
5645 string es(ebl.c_str(), ebl.length());
5646
5647 // only keep the first line
5648 size_t pos = es.find_first_of('\n');
5649 if (pos != string::npos)
5650 es.resize(pos);
5651
5652 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5653 uuid_d ondisk;
5654 if (!ondisk.parse(es.c_str())) {
5655 derr << "error: unable to parse uuid" << dendl;
5656 return -EINVAL;
5657 }
5658
5659 if (monmap->get_fsid() != ondisk) {
5660 derr << "error: cluster_uuid file exists with value " << ondisk
5661 << ", != our uuid " << monmap->get_fsid() << dendl;
5662 return -EEXIST;
5663 }
5664
5665 return 0;
5666}
5667
5668int Monitor::write_fsid()
5669{
5670 auto t(std::make_shared<MonitorDBStore::Transaction>());
5671 write_fsid(t);
5672 int r = store->apply_transaction(t);
5673 return r;
5674}
5675
5676int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5677{
5678 ostringstream ss;
5679 ss << monmap->get_fsid() << "\n";
5680 string us = ss.str();
5681
5682 bufferlist b;
5683 b.append(us);
5684
5685 t->put(MONITOR_NAME, "cluster_uuid", b);
5686 return 0;
5687}
5688
5689/*
5690 * this is the closest thing to a traditional 'mkfs' for ceph.
5691 * initialize the monitor state machines to their initial values.
5692 */
5693int Monitor::mkfs(bufferlist& osdmapbl)
5694{
5695 auto t(std::make_shared<MonitorDBStore::Transaction>());
5696
5697 // verify cluster fsid
5698 int r = check_fsid();
5699 if (r < 0 && r != -ENOENT)
5700 return r;
5701
5702 bufferlist magicbl;
5703 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5704 magicbl.append("\n");
5705 t->put(MONITOR_NAME, "magic", magicbl);
5706
5707
5708 features = get_initial_supported_features();
5709 write_features(t);
5710
5711 // save monmap, osdmap, keyring.
5712 bufferlist monmapbl;
5713 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5714 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5715 t->put("mkfs", "monmap", monmapbl);
5716
5717 if (osdmapbl.length()) {
5718 // make sure it's a valid osdmap
5719 try {
5720 OSDMap om;
5721 om.decode(osdmapbl);
5722 }
5723 catch (buffer::error& e) {
5724 derr << "error decoding provided osdmap: " << e.what() << dendl;
5725 return -EINVAL;
5726 }
5727 t->put("mkfs", "osdmap", osdmapbl);
5728 }
5729
5730 if (is_keyring_required()) {
5731 KeyRing keyring;
5732 string keyring_filename;
5733
5734 r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5735 if (r) {
5736 derr << "unable to find a keyring file on " << g_conf->keyring
5737 << ": " << cpp_strerror(r) << dendl;
5738 if (g_conf->key != "") {
5739 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5740 "\n\tcaps mon = \"allow *\"\n";
5741 bufferlist bl;
5742 bl.append(keyring_plaintext);
5743 try {
5744 bufferlist::iterator i = bl.begin();
5745 keyring.decode_plaintext(i);
5746 }
5747 catch (const buffer::error& e) {
5748 derr << "error decoding keyring " << keyring_plaintext
5749 << ": " << e.what() << dendl;
5750 return -EINVAL;
5751 }
5752 } else {
5753 return -ENOENT;
5754 }
5755 } else {
5756 r = keyring.load(g_ceph_context, keyring_filename);
5757 if (r < 0) {
5758 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5759 return r;
5760 }
5761 }
5762
5763 // put mon. key in external keyring; seed with everything else.
5764 extract_save_mon_key(keyring);
5765
5766 bufferlist keyringbl;
5767 keyring.encode_plaintext(keyringbl);
5768 t->put("mkfs", "keyring", keyringbl);
5769 }
5770 write_fsid(t);
5771 store->apply_transaction(t);
5772
5773 return 0;
5774}
5775
5776int Monitor::write_default_keyring(bufferlist& bl)
5777{
5778 ostringstream os;
5779 os << g_conf->mon_data << "/keyring";
5780
5781 int err = 0;
91327a77 5782 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600);
7c673cae
FG
5783 if (fd < 0) {
5784 err = -errno;
5785 dout(0) << __func__ << " failed to open " << os.str()
5786 << ": " << cpp_strerror(err) << dendl;
5787 return err;
5788 }
5789
5790 err = bl.write_fd(fd);
5791 if (!err)
5792 ::fsync(fd);
5793 VOID_TEMP_FAILURE_RETRY(::close(fd));
5794
5795 return err;
5796}
5797
5798void Monitor::extract_save_mon_key(KeyRing& keyring)
5799{
5800 EntityName mon_name;
5801 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5802 EntityAuth mon_key;
5803 if (keyring.get_auth(mon_name, mon_key)) {
5804 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5805 KeyRing pkey;
5806 pkey.add(mon_name, mon_key);
5807 bufferlist bl;
5808 pkey.encode_plaintext(bl);
5809 write_default_keyring(bl);
5810 keyring.remove(mon_name);
5811 }
5812}
5813
5814bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5815 bool force_new)
5816{
5817 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5818 << dendl;
5819
5820 if (is_shutdown())
5821 return false;
5822
5823 // we only connect to other monitors and mgr; every else connects to us.
5824 if (service_id != CEPH_ENTITY_TYPE_MON &&
5825 service_id != CEPH_ENTITY_TYPE_MGR)
5826 return false;
5827
5828 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5829 // auth_none
5830 dout(20) << __func__ << " building auth_none authorizer" << dendl;
5831 AuthNoneClientHandler handler(g_ceph_context, nullptr);
5832 handler.set_global_id(0);
5833 *authorizer = handler.build_authorizer(service_id);
5834 return true;
5835 }
5836
5837 CephXServiceTicketInfo auth_ticket_info;
5838 CephXSessionAuthInfo info;
5839 int ret;
5840
5841 EntityName name;
5842 name.set_type(CEPH_ENTITY_TYPE_MON);
5843 auth_ticket_info.ticket.name = name;
5844 auth_ticket_info.ticket.global_id = 0;
5845
5846 if (service_id == CEPH_ENTITY_TYPE_MON) {
5847 // mon to mon authentication uses the private monitor shared key and not the
5848 // rotating key
5849 CryptoKey secret;
5850 if (!keyring.get_secret(name, secret) &&
5851 !key_server.get_secret(name, secret)) {
5852 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5853 << dendl;
5854 stringstream ss, ds;
5855 int err = key_server.list_secrets(ds);
5856 if (err < 0)
5857 ss << "no installed auth entries!";
5858 else
5859 ss << "installed auth entries:";
5860 dout(0) << ss.str() << "\n" << ds.str() << dendl;
5861 return false;
5862 }
5863
5864 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5865 secret, (uint64_t)-1);
5866 if (ret < 0) {
5867 dout(0) << __func__ << " failed to build mon session_auth_info "
5868 << cpp_strerror(ret) << dendl;
5869 return false;
5870 }
5871 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5872 // mgr
5873 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5874 if (ret < 0) {
5875 derr << __func__ << " failed to build mgr service session_auth_info "
5876 << cpp_strerror(ret) << dendl;
5877 return false;
5878 }
5879 } else {
5880 ceph_abort(); // see check at top of fn
5881 }
5882
5883 CephXTicketBlob blob;
5884 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5885 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5886 return false;
5887 }
5888 bufferlist ticket_data;
5889 ::encode(blob, ticket_data);
5890
5891 bufferlist::iterator iter = ticket_data.begin();
5892 CephXTicketHandler handler(g_ceph_context, service_id);
5893 ::decode(handler.ticket, iter);
5894
5895 handler.session_key = info.session_key;
5896
5897 *authorizer = handler.build_authorizer(0);
5898
5899 return true;
5900}
5901
5902bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5903 int protocol, bufferlist& authorizer_data,
5904 bufferlist& authorizer_reply,
28e407b8
AA
5905 bool& isvalid, CryptoKey& session_key,
5906 std::unique_ptr<AuthAuthorizerChallenge> *challenge)
7c673cae
FG
5907{
5908 dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5909 << " " << ceph_entity_type_name(peer_type)
5910 << " protocol " << protocol << dendl;
5911
5912 if (is_shutdown())
5913 return false;
5914
5915 if (peer_type == CEPH_ENTITY_TYPE_MON &&
5916 auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5917 // monitor, and cephx is enabled
5918 isvalid = false;
5919 if (protocol == CEPH_AUTH_CEPHX) {
5920 bufferlist::iterator iter = authorizer_data.begin();
5921 CephXServiceTicketInfo auth_ticket_info;
5922
5923 if (authorizer_data.length()) {
5924 bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
28e407b8 5925 auth_ticket_info, challenge, authorizer_reply);
7c673cae
FG
5926 if (ret) {
5927 session_key = auth_ticket_info.session_key;
5928 isvalid = true;
5929 } else {
5930 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5931 }
5932 }
5933 } else {
5934 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
5935 }
5936 } else {
5937 // who cares.
5938 isvalid = true;
5939 }
5940 return true;
5941}