]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Monitor.cc
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / mon / Monitor.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
11fdf7f2 16#include <iterator>
7c673cae 17#include <sstream>
11fdf7f2 18#include <tuple>
7c673cae
FG
19#include <stdlib.h>
20#include <signal.h>
21#include <limits.h>
22#include <cstring>
23#include <boost/scope_exit.hpp>
24#include <boost/algorithm/string/predicate.hpp>
25
11fdf7f2
TL
26#include "json_spirit/json_spirit_reader.h"
27#include "json_spirit/json_spirit_writer.h"
28
7c673cae
FG
29#include "Monitor.h"
30#include "common/version.h"
11fdf7f2
TL
31#include "common/blkdev.h"
32#include "common/cmdparse.h"
33#include "common/signal.h"
7c673cae
FG
34
35#include "osd/OSDMap.h"
36
37#include "MonitorDBStore.h"
38
39#include "messages/PaxosServiceMessage.h"
40#include "messages/MMonMap.h"
41#include "messages/MMonGetMap.h"
42#include "messages/MMonGetVersion.h"
43#include "messages/MMonGetVersionReply.h"
44#include "messages/MGenericMessage.h"
45#include "messages/MMonCommand.h"
46#include "messages/MMonCommandAck.h"
47#include "messages/MMonMetadata.h"
48#include "messages/MMonSync.h"
49#include "messages/MMonScrub.h"
50#include "messages/MMonProbe.h"
51#include "messages/MMonJoin.h"
52#include "messages/MMonPaxos.h"
53#include "messages/MRoute.h"
54#include "messages/MForward.h"
55
56#include "messages/MMonSubscribe.h"
57#include "messages/MMonSubscribeAck.h"
58
59#include "messages/MAuthReply.h"
60
11fdf7f2 61#include "messages/MTimeCheck2.h"
7c673cae
FG
62#include "messages/MPing.h"
63
64#include "common/strtol.h"
65#include "common/ceph_argparse.h"
66#include "common/Timer.h"
67#include "common/Clock.h"
68#include "common/errno.h"
69#include "common/perf_counters.h"
70#include "common/admin_socket.h"
71#include "global/signal_handler.h"
72#include "common/Formatter.h"
73#include "include/stringify.h"
74#include "include/color.h"
75#include "include/ceph_fs.h"
76#include "include/str_list.h"
77
78#include "OSDMonitor.h"
79#include "MDSMonitor.h"
80#include "MonmapMonitor.h"
7c673cae
FG
81#include "LogMonitor.h"
82#include "AuthMonitor.h"
83#include "MgrMonitor.h"
31f18b77 84#include "MgrStatMonitor.h"
11fdf7f2 85#include "ConfigMonitor.h"
7c673cae
FG
86#include "mon/QuorumService.h"
87#include "mon/HealthMonitor.h"
88#include "mon/ConfigKeyService.h"
89#include "common/config.h"
90#include "common/cmdparse.h"
11fdf7f2 91#include "include/ceph_assert.h"
7c673cae
FG
92#include "include/compat.h"
93#include "perfglue/heap_profiler.h"
94
95#include "auth/none/AuthNoneClientHandler.h"
96
97#define dout_subsys ceph_subsys_mon
98#undef dout_prefix
99#define dout_prefix _prefix(_dout, this)
100static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
101 return *_dout << "mon." << mon->name << "@" << mon->rank
102 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
103}
104
105const string Monitor::MONITOR_NAME = "monitor";
106const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
107
108
109#undef FLAG
110#undef COMMAND
111#undef COMMAND_WITH_FLAG
7c673cae 112#define FLAG(f) (MonCommand::FLAG_##f)
11fdf7f2
TL
113#define COMMAND(parsesig, helptext, modulename, req_perms) \
114 {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
115#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
116 {parsesig, helptext, modulename, req_perms, flags},
d2e6a577 117MonCommand mon_commands[] = {
7c673cae 118#include <mon/MonCommands.h>
d2e6a577 119};
7c673cae
FG
120#undef COMMAND
121#undef COMMAND_WITH_FLAG
7c673cae
FG
122
123
11fdf7f2 124
7c673cae
FG
125void C_MonContext::finish(int r) {
126 if (mon->is_shutdown())
127 return;
128 FunctionContext::finish(r);
129}
130
131Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
132 Messenger *m, Messenger *mgr_m, MonMap *map) :
133 Dispatcher(cct_),
11fdf7f2 134 AuthServer(cct_),
7c673cae
FG
135 name(nm),
136 rank(-1),
137 messenger(m),
138 con_self(m ? m->get_loopback_connection() : NULL),
139 lock("Monitor::lock"),
140 timer(cct_, lock),
141 finisher(cct_, "mon_finisher", "fin"),
11fdf7f2 142 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
7c673cae
FG
143 has_ever_joined(false),
144 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
145 monmap(map),
146 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
147 key_server(cct, &keyring),
148 auth_cluster_required(cct,
149 cct->_conf->auth_supported.empty() ?
150 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
151 auth_service_required(cct,
152 cct->_conf->auth_supported.empty() ?
11fdf7f2 153 cct->_conf->auth_service_required : cct->_conf->auth_supported),
7c673cae
FG
154 mgr_messenger(mgr_m),
155 mgr_client(cct_, mgr_m),
11fdf7f2 156 gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
7c673cae
FG
157 store(s),
158
7c673cae
FG
159 elector(this),
160 required_features(0),
161 leader(0),
162 quorum_con_features(0),
163 // scrub
164 scrub_version(0),
165 scrub_event(NULL),
166 scrub_timeout_event(NULL),
167
168 // sync state
169 sync_provider_count(0),
170 sync_cookie(0),
171 sync_full(false),
172 sync_start_version(0),
173 sync_timeout_event(NULL),
174 sync_last_committed_floor(0),
175
176 timecheck_round(0),
177 timecheck_acks(0),
178 timecheck_rounds_since_clean(0),
179 timecheck_event(NULL),
180
181 paxos_service(PAXOS_NUM),
182 admin_hook(NULL),
183 routed_request_tid(0),
11fdf7f2 184 op_tracker(cct, g_conf().get_val<bool>("mon_enable_op_tracker"), 1)
7c673cae
FG
185{
186 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
187 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
188
189 update_log_clients();
190
11fdf7f2
TL
191 if (!gss_ktfile_client.empty()) {
192 // Assert we can export environment variable
193 /*
194 The default client keytab is used, if it is present and readable,
195 to automatically obtain initial credentials for GSSAPI client
196 applications. The principal name of the first entry in the client
197 keytab is used by default when obtaining initial credentials.
198 1. The KRB5_CLIENT_KTNAME environment variable.
199 2. The default_client_keytab_name profile variable in [libdefaults].
200 3. The hardcoded default, DEFCKTNAME.
201 */
202 const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
203 gss_ktfile_client.c_str(), 1));
204 ceph_assert(set_result == 0);
205 }
206
207 op_tracker.set_complaint_and_threshold(
208 g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count(),
209 g_conf().get_val<int64_t>("mon_op_log_threshold"));
210 op_tracker.set_history_size_and_duration(
211 g_conf().get_val<uint64_t>("mon_op_history_size"),
212 g_conf().get_val<std::chrono::seconds>("mon_op_history_duration").count());
213 op_tracker.set_history_slow_op_size_and_threshold(
214 g_conf().get_val<uint64_t>("mon_op_history_slow_op_size"),
215 g_conf().get_val<std::chrono::seconds>("mon_op_history_slow_op_threshold").count());
216
7c673cae
FG
217 paxos = new Paxos(this, "paxos");
218
11fdf7f2
TL
219 paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(this, paxos, "mdsmap"));
220 paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(this, paxos, "monmap"));
221 paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, this, paxos, "osdmap"));
222 paxos_service[PAXOS_LOG].reset(new LogMonitor(this, paxos, "logm"));
223 paxos_service[PAXOS_AUTH].reset(new AuthMonitor(this, paxos, "auth"));
224 paxos_service[PAXOS_MGR].reset(new MgrMonitor(this, paxos, "mgr"));
225 paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(this, paxos, "mgrstat"));
226 paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(this, paxos, "health"));
227 paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(this, paxos, "config"));
228
7c673cae
FG
229 config_key_service = new ConfigKeyService(this, paxos);
230
11fdf7f2
TL
231 bool r = mon_caps.parse("allow *", NULL);
232 ceph_assert(r);
7c673cae
FG
233
234 exited_quorum = ceph_clock_now();
235
d2e6a577 236 // prepare local commands
11fdf7f2
TL
237 local_mon_commands.resize(std::size(mon_commands));
238 for (unsigned i = 0; i < std::size(mon_commands); ++i) {
d2e6a577
FG
239 local_mon_commands[i] = mon_commands[i];
240 }
241 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
242
11fdf7f2
TL
243 prenautilus_local_mon_commands = local_mon_commands;
244 for (auto& i : prenautilus_local_mon_commands) {
245 std::string n = cmddesc_get_prenautilus_compat(i.cmdstring);
246 if (n != i.cmdstring) {
247 dout(20) << " pre-nautilus cmd " << i.cmdstring << " -> " << n << dendl;
248 i.cmdstring = n;
249 }
d2e6a577 250 }
11fdf7f2 251 MonCommand::encode_vector(prenautilus_local_mon_commands, prenautilus_local_mon_commands_bl);
d2e6a577 252
7c673cae
FG
253 // assume our commands until we have an election. this only means
254 // we won't reply with EINVAL before the election; any command that
255 // actually matters will wait until we have quorum etc and then
256 // retry (and revalidate).
d2e6a577 257 leader_mon_commands = local_mon_commands;
7c673cae
FG
258}
259
7c673cae
FG
260Monitor::~Monitor()
261{
11fdf7f2
TL
262 op_tracker.on_shutdown();
263
264 paxos_service.clear();
7c673cae
FG
265 delete config_key_service;
266 delete paxos;
11fdf7f2 267 ceph_assert(session_map.sessions.empty());
7c673cae
FG
268}
269
270
271class AdminHook : public AdminSocketHook {
272 Monitor *mon;
273public:
274 explicit AdminHook(Monitor *m) : mon(m) {}
11fdf7f2
TL
275 bool call(std::string_view command, const cmdmap_t& cmdmap,
276 std::string_view format, bufferlist& out) override {
7c673cae
FG
277 stringstream ss;
278 mon->do_admin_command(command, cmdmap, format, ss);
279 out.append(ss);
280 return true;
281 }
282};
283
11fdf7f2
TL
284void Monitor::do_admin_command(std::string_view command, const cmdmap_t& cmdmap,
285 std::string_view format, std::ostream& ss)
7c673cae 286{
11fdf7f2 287 std::lock_guard l(lock);
7c673cae
FG
288
289 boost::scoped_ptr<Formatter> f(Formatter::create(format));
290
291 string args;
11fdf7f2 292 for (auto p = cmdmap.begin();
7c673cae
FG
293 p != cmdmap.end(); ++p) {
294 if (p->first == "prefix")
295 continue;
296 if (!args.empty())
297 args += ", ";
298 args += cmd_vartype_stringify(p->second);
299 }
300 args = "[" + args + "]";
301
302 bool read_only = (command == "mon_status" ||
303 command == "mon metadata" ||
304 command == "quorum_status" ||
224ce89b
WB
305 command == "ops" ||
306 command == "sessions");
7c673cae
FG
307
308 (read_only ? audit_clog->debug() : audit_clog->info())
309 << "from='admin socket' entity='admin socket' "
310 << "cmd='" << command << "' args=" << args << ": dispatch";
311
312 if (command == "mon_status") {
313 get_mon_status(f.get(), ss);
314 if (f)
315 f->flush(ss);
316 } else if (command == "quorum_status") {
317 _quorum_status(f.get(), ss);
318 } else if (command == "sync_force") {
319 string validate;
320 if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
321 (validate != "--yes-i-really-mean-it")) {
322 ss << "are you SURE? this will mean the monitor store will be erased "
323 "the next time the monitor is restarted. pass "
324 "'--yes-i-really-mean-it' if you really do.";
325 goto abort;
326 }
327 sync_force(f.get(), ss);
11fdf7f2
TL
328 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
329 command.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
7c673cae
FG
330 if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
331 goto abort;
332 } else if (command == "quorum enter") {
333 elector.start_participating();
334 start_election();
335 ss << "started responding to quorum, initiated new election";
336 } else if (command == "quorum exit") {
337 start_election();
338 elector.stop_participating();
339 ss << "stopped responding to quorum, initiated new election";
340 } else if (command == "ops") {
341 (void)op_tracker.dump_ops_in_flight(f.get());
342 if (f) {
343 f->flush(ss);
344 }
224ce89b
WB
345 } else if (command == "sessions") {
346
347 if (f) {
348 f->open_array_section("sessions");
349 for (auto p : session_map.sessions) {
350 f->dump_stream("session") << *p;
351 }
352 f->close_section();
353 f->flush(ss);
354 }
355
11fdf7f2
TL
356 } else if (command == "dump_historic_ops") {
357 if (op_tracker.dump_historic_ops(f.get())) {
358 f->flush(ss);
359 } else {
360 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
361 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
362 }
363 } else if (command == "dump_historic_ops_by_duration" ) {
364 if (op_tracker.dump_historic_ops(f.get(), true)) {
365 f->flush(ss);
366 } else {
367 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
368 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
369 }
370 } else if (command == "dump_historic_slow_ops") {
371 if (op_tracker.dump_historic_slow_ops(f.get(), {})) {
372 f->flush(ss);
373 } else {
374 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
375 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
376 }
7c673cae 377 } else {
11fdf7f2 378 ceph_abort_msg("bad AdminSocket command binding");
7c673cae
FG
379 }
380 (read_only ? audit_clog->debug() : audit_clog->info())
381 << "from='admin socket' "
382 << "entity='admin socket' "
383 << "cmd=" << command << " "
384 << "args=" << args << ": finished";
385 return;
386
387abort:
388 (read_only ? audit_clog->debug() : audit_clog->info())
389 << "from='admin socket' "
390 << "entity='admin socket' "
391 << "cmd=" << command << " "
392 << "args=" << args << ": aborted";
393}
394
395void Monitor::handle_signal(int signum)
396{
11fdf7f2 397 ceph_assert(signum == SIGINT || signum == SIGTERM);
7c673cae
FG
398 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
399 shutdown();
400}
401
402CompatSet Monitor::get_initial_supported_features()
403{
404 CompatSet::FeatureSet ceph_mon_feature_compat;
405 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
406 CompatSet::FeatureSet ceph_mon_feature_incompat;
407 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
408 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
409 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
410 ceph_mon_feature_incompat);
411}
412
413CompatSet Monitor::get_supported_features()
414{
415 CompatSet compat = get_initial_supported_features();
416 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
417 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
418 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
419 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
420 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
181888fb 421 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
11fdf7f2
TL
422 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
423 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
7c673cae
FG
424 return compat;
425}
426
427CompatSet Monitor::get_legacy_features()
428{
429 CompatSet::FeatureSet ceph_mon_feature_compat;
430 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
431 CompatSet::FeatureSet ceph_mon_feature_incompat;
432 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
433 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
434 ceph_mon_feature_incompat);
435}
436
437int Monitor::check_features(MonitorDBStore *store)
438{
439 CompatSet required = get_supported_features();
440 CompatSet ondisk;
441
442 read_features_off_disk(store, &ondisk);
443
444 if (!required.writeable(ondisk)) {
445 CompatSet diff = required.unsupported(ondisk);
446 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
447 return -EPERM;
448 }
449
450 return 0;
451}
452
453void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
454{
455 bufferlist featuresbl;
456 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
457 if (featuresbl.length() == 0) {
458 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
459 << "Assuming it is old-style and introducing one." << dendl;
460 //we only want the baseline ~v.18 features assumed to be on disk.
461 //If new features are introduced this code needs to disappear or
462 //be made smarter.
463 *features = get_legacy_features();
464
465 features->encode(featuresbl);
466 auto t(std::make_shared<MonitorDBStore::Transaction>());
467 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
468 store->apply_transaction(t);
469 } else {
11fdf7f2 470 auto it = featuresbl.cbegin();
7c673cae
FG
471 features->decode(it);
472 }
473}
474
475void Monitor::read_features()
476{
477 read_features_off_disk(store, &features);
478 dout(10) << "features " << features << dendl;
479
480 calc_quorum_requirements();
481 dout(10) << "required_features " << required_features << dendl;
482}
483
484void Monitor::write_features(MonitorDBStore::TransactionRef t)
485{
486 bufferlist bl;
487 features.encode(bl);
488 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
489}
490
491const char** Monitor::get_tracked_conf_keys() const
492{
493 static const char* KEYS[] = {
494 "crushtool", // helpful for testing
495 "mon_election_timeout",
496 "mon_lease",
497 "mon_lease_renew_interval_factor",
498 "mon_lease_ack_timeout_factor",
499 "mon_accept_timeout_factor",
500 // clog & admin clog
501 "clog_to_monitors",
502 "clog_to_syslog",
503 "clog_to_syslog_facility",
504 "clog_to_syslog_level",
505 "clog_to_graylog",
506 "clog_to_graylog_host",
507 "clog_to_graylog_port",
508 "host",
509 "fsid",
510 // periodic health to clog
511 "mon_health_to_clog",
512 "mon_health_to_clog_interval",
513 "mon_health_to_clog_tick_interval",
514 // scrub interval
515 "mon_scrub_interval",
11fdf7f2
TL
516 "mon_allow_pool_delete",
517 // osdmap pruning - observed, not handled.
518 "mon_osdmap_full_prune_enabled",
519 "mon_osdmap_full_prune_min",
520 "mon_osdmap_full_prune_interval",
521 "mon_osdmap_full_prune_txsize",
522 // debug options - observed, not handled
523 "mon_debug_extra_checks",
524 "mon_debug_block_osdmap_trim",
7c673cae
FG
525 NULL
526 };
527 return KEYS;
528}
529
11fdf7f2 530void Monitor::handle_conf_change(const ConfigProxy& conf,
7c673cae
FG
531 const std::set<std::string> &changed)
532{
533 sanitize_options();
534
535 dout(10) << __func__ << " " << changed << dendl;
536
537 if (changed.count("clog_to_monitors") ||
538 changed.count("clog_to_syslog") ||
539 changed.count("clog_to_syslog_level") ||
540 changed.count("clog_to_syslog_facility") ||
541 changed.count("clog_to_graylog") ||
542 changed.count("clog_to_graylog_host") ||
543 changed.count("clog_to_graylog_port") ||
544 changed.count("host") ||
545 changed.count("fsid")) {
546 update_log_clients();
547 }
548
549 if (changed.count("mon_health_to_clog") ||
550 changed.count("mon_health_to_clog_interval") ||
551 changed.count("mon_health_to_clog_tick_interval")) {
552 health_to_clog_update_conf(changed);
553 }
554
555 if (changed.count("mon_scrub_interval")) {
556 scrub_update_interval(conf->mon_scrub_interval);
557 }
558}
559
560void Monitor::update_log_clients()
561{
562 map<string,string> log_to_monitors;
563 map<string,string> log_to_syslog;
564 map<string,string> log_channel;
565 map<string,string> log_prio;
566 map<string,string> log_to_graylog;
567 map<string,string> log_to_graylog_host;
568 map<string,string> log_to_graylog_port;
569 uuid_d fsid;
570 string host;
571
572 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
573 log_channel, log_prio, log_to_graylog,
574 log_to_graylog_host, log_to_graylog_port,
575 fsid, host))
576 return;
577
578 clog->update_config(log_to_monitors, log_to_syslog,
579 log_channel, log_prio, log_to_graylog,
580 log_to_graylog_host, log_to_graylog_port,
581 fsid, host);
582
583 audit_clog->update_config(log_to_monitors, log_to_syslog,
584 log_channel, log_prio, log_to_graylog,
585 log_to_graylog_host, log_to_graylog_port,
586 fsid, host);
587}
588
589int Monitor::sanitize_options()
590{
591 int r = 0;
592
593 // mon_lease must be greater than mon_lease_renewal; otherwise we
594 // may incur in leases expiring before they are renewed.
11fdf7f2 595 if (g_conf()->mon_lease_renew_interval_factor >= 1.0) {
7c673cae 596 clog->error() << "mon_lease_renew_interval_factor ("
11fdf7f2 597 << g_conf()->mon_lease_renew_interval_factor
7c673cae
FG
598 << ") must be less than 1.0";
599 r = -EINVAL;
600 }
601
602 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
603 // got time to renew the lease and get an ack for it. Having both options
604 // with the same value, for a given small vale, could mean timing out if
605 // the monitors happened to be overloaded -- or even under normal load for
606 // a small enough value.
11fdf7f2 607 if (g_conf()->mon_lease_ack_timeout_factor <= 1.0) {
7c673cae 608 clog->error() << "mon_lease_ack_timeout_factor ("
11fdf7f2 609 << g_conf()->mon_lease_ack_timeout_factor
7c673cae
FG
610 << ") must be greater than 1.0";
611 r = -EINVAL;
612 }
613
614 return r;
615}
616
617int Monitor::preinit()
618{
619 lock.Lock();
620
621 dout(1) << "preinit fsid " << monmap->fsid << dendl;
622
623 int r = sanitize_options();
624 if (r < 0) {
625 derr << "option sanitization failed!" << dendl;
626 lock.Unlock();
627 return r;
628 }
629
11fdf7f2 630 ceph_assert(!logger);
7c673cae
FG
631 {
632 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
3efd9988
FG
633 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
634 PerfCountersBuilder::PRIO_USEFUL);
635 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
636 "sadd", PerfCountersBuilder::PRIO_INTERESTING);
637 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
638 "srm", PerfCountersBuilder::PRIO_INTERESTING);
639 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
640 "strm", PerfCountersBuilder::PRIO_USEFUL);
641 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
642 "ecnt", PerfCountersBuilder::PRIO_USEFUL);
643 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
644 "estt", PerfCountersBuilder::PRIO_INTERESTING);
645 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
646 "ewon", PerfCountersBuilder::PRIO_INTERESTING);
647 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
648 "elst", PerfCountersBuilder::PRIO_INTERESTING);
7c673cae
FG
649 logger = pcb.create_perf_counters();
650 cct->get_perfcounters_collection()->add(logger);
651 }
652
11fdf7f2 653 ceph_assert(!cluster_logger);
7c673cae
FG
654 {
655 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
656 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
657 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
658 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
659 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
660 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
661 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
11fdf7f2
TL
662 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(UNIT_BYTES));
663 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(UNIT_BYTES));
664 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(UNIT_BYTES));
7c673cae
FG
665 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
666 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
667 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
668 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
669 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
670 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
671 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
672 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
673 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
11fdf7f2 674 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects", NULL, 0, unit_t(UNIT_BYTES));
7c673cae
FG
675 cluster_logger = pcb.create_perf_counters();
676 }
677
678 paxos->init_logger();
679
680 // verify cluster_uuid
681 {
682 int r = check_fsid();
683 if (r == -ENOENT)
684 r = write_fsid();
685 if (r < 0) {
686 lock.Unlock();
687 return r;
688 }
689 }
690
691 // open compatset
692 read_features();
693
694 // have we ever joined a quorum?
695 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
696 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
697
698 if (!has_ever_joined) {
699 // impose initial quorum restrictions?
700 list<string> initial_members;
11fdf7f2 701 get_str_list(g_conf()->mon_initial_members, initial_members);
7c673cae
FG
702
703 if (!initial_members.empty()) {
704 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
705
11fdf7f2
TL
706 monmap->set_initial_members(
707 g_ceph_context, initial_members, name, messenger->get_myaddrs(),
708 &extra_probe_peers);
7c673cae
FG
709
710 dout(10) << " monmap is " << *monmap << dendl;
711 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
712 }
713 } else if (!monmap->contains(name)) {
714 derr << "not in monmap and have been in a quorum before; "
715 << "must have been removed" << dendl;
11fdf7f2 716 if (g_conf()->mon_force_quorum_join) {
7c673cae
FG
717 dout(0) << "we should have died but "
718 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
719 } else {
720 derr << "commit suicide!" << dendl;
721 lock.Unlock();
722 return -ENOENT;
723 }
724 }
725
726 {
727 // We have a potentially inconsistent store state in hands. Get rid of it
728 // and start fresh.
729 bool clear_store = false;
730 if (store->exists("mon_sync", "in_sync")) {
731 dout(1) << __func__ << " clean up potentially inconsistent store state"
732 << dendl;
733 clear_store = true;
734 }
735
736 if (store->get("mon_sync", "force_sync") > 0) {
737 dout(1) << __func__ << " force sync by clearing store state" << dendl;
738 clear_store = true;
739 }
740
741 if (clear_store) {
742 set<string> sync_prefixes = get_sync_targets_names();
743 store->clear(sync_prefixes);
744 }
745 }
746
747 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
748 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
749
750 init_paxos();
7c673cae
FG
751
752 if (is_keyring_required()) {
753 // we need to bootstrap authentication keys so we can form an
754 // initial quorum.
755 if (authmon()->get_last_committed() == 0) {
756 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
757 bufferlist bl;
758 int err = store->get("mkfs", "keyring", bl);
759 if (err == 0 && bl.length() > 0) {
760 // Attempt to decode and extract keyring only if it is found.
761 KeyRing keyring;
11fdf7f2
TL
762 auto p = bl.cbegin();
763 decode(keyring, p);
7c673cae
FG
764 extract_save_mon_key(keyring);
765 }
766 }
767
11fdf7f2 768 string keyring_loc = g_conf()->mon_data + "/keyring";
7c673cae
FG
769
770 r = keyring.load(cct, keyring_loc);
771 if (r < 0) {
772 EntityName mon_name;
773 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
774 EntityAuth mon_key;
775 if (key_server.get_auth(mon_name, mon_key)) {
776 dout(1) << "copying mon. key from old db to external keyring" << dendl;
777 keyring.add(mon_name, mon_key);
778 bufferlist bl;
779 keyring.encode_plaintext(bl);
780 write_default_keyring(bl);
781 } else {
11fdf7f2 782 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
7c673cae
FG
783 lock.Unlock();
784 return r;
785 }
786 }
787 }
788
789 admin_hook = new AdminHook(this);
790 AdminSocket* admin_socket = cct->get_admin_socket();
791
792 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
793 lock.Unlock();
794 r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
795 "show current monitor status");
11fdf7f2 796 ceph_assert(r == 0);
7c673cae
FG
797 r = admin_socket->register_command("quorum_status", "quorum_status",
798 admin_hook, "show current quorum status");
11fdf7f2 799 ceph_assert(r == 0);
7c673cae
FG
800 r = admin_socket->register_command("sync_force",
801 "sync_force name=validate,"
802 "type=CephChoices,"
803 "strings=--yes-i-really-mean-it",
804 admin_hook,
805 "force sync of and clear monitor store");
11fdf7f2 806 ceph_assert(r == 0);
7c673cae
FG
807 r = admin_socket->register_command("add_bootstrap_peer_hint",
808 "add_bootstrap_peer_hint name=addr,"
809 "type=CephIPAddr",
810 admin_hook,
811 "add peer address as potential bootstrap"
812 " peer for cluster bringup");
11fdf7f2
TL
813 ceph_assert(r == 0);
814 r = admin_socket->register_command("add_bootstrap_peer_hintv",
815 "add_bootstrap_peer_hintv name=addrv,"
816 "type=CephString",
817 admin_hook,
818 "add peer address vector as potential bootstrap"
819 " peer for cluster bringup");
820 ceph_assert(r == 0);
7c673cae
FG
821 r = admin_socket->register_command("quorum enter", "quorum enter",
822 admin_hook,
823 "force monitor back into quorum");
11fdf7f2 824 ceph_assert(r == 0);
7c673cae
FG
825 r = admin_socket->register_command("quorum exit", "quorum exit",
826 admin_hook,
827 "force monitor out of the quorum");
11fdf7f2 828 ceph_assert(r == 0);
7c673cae
FG
829 r = admin_socket->register_command("ops",
830 "ops",
831 admin_hook,
832 "show the ops currently in flight");
11fdf7f2 833 ceph_assert(r == 0);
224ce89b
WB
834 r = admin_socket->register_command("sessions",
835 "sessions",
836 admin_hook,
837 "list existing sessions");
11fdf7f2
TL
838 ceph_assert(r == 0);
839 r = admin_socket->register_command("dump_historic_ops", "dump_historic_ops",
840 admin_hook,
841 "show recent ops");
842 ceph_assert(r == 0);
843 r = admin_socket->register_command("dump_historic_ops_by_duration", "dump_historic_ops_by_duration",
844 admin_hook,
845 "show recent ops, sorted by duration");
846 ceph_assert(r == 0);
847 r = admin_socket->register_command("dump_historic_slow_ops", "dump_historic_slow_ops",
848 admin_hook,
849 "show recent slow ops");
850 ceph_assert(r == 0);
7c673cae
FG
851
852 lock.Lock();
853
854 // add ourselves as a conf observer
11fdf7f2
TL
855 g_conf().add_observer(this);
856
857 messenger->set_auth_client(this);
858 messenger->set_auth_server(this);
859 mgr_messenger->set_auth_client(this);
860
861 auth_registry.refresh_config();
7c673cae
FG
862
863 lock.Unlock();
864 return 0;
865}
866
867int Monitor::init()
868{
869 dout(2) << "init" << dendl;
11fdf7f2 870 std::lock_guard l(lock);
7c673cae
FG
871
872 finisher.start();
873
874 // start ticker
875 timer.init();
876 new_tick();
877
878 cpu_tp.start();
879
880 // i'm ready!
881 messenger->add_dispatcher_tail(this);
882
11fdf7f2 883 // kickstart pet mgrclient
7c673cae
FG
884 mgr_client.init();
885 mgr_messenger->add_dispatcher_tail(&mgr_client);
886 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
11fdf7f2 887 mgrmon()->prime_mgr_client();
7c673cae 888
11fdf7f2 889 state = STATE_PROBING;
7c673cae 890 bootstrap();
b5b8bbf5
FG
891 // add features of myself into feature_map
892 session_map.feature_map.add_mon(con_self->get_features());
7c673cae
FG
893 return 0;
894}
895
896void Monitor::init_paxos()
897{
898 dout(10) << __func__ << dendl;
899 paxos->init();
900
901 // init services
11fdf7f2
TL
902 for (auto& svc : paxos_service) {
903 svc->init();
7c673cae
FG
904 }
905
906 refresh_from_paxos(NULL);
907}
908
909void Monitor::refresh_from_paxos(bool *need_bootstrap)
910{
911 dout(10) << __func__ << dendl;
912
913 bufferlist bl;
914 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
915 if (r >= 0) {
916 try {
11fdf7f2
TL
917 auto p = bl.cbegin();
918 decode(fingerprint, p);
7c673cae
FG
919 }
920 catch (buffer::error& e) {
921 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
922 }
923 } else {
924 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
925 }
926
11fdf7f2
TL
927 for (auto& svc : paxos_service) {
928 svc->refresh(need_bootstrap);
7c673cae 929 }
11fdf7f2
TL
930 for (auto& svc : paxos_service) {
931 svc->post_refresh();
7c673cae 932 }
224ce89b 933 load_metadata();
7c673cae
FG
934}
935
936void Monitor::register_cluster_logger()
937{
938 if (!cluster_logger_registered) {
939 dout(10) << "register_cluster_logger" << dendl;
940 cluster_logger_registered = true;
941 cct->get_perfcounters_collection()->add(cluster_logger);
942 } else {
943 dout(10) << "register_cluster_logger - already registered" << dendl;
944 }
945}
946
947void Monitor::unregister_cluster_logger()
948{
949 if (cluster_logger_registered) {
950 dout(10) << "unregister_cluster_logger" << dendl;
951 cluster_logger_registered = false;
952 cct->get_perfcounters_collection()->remove(cluster_logger);
953 } else {
954 dout(10) << "unregister_cluster_logger - not registered" << dendl;
955 }
956}
957
958void Monitor::update_logger()
959{
960 cluster_logger->set(l_cluster_num_mon, monmap->size());
961 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
962}
963
964void Monitor::shutdown()
965{
966 dout(1) << "shutdown" << dendl;
967
968 lock.Lock();
969
970 wait_for_paxos_write();
971
11fdf7f2
TL
972 {
973 std::lock_guard l(auth_lock);
974 authmon()->_set_mon_num_rank(0, 0);
975 }
976
7c673cae
FG
977 state = STATE_SHUTDOWN;
978
f64942e4 979 lock.Unlock();
11fdf7f2 980 g_conf().remove_observer(this);
f64942e4 981 lock.Lock();
7c673cae
FG
982
983 if (admin_hook) {
11fdf7f2 984 cct->get_admin_socket()->unregister_commands(admin_hook);
7c673cae
FG
985 delete admin_hook;
986 admin_hook = NULL;
987 }
988
989 elector.shutdown();
990
991 mgr_client.shutdown();
992
993 lock.Unlock();
994 finisher.wait_for_empty();
995 finisher.stop();
996 lock.Lock();
997
998 // clean up
999 paxos->shutdown();
11fdf7f2
TL
1000 for (auto& svc : paxos_service) {
1001 svc->shutdown();
1002 }
7c673cae
FG
1003
1004 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
1005 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
1006
1007 timer.shutdown();
1008
1009 cpu_tp.stop();
1010
1011 remove_all_sessions();
1012
f64942e4
AA
1013 log_client.shutdown();
1014
1015 // unlock before msgr shutdown...
1016 lock.Unlock();
1017
1018 // shutdown messenger before removing logger from perfcounter collection,
1019 // otherwise _ms_dispatch() will try to update deleted logger
1020 messenger->shutdown();
1021 mgr_messenger->shutdown();
1022
7c673cae
FG
1023 if (logger) {
1024 cct->get_perfcounters_collection()->remove(logger);
1025 delete logger;
1026 logger = NULL;
1027 }
1028 if (cluster_logger) {
1029 if (cluster_logger_registered)
1030 cct->get_perfcounters_collection()->remove(cluster_logger);
1031 delete cluster_logger;
1032 cluster_logger = NULL;
1033 }
7c673cae
FG
1034}
1035
1036void Monitor::wait_for_paxos_write()
1037{
1038 if (paxos->is_writing() || paxos->is_writing_previous()) {
1039 dout(10) << __func__ << " flushing pending write" << dendl;
1040 lock.Unlock();
1041 store->flush();
1042 lock.Lock();
1043 dout(10) << __func__ << " flushed pending write" << dendl;
1044 }
1045}
1046
11fdf7f2
TL
1047void Monitor::respawn()
1048{
1049 // --- WARNING TO FUTURE COPY/PASTERS ---
1050 // You must also add a call like
1051 //
1052 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1053 //
1054 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1055 // instead of "(exe)", so that killall (and log rotation) will work.
1056
1057 dout(0) << __func__ << dendl;
1058
1059 char *new_argv[orig_argc+1];
1060 dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
1061 for (int i=0; i<orig_argc; i++) {
1062 new_argv[i] = (char *)orig_argv[i];
1063 dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
1064 }
1065 new_argv[orig_argc] = NULL;
1066
1067 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1068 * This allows us to exec the same executable even if it has since been
1069 * unlinked.
1070 */
1071 char exe_path[PATH_MAX] = "";
1072#ifdef PROCPREFIX
1073 if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
1074 dout(1) << "respawning with exe " << exe_path << dendl;
1075 strcpy(exe_path, PROCPREFIX "/proc/self/exe");
1076 } else {
1077#else
1078 {
1079#endif
1080 /* Print CWD for the user's interest */
1081 char buf[PATH_MAX];
1082 char *cwd = getcwd(buf, sizeof(buf));
1083 ceph_assert(cwd);
1084 dout(1) << " cwd " << cwd << dendl;
1085
1086 /* Fall back to a best-effort: just running in our CWD */
1087 strncpy(exe_path, orig_argv[0], PATH_MAX-1);
1088 }
1089
1090 dout(1) << " exe_path " << exe_path << dendl;
1091
1092 unblock_all_signals(NULL);
1093 execv(exe_path, new_argv);
1094
1095 dout(0) << "respawn execv " << orig_argv[0]
1096 << " failed with " << cpp_strerror(errno) << dendl;
1097
1098 // We have to assert out here, because suicide() returns, and callers
1099 // to respawn expect it never to return.
1100 ceph_abort();
1101}
1102
7c673cae
FG
1103void Monitor::bootstrap()
1104{
1105 dout(10) << "bootstrap" << dendl;
1106 wait_for_paxos_write();
1107
1108 sync_reset_requester();
1109 unregister_cluster_logger();
1110 cancel_probe_timeout();
1111
11fdf7f2
TL
1112 if (monmap->get_epoch() == 0) {
1113 dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl;
1114 monmap->calc_legacy_ranks();
1115 }
1116 dout(10) << "monmap " << *monmap << dendl;
1117
1118 if (monmap->min_mon_release &&
1119 monmap->min_mon_release + 2 < (int)ceph_release()) {
1120 derr << "current monmap has min_mon_release "
1121 << (int)monmap->min_mon_release
1122 << " (" << ceph_release_name(monmap->min_mon_release)
1123 << ") which is >2 releases older than me " << ceph_release()
1124 << " (" << ceph_release_name(ceph_release()) << "), stopping."
1125 << dendl;
1126 exit(0);
1127 }
1128
7c673cae 1129 // note my rank
11fdf7f2 1130 int newrank = monmap->get_rank(messenger->get_myaddrs());
7c673cae
FG
1131 if (newrank < 0 && rank >= 0) {
1132 // was i ever part of the quorum?
1133 if (has_ever_joined) {
1134 dout(0) << " removed from monmap, suicide." << dendl;
1135 exit(0);
1136 }
1137 }
11fdf7f2
TL
1138 if (newrank >= 0 &&
1139 monmap->get_addrs(newrank) != messenger->get_myaddrs()) {
1140 dout(0) << " monmap addrs for rank " << newrank << " changed, i am "
1141 << messenger->get_myaddrs()
1142 << ", monmap is " << monmap->get_addrs(newrank) << ", respawning"
1143 << dendl;
1144 respawn();
1145 }
7c673cae
FG
1146 if (newrank != rank) {
1147 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
1148 messenger->set_myname(entity_name_t::MON(newrank));
1149 rank = newrank;
1150
1151 // reset all connections, or else our peers will think we are someone else.
1152 messenger->mark_down_all();
1153 }
1154
1155 // reset
1156 state = STATE_PROBING;
1157
1158 _reset();
1159
1160 // sync store
11fdf7f2 1161 if (g_conf()->mon_compact_on_bootstrap) {
7c673cae
FG
1162 dout(10) << "bootstrap -- triggering compaction" << dendl;
1163 store->compact();
1164 dout(10) << "bootstrap -- finished compaction" << dendl;
1165 }
1166
1167 // singleton monitor?
1168 if (monmap->size() == 1 && rank == 0) {
1169 win_standalone_election();
1170 return;
1171 }
1172
1173 reset_probe_timeout();
1174
1175 // i'm outside the quorum
1176 if (monmap->contains(name))
1177 outside_quorum.insert(name);
1178
1179 // probe monitors
1180 dout(10) << "probing other monitors" << dendl;
1181 for (unsigned i = 0; i < monmap->size(); i++) {
1182 if ((int)i != rank)
11fdf7f2
TL
1183 send_mon_message(
1184 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1185 ceph_release()),
1186 i);
1187 }
1188 for (auto& av : extra_probe_peers) {
1189 if (av != messenger->get_myaddrs()) {
1190 messenger->send_to_mon(
1191 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1192 ceph_release()),
1193 av);
7c673cae
FG
1194 }
1195 }
1196}
1197
11fdf7f2
TL
1198bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd,
1199 const cmdmap_t& cmdmap,
1200 ostream& ss)
7c673cae 1201{
7c673cae
FG
1202 if (is_leader() || is_peon()) {
1203 ss << "mon already active; ignoring bootstrap hint";
1204 return true;
1205 }
1206
11fdf7f2
TL
1207 entity_addrvec_t addrs;
1208 string addrstr;
1209 if (cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1210 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' addr '"
1211 << addrstr << "'" << dendl;
1212
1213 entity_addr_t addr;
1214 const char *end = 0;
1215 if (!addr.parse(addrstr.c_str(), &end, entity_addr_t::TYPE_ANY)) {
1216 ss << "failed to parse addrs '" << addrstr
1217 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1218 return false;
1219 }
1220
1221 addrs.v.push_back(addr);
1222 if (addr.get_port() == 0) {
1223 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1224 addrs.v[0].set_port(CEPH_MON_PORT_IANA);
1225 addrs.v.push_back(addr);
1226 addrs.v[1].set_type(entity_addr_t::TYPE_LEGACY);
1227 addrs.v[1].set_port(CEPH_MON_PORT_LEGACY);
1228 } else if (addr.get_type() == entity_addr_t::TYPE_ANY) {
1229 if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
1230 addrs.v[0].set_type(entity_addr_t::TYPE_LEGACY);
1231 } else {
1232 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1233 }
1234 }
1235 } else if (cmd_getval(g_ceph_context, cmdmap, "addrv", addrstr)) {
1236 dout(10) << "_add_bootstrap_peer_hintv '" << cmd << "' addrv '"
1237 << addrstr << "'" << dendl;
1238 const char *end = 0;
1239 if (!addrs.parse(addrstr.c_str(), &end)) {
1240 ss << "failed to parse addrs '" << addrstr
1241 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1242 return false;
1243 }
1244 } else {
1245 ss << "no addr or addrv provided";
1246 return false;
1247 }
7c673cae 1248
11fdf7f2
TL
1249 extra_probe_peers.insert(addrs);
1250 ss << "adding peer " << addrs << " to list: " << extra_probe_peers;
7c673cae
FG
1251 return true;
1252}
1253
1254// called by bootstrap(), or on leader|peon -> electing
1255void Monitor::_reset()
1256{
1257 dout(10) << __func__ << dendl;
1258
11fdf7f2
TL
1259 // disable authentication
1260 {
1261 std::lock_guard l(auth_lock);
1262 authmon()->_set_mon_num_rank(0, 0);
1263 }
1264
7c673cae
FG
1265 cancel_probe_timeout();
1266 timecheck_finish();
1267 health_events_cleanup();
181888fb 1268 health_check_log_times.clear();
7c673cae
FG
1269 scrub_event_cancel();
1270
1271 leader_since = utime_t();
11fdf7f2 1272 quorum_since = {};
7c673cae
FG
1273 if (!quorum.empty()) {
1274 exited_quorum = ceph_clock_now();
1275 }
1276 quorum.clear();
1277 outside_quorum.clear();
31f18b77 1278 quorum_feature_map.clear();
7c673cae
FG
1279
1280 scrub_reset();
1281
1282 paxos->restart();
1283
11fdf7f2
TL
1284 for (auto& svc : paxos_service) {
1285 svc->restart();
1286 }
7c673cae
FG
1287}
1288
1289
1290// -----------------------------------------------------------
1291// sync
1292
1293set<string> Monitor::get_sync_targets_names()
1294{
1295 set<string> targets;
1296 targets.insert(paxos->get_name());
11fdf7f2
TL
1297 for (auto& svc : paxos_service) {
1298 svc->get_store_prefixes(targets);
1299 }
7c673cae 1300 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
11fdf7f2 1301 ceph_assert(config_key_service_ptr);
7c673cae
FG
1302 config_key_service_ptr->get_store_prefixes(targets);
1303 return targets;
1304}
1305
1306
1307void Monitor::sync_timeout()
1308{
1309 dout(10) << __func__ << dendl;
11fdf7f2 1310 ceph_assert(state == STATE_SYNCHRONIZING);
7c673cae
FG
1311 bootstrap();
1312}
1313
1314void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1315{
1316 dout(1) << __func__ << dendl;
1317
1318 MonMap latest_monmap;
1319
1320 // Grab latest monmap from MonmapMonitor
1321 bufferlist monmon_bl;
1322 int err = monmon()->get_monmap(monmon_bl);
1323 if (err < 0) {
1324 if (err != -ENOENT) {
1325 derr << __func__
1326 << " something wrong happened while reading the store: "
1327 << cpp_strerror(err) << dendl;
11fdf7f2 1328 ceph_abort_msg("error reading the store");
7c673cae
FG
1329 }
1330 } else {
1331 latest_monmap.decode(monmon_bl);
1332 }
1333
1334 // Grab last backed up monmap (if any) and compare epochs
1335 if (store->exists("mon_sync", "latest_monmap")) {
1336 bufferlist backup_bl;
1337 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1338 if (err < 0) {
1339 derr << __func__
1340 << " something wrong happened while reading the store: "
1341 << cpp_strerror(err) << dendl;
11fdf7f2 1342 ceph_abort_msg("error reading the store");
7c673cae 1343 }
11fdf7f2 1344 ceph_assert(backup_bl.length() > 0);
7c673cae
FG
1345
1346 MonMap backup_monmap;
1347 backup_monmap.decode(backup_bl);
1348
1349 if (backup_monmap.epoch > latest_monmap.epoch)
1350 latest_monmap = backup_monmap;
1351 }
1352
1353 // Check if our current monmap's epoch is greater than the one we've
1354 // got so far.
1355 if (monmap->epoch > latest_monmap.epoch)
1356 latest_monmap = *monmap;
1357
1358 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1359
1360 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1361}
1362
1363void Monitor::sync_reset_requester()
1364{
1365 dout(10) << __func__ << dendl;
1366
1367 if (sync_timeout_event) {
1368 timer.cancel_event(sync_timeout_event);
1369 sync_timeout_event = NULL;
1370 }
1371
11fdf7f2 1372 sync_provider = entity_addrvec_t();
7c673cae
FG
1373 sync_cookie = 0;
1374 sync_full = false;
1375 sync_start_version = 0;
1376}
1377
1378void Monitor::sync_reset_provider()
1379{
1380 dout(10) << __func__ << dendl;
1381 sync_providers.clear();
1382}
1383
11fdf7f2 1384void Monitor::sync_start(entity_addrvec_t &addrs, bool full)
7c673cae 1385{
11fdf7f2 1386 dout(10) << __func__ << " " << addrs << (full ? " full" : " recent") << dendl;
7c673cae 1387
11fdf7f2 1388 ceph_assert(state == STATE_PROBING ||
7c673cae
FG
1389 state == STATE_SYNCHRONIZING);
1390 state = STATE_SYNCHRONIZING;
1391
1392 // make sure are not a provider for anyone!
1393 sync_reset_provider();
1394
1395 sync_full = full;
1396
1397 if (sync_full) {
1398 // stash key state, and mark that we are syncing
1399 auto t(std::make_shared<MonitorDBStore::Transaction>());
1400 sync_stash_critical_state(t);
1401 t->put("mon_sync", "in_sync", 1);
1402
11fdf7f2 1403 sync_last_committed_floor = std::max(sync_last_committed_floor, paxos->get_version());
7c673cae
FG
1404 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1405 << sync_last_committed_floor << dendl;
1406 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1407
1408 store->apply_transaction(t);
1409
11fdf7f2 1410 ceph_assert(g_conf()->mon_sync_requester_kill_at != 1);
7c673cae
FG
1411
1412 // clear the underlying store
1413 set<string> targets = get_sync_targets_names();
1414 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1415 store->clear(targets);
1416
1417 // make sure paxos knows it has been reset. this prevents a
1418 // bootstrap and then different probe reply order from possibly
1419 // deciding a partial or no sync is needed.
1420 paxos->init();
1421
11fdf7f2 1422 ceph_assert(g_conf()->mon_sync_requester_kill_at != 2);
7c673cae
FG
1423 }
1424
1425 // assume 'other' as the leader. We will update the leader once we receive
1426 // a reply to the sync start.
11fdf7f2 1427 sync_provider = addrs;
7c673cae
FG
1428
1429 sync_reset_timeout();
1430
1431 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1432 if (!sync_full)
1433 m->last_committed = paxos->get_version();
11fdf7f2 1434 messenger->send_to_mon(m, sync_provider);
7c673cae
FG
1435}
1436
1437void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1438{
1439 dout(10) << __func__ << dendl;
1440 bufferlist backup_monmap;
1441 sync_obtain_latest_monmap(backup_monmap);
11fdf7f2 1442 ceph_assert(backup_monmap.length() > 0);
7c673cae
FG
1443 t->put("mon_sync", "latest_monmap", backup_monmap);
1444}
1445
1446void Monitor::sync_reset_timeout()
1447{
1448 dout(10) << __func__ << dendl;
1449 if (sync_timeout_event)
1450 timer.cancel_event(sync_timeout_event);
3efd9988 1451 sync_timeout_event = timer.add_event_after(
11fdf7f2 1452 g_conf()->mon_sync_timeout,
3efd9988
FG
1453 new C_MonContext(this, [this](int) {
1454 sync_timeout();
1455 }));
7c673cae
FG
1456}
1457
1458void Monitor::sync_finish(version_t last_committed)
1459{
1460 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1461
11fdf7f2 1462 ceph_assert(g_conf()->mon_sync_requester_kill_at != 7);
7c673cae
FG
1463
1464 if (sync_full) {
1465 // finalize the paxos commits
1466 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1467 paxos->read_and_prepare_transactions(tx, sync_start_version,
1468 last_committed);
1469 tx->put(paxos->get_name(), "last_committed", last_committed);
1470
1471 dout(30) << __func__ << " final tx dump:\n";
1472 JSONFormatter f(true);
1473 tx->dump(&f);
1474 f.flush(*_dout);
1475 *_dout << dendl;
1476
1477 store->apply_transaction(tx);
1478 }
1479
11fdf7f2 1480 ceph_assert(g_conf()->mon_sync_requester_kill_at != 8);
7c673cae
FG
1481
1482 auto t(std::make_shared<MonitorDBStore::Transaction>());
1483 t->erase("mon_sync", "in_sync");
1484 t->erase("mon_sync", "force_sync");
1485 t->erase("mon_sync", "last_committed_floor");
1486 store->apply_transaction(t);
1487
11fdf7f2 1488 ceph_assert(g_conf()->mon_sync_requester_kill_at != 9);
7c673cae
FG
1489
1490 init_paxos();
1491
11fdf7f2 1492 ceph_assert(g_conf()->mon_sync_requester_kill_at != 10);
7c673cae
FG
1493
1494 bootstrap();
1495}
1496
1497void Monitor::handle_sync(MonOpRequestRef op)
1498{
1499 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1500 dout(10) << __func__ << " " << *m << dendl;
1501 switch (m->op) {
1502
1503 // provider ---------
1504
1505 case MMonSync::OP_GET_COOKIE_FULL:
1506 case MMonSync::OP_GET_COOKIE_RECENT:
1507 handle_sync_get_cookie(op);
1508 break;
1509 case MMonSync::OP_GET_CHUNK:
1510 handle_sync_get_chunk(op);
1511 break;
1512
1513 // client -----------
1514
1515 case MMonSync::OP_COOKIE:
1516 handle_sync_cookie(op);
1517 break;
1518
1519 case MMonSync::OP_CHUNK:
1520 case MMonSync::OP_LAST_CHUNK:
1521 handle_sync_chunk(op);
1522 break;
1523 case MMonSync::OP_NO_COOKIE:
1524 handle_sync_no_cookie(op);
1525 break;
1526
1527 default:
1528 dout(0) << __func__ << " unknown op " << m->op << dendl;
11fdf7f2 1529 ceph_abort_msg("unknown op");
7c673cae
FG
1530 }
1531}
1532
1533// leader
1534
1535void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1536{
1537 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1538 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1539 m->get_connection()->send_message(reply);
1540}
1541
1542void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1543{
1544 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1545 if (is_synchronizing()) {
1546 _sync_reply_no_cookie(op);
1547 return;
1548 }
1549
11fdf7f2 1550 ceph_assert(g_conf()->mon_sync_provider_kill_at != 1);
7c673cae
FG
1551
1552 // make sure they can understand us.
1553 if ((required_features ^ m->get_connection()->get_features()) &
1554 required_features) {
1555 dout(5) << " ignoring peer mon." << m->get_source().num()
1556 << " has features " << std::hex
1557 << m->get_connection()->get_features()
1558 << " but we require " << required_features << std::dec << dendl;
1559 return;
1560 }
1561
1562 // make up a unique cookie. include election epoch (which persists
1563 // across restarts for the whole cluster) and a counter for this
1564 // process instance. there is no need to be unique *across*
1565 // monitors, though.
1566 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
11fdf7f2 1567 ceph_assert(sync_providers.count(cookie) == 0);
7c673cae
FG
1568
1569 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1570
1571 SyncProvider& sp = sync_providers[cookie];
1572 sp.cookie = cookie;
11fdf7f2
TL
1573 sp.addrs = m->get_source_addrs();
1574 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
7c673cae
FG
1575
1576 set<string> sync_targets;
1577 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1578 // full scan
1579 sync_targets = get_sync_targets_names();
1580 sp.last_committed = paxos->get_version();
1581 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1582 sp.full = true;
1583 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1584 } else {
1585 // just catch up paxos
1586 sp.last_committed = m->last_committed;
1587 }
1588 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1589
1590 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1591 reply->last_committed = sp.last_committed;
1592 m->get_connection()->send_message(reply);
1593}
1594
1595void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1596{
1597 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1598 dout(10) << __func__ << " " << *m << dendl;
1599
1600 if (sync_providers.count(m->cookie) == 0) {
1601 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1602 _sync_reply_no_cookie(op);
1603 return;
1604 }
1605
11fdf7f2 1606 ceph_assert(g_conf()->mon_sync_provider_kill_at != 2);
7c673cae
FG
1607
1608 SyncProvider& sp = sync_providers[m->cookie];
11fdf7f2 1609 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
7c673cae
FG
1610
1611 if (sp.last_committed < paxos->get_first_committed() &&
1612 paxos->get_first_committed() > 1) {
1613 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1614 << " < our fc " << paxos->get_first_committed() << dendl;
1615 sync_providers.erase(m->cookie);
1616 _sync_reply_no_cookie(op);
1617 return;
1618 }
1619
1620 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1621 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1622
11fdf7f2 1623 int left = g_conf()->mon_sync_max_payload_size;
7c673cae
FG
1624 while (sp.last_committed < paxos->get_version() && left > 0) {
1625 bufferlist bl;
1626 sp.last_committed++;
224ce89b
WB
1627
1628 int err = store->get(paxos->get_name(), sp.last_committed, bl);
11fdf7f2 1629 ceph_assert(err == 0);
224ce89b 1630
7c673cae
FG
1631 tx->put(paxos->get_name(), sp.last_committed, bl);
1632 left -= bl.length();
1633 dout(20) << __func__ << " including paxos state " << sp.last_committed
1634 << dendl;
1635 }
1636 reply->last_committed = sp.last_committed;
1637
1638 if (sp.full && left > 0) {
1639 sp.synchronizer->get_chunk_tx(tx, left);
1640 sp.last_key = sp.synchronizer->get_last_key();
1641 reply->last_key = sp.last_key;
1642 }
1643
1644 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1645 sp.last_committed < paxos->get_version()) {
1646 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1647 << " key " << sp.last_key << dendl;
1648 } else {
1649 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1650 << " key " << sp.last_key << dendl;
1651 reply->op = MMonSync::OP_LAST_CHUNK;
1652
11fdf7f2 1653 ceph_assert(g_conf()->mon_sync_provider_kill_at != 3);
7c673cae
FG
1654
1655 // clean up our local state
1656 sync_providers.erase(sp.cookie);
1657 }
1658
11fdf7f2 1659 encode(*tx, reply->chunk_bl);
7c673cae
FG
1660
1661 m->get_connection()->send_message(reply);
1662}
1663
1664// requester
1665
1666void Monitor::handle_sync_cookie(MonOpRequestRef op)
1667{
1668 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1669 dout(10) << __func__ << " " << *m << dendl;
1670 if (sync_cookie) {
1671 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1672 return;
1673 }
11fdf7f2 1674 if (m->get_source_addrs() != sync_provider) {
7c673cae
FG
1675 dout(10) << __func__ << " source does not match, discarding" << dendl;
1676 return;
1677 }
1678 sync_cookie = m->cookie;
1679 sync_start_version = m->last_committed;
1680
1681 sync_reset_timeout();
1682 sync_get_next_chunk();
1683
11fdf7f2 1684 ceph_assert(g_conf()->mon_sync_requester_kill_at != 3);
7c673cae
FG
1685}
1686
1687void Monitor::sync_get_next_chunk()
1688{
1689 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
11fdf7f2
TL
1690 if (g_conf()->mon_inject_sync_get_chunk_delay > 0) {
1691 dout(20) << __func__ << " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay << dendl;
1692 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay * 1000000.0));
7c673cae
FG
1693 }
1694 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
11fdf7f2 1695 messenger->send_to_mon(r, sync_provider);
7c673cae 1696
11fdf7f2 1697 ceph_assert(g_conf()->mon_sync_requester_kill_at != 4);
7c673cae
FG
1698}
1699
1700void Monitor::handle_sync_chunk(MonOpRequestRef op)
1701{
1702 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1703 dout(10) << __func__ << " " << *m << dendl;
1704
1705 if (m->cookie != sync_cookie) {
1706 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1707 return;
1708 }
11fdf7f2 1709 if (m->get_source_addrs() != sync_provider) {
7c673cae
FG
1710 dout(10) << __func__ << " source does not match, discarding" << dendl;
1711 return;
1712 }
1713
11fdf7f2
TL
1714 ceph_assert(state == STATE_SYNCHRONIZING);
1715 ceph_assert(g_conf()->mon_sync_requester_kill_at != 5);
7c673cae
FG
1716
1717 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1718 tx->append_from_encoded(m->chunk_bl);
1719
1720 dout(30) << __func__ << " tx dump:\n";
1721 JSONFormatter f(true);
1722 tx->dump(&f);
1723 f.flush(*_dout);
1724 *_dout << dendl;
1725
1726 store->apply_transaction(tx);
1727
11fdf7f2 1728 ceph_assert(g_conf()->mon_sync_requester_kill_at != 6);
7c673cae
FG
1729
1730 if (!sync_full) {
1731 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1732 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1733 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1734 m->last_committed);
1735 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1736
1737 dout(30) << __func__ << " tx dump:\n";
1738 JSONFormatter f(true);
1739 tx->dump(&f);
1740 f.flush(*_dout);
1741 *_dout << dendl;
1742
1743 store->apply_transaction(tx);
1744 paxos->init(); // to refresh what we just wrote
1745 }
1746
1747 if (m->op == MMonSync::OP_CHUNK) {
1748 sync_reset_timeout();
1749 sync_get_next_chunk();
1750 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1751 sync_finish(m->last_committed);
1752 }
1753}
1754
1755void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1756{
1757 dout(10) << __func__ << dendl;
1758 bootstrap();
1759}
1760
1761void Monitor::sync_trim_providers()
1762{
1763 dout(20) << __func__ << dendl;
1764
1765 utime_t now = ceph_clock_now();
1766 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1767 while (p != sync_providers.end()) {
1768 if (now > p->second.timeout) {
11fdf7f2
TL
1769 dout(10) << __func__ << " expiring cookie " << p->second.cookie
1770 << " for " << p->second.addrs << dendl;
7c673cae
FG
1771 sync_providers.erase(p++);
1772 } else {
1773 ++p;
1774 }
1775 }
1776}
1777
1778// ---------------------------------------------------
1779// probe
1780
1781void Monitor::cancel_probe_timeout()
1782{
1783 if (probe_timeout_event) {
1784 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1785 timer.cancel_event(probe_timeout_event);
1786 probe_timeout_event = NULL;
1787 } else {
1788 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1789 }
1790}
1791
1792void Monitor::reset_probe_timeout()
1793{
1794 cancel_probe_timeout();
1795 probe_timeout_event = new C_MonContext(this, [this](int r) {
1796 probe_timeout(r);
1797 });
11fdf7f2 1798 double t = g_conf()->mon_probe_timeout;
3efd9988
FG
1799 if (timer.add_event_after(t, probe_timeout_event)) {
1800 dout(10) << "reset_probe_timeout " << probe_timeout_event
1801 << " after " << t << " seconds" << dendl;
1802 } else {
1803 probe_timeout_event = nullptr;
1804 }
7c673cae
FG
1805}
1806
1807void Monitor::probe_timeout(int r)
1808{
1809 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
11fdf7f2
TL
1810 ceph_assert(is_probing() || is_synchronizing());
1811 ceph_assert(probe_timeout_event);
7c673cae
FG
1812 probe_timeout_event = NULL;
1813 bootstrap();
1814}
1815
1816void Monitor::handle_probe(MonOpRequestRef op)
1817{
1818 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1819 dout(10) << "handle_probe " << *m << dendl;
1820
1821 if (m->fsid != monmap->fsid) {
1822 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1823 return;
1824 }
1825
1826 switch (m->op) {
1827 case MMonProbe::OP_PROBE:
1828 handle_probe_probe(op);
1829 break;
1830
1831 case MMonProbe::OP_REPLY:
1832 handle_probe_reply(op);
1833 break;
1834
1835 case MMonProbe::OP_MISSING_FEATURES:
81eedcae
TL
1836 derr << __func__ << " require release " << (int)m->mon_release << " > "
1837 << (int)ceph_release()
1838 << ", or missing features (have " << CEPH_FEATURES_ALL
7c673cae 1839 << ", required " << m->required_features
11fdf7f2 1840 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << ")"
7c673cae
FG
1841 << dendl;
1842 break;
1843 }
1844}
1845
1846void Monitor::handle_probe_probe(MonOpRequestRef op)
1847{
1848 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1849
1850 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1851 << " features " << m->get_connection()->get_features() << dendl;
1852 uint64_t missing = required_features & ~m->get_connection()->get_features();
81eedcae
TL
1853 if ((m->mon_release > 0 && m->mon_release < monmap->min_mon_release) ||
1854 missing) {
1855 dout(1) << " peer " << m->get_source_addr()
1856 << " release " << (int)m->mon_release
1857 << " < min_mon_release " << (int)monmap->min_mon_release
11fdf7f2
TL
1858 << ", or missing features " << missing << dendl;
1859 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1860 name, has_ever_joined, monmap->min_mon_release);
1861 m->required_features = required_features;
1862 m->get_connection()->send_message(r);
7c673cae
FG
1863 goto out;
1864 }
1865
1866 if (!is_probing() && !is_synchronizing()) {
1867 // If the probing mon is way ahead of us, we need to re-bootstrap.
1868 // Normally we capture this case when we initially bootstrap, but
1869 // it is possible we pass those checks (we overlap with
1870 // quorum-to-be) but fail to join a quorum before it moves past
1871 // us. We need to be kicked back to bootstrap so we can
1872 // synchonize, not keep calling elections.
1873 if (paxos->get_version() + 1 < m->paxos_first_version) {
1874 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1875 << "ahead of us, re-bootstrapping" << dendl;
1876 bootstrap();
1877 goto out;
1878
1879 }
1880 }
1881
1882 MMonProbe *r;
11fdf7f2
TL
1883 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined,
1884 ceph_release());
7c673cae
FG
1885 r->name = name;
1886 r->quorum = quorum;
1887 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1888 r->paxos_first_version = paxos->get_first_committed();
1889 r->paxos_last_version = paxos->get_version();
1890 m->get_connection()->send_message(r);
1891
1892 // did we discover a peer here?
1893 if (!monmap->contains(m->get_source_addr())) {
11fdf7f2 1894 dout(1) << " adding peer " << m->get_source_addrs()
7c673cae 1895 << " to list of hints" << dendl;
11fdf7f2 1896 extra_probe_peers.insert(m->get_source_addrs());
7c673cae
FG
1897 }
1898
1899 out:
1900 return;
1901}
1902
1903void Monitor::handle_probe_reply(MonOpRequestRef op)
1904{
1905 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
11fdf7f2
TL
1906 dout(10) << "handle_probe_reply " << m->get_source_inst()
1907 << " " << *m << dendl;
7c673cae
FG
1908 dout(10) << " monmap is " << *monmap << dendl;
1909
1910 // discover name and addrs during probing or electing states.
1911 if (!is_probing() && !is_electing()) {
1912 return;
1913 }
1914
1915 // newer map, or they've joined a quorum and we haven't?
1916 bufferlist mybl;
1917 monmap->encode(mybl, m->get_connection()->get_features());
1918 // make sure it's actually different; the checks below err toward
1919 // taking the other guy's map, which could cause us to loop.
1920 if (!mybl.contents_equal(m->monmap_bl)) {
1921 MonMap *newmap = new MonMap;
1922 newmap->decode(m->monmap_bl);
1923 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1924 !has_ever_joined)) {
1925 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1926 << ", mine was " << monmap->get_epoch() << dendl;
1927 delete newmap;
1928 monmap->decode(m->monmap_bl);
1929
1930 bootstrap();
1931 return;
1932 }
1933 delete newmap;
1934 }
1935
1936 // rename peer?
1937 string peer_name = monmap->get_name(m->get_source_addr());
1938 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1939 dout(10) << " renaming peer " << m->get_source_addr() << " "
1940 << peer_name << " -> " << m->name << " in my monmap"
1941 << dendl;
1942 monmap->rename(peer_name, m->name);
1943
1944 if (is_electing()) {
1945 bootstrap();
1946 return;
1947 }
11fdf7f2 1948 } else if (peer_name.size()) {
7c673cae 1949 dout(10) << " peer name is " << peer_name << dendl;
11fdf7f2
TL
1950 } else {
1951 dout(10) << " peer " << m->get_source_addr() << " not in map" << dendl;
7c673cae
FG
1952 }
1953
1954 // new initial peer?
1955 if (monmap->get_epoch() == 0 &&
1956 monmap->contains(m->name) &&
11fdf7f2
TL
1957 monmap->get_addrs(m->name).front().is_blank_ip()) {
1958 dout(1) << " learned initial mon " << m->name
1959 << " addrs " << m->get_source_addrs() << dendl;
1960 monmap->set_addrvec(m->name, m->get_source_addrs());
7c673cae
FG
1961
1962 bootstrap();
1963 return;
1964 }
1965
1966 // end discover phase
1967 if (!is_probing()) {
1968 return;
1969 }
1970
11fdf7f2 1971 ceph_assert(paxos != NULL);
7c673cae
FG
1972
1973 if (is_synchronizing()) {
1974 dout(10) << " currently syncing" << dendl;
1975 return;
1976 }
1977
11fdf7f2 1978 entity_addrvec_t other = m->get_source_addrs();
7c673cae
FG
1979
1980 if (m->paxos_last_version < sync_last_committed_floor) {
1981 dout(10) << " peer paxos versions [" << m->paxos_first_version
1982 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1983 << sync_last_committed_floor << ", ignoring"
1984 << dendl;
1985 } else {
1986 if (paxos->get_version() < m->paxos_first_version &&
1987 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1988 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1989 << "," << m->paxos_last_version << "]"
1990 << " vs my version " << paxos->get_version()
1991 << " (too far ahead)"
1992 << dendl;
1993 cancel_probe_timeout();
1994 sync_start(other, true);
1995 return;
1996 }
11fdf7f2 1997 if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) {
7c673cae
FG
1998 dout(10) << " peer paxos last version " << m->paxos_last_version
1999 << " vs my version " << paxos->get_version()
2000 << " (too far ahead)"
2001 << dendl;
2002 cancel_probe_timeout();
2003 sync_start(other, false);
2004 return;
2005 }
2006 }
2007
11fdf7f2
TL
2008 // did the existing cluster complete upgrade to luminous?
2009 if (osdmon()->osdmap.get_epoch()) {
2010 if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
2011 derr << __func__ << " existing cluster has not completed upgrade to"
2012 << " luminous; 'ceph osd require_osd_release luminous' before"
2013 << " upgrading" << dendl;
2014 exit(0);
2015 }
2016 if (!osdmon()->osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) ||
2017 !osdmon()->osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
2018 derr << __func__ << " existing cluster has not completed a full luminous"
2019 << " scrub to purge legacy snapdir objects; please scrub before"
2020 << " upgrading beyond luminous." << dendl;
2021 exit(0);
2022 }
2023 }
2024
7c673cae
FG
2025 // is there an existing quorum?
2026 if (m->quorum.size()) {
2027 dout(10) << " existing quorum " << m->quorum << dendl;
2028
2029 dout(10) << " peer paxos version " << m->paxos_last_version
2030 << " vs my version " << paxos->get_version()
2031 << " (ok)"
2032 << dendl;
2033
2034 if (monmap->contains(name) &&
11fdf7f2 2035 !monmap->get_addrs(name).front().is_blank_ip()) {
7c673cae
FG
2036 // i'm part of the cluster; just initiate a new election
2037 start_election();
2038 } else {
2039 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
11fdf7f2
TL
2040 send_mon_message(
2041 new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
2042 *m->quorum.begin());
7c673cae
FG
2043 }
2044 } else {
2045 if (monmap->contains(m->name)) {
2046 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
2047 outside_quorum.insert(m->name);
2048 } else {
2049 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
2050 return;
2051 }
2052
11fdf7f2 2053 unsigned need = monmap->min_quorum_size();
7c673cae
FG
2054 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
2055 if (outside_quorum.size() >= need) {
2056 if (outside_quorum.count(name)) {
2057 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
2058 start_election();
2059 } else {
2060 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
2061 }
2062 } else {
2063 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
2064 }
2065 }
2066}
2067
2068void Monitor::join_election()
2069{
2070 dout(10) << __func__ << dendl;
2071 wait_for_paxos_write();
2072 _reset();
2073 state = STATE_ELECTING;
2074
2075 logger->inc(l_mon_num_elections);
2076}
2077
2078void Monitor::start_election()
2079{
2080 dout(10) << "start_election" << dendl;
2081 wait_for_paxos_write();
2082 _reset();
2083 state = STATE_ELECTING;
2084
2085 logger->inc(l_mon_num_elections);
2086 logger->inc(l_mon_election_call);
2087
b32b8144 2088 clog->info() << "mon." << name << " calling monitor election";
7c673cae
FG
2089 elector.call_election();
2090}
2091
2092void Monitor::win_standalone_election()
2093{
2094 dout(1) << "win_standalone_election" << dendl;
2095
2096 // bump election epoch, in case the previous epoch included other
2097 // monitors; we need to be able to make the distinction.
2098 elector.init();
2099 elector.advance_epoch();
2100
2101 rank = monmap->get_rank(name);
11fdf7f2 2102 ceph_assert(rank == 0);
7c673cae
FG
2103 set<int> q;
2104 q.insert(rank);
2105
224ce89b
WB
2106 map<int,Metadata> metadata;
2107 collect_metadata(&metadata[0]);
2108
7c673cae
FG
2109 win_election(elector.get_epoch(), q,
2110 CEPH_FEATURES_ALL,
2111 ceph::features::mon::get_supported(),
11fdf7f2 2112 ceph_release(),
d2e6a577 2113 metadata);
7c673cae
FG
2114}
2115
2116const utime_t& Monitor::get_leader_since() const
2117{
11fdf7f2 2118 ceph_assert(state == STATE_LEADER);
7c673cae
FG
2119 return leader_since;
2120}
2121
2122epoch_t Monitor::get_epoch()
2123{
2124 return elector.get_epoch();
2125}
2126
2127void Monitor::_finish_svc_election()
2128{
11fdf7f2 2129 ceph_assert(state == STATE_LEADER || state == STATE_PEON);
7c673cae 2130
11fdf7f2 2131 for (auto& svc : paxos_service) {
7c673cae 2132 // we already called election_finished() on monmon(); avoid callig twice
11fdf7f2 2133 if (state == STATE_LEADER && svc.get() == monmon())
7c673cae 2134 continue;
11fdf7f2 2135 svc->election_finished();
7c673cae
FG
2136 }
2137}
2138
2139void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
2140 const mon_feature_t& mon_features,
11fdf7f2 2141 int min_mon_release,
d2e6a577 2142 const map<int,Metadata>& metadata)
7c673cae
FG
2143{
2144 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
2145 << " features " << features
2146 << " mon_features " << mon_features
11fdf7f2 2147 << " min_mon_release " << min_mon_release
7c673cae 2148 << dendl;
11fdf7f2 2149 ceph_assert(is_electing());
7c673cae
FG
2150 state = STATE_LEADER;
2151 leader_since = ceph_clock_now();
11fdf7f2 2152 quorum_since = mono_clock::now();
7c673cae
FG
2153 leader = rank;
2154 quorum = active;
2155 quorum_con_features = features;
2156 quorum_mon_features = mon_features;
11fdf7f2 2157 quorum_min_mon_release = min_mon_release;
224ce89b 2158 pending_metadata = metadata;
7c673cae
FG
2159 outside_quorum.clear();
2160
b32b8144
FG
2161 clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names()
2162 << " in quorum (ranks " << quorum << ")";
7c673cae 2163
d2e6a577 2164 set_leader_commands(get_local_commands(mon_features));
7c673cae
FG
2165
2166 paxos->leader_init();
2167 // NOTE: tell monmap monitor first. This is important for the
2168 // bootstrap case to ensure that the very first paxos proposal
2169 // codifies the monmap. Otherwise any manner of chaos can ensue
2170 // when monitors are call elections or participating in a paxos
2171 // round without agreeing on who the participants are.
2172 monmon()->election_finished();
2173 _finish_svc_election();
7c673cae
FG
2174
2175 logger->inc(l_mon_election_win);
2176
224ce89b
WB
2177 // inject new metadata in first transaction.
2178 {
2179 // include previous metadata for missing mons (that aren't part of
2180 // the current quorum).
2181 map<int,Metadata> m = metadata;
2182 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
2183 if (m.count(rank) == 0 &&
2184 mon_metadata.count(rank)) {
2185 m[rank] = mon_metadata[rank];
2186 }
2187 }
2188
2189 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2190 // a new transaction immediately after the election finishes. We should
2191 // do that anyway for other reasons, though.
2192 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
2193 bufferlist bl;
11fdf7f2 2194 encode(m, bl);
224ce89b
WB
2195 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
2196 }
2197
7c673cae
FG
2198 finish_election();
2199 if (monmap->size() > 1 &&
2200 monmap->get_epoch() > 0) {
2201 timecheck_start();
2202 health_tick_start();
b32b8144
FG
2203
2204 // Freshen the health status before doing health_to_clog in case
2205 // our just-completed election changed the health
2206 healthmon()->wait_for_active_ctx(new FunctionContext([this](int r){
2207 dout(20) << "healthmon now active" << dendl;
2208 healthmon()->tick();
2209 if (healthmon()->is_proposing()) {
2210 dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
2211 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this,
2212 [this](int r){
11fdf7f2 2213 ceph_assert(lock.is_locked_by_me());
b32b8144
FG
2214 do_health_to_clog_interval();
2215 }));
2216
2217 } else {
2218 do_health_to_clog_interval();
2219 }
2220 }));
2221
7c673cae
FG
2222 scrub_event_start();
2223 }
7c673cae
FG
2224}
2225
2226void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
2227 uint64_t features,
11fdf7f2
TL
2228 const mon_feature_t& mon_features,
2229 int min_mon_release)
7c673cae
FG
2230{
2231 state = STATE_PEON;
2232 leader_since = utime_t();
11fdf7f2 2233 quorum_since = mono_clock::now();
7c673cae
FG
2234 leader = l;
2235 quorum = q;
2236 outside_quorum.clear();
2237 quorum_con_features = features;
2238 quorum_mon_features = mon_features;
11fdf7f2 2239 quorum_min_mon_release = min_mon_release;
7c673cae
FG
2240 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
2241 << " quorum is " << quorum << " features are " << quorum_con_features
2242 << " mon_features are " << quorum_mon_features
11fdf7f2 2243 << " min_mon_release " << min_mon_release
7c673cae
FG
2244 << dendl;
2245
2246 paxos->peon_init();
2247 _finish_svc_election();
7c673cae
FG
2248
2249 logger->inc(l_mon_election_lose);
2250
2251 finish_election();
11fdf7f2 2252}
7c673cae 2253
11fdf7f2
TL
2254namespace {
2255std::string collect_compression_algorithms()
2256{
2257 ostringstream os;
2258 bool printed = false;
2259 for (auto [name, key] : Compressor::compression_algorithms) {
2260 if (printed) {
2261 os << ", ";
2262 } else {
2263 printed = true;
2264 }
2265 std::ignore = key;
2266 os << name;
7c673cae 2267 }
11fdf7f2
TL
2268 return os.str();
2269}
7c673cae
FG
2270}
2271
224ce89b
WB
2272void Monitor::collect_metadata(Metadata *m)
2273{
2274 collect_sys_info(m, g_ceph_context);
11fdf7f2
TL
2275 (*m)["addrs"] = stringify(messenger->get_myaddrs());
2276 (*m)["compression_algorithms"] = collect_compression_algorithms();
2277
2278 // infer storage device
2279 string devname = store->get_devname();
2280 set<string> devnames;
2281 get_raw_devices(devname, &devnames);
2282 (*m)["devices"] = stringify(devnames);
2283 string devids;
2284 for (auto& devname : devnames) {
2285 string err;
2286 string id = get_device_id(devname, &err);
2287 if (id.size()) {
2288 if (!devids.empty()) {
2289 devids += ",";
2290 }
2291 devids += devname + "=" + id;
2292 } else {
2293 derr << "failed to get devid for " << devname << ": " << err << dendl;
2294 }
2295 }
2296 (*m)["device_ids"] = devids;
224ce89b
WB
2297}
2298
7c673cae
FG
2299void Monitor::finish_election()
2300{
2301 apply_quorum_to_compatset_features();
2302 apply_monmap_to_compatset_features();
2303 timecheck_finish();
2304 exited_quorum = utime_t();
2305 finish_contexts(g_ceph_context, waitfor_quorum);
2306 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2307 resend_routed_requests();
2308 update_logger();
2309 register_cluster_logger();
2310
11fdf7f2
TL
2311 // enable authentication
2312 {
2313 std::lock_guard l(auth_lock);
2314 authmon()->_set_mon_num_rank(monmap->size(), rank);
2315 }
2316
7c673cae 2317 // am i named properly?
11fdf7f2 2318 string cur_name = monmap->get_name(messenger->get_myaddrs());
7c673cae
FG
2319 if (cur_name != name) {
2320 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
11fdf7f2
TL
2321 send_mon_message(
2322 new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
2323 *quorum.begin());
7c673cae
FG
2324 }
2325}
2326
2327void Monitor::_apply_compatset_features(CompatSet &new_features)
2328{
2329 if (new_features.compare(features) != 0) {
2330 CompatSet diff = features.unsupported(new_features);
2331 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2332 features = new_features;
2333
2334 auto t = std::make_shared<MonitorDBStore::Transaction>();
2335 write_features(t);
2336 store->apply_transaction(t);
2337
2338 calc_quorum_requirements();
2339 }
2340}
2341
2342void Monitor::apply_quorum_to_compatset_features()
2343{
2344 CompatSet new_features(features);
11fdf7f2 2345 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
7c673cae
FG
2346 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2347 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2348 }
11fdf7f2
TL
2349 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2350 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
7c673cae
FG
2351 dout(5) << __func__ << dendl;
2352 _apply_compatset_features(new_features);
2353}
2354
2355void Monitor::apply_monmap_to_compatset_features()
2356{
2357 CompatSet new_features(features);
2358 mon_feature_t monmap_features = monmap->get_required_features();
2359
2360 /* persistent monmap features may go into the compatset.
2361 * optional monmap features may not - why?
2362 * because optional monmap features may be set/unset by the admin,
2363 * and possibly by other means that haven't yet been thought out,
2364 * so we can't make the monitor enforce them on start - because they
2365 * may go away.
2366 * this, of course, does not invalidate setting a compatset feature
2367 * for an optional feature - as long as you make sure to clean it up
2368 * once you unset it.
2369 */
2370 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
11fdf7f2 2371 ceph_assert(ceph::features::mon::get_persistent().contains_all(
7c673cae
FG
2372 ceph::features::mon::FEATURE_KRAKEN));
2373 // this feature should only ever be set if the quorum supports it.
11fdf7f2 2374 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
7c673cae
FG
2375 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2376 }
181888fb 2377 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
11fdf7f2 2378 ceph_assert(ceph::features::mon::get_persistent().contains_all(
181888fb
FG
2379 ceph::features::mon::FEATURE_LUMINOUS));
2380 // this feature should only ever be set if the quorum supports it.
11fdf7f2 2381 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
181888fb
FG
2382 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2383 }
11fdf7f2
TL
2384 if (monmap_features.contains_all(ceph::features::mon::FEATURE_MIMIC)) {
2385 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2386 ceph::features::mon::FEATURE_MIMIC));
2387 // this feature should only ever be set if the quorum supports it.
2388 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_MIMIC));
2389 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
2390 }
2391 if (monmap_features.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) {
2392 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2393 ceph::features::mon::FEATURE_NAUTILUS));
2394 // this feature should only ever be set if the quorum supports it.
2395 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_NAUTILUS));
2396 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
2397 }
7c673cae
FG
2398
2399 dout(5) << __func__ << dendl;
2400 _apply_compatset_features(new_features);
2401}
2402
2403void Monitor::calc_quorum_requirements()
2404{
2405 required_features = 0;
2406
2407 // compatset
7c673cae
FG
2408 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2409 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2410 }
7c673cae
FG
2411 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2412 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2413 }
181888fb
FG
2414 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2415 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2416 }
11fdf7f2
TL
2417 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC)) {
2418 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2419 }
2420 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS)) {
2421 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2422 CEPH_FEATUREMASK_CEPHX_V2;
2423 }
7c673cae
FG
2424
2425 // monmap
2426 if (monmap->get_required_features().contains_all(
2427 ceph::features::mon::FEATURE_KRAKEN)) {
2428 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2429 }
2430 if (monmap->get_required_features().contains_all(
2431 ceph::features::mon::FEATURE_LUMINOUS)) {
2432 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2433 }
11fdf7f2
TL
2434 if (monmap->get_required_features().contains_all(
2435 ceph::features::mon::FEATURE_MIMIC)) {
2436 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2437 }
2438 if (monmap->get_required_features().contains_all(
2439 ceph::features::mon::FEATURE_NAUTILUS)) {
2440 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2441 CEPH_FEATUREMASK_CEPHX_V2;
2442 }
7c673cae
FG
2443 dout(10) << __func__ << " required_features " << required_features << dendl;
2444}
2445
31f18b77
FG
2446void Monitor::get_combined_feature_map(FeatureMap *fm)
2447{
2448 *fm += session_map.feature_map;
2449 for (auto id : quorum) {
2450 if (id != rank) {
2451 *fm += quorum_feature_map[id];
2452 }
2453 }
2454}
2455
7c673cae
FG
2456void Monitor::sync_force(Formatter *f, ostream& ss)
2457{
2458 bool free_formatter = false;
2459
2460 if (!f) {
2461 // louzy/lazy hack: default to json if no formatter has been defined
2462 f = new JSONFormatter();
2463 free_formatter = true;
2464 }
2465
2466 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2467 sync_stash_critical_state(tx);
2468 tx->put("mon_sync", "force_sync", 1);
2469 store->apply_transaction(tx);
2470
2471 f->open_object_section("sync_force");
2472 f->dump_int("ret", 0);
2473 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2474 f->close_section(); // sync_force
2475 f->flush(ss);
2476 if (free_formatter)
2477 delete f;
2478}
2479
2480void Monitor::_quorum_status(Formatter *f, ostream& ss)
2481{
2482 bool free_formatter = false;
2483
2484 if (!f) {
2485 // louzy/lazy hack: default to json if no formatter has been defined
2486 f = new JSONFormatter();
2487 free_formatter = true;
2488 }
2489 f->open_object_section("quorum_status");
2490 f->dump_int("election_epoch", get_epoch());
2491
2492 f->open_array_section("quorum");
2493 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2494 f->dump_int("mon", *p);
2495 f->close_section(); // quorum
2496
2497 list<string> quorum_names = get_quorum_names();
2498 f->open_array_section("quorum_names");
2499 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2500 f->dump_string("mon", *p);
2501 f->close_section(); // quorum_names
2502
2503 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2504
11fdf7f2
TL
2505 if (!quorum.empty()) {
2506 f->dump_int(
2507 "quorum_age",
2508 std::chrono::duration_cast<std::chrono::seconds>(
2509 mono_clock::now() - quorum_since).count());
2510 }
2511
7c673cae
FG
2512 f->open_object_section("monmap");
2513 monmap->dump(f);
2514 f->close_section(); // monmap
2515
2516 f->close_section(); // quorum_status
2517 f->flush(ss);
2518 if (free_formatter)
2519 delete f;
2520}
2521
2522void Monitor::get_mon_status(Formatter *f, ostream& ss)
2523{
2524 bool free_formatter = false;
2525
2526 if (!f) {
2527 // louzy/lazy hack: default to json if no formatter has been defined
2528 f = new JSONFormatter();
2529 free_formatter = true;
2530 }
2531
2532 f->open_object_section("mon_status");
2533 f->dump_string("name", name);
2534 f->dump_int("rank", rank);
2535 f->dump_string("state", get_state_name());
2536 f->dump_int("election_epoch", get_epoch());
2537
2538 f->open_array_section("quorum");
2539 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2540 f->dump_int("mon", *p);
2541 }
7c673cae
FG
2542 f->close_section(); // quorum
2543
11fdf7f2
TL
2544 if (!quorum.empty()) {
2545 f->dump_int(
2546 "quorum_age",
2547 std::chrono::duration_cast<std::chrono::seconds>(
2548 mono_clock::now() - quorum_since).count());
2549 }
2550
7c673cae
FG
2551 f->open_object_section("features");
2552 f->dump_stream("required_con") << required_features;
2553 mon_feature_t req_mon_features = get_required_mon_features();
2554 req_mon_features.dump(f, "required_mon");
2555 f->dump_stream("quorum_con") << quorum_con_features;
2556 quorum_mon_features.dump(f, "quorum_mon");
2557 f->close_section(); // features
2558
2559 f->open_array_section("outside_quorum");
2560 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2561 f->dump_string("mon", *p);
2562 f->close_section(); // outside_quorum
2563
2564 f->open_array_section("extra_probe_peers");
11fdf7f2 2565 for (set<entity_addrvec_t>::iterator p = extra_probe_peers.begin();
7c673cae 2566 p != extra_probe_peers.end();
11fdf7f2
TL
2567 ++p) {
2568 f->dump_object("peer", *p);
2569 }
7c673cae
FG
2570 f->close_section(); // extra_probe_peers
2571
2572 f->open_array_section("sync_provider");
2573 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2574 p != sync_providers.end();
2575 ++p) {
2576 f->dump_unsigned("cookie", p->second.cookie);
11fdf7f2 2577 f->dump_object("addrs", p->second.addrs);
7c673cae
FG
2578 f->dump_stream("timeout") << p->second.timeout;
2579 f->dump_unsigned("last_committed", p->second.last_committed);
2580 f->dump_stream("last_key") << p->second.last_key;
2581 }
2582 f->close_section();
2583
2584 if (is_synchronizing()) {
2585 f->open_object_section("sync");
2586 f->dump_stream("sync_provider") << sync_provider;
2587 f->dump_unsigned("sync_cookie", sync_cookie);
2588 f->dump_unsigned("sync_start_version", sync_start_version);
2589 f->close_section();
2590 }
2591
11fdf7f2
TL
2592 if (g_conf()->mon_sync_provider_kill_at > 0)
2593 f->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at);
2594 if (g_conf()->mon_sync_requester_kill_at > 0)
2595 f->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at);
7c673cae
FG
2596
2597 f->open_object_section("monmap");
2598 monmap->dump(f);
2599 f->close_section();
2600
31f18b77 2601 f->dump_object("feature_map", session_map.feature_map);
7c673cae
FG
2602 f->close_section(); // mon_status
2603
2604 if (free_formatter) {
2605 // flush formatter to ss and delete it iff we created the formatter
2606 f->flush(ss);
2607 delete f;
2608 }
2609}
2610
2611
2612// health status to clog
2613
2614void Monitor::health_tick_start()
2615{
2616 if (!cct->_conf->mon_health_to_clog ||
2617 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2618 return;
2619
2620 dout(15) << __func__ << dendl;
2621
2622 health_tick_stop();
3efd9988
FG
2623 health_tick_event = timer.add_event_after(
2624 cct->_conf->mon_health_to_clog_tick_interval,
2625 new C_MonContext(this, [this](int r) {
2626 if (r < 0)
2627 return;
3efd9988
FG
2628 health_tick_start();
2629 }));
7c673cae
FG
2630}
2631
2632void Monitor::health_tick_stop()
2633{
2634 dout(15) << __func__ << dendl;
2635
2636 if (health_tick_event) {
2637 timer.cancel_event(health_tick_event);
2638 health_tick_event = NULL;
2639 }
2640}
2641
2642utime_t Monitor::health_interval_calc_next_update()
2643{
2644 utime_t now = ceph_clock_now();
2645
2646 time_t secs = now.sec();
2647 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2648 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2649 utime_t next = utime_t(secs + adjustment, 0);
2650
2651 dout(20) << __func__
2652 << " now: " << now << ","
2653 << " next: " << next << ","
2654 << " interval: " << cct->_conf->mon_health_to_clog_interval
2655 << dendl;
2656
2657 return next;
2658}
2659
2660void Monitor::health_interval_start()
2661{
2662 dout(15) << __func__ << dendl;
2663
2664 if (!cct->_conf->mon_health_to_clog ||
2665 cct->_conf->mon_health_to_clog_interval <= 0) {
2666 return;
2667 }
2668
2669 health_interval_stop();
2670 utime_t next = health_interval_calc_next_update();
2671 health_interval_event = new C_MonContext(this, [this](int r) {
2672 if (r < 0)
2673 return;
2674 do_health_to_clog_interval();
2675 });
3efd9988
FG
2676 if (!timer.add_event_at(next, health_interval_event)) {
2677 health_interval_event = nullptr;
2678 }
7c673cae
FG
2679}
2680
2681void Monitor::health_interval_stop()
2682{
2683 dout(15) << __func__ << dendl;
2684 if (health_interval_event) {
2685 timer.cancel_event(health_interval_event);
2686 }
2687 health_interval_event = NULL;
2688}
2689
2690void Monitor::health_events_cleanup()
2691{
2692 health_tick_stop();
2693 health_interval_stop();
2694 health_status_cache.reset();
2695}
2696
2697void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2698{
2699 dout(20) << __func__ << dendl;
2700
2701 if (changed.count("mon_health_to_clog")) {
2702 if (!cct->_conf->mon_health_to_clog) {
2703 health_events_cleanup();
11fdf7f2 2704 return;
7c673cae
FG
2705 } else {
2706 if (!health_tick_event) {
2707 health_tick_start();
2708 }
2709 if (!health_interval_event) {
2710 health_interval_start();
2711 }
2712 }
2713 }
2714
2715 if (changed.count("mon_health_to_clog_interval")) {
2716 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2717 health_interval_stop();
2718 } else {
2719 health_interval_start();
2720 }
2721 }
2722
2723 if (changed.count("mon_health_to_clog_tick_interval")) {
2724 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2725 health_tick_stop();
2726 } else {
2727 health_tick_start();
2728 }
2729 }
2730}
2731
2732void Monitor::do_health_to_clog_interval()
2733{
2734 // outputting to clog may have been disabled in the conf
2735 // since we were scheduled.
2736 if (!cct->_conf->mon_health_to_clog ||
2737 cct->_conf->mon_health_to_clog_interval <= 0)
2738 return;
2739
2740 dout(10) << __func__ << dendl;
2741
2742 // do we have a cached value for next_clog_update? if not,
2743 // do we know when the last update was?
2744
2745 do_health_to_clog(true);
2746 health_interval_start();
2747}
2748
2749void Monitor::do_health_to_clog(bool force)
2750{
2751 // outputting to clog may have been disabled in the conf
2752 // since we were scheduled.
2753 if (!cct->_conf->mon_health_to_clog ||
2754 cct->_conf->mon_health_to_clog_interval <= 0)
2755 return;
2756
2757 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2758
11fdf7f2
TL
2759 string summary;
2760 health_status_t level = get_health_status(false, nullptr, &summary);
2761 if (!force &&
2762 summary == health_status_cache.summary &&
2763 level == health_status_cache.overall)
2764 return;
2765 clog->health(level) << "overall " << summary;
2766 health_status_cache.summary = summary;
2767 health_status_cache.overall = level;
224ce89b
WB
2768}
2769
2770health_status_t Monitor::get_health_status(
2771 bool want_detail,
2772 Formatter *f,
2773 std::string *plain,
2774 const char *sep1,
2775 const char *sep2)
2776{
2777 health_status_t r = HEALTH_OK;
224ce89b
WB
2778 if (f) {
2779 f->open_object_section("health");
2780 f->open_object_section("checks");
2781 }
7c673cae 2782
224ce89b
WB
2783 string summary;
2784 string *psummary = f ? nullptr : &summary;
2785 for (auto& svc : paxos_service) {
2786 r = std::min(r, svc->get_health_checks().dump_summary(
2787 f, psummary, sep2, want_detail));
2788 }
7c673cae 2789
224ce89b
WB
2790 if (f) {
2791 f->close_section();
2792 f->dump_stream("status") << r;
11fdf7f2 2793 f->close_section();
224ce89b
WB
2794 } else {
2795 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2796 *plain = stringify(r);
2797 if (summary.size()) {
2798 *plain += sep1;
2799 *plain += summary;
2800 }
2801 *plain += "\n";
2802 }
2803
11fdf7f2 2804 if (want_detail && !f) {
224ce89b 2805 for (auto& svc : paxos_service) {
11fdf7f2 2806 svc->get_health_checks().dump_detail(plain);
224ce89b
WB
2807 }
2808 }
11fdf7f2 2809
224ce89b
WB
2810 return r;
2811}
2812
2813void Monitor::log_health(
2814 const health_check_map_t& updated,
2815 const health_check_map_t& previous,
2816 MonitorDBStore::TransactionRef t)
2817{
11fdf7f2 2818 if (!g_conf()->mon_health_to_clog) {
7c673cae
FG
2819 return;
2820 }
181888fb
FG
2821
2822 const utime_t now = ceph_clock_now();
2823
224ce89b
WB
2824 // FIXME: log atomically as part of @t instead of using clog.
2825 dout(10) << __func__ << " updated " << updated.checks.size()
2826 << " previous " << previous.checks.size()
2827 << dendl;
11fdf7f2 2828 const auto min_log_period = g_conf().get_val<int64_t>(
181888fb 2829 "mon_health_log_update_period");
224ce89b
WB
2830 for (auto& p : updated.checks) {
2831 auto q = previous.checks.find(p.first);
181888fb 2832 bool logged = false;
224ce89b
WB
2833 if (q == previous.checks.end()) {
2834 // new
2835 ostringstream ss;
2836 ss << "Health check failed: " << p.second.summary << " ("
2837 << p.first << ")";
181888fb
FG
2838 clog->health(p.second.severity) << ss.str();
2839
2840 logged = true;
224ce89b
WB
2841 } else {
2842 if (p.second.summary != q->second.summary ||
2843 p.second.severity != q->second.severity) {
181888fb
FG
2844
2845 auto status_iter = health_check_log_times.find(p.first);
2846 if (status_iter != health_check_log_times.end()) {
2847 if (p.second.severity == q->second.severity &&
2848 now - status_iter->second.updated_at < min_log_period) {
2849 // We already logged this recently and the severity is unchanged,
2850 // so skip emitting an update of the summary string.
2851 // We'll get an update out of tick() later if the check
2852 // is still failing.
2853 continue;
2854 }
2855 }
2856
2857 // summary or severity changed (ignore detail changes at this level)
2858 ostringstream ss;
224ce89b 2859 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
181888fb
FG
2860 clog->health(p.second.severity) << ss.str();
2861
2862 logged = true;
2863 }
2864 }
2865 // Record the time at which we last logged, so that we can check this
2866 // when considering whether/when to print update messages.
2867 if (logged) {
2868 auto iter = health_check_log_times.find(p.first);
2869 if (iter == health_check_log_times.end()) {
2870 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2871 p.second.severity, p.second.summary, now));
2872 } else {
2873 iter->second = HealthCheckLogStatus(
2874 p.second.severity, p.second.summary, now);
224ce89b
WB
2875 }
2876 }
2877 }
2878 for (auto& p : previous.checks) {
2879 if (!updated.checks.count(p.first)) {
2880 // cleared
2881 ostringstream ss;
2882 if (p.first == "DEGRADED_OBJECTS") {
2883 clog->info() << "All degraded objects recovered";
2884 } else if (p.first == "OSD_FLAGS") {
2885 clog->info() << "OSD flags cleared";
2886 } else {
2887 clog->info() << "Health check cleared: " << p.first << " (was: "
2888 << p.second.summary << ")";
2889 }
181888fb
FG
2890
2891 if (health_check_log_times.count(p.first)) {
2892 health_check_log_times.erase(p.first);
2893 }
224ce89b
WB
2894 }
2895 }
7c673cae 2896
224ce89b
WB
2897 if (previous.checks.size() && updated.checks.size() == 0) {
2898 // We might be going into a fully healthy state, check
2899 // other subsystems
2900 bool any_checks = false;
2901 for (auto& svc : paxos_service) {
2902 if (&(svc->get_health_checks()) == &(previous)) {
2903 // Ignore the ones we're clearing right now
2904 continue;
2905 }
7c673cae 2906
224ce89b
WB
2907 if (svc->get_health_checks().checks.size() > 0) {
2908 any_checks = true;
2909 break;
2910 }
2911 }
2912 if (!any_checks) {
2913 clog->info() << "Cluster is now healthy";
2914 }
2915 }
7c673cae
FG
2916}
2917
7c673cae
FG
2918void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2919{
2920 if (f)
2921 f->open_object_section("status");
2922
11fdf7f2 2923 mono_clock::time_point now = mono_clock::now();
7c673cae
FG
2924 if (f) {
2925 f->dump_stream("fsid") << monmap->get_fsid();
11fdf7f2 2926 get_health_status(false, f, nullptr);
7c673cae
FG
2927 f->dump_unsigned("election_epoch", get_epoch());
2928 {
2929 f->open_array_section("quorum");
2930 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2931 f->dump_int("rank", *p);
2932 f->close_section();
2933 f->open_array_section("quorum_names");
2934 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2935 f->dump_string("id", monmap->get_name(*p));
2936 f->close_section();
11fdf7f2
TL
2937 f->dump_int(
2938 "quorum_age",
2939 std::chrono::duration_cast<std::chrono::seconds>(
2940 mono_clock::now() - quorum_since).count());
7c673cae
FG
2941 }
2942 f->open_object_section("monmap");
2943 monmap->dump(f);
2944 f->close_section();
2945 f->open_object_section("osdmap");
224ce89b 2946 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
7c673cae
FG
2947 f->close_section();
2948 f->open_object_section("pgmap");
11fdf7f2 2949 mgrstatmon()->print_summary(f, NULL);
7c673cae
FG
2950 f->close_section();
2951 f->open_object_section("fsmap");
2952 mdsmon()->get_fsmap().print_summary(f, NULL);
2953 f->close_section();
7c673cae
FG
2954 f->open_object_section("mgrmap");
2955 mgrmon()->get_map().print_summary(f, nullptr);
2956 f->close_section();
224ce89b
WB
2957
2958 f->dump_object("servicemap", mgrstatmon()->get_service_map());
11fdf7f2
TL
2959
2960 f->open_object_section("progress_events");
2961 for (auto& i : mgrstatmon()->get_progress_events()) {
2962 f->dump_object(i.first.c_str(), i.second);
2963 }
2964 f->close_section();
2965
7c673cae
FG
2966 f->close_section();
2967 } else {
31f18b77
FG
2968 ss << " cluster:\n";
2969 ss << " id: " << monmap->get_fsid() << "\n";
224ce89b
WB
2970
2971 string health;
11fdf7f2
TL
2972 get_health_status(false, nullptr, &health,
2973 "\n ", "\n ");
224ce89b
WB
2974 ss << " health: " << health << "\n";
2975
31f18b77 2976 ss << "\n \n services:\n";
224ce89b
WB
2977 {
2978 size_t maxlen = 3;
2979 auto& service_map = mgrstatmon()->get_service_map();
2980 for (auto& p : service_map.services) {
2981 maxlen = std::max(maxlen, p.first.size());
2982 }
2983 string spacing(maxlen - 3, ' ');
2984 const auto quorum_names = get_quorum_names();
2985 const auto mon_count = monmap->mon_info.size();
2986 ss << " mon: " << spacing << mon_count << " daemons, quorum "
11fdf7f2 2987 << quorum_names << " (age " << timespan_str(now - quorum_since) << ")";
224ce89b
WB
2988 if (quorum_names.size() != mon_count) {
2989 std::list<std::string> out_of_q;
2990 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2991 if (quorum.count(i) == 0) {
2992 out_of_q.push_back(monmap->ranks[i]);
2993 }
2994 }
2995 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2996 out_of_q.end(), std::string(", "));
31f18b77 2997 }
7c673cae 2998 ss << "\n";
224ce89b
WB
2999 if (mgrmon()->in_use()) {
3000 ss << " mgr: " << spacing;
3001 mgrmon()->get_map().print_summary(nullptr, &ss);
3002 ss << "\n";
3003 }
3004 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
3005 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
3006 }
3007 ss << " osd: " << spacing;
3008 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
3009 ss << "\n";
3010 for (auto& p : service_map.services) {
3011 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
3012 << p.second.get_summary() << "\n";
3013 }
7c673cae 3014 }
31f18b77
FG
3015
3016 ss << "\n \n data:\n";
11fdf7f2
TL
3017 mgrstatmon()->print_summary(NULL, &ss);
3018
3019 auto& pem = mgrstatmon()->get_progress_events();
3020 if (!pem.empty()) {
3021 ss << "\n \n progress:\n";
3022 for (auto& i : pem) {
3023 ss << " " << i.second.message << "\n";
3024 ss << " [";
3025 unsigned j;
81eedcae 3026 for (j = 0; j < (unsigned)(i.second.progress * 30.0); ++j) {
11fdf7f2
TL
3027 ss << '=';
3028 }
3029 for (; j < 30; ++j) {
3030 ss << '.';
3031 }
3032 ss << "]\n";
3033 }
3034 }
31f18b77 3035 ss << "\n ";
7c673cae
FG
3036 }
3037}
3038
11fdf7f2 3039void Monitor::_generate_command_map(cmdmap_t& cmdmap,
7c673cae
FG
3040 map<string,string> &param_str_map)
3041{
11fdf7f2 3042 for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) {
7c673cae
FG
3043 if (p->first == "prefix")
3044 continue;
3045 if (p->first == "caps") {
3046 vector<string> cv;
3047 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
3048 cv.size() % 2 == 0) {
3049 for (unsigned i = 0; i < cv.size(); i += 2) {
3050 string k = string("caps_") + cv[i];
3051 param_str_map[k] = cv[i + 1];
3052 }
3053 continue;
3054 }
3055 }
3056 param_str_map[p->first] = cmd_vartype_stringify(p->second);
3057 }
3058}
3059
c07f9fc5
FG
3060const MonCommand *Monitor::_get_moncommand(
3061 const string &cmd_prefix,
d2e6a577
FG
3062 const vector<MonCommand>& cmds)
3063{
3064 for (auto& c : cmds) {
3065 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
3066 return &c;
7c673cae
FG
3067 }
3068 }
d2e6a577 3069 return nullptr;
7c673cae
FG
3070}
3071
11fdf7f2
TL
3072bool Monitor::_allowed_command(MonSession *s, const string &module,
3073 const string &prefix, const cmdmap_t& cmdmap,
7c673cae
FG
3074 const map<string,string>& param_str_map,
3075 const MonCommand *this_cmd) {
3076
3077 bool cmd_r = this_cmd->requires_perm('r');
3078 bool cmd_w = this_cmd->requires_perm('w');
3079 bool cmd_x = this_cmd->requires_perm('x');
3080
3081 bool capable = s->caps.is_capable(
3082 g_ceph_context,
3083 CEPH_ENTITY_TYPE_MON,
3084 s->entity_name,
3085 module, prefix, param_str_map,
11fdf7f2
TL
3086 cmd_r, cmd_w, cmd_x,
3087 s->get_peer_socket_addr());
7c673cae
FG
3088
3089 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
3090 return capable;
3091}
3092
c07f9fc5 3093void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
7c673cae 3094 Formatter *f,
11fdf7f2
TL
3095 uint64_t features,
3096 bufferlist *rdata)
7c673cae
FG
3097{
3098 int cmdnum = 0;
3099 f->open_object_section("command_descriptions");
c07f9fc5
FG
3100 for (const auto &cmd : commands) {
3101 unsigned flags = cmd.flags;
7c673cae
FG
3102 ostringstream secname;
3103 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
11fdf7f2 3104 dump_cmddesc_to_json(f, features, secname.str(),
c07f9fc5 3105 cmd.cmdstring, cmd.helpstring, cmd.module,
11fdf7f2 3106 cmd.req_perms, flags);
7c673cae
FG
3107 cmdnum++;
3108 }
3109 f->close_section(); // command_descriptions
3110
3111 f->flush(*rdata);
3112}
3113
7c673cae
FG
3114bool Monitor::is_keyring_required()
3115{
11fdf7f2
TL
3116 return auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3117 auth_service_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3118 auth_cluster_required.is_supported_auth(CEPH_AUTH_GSS) ||
3119 auth_service_required.is_supported_auth(CEPH_AUTH_GSS);
7c673cae
FG
3120}
3121
3122struct C_MgrProxyCommand : public Context {
3123 Monitor *mon;
3124 MonOpRequestRef op;
3125 uint64_t size;
3126 bufferlist outbl;
3127 string outs;
3128 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
3129 : mon(mon), op(op), size(s) { }
3130 void finish(int r) {
11fdf7f2 3131 std::lock_guard l(mon->lock);
7c673cae
FG
3132 mon->mgr_proxy_bytes -= size;
3133 mon->reply_command(op, r, outs, outbl, 0);
3134 }
3135};
3136
3137void Monitor::handle_command(MonOpRequestRef op)
3138{
11fdf7f2 3139 ceph_assert(op->is_type_command());
7c673cae
FG
3140 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3141 if (m->fsid != monmap->fsid) {
3142 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
3143 reply_command(op, -EPERM, "wrong fsid", 0);
3144 return;
3145 }
3146
11fdf7f2 3147 MonSession *session = op->get_session();
7c673cae
FG
3148 if (!session) {
3149 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3150 return;
3151 }
7c673cae
FG
3152
3153 if (m->cmd.empty()) {
3154 string rs = "No command supplied";
3155 reply_command(op, -EINVAL, rs, 0);
3156 return;
3157 }
3158
3159 string prefix;
3160 vector<string> fullcmd;
11fdf7f2 3161 cmdmap_t cmdmap;
7c673cae
FG
3162 stringstream ss, ds;
3163 bufferlist rdata;
3164 string rs;
3165 int r = -EINVAL;
3166 rs = "unrecognized command";
3167
3168 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3169 // ss has reason for failure
3170 r = -EINVAL;
3171 rs = ss.str();
3172 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3173 reply_command(op, r, rs, 0);
3174 return;
3175 }
3176
3177 // check return value. If no prefix parameter provided,
3178 // return value will be false, then return error info.
3179 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
3180 reply_command(op, -EINVAL, "command prefix not found", 0);
3181 return;
3182 }
3183
3184 // check prefix is empty
3185 if (prefix.empty()) {
3186 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3187 return;
3188 }
3189
3190 if (prefix == "get_command_descriptions") {
3191 bufferlist rdata;
3192 Formatter *f = Formatter::create("json");
d2e6a577 3193
11fdf7f2
TL
3194 std::vector<MonCommand> commands = static_cast<MgrMonitor*>(
3195 paxos_service[PAXOS_MGR].get())->get_command_descs();
c07f9fc5 3196
d2e6a577
FG
3197 for (auto& c : leader_mon_commands) {
3198 commands.push_back(c);
c07f9fc5
FG
3199 }
3200
11fdf7f2
TL
3201 auto features = m->get_connection()->get_features();
3202 format_command_descriptions(commands, f, features, &rdata);
7c673cae
FG
3203 delete f;
3204 reply_command(op, 0, "", rdata, 0);
3205 return;
3206 }
3207
3208 string module;
3209 string err;
3210
3211 dout(0) << "handle_command " << *m << dendl;
3212
3213 string format;
3214 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
3215 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3216
3217 get_str_vec(prefix, fullcmd);
3218
3219 // make sure fullcmd is not empty.
3220 // invalid prefix will cause empty vector fullcmd.
3221 // such as, prefix=";,,;"
3222 if (fullcmd.empty()) {
3223 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3224 return;
3225 }
3226
3227 module = fullcmd[0];
3228
3229 // validate command is in leader map
3230
3231 const MonCommand *leader_cmd;
c07f9fc5
FG
3232 const auto& mgr_cmds = mgrmon()->get_command_descs();
3233 const MonCommand *mgr_cmd = nullptr;
3234 if (!mgr_cmds.empty()) {
d2e6a577 3235 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
c07f9fc5 3236 }
d2e6a577 3237 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
7c673cae 3238 if (!leader_cmd) {
c07f9fc5
FG
3239 leader_cmd = mgr_cmd;
3240 if (!leader_cmd) {
3241 reply_command(op, -EINVAL, "command not known", 0);
3242 return;
3243 }
7c673cae
FG
3244 }
3245 // validate command is in our map & matches, or forward if it is allowed
d2e6a577
FG
3246 const MonCommand *mon_cmd = _get_moncommand(
3247 prefix,
3248 get_local_commands(quorum_mon_features));
c07f9fc5
FG
3249 if (!mon_cmd) {
3250 mon_cmd = mgr_cmd;
3251 }
7c673cae
FG
3252 if (!is_leader()) {
3253 if (!mon_cmd) {
3254 if (leader_cmd->is_noforward()) {
3255 reply_command(op, -EINVAL,
3256 "command not locally supported and not allowed to forward",
3257 0);
3258 return;
3259 }
3260 dout(10) << "Command not locally supported, forwarding request "
3261 << m << dendl;
3262 forward_request_leader(op);
3263 return;
3264 } else if (!mon_cmd->is_compat(leader_cmd)) {
3265 if (mon_cmd->is_noforward()) {
3266 reply_command(op, -EINVAL,
3267 "command not compatible with leader and not allowed to forward",
3268 0);
3269 return;
3270 }
3271 dout(10) << "Command not compatible with leader, forwarding request "
3272 << m << dendl;
3273 forward_request_leader(op);
3274 return;
3275 }
3276 }
3277
3278 if (mon_cmd->is_obsolete() ||
3279 (cct->_conf->mon_debug_deprecated_as_obsolete
3280 && mon_cmd->is_deprecated())) {
3281 reply_command(op, -ENOTSUP,
3282 "command is obsolete; please check usage and/or man page",
3283 0);
3284 return;
3285 }
3286
3287 if (session->proxy_con && mon_cmd->is_noforward()) {
3288 dout(10) << "Got forward for noforward command " << m << dendl;
3289 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3290 return;
3291 }
3292
3293 /* what we perceive as being the service the command falls under */
3294 string service(mon_cmd->module);
3295
3296 dout(25) << __func__ << " prefix='" << prefix
3297 << "' module='" << module
3298 << "' service='" << service << "'" << dendl;
3299
3300 bool cmd_is_rw =
3301 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3302
3303 // validate user's permissions for requested command
3304 map<string,string> param_str_map;
3305 _generate_command_map(cmdmap, param_str_map);
3306 if (!_allowed_command(session, service, prefix, cmdmap,
3307 param_str_map, mon_cmd)) {
3308 dout(1) << __func__ << " access denied" << dendl;
3309 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
11fdf7f2 3310 << "from='" << session->name << " " << session->addrs << "' "
7c673cae
FG
3311 << "entity='" << session->entity_name << "' "
3312 << "cmd=" << m->cmd << ": access denied";
3313 reply_command(op, -EACCES, "access denied", 0);
3314 return;
3315 }
3316
3317 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
11fdf7f2
TL
3318 << "from='" << session->name << " " << session->addrs << "' "
3319 << "entity='" << session->entity_name << "' "
3320 << "cmd=" << m->cmd << ": dispatch";
7c673cae 3321
11fdf7f2 3322 if (mon_cmd->is_mgr()) {
7c673cae
FG
3323 const auto& hdr = m->get_header();
3324 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
11fdf7f2
TL
3325 uint64_t max = g_conf().get_val<Option::size_t>("mon_client_bytes")
3326 * g_conf().get_val<double>("mon_mgr_proxy_client_bytes_ratio");
7c673cae
FG
3327 if (mgr_proxy_bytes + size > max) {
3328 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3329 << " + " << size << " > max " << max << dendl;
3330 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3331 return;
3332 }
3333 mgr_proxy_bytes += size;
3334 dout(10) << __func__ << " proxying mgr command (+" << size
3335 << " -> " << mgr_proxy_bytes << ")" << dendl;
3336 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3337 mgr_client.start_command(m->cmd,
3338 m->get_data(),
3339 &fin->outbl,
3340 &fin->outs,
3341 new C_OnFinisher(fin, &finisher));
3342 return;
3343 }
3344
d2e6a577
FG
3345 if ((module == "mds" || module == "fs") &&
3346 prefix != "fs authorize") {
7c673cae
FG
3347 mdsmon()->dispatch(op);
3348 return;
3349 }
11fdf7f2
TL
3350 if ((module == "osd" ||
3351 prefix == "pg map" ||
3352 prefix == "pg repeer") &&
31f18b77 3353 prefix != "osd last-stat-seq") {
7c673cae
FG
3354 osdmon()->dispatch(op);
3355 return;
3356 }
11fdf7f2
TL
3357 if (module == "config") {
3358 configmon()->dispatch(op);
7c673cae
FG
3359 return;
3360 }
11fdf7f2 3361
7c673cae
FG
3362 if (module == "mon" &&
3363 /* Let the Monitor class handle the following commands:
3364 * 'mon compact'
3365 * 'mon scrub'
3366 * 'mon sync force'
3367 */
3368 prefix != "mon compact" &&
3369 prefix != "mon scrub" &&
3370 prefix != "mon sync force" &&
31f18b77
FG
3371 prefix != "mon metadata" &&
3372 prefix != "mon versions" &&
11fdf7f2
TL
3373 prefix != "mon count-metadata" &&
3374 prefix != "mon ok-to-stop" &&
3375 prefix != "mon ok-to-add-offline" &&
3376 prefix != "mon ok-to-rm") {
7c673cae
FG
3377 monmon()->dispatch(op);
3378 return;
3379 }
d2e6a577 3380 if (module == "auth" || prefix == "fs authorize") {
7c673cae
FG
3381 authmon()->dispatch(op);
3382 return;
3383 }
3384 if (module == "log") {
3385 logmon()->dispatch(op);
3386 return;
3387 }
3388
3389 if (module == "config-key") {
3390 config_key_service->dispatch(op);
3391 return;
3392 }
3393
3394 if (module == "mgr") {
3395 mgrmon()->dispatch(op);
3396 return;
3397 }
3398
3399 if (prefix == "fsid") {
3400 if (f) {
3401 f->open_object_section("fsid");
3402 f->dump_stream("fsid") << monmap->fsid;
3403 f->close_section();
3404 f->flush(rdata);
3405 } else {
3406 ds << monmap->fsid;
3407 rdata.append(ds);
3408 }
3409 reply_command(op, 0, "", rdata, 0);
3410 return;
3411 }
3412
3413 if (prefix == "scrub" || prefix == "mon scrub") {
3414 wait_for_paxos_write();
3415 if (is_leader()) {
3416 int r = scrub_start();
3417 reply_command(op, r, "", rdata, 0);
3418 } else if (is_peon()) {
3419 forward_request_leader(op);
3420 } else {
3421 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3422 }
3423 return;
3424 }
3425
3426 if (prefix == "compact" || prefix == "mon compact") {
3427 dout(1) << "triggering manual compaction" << dendl;
11fdf7f2
TL
3428 auto start = ceph::coarse_mono_clock::now();
3429 store->compact_async();
3430 auto end = ceph::coarse_mono_clock::now();
3431 double duration = std::chrono::duration<double>(end-start).count();
3432 dout(1) << "finished manual compaction in " << duration << " seconds" << dendl;
7c673cae 3433 ostringstream oss;
11fdf7f2 3434 oss << "compacted " << g_conf().get_val<std::string>("mon_keyvaluedb") << " in " << duration << " seconds";
7c673cae
FG
3435 rs = oss.str();
3436 r = 0;
3437 }
3438 else if (prefix == "injectargs") {
3439 vector<string> injected_args;
3440 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3441 if (!injected_args.empty()) {
3442 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3443 ostringstream oss;
11fdf7f2 3444 r = g_conf().injectargs(str_join(injected_args, " "), &oss);
7c673cae
FG
3445 ss << "injectargs:" << oss.str();
3446 rs = ss.str();
3447 goto out;
3448 } else {
3449 rs = "must supply options to be parsed in a single string";
3450 r = -EINVAL;
3451 }
224ce89b
WB
3452 } else if (prefix == "time-sync-status") {
3453 if (!f)
3454 f.reset(Formatter::create("json-pretty"));
3455 f->open_object_section("time_sync");
3456 if (!timecheck_skews.empty()) {
3457 f->open_object_section("time_skew_status");
3458 for (auto& i : timecheck_skews) {
224ce89b 3459 double skew = i.second;
11fdf7f2
TL
3460 double latency = timecheck_latencies[i.first];
3461 string name = monmap->get_name(i.first);
224ce89b
WB
3462 ostringstream tcss;
3463 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3464 f->open_object_section(name.c_str());
3465 f->dump_float("skew", skew);
3466 f->dump_float("latency", latency);
3467 f->dump_stream("health") << tcstatus;
3468 if (tcstatus != HEALTH_OK) {
3469 f->dump_stream("details") << tcss.str();
3470 }
3471 f->close_section();
3472 }
3473 f->close_section();
3474 }
3475 f->open_object_section("timechecks");
3476 f->dump_unsigned("epoch", get_epoch());
3477 f->dump_int("round", timecheck_round);
3478 f->dump_stream("round_status") << ((timecheck_round%2) ?
3479 "on-going" : "finished");
3480 f->close_section();
3481 f->close_section();
3482 f->flush(rdata);
3483 r = 0;
3484 rs = "";
c07f9fc5
FG
3485 } else if (prefix == "config set") {
3486 std::string key;
3487 cmd_getval(cct, cmdmap, "key", key);
3488 std::string val;
3489 cmd_getval(cct, cmdmap, "value", val);
11fdf7f2 3490 r = g_conf().set_val(key, val, &ss);
d2e6a577 3491 if (r == 0) {
11fdf7f2 3492 g_conf().apply_changes(nullptr);
d2e6a577 3493 }
c07f9fc5
FG
3494 rs = ss.str();
3495 goto out;
7c673cae
FG
3496 } else if (prefix == "status" ||
3497 prefix == "health" ||
3498 prefix == "df") {
3499 string detail;
3500 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3501
3502 if (prefix == "status") {
3503 // get_cluster_status handles f == NULL
3504 get_cluster_status(ds, f.get());
3505
3506 if (f) {
3507 f->flush(ds);
3508 ds << '\n';
3509 }
3510 rdata.append(ds);
3511 } else if (prefix == "health") {
11fdf7f2
TL
3512 string plain;
3513 get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3514 if (f) {
3515 f->flush(rdata);
7c673cae 3516 } else {
11fdf7f2 3517 rdata.append(plain);
7c673cae 3518 }
7c673cae
FG
3519 } else if (prefix == "df") {
3520 bool verbose = (detail == "detail");
3521 if (f)
3522 f->open_object_section("stats");
3523
11fdf7f2
TL
3524 mgrstatmon()->dump_cluster_stats(&ds, f.get(), verbose);
3525 if (!f) {
3526 ds << "\n \n";
3527 }
3528 mgrstatmon()->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
7c673cae
FG
3529
3530 if (f) {
3531 f->close_section();
3532 f->flush(ds);
3533 ds << '\n';
3534 }
3535 } else {
11fdf7f2 3536 ceph_abort_msg("We should never get here!");
7c673cae
FG
3537 return;
3538 }
3539 rdata.append(ds);
3540 rs = "";
3541 r = 0;
3542 } else if (prefix == "report") {
3543
3544 // this must be formatted, in its current form
3545 if (!f)
3546 f.reset(Formatter::create("json-pretty"));
3547 f->open_object_section("report");
3548 f->dump_stream("cluster_fingerprint") << fingerprint;
3549 f->dump_string("version", ceph_version_to_str());
3550 f->dump_string("commit", git_version_to_str());
3551 f->dump_stream("timestamp") << ceph_clock_now();
3552
3553 vector<string> tagsvec;
3554 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3555 string tagstr = str_join(tagsvec, " ");
3556 if (!tagstr.empty())
3557 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3558 f->dump_string("tag", tagstr);
3559
11fdf7f2 3560 get_health_status(true, f.get(), nullptr);
7c673cae
FG
3561
3562 monmon()->dump_info(f.get());
3563 osdmon()->dump_info(f.get());
3564 mdsmon()->dump_info(f.get());
7c673cae 3565 authmon()->dump_info(f.get());
11fdf7f2 3566 mgrstatmon()->dump_info(f.get());
7c673cae
FG
3567
3568 paxos->dump_info(f.get());
3569
3570 f->close_section();
3571 f->flush(rdata);
3572
3573 ostringstream ss2;
11fdf7f2 3574 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT_LEGACY);
7c673cae
FG
3575 rs = ss2.str();
3576 r = 0;
31f18b77
FG
3577 } else if (prefix == "osd last-stat-seq") {
3578 int64_t osd;
3579 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3580 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3581 if (f) {
3582 f->dump_unsigned("seq", seq);
3583 f->flush(ds);
3584 } else {
3585 ds << seq;
3586 rdata.append(ds);
3587 }
3588 rs = "";
3589 r = 0;
7c673cae
FG
3590 } else if (prefix == "node ls") {
3591 string node_type("all");
3592 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3593 if (!f)
3594 f.reset(Formatter::create("json-pretty"));
3595 if (node_type == "all") {
3596 f->open_object_section("nodes");
3597 print_nodes(f.get(), ds);
3598 osdmon()->print_nodes(f.get());
3599 mdsmon()->print_nodes(f.get());
11fdf7f2 3600 mgrmon()->print_nodes(f.get());
7c673cae
FG
3601 f->close_section();
3602 } else if (node_type == "mon") {
3603 print_nodes(f.get(), ds);
3604 } else if (node_type == "osd") {
3605 osdmon()->print_nodes(f.get());
3606 } else if (node_type == "mds") {
3607 mdsmon()->print_nodes(f.get());
11fdf7f2
TL
3608 } else if (node_type == "mgr") {
3609 mgrmon()->print_nodes(f.get());
7c673cae
FG
3610 }
3611 f->flush(ds);
3612 rdata.append(ds);
3613 rs = "";
3614 r = 0;
31f18b77
FG
3615 } else if (prefix == "features") {
3616 if (!is_leader() && !is_peon()) {
3617 dout(10) << " waiting for quorum" << dendl;
3618 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3619 return;
3620 }
3621 if (!is_leader()) {
3622 forward_request_leader(op);
3623 return;
3624 }
3625 if (!f)
3626 f.reset(Formatter::create("json-pretty"));
3627 FeatureMap fm;
3628 get_combined_feature_map(&fm);
3629 f->dump_object("features", fm);
3630 f->flush(rdata);
3631 rs = "";
3632 r = 0;
7c673cae
FG
3633 } else if (prefix == "mon metadata") {
3634 if (!f)
3635 f.reset(Formatter::create("json-pretty"));
3636
3637 string name;
3638 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3639 if (!all) {
3640 // Dump a single mon's metadata
3641 int mon = monmap->get_rank(name);
3642 if (mon < 0) {
3643 rs = "requested mon not found";
3644 r = -ENOENT;
3645 goto out;
3646 }
3647 f->open_object_section("mon_metadata");
3648 r = get_mon_metadata(mon, f.get(), ds);
3649 f->close_section();
3650 } else {
3651 // Dump all mons' metadata
3652 r = 0;
3653 f->open_array_section("mon_metadata");
3654 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3655 std::ostringstream get_err;
3656 f->open_object_section("mon");
3657 f->dump_string("name", monmap->get_name(rank));
3658 r = get_mon_metadata(rank, f.get(), get_err);
3659 f->close_section();
3660 if (r == -ENOENT || r == -EINVAL) {
3661 dout(1) << get_err.str() << dendl;
3662 // Drop error, list what metadata we do have
3663 r = 0;
3664 } else if (r != 0) {
3665 derr << "Unexpected error from get_mon_metadata: "
3666 << cpp_strerror(r) << dendl;
3667 ds << get_err.str();
3668 break;
3669 }
3670 }
3671 f->close_section();
3672 }
3673
3674 f->flush(ds);
3675 rdata.append(ds);
3676 rs = "";
31f18b77
FG
3677 } else if (prefix == "mon versions") {
3678 if (!f)
3679 f.reset(Formatter::create("json-pretty"));
3680 count_metadata("ceph_version", f.get());
3681 f->flush(ds);
3682 rdata.append(ds);
3683 rs = "";
3684 r = 0;
3685 } else if (prefix == "mon count-metadata") {
3686 if (!f)
3687 f.reset(Formatter::create("json-pretty"));
3688 string field;
3689 cmd_getval(g_ceph_context, cmdmap, "property", field);
3690 count_metadata(field, f.get());
3691 f->flush(ds);
3692 rdata.append(ds);
3693 rs = "";
3694 r = 0;
7c673cae
FG
3695 } else if (prefix == "quorum_status") {
3696 // make sure our map is readable and up to date
3697 if (!is_leader() && !is_peon()) {
3698 dout(10) << " waiting for quorum" << dendl;
3699 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3700 return;
3701 }
3702 _quorum_status(f.get(), ds);
3703 rdata.append(ds);
3704 rs = "";
3705 r = 0;
11fdf7f2
TL
3706 } else if (prefix == "mon ok-to-stop") {
3707 vector<string> ids;
3708 if (!cmd_getval(g_ceph_context, cmdmap, "ids", ids)) {
3709 r = -EINVAL;
3710 goto out;
3711 }
3712 set<string> wouldbe;
3713 for (auto rank : quorum) {
3714 wouldbe.insert(monmap->get_name(rank));
3715 }
3716 for (auto& n : ids) {
3717 if (monmap->contains(n)) {
3718 wouldbe.erase(n);
3719 }
3720 }
3721 if (wouldbe.size() < monmap->min_quorum_size()) {
3722 r = -EBUSY;
3723 rs = "not enough monitors would be available (" + stringify(wouldbe) +
3724 ") after stopping mons " + stringify(ids);
3725 goto out;
3726 }
3727 r = 0;
3728 rs = "quorum should be preserved (" + stringify(wouldbe) +
3729 ") after stopping " + stringify(ids);
3730 } else if (prefix == "mon ok-to-add-offline") {
3731 if (quorum.size() < monmap->min_quorum_size(monmap->size() + 1)) {
3732 rs = "adding a monitor may break quorum (until that monitor starts)";
3733 r = -EBUSY;
3734 goto out;
3735 }
3736 rs = "adding another mon that is not yet online will not break quorum";
3737 r = 0;
3738 } else if (prefix == "mon ok-to-rm") {
3739 string id;
3740 if (!cmd_getval(g_ceph_context, cmdmap, "id", id)) {
3741 r = -EINVAL;
3742 rs = "must specify a monitor id";
3743 goto out;
3744 }
3745 if (!monmap->contains(id)) {
3746 r = 0;
3747 rs = "mon." + id + " does not exist";
3748 goto out;
3749 }
3750 int rank = monmap->get_rank(id);
3751 if (quorum.count(rank) &&
3752 quorum.size() - 1 < monmap->min_quorum_size(monmap->size() - 1)) {
3753 r = -EBUSY;
3754 rs = "removing mon." + id + " would break quorum";
3755 goto out;
3756 }
3757 r = 0;
3758 rs = "safe to remove mon." + id;
7c673cae
FG
3759 } else if (prefix == "mon_status") {
3760 get_mon_status(f.get(), ds);
3761 if (f)
3762 f->flush(ds);
3763 rdata.append(ds);
3764 rs = "";
3765 r = 0;
3766 } else if (prefix == "sync force" ||
3767 prefix == "mon sync force") {
11fdf7f2
TL
3768 bool validate1 = false;
3769 cmd_getval(g_ceph_context, cmdmap, "yes_i_really_mean_it", validate1);
3770 bool validate2 = false;
3771 cmd_getval(g_ceph_context, cmdmap, "i_know_what_i_am_doing", validate2);
3772
3773 if (!validate1 || !validate2) {
7c673cae
FG
3774 r = -EINVAL;
3775 rs = "are you SURE? this will mean the monitor store will be "
3776 "erased. pass '--yes-i-really-mean-it "
3777 "--i-know-what-i-am-doing' if you really do.";
3778 goto out;
3779 }
3780 sync_force(f.get(), ds);
3781 rs = ds.str();
3782 r = 0;
3783 } else if (prefix == "heap") {
3784 if (!ceph_using_tcmalloc())
3785 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3786 else {
3787 string heapcmd;
3788 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3789 // XXX 1-element vector, change at callee or make vector here?
3790 vector<string> heapcmd_vec;
3791 get_str_vec(heapcmd, heapcmd_vec);
11fdf7f2
TL
3792 string value;
3793 if (cmd_getval(g_ceph_context, cmdmap, "value", value))
3794 heapcmd_vec.push_back(value);
7c673cae
FG
3795 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3796 rdata.append(ds);
3797 rs = "";
3798 r = 0;
3799 }
3800 } else if (prefix == "quorum") {
3801 string quorumcmd;
3802 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3803 if (quorumcmd == "exit") {
3804 start_election();
3805 elector.stop_participating();
3806 rs = "stopped responding to quorum, initiated new election";
3807 r = 0;
3808 } else if (quorumcmd == "enter") {
3809 elector.start_participating();
3810 start_election();
3811 rs = "started responding to quorum, initiated new election";
3812 r = 0;
3813 } else {
3814 rs = "needs a valid 'quorum' command";
3815 r = -EINVAL;
3816 }
3817 } else if (prefix == "version") {
3818 if (f) {
3819 f->open_object_section("version");
3820 f->dump_string("version", pretty_version_to_str());
3821 f->close_section();
3822 f->flush(ds);
3823 } else {
3824 ds << pretty_version_to_str();
3825 }
3826 rdata.append(ds);
3827 rs = "";
3828 r = 0;
c07f9fc5
FG
3829 } else if (prefix == "versions") {
3830 if (!f)
3831 f.reset(Formatter::create("json-pretty"));
3832 map<string,int> overall;
3833 f->open_object_section("version");
3834 map<string,int> mon, mgr, osd, mds;
3835
3836 count_metadata("ceph_version", &mon);
3837 f->open_object_section("mon");
3838 for (auto& p : mon) {
3839 f->dump_int(p.first.c_str(), p.second);
3840 overall[p.first] += p.second;
3841 }
3842 f->close_section();
3843
3844 mgrmon()->count_metadata("ceph_version", &mgr);
3845 f->open_object_section("mgr");
3846 for (auto& p : mgr) {
3847 f->dump_int(p.first.c_str(), p.second);
3848 overall[p.first] += p.second;
3849 }
3850 f->close_section();
3851
3852 osdmon()->count_metadata("ceph_version", &osd);
3853 f->open_object_section("osd");
3854 for (auto& p : osd) {
3855 f->dump_int(p.first.c_str(), p.second);
3856 overall[p.first] += p.second;
3857 }
3858 f->close_section();
3859
3860 mdsmon()->count_metadata("ceph_version", &mds);
3861 f->open_object_section("mds");
35e4c445 3862 for (auto& p : mds) {
c07f9fc5
FG
3863 f->dump_int(p.first.c_str(), p.second);
3864 overall[p.first] += p.second;
3865 }
3866 f->close_section();
3867
3868 for (auto& p : mgrstatmon()->get_service_map().services) {
3869 f->open_object_section(p.first.c_str());
3870 map<string,int> m;
3871 p.second.count_metadata("ceph_version", &m);
3872 for (auto& q : m) {
3873 f->dump_int(q.first.c_str(), q.second);
3874 overall[q.first] += q.second;
3875 }
3876 f->close_section();
3877 }
3878
3879 f->open_object_section("overall");
3880 for (auto& p : overall) {
3881 f->dump_int(p.first.c_str(), p.second);
3882 }
3883 f->close_section();
3884 f->close_section();
3885 f->flush(rdata);
3886 rs = "";
3887 r = 0;
11fdf7f2
TL
3888 } else if (prefix == "smart") {
3889 string want_devid;
3890 cmd_getval(cct, cmdmap, "devid", want_devid);
3891
3892 string devname = store->get_devname();
3893 set<string> devnames;
3894 get_raw_devices(devname, &devnames);
3895 json_spirit::mObject json_map;
3896 uint64_t smart_timeout = cct->_conf.get_val<uint64_t>(
3897 "mon_smart_report_timeout");
3898 for (auto& devname : devnames) {
3899 string err;
3900 string devid = get_device_id(devname, &err);
3901 if (want_devid.size() && want_devid != devid) {
3902 derr << "get_device_id failed on " << devname << ": " << err << dendl;
3903 continue;
3904 }
3905 json_spirit::mValue smart_json;
3906 if (block_device_get_metrics(devname, smart_timeout,
3907 &smart_json)) {
3908 dout(10) << "block_device_get_metrics failed for /dev/" << devname
3909 << dendl;
3910 continue;
3911 }
3912 json_map[devid] = smart_json;
3913 }
3914 ostringstream ss;
3915 json_spirit::write(json_map, ss, json_spirit::pretty_print);
3916 rdata.append(ss.str());
3917 r = 0;
3918 rs = "";
7c673cae
FG
3919 }
3920
3921 out:
3922 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3923 reply_command(op, r, rs, rdata, 0);
3924}
3925
3926void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3927{
3928 bufferlist rdata;
3929 reply_command(op, rc, rs, rdata, version);
3930}
3931
3932void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3933 bufferlist& rdata, version_t version)
3934{
3935 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
11fdf7f2 3936 ceph_assert(m->get_type() == MSG_MON_COMMAND);
7c673cae
FG
3937 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3938 reply->set_tid(m->get_tid());
3939 reply->set_data(rdata);
3940 send_reply(op, reply);
3941}
3942
3943
3944// ------------------------
3945// request/reply routing
3946//
3947// a client/mds/osd will connect to a random monitor. we need to forward any
3948// messages requiring state updates to the leader, and then route any replies
3949// back via the correct monitor and back to them. (the monitor will not
3950// initiate any connections.)
3951
3952void Monitor::forward_request_leader(MonOpRequestRef op)
3953{
3954 op->mark_event(__func__);
3955
3956 int mon = get_leader();
3957 MonSession *session = op->get_session();
3958 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3959
11fdf7f2 3960 if (req->get_source().is_mon() && req->get_source_addrs() != messenger->get_myaddrs()) {
7c673cae
FG
3961 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3962 } else if (session->proxy_con) {
3963 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3964 } else if (!session->closed) {
3965 RoutedRequest *rr = new RoutedRequest;
3966 rr->tid = ++routed_request_tid;
7c673cae
FG
3967 rr->con = req->get_connection();
3968 rr->con_features = rr->con->get_features();
3969 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3970 rr->session = static_cast<MonSession *>(session->get());
3971 rr->op = op;
3972 routed_requests[rr->tid] = rr;
3973 session->routed_request_tids.insert(rr->tid);
3974
3975 dout(10) << "forward_request " << rr->tid << " request " << *req
3976 << " features " << rr->con_features << dendl;
3977
3978 MForward *forward = new MForward(rr->tid,
3979 req,
3980 rr->con_features,
3981 rr->session->caps);
3982 forward->set_priority(req->get_priority());
3983 if (session->auth_handler) {
3984 forward->entity_name = session->entity_name;
3985 } else if (req->get_source().is_mon()) {
3986 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3987 }
11fdf7f2 3988 send_mon_message(forward, mon);
7c673cae 3989 op->mark_forwarded();
11fdf7f2 3990 ceph_assert(op->get_req()->get_type() != 0);
7c673cae
FG
3991 } else {
3992 dout(10) << "forward_request no session for request " << *req << dendl;
3993 }
3994}
3995
3996// fake connection attached to forwarded messages
3997struct AnonConnection : public Connection {
11fdf7f2
TL
3998 entity_addr_t socket_addr;
3999 explicit AnonConnection(CephContext *cct,
4000 const entity_addr_t& sa)
4001 : Connection(cct, NULL),
4002 socket_addr(sa) {}
7c673cae
FG
4003
4004 int send_message(Message *m) override {
11fdf7f2 4005 ceph_assert(!"send_message on anonymous connection");
7c673cae
FG
4006 }
4007 void send_keepalive() override {
11fdf7f2 4008 ceph_assert(!"send_keepalive on anonymous connection");
7c673cae
FG
4009 }
4010 void mark_down() override {
4011 // silently ignore
4012 }
4013 void mark_disposable() override {
4014 // silengtly ignore
4015 }
4016 bool is_connected() override { return false; }
11fdf7f2
TL
4017 entity_addr_t get_peer_socket_addr() const override {
4018 return socket_addr;
4019 }
7c673cae
FG
4020};
4021
4022//extract the original message and put it into the regular dispatch function
4023void Monitor::handle_forward(MonOpRequestRef op)
4024{
4025 MForward *m = static_cast<MForward*>(op->get_req());
11fdf7f2
TL
4026 dout(10) << "received forwarded message from "
4027 << ceph_entity_type_name(m->client_type)
4028 << " " << m->client_addrs
7c673cae
FG
4029 << " via " << m->get_source_inst() << dendl;
4030 MonSession *session = op->get_session();
11fdf7f2 4031 ceph_assert(session);
7c673cae
FG
4032
4033 if (!session->is_capable("mon", MON_CAP_X)) {
4034 dout(0) << "forward from entity with insufficient caps! "
4035 << session->caps << dendl;
4036 } else {
4037 // see PaxosService::dispatch(); we rely on this being anon
4038 // (c->msgr == NULL)
4039 PaxosServiceMessage *req = m->claim_message();
11fdf7f2
TL
4040 ceph_assert(req != NULL);
4041
4042 ConnectionRef c(new AnonConnection(cct, m->client_socket_addr));
4043 MonSession *s = new MonSession(static_cast<Connection*>(c.get()));
4044 s->_ident(req->get_source(),
4045 req->get_source_addrs());
4046 c->set_priv(RefCountedPtr{s, false});
4047 c->set_peer_addrs(m->client_addrs);
4048 c->set_peer_type(m->client_type);
7c673cae
FG
4049 c->set_features(m->con_features);
4050
11fdf7f2 4051 s->authenticated = true;
7c673cae
FG
4052 s->caps = m->client_caps;
4053 dout(10) << " caps are " << s->caps << dendl;
4054 s->entity_name = m->entity_name;
4055 dout(10) << " entity name '" << s->entity_name << "' type "
4056 << s->entity_name.get_type() << dendl;
4057 s->proxy_con = m->get_connection();
4058 s->proxy_tid = m->tid;
4059
4060 req->set_connection(c);
4061
4062 // not super accurate, but better than nothing.
4063 req->set_recv_stamp(m->get_recv_stamp());
4064
4065 /*
4066 * note which election epoch this is; we will drop the message if
4067 * there is a future election since our peers will resend routed
4068 * requests in that case.
4069 */
4070 req->rx_election_epoch = get_epoch();
4071
7c673cae 4072 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
7c673cae 4073 _ms_dispatch(req);
7c673cae 4074
11fdf7f2
TL
4075 // break the session <-> con ref loop by removing the con->session
4076 // reference, which is no longer needed once the MonOpRequest is
4077 // set up.
4078 c->set_priv(NULL);
7c673cae
FG
4079 }
4080}
4081
4082void Monitor::send_reply(MonOpRequestRef op, Message *reply)
4083{
4084 op->mark_event(__func__);
4085
4086 MonSession *session = op->get_session();
11fdf7f2 4087 ceph_assert(session);
7c673cae
FG
4088 Message *req = op->get_req();
4089 ConnectionRef con = op->get_connection();
4090
4091 reply->set_cct(g_ceph_context);
4092 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
4093
4094 if (!con) {
4095 dout(2) << "send_reply no connection, dropping reply " << *reply
4096 << " to " << req << " " << *req << dendl;
4097 reply->put();
4098 op->mark_event("reply: no connection");
4099 return;
4100 }
4101
4102 if (!session->con && !session->proxy_con) {
4103 dout(2) << "send_reply no connection, dropping reply " << *reply
4104 << " to " << req << " " << *req << dendl;
4105 reply->put();
4106 op->mark_event("reply: no connection");
4107 return;
4108 }
4109
4110 if (session->proxy_con) {
4111 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
4112 << " via " << session->proxy_con->get_peer_addr()
4113 << " for request " << *req << dendl;
4114 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
4115 op->mark_event("reply: send routed request");
4116 } else {
4117 session->con->send_message(reply);
4118 op->mark_event("reply: send");
4119 }
4120}
4121
4122void Monitor::no_reply(MonOpRequestRef op)
4123{
4124 MonSession *session = op->get_session();
4125 Message *req = op->get_req();
4126
4127 if (session->proxy_con) {
4128 dout(10) << "no_reply to " << req->get_source_inst()
4129 << " via " << session->proxy_con->get_peer_addr()
4130 << " for request " << *req << dendl;
4131 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
4132 op->mark_event("no_reply: send routed request");
4133 } else {
4134 dout(10) << "no_reply to " << req->get_source_inst()
4135 << " " << *req << dendl;
4136 op->mark_event("no_reply");
4137 }
4138}
4139
4140void Monitor::handle_route(MonOpRequestRef op)
4141{
4142 MRoute *m = static_cast<MRoute*>(op->get_req());
4143 MonSession *session = op->get_session();
4144 //check privileges
4145 if (!session->is_capable("mon", MON_CAP_X)) {
4146 dout(0) << "MRoute received from entity without appropriate perms! "
4147 << dendl;
4148 return;
4149 }
4150 if (m->msg)
11fdf7f2
TL
4151 dout(10) << "handle_route tid " << m->session_mon_tid << " " << *m->msg
4152 << dendl;
7c673cae 4153 else
11fdf7f2 4154 dout(10) << "handle_route tid " << m->session_mon_tid << " null" << dendl;
7c673cae
FG
4155
4156 // look it up
4157 if (m->session_mon_tid) {
4158 if (routed_requests.count(m->session_mon_tid)) {
4159 RoutedRequest *rr = routed_requests[m->session_mon_tid];
4160
4161 // reset payload, in case encoding is dependent on target features
4162 if (m->msg) {
4163 m->msg->clear_payload();
4164 rr->con->send_message(m->msg);
4165 m->msg = NULL;
4166 }
4167 if (m->send_osdmap_first) {
4168 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
4169 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
4170 true, MonOpRequestRef());
4171 }
11fdf7f2 4172 ceph_assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
7c673cae
FG
4173 routed_requests.erase(m->session_mon_tid);
4174 rr->session->routed_request_tids.erase(m->session_mon_tid);
4175 delete rr;
4176 } else {
4177 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
4178 }
4179 } else {
11fdf7f2 4180 dout(10) << " not a routed request, ignoring" << dendl;
7c673cae
FG
4181 }
4182}
4183
4184void Monitor::resend_routed_requests()
4185{
4186 dout(10) << "resend_routed_requests" << dendl;
4187 int mon = get_leader();
4188 list<Context*> retry;
4189 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
4190 p != routed_requests.end();
4191 ++p) {
4192 RoutedRequest *rr = p->second;
4193
4194 if (mon == rank) {
4195 dout(10) << " requeue for self tid " << rr->tid << dendl;
4196 rr->op->mark_event("retry routed request");
4197 retry.push_back(new C_RetryMessage(this, rr->op));
4198 if (rr->session) {
11fdf7f2 4199 ceph_assert(rr->session->routed_request_tids.count(p->first));
7c673cae
FG
4200 rr->session->routed_request_tids.erase(p->first);
4201 }
4202 delete rr;
4203 } else {
11fdf7f2
TL
4204 auto q = rr->request_bl.cbegin();
4205 PaxosServiceMessage *req =
4206 (PaxosServiceMessage *)decode_message(cct, 0, q);
7c673cae 4207 rr->op->mark_event("resend forwarded message to leader");
11fdf7f2
TL
4208 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req
4209 << dendl;
4210 MForward *forward = new MForward(rr->tid,
4211 req,
4212 rr->con_features,
7c673cae
FG
4213 rr->session->caps);
4214 req->put(); // forward takes its own ref; drop ours.
11fdf7f2
TL
4215 forward->client_type = rr->con->get_peer_type();
4216 forward->client_addrs = rr->con->get_peer_addrs();
4217 forward->client_socket_addr = rr->con->get_peer_socket_addr();
7c673cae 4218 forward->set_priority(req->get_priority());
11fdf7f2 4219 send_mon_message(forward, mon);
7c673cae
FG
4220 }
4221 }
4222 if (mon == rank) {
4223 routed_requests.clear();
4224 finish_contexts(g_ceph_context, retry);
4225 }
4226}
4227
4228void Monitor::remove_session(MonSession *s)
4229{
11fdf7f2 4230 dout(10) << "remove_session " << s << " " << s->name << " " << s->addrs
224ce89b 4231 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
11fdf7f2
TL
4232 ceph_assert(s->con);
4233 ceph_assert(!s->closed);
7c673cae
FG
4234 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4235 p != s->routed_request_tids.end();
4236 ++p) {
11fdf7f2 4237 ceph_assert(routed_requests.count(*p));
7c673cae
FG
4238 RoutedRequest *rr = routed_requests[*p];
4239 dout(10) << " dropping routed request " << rr->tid << dendl;
4240 delete rr;
4241 routed_requests.erase(*p);
4242 }
4243 s->routed_request_tids.clear();
11fdf7f2 4244 s->con->set_priv(nullptr);
7c673cae
FG
4245 session_map.remove_session(s);
4246 logger->set(l_mon_num_sessions, session_map.get_size());
4247 logger->inc(l_mon_session_rm);
4248}
4249
4250void Monitor::remove_all_sessions()
4251{
11fdf7f2 4252 std::lock_guard l(session_map_lock);
7c673cae
FG
4253 while (!session_map.sessions.empty()) {
4254 MonSession *s = session_map.sessions.front();
4255 remove_session(s);
11fdf7f2 4256 logger->inc(l_mon_session_rm);
7c673cae
FG
4257 }
4258 if (logger)
4259 logger->set(l_mon_num_sessions, session_map.get_size());
4260}
4261
11fdf7f2 4262void Monitor::send_mon_message(Message *m, int rank)
7c673cae 4263{
11fdf7f2 4264 messenger->send_to_mon(m, monmap->get_addrs(rank));
7c673cae
FG
4265}
4266
4267void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4268{
4269 /**
4270 * Wait list the new session until we're in the quorum, assuming it's
4271 * sufficiently new.
4272 * tick() will periodically send them back through so we can send
4273 * the client elsewhere if we don't think we're getting back in.
4274 *
4275 * But we whitelist a few sorts of messages:
4276 * 1) Monitors can talk to us at any time, of course.
4277 * 2) auth messages. It's unlikely to go through much faster, but
4278 * it's possible we've just lost our quorum status and we want to take...
4279 * 3) command messages. We want to accept these under all possible
4280 * circumstances.
4281 */
4282 Message *m = op->get_req();
4283 MonSession *s = op->get_session();
4284 ConnectionRef con = op->get_connection();
4285 utime_t too_old = ceph_clock_now();
4286 too_old -= g_ceph_context->_conf->mon_lease;
4287 if (m->get_recv_stamp() > too_old &&
4288 con->is_connected()) {
4289 dout(5) << "waitlisting message " << *m << dendl;
4290 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4291 op->mark_wait_for_quorum();
4292 } else {
4293 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4294 con->mark_down();
4295 // proxied sessions aren't registered and don't have a con; don't remove
4296 // those.
4297 if (!s->proxy_con) {
11fdf7f2 4298 std::lock_guard l(session_map_lock);
7c673cae
FG
4299 remove_session(s);
4300 }
4301 op->mark_zap();
4302 }
4303}
4304
4305void Monitor::_ms_dispatch(Message *m)
4306{
4307 if (is_shutdown()) {
4308 m->put();
4309 return;
4310 }
4311
4312 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4313 bool src_is_mon = op->is_src_mon();
4314 op->mark_event("mon:_ms_dispatch");
4315 MonSession *s = op->get_session();
4316 if (s && s->closed) {
4317 return;
4318 }
224ce89b
WB
4319
4320 if (src_is_mon && s) {
4321 ConnectionRef con = m->get_connection();
4322 if (con->get_messenger() && con->get_features() != s->con_features) {
4323 // only update features if this is a non-anonymous connection
4324 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4325 << " (was " << s->con_features
4326 << ", now " << con->get_features() << ")" << dendl;
4327 // connection features changed - recreate session.
4328 if (s->con && s->con != con) {
4329 dout(10) << __func__ << " connection for " << m->get_source_inst()
4330 << " changed from session; mark down and replace" << dendl;
4331 s->con->mark_down();
4332 }
4333 if (s->item.is_on_list()) {
4334 // forwarded messages' sessions are not in the sessions map and
4335 // exist only while the op is being handled.
4336 remove_session(s);
4337 }
4338 s->put();
4339 s = nullptr;
4340 }
4341 }
4342
7c673cae
FG
4343 if (!s) {
4344 // if the sender is not a monitor, make sure their first message for a
4345 // session is an MAuth. If it is not, assume it's a stray message,
4346 // and considering that we are creating a new session it is safe to
4347 // assume that the sender hasn't authenticated yet, so we have no way
4348 // of assessing whether we should handle it or not.
4349 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4350 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4351 m->get_type() != CEPH_MSG_PING)) {
4352 dout(1) << __func__ << " dropping stray message " << *m
4353 << " from " << m->get_source_inst() << dendl;
4354 return;
4355 }
4356
4357 ConnectionRef con = m->get_connection();
4358 {
11fdf7f2
TL
4359 std::lock_guard l(session_map_lock);
4360 s = session_map.new_session(m->get_source(),
4361 m->get_source_addrs(),
4362 con.get());
7c673cae 4363 }
11fdf7f2
TL
4364 ceph_assert(s);
4365 con->set_priv(RefCountedPtr{s, false});
224ce89b
WB
4366 dout(10) << __func__ << " new session " << s << " " << *s
4367 << " features 0x" << std::hex
4368 << s->con_features << std::dec << dendl;
7c673cae
FG
4369 op->set_session(s);
4370
4371 logger->set(l_mon_num_sessions, session_map.get_size());
4372 logger->inc(l_mon_session_add);
4373
4374 if (src_is_mon) {
4375 // give it monitor caps; the peer type has been authenticated
4376 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4377 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
11fdf7f2
TL
4378 s->caps = mon_caps;
4379 s->authenticated = true;
7c673cae 4380 }
7c673cae 4381 } else {
11fdf7f2 4382 dout(20) << __func__ << " existing session " << s << " for " << s->name
7c673cae
FG
4383 << dendl;
4384 }
4385
11fdf7f2 4386 ceph_assert(s);
7c673cae
FG
4387
4388 s->session_timeout = ceph_clock_now();
11fdf7f2 4389 s->session_timeout += g_conf()->mon_session_timeout;
7c673cae
FG
4390
4391 if (s->auth_handler) {
4392 s->entity_name = s->auth_handler->get_entity_name();
4393 }
4394 dout(20) << " caps " << s->caps.get_str() << dendl;
4395
4396 if ((is_synchronizing() ||
11fdf7f2 4397 (!s->authenticated && !exited_quorum.is_zero())) &&
7c673cae
FG
4398 !src_is_mon &&
4399 m->get_type() != CEPH_MSG_PING) {
4400 waitlist_or_zap_client(op);
4401 } else {
4402 dispatch_op(op);
4403 }
4404 return;
4405}
4406
4407void Monitor::dispatch_op(MonOpRequestRef op)
4408{
4409 op->mark_event("mon:dispatch_op");
4410 MonSession *s = op->get_session();
11fdf7f2 4411 ceph_assert(s);
7c673cae
FG
4412 if (s->closed) {
4413 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4414 return;
4415 }
4416
4417 /* we will consider the default type as being 'monitor' until proven wrong */
4418 op->set_type_monitor();
4419 /* deal with all messages that do not necessarily need caps */
7c673cae
FG
4420 switch (op->get_req()->get_type()) {
4421 // auth
4422 case MSG_MON_GLOBAL_ID:
4423 case CEPH_MSG_AUTH:
4424 op->set_type_service();
4425 /* no need to check caps here */
4426 paxos_service[PAXOS_AUTH]->dispatch(op);
11fdf7f2 4427 return;
7c673cae
FG
4428
4429 case CEPH_MSG_PING:
4430 handle_ping(op);
11fdf7f2
TL
4431 return;
4432 }
7c673cae 4433
11fdf7f2
TL
4434 if (!op->get_session()->authenticated) {
4435 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4436 << " is not authenticated, dropping " << *(op->get_req())
4437 << dendl;
4438 return;
4439 }
4440
4441 switch (op->get_req()->get_type()) {
7c673cae
FG
4442 case CEPH_MSG_MON_GET_MAP:
4443 handle_mon_get_map(op);
11fdf7f2
TL
4444 return;
4445
4446 case MSG_GET_CONFIG:
4447 configmon()->handle_get_config(op);
4448 return;
7c673cae
FG
4449
4450 case CEPH_MSG_MON_METADATA:
4451 return handle_mon_metadata(op);
4452
11fdf7f2
TL
4453 case CEPH_MSG_MON_SUBSCRIBE:
4454 /* FIXME: check what's being subscribed, filter accordingly */
4455 handle_subscribe(op);
4456 return;
7c673cae 4457 }
7c673cae
FG
4458
4459 /* well, maybe the op belongs to a service... */
4460 op->set_type_service();
4461 /* deal with all messages which caps should be checked somewhere else */
7c673cae
FG
4462 switch (op->get_req()->get_type()) {
4463
4464 // OSDs
4465 case CEPH_MSG_MON_GET_OSDMAP:
4466 case CEPH_MSG_POOLOP:
4467 case MSG_OSD_BEACON:
4468 case MSG_OSD_MARK_ME_DOWN:
4469 case MSG_OSD_FULL:
4470 case MSG_OSD_FAILURE:
4471 case MSG_OSD_BOOT:
4472 case MSG_OSD_ALIVE:
4473 case MSG_OSD_PGTEMP:
4474 case MSG_OSD_PG_CREATED:
4475 case MSG_REMOVE_SNAPS:
11fdf7f2 4476 case MSG_OSD_PG_READY_TO_MERGE:
7c673cae 4477 paxos_service[PAXOS_OSDMAP]->dispatch(op);
11fdf7f2 4478 return;
7c673cae
FG
4479
4480 // MDSs
4481 case MSG_MDS_BEACON:
4482 case MSG_MDS_OFFLOAD_TARGETS:
4483 paxos_service[PAXOS_MDSMAP]->dispatch(op);
11fdf7f2 4484 return;
7c673cae
FG
4485
4486 // Mgrs
4487 case MSG_MGR_BEACON:
4488 paxos_service[PAXOS_MGR]->dispatch(op);
11fdf7f2 4489 return;
7c673cae 4490
31f18b77 4491 // MgrStat
b32b8144 4492 case MSG_MON_MGR_REPORT:
11fdf7f2 4493 case CEPH_MSG_STATFS:
7c673cae 4494 case MSG_GETPOOLSTATS:
31f18b77 4495 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
11fdf7f2 4496 return;
7c673cae 4497
11fdf7f2 4498 // log
7c673cae
FG
4499 case MSG_LOG:
4500 paxos_service[PAXOS_LOG]->dispatch(op);
11fdf7f2 4501 return;
7c673cae
FG
4502
4503 // handle_command() does its own caps checking
4504 case MSG_MON_COMMAND:
4505 op->set_type_command();
4506 handle_command(op);
11fdf7f2 4507 return;
7c673cae 4508 }
7c673cae
FG
4509
4510 /* nop, looks like it's not a service message; revert back to monitor */
4511 op->set_type_monitor();
4512
4513 /* messages we, the Monitor class, need to deal with
4514 * but may be sent by clients. */
4515
4516 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4517 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4518 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4519 << dendl;
11fdf7f2 4520 return;
7c673cae
FG
4521 }
4522
7c673cae 4523 switch (op->get_req()->get_type()) {
7c673cae
FG
4524 // misc
4525 case CEPH_MSG_MON_GET_VERSION:
4526 handle_get_version(op);
11fdf7f2 4527 return;
7c673cae 4528 }
7c673cae
FG
4529
4530 if (!op->is_src_mon()) {
4531 dout(1) << __func__ << " unexpected monitor message from"
4532 << " non-monitor entity " << op->get_req()->get_source_inst()
4533 << " " << *(op->get_req()) << " -- dropping" << dendl;
11fdf7f2 4534 return;
7c673cae
FG
4535 }
4536
4537 /* messages that should only be sent by another monitor */
7c673cae
FG
4538 switch (op->get_req()->get_type()) {
4539
4540 case MSG_ROUTE:
4541 handle_route(op);
11fdf7f2 4542 return;
7c673cae
FG
4543
4544 case MSG_MON_PROBE:
4545 handle_probe(op);
11fdf7f2 4546 return;
7c673cae
FG
4547
4548 // Sync (i.e., the new slurp, but on steroids)
4549 case MSG_MON_SYNC:
4550 handle_sync(op);
11fdf7f2 4551 return;
7c673cae
FG
4552 case MSG_MON_SCRUB:
4553 handle_scrub(op);
11fdf7f2 4554 return;
7c673cae
FG
4555
4556 /* log acks are sent from a monitor we sent the MLog to, and are
4557 never sent by clients to us. */
4558 case MSG_LOGACK:
4559 log_client.handle_log_ack((MLogAck*)op->get_req());
11fdf7f2 4560 return;
7c673cae
FG
4561
4562 // monmap
4563 case MSG_MON_JOIN:
4564 op->set_type_service();
4565 paxos_service[PAXOS_MONMAP]->dispatch(op);
11fdf7f2 4566 return;
7c673cae
FG
4567
4568 // paxos
4569 case MSG_MON_PAXOS:
4570 {
4571 op->set_type_paxos();
4572 MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
4573 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4574 //can't send these!
11fdf7f2 4575 return;
7c673cae
FG
4576 }
4577
4578 if (state == STATE_SYNCHRONIZING) {
4579 // we are synchronizing. These messages would do us no
4580 // good, thus just drop them and ignore them.
4581 dout(10) << __func__ << " ignore paxos msg from "
4582 << pm->get_source_inst() << dendl;
11fdf7f2 4583 return;
7c673cae
FG
4584 }
4585
4586 // sanitize
4587 if (pm->epoch > get_epoch()) {
4588 bootstrap();
11fdf7f2 4589 return;
7c673cae
FG
4590 }
4591 if (pm->epoch != get_epoch()) {
11fdf7f2 4592 return;
7c673cae
FG
4593 }
4594
4595 paxos->dispatch(op);
4596 }
11fdf7f2 4597 return;
7c673cae
FG
4598
4599 // elector messages
4600 case MSG_MON_ELECTION:
4601 op->set_type_election();
4602 //check privileges here for simplicity
4603 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4604 dout(0) << "MMonElection received from entity without enough caps!"
4605 << op->get_session()->caps << dendl;
11fdf7f2 4606 return;;
7c673cae
FG
4607 }
4608 if (!is_probing() && !is_synchronizing()) {
4609 elector.dispatch(op);
4610 }
11fdf7f2 4611 return;
7c673cae
FG
4612
4613 case MSG_FORWARD:
4614 handle_forward(op);
11fdf7f2 4615 return;
7c673cae
FG
4616
4617 case MSG_TIMECHECK:
11fdf7f2
TL
4618 dout(5) << __func__ << " ignoring " << op << dendl;
4619 return;
4620 case MSG_TIMECHECK2:
7c673cae 4621 handle_timecheck(op);
11fdf7f2 4622 return;
7c673cae
FG
4623
4624 case MSG_MON_HEALTH:
11fdf7f2
TL
4625 dout(5) << __func__ << " dropping deprecated message: "
4626 << *op->get_req() << dendl;
7c673cae 4627 break;
224ce89b
WB
4628 case MSG_MON_HEALTH_CHECKS:
4629 op->set_type_service();
4630 paxos_service[PAXOS_HEALTH]->dispatch(op);
11fdf7f2 4631 return;
7c673cae 4632 }
11fdf7f2 4633 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
7c673cae
FG
4634 return;
4635}
4636
4637void Monitor::handle_ping(MonOpRequestRef op)
4638{
4639 MPing *m = static_cast<MPing*>(op->get_req());
4640 dout(10) << __func__ << " " << *m << dendl;
4641 MPing *reply = new MPing;
7c673cae
FG
4642 bufferlist payload;
4643 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4644 f->open_object_section("pong");
4645
11fdf7f2 4646 get_health_status(false, f.get(), nullptr);
7c673cae
FG
4647 {
4648 stringstream ss;
4649 get_mon_status(f.get(), ss);
4650 }
4651
4652 f->close_section();
4653 stringstream ss;
4654 f->flush(ss);
11fdf7f2 4655 encode(ss.str(), payload);
7c673cae
FG
4656 reply->set_payload(payload);
4657 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
11fdf7f2 4658 m->get_connection()->send_message(reply);
7c673cae
FG
4659}
4660
4661void Monitor::timecheck_start()
4662{
4663 dout(10) << __func__ << dendl;
4664 timecheck_cleanup();
11fdf7f2
TL
4665 if (get_quorum_mon_features().contains_all(
4666 ceph::features::mon::FEATURE_NAUTILUS)) {
4667 timecheck_start_round();
4668 }
7c673cae
FG
4669}
4670
4671void Monitor::timecheck_finish()
4672{
4673 dout(10) << __func__ << dendl;
4674 timecheck_cleanup();
4675}
4676
4677void Monitor::timecheck_start_round()
4678{
4679 dout(10) << __func__ << " curr " << timecheck_round << dendl;
11fdf7f2 4680 ceph_assert(is_leader());
7c673cae
FG
4681
4682 if (monmap->size() == 1) {
11fdf7f2 4683 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
7c673cae
FG
4684 return;
4685 }
4686
4687 if (timecheck_round % 2) {
4688 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4689 utime_t curr_time = ceph_clock_now();
11fdf7f2 4690 double max = g_conf()->mon_timecheck_interval*3;
7c673cae
FG
4691 if (curr_time - timecheck_round_start < max) {
4692 dout(10) << __func__ << " keep current round going" << dendl;
4693 goto out;
4694 } else {
4695 dout(10) << __func__
4696 << " finish current timecheck and start new" << dendl;
4697 timecheck_cancel_round();
4698 }
4699 }
4700
11fdf7f2 4701 ceph_assert(timecheck_round % 2 == 0);
7c673cae
FG
4702 timecheck_acks = 0;
4703 timecheck_round ++;
4704 timecheck_round_start = ceph_clock_now();
4705 dout(10) << __func__ << " new " << timecheck_round << dendl;
4706
4707 timecheck();
4708out:
4709 dout(10) << __func__ << " setting up next event" << dendl;
4710 timecheck_reset_event();
4711}
4712
4713void Monitor::timecheck_finish_round(bool success)
4714{
4715 dout(10) << __func__ << " curr " << timecheck_round << dendl;
11fdf7f2 4716 ceph_assert(timecheck_round % 2);
7c673cae
FG
4717 timecheck_round ++;
4718 timecheck_round_start = utime_t();
4719
4720 if (success) {
11fdf7f2
TL
4721 ceph_assert(timecheck_waiting.empty());
4722 ceph_assert(timecheck_acks == quorum.size());
7c673cae
FG
4723 timecheck_report();
4724 timecheck_check_skews();
4725 return;
4726 }
4727
4728 dout(10) << __func__ << " " << timecheck_waiting.size()
4729 << " peers still waiting:";
11fdf7f2
TL
4730 for (auto& p : timecheck_waiting) {
4731 *_dout << " mon." << p.first;
7c673cae
FG
4732 }
4733 *_dout << dendl;
4734 timecheck_waiting.clear();
4735
4736 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4737}
4738
4739void Monitor::timecheck_cancel_round()
4740{
4741 timecheck_finish_round(false);
4742}
4743
4744void Monitor::timecheck_cleanup()
4745{
4746 timecheck_round = 0;
4747 timecheck_acks = 0;
4748 timecheck_round_start = utime_t();
4749
4750 if (timecheck_event) {
4751 timer.cancel_event(timecheck_event);
4752 timecheck_event = NULL;
4753 }
4754 timecheck_waiting.clear();
4755 timecheck_skews.clear();
4756 timecheck_latencies.clear();
4757
4758 timecheck_rounds_since_clean = 0;
4759}
4760
4761void Monitor::timecheck_reset_event()
4762{
4763 if (timecheck_event) {
4764 timer.cancel_event(timecheck_event);
4765 timecheck_event = NULL;
4766 }
4767
4768 double delay =
4769 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4770
4771 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4772 delay = cct->_conf->mon_timecheck_interval;
4773 }
4774
4775 dout(10) << __func__ << " delay " << delay
4776 << " rounds_since_clean " << timecheck_rounds_since_clean
4777 << dendl;
4778
3efd9988
FG
4779 timecheck_event = timer.add_event_after(
4780 delay,
4781 new C_MonContext(this, [this](int) {
4782 timecheck_start_round();
4783 }));
7c673cae
FG
4784}
4785
4786void Monitor::timecheck_check_skews()
4787{
4788 dout(10) << __func__ << dendl;
11fdf7f2
TL
4789 ceph_assert(is_leader());
4790 ceph_assert((timecheck_round % 2) == 0);
7c673cae 4791 if (monmap->size() == 1) {
11fdf7f2 4792 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
7c673cae
FG
4793 return;
4794 }
11fdf7f2 4795 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
7c673cae
FG
4796
4797 bool found_skew = false;
11fdf7f2 4798 for (auto& p : timecheck_skews) {
7c673cae 4799 double abs_skew;
11fdf7f2 4800 if (timecheck_has_skew(p.second, &abs_skew)) {
7c673cae 4801 dout(10) << __func__
11fdf7f2 4802 << " " << p.first << " skew " << abs_skew << dendl;
7c673cae
FG
4803 found_skew = true;
4804 }
4805 }
4806
4807 if (found_skew) {
4808 ++timecheck_rounds_since_clean;
4809 timecheck_reset_event();
4810 } else if (timecheck_rounds_since_clean > 0) {
4811 dout(1) << __func__
4812 << " no clock skews found after " << timecheck_rounds_since_clean
4813 << " rounds" << dendl;
4814 // make sure the skews are really gone and not just a transient success
4815 // this will run just once if not in the presence of skews again.
4816 timecheck_rounds_since_clean = 1;
4817 timecheck_reset_event();
4818 timecheck_rounds_since_clean = 0;
4819 }
4820
4821}
4822
4823void Monitor::timecheck_report()
4824{
4825 dout(10) << __func__ << dendl;
11fdf7f2
TL
4826 ceph_assert(is_leader());
4827 ceph_assert((timecheck_round % 2) == 0);
7c673cae 4828 if (monmap->size() == 1) {
11fdf7f2 4829 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
7c673cae
FG
4830 return;
4831 }
4832
11fdf7f2 4833 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
7c673cae
FG
4834 bool do_output = true; // only output report once
4835 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4836 if (monmap->get_name(*q) == name)
4837 continue;
4838
11fdf7f2 4839 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_REPORT);
7c673cae
FG
4840 m->epoch = get_epoch();
4841 m->round = timecheck_round;
4842
11fdf7f2
TL
4843 for (auto& it : timecheck_skews) {
4844 double skew = it.second;
4845 double latency = timecheck_latencies[it.first];
7c673cae 4846
11fdf7f2
TL
4847 m->skews[it.first] = skew;
4848 m->latencies[it.first] = latency;
7c673cae
FG
4849
4850 if (do_output) {
11fdf7f2 4851 dout(25) << __func__ << " mon." << it.first
7c673cae
FG
4852 << " latency " << latency
4853 << " skew " << skew << dendl;
4854 }
4855 }
4856 do_output = false;
11fdf7f2
TL
4857 dout(10) << __func__ << " send report to mon." << *q << dendl;
4858 send_mon_message(m, *q);
7c673cae
FG
4859 }
4860}
4861
4862void Monitor::timecheck()
4863{
4864 dout(10) << __func__ << dendl;
11fdf7f2 4865 ceph_assert(is_leader());
7c673cae 4866 if (monmap->size() == 1) {
11fdf7f2 4867 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
7c673cae
FG
4868 return;
4869 }
11fdf7f2 4870 ceph_assert(timecheck_round % 2 != 0);
7c673cae
FG
4871
4872 timecheck_acks = 1; // we ack ourselves
4873
4874 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4875 << " round " << timecheck_round << dendl;
4876
4877 // we are at the eye of the storm; the point of reference
11fdf7f2
TL
4878 timecheck_skews[rank] = 0.0;
4879 timecheck_latencies[rank] = 0.0;
7c673cae
FG
4880
4881 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4882 if (monmap->get_name(*it) == name)
4883 continue;
4884
7c673cae 4885 utime_t curr_time = ceph_clock_now();
11fdf7f2
TL
4886 timecheck_waiting[*it] = curr_time;
4887 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING);
7c673cae
FG
4888 m->epoch = get_epoch();
4889 m->round = timecheck_round;
11fdf7f2
TL
4890 dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl;
4891 send_mon_message(m, *it);
7c673cae
FG
4892 }
4893}
4894
4895health_status_t Monitor::timecheck_status(ostringstream &ss,
4896 const double skew_bound,
4897 const double latency)
4898{
4899 health_status_t status = HEALTH_OK;
11fdf7f2 4900 ceph_assert(latency >= 0);
7c673cae
FG
4901
4902 double abs_skew;
4903 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4904 status = HEALTH_WARN;
4905 ss << "clock skew " << abs_skew << "s"
11fdf7f2 4906 << " > max " << g_conf()->mon_clock_drift_allowed << "s";
7c673cae
FG
4907 }
4908
4909 return status;
4910}
4911
4912void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4913{
11fdf7f2 4914 MTimeCheck2 *m = static_cast<MTimeCheck2*>(op->get_req());
7c673cae
FG
4915 dout(10) << __func__ << " " << *m << dendl;
4916 /* handles PONG's */
11fdf7f2 4917 ceph_assert(m->op == MTimeCheck2::OP_PONG);
7c673cae 4918
11fdf7f2 4919 int other = m->get_source().num();
7c673cae
FG
4920 if (m->epoch < get_epoch()) {
4921 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4922 << " from " << other
4923 << " curr " << get_epoch()
4924 << " -- severely lagged? discard" << dendl;
4925 return;
4926 }
11fdf7f2 4927 ceph_assert(m->epoch == get_epoch());
7c673cae
FG
4928
4929 if (m->round < timecheck_round) {
4930 dout(1) << __func__ << " got old round " << m->round
4931 << " from " << other
4932 << " curr " << timecheck_round << " -- discard" << dendl;
4933 return;
4934 }
4935
4936 utime_t curr_time = ceph_clock_now();
4937
11fdf7f2 4938 ceph_assert(timecheck_waiting.count(other) > 0);
7c673cae
FG
4939 utime_t timecheck_sent = timecheck_waiting[other];
4940 timecheck_waiting.erase(other);
4941 if (curr_time < timecheck_sent) {
4942 // our clock was readjusted -- drop everything until it all makes sense.
4943 dout(1) << __func__ << " our clock was readjusted --"
4944 << " bump round and drop current check"
4945 << dendl;
4946 timecheck_cancel_round();
4947 return;
4948 }
4949
4950 /* update peer latencies */
4951 double latency = (double)(curr_time - timecheck_sent);
4952
4953 if (timecheck_latencies.count(other) == 0)
4954 timecheck_latencies[other] = latency;
4955 else {
4956 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4957 timecheck_latencies[other] = avg_latency;
4958 }
4959
4960 /*
4961 * update skews
4962 *
4963 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4964 * and 'a' happens to be lower than 'b'; so we use double instead.
4965 *
4966 * latency is always expected to be >= 0.
4967 *
4968 * delta, the difference between theirs timestamp and ours, may either be
4969 * lower or higher than 0; will hardly ever be 0.
4970 *
4971 * The absolute skew is the absolute delta minus the latency, which is
4972 * taken as a whole instead of an rtt given that there is some queueing
4973 * and dispatch times involved and it's hard to assess how long exactly
4974 * it took for the message to travel to the other side and be handled. So
4975 * we call it a bounded skew, the worst case scenario.
4976 *
4977 * Now, to math!
4978 *
4979 * Given that the latency is always positive, we can establish that the
4980 * bounded skew will be:
4981 *
4982 * 1. positive if the absolute delta is higher than the latency and
4983 * delta is positive
4984 * 2. negative if the absolute delta is higher than the latency and
4985 * delta is negative.
4986 * 3. zero if the absolute delta is lower than the latency.
4987 *
4988 * On 3. we make a judgement call and treat the skew as non-existent.
4989 * This is because that, if the absolute delta is lower than the
4990 * latency, then the apparently existing skew is nothing more than a
4991 * side-effect of the high latency at work.
4992 *
4993 * This may not be entirely true though, as a severely skewed clock
4994 * may be masked by an even higher latency, but with high latencies
4995 * we probably have worse issues to deal with than just skewed clocks.
4996 */
11fdf7f2 4997 ceph_assert(latency >= 0);
7c673cae
FG
4998
4999 double delta = ((double) m->timestamp) - ((double) curr_time);
5000 double abs_delta = (delta > 0 ? delta : -delta);
5001 double skew_bound = abs_delta - latency;
5002 if (skew_bound < 0)
5003 skew_bound = 0;
5004 else if (delta < 0)
5005 skew_bound = -skew_bound;
5006
5007 ostringstream ss;
5008 health_status_t status = timecheck_status(ss, skew_bound, latency);
b32b8144
FG
5009 if (status != HEALTH_OK) {
5010 clog->health(status) << other << " " << ss.str();
5011 }
7c673cae
FG
5012
5013 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
5014 << " delta " << delta << " skew_bound " << skew_bound
5015 << " latency " << latency << dendl;
5016
5017 timecheck_skews[other] = skew_bound;
5018
5019 timecheck_acks++;
5020 if (timecheck_acks == quorum.size()) {
5021 dout(10) << __func__ << " got pongs from everybody ("
5022 << timecheck_acks << " total)" << dendl;
11fdf7f2
TL
5023 ceph_assert(timecheck_skews.size() == timecheck_acks);
5024 ceph_assert(timecheck_waiting.empty());
7c673cae
FG
5025 // everyone has acked, so bump the round to finish it.
5026 timecheck_finish_round();
5027 }
5028}
5029
5030void Monitor::handle_timecheck_peon(MonOpRequestRef op)
5031{
11fdf7f2 5032 MTimeCheck2 *m = static_cast<MTimeCheck2*>(op->get_req());
7c673cae
FG
5033 dout(10) << __func__ << " " << *m << dendl;
5034
11fdf7f2
TL
5035 ceph_assert(is_peon());
5036 ceph_assert(m->op == MTimeCheck2::OP_PING || m->op == MTimeCheck2::OP_REPORT);
7c673cae
FG
5037
5038 if (m->epoch != get_epoch()) {
5039 dout(1) << __func__ << " got wrong epoch "
5040 << "(ours " << get_epoch()
5041 << " theirs: " << m->epoch << ") -- discarding" << dendl;
5042 return;
5043 }
5044
5045 if (m->round < timecheck_round) {
5046 dout(1) << __func__ << " got old round " << m->round
5047 << " current " << timecheck_round
5048 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
5049 return;
5050 }
5051
5052 timecheck_round = m->round;
5053
11fdf7f2
TL
5054 if (m->op == MTimeCheck2::OP_REPORT) {
5055 ceph_assert((timecheck_round % 2) == 0);
7c673cae
FG
5056 timecheck_latencies.swap(m->latencies);
5057 timecheck_skews.swap(m->skews);
5058 return;
5059 }
5060
11fdf7f2
TL
5061 ceph_assert((timecheck_round % 2) != 0);
5062 MTimeCheck2 *reply = new MTimeCheck2(MTimeCheck2::OP_PONG);
7c673cae
FG
5063 utime_t curr_time = ceph_clock_now();
5064 reply->timestamp = curr_time;
5065 reply->epoch = m->epoch;
5066 reply->round = m->round;
5067 dout(10) << __func__ << " send " << *m
5068 << " to " << m->get_source_inst() << dendl;
5069 m->get_connection()->send_message(reply);
5070}
5071
5072void Monitor::handle_timecheck(MonOpRequestRef op)
5073{
11fdf7f2 5074 MTimeCheck2 *m = static_cast<MTimeCheck2*>(op->get_req());
7c673cae
FG
5075 dout(10) << __func__ << " " << *m << dendl;
5076
5077 if (is_leader()) {
11fdf7f2 5078 if (m->op != MTimeCheck2::OP_PONG) {
7c673cae
FG
5079 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
5080 } else {
5081 handle_timecheck_leader(op);
5082 }
5083 } else if (is_peon()) {
11fdf7f2 5084 if (m->op != MTimeCheck2::OP_PING && m->op != MTimeCheck2::OP_REPORT) {
7c673cae
FG
5085 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
5086 } else {
5087 handle_timecheck_peon(op);
5088 }
5089 } else {
5090 dout(1) << __func__ << " drop unexpected msg" << dendl;
5091 }
5092}
5093
5094void Monitor::handle_subscribe(MonOpRequestRef op)
5095{
5096 MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
5097 dout(10) << "handle_subscribe " << *m << dendl;
5098
5099 bool reply = false;
5100
5101 MonSession *s = op->get_session();
11fdf7f2
TL
5102 ceph_assert(s);
5103
5104 if (m->hostname.size()) {
5105 s->remote_host = m->hostname;
5106 }
7c673cae
FG
5107
5108 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
5109 p != m->what.end();
5110 ++p) {
11fdf7f2
TL
5111 if (p->first == "monmap" || p->first == "config") {
5112 // these require no caps
5113 } else if (!s->is_capable("mon", MON_CAP_R)) {
5114 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
5115 << " not enough caps for " << *(op->get_req()) << " -- dropping"
5116 << dendl;
5117 continue;
5118 }
5119
7c673cae
FG
5120 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5121 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
5122 reply = true;
5123
5124 // remove conflicting subscribes
5125 if (logmon()->sub_name_to_id(p->first) >= 0) {
5126 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
5127 it != s->sub_map.end(); ) {
5128 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
11fdf7f2 5129 std::lock_guard l(session_map_lock);
7c673cae
FG
5130 session_map.remove_sub((it++)->second);
5131 } else {
5132 ++it;
5133 }
5134 }
5135 }
5136
5137 {
11fdf7f2 5138 std::lock_guard l(session_map_lock);
7c673cae
FG
5139 session_map.add_update_sub(s, p->first, p->second.start,
5140 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
5141 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
5142 }
5143
5144 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
5145 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
5146 if ((int)s->is_capable("mds", MON_CAP_R)) {
5147 Subscription *sub = s->sub_map[p->first];
11fdf7f2 5148 ceph_assert(sub != nullptr);
7c673cae
FG
5149 mdsmon()->check_sub(sub);
5150 }
5151 } else if (p->first == "osdmap") {
5152 if ((int)s->is_capable("osd", MON_CAP_R)) {
5153 if (s->osd_epoch > p->second.start) {
5154 // client needs earlier osdmaps on purpose, so reset the sent epoch
5155 s->osd_epoch = 0;
5156 }
5157 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
5158 }
5159 } else if (p->first == "osd_pg_creates") {
5160 if ((int)s->is_capable("osd", MON_CAP_W)) {
11fdf7f2 5161 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
7c673cae
FG
5162 }
5163 } else if (p->first == "monmap") {
5164 monmon()->check_sub(s->sub_map[p->first]);
5165 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
5166 logmon()->check_sub(s->sub_map[p->first]);
5167 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
5168 mgrmon()->check_sub(s->sub_map[p->first]);
224ce89b
WB
5169 } else if (p->first == "servicemap") {
5170 mgrstatmon()->check_sub(s->sub_map[p->first]);
11fdf7f2
TL
5171 } else if (p->first == "config") {
5172 configmon()->check_sub(s);
7c673cae
FG
5173 }
5174 }
5175
5176 if (reply) {
5177 // we only need to reply if the client is old enough to think it
5178 // has to send renewals.
5179 ConnectionRef con = m->get_connection();
5180 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
5181 m->get_connection()->send_message(new MMonSubscribeAck(
11fdf7f2 5182 monmap->get_fsid(), (int)g_conf()->mon_subscribe_interval));
7c673cae
FG
5183 }
5184
5185}
5186
5187void Monitor::handle_get_version(MonOpRequestRef op)
5188{
5189 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
5190 dout(10) << "handle_get_version " << *m << dendl;
5191 PaxosService *svc = NULL;
5192
5193 MonSession *s = op->get_session();
11fdf7f2 5194 ceph_assert(s);
7c673cae
FG
5195
5196 if (!is_leader() && !is_peon()) {
5197 dout(10) << " waiting for quorum" << dendl;
5198 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5199 goto out;
5200 }
5201
5202 if (m->what == "mdsmap") {
5203 svc = mdsmon();
5204 } else if (m->what == "fsmap") {
5205 svc = mdsmon();
5206 } else if (m->what == "osdmap") {
5207 svc = osdmon();
5208 } else if (m->what == "monmap") {
5209 svc = monmon();
5210 } else {
5211 derr << "invalid map type " << m->what << dendl;
5212 }
5213
5214 if (svc) {
5215 if (!svc->is_readable()) {
5216 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5217 goto out;
5218 }
5219
5220 MMonGetVersionReply *reply = new MMonGetVersionReply();
5221 reply->handle = m->handle;
5222 reply->version = svc->get_last_committed();
5223 reply->oldest_version = svc->get_first_committed();
5224 reply->set_tid(m->get_tid());
5225
5226 m->get_connection()->send_message(reply);
5227 }
5228 out:
5229 return;
5230}
5231
5232bool Monitor::ms_handle_reset(Connection *con)
5233{
5234 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5235
5236 // ignore lossless monitor sessions
5237 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5238 return false;
5239
11fdf7f2
TL
5240 auto priv = con->get_priv();
5241 auto s = static_cast<MonSession*>(priv.get());
7c673cae
FG
5242 if (!s)
5243 return false;
5244
5245 // break any con <-> session ref cycle
11fdf7f2 5246 s->con->set_priv(nullptr);
7c673cae
FG
5247
5248 if (is_shutdown())
5249 return false;
5250
11fdf7f2 5251 std::lock_guard l(lock);
7c673cae 5252
11fdf7f2
TL
5253 dout(10) << "reset/close on session " << s->name << " " << s->addrs << dendl;
5254 if (!s->closed && s->item.is_on_list()) {
5255 std::lock_guard l(session_map_lock);
7c673cae
FG
5256 remove_session(s);
5257 }
7c673cae
FG
5258 return true;
5259}
5260
5261bool Monitor::ms_handle_refused(Connection *con)
5262{
5263 // just log for now...
5264 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5265 return false;
5266}
5267
5268// -----
5269
5270void Monitor::send_latest_monmap(Connection *con)
5271{
5272 bufferlist bl;
5273 monmap->encode(bl, con->get_features());
5274 con->send_message(new MMonMap(bl));
5275}
5276
5277void Monitor::handle_mon_get_map(MonOpRequestRef op)
5278{
5279 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
5280 dout(10) << "handle_mon_get_map" << dendl;
5281 send_latest_monmap(m->get_connection().get());
5282}
5283
5284void Monitor::handle_mon_metadata(MonOpRequestRef op)
5285{
5286 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
5287 if (is_leader()) {
5288 dout(10) << __func__ << dendl;
5289 update_mon_metadata(m->get_source().num(), std::move(m->data));
5290 }
5291}
5292
5293void Monitor::update_mon_metadata(int from, Metadata&& m)
5294{
224ce89b
WB
5295 // NOTE: this is now for legacy (kraken or jewel) mons only.
5296 pending_metadata[from] = std::move(m);
7c673cae
FG
5297
5298 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
224ce89b 5299 bufferlist bl;
11fdf7f2 5300 encode(pending_metadata, bl);
7c673cae
FG
5301 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5302 paxos->trigger_propose();
5303}
5304
224ce89b 5305int Monitor::load_metadata()
7c673cae
FG
5306{
5307 bufferlist bl;
5308 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5309 if (r)
5310 return r;
11fdf7f2
TL
5311 auto it = bl.cbegin();
5312 decode(mon_metadata, it);
224ce89b
WB
5313
5314 pending_metadata = mon_metadata;
7c673cae
FG
5315 return 0;
5316}
5317
5318int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5319{
11fdf7f2 5320 ceph_assert(f);
224ce89b 5321 if (!mon_metadata.count(mon)) {
7c673cae
FG
5322 err << "mon." << mon << " not found";
5323 return -EINVAL;
5324 }
224ce89b 5325 const Metadata& m = mon_metadata[mon];
7c673cae
FG
5326 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5327 f->dump_string(p->first.c_str(), p->second);
5328 }
5329 return 0;
5330}
5331
c07f9fc5 5332void Monitor::count_metadata(const string& field, map<string,int> *out)
31f18b77 5333{
224ce89b 5334 for (auto& p : mon_metadata) {
31f18b77
FG
5335 auto q = p.second.find(field);
5336 if (q == p.second.end()) {
c07f9fc5 5337 (*out)["unknown"]++;
31f18b77 5338 } else {
c07f9fc5 5339 (*out)[q->second]++;
31f18b77
FG
5340 }
5341 }
c07f9fc5
FG
5342}
5343
5344void Monitor::count_metadata(const string& field, Formatter *f)
5345{
5346 map<string,int> by_val;
5347 count_metadata(field, &by_val);
31f18b77
FG
5348 f->open_object_section(field.c_str());
5349 for (auto& p : by_val) {
5350 f->dump_int(p.first.c_str(), p.second);
5351 }
5352 f->close_section();
5353}
5354
7c673cae
FG
5355int Monitor::print_nodes(Formatter *f, ostream& err)
5356{
11fdf7f2 5357 map<string, list<string> > mons; // hostname => mon
224ce89b
WB
5358 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5359 it != mon_metadata.end(); ++it) {
7c673cae
FG
5360 const Metadata& m = it->second;
5361 Metadata::const_iterator hostname = m.find("hostname");
5362 if (hostname == m.end()) {
5363 // not likely though
5364 continue;
5365 }
11fdf7f2 5366 mons[hostname->second].push_back(monmap->get_name(it->first));
7c673cae
FG
5367 }
5368
5369 dump_services(f, mons, "mon");
5370 return 0;
5371}
5372
5373// ----------------------------------------------
5374// scrub
5375
5376int Monitor::scrub_start()
5377{
5378 dout(10) << __func__ << dendl;
11fdf7f2 5379 ceph_assert(is_leader());
7c673cae
FG
5380
5381 if (!scrub_result.empty()) {
5382 clog->info() << "scrub already in progress";
5383 return -EBUSY;
5384 }
5385
5386 scrub_event_cancel();
5387 scrub_result.clear();
5388 scrub_state.reset(new ScrubState);
5389
5390 scrub();
5391 return 0;
5392}
5393
5394int Monitor::scrub()
5395{
11fdf7f2
TL
5396 ceph_assert(is_leader());
5397 ceph_assert(scrub_state);
7c673cae
FG
5398
5399 scrub_cancel_timeout();
5400 wait_for_paxos_write();
5401 scrub_version = paxos->get_version();
5402
5403
5404 // scrub all keys if we're the only monitor in the quorum
5405 int32_t num_keys =
5406 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5407
5408 for (set<int>::iterator p = quorum.begin();
5409 p != quorum.end();
5410 ++p) {
5411 if (*p == rank)
5412 continue;
5413 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5414 num_keys);
5415 r->key = scrub_state->last_key;
11fdf7f2 5416 send_mon_message(r, *p);
7c673cae
FG
5417 }
5418
5419 // scrub my keys
5420 bool r = _scrub(&scrub_result[rank],
5421 &scrub_state->last_key,
5422 &num_keys);
5423
5424 scrub_state->finished = !r;
5425
5426 // only after we got our scrub results do we really care whether the
5427 // other monitors are late on their results. Also, this way we avoid
5428 // triggering the timeout if we end up getting stuck in _scrub() for
5429 // longer than the duration of the timeout.
5430 scrub_reset_timeout();
5431
5432 if (quorum.size() == 1) {
11fdf7f2 5433 ceph_assert(scrub_state->finished == true);
7c673cae
FG
5434 scrub_finish();
5435 }
5436 return 0;
5437}
5438
5439void Monitor::handle_scrub(MonOpRequestRef op)
5440{
5441 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5442 dout(10) << __func__ << " " << *m << dendl;
5443 switch (m->op) {
5444 case MMonScrub::OP_SCRUB:
5445 {
5446 if (!is_peon())
5447 break;
5448
5449 wait_for_paxos_write();
5450
5451 if (m->version != paxos->get_version())
5452 break;
5453
5454 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5455 m->version,
5456 m->num_keys);
5457
5458 reply->key = m->key;
5459 _scrub(&reply->result, &reply->key, &reply->num_keys);
5460 m->get_connection()->send_message(reply);
5461 }
5462 break;
5463
5464 case MMonScrub::OP_RESULT:
5465 {
5466 if (!is_leader())
5467 break;
5468 if (m->version != scrub_version)
5469 break;
5470 // reset the timeout each time we get a result
5471 scrub_reset_timeout();
5472
5473 int from = m->get_source().num();
11fdf7f2 5474 ceph_assert(scrub_result.count(from) == 0);
7c673cae
FG
5475 scrub_result[from] = m->result;
5476
5477 if (scrub_result.size() == quorum.size()) {
5478 scrub_check_results();
5479 scrub_result.clear();
5480 if (scrub_state->finished)
5481 scrub_finish();
5482 else
5483 scrub();
5484 }
5485 }
5486 break;
5487 }
5488}
5489
5490bool Monitor::_scrub(ScrubResult *r,
5491 pair<string,string> *start,
5492 int *num_keys)
5493{
11fdf7f2
TL
5494 ceph_assert(r != NULL);
5495 ceph_assert(start != NULL);
5496 ceph_assert(num_keys != NULL);
7c673cae
FG
5497
5498 set<string> prefixes = get_sync_targets_names();
5499 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5500
5501 dout(10) << __func__ << " start (" << *start << ")"
5502 << " num_keys " << *num_keys << dendl;
5503
5504 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5505
5506 int scrubbed_keys = 0;
5507 pair<string,string> last_key;
5508
5509 while (it->has_next_chunk()) {
5510
5511 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5512 break;
5513
5514 pair<string,string> k = it->get_next_key();
5515 if (prefixes.count(k.first) == 0)
5516 continue;
5517
5518 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5519 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5520 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5521 << dendl;
5522 continue;
5523 }
5524
5525 bufferlist bl;
224ce89b 5526 int err = store->get(k.first, k.second, bl);
11fdf7f2 5527 ceph_assert(err == 0);
224ce89b 5528
7c673cae
FG
5529 uint32_t key_crc = bl.crc32c(0);
5530 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5531 << " crc " << key_crc << dendl;
5532 r->prefix_keys[k.first]++;
224ce89b 5533 if (r->prefix_crc.count(k.first) == 0) {
7c673cae 5534 r->prefix_crc[k.first] = 0;
224ce89b 5535 }
7c673cae
FG
5536 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5537
5538 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5539 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5540 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5541 r->prefix_crc[k.first] += 1;
5542 }
5543
5544 ++scrubbed_keys;
5545 last_key = k;
5546 }
5547
5548 dout(20) << __func__ << " last_key (" << last_key << ")"
5549 << " scrubbed_keys " << scrubbed_keys
5550 << " has_next " << it->has_next_chunk() << dendl;
5551
5552 *start = last_key;
5553 *num_keys = scrubbed_keys;
5554
5555 return it->has_next_chunk();
5556}
5557
5558void Monitor::scrub_check_results()
5559{
5560 dout(10) << __func__ << dendl;
5561
5562 // compare
5563 int errors = 0;
5564 ScrubResult& mine = scrub_result[rank];
5565 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5566 p != scrub_result.end();
5567 ++p) {
5568 if (p->first == rank)
5569 continue;
5570 if (p->second != mine) {
5571 ++errors;
5572 clog->error() << "scrub mismatch";
5573 clog->error() << " mon." << rank << " " << mine;
5574 clog->error() << " mon." << p->first << " " << p->second;
5575 }
5576 }
5577 if (!errors)
d2e6a577 5578 clog->debug() << "scrub ok on " << quorum << ": " << mine;
7c673cae
FG
5579}
5580
5581inline void Monitor::scrub_timeout()
5582{
5583 dout(1) << __func__ << " restarting scrub" << dendl;
5584 scrub_reset();
5585 scrub_start();
5586}
5587
5588void Monitor::scrub_finish()
5589{
5590 dout(10) << __func__ << dendl;
5591 scrub_reset();
5592 scrub_event_start();
5593}
5594
5595void Monitor::scrub_reset()
5596{
5597 dout(10) << __func__ << dendl;
5598 scrub_cancel_timeout();
5599 scrub_version = 0;
5600 scrub_result.clear();
5601 scrub_state.reset();
5602}
5603
5604inline void Monitor::scrub_update_interval(int secs)
5605{
5606 // we don't care about changes if we are not the leader.
5607 // changes will be visible if we become the leader.
5608 if (!is_leader())
5609 return;
5610
5611 dout(1) << __func__ << " new interval = " << secs << dendl;
5612
5613 // if scrub already in progress, all changes will already be visible during
5614 // the next round. Nothing to do.
5615 if (scrub_state != NULL)
5616 return;
5617
5618 scrub_event_cancel();
5619 scrub_event_start();
5620}
5621
5622void Monitor::scrub_event_start()
5623{
5624 dout(10) << __func__ << dendl;
5625
5626 if (scrub_event)
5627 scrub_event_cancel();
5628
5629 if (cct->_conf->mon_scrub_interval <= 0) {
5630 dout(1) << __func__ << " scrub event is disabled"
5631 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5632 << ")" << dendl;
5633 return;
5634 }
5635
3efd9988
FG
5636 scrub_event = timer.add_event_after(
5637 cct->_conf->mon_scrub_interval,
5638 new C_MonContext(this, [this](int) {
7c673cae 5639 scrub_start();
3efd9988 5640 }));
7c673cae
FG
5641}
5642
5643void Monitor::scrub_event_cancel()
5644{
5645 dout(10) << __func__ << dendl;
5646 if (scrub_event) {
5647 timer.cancel_event(scrub_event);
5648 scrub_event = NULL;
5649 }
5650}
5651
5652inline void Monitor::scrub_cancel_timeout()
5653{
5654 if (scrub_timeout_event) {
5655 timer.cancel_event(scrub_timeout_event);
5656 scrub_timeout_event = NULL;
5657 }
5658}
5659
5660void Monitor::scrub_reset_timeout()
5661{
5662 dout(15) << __func__ << " reset timeout event" << dendl;
5663 scrub_cancel_timeout();
3efd9988 5664 scrub_timeout_event = timer.add_event_after(
11fdf7f2 5665 g_conf()->mon_scrub_timeout,
3efd9988 5666 new C_MonContext(this, [this](int) {
7c673cae 5667 scrub_timeout();
3efd9988 5668 }));
7c673cae
FG
5669}
5670
5671/************ TICK ***************/
5672void Monitor::new_tick()
5673{
11fdf7f2 5674 timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext(this, [this](int) {
7c673cae
FG
5675 tick();
5676 }));
5677}
5678
5679void Monitor::tick()
5680{
5681 // ok go.
5682 dout(11) << "tick" << dendl;
181888fb 5683 const utime_t now = ceph_clock_now();
7c673cae 5684
181888fb
FG
5685 // Check if we need to emit any delayed health check updated messages
5686 if (is_leader()) {
11fdf7f2 5687 const auto min_period = g_conf().get_val<int64_t>(
181888fb
FG
5688 "mon_health_log_update_period");
5689 for (auto& svc : paxos_service) {
5690 auto health = svc->get_health_checks();
5691
5692 for (const auto &i : health.checks) {
5693 const std::string &code = i.first;
5694 const std::string &summary = i.second.summary;
5695 const health_status_t severity = i.second.severity;
5696
5697 auto status_iter = health_check_log_times.find(code);
5698 if (status_iter == health_check_log_times.end()) {
5699 continue;
5700 }
5701
5702 auto &log_status = status_iter->second;
5703 bool const changed = log_status.last_message != summary
5704 || log_status.severity != severity;
5705
5706 if (changed && now - log_status.updated_at > min_period) {
5707 log_status.last_message = summary;
5708 log_status.updated_at = now;
5709 log_status.severity = severity;
5710
5711 ostringstream ss;
5712 ss << "Health check update: " << summary << " (" << code << ")";
5713 clog->health(severity) << ss.str();
5714 }
5715 }
5716 }
5717 }
5718
5719
11fdf7f2
TL
5720 for (auto& svc : paxos_service) {
5721 svc->tick();
5722 svc->maybe_trim();
7c673cae
FG
5723 }
5724
5725 // trim sessions
7c673cae 5726 {
11fdf7f2 5727 std::lock_guard l(session_map_lock);
7c673cae
FG
5728 auto p = session_map.sessions.begin();
5729
5730 bool out_for_too_long = (!exited_quorum.is_zero() &&
11fdf7f2 5731 now > (exited_quorum + 2*g_conf()->mon_lease));
7c673cae
FG
5732
5733 while (!p.end()) {
5734 MonSession *s = *p;
5735 ++p;
5736
5737 // don't trim monitors
11fdf7f2 5738 if (s->name.is_mon())
7c673cae
FG
5739 continue;
5740
5741 if (s->session_timeout < now && s->con) {
5742 // check keepalive, too
5743 s->session_timeout = s->con->get_last_keepalive();
11fdf7f2 5744 s->session_timeout += g_conf()->mon_session_timeout;
7c673cae
FG
5745 }
5746 if (s->session_timeout < now) {
11fdf7f2
TL
5747 dout(10) << " trimming session " << s->con << " " << s->name
5748 << " " << s->addrs
7c673cae
FG
5749 << " (timeout " << s->session_timeout
5750 << " < now " << now << ")" << dendl;
5751 } else if (out_for_too_long) {
5752 // boot the client Session because we've taken too long getting back in
11fdf7f2 5753 dout(10) << " trimming session " << s->con << " " << s->name
7c673cae
FG
5754 << " because we've been out of quorum too long" << dendl;
5755 } else {
5756 continue;
5757 }
5758
5759 s->con->mark_down();
5760 remove_session(s);
5761 logger->inc(l_mon_session_trim);
5762 }
5763 }
5764 sync_trim_providers();
5765
5766 if (!maybe_wait_for_quorum.empty()) {
5767 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5768 }
5769
5770 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5771 // this is only necessary on upgraded clusters.
5772 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5773 prepare_new_fingerprint(t);
5774 paxos->trigger_propose();
5775 }
5776
11fdf7f2 5777 mgr_client.update_daemon_health(get_health_metrics());
7c673cae
FG
5778 new_tick();
5779}
5780
11fdf7f2
TL
5781vector<DaemonHealthMetric> Monitor::get_health_metrics()
5782{
5783 vector<DaemonHealthMetric> metrics;
5784
5785 utime_t oldest_secs;
5786 const utime_t now = ceph_clock_now();
5787 auto too_old = now;
5788 too_old -= g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count();
5789 int slow = 0;
5790 TrackedOpRef oldest_op;
5791 auto count_slow_ops = [&](TrackedOp& op) {
5792 if (op.get_initiated() < too_old) {
5793 slow++;
5794 if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
5795 oldest_op = &op;
5796 }
5797 return true;
5798 } else {
5799 return false;
5800 }
5801 };
5802 if (op_tracker.visit_ops_in_flight(&oldest_secs, count_slow_ops)) {
5803 if (slow) {
5804 derr << __func__ << " reporting " << slow << " slow ops, oldest is "
5805 << oldest_op->get_desc() << dendl;
5806 }
5807 metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
5808 } else {
5809 metrics.emplace_back(daemon_metric::SLOW_OPS, 0, 0);
5810 }
5811 return metrics;
5812}
5813
7c673cae
FG
5814void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5815{
5816 uuid_d nf;
5817 nf.generate_random();
5818 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5819
5820 bufferlist bl;
11fdf7f2 5821 encode(nf, bl);
7c673cae
FG
5822 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5823}
5824
5825int Monitor::check_fsid()
5826{
5827 bufferlist ebl;
5828 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5829 if (r == -ENOENT)
5830 return r;
11fdf7f2 5831 ceph_assert(r == 0);
7c673cae
FG
5832
5833 string es(ebl.c_str(), ebl.length());
5834
5835 // only keep the first line
5836 size_t pos = es.find_first_of('\n');
5837 if (pos != string::npos)
5838 es.resize(pos);
5839
5840 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5841 uuid_d ondisk;
5842 if (!ondisk.parse(es.c_str())) {
5843 derr << "error: unable to parse uuid" << dendl;
5844 return -EINVAL;
5845 }
5846
5847 if (monmap->get_fsid() != ondisk) {
5848 derr << "error: cluster_uuid file exists with value " << ondisk
5849 << ", != our uuid " << monmap->get_fsid() << dendl;
5850 return -EEXIST;
5851 }
5852
5853 return 0;
5854}
5855
5856int Monitor::write_fsid()
5857{
5858 auto t(std::make_shared<MonitorDBStore::Transaction>());
5859 write_fsid(t);
5860 int r = store->apply_transaction(t);
5861 return r;
5862}
5863
5864int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5865{
5866 ostringstream ss;
5867 ss << monmap->get_fsid() << "\n";
5868 string us = ss.str();
5869
5870 bufferlist b;
5871 b.append(us);
5872
5873 t->put(MONITOR_NAME, "cluster_uuid", b);
5874 return 0;
5875}
5876
5877/*
5878 * this is the closest thing to a traditional 'mkfs' for ceph.
5879 * initialize the monitor state machines to their initial values.
5880 */
5881int Monitor::mkfs(bufferlist& osdmapbl)
5882{
5883 auto t(std::make_shared<MonitorDBStore::Transaction>());
5884
5885 // verify cluster fsid
5886 int r = check_fsid();
5887 if (r < 0 && r != -ENOENT)
5888 return r;
5889
5890 bufferlist magicbl;
5891 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5892 magicbl.append("\n");
5893 t->put(MONITOR_NAME, "magic", magicbl);
5894
5895
5896 features = get_initial_supported_features();
5897 write_features(t);
5898
5899 // save monmap, osdmap, keyring.
5900 bufferlist monmapbl;
5901 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5902 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5903 t->put("mkfs", "monmap", monmapbl);
5904
5905 if (osdmapbl.length()) {
5906 // make sure it's a valid osdmap
5907 try {
5908 OSDMap om;
5909 om.decode(osdmapbl);
5910 }
5911 catch (buffer::error& e) {
5912 derr << "error decoding provided osdmap: " << e.what() << dendl;
5913 return -EINVAL;
5914 }
5915 t->put("mkfs", "osdmap", osdmapbl);
5916 }
5917
5918 if (is_keyring_required()) {
5919 KeyRing keyring;
5920 string keyring_filename;
5921
11fdf7f2 5922 r = ceph_resolve_file_search(g_conf()->keyring, keyring_filename);
7c673cae 5923 if (r) {
11fdf7f2 5924 derr << "unable to find a keyring file on " << g_conf()->keyring
7c673cae 5925 << ": " << cpp_strerror(r) << dendl;
11fdf7f2
TL
5926 if (g_conf()->key != "") {
5927 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf()->key +
7c673cae
FG
5928 "\n\tcaps mon = \"allow *\"\n";
5929 bufferlist bl;
5930 bl.append(keyring_plaintext);
5931 try {
11fdf7f2 5932 auto i = bl.cbegin();
7c673cae
FG
5933 keyring.decode_plaintext(i);
5934 }
5935 catch (const buffer::error& e) {
5936 derr << "error decoding keyring " << keyring_plaintext
5937 << ": " << e.what() << dendl;
5938 return -EINVAL;
5939 }
5940 } else {
5941 return -ENOENT;
5942 }
5943 } else {
5944 r = keyring.load(g_ceph_context, keyring_filename);
5945 if (r < 0) {
11fdf7f2 5946 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
7c673cae
FG
5947 return r;
5948 }
5949 }
5950
5951 // put mon. key in external keyring; seed with everything else.
5952 extract_save_mon_key(keyring);
5953
5954 bufferlist keyringbl;
5955 keyring.encode_plaintext(keyringbl);
5956 t->put("mkfs", "keyring", keyringbl);
5957 }
5958 write_fsid(t);
5959 store->apply_transaction(t);
5960
5961 return 0;
5962}
5963
5964int Monitor::write_default_keyring(bufferlist& bl)
5965{
5966 ostringstream os;
11fdf7f2 5967 os << g_conf()->mon_data << "/keyring";
7c673cae
FG
5968
5969 int err = 0;
91327a77 5970 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600);
7c673cae
FG
5971 if (fd < 0) {
5972 err = -errno;
5973 dout(0) << __func__ << " failed to open " << os.str()
5974 << ": " << cpp_strerror(err) << dendl;
5975 return err;
5976 }
5977
5978 err = bl.write_fd(fd);
5979 if (!err)
5980 ::fsync(fd);
5981 VOID_TEMP_FAILURE_RETRY(::close(fd));
5982
5983 return err;
5984}
5985
5986void Monitor::extract_save_mon_key(KeyRing& keyring)
5987{
5988 EntityName mon_name;
5989 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5990 EntityAuth mon_key;
5991 if (keyring.get_auth(mon_name, mon_key)) {
5992 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5993 KeyRing pkey;
5994 pkey.add(mon_name, mon_key);
5995 bufferlist bl;
5996 pkey.encode_plaintext(bl);
5997 write_default_keyring(bl);
5998 keyring.remove(mon_name);
5999 }
6000}
6001
11fdf7f2
TL
6002// AuthClient methods -- for mon <-> mon communication
6003int Monitor::get_auth_request(
6004 Connection *con,
6005 AuthConnectionMeta *auth_meta,
6006 uint32_t *method,
6007 vector<uint32_t> *preferred_modes,
6008 bufferlist *out)
6009{
6010 std::scoped_lock l(auth_lock);
6011 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
6012 con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
6013 return -EACCES;
6014 }
6015 AuthAuthorizer *auth;
6016 if (!ms_get_authorizer(con->get_peer_type(), &auth)) {
6017 return -EACCES;
6018 }
6019 auth_meta->authorizer.reset(auth);
6020 auth_registry.get_supported_modes(con->get_peer_type(),
6021 auth->protocol,
6022 preferred_modes);
6023 *method = auth->protocol;
6024 *out = auth->bl;
6025 return 0;
6026}
6027
6028int Monitor::handle_auth_reply_more(
6029 Connection *con,
6030 AuthConnectionMeta *auth_meta,
6031 const bufferlist& bl,
6032 bufferlist *reply)
6033{
6034 std::scoped_lock l(auth_lock);
6035 if (!auth_meta->authorizer) {
6036 derr << __func__ << " no authorizer?" << dendl;
6037 return -EACCES;
6038 }
6039 auth_meta->authorizer->add_challenge(cct, bl);
6040 *reply = auth_meta->authorizer->bl;
6041 return 0;
6042}
6043
6044int Monitor::handle_auth_done(
6045 Connection *con,
6046 AuthConnectionMeta *auth_meta,
6047 uint64_t global_id,
6048 uint32_t con_mode,
6049 const bufferlist& bl,
6050 CryptoKey *session_key,
6051 std::string *connection_secret)
6052{
6053 std::scoped_lock l(auth_lock);
6054 // verify authorizer reply
6055 auto p = bl.begin();
6056 if (!auth_meta->authorizer->verify_reply(p, connection_secret)) {
6057 dout(0) << __func__ << " failed verifying authorizer reply" << dendl;
6058 return -EACCES;
6059 }
6060 auth_meta->session_key = auth_meta->authorizer->session_key;
6061 return 0;
6062}
6063
6064int Monitor::handle_auth_bad_method(
6065 Connection *con,
6066 AuthConnectionMeta *auth_meta,
6067 uint32_t old_auth_method,
6068 int result,
6069 const std::vector<uint32_t>& allowed_methods,
6070 const std::vector<uint32_t>& allowed_modes)
6071{
6072 derr << __func__ << " hmm, they didn't like " << old_auth_method
6073 << " result " << cpp_strerror(result) << dendl;
6074 return -EACCES;
6075}
6076
6077bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer)
7c673cae
FG
6078{
6079 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
6080 << dendl;
6081
6082 if (is_shutdown())
6083 return false;
6084
6085 // we only connect to other monitors and mgr; every else connects to us.
6086 if (service_id != CEPH_ENTITY_TYPE_MON &&
6087 service_id != CEPH_ENTITY_TYPE_MGR)
6088 return false;
6089
6090 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
6091 // auth_none
6092 dout(20) << __func__ << " building auth_none authorizer" << dendl;
11fdf7f2 6093 AuthNoneClientHandler handler{g_ceph_context};
7c673cae
FG
6094 handler.set_global_id(0);
6095 *authorizer = handler.build_authorizer(service_id);
6096 return true;
6097 }
6098
6099 CephXServiceTicketInfo auth_ticket_info;
6100 CephXSessionAuthInfo info;
6101 int ret;
6102
6103 EntityName name;
6104 name.set_type(CEPH_ENTITY_TYPE_MON);
6105 auth_ticket_info.ticket.name = name;
6106 auth_ticket_info.ticket.global_id = 0;
6107
6108 if (service_id == CEPH_ENTITY_TYPE_MON) {
6109 // mon to mon authentication uses the private monitor shared key and not the
6110 // rotating key
6111 CryptoKey secret;
6112 if (!keyring.get_secret(name, secret) &&
6113 !key_server.get_secret(name, secret)) {
6114 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
6115 << dendl;
6116 stringstream ss, ds;
6117 int err = key_server.list_secrets(ds);
6118 if (err < 0)
6119 ss << "no installed auth entries!";
6120 else
6121 ss << "installed auth entries:";
6122 dout(0) << ss.str() << "\n" << ds.str() << dendl;
6123 return false;
6124 }
6125
11fdf7f2
TL
6126 ret = key_server.build_session_auth_info(
6127 service_id, auth_ticket_info.ticket, info, secret, (uint64_t)-1);
7c673cae
FG
6128 if (ret < 0) {
6129 dout(0) << __func__ << " failed to build mon session_auth_info "
6130 << cpp_strerror(ret) << dendl;
6131 return false;
6132 }
6133 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
6134 // mgr
11fdf7f2
TL
6135 ret = key_server.build_session_auth_info(
6136 service_id, auth_ticket_info.ticket, info);
7c673cae
FG
6137 if (ret < 0) {
6138 derr << __func__ << " failed to build mgr service session_auth_info "
6139 << cpp_strerror(ret) << dendl;
6140 return false;
6141 }
6142 } else {
6143 ceph_abort(); // see check at top of fn
6144 }
6145
6146 CephXTicketBlob blob;
6147 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
6148 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
6149 return false;
6150 }
6151 bufferlist ticket_data;
11fdf7f2 6152 encode(blob, ticket_data);
7c673cae 6153
11fdf7f2 6154 auto iter = ticket_data.cbegin();
7c673cae 6155 CephXTicketHandler handler(g_ceph_context, service_id);
11fdf7f2 6156 decode(handler.ticket, iter);
7c673cae
FG
6157
6158 handler.session_key = info.session_key;
6159
6160 *authorizer = handler.build_authorizer(0);
6161
6162 return true;
6163}
6164
11fdf7f2 6165KeyStore *Monitor::ms_get_auth1_authorizer_keystore()
7c673cae 6166{
11fdf7f2
TL
6167 return &keyring;
6168}
7c673cae 6169
11fdf7f2
TL
6170int Monitor::handle_auth_request(
6171 Connection *con,
6172 AuthConnectionMeta *auth_meta,
6173 bool more,
6174 uint32_t auth_method,
6175 const bufferlist &payload,
6176 bufferlist *reply)
6177{
6178 std::scoped_lock l(auth_lock);
7c673cae 6179
11fdf7f2
TL
6180 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6181 // e.g., peer_features, peer_addrs, and others are still unknown.
6182
6183 dout(10) << __func__ << " con " << con << (more ? " (more)":" (start)")
6184 << " method " << auth_method
6185 << " payload " << payload.length()
6186 << dendl;
6187 if (!more) {
6188 auth_meta->auth_mode = payload[0];
6189 }
6190
6191 if (auth_meta->auth_mode >= AUTH_MODE_AUTHORIZER &&
6192 auth_meta->auth_mode <= AUTH_MODE_AUTHORIZER_MAX) {
6193 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
6194 auth_method);
6195 if (!ah) {
6196 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
6197 << auth_method << dendl;
6198 return -EOPNOTSUPP;
6199 }
6200 bool was_challenge = (bool)auth_meta->authorizer_challenge;
6201 bool isvalid = ah->verify_authorizer(
6202 cct,
6203 &keyring,
6204 payload,
6205 auth_meta->get_connection_secret_length(),
6206 reply,
6207 &con->peer_name,
6208 &con->peer_global_id,
6209 &con->peer_caps_info,
6210 &auth_meta->session_key,
6211 &auth_meta->connection_secret,
6212 &auth_meta->authorizer_challenge);
6213 if (isvalid) {
6214 ms_handle_authentication(con);
6215 return 1;
6216 }
6217 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
6218 return 0;
6219 }
6220 dout(10) << __func__ << " bad authorizer on " << con << dendl;
6221 return -EACCES;
6222 } else if (auth_meta->auth_mode < AUTH_MODE_MON &&
6223 auth_meta->auth_mode > AUTH_MODE_MON_MAX) {
6224 derr << __func__ << " unrecognized auth mode " << auth_meta->auth_mode
6225 << dendl;
6226 return -EACCES;
6227 }
6228
6229 // wait until we've formed an initial quorum on mkfs so that we have
6230 // the initial keys (e.g., client.admin).
6231 if (authmon()->get_last_committed() == 0) {
6232 dout(10) << __func__ << " haven't formed initial quorum, EBUSY" << dendl;
6233 return -EBUSY;
6234 }
6235
6236 RefCountedPtr priv;
6237 MonSession *s;
6238 int32_t r = 0;
6239 auto p = payload.begin();
6240 if (!more) {
6241 if (con->get_priv()) {
6242 return -EACCES; // wtf
6243 }
6244
6245 // handler?
6246 unique_ptr<AuthServiceHandler> auth_handler{get_auth_service_handler(
6247 auth_method, g_ceph_context, &key_server)};
6248 if (!auth_handler) {
6249 dout(1) << __func__ << " auth_method " << auth_method << " not supported"
6250 << dendl;
6251 return -EOPNOTSUPP;
6252 }
6253
6254 uint8_t mode;
6255 EntityName entity_name;
6256
6257 try {
6258 decode(mode, p);
6259 if (mode < AUTH_MODE_MON ||
6260 mode > AUTH_MODE_MON_MAX) {
6261 dout(1) << __func__ << " invalid mode " << (int)mode << dendl;
6262 return -EACCES;
6263 }
6264 assert(mode >= AUTH_MODE_MON && mode <= AUTH_MODE_MON_MAX);
6265 decode(entity_name, p);
6266 decode(con->peer_global_id, p);
6267 } catch (buffer::error& e) {
6268 dout(1) << __func__ << " failed to decode, " << e.what() << dendl;
6269 return -EACCES;
6270 }
6271
6272 // supported method?
6273 if (entity_name.get_type() == CEPH_ENTITY_TYPE_MON ||
6274 entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
6275 entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
6276 entity_name.get_type() == CEPH_ENTITY_TYPE_MGR) {
6277 if (!auth_cluster_required.is_supported_auth(auth_method)) {
6278 dout(10) << __func__ << " entity " << entity_name << " method "
6279 << auth_method << " not among supported "
6280 << auth_cluster_required.get_supported_set() << dendl;
6281 return -EOPNOTSUPP;
7c673cae
FG
6282 }
6283 } else {
11fdf7f2
TL
6284 if (!auth_service_required.is_supported_auth(auth_method)) {
6285 dout(10) << __func__ << " entity " << entity_name << " method "
6286 << auth_method << " not among supported "
6287 << auth_cluster_required.get_supported_set() << dendl;
6288 return -EOPNOTSUPP;
6289 }
6290 }
6291
6292 // for msgr1 we would do some weirdness here to ensure signatures
6293 // are supported by the client if we require it. for msgr2 that
6294 // is not necessary.
6295
6296 if (!con->peer_global_id) {
6297 con->peer_global_id = authmon()->_assign_global_id();
6298 if (!con->peer_global_id) {
6299 dout(1) << __func__ << " failed to assign global_id" << dendl;
6300 return -EBUSY;
6301 }
6302 dout(10) << __func__ << " assigned global_id " << con->peer_global_id
6303 << dendl;
6304 }
6305
6306 // set up partial session
6307 s = new MonSession(con);
6308 s->auth_handler = auth_handler.release();
6309 con->set_priv(RefCountedPtr{s, false});
6310
6311 r = s->auth_handler->start_session(
6312 entity_name,
6313 auth_meta->get_connection_secret_length(),
6314 reply,
6315 &con->peer_caps_info,
6316 &auth_meta->session_key,
6317 &auth_meta->connection_secret);
6318 } else {
6319 priv = con->get_priv();
6320 if (!priv) {
6321 // this can happen if the async ms_handle_reset event races with
6322 // the unlocked call into handle_auth_request
6323 return -EACCES;
7c673cae 6324 }
11fdf7f2
TL
6325 s = static_cast<MonSession*>(priv.get());
6326 r = s->auth_handler->handle_request(
6327 p,
6328 auth_meta->get_connection_secret_length(),
6329 reply,
6330 &con->peer_global_id,
6331 &con->peer_caps_info,
6332 &auth_meta->session_key,
6333 &auth_meta->connection_secret);
6334 }
6335 if (r > 0 &&
6336 !s->authenticated) {
6337 ms_handle_authentication(con);
6338 }
6339
6340 dout(30) << " r " << r << " reply:\n";
6341 reply->hexdump(*_dout);
6342 *_dout << dendl;
6343 return r;
6344}
6345
6346void Monitor::ms_handle_accept(Connection *con)
6347{
6348 auto priv = con->get_priv();
6349 MonSession *s = static_cast<MonSession*>(priv.get());
6350 if (!s) {
6351 // legacy protocol v1?
6352 dout(10) << __func__ << " con " << con << " no session" << dendl;
6353 return;
6354 }
6355
6356 if (s->item.is_on_list()) {
6357 dout(10) << __func__ << " con " << con << " session " << s
6358 << " already on list" << dendl;
7c673cae 6359 } else {
11fdf7f2
TL
6360 dout(10) << __func__ << " con " << con << " session " << s
6361 << " registering session for "
6362 << con->get_peer_addrs() << dendl;
6363 s->_ident(entity_name_t(con->get_peer_type(), con->get_peer_id()),
6364 con->get_peer_addrs());
6365 std::lock_guard l(session_map_lock);
6366 session_map.add_session(s);
7c673cae 6367 }
11fdf7f2
TL
6368}
6369
6370int Monitor::ms_handle_authentication(Connection *con)
6371{
6372 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
6373 // mon <-> mon connections need no Session, and setting one up
6374 // creates an awkward ref cycle between Session and Connection.
6375 return 1;
6376 }
6377
6378 auto priv = con->get_priv();
6379 MonSession *s = static_cast<MonSession*>(priv.get());
6380 if (!s) {
6381 // must be msgr2, otherwise dispatch would have set up the session.
6382 s = session_map.new_session(
6383 entity_name_t(con->get_peer_type(), -1), // we don't know yet
6384 con->get_peer_addrs(),
6385 con);
6386 assert(s);
6387 dout(10) << __func__ << " adding session " << s << " to con " << con
6388 << dendl;
6389 con->set_priv(s);
6390 logger->set(l_mon_num_sessions, session_map.get_size());
6391 logger->inc(l_mon_session_add);
6392 }
6393 dout(10) << __func__ << " session " << s << " con " << con
6394 << " addr " << s->con->get_peer_addr()
6395 << " " << *s << dendl;
6396
6397 AuthCapsInfo &caps_info = con->get_peer_caps_info();
6398 int ret = 0;
6399 if (caps_info.allow_all) {
6400 s->caps.set_allow_all();
6401 s->authenticated = true;
6402 ret = 1;
6403 }
6404 if (caps_info.caps.length()) {
6405 bufferlist::const_iterator p = caps_info.caps.cbegin();
6406 string str;
6407 try {
6408 decode(str, p);
6409 } catch (const buffer::error &err) {
6410 derr << __func__ << " corrupt cap data for " << con->get_peer_entity_name()
6411 << " in auth db" << dendl;
6412 str.clear();
6413 ret = -EPERM;
6414 }
6415 if (ret >= 0) {
6416 if (s->caps.parse(str, NULL)) {
6417 s->authenticated = true;
6418 ret = 1;
6419 } else {
6420 derr << __func__ << " unparseable caps '" << str << "' for "
6421 << con->get_peer_entity_name() << dendl;
6422 ret = -EPERM;
6423 }
6424 }
6425 }
6426
6427 return ret;
7c673cae 6428}