]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Monitor.cc
update sources to 12.2.7
[ceph.git] / ceph / src / mon / Monitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include <sstream>
17 #include <stdlib.h>
18 #include <signal.h>
19 #include <limits.h>
20 #include <cstring>
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
23
24 #include "Monitor.h"
25 #include "common/version.h"
26
27 #include "osd/OSDMap.h"
28
29 #include "MonitorDBStore.h"
30
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
48 #include "messages/MStatfs.h"
49
50 #include "messages/MMonSubscribe.h"
51 #include "messages/MMonSubscribeAck.h"
52
53 #include "messages/MAuthReply.h"
54
55 #include "messages/MTimeCheck.h"
56 #include "messages/MPing.h"
57
58 #include "common/strtol.h"
59 #include "common/ceph_argparse.h"
60 #include "common/Timer.h"
61 #include "common/Clock.h"
62 #include "common/errno.h"
63 #include "common/perf_counters.h"
64 #include "common/admin_socket.h"
65 #include "global/signal_handler.h"
66 #include "common/Formatter.h"
67 #include "include/stringify.h"
68 #include "include/color.h"
69 #include "include/ceph_fs.h"
70 #include "include/str_list.h"
71
72 #include "OSDMonitor.h"
73 #include "MDSMonitor.h"
74 #include "MonmapMonitor.h"
75 #include "PGMonitor.h"
76 #include "LogMonitor.h"
77 #include "AuthMonitor.h"
78 #include "MgrMonitor.h"
79 #include "MgrStatMonitor.h"
80 #include "mon/QuorumService.h"
81 #include "mon/OldHealthMonitor.h"
82 #include "mon/HealthMonitor.h"
83 #include "mon/ConfigKeyService.h"
84 #include "common/config.h"
85 #include "common/cmdparse.h"
86 #include "include/assert.h"
87 #include "include/compat.h"
88 #include "perfglue/heap_profiler.h"
89
90 #include "auth/none/AuthNoneClientHandler.h"
91
92 #define dout_subsys ceph_subsys_mon
93 #undef dout_prefix
94 #define dout_prefix _prefix(_dout, this)
95 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
96 return *_dout << "mon." << mon->name << "@" << mon->rank
97 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
98 }
99
100 const string Monitor::MONITOR_NAME = "monitor";
101 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
102
103
// Build the static command tables from the X-macro headers below: each
// COMMAND(...) / COMMAND_WITH_FLAG(...) line in an included header expands
// to one brace-initialized MonCommand element followed by a comma.
#undef FLAG
#undef COMMAND
#undef COMMAND_WITH_FLAG
#define FLAG(f) (MonCommand::FLAG_##f)
#define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
  {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
  {parsesig, helptext, modulename, req_perms, avail, flags},
// Commands this monitor serves; copied into local_mon_commands by the
// constructor.
MonCommand mon_commands[] = {
#include <mon/MonCommands.h>
};
// PGMonitor commands; the constructor appends these only to
// local_upgrading_mon_commands.
MonCommand pgmonitor_commands[] = {
#include <mon/PGMonitorCommands.h>
};
// NOTE(review): FLAG is intentionally left defined past this point
// (only COMMAND and COMMAND_WITH_FLAG are undefined) -- confirm it is
// still used further down before removing it.
#undef COMMAND
#undef COMMAND_WITH_FLAG
120
121
122 void C_MonContext::finish(int r) {
123 if (mon->is_shutdown())
124 return;
125 FunctionContext::finish(r);
126 }
127
/**
 * Construct a monitor instance.
 *
 * Initializes the member objects (timer, finisher, CPU thread pool, log
 * client, auth handler registries, elector, ...), creates the Paxos
 * instance plus one PaxosService per PAXOS_* slot, and prepares the local
 * command tables (plain and "upgrading", each also kept pre-encoded).
 * Threads and timers are not started here; init() starts them.
 *
 * @param cct_   context for this daemon
 * @param nm     this monitor's name
 * @param s      backing store; ~Monitor() does not delete it
 * @param m      messenger for regular traffic
 * @param mgr_m  messenger handed to the mgr client
 * @param map    initial monmap (kept in 'monmap')
 */
Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
                 Messenger *m, Messenger *mgr_m, MonMap *map) :
  Dispatcher(cct_),
  name(nm),
  rank(-1),
  messenger(m),
  con_self(m ? m->get_loopback_connection() : NULL),
  lock("Monitor::lock"),
  timer(cct_, lock),
  finisher(cct_, "mon_finisher", "fin"),
  cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
  has_ever_joined(false),
  logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
  monmap(map),
  log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
  key_server(cct, &keyring),
  // When auth_supported is set it overrides both *_required options.
  auth_cluster_required(cct,
                        cct->_conf->auth_supported.empty() ?
                        cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
  auth_service_required(cct,
                        cct->_conf->auth_supported.empty() ?
                        cct->_conf->auth_service_required : cct->_conf->auth_supported ),
  mgr_messenger(mgr_m),
  mgr_client(cct_, mgr_m),
  pgservice(nullptr),
  store(s),

  state(STATE_PROBING),

  elector(this),
  required_features(0),
  leader(0),
  quorum_con_features(0),
  // scrub
  scrub_version(0),
  scrub_event(NULL),
  scrub_timeout_event(NULL),

  // sync state
  sync_provider_count(0),
  sync_cookie(0),
  sync_full(false),
  sync_start_version(0),
  sync_timeout_event(NULL),
  sync_last_committed_floor(0),

  timecheck_round(0),
  timecheck_acks(0),
  timecheck_rounds_since_clean(0),
  timecheck_event(NULL),

  paxos_service(PAXOS_NUM),
  admin_hook(NULL),
  routed_request_tid(0),
  op_tracker(cct, true, 1)
{
  clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
  audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);

  // Apply the current clog_* / host / fsid configuration to both channels.
  update_log_clients();

  paxos = new Paxos(this, "paxos");

  // One service per paxos_service slot; ~Monitor() deletes them.
  paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
  paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
  paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
  paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
  paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
  paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
  paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
  paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
  paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");

  health_monitor = new OldHealthMonitor(this);
  config_key_service = new ConfigKeyService(this, paxos);

  // Capabilities granted to the monitor itself: everything.
  mon_caps = new MonCap();
  bool r = mon_caps->parse("allow *", NULL);
  assert(r);

  exited_quorum = ceph_clock_now();

  // prepare local commands
  local_mon_commands.resize(ARRAY_SIZE(mon_commands));
  for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
    local_mon_commands[i] = mon_commands[i];
  }
  MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);

  // Same table plus the PGMonitor commands.
  local_upgrading_mon_commands = local_mon_commands;
  for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
    local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
  }
  MonCommand::encode_vector(local_upgrading_mon_commands,
                            local_upgrading_mon_commands_bl);

  // assume our commands until we have an election. this only means
  // we won't reply with EINVAL before the election; any command that
  // actually matters will wait until we have quorum etc and then
  // retry (and revalidate).
  leader_mon_commands = local_mon_commands;

  // note: OSDMonitor may update this based on the luminous flag.
  pgservice = mgrstatmon()->get_pg_stat_service();
}
233
234 Monitor::~Monitor()
235 {
236 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
237 delete *p;
238 delete health_monitor;
239 delete config_key_service;
240 delete paxos;
241 assert(session_map.sessions.empty());
242 delete mon_caps;
243 }
244
245
246 class AdminHook : public AdminSocketHook {
247 Monitor *mon;
248 public:
249 explicit AdminHook(Monitor *m) : mon(m) {}
250 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
251 bufferlist& out) override {
252 stringstream ss;
253 mon->do_admin_command(command, cmdmap, format, ss);
254 out.append(ss);
255 return true;
256 }
257 };
258
259 void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
260 ostream& ss)
261 {
262 Mutex::Locker l(lock);
263
264 boost::scoped_ptr<Formatter> f(Formatter::create(format));
265
266 string args;
267 for (cmdmap_t::iterator p = cmdmap.begin();
268 p != cmdmap.end(); ++p) {
269 if (p->first == "prefix")
270 continue;
271 if (!args.empty())
272 args += ", ";
273 args += cmd_vartype_stringify(p->second);
274 }
275 args = "[" + args + "]";
276
277 bool read_only = (command == "mon_status" ||
278 command == "mon metadata" ||
279 command == "quorum_status" ||
280 command == "ops" ||
281 command == "sessions");
282
283 (read_only ? audit_clog->debug() : audit_clog->info())
284 << "from='admin socket' entity='admin socket' "
285 << "cmd='" << command << "' args=" << args << ": dispatch";
286
287 if (command == "mon_status") {
288 get_mon_status(f.get(), ss);
289 if (f)
290 f->flush(ss);
291 } else if (command == "quorum_status") {
292 _quorum_status(f.get(), ss);
293 } else if (command == "sync_force") {
294 string validate;
295 if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
296 (validate != "--yes-i-really-mean-it")) {
297 ss << "are you SURE? this will mean the monitor store will be erased "
298 "the next time the monitor is restarted. pass "
299 "'--yes-i-really-mean-it' if you really do.";
300 goto abort;
301 }
302 sync_force(f.get(), ss);
303 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
304 if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
305 goto abort;
306 } else if (command == "quorum enter") {
307 elector.start_participating();
308 start_election();
309 ss << "started responding to quorum, initiated new election";
310 } else if (command == "quorum exit") {
311 start_election();
312 elector.stop_participating();
313 ss << "stopped responding to quorum, initiated new election";
314 } else if (command == "ops") {
315 (void)op_tracker.dump_ops_in_flight(f.get());
316 if (f) {
317 f->flush(ss);
318 }
319 } else if (command == "sessions") {
320
321 if (f) {
322 f->open_array_section("sessions");
323 for (auto p : session_map.sessions) {
324 f->dump_stream("session") << *p;
325 }
326 f->close_section();
327 f->flush(ss);
328 }
329
330 } else {
331 assert(0 == "bad AdminSocket command binding");
332 }
333 (read_only ? audit_clog->debug() : audit_clog->info())
334 << "from='admin socket' "
335 << "entity='admin socket' "
336 << "cmd=" << command << " "
337 << "args=" << args << ": finished";
338 return;
339
340 abort:
341 (read_only ? audit_clog->debug() : audit_clog->info())
342 << "from='admin socket' "
343 << "entity='admin socket' "
344 << "cmd=" << command << " "
345 << "args=" << args << ": aborted";
346 }
347
348 void Monitor::handle_signal(int signum)
349 {
350 assert(signum == SIGINT || signum == SIGTERM);
351 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
352 shutdown();
353 }
354
355 CompatSet Monitor::get_initial_supported_features()
356 {
357 CompatSet::FeatureSet ceph_mon_feature_compat;
358 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
359 CompatSet::FeatureSet ceph_mon_feature_incompat;
360 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
361 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
362 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
363 ceph_mon_feature_incompat);
364 }
365
366 CompatSet Monitor::get_supported_features()
367 {
368 CompatSet compat = get_initial_supported_features();
369 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
370 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
371 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
372 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
373 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
374 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
375 return compat;
376 }
377
378 CompatSet Monitor::get_legacy_features()
379 {
380 CompatSet::FeatureSet ceph_mon_feature_compat;
381 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
382 CompatSet::FeatureSet ceph_mon_feature_incompat;
383 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
384 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
385 ceph_mon_feature_incompat);
386 }
387
388 int Monitor::check_features(MonitorDBStore *store)
389 {
390 CompatSet required = get_supported_features();
391 CompatSet ondisk;
392
393 read_features_off_disk(store, &ondisk);
394
395 if (!required.writeable(ondisk)) {
396 CompatSet diff = required.unsupported(ondisk);
397 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
398 return -EPERM;
399 }
400
401 return 0;
402 }
403
404 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
405 {
406 bufferlist featuresbl;
407 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
408 if (featuresbl.length() == 0) {
409 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
410 << "Assuming it is old-style and introducing one." << dendl;
411 //we only want the baseline ~v.18 features assumed to be on disk.
412 //If new features are introduced this code needs to disappear or
413 //be made smarter.
414 *features = get_legacy_features();
415
416 features->encode(featuresbl);
417 auto t(std::make_shared<MonitorDBStore::Transaction>());
418 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
419 store->apply_transaction(t);
420 } else {
421 bufferlist::iterator it = featuresbl.begin();
422 features->decode(it);
423 }
424 }
425
426 void Monitor::read_features()
427 {
428 read_features_off_disk(store, &features);
429 dout(10) << "features " << features << dendl;
430
431 calc_quorum_requirements();
432 dout(10) << "required_features " << required_features << dendl;
433 }
434
435 void Monitor::write_features(MonitorDBStore::TransactionRef t)
436 {
437 bufferlist bl;
438 features.encode(bl);
439 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
440 }
441
442 const char** Monitor::get_tracked_conf_keys() const
443 {
444 static const char* KEYS[] = {
445 "crushtool", // helpful for testing
446 "mon_election_timeout",
447 "mon_lease",
448 "mon_lease_renew_interval_factor",
449 "mon_lease_ack_timeout_factor",
450 "mon_accept_timeout_factor",
451 // clog & admin clog
452 "clog_to_monitors",
453 "clog_to_syslog",
454 "clog_to_syslog_facility",
455 "clog_to_syslog_level",
456 "clog_to_graylog",
457 "clog_to_graylog_host",
458 "clog_to_graylog_port",
459 "host",
460 "fsid",
461 // periodic health to clog
462 "mon_health_to_clog",
463 "mon_health_to_clog_interval",
464 "mon_health_to_clog_tick_interval",
465 // scrub interval
466 "mon_scrub_interval",
467 NULL
468 };
469 return KEYS;
470 }
471
472 void Monitor::handle_conf_change(const struct md_config_t *conf,
473 const std::set<std::string> &changed)
474 {
475 sanitize_options();
476
477 dout(10) << __func__ << " " << changed << dendl;
478
479 if (changed.count("clog_to_monitors") ||
480 changed.count("clog_to_syslog") ||
481 changed.count("clog_to_syslog_level") ||
482 changed.count("clog_to_syslog_facility") ||
483 changed.count("clog_to_graylog") ||
484 changed.count("clog_to_graylog_host") ||
485 changed.count("clog_to_graylog_port") ||
486 changed.count("host") ||
487 changed.count("fsid")) {
488 update_log_clients();
489 }
490
491 if (changed.count("mon_health_to_clog") ||
492 changed.count("mon_health_to_clog_interval") ||
493 changed.count("mon_health_to_clog_tick_interval")) {
494 health_to_clog_update_conf(changed);
495 }
496
497 if (changed.count("mon_scrub_interval")) {
498 scrub_update_interval(conf->mon_scrub_interval);
499 }
500 }
501
502 void Monitor::update_log_clients()
503 {
504 map<string,string> log_to_monitors;
505 map<string,string> log_to_syslog;
506 map<string,string> log_channel;
507 map<string,string> log_prio;
508 map<string,string> log_to_graylog;
509 map<string,string> log_to_graylog_host;
510 map<string,string> log_to_graylog_port;
511 uuid_d fsid;
512 string host;
513
514 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
515 log_channel, log_prio, log_to_graylog,
516 log_to_graylog_host, log_to_graylog_port,
517 fsid, host))
518 return;
519
520 clog->update_config(log_to_monitors, log_to_syslog,
521 log_channel, log_prio, log_to_graylog,
522 log_to_graylog_host, log_to_graylog_port,
523 fsid, host);
524
525 audit_clog->update_config(log_to_monitors, log_to_syslog,
526 log_channel, log_prio, log_to_graylog,
527 log_to_graylog_host, log_to_graylog_port,
528 fsid, host);
529 }
530
531 int Monitor::sanitize_options()
532 {
533 int r = 0;
534
535 // mon_lease must be greater than mon_lease_renewal; otherwise we
536 // may incur in leases expiring before they are renewed.
537 if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
538 clog->error() << "mon_lease_renew_interval_factor ("
539 << g_conf->mon_lease_renew_interval_factor
540 << ") must be less than 1.0";
541 r = -EINVAL;
542 }
543
544 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
545 // got time to renew the lease and get an ack for it. Having both options
546 // with the same value, for a given small vale, could mean timing out if
547 // the monitors happened to be overloaded -- or even under normal load for
548 // a small enough value.
549 if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
550 clog->error() << "mon_lease_ack_timeout_factor ("
551 << g_conf->mon_lease_ack_timeout_factor
552 << ") must be greater than 1.0";
553 r = -EINVAL;
554 }
555
556 return r;
557 }
558
559 int Monitor::preinit()
560 {
561 lock.Lock();
562
563 dout(1) << "preinit fsid " << monmap->fsid << dendl;
564
565 int r = sanitize_options();
566 if (r < 0) {
567 derr << "option sanitization failed!" << dendl;
568 lock.Unlock();
569 return r;
570 }
571
572 assert(!logger);
573 {
574 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
575 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
576 PerfCountersBuilder::PRIO_USEFUL);
577 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
578 "sadd", PerfCountersBuilder::PRIO_INTERESTING);
579 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
580 "srm", PerfCountersBuilder::PRIO_INTERESTING);
581 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
582 "strm", PerfCountersBuilder::PRIO_USEFUL);
583 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
584 "ecnt", PerfCountersBuilder::PRIO_USEFUL);
585 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
586 "estt", PerfCountersBuilder::PRIO_INTERESTING);
587 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
588 "ewon", PerfCountersBuilder::PRIO_INTERESTING);
589 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
590 "elst", PerfCountersBuilder::PRIO_INTERESTING);
591 logger = pcb.create_perf_counters();
592 cct->get_perfcounters_collection()->add(logger);
593 }
594
595 assert(!cluster_logger);
596 {
597 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
598 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
599 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
600 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
601 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
602 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
603 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
604 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster");
605 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space");
606 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space");
607 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
608 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
609 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
610 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
611 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
612 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
613 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
614 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
615 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
616 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
617 pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
618 pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
619 pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
620 pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
621 cluster_logger = pcb.create_perf_counters();
622 }
623
624 paxos->init_logger();
625
626 // verify cluster_uuid
627 {
628 int r = check_fsid();
629 if (r == -ENOENT)
630 r = write_fsid();
631 if (r < 0) {
632 lock.Unlock();
633 return r;
634 }
635 }
636
637 // open compatset
638 read_features();
639
640 // have we ever joined a quorum?
641 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
642 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
643
644 if (!has_ever_joined) {
645 // impose initial quorum restrictions?
646 list<string> initial_members;
647 get_str_list(g_conf->mon_initial_members, initial_members);
648
649 if (!initial_members.empty()) {
650 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
651
652 monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
653 &extra_probe_peers);
654
655 dout(10) << " monmap is " << *monmap << dendl;
656 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
657 }
658 } else if (!monmap->contains(name)) {
659 derr << "not in monmap and have been in a quorum before; "
660 << "must have been removed" << dendl;
661 if (g_conf->mon_force_quorum_join) {
662 dout(0) << "we should have died but "
663 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
664 } else {
665 derr << "commit suicide!" << dendl;
666 lock.Unlock();
667 return -ENOENT;
668 }
669 }
670
671 {
672 // We have a potentially inconsistent store state in hands. Get rid of it
673 // and start fresh.
674 bool clear_store = false;
675 if (store->exists("mon_sync", "in_sync")) {
676 dout(1) << __func__ << " clean up potentially inconsistent store state"
677 << dendl;
678 clear_store = true;
679 }
680
681 if (store->get("mon_sync", "force_sync") > 0) {
682 dout(1) << __func__ << " force sync by clearing store state" << dendl;
683 clear_store = true;
684 }
685
686 if (clear_store) {
687 set<string> sync_prefixes = get_sync_targets_names();
688 store->clear(sync_prefixes);
689 }
690 }
691
692 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
693 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
694
695 init_paxos();
696 health_monitor->init();
697
698 if (is_keyring_required()) {
699 // we need to bootstrap authentication keys so we can form an
700 // initial quorum.
701 if (authmon()->get_last_committed() == 0) {
702 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
703 bufferlist bl;
704 int err = store->get("mkfs", "keyring", bl);
705 if (err == 0 && bl.length() > 0) {
706 // Attempt to decode and extract keyring only if it is found.
707 KeyRing keyring;
708 bufferlist::iterator p = bl.begin();
709 ::decode(keyring, p);
710 extract_save_mon_key(keyring);
711 }
712 }
713
714 string keyring_loc = g_conf->mon_data + "/keyring";
715
716 r = keyring.load(cct, keyring_loc);
717 if (r < 0) {
718 EntityName mon_name;
719 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
720 EntityAuth mon_key;
721 if (key_server.get_auth(mon_name, mon_key)) {
722 dout(1) << "copying mon. key from old db to external keyring" << dendl;
723 keyring.add(mon_name, mon_key);
724 bufferlist bl;
725 keyring.encode_plaintext(bl);
726 write_default_keyring(bl);
727 } else {
728 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
729 lock.Unlock();
730 return r;
731 }
732 }
733 }
734
735 admin_hook = new AdminHook(this);
736 AdminSocket* admin_socket = cct->get_admin_socket();
737
738 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
739 lock.Unlock();
740 r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
741 "show current monitor status");
742 assert(r == 0);
743 r = admin_socket->register_command("quorum_status", "quorum_status",
744 admin_hook, "show current quorum status");
745 assert(r == 0);
746 r = admin_socket->register_command("sync_force",
747 "sync_force name=validate,"
748 "type=CephChoices,"
749 "strings=--yes-i-really-mean-it",
750 admin_hook,
751 "force sync of and clear monitor store");
752 assert(r == 0);
753 r = admin_socket->register_command("add_bootstrap_peer_hint",
754 "add_bootstrap_peer_hint name=addr,"
755 "type=CephIPAddr",
756 admin_hook,
757 "add peer address as potential bootstrap"
758 " peer for cluster bringup");
759 assert(r == 0);
760 r = admin_socket->register_command("quorum enter", "quorum enter",
761 admin_hook,
762 "force monitor back into quorum");
763 assert(r == 0);
764 r = admin_socket->register_command("quorum exit", "quorum exit",
765 admin_hook,
766 "force monitor out of the quorum");
767 assert(r == 0);
768 r = admin_socket->register_command("ops",
769 "ops",
770 admin_hook,
771 "show the ops currently in flight");
772 assert(r == 0);
773 r = admin_socket->register_command("sessions",
774 "sessions",
775 admin_hook,
776 "list existing sessions");
777 assert(r == 0);
778
779 lock.Lock();
780
781 // add ourselves as a conf observer
782 g_conf->add_observer(this);
783
784 lock.Unlock();
785 return 0;
786 }
787
/**
 * Second-stage monitor initialization (see also preinit()).
 *
 * In order, it:
 *  - starts the finisher, the timer (scheduling the first tick) and the
 *    CPU thread pool;
 *  - registers this monitor as a dispatcher on the main messenger;
 *  - initializes the mgr client and adds it, followed by this monitor, as
 *    dispatchers on the mgr messenger;
 *  - runs bootstrap();
 *  - records this monitor's own connection features in the session
 *    feature map.
 *
 * Holds 'lock' for the whole call.
 *
 * @return always 0
 */
int Monitor::init()
{
  dout(2) << "init" << dendl;
  Mutex::Locker l(lock);

  finisher.start();

  // start ticker
  timer.init();
  new_tick();

  cpu_tp.start();

  // i'm ready!
  messenger->add_dispatcher_tail(this);

  mgr_client.init();
  mgr_messenger->add_dispatcher_tail(&mgr_client);
  mgr_messenger->add_dispatcher_tail(this);  // for auth ms_* calls

  bootstrap();
  // add features of myself into feature_map
  session_map.feature_map.add_mon(con_self->get_features());
  return 0;
}
813
814 void Monitor::init_paxos()
815 {
816 dout(10) << __func__ << dendl;
817 paxos->init();
818
819 // init services
820 for (int i = 0; i < PAXOS_NUM; ++i) {
821 paxos_service[i]->init();
822 }
823
824 refresh_from_paxos(NULL);
825 }
826
827 void Monitor::refresh_from_paxos(bool *need_bootstrap)
828 {
829 dout(10) << __func__ << dendl;
830
831 bufferlist bl;
832 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
833 if (r >= 0) {
834 try {
835 bufferlist::iterator p = bl.begin();
836 ::decode(fingerprint, p);
837 }
838 catch (buffer::error& e) {
839 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
840 }
841 } else {
842 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
843 }
844
845 for (int i = 0; i < PAXOS_NUM; ++i) {
846 paxos_service[i]->refresh(need_bootstrap);
847 }
848 for (int i = 0; i < PAXOS_NUM; ++i) {
849 paxos_service[i]->post_refresh();
850 }
851 load_metadata();
852 }
853
854 void Monitor::register_cluster_logger()
855 {
856 if (!cluster_logger_registered) {
857 dout(10) << "register_cluster_logger" << dendl;
858 cluster_logger_registered = true;
859 cct->get_perfcounters_collection()->add(cluster_logger);
860 } else {
861 dout(10) << "register_cluster_logger - already registered" << dendl;
862 }
863 }
864
865 void Monitor::unregister_cluster_logger()
866 {
867 if (cluster_logger_registered) {
868 dout(10) << "unregister_cluster_logger" << dendl;
869 cluster_logger_registered = false;
870 cct->get_perfcounters_collection()->remove(cluster_logger);
871 } else {
872 dout(10) << "unregister_cluster_logger - not registered" << dendl;
873 }
874 }
875
876 void Monitor::update_logger()
877 {
878 cluster_logger->set(l_cluster_num_mon, monmap->size());
879 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
880 }
881
882 void Monitor::shutdown()
883 {
884 dout(1) << "shutdown" << dendl;
885
886 lock.Lock();
887
888 wait_for_paxos_write();
889
890 state = STATE_SHUTDOWN;
891
892 g_conf->remove_observer(this);
893
894 if (admin_hook) {
895 AdminSocket* admin_socket = cct->get_admin_socket();
896 admin_socket->unregister_command("mon_status");
897 admin_socket->unregister_command("quorum_status");
898 admin_socket->unregister_command("sync_force");
899 admin_socket->unregister_command("add_bootstrap_peer_hint");
900 admin_socket->unregister_command("quorum enter");
901 admin_socket->unregister_command("quorum exit");
902 admin_socket->unregister_command("ops");
903 admin_socket->unregister_command("sessions");
904 delete admin_hook;
905 admin_hook = NULL;
906 }
907
908 elector.shutdown();
909
910 mgr_client.shutdown();
911
912 lock.Unlock();
913 finisher.wait_for_empty();
914 finisher.stop();
915 lock.Lock();
916
917 // clean up
918 paxos->shutdown();
919 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
920 (*p)->shutdown();
921 health_monitor->shutdown();
922
923 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
924 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
925
926 timer.shutdown();
927
928 cpu_tp.stop();
929
930 remove_all_sessions();
931
932 if (logger) {
933 cct->get_perfcounters_collection()->remove(logger);
934 delete logger;
935 logger = NULL;
936 }
937 if (cluster_logger) {
938 if (cluster_logger_registered)
939 cct->get_perfcounters_collection()->remove(cluster_logger);
940 delete cluster_logger;
941 cluster_logger = NULL;
942 }
943
944 log_client.shutdown();
945
946 // unlock before msgr shutdown...
947 lock.Unlock();
948
949 messenger->shutdown(); // last thing! ceph_mon.cc will delete mon.
950 mgr_messenger->shutdown();
951 }
952
953 void Monitor::wait_for_paxos_write()
954 {
955 if (paxos->is_writing() || paxos->is_writing_previous()) {
956 dout(10) << __func__ << " flushing pending write" << dendl;
957 lock.Unlock();
958 store->flush();
959 lock.Lock();
960 dout(10) << __func__ << " flushed pending write" << dendl;
961 }
962 }
963
964 void Monitor::bootstrap()
965 {
966 dout(10) << "bootstrap" << dendl;
967 wait_for_paxos_write();
968
969 sync_reset_requester();
970 unregister_cluster_logger();
971 cancel_probe_timeout();
972
973 // note my rank
974 int newrank = monmap->get_rank(messenger->get_myaddr());
975 if (newrank < 0 && rank >= 0) {
976 // was i ever part of the quorum?
977 if (has_ever_joined) {
978 dout(0) << " removed from monmap, suicide." << dendl;
979 exit(0);
980 }
981 }
982 if (newrank != rank) {
983 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
984 messenger->set_myname(entity_name_t::MON(newrank));
985 rank = newrank;
986
987 // reset all connections, or else our peers will think we are someone else.
988 messenger->mark_down_all();
989 }
990
991 // reset
992 state = STATE_PROBING;
993
994 _reset();
995
996 // sync store
997 if (g_conf->mon_compact_on_bootstrap) {
998 dout(10) << "bootstrap -- triggering compaction" << dendl;
999 store->compact();
1000 dout(10) << "bootstrap -- finished compaction" << dendl;
1001 }
1002
1003 // singleton monitor?
1004 if (monmap->size() == 1 && rank == 0) {
1005 win_standalone_election();
1006 return;
1007 }
1008
1009 reset_probe_timeout();
1010
1011 // i'm outside the quorum
1012 if (monmap->contains(name))
1013 outside_quorum.insert(name);
1014
1015 // probe monitors
1016 dout(10) << "probing other monitors" << dendl;
1017 for (unsigned i = 0; i < monmap->size(); i++) {
1018 if ((int)i != rank)
1019 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
1020 monmap->get_inst(i));
1021 }
1022 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
1023 p != extra_probe_peers.end();
1024 ++p) {
1025 if (*p != messenger->get_myaddr()) {
1026 entity_inst_t i;
1027 i.name = entity_name_t::MON(-1);
1028 i.addr = *p;
1029 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
1030 }
1031 }
1032 }
1033
1034 bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1035 {
1036 string addrstr;
1037 if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1038 ss << "unable to parse address string value '"
1039 << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1040 return false;
1041 }
1042 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1043 << addrstr << "'" << dendl;
1044
1045 entity_addr_t addr;
1046 const char *end = 0;
1047 if (!addr.parse(addrstr.c_str(), &end)) {
1048 ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1049 return false;
1050 }
1051
1052 if (is_leader() || is_peon()) {
1053 ss << "mon already active; ignoring bootstrap hint";
1054 return true;
1055 }
1056
1057 if (addr.get_port() == 0)
1058 addr.set_port(CEPH_MON_PORT);
1059
1060 extra_probe_peers.insert(addr);
1061 ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1062 return true;
1063 }
1064
1065 // called by bootstrap(), or on leader|peon -> electing
1066 void Monitor::_reset()
1067 {
1068 dout(10) << __func__ << dendl;
1069
1070 cancel_probe_timeout();
1071 timecheck_finish();
1072 health_events_cleanup();
1073 health_check_log_times.clear();
1074 scrub_event_cancel();
1075
1076 leader_since = utime_t();
1077 if (!quorum.empty()) {
1078 exited_quorum = ceph_clock_now();
1079 }
1080 quorum.clear();
1081 outside_quorum.clear();
1082 quorum_feature_map.clear();
1083
1084 scrub_reset();
1085
1086 paxos->restart();
1087
1088 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1089 (*p)->restart();
1090 health_monitor->finish();
1091 }
1092
1093
1094 // -----------------------------------------------------------
1095 // sync
1096
1097 set<string> Monitor::get_sync_targets_names()
1098 {
1099 set<string> targets;
1100 targets.insert(paxos->get_name());
1101 for (int i = 0; i < PAXOS_NUM; ++i)
1102 paxos_service[i]->get_store_prefixes(targets);
1103 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1104 assert(config_key_service_ptr);
1105 config_key_service_ptr->get_store_prefixes(targets);
1106 return targets;
1107 }
1108
1109
1110 void Monitor::sync_timeout()
1111 {
1112 dout(10) << __func__ << dendl;
1113 assert(state == STATE_SYNCHRONIZING);
1114 bootstrap();
1115 }
1116
1117 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1118 {
1119 dout(1) << __func__ << dendl;
1120
1121 MonMap latest_monmap;
1122
1123 // Grab latest monmap from MonmapMonitor
1124 bufferlist monmon_bl;
1125 int err = monmon()->get_monmap(monmon_bl);
1126 if (err < 0) {
1127 if (err != -ENOENT) {
1128 derr << __func__
1129 << " something wrong happened while reading the store: "
1130 << cpp_strerror(err) << dendl;
1131 assert(0 == "error reading the store");
1132 }
1133 } else {
1134 latest_monmap.decode(monmon_bl);
1135 }
1136
1137 // Grab last backed up monmap (if any) and compare epochs
1138 if (store->exists("mon_sync", "latest_monmap")) {
1139 bufferlist backup_bl;
1140 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1141 if (err < 0) {
1142 derr << __func__
1143 << " something wrong happened while reading the store: "
1144 << cpp_strerror(err) << dendl;
1145 assert(0 == "error reading the store");
1146 }
1147 assert(backup_bl.length() > 0);
1148
1149 MonMap backup_monmap;
1150 backup_monmap.decode(backup_bl);
1151
1152 if (backup_monmap.epoch > latest_monmap.epoch)
1153 latest_monmap = backup_monmap;
1154 }
1155
1156 // Check if our current monmap's epoch is greater than the one we've
1157 // got so far.
1158 if (monmap->epoch > latest_monmap.epoch)
1159 latest_monmap = *monmap;
1160
1161 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1162
1163 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1164 }
1165
1166 void Monitor::sync_reset_requester()
1167 {
1168 dout(10) << __func__ << dendl;
1169
1170 if (sync_timeout_event) {
1171 timer.cancel_event(sync_timeout_event);
1172 sync_timeout_event = NULL;
1173 }
1174
1175 sync_provider = entity_inst_t();
1176 sync_cookie = 0;
1177 sync_full = false;
1178 sync_start_version = 0;
1179 }
1180
1181 void Monitor::sync_reset_provider()
1182 {
1183 dout(10) << __func__ << dendl;
1184 sync_providers.clear();
1185 }
1186
1187 void Monitor::sync_start(entity_inst_t &other, bool full)
1188 {
1189 dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
1190
1191 assert(state == STATE_PROBING ||
1192 state == STATE_SYNCHRONIZING);
1193 state = STATE_SYNCHRONIZING;
1194
1195 // make sure are not a provider for anyone!
1196 sync_reset_provider();
1197
1198 sync_full = full;
1199
1200 if (sync_full) {
1201 // stash key state, and mark that we are syncing
1202 auto t(std::make_shared<MonitorDBStore::Transaction>());
1203 sync_stash_critical_state(t);
1204 t->put("mon_sync", "in_sync", 1);
1205
1206 sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
1207 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1208 << sync_last_committed_floor << dendl;
1209 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1210
1211 store->apply_transaction(t);
1212
1213 assert(g_conf->mon_sync_requester_kill_at != 1);
1214
1215 // clear the underlying store
1216 set<string> targets = get_sync_targets_names();
1217 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1218 store->clear(targets);
1219
1220 // make sure paxos knows it has been reset. this prevents a
1221 // bootstrap and then different probe reply order from possibly
1222 // deciding a partial or no sync is needed.
1223 paxos->init();
1224
1225 assert(g_conf->mon_sync_requester_kill_at != 2);
1226 }
1227
1228 // assume 'other' as the leader. We will update the leader once we receive
1229 // a reply to the sync start.
1230 sync_provider = other;
1231
1232 sync_reset_timeout();
1233
1234 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1235 if (!sync_full)
1236 m->last_committed = paxos->get_version();
1237 messenger->send_message(m, sync_provider);
1238 }
1239
1240 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1241 {
1242 dout(10) << __func__ << dendl;
1243 bufferlist backup_monmap;
1244 sync_obtain_latest_monmap(backup_monmap);
1245 assert(backup_monmap.length() > 0);
1246 t->put("mon_sync", "latest_monmap", backup_monmap);
1247 }
1248
1249 void Monitor::sync_reset_timeout()
1250 {
1251 dout(10) << __func__ << dendl;
1252 if (sync_timeout_event)
1253 timer.cancel_event(sync_timeout_event);
1254 sync_timeout_event = timer.add_event_after(
1255 g_conf->mon_sync_timeout,
1256 new C_MonContext(this, [this](int) {
1257 sync_timeout();
1258 }));
1259 }
1260
1261 void Monitor::sync_finish(version_t last_committed)
1262 {
1263 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1264
1265 assert(g_conf->mon_sync_requester_kill_at != 7);
1266
1267 if (sync_full) {
1268 // finalize the paxos commits
1269 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1270 paxos->read_and_prepare_transactions(tx, sync_start_version,
1271 last_committed);
1272 tx->put(paxos->get_name(), "last_committed", last_committed);
1273
1274 dout(30) << __func__ << " final tx dump:\n";
1275 JSONFormatter f(true);
1276 tx->dump(&f);
1277 f.flush(*_dout);
1278 *_dout << dendl;
1279
1280 store->apply_transaction(tx);
1281 }
1282
1283 assert(g_conf->mon_sync_requester_kill_at != 8);
1284
1285 auto t(std::make_shared<MonitorDBStore::Transaction>());
1286 t->erase("mon_sync", "in_sync");
1287 t->erase("mon_sync", "force_sync");
1288 t->erase("mon_sync", "last_committed_floor");
1289 store->apply_transaction(t);
1290
1291 assert(g_conf->mon_sync_requester_kill_at != 9);
1292
1293 init_paxos();
1294
1295 assert(g_conf->mon_sync_requester_kill_at != 10);
1296
1297 bootstrap();
1298 }
1299
1300 void Monitor::handle_sync(MonOpRequestRef op)
1301 {
1302 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1303 dout(10) << __func__ << " " << *m << dendl;
1304 switch (m->op) {
1305
1306 // provider ---------
1307
1308 case MMonSync::OP_GET_COOKIE_FULL:
1309 case MMonSync::OP_GET_COOKIE_RECENT:
1310 handle_sync_get_cookie(op);
1311 break;
1312 case MMonSync::OP_GET_CHUNK:
1313 handle_sync_get_chunk(op);
1314 break;
1315
1316 // client -----------
1317
1318 case MMonSync::OP_COOKIE:
1319 handle_sync_cookie(op);
1320 break;
1321
1322 case MMonSync::OP_CHUNK:
1323 case MMonSync::OP_LAST_CHUNK:
1324 handle_sync_chunk(op);
1325 break;
1326 case MMonSync::OP_NO_COOKIE:
1327 handle_sync_no_cookie(op);
1328 break;
1329
1330 default:
1331 dout(0) << __func__ << " unknown op " << m->op << dendl;
1332 assert(0 == "unknown op");
1333 }
1334 }
1335
1336 // leader
1337
1338 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1339 {
1340 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1341 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1342 m->get_connection()->send_message(reply);
1343 }
1344
1345 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1346 {
1347 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1348 if (is_synchronizing()) {
1349 _sync_reply_no_cookie(op);
1350 return;
1351 }
1352
1353 assert(g_conf->mon_sync_provider_kill_at != 1);
1354
1355 // make sure they can understand us.
1356 if ((required_features ^ m->get_connection()->get_features()) &
1357 required_features) {
1358 dout(5) << " ignoring peer mon." << m->get_source().num()
1359 << " has features " << std::hex
1360 << m->get_connection()->get_features()
1361 << " but we require " << required_features << std::dec << dendl;
1362 return;
1363 }
1364
1365 // make up a unique cookie. include election epoch (which persists
1366 // across restarts for the whole cluster) and a counter for this
1367 // process instance. there is no need to be unique *across*
1368 // monitors, though.
1369 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1370 assert(sync_providers.count(cookie) == 0);
1371
1372 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1373
1374 SyncProvider& sp = sync_providers[cookie];
1375 sp.cookie = cookie;
1376 sp.entity = m->get_source_inst();
1377 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1378
1379 set<string> sync_targets;
1380 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1381 // full scan
1382 sync_targets = get_sync_targets_names();
1383 sp.last_committed = paxos->get_version();
1384 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1385 sp.full = true;
1386 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1387 } else {
1388 // just catch up paxos
1389 sp.last_committed = m->last_committed;
1390 }
1391 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1392
1393 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1394 reply->last_committed = sp.last_committed;
1395 m->get_connection()->send_message(reply);
1396 }
1397
1398 void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1399 {
1400 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1401 dout(10) << __func__ << " " << *m << dendl;
1402
1403 if (sync_providers.count(m->cookie) == 0) {
1404 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1405 _sync_reply_no_cookie(op);
1406 return;
1407 }
1408
1409 assert(g_conf->mon_sync_provider_kill_at != 2);
1410
1411 SyncProvider& sp = sync_providers[m->cookie];
1412 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1413
1414 if (sp.last_committed < paxos->get_first_committed() &&
1415 paxos->get_first_committed() > 1) {
1416 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1417 << " < our fc " << paxos->get_first_committed() << dendl;
1418 sync_providers.erase(m->cookie);
1419 _sync_reply_no_cookie(op);
1420 return;
1421 }
1422
1423 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1424 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1425
1426 int left = g_conf->mon_sync_max_payload_size;
1427 while (sp.last_committed < paxos->get_version() && left > 0) {
1428 bufferlist bl;
1429 sp.last_committed++;
1430
1431 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1432 assert(err == 0);
1433
1434 tx->put(paxos->get_name(), sp.last_committed, bl);
1435 left -= bl.length();
1436 dout(20) << __func__ << " including paxos state " << sp.last_committed
1437 << dendl;
1438 }
1439 reply->last_committed = sp.last_committed;
1440
1441 if (sp.full && left > 0) {
1442 sp.synchronizer->get_chunk_tx(tx, left);
1443 sp.last_key = sp.synchronizer->get_last_key();
1444 reply->last_key = sp.last_key;
1445 }
1446
1447 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1448 sp.last_committed < paxos->get_version()) {
1449 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1450 << " key " << sp.last_key << dendl;
1451 } else {
1452 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1453 << " key " << sp.last_key << dendl;
1454 reply->op = MMonSync::OP_LAST_CHUNK;
1455
1456 assert(g_conf->mon_sync_provider_kill_at != 3);
1457
1458 // clean up our local state
1459 sync_providers.erase(sp.cookie);
1460 }
1461
1462 ::encode(*tx, reply->chunk_bl);
1463
1464 m->get_connection()->send_message(reply);
1465 }
1466
1467 // requester
1468
1469 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1470 {
1471 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1472 dout(10) << __func__ << " " << *m << dendl;
1473 if (sync_cookie) {
1474 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1475 return;
1476 }
1477 if (m->get_source_inst() != sync_provider) {
1478 dout(10) << __func__ << " source does not match, discarding" << dendl;
1479 return;
1480 }
1481 sync_cookie = m->cookie;
1482 sync_start_version = m->last_committed;
1483
1484 sync_reset_timeout();
1485 sync_get_next_chunk();
1486
1487 assert(g_conf->mon_sync_requester_kill_at != 3);
1488 }
1489
1490 void Monitor::sync_get_next_chunk()
1491 {
1492 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1493 if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1494 dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1495 usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1496 }
1497 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1498 messenger->send_message(r, sync_provider);
1499
1500 assert(g_conf->mon_sync_requester_kill_at != 4);
1501 }
1502
1503 void Monitor::handle_sync_chunk(MonOpRequestRef op)
1504 {
1505 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1506 dout(10) << __func__ << " " << *m << dendl;
1507
1508 if (m->cookie != sync_cookie) {
1509 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1510 return;
1511 }
1512 if (m->get_source_inst() != sync_provider) {
1513 dout(10) << __func__ << " source does not match, discarding" << dendl;
1514 return;
1515 }
1516
1517 assert(state == STATE_SYNCHRONIZING);
1518 assert(g_conf->mon_sync_requester_kill_at != 5);
1519
1520 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1521 tx->append_from_encoded(m->chunk_bl);
1522
1523 dout(30) << __func__ << " tx dump:\n";
1524 JSONFormatter f(true);
1525 tx->dump(&f);
1526 f.flush(*_dout);
1527 *_dout << dendl;
1528
1529 store->apply_transaction(tx);
1530
1531 assert(g_conf->mon_sync_requester_kill_at != 6);
1532
1533 if (!sync_full) {
1534 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1535 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1536 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1537 m->last_committed);
1538 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1539
1540 dout(30) << __func__ << " tx dump:\n";
1541 JSONFormatter f(true);
1542 tx->dump(&f);
1543 f.flush(*_dout);
1544 *_dout << dendl;
1545
1546 store->apply_transaction(tx);
1547 paxos->init(); // to refresh what we just wrote
1548 }
1549
1550 if (m->op == MMonSync::OP_CHUNK) {
1551 sync_reset_timeout();
1552 sync_get_next_chunk();
1553 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1554 sync_finish(m->last_committed);
1555 }
1556 }
1557
1558 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1559 {
1560 dout(10) << __func__ << dendl;
1561 bootstrap();
1562 }
1563
1564 void Monitor::sync_trim_providers()
1565 {
1566 dout(20) << __func__ << dendl;
1567
1568 utime_t now = ceph_clock_now();
1569 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1570 while (p != sync_providers.end()) {
1571 if (now > p->second.timeout) {
1572 dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
1573 sync_providers.erase(p++);
1574 } else {
1575 ++p;
1576 }
1577 }
1578 }
1579
1580 // ---------------------------------------------------
1581 // probe
1582
1583 void Monitor::cancel_probe_timeout()
1584 {
1585 if (probe_timeout_event) {
1586 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1587 timer.cancel_event(probe_timeout_event);
1588 probe_timeout_event = NULL;
1589 } else {
1590 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1591 }
1592 }
1593
1594 void Monitor::reset_probe_timeout()
1595 {
1596 cancel_probe_timeout();
1597 probe_timeout_event = new C_MonContext(this, [this](int r) {
1598 probe_timeout(r);
1599 });
1600 double t = g_conf->mon_probe_timeout;
1601 if (timer.add_event_after(t, probe_timeout_event)) {
1602 dout(10) << "reset_probe_timeout " << probe_timeout_event
1603 << " after " << t << " seconds" << dendl;
1604 } else {
1605 probe_timeout_event = nullptr;
1606 }
1607 }
1608
1609 void Monitor::probe_timeout(int r)
1610 {
1611 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1612 assert(is_probing() || is_synchronizing());
1613 assert(probe_timeout_event);
1614 probe_timeout_event = NULL;
1615 bootstrap();
1616 }
1617
1618 void Monitor::handle_probe(MonOpRequestRef op)
1619 {
1620 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1621 dout(10) << "handle_probe " << *m << dendl;
1622
1623 if (m->fsid != monmap->fsid) {
1624 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1625 return;
1626 }
1627
1628 switch (m->op) {
1629 case MMonProbe::OP_PROBE:
1630 handle_probe_probe(op);
1631 break;
1632
1633 case MMonProbe::OP_REPLY:
1634 handle_probe_reply(op);
1635 break;
1636
1637 case MMonProbe::OP_MISSING_FEATURES:
1638 derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1639 << ", required " << m->required_features
1640 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
1641 << dendl;
1642 break;
1643 }
1644 }
1645
1646 void Monitor::handle_probe_probe(MonOpRequestRef op)
1647 {
1648 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1649
1650 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1651 << " features " << m->get_connection()->get_features() << dendl;
1652 uint64_t missing = required_features & ~m->get_connection()->get_features();
1653 if (missing) {
1654 dout(1) << " peer " << m->get_source_addr() << " missing features "
1655 << missing << dendl;
1656 if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1657 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1658 name, has_ever_joined);
1659 m->required_features = required_features;
1660 m->get_connection()->send_message(r);
1661 }
1662 goto out;
1663 }
1664
1665 if (!is_probing() && !is_synchronizing()) {
1666 // If the probing mon is way ahead of us, we need to re-bootstrap.
1667 // Normally we capture this case when we initially bootstrap, but
1668 // it is possible we pass those checks (we overlap with
1669 // quorum-to-be) but fail to join a quorum before it moves past
1670 // us. We need to be kicked back to bootstrap so we can
1671 // synchonize, not keep calling elections.
1672 if (paxos->get_version() + 1 < m->paxos_first_version) {
1673 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1674 << "ahead of us, re-bootstrapping" << dendl;
1675 bootstrap();
1676 goto out;
1677
1678 }
1679 }
1680
1681 MMonProbe *r;
1682 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1683 r->name = name;
1684 r->quorum = quorum;
1685 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1686 r->paxos_first_version = paxos->get_first_committed();
1687 r->paxos_last_version = paxos->get_version();
1688 m->get_connection()->send_message(r);
1689
1690 // did we discover a peer here?
1691 if (!monmap->contains(m->get_source_addr())) {
1692 dout(1) << " adding peer " << m->get_source_addr()
1693 << " to list of hints" << dendl;
1694 extra_probe_peers.insert(m->get_source_addr());
1695 }
1696
1697 out:
1698 return;
1699 }
1700
1701 void Monitor::handle_probe_reply(MonOpRequestRef op)
1702 {
1703 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1704 dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1705 dout(10) << " monmap is " << *monmap << dendl;
1706
1707 // discover name and addrs during probing or electing states.
1708 if (!is_probing() && !is_electing()) {
1709 return;
1710 }
1711
1712 // newer map, or they've joined a quorum and we haven't?
1713 bufferlist mybl;
1714 monmap->encode(mybl, m->get_connection()->get_features());
1715 // make sure it's actually different; the checks below err toward
1716 // taking the other guy's map, which could cause us to loop.
1717 if (!mybl.contents_equal(m->monmap_bl)) {
1718 MonMap *newmap = new MonMap;
1719 newmap->decode(m->monmap_bl);
1720 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1721 !has_ever_joined)) {
1722 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1723 << ", mine was " << monmap->get_epoch() << dendl;
1724 delete newmap;
1725 monmap->decode(m->monmap_bl);
1726
1727 bootstrap();
1728 return;
1729 }
1730 delete newmap;
1731 }
1732
1733 // rename peer?
1734 string peer_name = monmap->get_name(m->get_source_addr());
1735 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1736 dout(10) << " renaming peer " << m->get_source_addr() << " "
1737 << peer_name << " -> " << m->name << " in my monmap"
1738 << dendl;
1739 monmap->rename(peer_name, m->name);
1740
1741 if (is_electing()) {
1742 bootstrap();
1743 return;
1744 }
1745 } else {
1746 dout(10) << " peer name is " << peer_name << dendl;
1747 }
1748
1749 // new initial peer?
1750 if (monmap->get_epoch() == 0 &&
1751 monmap->contains(m->name) &&
1752 monmap->get_addr(m->name).is_blank_ip()) {
1753 dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1754 monmap->set_addr(m->name, m->get_source_addr());
1755
1756 bootstrap();
1757 return;
1758 }
1759
1760 // end discover phase
1761 if (!is_probing()) {
1762 return;
1763 }
1764
1765 assert(paxos != NULL);
1766
1767 if (is_synchronizing()) {
1768 dout(10) << " currently syncing" << dendl;
1769 return;
1770 }
1771
1772 entity_inst_t other = m->get_source_inst();
1773
1774 if (m->paxos_last_version < sync_last_committed_floor) {
1775 dout(10) << " peer paxos versions [" << m->paxos_first_version
1776 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1777 << sync_last_committed_floor << ", ignoring"
1778 << dendl;
1779 } else {
1780 if (paxos->get_version() < m->paxos_first_version &&
1781 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1782 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1783 << "," << m->paxos_last_version << "]"
1784 << " vs my version " << paxos->get_version()
1785 << " (too far ahead)"
1786 << dendl;
1787 cancel_probe_timeout();
1788 sync_start(other, true);
1789 return;
1790 }
1791 if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1792 dout(10) << " peer paxos last version " << m->paxos_last_version
1793 << " vs my version " << paxos->get_version()
1794 << " (too far ahead)"
1795 << dendl;
1796 cancel_probe_timeout();
1797 sync_start(other, false);
1798 return;
1799 }
1800 }
1801
1802 // is there an existing quorum?
1803 if (m->quorum.size()) {
1804 dout(10) << " existing quorum " << m->quorum << dendl;
1805
1806 dout(10) << " peer paxos version " << m->paxos_last_version
1807 << " vs my version " << paxos->get_version()
1808 << " (ok)"
1809 << dendl;
1810
1811 if (monmap->contains(name) &&
1812 !monmap->get_addr(name).is_blank_ip()) {
1813 // i'm part of the cluster; just initiate a new election
1814 start_election();
1815 } else {
1816 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1817 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1818 monmap->get_inst(*m->quorum.begin()));
1819 }
1820 } else {
1821 if (monmap->contains(m->name)) {
1822 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1823 outside_quorum.insert(m->name);
1824 } else {
1825 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
1826 return;
1827 }
1828
1829 unsigned need = monmap->size() / 2 + 1;
1830 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1831 if (outside_quorum.size() >= need) {
1832 if (outside_quorum.count(name)) {
1833 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1834 start_election();
1835 } else {
1836 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1837 }
1838 } else {
1839 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
1840 }
1841 }
1842 }
1843
1844 void Monitor::join_election()
1845 {
1846 dout(10) << __func__ << dendl;
1847 wait_for_paxos_write();
1848 _reset();
1849 state = STATE_ELECTING;
1850
1851 logger->inc(l_mon_num_elections);
1852 }
1853
1854 void Monitor::start_election()
1855 {
1856 dout(10) << "start_election" << dendl;
1857 wait_for_paxos_write();
1858 _reset();
1859 state = STATE_ELECTING;
1860
1861 logger->inc(l_mon_num_elections);
1862 logger->inc(l_mon_election_call);
1863
1864 clog->info() << "mon." << name << " calling monitor election";
1865 elector.call_election();
1866 }
1867
1868 void Monitor::win_standalone_election()
1869 {
1870 dout(1) << "win_standalone_election" << dendl;
1871
1872 // bump election epoch, in case the previous epoch included other
1873 // monitors; we need to be able to make the distinction.
1874 elector.init();
1875 elector.advance_epoch();
1876
1877 rank = monmap->get_rank(name);
1878 assert(rank == 0);
1879 set<int> q;
1880 q.insert(rank);
1881
1882 map<int,Metadata> metadata;
1883 collect_metadata(&metadata[0]);
1884
1885 win_election(elector.get_epoch(), q,
1886 CEPH_FEATURES_ALL,
1887 ceph::features::mon::get_supported(),
1888 metadata);
1889 }
1890
1891 const utime_t& Monitor::get_leader_since() const
1892 {
1893 assert(state == STATE_LEADER);
1894 return leader_since;
1895 }
1896
1897 epoch_t Monitor::get_epoch()
1898 {
1899 return elector.get_epoch();
1900 }
1901
1902 void Monitor::_finish_svc_election()
1903 {
1904 assert(state == STATE_LEADER || state == STATE_PEON);
1905
1906 for (auto p : paxos_service) {
1907 // we already called election_finished() on monmon(); avoid callig twice
1908 if (state == STATE_LEADER && p == monmon())
1909 continue;
1910 p->election_finished();
1911 }
1912 }
1913
1914 void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
1915 const mon_feature_t& mon_features,
1916 const map<int,Metadata>& metadata)
1917 {
1918 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
1919 << " features " << features
1920 << " mon_features " << mon_features
1921 << dendl;
1922 assert(is_electing());
1923 state = STATE_LEADER;
1924 leader_since = ceph_clock_now();
1925 leader = rank;
1926 quorum = active;
1927 quorum_con_features = features;
1928 quorum_mon_features = mon_features;
1929 pending_metadata = metadata;
1930 outside_quorum.clear();
1931
1932 clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names()
1933 << " in quorum (ranks " << quorum << ")";
1934
1935 set_leader_commands(get_local_commands(mon_features));
1936
1937 paxos->leader_init();
1938 // NOTE: tell monmap monitor first. This is important for the
1939 // bootstrap case to ensure that the very first paxos proposal
1940 // codifies the monmap. Otherwise any manner of chaos can ensue
1941 // when monitors are call elections or participating in a paxos
1942 // round without agreeing on who the participants are.
1943 monmon()->election_finished();
1944 _finish_svc_election();
1945 health_monitor->start(epoch);
1946
1947 logger->inc(l_mon_election_win);
1948
1949 // inject new metadata in first transaction.
1950 {
1951 // include previous metadata for missing mons (that aren't part of
1952 // the current quorum).
1953 map<int,Metadata> m = metadata;
1954 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
1955 if (m.count(rank) == 0 &&
1956 mon_metadata.count(rank)) {
1957 m[rank] = mon_metadata[rank];
1958 }
1959 }
1960
1961 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1962 // a new transaction immediately after the election finishes. We should
1963 // do that anyway for other reasons, though.
1964 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1965 bufferlist bl;
1966 ::encode(m, bl);
1967 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
1968 }
1969
1970 finish_election();
1971 if (monmap->size() > 1 &&
1972 monmap->get_epoch() > 0) {
1973 timecheck_start();
1974 health_tick_start();
1975
1976 // Freshen the health status before doing health_to_clog in case
1977 // our just-completed election changed the health
1978 healthmon()->wait_for_active_ctx(new FunctionContext([this](int r){
1979 dout(20) << "healthmon now active" << dendl;
1980 healthmon()->tick();
1981 if (healthmon()->is_proposing()) {
1982 dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
1983 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this,
1984 [this](int r){
1985 assert(lock.is_locked_by_me());
1986 do_health_to_clog_interval();
1987 }));
1988
1989 } else {
1990 do_health_to_clog_interval();
1991 }
1992 }));
1993
1994 scrub_event_start();
1995 }
1996 }
1997
1998 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
1999 uint64_t features,
2000 const mon_feature_t& mon_features)
2001 {
2002 state = STATE_PEON;
2003 leader_since = utime_t();
2004 leader = l;
2005 quorum = q;
2006 outside_quorum.clear();
2007 quorum_con_features = features;
2008 quorum_mon_features = mon_features;
2009 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
2010 << " quorum is " << quorum << " features are " << quorum_con_features
2011 << " mon_features are " << quorum_mon_features
2012 << dendl;
2013
2014 paxos->peon_init();
2015 _finish_svc_election();
2016 health_monitor->start(epoch);
2017
2018 logger->inc(l_mon_election_lose);
2019
2020 finish_election();
2021
2022 if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
2023 !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
2024 // for pre-luminous mons only
2025 Metadata sys_info;
2026 collect_metadata(&sys_info);
2027 messenger->send_message(new MMonMetadata(sys_info),
2028 monmap->get_inst(get_leader()));
2029 }
2030 }
2031
2032 void Monitor::collect_metadata(Metadata *m)
2033 {
2034 collect_sys_info(m, g_ceph_context);
2035 (*m)["addr"] = stringify(messenger->get_myaddr());
2036 }
2037
2038 void Monitor::finish_election()
2039 {
2040 apply_quorum_to_compatset_features();
2041 apply_monmap_to_compatset_features();
2042 timecheck_finish();
2043 exited_quorum = utime_t();
2044 finish_contexts(g_ceph_context, waitfor_quorum);
2045 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2046 resend_routed_requests();
2047 update_logger();
2048 register_cluster_logger();
2049
2050 // am i named properly?
2051 string cur_name = monmap->get_name(messenger->get_myaddr());
2052 if (cur_name != name) {
2053 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2054 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
2055 monmap->get_inst(*quorum.begin()));
2056 }
2057 }
2058
2059 void Monitor::_apply_compatset_features(CompatSet &new_features)
2060 {
2061 if (new_features.compare(features) != 0) {
2062 CompatSet diff = features.unsupported(new_features);
2063 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2064 features = new_features;
2065
2066 auto t = std::make_shared<MonitorDBStore::Transaction>();
2067 write_features(t);
2068 store->apply_transaction(t);
2069
2070 calc_quorum_requirements();
2071 }
2072 }
2073
2074 void Monitor::apply_quorum_to_compatset_features()
2075 {
2076 CompatSet new_features(features);
2077 if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2078 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2079 }
2080 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2081 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2082 }
2083 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2084 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2085 }
2086 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2087 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2088 }
2089 dout(5) << __func__ << dendl;
2090 _apply_compatset_features(new_features);
2091 }
2092
2093 void Monitor::apply_monmap_to_compatset_features()
2094 {
2095 CompatSet new_features(features);
2096 mon_feature_t monmap_features = monmap->get_required_features();
2097
2098 /* persistent monmap features may go into the compatset.
2099 * optional monmap features may not - why?
2100 * because optional monmap features may be set/unset by the admin,
2101 * and possibly by other means that haven't yet been thought out,
2102 * so we can't make the monitor enforce them on start - because they
2103 * may go away.
2104 * this, of course, does not invalidate setting a compatset feature
2105 * for an optional feature - as long as you make sure to clean it up
2106 * once you unset it.
2107 */
2108 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2109 assert(ceph::features::mon::get_persistent().contains_all(
2110 ceph::features::mon::FEATURE_KRAKEN));
2111 // this feature should only ever be set if the quorum supports it.
2112 assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2113 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2114 }
2115 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2116 assert(ceph::features::mon::get_persistent().contains_all(
2117 ceph::features::mon::FEATURE_LUMINOUS));
2118 // this feature should only ever be set if the quorum supports it.
2119 assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
2120 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2121 }
2122
2123 dout(5) << __func__ << dendl;
2124 _apply_compatset_features(new_features);
2125 }
2126
2127 void Monitor::calc_quorum_requirements()
2128 {
2129 required_features = 0;
2130
2131 // compatset
2132 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2133 required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2134 }
2135 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2136 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2137 }
2138 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2139 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2140 }
2141 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2142 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2143 }
2144 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2145 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2146 }
2147 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2148 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2149 }
2150
2151 // monmap
2152 if (monmap->get_required_features().contains_all(
2153 ceph::features::mon::FEATURE_KRAKEN)) {
2154 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2155 }
2156 if (monmap->get_required_features().contains_all(
2157 ceph::features::mon::FEATURE_LUMINOUS)) {
2158 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2159 }
2160 dout(10) << __func__ << " required_features " << required_features << dendl;
2161 }
2162
2163 void Monitor::get_combined_feature_map(FeatureMap *fm)
2164 {
2165 *fm += session_map.feature_map;
2166 for (auto id : quorum) {
2167 if (id != rank) {
2168 *fm += quorum_feature_map[id];
2169 }
2170 }
2171 }
2172
2173 void Monitor::sync_force(Formatter *f, ostream& ss)
2174 {
2175 bool free_formatter = false;
2176
2177 if (!f) {
2178 // louzy/lazy hack: default to json if no formatter has been defined
2179 f = new JSONFormatter();
2180 free_formatter = true;
2181 }
2182
2183 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2184 sync_stash_critical_state(tx);
2185 tx->put("mon_sync", "force_sync", 1);
2186 store->apply_transaction(tx);
2187
2188 f->open_object_section("sync_force");
2189 f->dump_int("ret", 0);
2190 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2191 f->close_section(); // sync_force
2192 f->flush(ss);
2193 if (free_formatter)
2194 delete f;
2195 }
2196
2197 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2198 {
2199 bool free_formatter = false;
2200
2201 if (!f) {
2202 // louzy/lazy hack: default to json if no formatter has been defined
2203 f = new JSONFormatter();
2204 free_formatter = true;
2205 }
2206 f->open_object_section("quorum_status");
2207 f->dump_int("election_epoch", get_epoch());
2208
2209 f->open_array_section("quorum");
2210 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2211 f->dump_int("mon", *p);
2212 f->close_section(); // quorum
2213
2214 list<string> quorum_names = get_quorum_names();
2215 f->open_array_section("quorum_names");
2216 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2217 f->dump_string("mon", *p);
2218 f->close_section(); // quorum_names
2219
2220 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2221
2222 f->open_object_section("monmap");
2223 monmap->dump(f);
2224 f->close_section(); // monmap
2225
2226 f->close_section(); // quorum_status
2227 f->flush(ss);
2228 if (free_formatter)
2229 delete f;
2230 }
2231
2232 void Monitor::get_mon_status(Formatter *f, ostream& ss)
2233 {
2234 bool free_formatter = false;
2235
2236 if (!f) {
2237 // louzy/lazy hack: default to json if no formatter has been defined
2238 f = new JSONFormatter();
2239 free_formatter = true;
2240 }
2241
2242 f->open_object_section("mon_status");
2243 f->dump_string("name", name);
2244 f->dump_int("rank", rank);
2245 f->dump_string("state", get_state_name());
2246 f->dump_int("election_epoch", get_epoch());
2247
2248 f->open_array_section("quorum");
2249 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2250 f->dump_int("mon", *p);
2251 }
2252
2253 f->close_section(); // quorum
2254
2255 f->open_object_section("features");
2256 f->dump_stream("required_con") << required_features;
2257 mon_feature_t req_mon_features = get_required_mon_features();
2258 req_mon_features.dump(f, "required_mon");
2259 f->dump_stream("quorum_con") << quorum_con_features;
2260 quorum_mon_features.dump(f, "quorum_mon");
2261 f->close_section(); // features
2262
2263 f->open_array_section("outside_quorum");
2264 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2265 f->dump_string("mon", *p);
2266 f->close_section(); // outside_quorum
2267
2268 f->open_array_section("extra_probe_peers");
2269 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2270 p != extra_probe_peers.end();
2271 ++p)
2272 f->dump_stream("peer") << *p;
2273 f->close_section(); // extra_probe_peers
2274
2275 f->open_array_section("sync_provider");
2276 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2277 p != sync_providers.end();
2278 ++p) {
2279 f->dump_unsigned("cookie", p->second.cookie);
2280 f->dump_stream("entity") << p->second.entity;
2281 f->dump_stream("timeout") << p->second.timeout;
2282 f->dump_unsigned("last_committed", p->second.last_committed);
2283 f->dump_stream("last_key") << p->second.last_key;
2284 }
2285 f->close_section();
2286
2287 if (is_synchronizing()) {
2288 f->open_object_section("sync");
2289 f->dump_stream("sync_provider") << sync_provider;
2290 f->dump_unsigned("sync_cookie", sync_cookie);
2291 f->dump_unsigned("sync_start_version", sync_start_version);
2292 f->close_section();
2293 }
2294
2295 if (g_conf->mon_sync_provider_kill_at > 0)
2296 f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2297 if (g_conf->mon_sync_requester_kill_at > 0)
2298 f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2299
2300 f->open_object_section("monmap");
2301 monmap->dump(f);
2302 f->close_section();
2303
2304 f->dump_object("feature_map", session_map.feature_map);
2305 f->close_section(); // mon_status
2306
2307 if (free_formatter) {
2308 // flush formatter to ss and delete it iff we created the formatter
2309 f->flush(ss);
2310 delete f;
2311 }
2312 }
2313
2314
2315 // health status to clog
2316
2317 void Monitor::health_tick_start()
2318 {
2319 if (!cct->_conf->mon_health_to_clog ||
2320 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2321 return;
2322
2323 dout(15) << __func__ << dendl;
2324
2325 health_tick_stop();
2326 health_tick_event = timer.add_event_after(
2327 cct->_conf->mon_health_to_clog_tick_interval,
2328 new C_MonContext(this, [this](int r) {
2329 if (r < 0)
2330 return;
2331 health_tick_start();
2332 }));
2333 }
2334
2335 void Monitor::health_tick_stop()
2336 {
2337 dout(15) << __func__ << dendl;
2338
2339 if (health_tick_event) {
2340 timer.cancel_event(health_tick_event);
2341 health_tick_event = NULL;
2342 }
2343 }
2344
2345 utime_t Monitor::health_interval_calc_next_update()
2346 {
2347 utime_t now = ceph_clock_now();
2348
2349 time_t secs = now.sec();
2350 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2351 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2352 utime_t next = utime_t(secs + adjustment, 0);
2353
2354 dout(20) << __func__
2355 << " now: " << now << ","
2356 << " next: " << next << ","
2357 << " interval: " << cct->_conf->mon_health_to_clog_interval
2358 << dendl;
2359
2360 return next;
2361 }
2362
2363 void Monitor::health_interval_start()
2364 {
2365 dout(15) << __func__ << dendl;
2366
2367 if (!cct->_conf->mon_health_to_clog ||
2368 cct->_conf->mon_health_to_clog_interval <= 0) {
2369 return;
2370 }
2371
2372 health_interval_stop();
2373 utime_t next = health_interval_calc_next_update();
2374 health_interval_event = new C_MonContext(this, [this](int r) {
2375 if (r < 0)
2376 return;
2377 do_health_to_clog_interval();
2378 });
2379 if (!timer.add_event_at(next, health_interval_event)) {
2380 health_interval_event = nullptr;
2381 }
2382 }
2383
2384 void Monitor::health_interval_stop()
2385 {
2386 dout(15) << __func__ << dendl;
2387 if (health_interval_event) {
2388 timer.cancel_event(health_interval_event);
2389 }
2390 health_interval_event = NULL;
2391 }
2392
2393 void Monitor::health_events_cleanup()
2394 {
2395 health_tick_stop();
2396 health_interval_stop();
2397 health_status_cache.reset();
2398 }
2399
2400 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2401 {
2402 dout(20) << __func__ << dendl;
2403
2404 if (changed.count("mon_health_to_clog")) {
2405 if (!cct->_conf->mon_health_to_clog) {
2406 health_events_cleanup();
2407 } else {
2408 if (!health_tick_event) {
2409 health_tick_start();
2410 }
2411 if (!health_interval_event) {
2412 health_interval_start();
2413 }
2414 }
2415 }
2416
2417 if (changed.count("mon_health_to_clog_interval")) {
2418 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2419 health_interval_stop();
2420 } else {
2421 health_interval_start();
2422 }
2423 }
2424
2425 if (changed.count("mon_health_to_clog_tick_interval")) {
2426 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2427 health_tick_stop();
2428 } else {
2429 health_tick_start();
2430 }
2431 }
2432 }
2433
2434 void Monitor::do_health_to_clog_interval()
2435 {
2436 // outputting to clog may have been disabled in the conf
2437 // since we were scheduled.
2438 if (!cct->_conf->mon_health_to_clog ||
2439 cct->_conf->mon_health_to_clog_interval <= 0)
2440 return;
2441
2442 dout(10) << __func__ << dendl;
2443
2444 // do we have a cached value for next_clog_update? if not,
2445 // do we know when the last update was?
2446
2447 do_health_to_clog(true);
2448 health_interval_start();
2449 }
2450
2451 void Monitor::do_health_to_clog(bool force)
2452 {
2453 // outputting to clog may have been disabled in the conf
2454 // since we were scheduled.
2455 if (!cct->_conf->mon_health_to_clog ||
2456 cct->_conf->mon_health_to_clog_interval <= 0)
2457 return;
2458
2459 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2460
2461 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2462 string summary;
2463 health_status_t level = get_health_status(false, nullptr, &summary);
2464 if (!force &&
2465 summary == health_status_cache.summary &&
2466 level == health_status_cache.overall)
2467 return;
2468 clog->health(level) << "overall " << summary;
2469 health_status_cache.summary = summary;
2470 health_status_cache.overall = level;
2471 } else {
2472 // for jewel only
2473 list<string> status;
2474 health_status_t overall = get_health(status, NULL, NULL);
2475 dout(25) << __func__
2476 << (force ? " (force)" : "")
2477 << dendl;
2478
2479 string summary = joinify(status.begin(), status.end(), string("; "));
2480
2481 if (!force &&
2482 overall == health_status_cache.overall &&
2483 !health_status_cache.summary.empty() &&
2484 health_status_cache.summary == summary) {
2485 // we got a dup!
2486 return;
2487 }
2488
2489 clog->info() << summary;
2490
2491 health_status_cache.overall = overall;
2492 health_status_cache.summary = summary;
2493 }
2494 }
2495
2496 health_status_t Monitor::get_health_status(
2497 bool want_detail,
2498 Formatter *f,
2499 std::string *plain,
2500 const char *sep1,
2501 const char *sep2)
2502 {
2503 health_status_t r = HEALTH_OK;
2504 bool compat = g_conf->mon_health_preluminous_compat;
2505 bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
2506 if (f) {
2507 f->open_object_section("health");
2508 f->open_object_section("checks");
2509 }
2510
2511 string summary;
2512 string *psummary = f ? nullptr : &summary;
2513 for (auto& svc : paxos_service) {
2514 r = std::min(r, svc->get_health_checks().dump_summary(
2515 f, psummary, sep2, want_detail));
2516 }
2517
2518 if (f) {
2519 f->close_section();
2520 f->dump_stream("status") << r;
2521 } else {
2522 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2523 *plain = stringify(r);
2524 if (summary.size()) {
2525 *plain += sep1;
2526 *plain += summary;
2527 }
2528 *plain += "\n";
2529 }
2530
2531 const std::string old_fields_message = "'ceph health' JSON format has "
2532 "changed in luminous. If you see this your monitoring system is "
2533 "scraping the wrong fields. Disable this with 'mon health preluminous "
2534 "compat warning = false'";
2535
2536 if (f && (compat || compat_warn)) {
2537 health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
2538 f->open_array_section("summary");
2539 if (compat_warn) {
2540 f->open_object_section("item");
2541 f->dump_stream("severity") << HEALTH_WARN;
2542 f->dump_string("summary", old_fields_message);
2543 f->close_section();
2544 }
2545 if (compat) {
2546 for (auto& svc : paxos_service) {
2547 svc->get_health_checks().dump_summary_compat(f);
2548 }
2549 }
2550 f->close_section();
2551 f->dump_stream("overall_status") << cr;
2552 }
2553
2554 if (want_detail) {
2555 if (f && (compat || compat_warn)) {
2556 f->open_array_section("detail");
2557 if (compat_warn) {
2558 f->dump_string("item", old_fields_message);
2559 }
2560 }
2561
2562 for (auto& svc : paxos_service) {
2563 svc->get_health_checks().dump_detail(f, plain, compat);
2564 }
2565
2566 if (f && (compat || compat_warn)) {
2567 f->close_section();
2568 }
2569 }
2570 if (f) {
2571 f->close_section();
2572 }
2573 return r;
2574 }
2575
2576 void Monitor::log_health(
2577 const health_check_map_t& updated,
2578 const health_check_map_t& previous,
2579 MonitorDBStore::TransactionRef t)
2580 {
2581 if (!g_conf->mon_health_to_clog) {
2582 return;
2583 }
2584
2585 const utime_t now = ceph_clock_now();
2586
2587 // FIXME: log atomically as part of @t instead of using clog.
2588 dout(10) << __func__ << " updated " << updated.checks.size()
2589 << " previous " << previous.checks.size()
2590 << dendl;
2591 const auto min_log_period = g_conf->get_val<int64_t>(
2592 "mon_health_log_update_period");
2593 for (auto& p : updated.checks) {
2594 auto q = previous.checks.find(p.first);
2595 bool logged = false;
2596 if (q == previous.checks.end()) {
2597 // new
2598 ostringstream ss;
2599 ss << "Health check failed: " << p.second.summary << " ("
2600 << p.first << ")";
2601 clog->health(p.second.severity) << ss.str();
2602
2603 logged = true;
2604 } else {
2605 if (p.second.summary != q->second.summary ||
2606 p.second.severity != q->second.severity) {
2607
2608 auto status_iter = health_check_log_times.find(p.first);
2609 if (status_iter != health_check_log_times.end()) {
2610 if (p.second.severity == q->second.severity &&
2611 now - status_iter->second.updated_at < min_log_period) {
2612 // We already logged this recently and the severity is unchanged,
2613 // so skip emitting an update of the summary string.
2614 // We'll get an update out of tick() later if the check
2615 // is still failing.
2616 continue;
2617 }
2618 }
2619
2620 // summary or severity changed (ignore detail changes at this level)
2621 ostringstream ss;
2622 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2623 clog->health(p.second.severity) << ss.str();
2624
2625 logged = true;
2626 }
2627 }
2628 // Record the time at which we last logged, so that we can check this
2629 // when considering whether/when to print update messages.
2630 if (logged) {
2631 auto iter = health_check_log_times.find(p.first);
2632 if (iter == health_check_log_times.end()) {
2633 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2634 p.second.severity, p.second.summary, now));
2635 } else {
2636 iter->second = HealthCheckLogStatus(
2637 p.second.severity, p.second.summary, now);
2638 }
2639 }
2640 }
2641 for (auto& p : previous.checks) {
2642 if (!updated.checks.count(p.first)) {
2643 // cleared
2644 ostringstream ss;
2645 if (p.first == "DEGRADED_OBJECTS") {
2646 clog->info() << "All degraded objects recovered";
2647 } else if (p.first == "OSD_FLAGS") {
2648 clog->info() << "OSD flags cleared";
2649 } else {
2650 clog->info() << "Health check cleared: " << p.first << " (was: "
2651 << p.second.summary << ")";
2652 }
2653
2654 if (health_check_log_times.count(p.first)) {
2655 health_check_log_times.erase(p.first);
2656 }
2657 }
2658 }
2659
2660 if (previous.checks.size() && updated.checks.size() == 0) {
2661 // We might be going into a fully healthy state, check
2662 // other subsystems
2663 bool any_checks = false;
2664 for (auto& svc : paxos_service) {
2665 if (&(svc->get_health_checks()) == &(previous)) {
2666 // Ignore the ones we're clearing right now
2667 continue;
2668 }
2669
2670 if (svc->get_health_checks().checks.size() > 0) {
2671 any_checks = true;
2672 break;
2673 }
2674 }
2675 if (!any_checks) {
2676 clog->info() << "Cluster is now healthy";
2677 }
2678 }
2679 }
2680
2681 health_status_t Monitor::get_health(list<string>& status,
2682 bufferlist *detailbl,
2683 Formatter *f)
2684 {
2685 list<pair<health_status_t,string> > summary;
2686 list<pair<health_status_t,string> > detail;
2687
2688 if (f)
2689 f->open_object_section("health");
2690
2691 for (vector<PaxosService*>::iterator p = paxos_service.begin();
2692 p != paxos_service.end();
2693 ++p) {
2694 PaxosService *s = *p;
2695 s->get_health(summary, detailbl ? &detail : NULL, cct);
2696 }
2697
2698 health_monitor->get_health(summary, (detailbl ? &detail : NULL));
2699
2700 health_status_t overall = HEALTH_OK;
2701 if (!timecheck_skews.empty()) {
2702 list<string> warns;
2703 for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2704 i != timecheck_skews.end(); ++i) {
2705 entity_inst_t inst = i->first;
2706 double skew = i->second;
2707 double latency = timecheck_latencies[inst];
2708 string name = monmap->get_name(inst.addr);
2709 ostringstream tcss;
2710 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2711 if (tcstatus != HEALTH_OK) {
2712 if (overall > tcstatus)
2713 overall = tcstatus;
2714 warns.push_back(name);
2715 ostringstream tmp_ss;
2716 tmp_ss << "mon." << name
2717 << " addr " << inst.addr << " " << tcss.str()
2718 << " (latency " << latency << "s)";
2719 detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2720 }
2721 }
2722 if (!warns.empty()) {
2723 ostringstream ss;
2724 ss << "clock skew detected on";
2725 while (!warns.empty()) {
2726 ss << " mon." << warns.front();
2727 warns.pop_front();
2728 if (!warns.empty())
2729 ss << ",";
2730 }
2731 status.push_back(ss.str());
2732 summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2733 }
2734 }
2735
2736 if (f)
2737 f->open_array_section("summary");
2738 if (!summary.empty()) {
2739 while (!summary.empty()) {
2740 if (overall > summary.front().first)
2741 overall = summary.front().first;
2742 status.push_back(summary.front().second);
2743 if (f) {
2744 f->open_object_section("item");
2745 f->dump_stream("severity") << summary.front().first;
2746 f->dump_string("summary", summary.front().second);
2747 f->close_section();
2748 }
2749 summary.pop_front();
2750 }
2751 }
2752 if (f)
2753 f->close_section();
2754
2755 stringstream fss;
2756 fss << overall;
2757 status.push_front(fss.str());
2758 if (f)
2759 f->dump_stream("overall_status") << overall;
2760
2761 if (f)
2762 f->open_array_section("detail");
2763 while (!detail.empty()) {
2764 if (f)
2765 f->dump_string("item", detail.front().second);
2766 else if (detailbl != NULL) {
2767 detailbl->append(detail.front().second);
2768 detailbl->append('\n');
2769 }
2770 detail.pop_front();
2771 }
2772 if (f)
2773 f->close_section();
2774
2775 if (f)
2776 f->close_section();
2777
2778 return overall;
2779 }
2780
2781 void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2782 {
2783 if (f)
2784 f->open_object_section("status");
2785
2786 if (f) {
2787 f->dump_stream("fsid") << monmap->get_fsid();
2788 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2789 get_health_status(false, f, nullptr);
2790 } else {
2791 list<string> health_str;
2792 get_health(health_str, nullptr, f);
2793 }
2794 f->dump_unsigned("election_epoch", get_epoch());
2795 {
2796 f->open_array_section("quorum");
2797 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2798 f->dump_int("rank", *p);
2799 f->close_section();
2800 f->open_array_section("quorum_names");
2801 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2802 f->dump_string("id", monmap->get_name(*p));
2803 f->close_section();
2804 }
2805 f->open_object_section("monmap");
2806 monmap->dump(f);
2807 f->close_section();
2808 f->open_object_section("osdmap");
2809 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
2810 f->close_section();
2811 f->open_object_section("pgmap");
2812 pgservice->print_summary(f, NULL);
2813 f->close_section();
2814 f->open_object_section("fsmap");
2815 mdsmon()->get_fsmap().print_summary(f, NULL);
2816 f->close_section();
2817 f->open_object_section("mgrmap");
2818 mgrmon()->get_map().print_summary(f, nullptr);
2819 f->close_section();
2820
2821 f->dump_object("servicemap", mgrstatmon()->get_service_map());
2822 f->close_section();
2823 } else {
2824 ss << " cluster:\n";
2825 ss << " id: " << monmap->get_fsid() << "\n";
2826
2827 string health;
2828 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2829 get_health_status(false, nullptr, &health,
2830 "\n ", "\n ");
2831 } else {
2832 list<string> ls;
2833 get_health(ls, NULL, f);
2834 health = joinify(ls.begin(), ls.end(),
2835 string("\n "));
2836 }
2837 ss << " health: " << health << "\n";
2838
2839 ss << "\n \n services:\n";
2840 {
2841 size_t maxlen = 3;
2842 auto& service_map = mgrstatmon()->get_service_map();
2843 for (auto& p : service_map.services) {
2844 maxlen = std::max(maxlen, p.first.size());
2845 }
2846 string spacing(maxlen - 3, ' ');
2847 const auto quorum_names = get_quorum_names();
2848 const auto mon_count = monmap->mon_info.size();
2849 ss << " mon: " << spacing << mon_count << " daemons, quorum "
2850 << quorum_names;
2851 if (quorum_names.size() != mon_count) {
2852 std::list<std::string> out_of_q;
2853 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2854 if (quorum.count(i) == 0) {
2855 out_of_q.push_back(monmap->ranks[i]);
2856 }
2857 }
2858 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2859 out_of_q.end(), std::string(", "));
2860 }
2861 ss << "\n";
2862 if (mgrmon()->in_use()) {
2863 ss << " mgr: " << spacing;
2864 mgrmon()->get_map().print_summary(nullptr, &ss);
2865 ss << "\n";
2866 }
2867 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2868 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2869 }
2870 ss << " osd: " << spacing;
2871 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2872 ss << "\n";
2873 for (auto& p : service_map.services) {
2874 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2875 << p.second.get_summary() << "\n";
2876 }
2877 }
2878
2879 ss << "\n \n data:\n";
2880 pgservice->print_summary(NULL, &ss);
2881 ss << "\n ";
2882 }
2883 }
2884
2885 void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2886 map<string,string> &param_str_map)
2887 {
2888 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2889 p != cmdmap.end(); ++p) {
2890 if (p->first == "prefix")
2891 continue;
2892 if (p->first == "caps") {
2893 vector<string> cv;
2894 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2895 cv.size() % 2 == 0) {
2896 for (unsigned i = 0; i < cv.size(); i += 2) {
2897 string k = string("caps_") + cv[i];
2898 param_str_map[k] = cv[i + 1];
2899 }
2900 continue;
2901 }
2902 }
2903 param_str_map[p->first] = cmd_vartype_stringify(p->second);
2904 }
2905 }
2906
2907 const MonCommand *Monitor::_get_moncommand(
2908 const string &cmd_prefix,
2909 const vector<MonCommand>& cmds)
2910 {
2911 for (auto& c : cmds) {
2912 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2913 return &c;
2914 }
2915 }
2916 return nullptr;
2917 }
2918
2919 bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2920 const map<string,cmd_vartype>& cmdmap,
2921 const map<string,string>& param_str_map,
2922 const MonCommand *this_cmd) {
2923
2924 bool cmd_r = this_cmd->requires_perm('r');
2925 bool cmd_w = this_cmd->requires_perm('w');
2926 bool cmd_x = this_cmd->requires_perm('x');
2927
2928 bool capable = s->caps.is_capable(
2929 g_ceph_context,
2930 CEPH_ENTITY_TYPE_MON,
2931 s->entity_name,
2932 module, prefix, param_str_map,
2933 cmd_r, cmd_w, cmd_x);
2934
2935 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2936 return capable;
2937 }
2938
2939 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
2940 Formatter *f,
2941 bufferlist *rdata,
2942 bool hide_mgr_flag)
2943 {
2944 int cmdnum = 0;
2945 f->open_object_section("command_descriptions");
2946 for (const auto &cmd : commands) {
2947 unsigned flags = cmd.flags;
2948 if (hide_mgr_flag) {
2949 flags &= ~MonCommand::FLAG_MGR;
2950 }
2951 ostringstream secname;
2952 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2953 dump_cmddesc_to_json(f, secname.str(),
2954 cmd.cmdstring, cmd.helpstring, cmd.module,
2955 cmd.req_perms, cmd.availability, flags);
2956 cmdnum++;
2957 }
2958 f->close_section(); // command_descriptions
2959
2960 f->flush(*rdata);
2961 }
2962
2963 bool Monitor::is_keyring_required()
2964 {
2965 string auth_cluster_required = g_conf->auth_supported.empty() ?
2966 g_conf->auth_cluster_required : g_conf->auth_supported;
2967 string auth_service_required = g_conf->auth_supported.empty() ?
2968 g_conf->auth_service_required : g_conf->auth_supported;
2969
2970 return auth_service_required == "cephx" ||
2971 auth_cluster_required == "cephx";
2972 }
2973
2974 struct C_MgrProxyCommand : public Context {
2975 Monitor *mon;
2976 MonOpRequestRef op;
2977 uint64_t size;
2978 bufferlist outbl;
2979 string outs;
2980 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2981 : mon(mon), op(op), size(s) { }
2982 void finish(int r) {
2983 Mutex::Locker l(mon->lock);
2984 mon->mgr_proxy_bytes -= size;
2985 mon->reply_command(op, r, outs, outbl, 0);
2986 }
2987 };
2988
2989 void Monitor::handle_command(MonOpRequestRef op)
2990 {
2991 assert(op->is_type_command());
2992 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
2993 if (m->fsid != monmap->fsid) {
2994 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2995 reply_command(op, -EPERM, "wrong fsid", 0);
2996 return;
2997 }
2998
2999 MonSession *session = static_cast<MonSession *>(
3000 m->get_connection()->get_priv());
3001 if (!session) {
3002 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3003 return;
3004 }
3005 BOOST_SCOPE_EXIT_ALL(=) {
3006 session->put();
3007 };
3008
3009 if (m->cmd.empty()) {
3010 string rs = "No command supplied";
3011 reply_command(op, -EINVAL, rs, 0);
3012 return;
3013 }
3014
3015 string prefix;
3016 vector<string> fullcmd;
3017 map<string, cmd_vartype> cmdmap;
3018 stringstream ss, ds;
3019 bufferlist rdata;
3020 string rs;
3021 int r = -EINVAL;
3022 rs = "unrecognized command";
3023
3024 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3025 // ss has reason for failure
3026 r = -EINVAL;
3027 rs = ss.str();
3028 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3029 reply_command(op, r, rs, 0);
3030 return;
3031 }
3032
3033 // check return value. If no prefix parameter provided,
3034 // return value will be false, then return error info.
3035 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
3036 reply_command(op, -EINVAL, "command prefix not found", 0);
3037 return;
3038 }
3039
3040 // check prefix is empty
3041 if (prefix.empty()) {
3042 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3043 return;
3044 }
3045
3046 if (prefix == "get_command_descriptions") {
3047 bufferlist rdata;
3048 Formatter *f = Formatter::create("json");
3049 // hide mgr commands until luminous upgrade is complete
3050 bool hide_mgr_flag =
3051 osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
3052
3053 std::vector<MonCommand> commands;
3054
3055 // only include mgr commands once all mons are upgrade (and we've dropped
3056 // the hard-coded PGMonitor commands)
3057 if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
3058 commands = static_cast<MgrMonitor*>(
3059 paxos_service[PAXOS_MGR])->get_command_descs();
3060 }
3061
3062 for (auto& c : leader_mon_commands) {
3063 commands.push_back(c);
3064 }
3065
3066 format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
3067 delete f;
3068 reply_command(op, 0, "", rdata, 0);
3069 return;
3070 }
3071
3072 string module;
3073 string err;
3074
3075 dout(0) << "handle_command " << *m << dendl;
3076
3077 string format;
3078 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
3079 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3080
3081 get_str_vec(prefix, fullcmd);
3082
3083 // make sure fullcmd is not empty.
3084 // invalid prefix will cause empty vector fullcmd.
3085 // such as, prefix=";,,;"
3086 if (fullcmd.empty()) {
3087 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3088 return;
3089 }
3090
3091 module = fullcmd[0];
3092
3093 // validate command is in leader map
3094
3095 const MonCommand *leader_cmd;
3096 const auto& mgr_cmds = mgrmon()->get_command_descs();
3097 const MonCommand *mgr_cmd = nullptr;
3098 if (!mgr_cmds.empty()) {
3099 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
3100 }
3101 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
3102 if (!leader_cmd) {
3103 leader_cmd = mgr_cmd;
3104 if (!leader_cmd) {
3105 reply_command(op, -EINVAL, "command not known", 0);
3106 return;
3107 }
3108 }
3109 // validate command is in our map & matches, or forward if it is allowed
3110 const MonCommand *mon_cmd = _get_moncommand(
3111 prefix,
3112 get_local_commands(quorum_mon_features));
3113 if (!mon_cmd) {
3114 mon_cmd = mgr_cmd;
3115 }
3116 if (!is_leader()) {
3117 if (!mon_cmd) {
3118 if (leader_cmd->is_noforward()) {
3119 reply_command(op, -EINVAL,
3120 "command not locally supported and not allowed to forward",
3121 0);
3122 return;
3123 }
3124 dout(10) << "Command not locally supported, forwarding request "
3125 << m << dendl;
3126 forward_request_leader(op);
3127 return;
3128 } else if (!mon_cmd->is_compat(leader_cmd)) {
3129 if (mon_cmd->is_noforward()) {
3130 reply_command(op, -EINVAL,
3131 "command not compatible with leader and not allowed to forward",
3132 0);
3133 return;
3134 }
3135 dout(10) << "Command not compatible with leader, forwarding request "
3136 << m << dendl;
3137 forward_request_leader(op);
3138 return;
3139 }
3140 }
3141
3142 if (mon_cmd->is_obsolete() ||
3143 (cct->_conf->mon_debug_deprecated_as_obsolete
3144 && mon_cmd->is_deprecated())) {
3145 reply_command(op, -ENOTSUP,
3146 "command is obsolete; please check usage and/or man page",
3147 0);
3148 return;
3149 }
3150
3151 if (session->proxy_con && mon_cmd->is_noforward()) {
3152 dout(10) << "Got forward for noforward command " << m << dendl;
3153 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3154 return;
3155 }
3156
3157 /* what we perceive as being the service the command falls under */
3158 string service(mon_cmd->module);
3159
3160 dout(25) << __func__ << " prefix='" << prefix
3161 << "' module='" << module
3162 << "' service='" << service << "'" << dendl;
3163
3164 bool cmd_is_rw =
3165 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3166
3167 // validate user's permissions for requested command
3168 map<string,string> param_str_map;
3169 _generate_command_map(cmdmap, param_str_map);
3170 if (!_allowed_command(session, service, prefix, cmdmap,
3171 param_str_map, mon_cmd)) {
3172 dout(1) << __func__ << " access denied" << dendl;
3173 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3174 << "from='" << session->inst << "' "
3175 << "entity='" << session->entity_name << "' "
3176 << "cmd=" << m->cmd << ": access denied";
3177 reply_command(op, -EACCES, "access denied", 0);
3178 return;
3179 }
3180
3181 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3182 << "from='" << session->inst << "' "
3183 << "entity='" << session->entity_name << "' "
3184 << "cmd=" << m->cmd << ": dispatch";
3185
3186 if (mon_cmd->is_mgr() &&
3187 osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3188 const auto& hdr = m->get_header();
3189 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3190 uint64_t max = g_conf->get_val<uint64_t>("mon_client_bytes")
3191 * g_conf->get_val<double>("mon_mgr_proxy_client_bytes_ratio");
3192 if (mgr_proxy_bytes + size > max) {
3193 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3194 << " + " << size << " > max " << max << dendl;
3195 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3196 return;
3197 }
3198 mgr_proxy_bytes += size;
3199 dout(10) << __func__ << " proxying mgr command (+" << size
3200 << " -> " << mgr_proxy_bytes << ")" << dendl;
3201 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3202 mgr_client.start_command(m->cmd,
3203 m->get_data(),
3204 &fin->outbl,
3205 &fin->outs,
3206 new C_OnFinisher(fin, &finisher));
3207 return;
3208 }
3209
3210 if ((module == "mds" || module == "fs") &&
3211 prefix != "fs authorize") {
3212 mdsmon()->dispatch(op);
3213 return;
3214 }
3215 if ((module == "osd" || prefix == "pg map") &&
3216 prefix != "osd last-stat-seq") {
3217 osdmon()->dispatch(op);
3218 return;
3219 }
3220
3221 if (module == "pg") {
3222 pgmon()->dispatch(op);
3223 return;
3224 }
3225 if (module == "mon" &&
3226 /* Let the Monitor class handle the following commands:
3227 * 'mon compact'
3228 * 'mon scrub'
3229 * 'mon sync force'
3230 */
3231 prefix != "mon compact" &&
3232 prefix != "mon scrub" &&
3233 prefix != "mon sync force" &&
3234 prefix != "mon metadata" &&
3235 prefix != "mon versions" &&
3236 prefix != "mon count-metadata") {
3237 monmon()->dispatch(op);
3238 return;
3239 }
3240 if (module == "auth" || prefix == "fs authorize") {
3241 authmon()->dispatch(op);
3242 return;
3243 }
3244 if (module == "log") {
3245 logmon()->dispatch(op);
3246 return;
3247 }
3248
3249 if (module == "config-key") {
3250 config_key_service->dispatch(op);
3251 return;
3252 }
3253
3254 if (module == "mgr") {
3255 mgrmon()->dispatch(op);
3256 return;
3257 }
3258
3259 if (prefix == "fsid") {
3260 if (f) {
3261 f->open_object_section("fsid");
3262 f->dump_stream("fsid") << monmap->fsid;
3263 f->close_section();
3264 f->flush(rdata);
3265 } else {
3266 ds << monmap->fsid;
3267 rdata.append(ds);
3268 }
3269 reply_command(op, 0, "", rdata, 0);
3270 return;
3271 }
3272
3273 if (prefix == "scrub" || prefix == "mon scrub") {
3274 wait_for_paxos_write();
3275 if (is_leader()) {
3276 int r = scrub_start();
3277 reply_command(op, r, "", rdata, 0);
3278 } else if (is_peon()) {
3279 forward_request_leader(op);
3280 } else {
3281 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3282 }
3283 return;
3284 }
3285
3286 if (prefix == "compact" || prefix == "mon compact") {
3287 dout(1) << "triggering manual compaction" << dendl;
3288 utime_t start = ceph_clock_now();
3289 store->compact();
3290 utime_t end = ceph_clock_now();
3291 end -= start;
3292 dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3293 ostringstream oss;
3294 oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
3295 rs = oss.str();
3296 r = 0;
3297 }
3298 else if (prefix == "injectargs") {
3299 vector<string> injected_args;
3300 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3301 if (!injected_args.empty()) {
3302 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3303 ostringstream oss;
3304 r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3305 ss << "injectargs:" << oss.str();
3306 rs = ss.str();
3307 goto out;
3308 } else {
3309 rs = "must supply options to be parsed in a single string";
3310 r = -EINVAL;
3311 }
3312 } else if (prefix == "time-sync-status") {
3313 if (!f)
3314 f.reset(Formatter::create("json-pretty"));
3315 f->open_object_section("time_sync");
3316 if (!timecheck_skews.empty()) {
3317 f->open_object_section("time_skew_status");
3318 for (auto& i : timecheck_skews) {
3319 entity_inst_t inst = i.first;
3320 double skew = i.second;
3321 double latency = timecheck_latencies[inst];
3322 string name = monmap->get_name(inst.addr);
3323 ostringstream tcss;
3324 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3325 f->open_object_section(name.c_str());
3326 f->dump_float("skew", skew);
3327 f->dump_float("latency", latency);
3328 f->dump_stream("health") << tcstatus;
3329 if (tcstatus != HEALTH_OK) {
3330 f->dump_stream("details") << tcss.str();
3331 }
3332 f->close_section();
3333 }
3334 f->close_section();
3335 }
3336 f->open_object_section("timechecks");
3337 f->dump_unsigned("epoch", get_epoch());
3338 f->dump_int("round", timecheck_round);
3339 f->dump_stream("round_status") << ((timecheck_round%2) ?
3340 "on-going" : "finished");
3341 f->close_section();
3342 f->close_section();
3343 f->flush(rdata);
3344 r = 0;
3345 rs = "";
3346 } else if (prefix == "config set") {
3347 std::string key;
3348 cmd_getval(cct, cmdmap, "key", key);
3349 std::string val;
3350 cmd_getval(cct, cmdmap, "value", val);
3351 r = g_conf->set_val(key, val, true, &ss);
3352 if (r == 0) {
3353 g_conf->apply_changes(nullptr);
3354 }
3355 rs = ss.str();
3356 goto out;
3357 } else if (prefix == "status" ||
3358 prefix == "health" ||
3359 prefix == "df") {
3360 string detail;
3361 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3362
3363 if (prefix == "status") {
3364 // get_cluster_status handles f == NULL
3365 get_cluster_status(ds, f.get());
3366
3367 if (f) {
3368 f->flush(ds);
3369 ds << '\n';
3370 }
3371 rdata.append(ds);
3372 } else if (prefix == "health") {
3373 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3374 string plain;
3375 get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3376 if (f) {
3377 f->flush(rdata);
3378 } else {
3379 rdata.append(plain);
3380 }
3381 } else {
3382 list<string> health_str;
3383 get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
3384 if (f) {
3385 f->flush(ds);
3386 ds << '\n';
3387 } else {
3388 assert(!health_str.empty());
3389 ds << health_str.front();
3390 health_str.pop_front();
3391 if (!health_str.empty()) {
3392 ds << ' ';
3393 ds << joinify(health_str.begin(), health_str.end(), string("; "));
3394 }
3395 }
3396 bufferlist comb;
3397 comb.append(ds);
3398 if (detail == "detail")
3399 comb.append(rdata);
3400 rdata = comb;
3401 }
3402 } else if (prefix == "df") {
3403 bool verbose = (detail == "detail");
3404 if (f)
3405 f->open_object_section("stats");
3406
3407 pgservice->dump_fs_stats(&ds, f.get(), verbose);
3408 if (!f)
3409 ds << '\n';
3410 pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3411
3412 if (f) {
3413 f->close_section();
3414 f->flush(ds);
3415 ds << '\n';
3416 }
3417 } else {
3418 assert(0 == "We should never get here!");
3419 return;
3420 }
3421 rdata.append(ds);
3422 rs = "";
3423 r = 0;
3424 } else if (prefix == "report") {
3425
3426 // this must be formatted, in its current form
3427 if (!f)
3428 f.reset(Formatter::create("json-pretty"));
3429 f->open_object_section("report");
3430 f->dump_stream("cluster_fingerprint") << fingerprint;
3431 f->dump_string("version", ceph_version_to_str());
3432 f->dump_string("commit", git_version_to_str());
3433 f->dump_stream("timestamp") << ceph_clock_now();
3434
3435 vector<string> tagsvec;
3436 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3437 string tagstr = str_join(tagsvec, " ");
3438 if (!tagstr.empty())
3439 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3440 f->dump_string("tag", tagstr);
3441
3442 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3443 get_health_status(true, f.get(), nullptr);
3444 } else {
3445 list<string> health_str;
3446 get_health(health_str, nullptr, f.get());
3447 }
3448
3449 monmon()->dump_info(f.get());
3450 osdmon()->dump_info(f.get());
3451 mdsmon()->dump_info(f.get());
3452 authmon()->dump_info(f.get());
3453 pgservice->dump_info(f.get());
3454
3455 paxos->dump_info(f.get());
3456
3457 f->close_section();
3458 f->flush(rdata);
3459
3460 ostringstream ss2;
3461 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
3462 rs = ss2.str();
3463 r = 0;
3464 } else if (prefix == "osd last-stat-seq") {
3465 int64_t osd;
3466 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3467 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3468 if (f) {
3469 f->dump_unsigned("seq", seq);
3470 f->flush(ds);
3471 } else {
3472 ds << seq;
3473 rdata.append(ds);
3474 }
3475 rs = "";
3476 r = 0;
3477 } else if (prefix == "node ls") {
3478 string node_type("all");
3479 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3480 if (!f)
3481 f.reset(Formatter::create("json-pretty"));
3482 if (node_type == "all") {
3483 f->open_object_section("nodes");
3484 print_nodes(f.get(), ds);
3485 osdmon()->print_nodes(f.get());
3486 mdsmon()->print_nodes(f.get());
3487 f->close_section();
3488 } else if (node_type == "mon") {
3489 print_nodes(f.get(), ds);
3490 } else if (node_type == "osd") {
3491 osdmon()->print_nodes(f.get());
3492 } else if (node_type == "mds") {
3493 mdsmon()->print_nodes(f.get());
3494 }
3495 f->flush(ds);
3496 rdata.append(ds);
3497 rs = "";
3498 r = 0;
3499 } else if (prefix == "features") {
3500 if (!is_leader() && !is_peon()) {
3501 dout(10) << " waiting for quorum" << dendl;
3502 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3503 return;
3504 }
3505 if (!is_leader()) {
3506 forward_request_leader(op);
3507 return;
3508 }
3509 if (!f)
3510 f.reset(Formatter::create("json-pretty"));
3511 FeatureMap fm;
3512 get_combined_feature_map(&fm);
3513 f->dump_object("features", fm);
3514 f->flush(rdata);
3515 rs = "";
3516 r = 0;
3517 } else if (prefix == "mon metadata") {
3518 if (!f)
3519 f.reset(Formatter::create("json-pretty"));
3520
3521 string name;
3522 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3523 if (!all) {
3524 // Dump a single mon's metadata
3525 int mon = monmap->get_rank(name);
3526 if (mon < 0) {
3527 rs = "requested mon not found";
3528 r = -ENOENT;
3529 goto out;
3530 }
3531 f->open_object_section("mon_metadata");
3532 r = get_mon_metadata(mon, f.get(), ds);
3533 f->close_section();
3534 } else {
3535 // Dump all mons' metadata
3536 r = 0;
3537 f->open_array_section("mon_metadata");
3538 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3539 std::ostringstream get_err;
3540 f->open_object_section("mon");
3541 f->dump_string("name", monmap->get_name(rank));
3542 r = get_mon_metadata(rank, f.get(), get_err);
3543 f->close_section();
3544 if (r == -ENOENT || r == -EINVAL) {
3545 dout(1) << get_err.str() << dendl;
3546 // Drop error, list what metadata we do have
3547 r = 0;
3548 } else if (r != 0) {
3549 derr << "Unexpected error from get_mon_metadata: "
3550 << cpp_strerror(r) << dendl;
3551 ds << get_err.str();
3552 break;
3553 }
3554 }
3555 f->close_section();
3556 }
3557
3558 f->flush(ds);
3559 rdata.append(ds);
3560 rs = "";
3561 } else if (prefix == "mon versions") {
3562 if (!f)
3563 f.reset(Formatter::create("json-pretty"));
3564 count_metadata("ceph_version", f.get());
3565 f->flush(ds);
3566 rdata.append(ds);
3567 rs = "";
3568 r = 0;
3569 } else if (prefix == "mon count-metadata") {
3570 if (!f)
3571 f.reset(Formatter::create("json-pretty"));
3572 string field;
3573 cmd_getval(g_ceph_context, cmdmap, "property", field);
3574 count_metadata(field, f.get());
3575 f->flush(ds);
3576 rdata.append(ds);
3577 rs = "";
3578 r = 0;
3579 } else if (prefix == "quorum_status") {
3580 // make sure our map is readable and up to date
3581 if (!is_leader() && !is_peon()) {
3582 dout(10) << " waiting for quorum" << dendl;
3583 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3584 return;
3585 }
3586 _quorum_status(f.get(), ds);
3587 rdata.append(ds);
3588 rs = "";
3589 r = 0;
3590 } else if (prefix == "mon_status") {
3591 get_mon_status(f.get(), ds);
3592 if (f)
3593 f->flush(ds);
3594 rdata.append(ds);
3595 rs = "";
3596 r = 0;
3597 } else if (prefix == "sync force" ||
3598 prefix == "mon sync force") {
3599 string validate1, validate2;
3600 cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3601 cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3602 if (validate1 != "--yes-i-really-mean-it" ||
3603 validate2 != "--i-know-what-i-am-doing") {
3604 r = -EINVAL;
3605 rs = "are you SURE? this will mean the monitor store will be "
3606 "erased. pass '--yes-i-really-mean-it "
3607 "--i-know-what-i-am-doing' if you really do.";
3608 goto out;
3609 }
3610 sync_force(f.get(), ds);
3611 rs = ds.str();
3612 r = 0;
3613 } else if (prefix == "heap") {
3614 if (!ceph_using_tcmalloc())
3615 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3616 else {
3617 string heapcmd;
3618 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3619 // XXX 1-element vector, change at callee or make vector here?
3620 vector<string> heapcmd_vec;
3621 get_str_vec(heapcmd, heapcmd_vec);
3622 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3623 rdata.append(ds);
3624 rs = "";
3625 r = 0;
3626 }
3627 } else if (prefix == "quorum") {
3628 string quorumcmd;
3629 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3630 if (quorumcmd == "exit") {
3631 start_election();
3632 elector.stop_participating();
3633 rs = "stopped responding to quorum, initiated new election";
3634 r = 0;
3635 } else if (quorumcmd == "enter") {
3636 elector.start_participating();
3637 start_election();
3638 rs = "started responding to quorum, initiated new election";
3639 r = 0;
3640 } else {
3641 rs = "needs a valid 'quorum' command";
3642 r = -EINVAL;
3643 }
3644 } else if (prefix == "version") {
3645 if (f) {
3646 f->open_object_section("version");
3647 f->dump_string("version", pretty_version_to_str());
3648 f->close_section();
3649 f->flush(ds);
3650 } else {
3651 ds << pretty_version_to_str();
3652 }
3653 rdata.append(ds);
3654 rs = "";
3655 r = 0;
3656 } else if (prefix == "versions") {
3657 if (!f)
3658 f.reset(Formatter::create("json-pretty"));
3659 map<string,int> overall;
3660 f->open_object_section("version");
3661 map<string,int> mon, mgr, osd, mds;
3662
3663 count_metadata("ceph_version", &mon);
3664 f->open_object_section("mon");
3665 for (auto& p : mon) {
3666 f->dump_int(p.first.c_str(), p.second);
3667 overall[p.first] += p.second;
3668 }
3669 f->close_section();
3670
3671 mgrmon()->count_metadata("ceph_version", &mgr);
3672 f->open_object_section("mgr");
3673 for (auto& p : mgr) {
3674 f->dump_int(p.first.c_str(), p.second);
3675 overall[p.first] += p.second;
3676 }
3677 f->close_section();
3678
3679 osdmon()->count_metadata("ceph_version", &osd);
3680 f->open_object_section("osd");
3681 for (auto& p : osd) {
3682 f->dump_int(p.first.c_str(), p.second);
3683 overall[p.first] += p.second;
3684 }
3685 f->close_section();
3686
3687 mdsmon()->count_metadata("ceph_version", &mds);
3688 f->open_object_section("mds");
3689 for (auto& p : mds) {
3690 f->dump_int(p.first.c_str(), p.second);
3691 overall[p.first] += p.second;
3692 }
3693 f->close_section();
3694
3695 for (auto& p : mgrstatmon()->get_service_map().services) {
3696 f->open_object_section(p.first.c_str());
3697 map<string,int> m;
3698 p.second.count_metadata("ceph_version", &m);
3699 for (auto& q : m) {
3700 f->dump_int(q.first.c_str(), q.second);
3701 overall[q.first] += q.second;
3702 }
3703 f->close_section();
3704 }
3705
3706 f->open_object_section("overall");
3707 for (auto& p : overall) {
3708 f->dump_int(p.first.c_str(), p.second);
3709 }
3710 f->close_section();
3711 f->close_section();
3712 f->flush(rdata);
3713 rs = "";
3714 r = 0;
3715 }
3716
3717 out:
3718 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3719 reply_command(op, r, rs, rdata, 0);
3720 }
3721
3722 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3723 {
3724 bufferlist rdata;
3725 reply_command(op, rc, rs, rdata, version);
3726 }
3727
3728 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3729 bufferlist& rdata, version_t version)
3730 {
3731 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3732 assert(m->get_type() == MSG_MON_COMMAND);
3733 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3734 reply->set_tid(m->get_tid());
3735 reply->set_data(rdata);
3736 send_reply(op, reply);
3737 }
3738
3739
3740 // ------------------------
3741 // request/reply routing
3742 //
3743 // a client/mds/osd will connect to a random monitor. we need to forward any
3744 // messages requiring state updates to the leader, and then route any replies
3745 // back via the correct monitor and back to them. (the monitor will not
3746 // initiate any connections.)
3747
3748 void Monitor::forward_request_leader(MonOpRequestRef op)
3749 {
3750 op->mark_event(__func__);
3751
3752 int mon = get_leader();
3753 MonSession *session = op->get_session();
3754 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3755
3756 if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3757 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3758 } else if (session->proxy_con) {
3759 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3760 } else if (!session->closed) {
3761 RoutedRequest *rr = new RoutedRequest;
3762 rr->tid = ++routed_request_tid;
3763 rr->client_inst = req->get_source_inst();
3764 rr->con = req->get_connection();
3765 rr->con_features = rr->con->get_features();
3766 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3767 rr->session = static_cast<MonSession *>(session->get());
3768 rr->op = op;
3769 routed_requests[rr->tid] = rr;
3770 session->routed_request_tids.insert(rr->tid);
3771
3772 dout(10) << "forward_request " << rr->tid << " request " << *req
3773 << " features " << rr->con_features << dendl;
3774
3775 MForward *forward = new MForward(rr->tid,
3776 req,
3777 rr->con_features,
3778 rr->session->caps);
3779 forward->set_priority(req->get_priority());
3780 if (session->auth_handler) {
3781 forward->entity_name = session->entity_name;
3782 } else if (req->get_source().is_mon()) {
3783 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3784 }
3785 messenger->send_message(forward, monmap->get_inst(mon));
3786 op->mark_forwarded();
3787 assert(op->get_req()->get_type() != 0);
3788 } else {
3789 dout(10) << "forward_request no session for request " << *req << dendl;
3790 }
3791 }
3792
3793 // fake connection attached to forwarded messages
3794 struct AnonConnection : public Connection {
3795 explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
3796
3797 int send_message(Message *m) override {
3798 assert(!"send_message on anonymous connection");
3799 }
3800 void send_keepalive() override {
3801 assert(!"send_keepalive on anonymous connection");
3802 }
3803 void mark_down() override {
3804 // silently ignore
3805 }
3806 void mark_disposable() override {
3807 // silengtly ignore
3808 }
3809 bool is_connected() override { return false; }
3810 };
3811
3812 //extract the original message and put it into the regular dispatch function
3813 void Monitor::handle_forward(MonOpRequestRef op)
3814 {
3815 MForward *m = static_cast<MForward*>(op->get_req());
3816 dout(10) << "received forwarded message from " << m->client
3817 << " via " << m->get_source_inst() << dendl;
3818 MonSession *session = op->get_session();
3819 assert(session);
3820
3821 if (!session->is_capable("mon", MON_CAP_X)) {
3822 dout(0) << "forward from entity with insufficient caps! "
3823 << session->caps << dendl;
3824 } else {
3825 // see PaxosService::dispatch(); we rely on this being anon
3826 // (c->msgr == NULL)
3827 PaxosServiceMessage *req = m->claim_message();
3828 assert(req != NULL);
3829
3830 ConnectionRef c(new AnonConnection(cct));
3831 MonSession *s = new MonSession(req->get_source_inst(),
3832 static_cast<Connection*>(c.get()));
3833 c->set_priv(s->get());
3834 c->set_peer_addr(m->client.addr);
3835 c->set_peer_type(m->client.name.type());
3836 c->set_features(m->con_features);
3837
3838 s->caps = m->client_caps;
3839 dout(10) << " caps are " << s->caps << dendl;
3840 s->entity_name = m->entity_name;
3841 dout(10) << " entity name '" << s->entity_name << "' type "
3842 << s->entity_name.get_type() << dendl;
3843 s->proxy_con = m->get_connection();
3844 s->proxy_tid = m->tid;
3845
3846 req->set_connection(c);
3847
3848 // not super accurate, but better than nothing.
3849 req->set_recv_stamp(m->get_recv_stamp());
3850
3851 /*
3852 * note which election epoch this is; we will drop the message if
3853 * there is a future election since our peers will resend routed
3854 * requests in that case.
3855 */
3856 req->rx_election_epoch = get_epoch();
3857
3858 /* Because this is a special fake connection, we need to break
3859 the ref loop between Connection and MonSession differently
3860 than we normally do. Here, the Message refers to the Connection
3861 which refers to the Session, and nobody else refers to the Connection
3862 or the Session. And due to the special nature of this message,
3863 nobody refers to the Connection via the Session. So, clear out that
3864 half of the ref loop.*/
3865 s->con.reset(NULL);
3866
3867 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
3868
3869 _ms_dispatch(req);
3870 s->put();
3871 }
3872 }
3873
3874 void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3875 {
3876 dout(10) << "try_send_message " << *m << " to " << to << dendl;
3877
3878 bufferlist bl;
3879 encode_message(m, quorum_con_features, bl);
3880
3881 messenger->send_message(m, to);
3882
3883 for (int i=0; i<(int)monmap->size(); i++) {
3884 if (i != rank)
3885 messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
3886 }
3887 }
3888
3889 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3890 {
3891 op->mark_event(__func__);
3892
3893 MonSession *session = op->get_session();
3894 assert(session);
3895 Message *req = op->get_req();
3896 ConnectionRef con = op->get_connection();
3897
3898 reply->set_cct(g_ceph_context);
3899 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3900
3901 if (!con) {
3902 dout(2) << "send_reply no connection, dropping reply " << *reply
3903 << " to " << req << " " << *req << dendl;
3904 reply->put();
3905 op->mark_event("reply: no connection");
3906 return;
3907 }
3908
3909 if (!session->con && !session->proxy_con) {
3910 dout(2) << "send_reply no connection, dropping reply " << *reply
3911 << " to " << req << " " << *req << dendl;
3912 reply->put();
3913 op->mark_event("reply: no connection");
3914 return;
3915 }
3916
3917 if (session->proxy_con) {
3918 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3919 << " via " << session->proxy_con->get_peer_addr()
3920 << " for request " << *req << dendl;
3921 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3922 op->mark_event("reply: send routed request");
3923 } else {
3924 session->con->send_message(reply);
3925 op->mark_event("reply: send");
3926 }
3927 }
3928
3929 void Monitor::no_reply(MonOpRequestRef op)
3930 {
3931 MonSession *session = op->get_session();
3932 Message *req = op->get_req();
3933
3934 if (session->proxy_con) {
3935 dout(10) << "no_reply to " << req->get_source_inst()
3936 << " via " << session->proxy_con->get_peer_addr()
3937 << " for request " << *req << dendl;
3938 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3939 op->mark_event("no_reply: send routed request");
3940 } else {
3941 dout(10) << "no_reply to " << req->get_source_inst()
3942 << " " << *req << dendl;
3943 op->mark_event("no_reply");
3944 }
3945 }
3946
3947 void Monitor::handle_route(MonOpRequestRef op)
3948 {
3949 MRoute *m = static_cast<MRoute*>(op->get_req());
3950 MonSession *session = op->get_session();
3951 //check privileges
3952 if (!session->is_capable("mon", MON_CAP_X)) {
3953 dout(0) << "MRoute received from entity without appropriate perms! "
3954 << dendl;
3955 return;
3956 }
3957 if (m->msg)
3958 dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3959 else
3960 dout(10) << "handle_route null to " << m->dest << dendl;
3961
3962 // look it up
3963 if (m->session_mon_tid) {
3964 if (routed_requests.count(m->session_mon_tid)) {
3965 RoutedRequest *rr = routed_requests[m->session_mon_tid];
3966
3967 // reset payload, in case encoding is dependent on target features
3968 if (m->msg) {
3969 m->msg->clear_payload();
3970 rr->con->send_message(m->msg);
3971 m->msg = NULL;
3972 }
3973 if (m->send_osdmap_first) {
3974 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3975 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3976 true, MonOpRequestRef());
3977 }
3978 assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3979 routed_requests.erase(m->session_mon_tid);
3980 rr->session->routed_request_tids.erase(m->session_mon_tid);
3981 delete rr;
3982 } else {
3983 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3984 }
3985 } else {
3986 dout(10) << " not a routed request, trying to send anyway" << dendl;
3987 if (m->msg) {
3988 messenger->send_message(m->msg, m->dest);
3989 m->msg = NULL;
3990 }
3991 }
3992 }
3993
3994 void Monitor::resend_routed_requests()
3995 {
3996 dout(10) << "resend_routed_requests" << dendl;
3997 int mon = get_leader();
3998 list<Context*> retry;
3999 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
4000 p != routed_requests.end();
4001 ++p) {
4002 RoutedRequest *rr = p->second;
4003
4004 if (mon == rank) {
4005 dout(10) << " requeue for self tid " << rr->tid << dendl;
4006 rr->op->mark_event("retry routed request");
4007 retry.push_back(new C_RetryMessage(this, rr->op));
4008 if (rr->session) {
4009 assert(rr->session->routed_request_tids.count(p->first));
4010 rr->session->routed_request_tids.erase(p->first);
4011 }
4012 delete rr;
4013 } else {
4014 bufferlist::iterator q = rr->request_bl.begin();
4015 PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
4016 rr->op->mark_event("resend forwarded message to leader");
4017 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
4018 MForward *forward = new MForward(rr->tid, req, rr->con_features,
4019 rr->session->caps);
4020 req->put(); // forward takes its own ref; drop ours.
4021 forward->client = rr->client_inst;
4022 forward->set_priority(req->get_priority());
4023 messenger->send_message(forward, monmap->get_inst(mon));
4024 }
4025 }
4026 if (mon == rank) {
4027 routed_requests.clear();
4028 finish_contexts(g_ceph_context, retry);
4029 }
4030 }
4031
4032 void Monitor::remove_session(MonSession *s)
4033 {
4034 dout(10) << "remove_session " << s << " " << s->inst
4035 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
4036 assert(s->con);
4037 assert(!s->closed);
4038 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4039 p != s->routed_request_tids.end();
4040 ++p) {
4041 assert(routed_requests.count(*p));
4042 RoutedRequest *rr = routed_requests[*p];
4043 dout(10) << " dropping routed request " << rr->tid << dendl;
4044 delete rr;
4045 routed_requests.erase(*p);
4046 }
4047 s->routed_request_tids.clear();
4048 s->con->set_priv(NULL);
4049 session_map.remove_session(s);
4050 logger->set(l_mon_num_sessions, session_map.get_size());
4051 logger->inc(l_mon_session_rm);
4052 }
4053
4054 void Monitor::remove_all_sessions()
4055 {
4056 Mutex::Locker l(session_map_lock);
4057 while (!session_map.sessions.empty()) {
4058 MonSession *s = session_map.sessions.front();
4059 remove_session(s);
4060 if (logger)
4061 logger->inc(l_mon_session_rm);
4062 }
4063 if (logger)
4064 logger->set(l_mon_num_sessions, session_map.get_size());
4065 }
4066
4067 void Monitor::send_command(const entity_inst_t& inst,
4068 const vector<string>& com)
4069 {
4070 dout(10) << "send_command " << inst << "" << com << dendl;
4071 MMonCommand *c = new MMonCommand(monmap->fsid);
4072 c->cmd = com;
4073 try_send_message(c, inst);
4074 }
4075
4076 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4077 {
4078 /**
4079 * Wait list the new session until we're in the quorum, assuming it's
4080 * sufficiently new.
4081 * tick() will periodically send them back through so we can send
4082 * the client elsewhere if we don't think we're getting back in.
4083 *
4084 * But we whitelist a few sorts of messages:
4085 * 1) Monitors can talk to us at any time, of course.
4086 * 2) auth messages. It's unlikely to go through much faster, but
4087 * it's possible we've just lost our quorum status and we want to take...
4088 * 3) command messages. We want to accept these under all possible
4089 * circumstances.
4090 */
4091 Message *m = op->get_req();
4092 MonSession *s = op->get_session();
4093 ConnectionRef con = op->get_connection();
4094 utime_t too_old = ceph_clock_now();
4095 too_old -= g_ceph_context->_conf->mon_lease;
4096 if (m->get_recv_stamp() > too_old &&
4097 con->is_connected()) {
4098 dout(5) << "waitlisting message " << *m << dendl;
4099 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4100 op->mark_wait_for_quorum();
4101 } else {
4102 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4103 con->mark_down();
4104 // proxied sessions aren't registered and don't have a con; don't remove
4105 // those.
4106 if (!s->proxy_con) {
4107 Mutex::Locker l(session_map_lock);
4108 remove_session(s);
4109 }
4110 op->mark_zap();
4111 }
4112 }
4113
/**
 * Entry point for every message handed to the monitor by the messenger.
 *
 * Flow:
 *  1. Wrap the message in a tracked MonOpRequest.
 *  2. Look up the MonSession attached to its connection.  For a monitor peer
 *     whose connection features changed, discard the old session.
 *  3. Create a new session if needed (stray non-auth messages from unknown
 *     non-monitors are dropped).
 *  4. Refresh the session timeout.
 *  5. Dispatch the op, or defer it via waitlist_or_zap_client() while we
 *     are synchronizing or have left quorum.
 */
void Monitor::_ms_dispatch(Message *m)
{
  if (is_shutdown()) {
    // Shutting down: drop our reference to the message and ignore it.
    m->put();
    return;
  }

  MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
  bool src_is_mon = op->is_src_mon();
  op->mark_event("mon:_ms_dispatch");
  MonSession *s = op->get_session();
  if (s && s->closed) {
    // The session has already been torn down; ignore the message.
    return;
  }

  if (src_is_mon && s) {
    ConnectionRef con = m->get_connection();
    if (con->get_messenger() && con->get_features() != s->con_features) {
      // only update features if this is a non-anonymous connection
      dout(10) << __func__ << " feature change for " << m->get_source_inst()
               << " (was " << s->con_features
               << ", now " << con->get_features() << ")" << dendl;
      // connection features changed - recreate session.
      if (s->con && s->con != con) {
        dout(10) << __func__ << " connection for " << m->get_source_inst()
                 << " changed from session; mark down and replace" << dendl;
        s->con->mark_down();
      }
      if (s->item.is_on_list()) {
        // forwarded messages' sessions are not in the sessions map and
        // exist only while the op is being handled.
        // NOTE(review): other callers in this file hold session_map_lock
        // around remove_session(); it is not taken here -- confirm intended.
        remove_session(s);
      }
      // NOTE(review): confirm which reference this put() balances.
      s->put();
      // Clearing s makes the block below create a fresh session.
      s = nullptr;
    }
  }

  if (!s) {
    // if the sender is not a monitor, make sure their first message for a
    // session is an MAuth. If it is not, assume it's a stray message,
    // and considering that we are creating a new session it is safe to
    // assume that the sender hasn't authenticated yet, so we have no way
    // of assessing whether we should handle it or not.
    // (MMonGetMap and MPing are also accepted without a prior session.)
    if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
                        m->get_type() != CEPH_MSG_MON_GET_MAP &&
                        m->get_type() != CEPH_MSG_PING)) {
      dout(1) << __func__ << " dropping stray message " << *m
              << " from " << m->get_source_inst() << dendl;
      return;
    }

    ConnectionRef con = m->get_connection();
    {
      Mutex::Locker l(session_map_lock);
      s = session_map.new_session(m->get_source_inst(), con.get());
    }
    assert(s);
    // The connection keeps its own reference to the new session.
    con->set_priv(s->get());
    dout(10) << __func__ << " new session " << s << " " << *s
             << " features 0x" << std::hex
             << s->con_features << std::dec << dendl;
    op->set_session(s);

    logger->set(l_mon_num_sessions, session_map.get_size());
    logger->inc(l_mon_session_add);

    if (src_is_mon) {
      // give it monitor caps; the peer type has been authenticated
      dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
      if (!s->caps.is_allow_all()) // but no need to repeatedly copy
        s->caps = *mon_caps;
    }
    // Drop the reference taken by new_session(); s is presumably still held
    // by the connection (set_priv above) and the op -- verify before
    // reordering anything around this.
    s->put();
  } else {
    dout(20) << __func__ << " existing session " << s << " for " << s->inst
             << dendl;
  }

  assert(s);

  // Any message refreshes the session's expiry time.
  s->session_timeout = ceph_clock_now();
  s->session_timeout += g_conf->mon_session_timeout;

  if (s->auth_handler) {
    s->entity_name = s->auth_handler->get_entity_name();
  }
  dout(20) << " caps " << s->caps.get_str() << dendl;

  // Non-monitor, non-ping traffic is deferred while we are synchronizing,
  // or when the session has no global_id yet and exited_quorum is set.
  if ((is_synchronizing() ||
       (s->global_id == 0 && !exited_quorum.is_zero())) &&
      !src_is_mon &&
      m->get_type() != CEPH_MSG_PING) {
    waitlist_or_zap_client(op);
  } else {
    dispatch_op(op);
  }
  return;
}
4213
/**
 * Route a tracked op to the component that handles its message type.
 *
 * Dispatch happens in stages, each a switch that either handles the type or
 * falls through to the next stage:
 *  1. messages that need no caps (auth, ping, get-map, metadata);
 *  2. paxos-service messages whose caps are checked by the service itself;
 *  3. monitor messages that clients may send (require "mon" r caps);
 *  4. messages only other monitors may send (paxos, election, sync, ...).
 * Anything left over is logged and dropped.
 */
void Monitor::dispatch_op(MonOpRequestRef op)
{
  op->mark_event("mon:dispatch_op");
  MonSession *s = op->get_session();
  assert(s);
  if (s->closed) {
    // NOTE(review): this logs the request pointer, not the message contents.
    dout(10) << " session closed, dropping " << op->get_req() << dendl;
    return;
  }

  /* we will consider the default type as being 'monitor' until proven wrong */
  op->set_type_monitor();
  /* deal with all messages that do not necessarily need caps */
  bool dealt_with = true;
  switch (op->get_req()->get_type()) {
    // auth
    case MSG_MON_GLOBAL_ID:
    case CEPH_MSG_AUTH:
      op->set_type_service();
      /* no need to check caps here */
      paxos_service[PAXOS_AUTH]->dispatch(op);
      break;

    case CEPH_MSG_PING:
      handle_ping(op);
      break;

    /* MMonGetMap may be used by clients to obtain a monmap *before*
     * authenticating with the monitor. We need to handle these without
     * checking caps because, even on a cluster without cephx, we only set
     * session caps *after* the auth handshake. A good example of this
     * is when a client calls MonClient::get_monmap_privately(), which does
     * not authenticate when obtaining a monmap.
     */
    case CEPH_MSG_MON_GET_MAP:
      handle_mon_get_map(op);
      break;

    case CEPH_MSG_MON_METADATA:
      return handle_mon_metadata(op);

    default:
      dealt_with = false;
      break;
  }
  if (dealt_with)
    return;

  /* well, maybe the op belongs to a service... */
  op->set_type_service();
  /* deal with all messages which caps should be checked somewhere else */
  dealt_with = true;
  switch (op->get_req()->get_type()) {

    // OSDs
    case CEPH_MSG_MON_GET_OSDMAP:
    case CEPH_MSG_POOLOP:
    case MSG_OSD_BEACON:
    case MSG_OSD_MARK_ME_DOWN:
    case MSG_OSD_FULL:
    case MSG_OSD_FAILURE:
    case MSG_OSD_BOOT:
    case MSG_OSD_ALIVE:
    case MSG_OSD_PGTEMP:
    case MSG_OSD_PG_CREATED:
    case MSG_REMOVE_SNAPS:
      paxos_service[PAXOS_OSDMAP]->dispatch(op);
      break;

    // MDSs
    case MSG_MDS_BEACON:
    case MSG_MDS_OFFLOAD_TARGETS:
      paxos_service[PAXOS_MDSMAP]->dispatch(op);
      break;

    // Mgrs
    case MSG_MGR_BEACON:
      paxos_service[PAXOS_MGR]->dispatch(op);
      break;

    // MgrStat
    case CEPH_MSG_STATFS:
      // this is an ugly hack, sorry! force the version to 1 so that we do
      // not run afoul of the is_readable() paxos check. the client is going
      // by the pgmonitor version and the MgrStatMonitor version will lag behind
      // that until we complete the upgrade. The paxos ordering crap really
      // doesn't matter for statfs results, so just kludge around it here.
      if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
        ((MStatfs*)op->get_req())->version = 1;
      }
      // fall through: statfs is then handled by PAXOS_MGRSTAT like the
      // cases below (intentional, no break).
    case MSG_MON_MGR_REPORT:
    case MSG_GETPOOLSTATS:
      paxos_service[PAXOS_MGRSTAT]->dispatch(op);
      break;

    // pg
    case MSG_PGSTATS:
      paxos_service[PAXOS_PGMAP]->dispatch(op);
      break;

    // log
    case MSG_LOG:
      paxos_service[PAXOS_LOG]->dispatch(op);
      break;

    // handle_command() does its own caps checking
    case MSG_MON_COMMAND:
      op->set_type_command();
      handle_command(op);
      break;

    default:
      dealt_with = false;
      break;
  }
  if (dealt_with)
    return;

  /* nop, looks like it's not a service message; revert back to monitor */
  op->set_type_monitor();

  /* messages we, the Monitor class, need to deal with
   * but may be sent by clients. */

  if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
    dout(5) << __func__ << " " << op->get_req()->get_source_inst()
            << " not enough caps for " << *(op->get_req()) << " -- dropping"
            << dendl;
    goto drop;
  }

  dealt_with = true;
  switch (op->get_req()->get_type()) {

    // misc
    case CEPH_MSG_MON_GET_VERSION:
      handle_get_version(op);
      break;

    case CEPH_MSG_MON_SUBSCRIBE:
      /* FIXME: check what's being subscribed, filter accordingly */
      handle_subscribe(op);
      break;

    default:
      dealt_with = false;
      break;
  }
  if (dealt_with)
    return;

  // Everything past this point must come from another monitor.
  if (!op->is_src_mon()) {
    dout(1) << __func__ << " unexpected monitor message from"
            << " non-monitor entity " << op->get_req()->get_source_inst()
            << " " << *(op->get_req()) << " -- dropping" << dendl;
    goto drop;
  }

  /* messages that should only be sent by another monitor */
  dealt_with = true;
  switch (op->get_req()->get_type()) {

    case MSG_ROUTE:
      handle_route(op);
      break;

    case MSG_MON_PROBE:
      handle_probe(op);
      break;

    // Sync (i.e., the new slurp, but on steroids)
    case MSG_MON_SYNC:
      handle_sync(op);
      break;
    case MSG_MON_SCRUB:
      handle_scrub(op);
      break;

    /* log acks are sent from a monitor we sent the MLog to, and are
       never sent by clients to us. */
    case MSG_LOGACK:
      log_client.handle_log_ack((MLogAck*)op->get_req());
      break;

    // monmap
    case MSG_MON_JOIN:
      op->set_type_service();
      paxos_service[PAXOS_MONMAP]->dispatch(op);
      break;

    // paxos
    case MSG_MON_PAXOS:
      {
        op->set_type_paxos();
        MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
        if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
          //can't send these!
          break;
        }

        if (state == STATE_SYNCHRONIZING) {
          // we are synchronizing. These messages would do us no
          // good, thus just drop them and ignore them.
          dout(10) << __func__ << " ignore paxos msg from "
                   << pm->get_source_inst() << dendl;
          break;
        }

        // sanitize
        // A newer epoch means we are behind: re-bootstrap.  An older epoch is
        // stale and silently ignored.
        if (pm->epoch > get_epoch()) {
          bootstrap();
          break;
        }
        if (pm->epoch != get_epoch()) {
          break;
        }

        paxos->dispatch(op);
      }
      break;

    // elector messages
    case MSG_MON_ELECTION:
      op->set_type_election();
      //check privileges here for simplicity
      if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
        dout(0) << "MMonElection received from entity without enough caps!"
                << op->get_session()->caps << dendl;
        break;
      }
      if (!is_probing() && !is_synchronizing()) {
        elector.dispatch(op);
      }
      break;

    case MSG_FORWARD:
      handle_forward(op);
      break;

    case MSG_TIMECHECK:
      handle_timecheck(op);
      break;

    case MSG_MON_HEALTH:
      health_monitor->dispatch(op);
      break;

    case MSG_MON_HEALTH_CHECKS:
      op->set_type_service();
      paxos_service[PAXOS_HEALTH]->dispatch(op);
      break;

    default:
      dealt_with = false;
      break;
  }
  if (!dealt_with) {
    dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
    goto drop;
  }
  return;

drop:
  // Dropped ops need no further handling here.
  return;
}
4479
4480 void Monitor::handle_ping(MonOpRequestRef op)
4481 {
4482 MPing *m = static_cast<MPing*>(op->get_req());
4483 dout(10) << __func__ << " " << *m << dendl;
4484 MPing *reply = new MPing;
4485 entity_inst_t inst = m->get_source_inst();
4486 bufferlist payload;
4487 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4488 f->open_object_section("pong");
4489
4490 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
4491 get_health_status(false, f.get(), nullptr);
4492 } else {
4493 list<string> health_str;
4494 get_health(health_str, nullptr, f.get());
4495 }
4496
4497 {
4498 stringstream ss;
4499 get_mon_status(f.get(), ss);
4500 }
4501
4502 f->close_section();
4503 stringstream ss;
4504 f->flush(ss);
4505 ::encode(ss.str(), payload);
4506 reply->set_payload(payload);
4507 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4508 messenger->send_message(reply, inst);
4509 }
4510
4511 void Monitor::timecheck_start()
4512 {
4513 dout(10) << __func__ << dendl;
4514 timecheck_cleanup();
4515 timecheck_start_round();
4516 }
4517
4518 void Monitor::timecheck_finish()
4519 {
4520 dout(10) << __func__ << dendl;
4521 timecheck_cleanup();
4522 }
4523
4524 void Monitor::timecheck_start_round()
4525 {
4526 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4527 assert(is_leader());
4528
4529 if (monmap->size() == 1) {
4530 assert(0 == "We are alone; this shouldn't have been scheduled!");
4531 return;
4532 }
4533
4534 if (timecheck_round % 2) {
4535 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4536 utime_t curr_time = ceph_clock_now();
4537 double max = g_conf->mon_timecheck_interval*3;
4538 if (curr_time - timecheck_round_start < max) {
4539 dout(10) << __func__ << " keep current round going" << dendl;
4540 goto out;
4541 } else {
4542 dout(10) << __func__
4543 << " finish current timecheck and start new" << dendl;
4544 timecheck_cancel_round();
4545 }
4546 }
4547
4548 assert(timecheck_round % 2 == 0);
4549 timecheck_acks = 0;
4550 timecheck_round ++;
4551 timecheck_round_start = ceph_clock_now();
4552 dout(10) << __func__ << " new " << timecheck_round << dendl;
4553
4554 timecheck();
4555 out:
4556 dout(10) << __func__ << " setting up next event" << dendl;
4557 timecheck_reset_event();
4558 }
4559
4560 void Monitor::timecheck_finish_round(bool success)
4561 {
4562 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4563 assert(timecheck_round % 2);
4564 timecheck_round ++;
4565 timecheck_round_start = utime_t();
4566
4567 if (success) {
4568 assert(timecheck_waiting.empty());
4569 assert(timecheck_acks == quorum.size());
4570 timecheck_report();
4571 timecheck_check_skews();
4572 return;
4573 }
4574
4575 dout(10) << __func__ << " " << timecheck_waiting.size()
4576 << " peers still waiting:";
4577 for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4578 p != timecheck_waiting.end(); ++p) {
4579 *_dout << " " << p->first.name;
4580 }
4581 *_dout << dendl;
4582 timecheck_waiting.clear();
4583
4584 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4585 }
4586
4587 void Monitor::timecheck_cancel_round()
4588 {
4589 timecheck_finish_round(false);
4590 }
4591
4592 void Monitor::timecheck_cleanup()
4593 {
4594 timecheck_round = 0;
4595 timecheck_acks = 0;
4596 timecheck_round_start = utime_t();
4597
4598 if (timecheck_event) {
4599 timer.cancel_event(timecheck_event);
4600 timecheck_event = NULL;
4601 }
4602 timecheck_waiting.clear();
4603 timecheck_skews.clear();
4604 timecheck_latencies.clear();
4605
4606 timecheck_rounds_since_clean = 0;
4607 }
4608
4609 void Monitor::timecheck_reset_event()
4610 {
4611 if (timecheck_event) {
4612 timer.cancel_event(timecheck_event);
4613 timecheck_event = NULL;
4614 }
4615
4616 double delay =
4617 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4618
4619 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4620 delay = cct->_conf->mon_timecheck_interval;
4621 }
4622
4623 dout(10) << __func__ << " delay " << delay
4624 << " rounds_since_clean " << timecheck_rounds_since_clean
4625 << dendl;
4626
4627 timecheck_event = timer.add_event_after(
4628 delay,
4629 new C_MonContext(this, [this](int) {
4630 timecheck_start_round();
4631 }));
4632 }
4633
4634 void Monitor::timecheck_check_skews()
4635 {
4636 dout(10) << __func__ << dendl;
4637 assert(is_leader());
4638 assert((timecheck_round % 2) == 0);
4639 if (monmap->size() == 1) {
4640 assert(0 == "We are alone; we shouldn't have gotten here!");
4641 return;
4642 }
4643 assert(timecheck_latencies.size() == timecheck_skews.size());
4644
4645 bool found_skew = false;
4646 for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4647 p != timecheck_skews.end(); ++p) {
4648
4649 double abs_skew;
4650 if (timecheck_has_skew(p->second, &abs_skew)) {
4651 dout(10) << __func__
4652 << " " << p->first << " skew " << abs_skew << dendl;
4653 found_skew = true;
4654 }
4655 }
4656
4657 if (found_skew) {
4658 ++timecheck_rounds_since_clean;
4659 timecheck_reset_event();
4660 } else if (timecheck_rounds_since_clean > 0) {
4661 dout(1) << __func__
4662 << " no clock skews found after " << timecheck_rounds_since_clean
4663 << " rounds" << dendl;
4664 // make sure the skews are really gone and not just a transient success
4665 // this will run just once if not in the presence of skews again.
4666 timecheck_rounds_since_clean = 1;
4667 timecheck_reset_event();
4668 timecheck_rounds_since_clean = 0;
4669 }
4670
4671 }
4672
4673 void Monitor::timecheck_report()
4674 {
4675 dout(10) << __func__ << dendl;
4676 assert(is_leader());
4677 assert((timecheck_round % 2) == 0);
4678 if (monmap->size() == 1) {
4679 assert(0 == "We are alone; we shouldn't have gotten here!");
4680 return;
4681 }
4682
4683 assert(timecheck_latencies.size() == timecheck_skews.size());
4684 bool do_output = true; // only output report once
4685 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4686 if (monmap->get_name(*q) == name)
4687 continue;
4688
4689 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4690 m->epoch = get_epoch();
4691 m->round = timecheck_round;
4692
4693 for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4694 it != timecheck_skews.end(); ++it) {
4695 double skew = it->second;
4696 double latency = timecheck_latencies[it->first];
4697
4698 m->skews[it->first] = skew;
4699 m->latencies[it->first] = latency;
4700
4701 if (do_output) {
4702 dout(25) << __func__ << " " << it->first
4703 << " latency " << latency
4704 << " skew " << skew << dendl;
4705 }
4706 }
4707 do_output = false;
4708 entity_inst_t inst = monmap->get_inst(*q);
4709 dout(10) << __func__ << " send report to " << inst << dendl;
4710 messenger->send_message(m, inst);
4711 }
4712 }
4713
4714 void Monitor::timecheck()
4715 {
4716 dout(10) << __func__ << dendl;
4717 assert(is_leader());
4718 if (monmap->size() == 1) {
4719 assert(0 == "We are alone; we shouldn't have gotten here!");
4720 return;
4721 }
4722 assert(timecheck_round % 2 != 0);
4723
4724 timecheck_acks = 1; // we ack ourselves
4725
4726 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4727 << " round " << timecheck_round << dendl;
4728
4729 // we are at the eye of the storm; the point of reference
4730 timecheck_skews[messenger->get_myinst()] = 0.0;
4731 timecheck_latencies[messenger->get_myinst()] = 0.0;
4732
4733 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4734 if (monmap->get_name(*it) == name)
4735 continue;
4736
4737 entity_inst_t inst = monmap->get_inst(*it);
4738 utime_t curr_time = ceph_clock_now();
4739 timecheck_waiting[inst] = curr_time;
4740 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4741 m->epoch = get_epoch();
4742 m->round = timecheck_round;
4743 dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4744 messenger->send_message(m, inst);
4745 }
4746 }
4747
4748 health_status_t Monitor::timecheck_status(ostringstream &ss,
4749 const double skew_bound,
4750 const double latency)
4751 {
4752 health_status_t status = HEALTH_OK;
4753 assert(latency >= 0);
4754
4755 double abs_skew;
4756 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4757 status = HEALTH_WARN;
4758 ss << "clock skew " << abs_skew << "s"
4759 << " > max " << g_conf->mon_clock_drift_allowed << "s";
4760 }
4761
4762 return status;
4763 }
4764
/**
 * Leader-side handling of an MTimeCheck OP_PONG from a peon.
 *
 * Steps:
 *  - discard PONGs from older epochs or rounds;
 *  - turn the round trip into a latency sample, kept per peer as a weighted
 *    average (0.8 old + 0.2 new);
 *  - derive a latency-bounded clock skew for the peer and record it;
 *  - write a cluster-log health entry when timecheck_status() flags it;
 *  - finish the round once every quorum member has answered.
 */
void Monitor::handle_timecheck_leader(MonOpRequestRef op)
{
  MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;
  /* handles PONG's */
  assert(m->op == MTimeCheck::OP_PONG);

  entity_inst_t other = m->get_source_inst();
  if (m->epoch < get_epoch()) {
    dout(1) << __func__ << " got old timecheck epoch " << m->epoch
            << " from " << other
            << " curr " << get_epoch()
            << " -- severely lagged? discard" << dendl;
    return;
  }
  // A PONG from a newer epoch is treated as impossible.
  assert(m->epoch == get_epoch());

  if (m->round < timecheck_round) {
    dout(1) << __func__ << " got old round " << m->round
            << " from " << other
            << " curr " << timecheck_round << " -- discard" << dendl;
    return;
  }

  utime_t curr_time = ceph_clock_now();

  // NOTE(review): a PONG for the current round from a peer no longer in
  // timecheck_waiting (e.g. a duplicate reply) would trip this assert --
  // confirm that cannot happen.
  assert(timecheck_waiting.count(other) > 0);
  utime_t timecheck_sent = timecheck_waiting[other];
  timecheck_waiting.erase(other);
  if (curr_time < timecheck_sent) {
    // our clock was readjusted -- drop everything until it all makes sense.
    dout(1) << __func__ << " our clock was readjusted --"
            << " bump round and drop current check"
            << dendl;
    timecheck_cancel_round();
    return;
  }

  /* update peer latencies */
  double latency = (double)(curr_time - timecheck_sent);

  if (timecheck_latencies.count(other) == 0)
    timecheck_latencies[other] = latency;
  else {
    // Smooth the latency: 80% previous value, 20% new sample.
    double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
    timecheck_latencies[other] = avg_latency;
  }

  /*
   * update skews
   *
   * some nasty thing goes on if we were to do 'a - b' between two utime_t,
   * and 'a' happens to be lower than 'b'; so we use double instead.
   *
   * latency is always expected to be >= 0.
   *
   * delta, the difference between theirs timestamp and ours, may either be
   * lower or higher than 0; will hardly ever be 0.
   *
   * The absolute skew is the absolute delta minus the latency, which is
   * taken as a whole instead of an rtt given that there is some queueing
   * and dispatch times involved and it's hard to assess how long exactly
   * it took for the message to travel to the other side and be handled. So
   * we call it a bounded skew, the worst case scenario.
   *
   * Now, to math!
   *
   * Given that the latency is always positive, we can establish that the
   * bounded skew will be:
   *
   *  1. positive if the absolute delta is higher than the latency and
   *     delta is positive
   *  2. negative if the absolute delta is higher than the latency and
   *     delta is negative.
   *  3. zero if the absolute delta is lower than the latency.
   *
   * On 3. we make a judgement call and treat the skew as non-existent.
   * This is because that, if the absolute delta is lower than the
   * latency, then the apparently existing skew is nothing more than a
   * side-effect of the high latency at work.
   *
   * This may not be entirely true though, as a severely skewed clock
   * may be masked by an even higher latency, but with high latencies
   * we probably have worse issues to deal with than just skewed clocks.
   */
  assert(latency >= 0);

  double delta = ((double) m->timestamp) - ((double) curr_time);
  double abs_delta = (delta > 0 ? delta : -delta);
  double skew_bound = abs_delta - latency;
  if (skew_bound < 0)
    skew_bound = 0;
  else if (delta < 0)
    skew_bound = -skew_bound;

  ostringstream ss;
  health_status_t status = timecheck_status(ss, skew_bound, latency);
  if (status != HEALTH_OK) {
    clog->health(status) << other << " " << ss.str();
  }

  dout(10) << __func__ << " from " << other << " ts " << m->timestamp
           << " delta " << delta << " skew_bound " << skew_bound
           << " latency " << latency << dendl;

  timecheck_skews[other] = skew_bound;

  timecheck_acks++;
  if (timecheck_acks == quorum.size()) {
    dout(10) << __func__ << " got pongs from everybody ("
             << timecheck_acks << " total)" << dendl;
    assert(timecheck_skews.size() == timecheck_acks);
    assert(timecheck_waiting.empty());
    // everyone has acked, so bump the round to finish it.
    timecheck_finish_round();
  }
}
4882
4883 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4884 {
4885 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4886 dout(10) << __func__ << " " << *m << dendl;
4887
4888 assert(is_peon());
4889 assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4890
4891 if (m->epoch != get_epoch()) {
4892 dout(1) << __func__ << " got wrong epoch "
4893 << "(ours " << get_epoch()
4894 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4895 return;
4896 }
4897
4898 if (m->round < timecheck_round) {
4899 dout(1) << __func__ << " got old round " << m->round
4900 << " current " << timecheck_round
4901 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4902 return;
4903 }
4904
4905 timecheck_round = m->round;
4906
4907 if (m->op == MTimeCheck::OP_REPORT) {
4908 assert((timecheck_round % 2) == 0);
4909 timecheck_latencies.swap(m->latencies);
4910 timecheck_skews.swap(m->skews);
4911 return;
4912 }
4913
4914 assert((timecheck_round % 2) != 0);
4915 MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4916 utime_t curr_time = ceph_clock_now();
4917 reply->timestamp = curr_time;
4918 reply->epoch = m->epoch;
4919 reply->round = m->round;
4920 dout(10) << __func__ << " send " << *m
4921 << " to " << m->get_source_inst() << dendl;
4922 m->get_connection()->send_message(reply);
4923 }
4924
4925 void Monitor::handle_timecheck(MonOpRequestRef op)
4926 {
4927 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4928 dout(10) << __func__ << " " << *m << dendl;
4929
4930 if (is_leader()) {
4931 if (m->op != MTimeCheck::OP_PONG) {
4932 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4933 } else {
4934 handle_timecheck_leader(op);
4935 }
4936 } else if (is_peon()) {
4937 if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4938 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4939 } else {
4940 handle_timecheck_peon(op);
4941 }
4942 } else {
4943 dout(1) << __func__ << " drop unexpected msg" << dendl;
4944 }
4945 }
4946
/**
 * Register or update the session's subscriptions listed in an MMonSubscribe.
 *
 * For each requested item:
 *  - log subscriptions replace any other log subscription the session holds;
 *  - the subscription is added/updated in the session map;
 *  - if the session has the needed caps, the owning service is asked to
 *    check it right away.
 * Clients without CEPH_FEATURE_MON_STATEFUL_SUB get an MMonSubscribeAck when
 * at least one subscription is not one-time.
 */
void Monitor::handle_subscribe(MonOpRequestRef op)
{
  MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
  dout(10) << "handle_subscribe " << *m << dendl;

  bool reply = false;

  MonSession *s = op->get_session();
  assert(s);

  for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
       p != m->what.end();
       ++p) {
    // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
    if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
      reply = true;

    // remove conflicting subscribes
    if (logmon()->sub_name_to_id(p->first) >= 0) {
      for (map<string, Subscription*>::iterator it = s->sub_map.begin();
           it != s->sub_map.end(); ) {
        if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
          Mutex::Locker l(session_map_lock);
          // The iterator is advanced before remove_sub() runs; presumably
          // remove_sub() erases this entry from s->sub_map -- verify against
          // MonSessionMap.
          session_map.remove_sub((it++)->second);
        } else {
          ++it;
        }
      }
    }

    {
      Mutex::Locker l(session_map_lock);
      session_map.add_update_sub(s, p->first, p->second.start,
                                 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
                                 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
    }

    // Let the owning service check the new subscription.  The sub_map
    // lookups below assume add_update_sub() created the entry.
    if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
      dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
      if ((int)s->is_capable("mds", MON_CAP_R)) {
        Subscription *sub = s->sub_map[p->first];
        assert(sub != nullptr);
        mdsmon()->check_sub(sub);
      }
    } else if (p->first == "osdmap") {
      if ((int)s->is_capable("osd", MON_CAP_R)) {
        if (s->osd_epoch > p->second.start) {
          // client needs earlier osdmaps on purpose, so reset the sent epoch
          s->osd_epoch = 0;
        }
        osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
      }
    } else if (p->first == "osd_pg_creates") {
      if ((int)s->is_capable("osd", MON_CAP_W)) {
        // Once all mons are luminous, pg creates come from the OSDMonitor;
        // before that, from the PGMonitor.
        if (monmap->get_required_features().contains_all(
              ceph::features::mon::FEATURE_LUMINOUS)) {
          osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
        } else {
          pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
        }
      }
    } else if (p->first == "monmap") {
      monmon()->check_sub(s->sub_map[p->first]);
    } else if (logmon()->sub_name_to_id(p->first) >= 0) {
      logmon()->check_sub(s->sub_map[p->first]);
    } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
      mgrmon()->check_sub(s->sub_map[p->first]);
    } else if (p->first == "servicemap") {
      mgrstatmon()->check_sub(s->sub_map[p->first]);
    }
  }

  if (reply) {
    // we only need to reply if the client is old enough to think it
    // has to send renewals.
    ConnectionRef con = m->get_connection();
    if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
      m->get_connection()->send_message(new MMonSubscribeAck(
        monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
  }

}
5029
5030 void Monitor::handle_get_version(MonOpRequestRef op)
5031 {
5032 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
5033 dout(10) << "handle_get_version " << *m << dendl;
5034 PaxosService *svc = NULL;
5035
5036 MonSession *s = op->get_session();
5037 assert(s);
5038
5039 if (!is_leader() && !is_peon()) {
5040 dout(10) << " waiting for quorum" << dendl;
5041 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5042 goto out;
5043 }
5044
5045 if (m->what == "mdsmap") {
5046 svc = mdsmon();
5047 } else if (m->what == "fsmap") {
5048 svc = mdsmon();
5049 } else if (m->what == "osdmap") {
5050 svc = osdmon();
5051 } else if (m->what == "monmap") {
5052 svc = monmon();
5053 } else {
5054 derr << "invalid map type " << m->what << dendl;
5055 }
5056
5057 if (svc) {
5058 if (!svc->is_readable()) {
5059 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5060 goto out;
5061 }
5062
5063 MMonGetVersionReply *reply = new MMonGetVersionReply();
5064 reply->handle = m->handle;
5065 reply->version = svc->get_last_committed();
5066 reply->oldest_version = svc->get_first_committed();
5067 reply->set_tid(m->get_tid());
5068
5069 m->get_connection()->send_message(reply);
5070 }
5071 out:
5072 return;
5073 }
5074
5075 bool Monitor::ms_handle_reset(Connection *con)
5076 {
5077 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5078
5079 // ignore lossless monitor sessions
5080 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5081 return false;
5082
5083 MonSession *s = static_cast<MonSession *>(con->get_priv());
5084 if (!s)
5085 return false;
5086
5087 // break any con <-> session ref cycle
5088 s->con->set_priv(NULL);
5089
5090 if (is_shutdown())
5091 return false;
5092
5093 Mutex::Locker l(lock);
5094
5095 dout(10) << "reset/close on session " << s->inst << dendl;
5096 if (!s->closed) {
5097 Mutex::Locker l(session_map_lock);
5098 remove_session(s);
5099 }
5100 s->put();
5101 return true;
5102 }
5103
5104 bool Monitor::ms_handle_refused(Connection *con)
5105 {
5106 // just log for now...
5107 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5108 return false;
5109 }
5110
5111 // -----
5112
5113 void Monitor::send_latest_monmap(Connection *con)
5114 {
5115 bufferlist bl;
5116 monmap->encode(bl, con->get_features());
5117 con->send_message(new MMonMap(bl));
5118 }
5119
5120 void Monitor::handle_mon_get_map(MonOpRequestRef op)
5121 {
5122 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
5123 dout(10) << "handle_mon_get_map" << dendl;
5124 send_latest_monmap(m->get_connection().get());
5125 }
5126
5127 void Monitor::handle_mon_metadata(MonOpRequestRef op)
5128 {
5129 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
5130 if (is_leader()) {
5131 dout(10) << __func__ << dendl;
5132 update_mon_metadata(m->get_source().num(), std::move(m->data));
5133 }
5134 }
5135
5136 void Monitor::update_mon_metadata(int from, Metadata&& m)
5137 {
5138 // NOTE: this is now for legacy (kraken or jewel) mons only.
5139 pending_metadata[from] = std::move(m);
5140
5141 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5142 bufferlist bl;
5143 ::encode(pending_metadata, bl);
5144 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5145 paxos->trigger_propose();
5146 }
5147
5148 int Monitor::load_metadata()
5149 {
5150 bufferlist bl;
5151 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5152 if (r)
5153 return r;
5154 bufferlist::iterator it = bl.begin();
5155 ::decode(mon_metadata, it);
5156
5157 pending_metadata = mon_metadata;
5158 return 0;
5159 }
5160
5161 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5162 {
5163 assert(f);
5164 if (!mon_metadata.count(mon)) {
5165 err << "mon." << mon << " not found";
5166 return -EINVAL;
5167 }
5168 const Metadata& m = mon_metadata[mon];
5169 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5170 f->dump_string(p->first.c_str(), p->second);
5171 }
5172 return 0;
5173 }
5174
5175 void Monitor::count_metadata(const string& field, map<string,int> *out)
5176 {
5177 for (auto& p : mon_metadata) {
5178 auto q = p.second.find(field);
5179 if (q == p.second.end()) {
5180 (*out)["unknown"]++;
5181 } else {
5182 (*out)[q->second]++;
5183 }
5184 }
5185 }
5186
5187 void Monitor::count_metadata(const string& field, Formatter *f)
5188 {
5189 map<string,int> by_val;
5190 count_metadata(field, &by_val);
5191 f->open_object_section(field.c_str());
5192 for (auto& p : by_val) {
5193 f->dump_int(p.first.c_str(), p.second);
5194 }
5195 f->close_section();
5196 }
5197
5198 int Monitor::print_nodes(Formatter *f, ostream& err)
5199 {
5200 map<string, list<int> > mons; // hostname => mon
5201 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5202 it != mon_metadata.end(); ++it) {
5203 const Metadata& m = it->second;
5204 Metadata::const_iterator hostname = m.find("hostname");
5205 if (hostname == m.end()) {
5206 // not likely though
5207 continue;
5208 }
5209 mons[hostname->second].push_back(it->first);
5210 }
5211
5212 dump_services(f, mons, "mon");
5213 return 0;
5214 }
5215
5216 // ----------------------------------------------
5217 // scrub
5218
5219 int Monitor::scrub_start()
5220 {
5221 dout(10) << __func__ << dendl;
5222 assert(is_leader());
5223
5224 if (!scrub_result.empty()) {
5225 clog->info() << "scrub already in progress";
5226 return -EBUSY;
5227 }
5228
5229 scrub_event_cancel();
5230 scrub_result.clear();
5231 scrub_state.reset(new ScrubState);
5232
5233 scrub();
5234 return 0;
5235 }
5236
5237 int Monitor::scrub()
5238 {
5239 assert(is_leader());
5240 assert(scrub_state);
5241
5242 scrub_cancel_timeout();
5243 wait_for_paxos_write();
5244 scrub_version = paxos->get_version();
5245
5246
5247 // scrub all keys if we're the only monitor in the quorum
5248 int32_t num_keys =
5249 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5250
5251 for (set<int>::iterator p = quorum.begin();
5252 p != quorum.end();
5253 ++p) {
5254 if (*p == rank)
5255 continue;
5256 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5257 num_keys);
5258 r->key = scrub_state->last_key;
5259 messenger->send_message(r, monmap->get_inst(*p));
5260 }
5261
5262 // scrub my keys
5263 bool r = _scrub(&scrub_result[rank],
5264 &scrub_state->last_key,
5265 &num_keys);
5266
5267 scrub_state->finished = !r;
5268
5269 // only after we got our scrub results do we really care whether the
5270 // other monitors are late on their results. Also, this way we avoid
5271 // triggering the timeout if we end up getting stuck in _scrub() for
5272 // longer than the duration of the timeout.
5273 scrub_reset_timeout();
5274
5275 if (quorum.size() == 1) {
5276 assert(scrub_state->finished == true);
5277 scrub_finish();
5278 }
5279 return 0;
5280 }
5281
5282 void Monitor::handle_scrub(MonOpRequestRef op)
5283 {
5284 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5285 dout(10) << __func__ << " " << *m << dendl;
5286 switch (m->op) {
5287 case MMonScrub::OP_SCRUB:
5288 {
5289 if (!is_peon())
5290 break;
5291
5292 wait_for_paxos_write();
5293
5294 if (m->version != paxos->get_version())
5295 break;
5296
5297 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5298 m->version,
5299 m->num_keys);
5300
5301 reply->key = m->key;
5302 _scrub(&reply->result, &reply->key, &reply->num_keys);
5303 m->get_connection()->send_message(reply);
5304 }
5305 break;
5306
5307 case MMonScrub::OP_RESULT:
5308 {
5309 if (!is_leader())
5310 break;
5311 if (m->version != scrub_version)
5312 break;
5313 // reset the timeout each time we get a result
5314 scrub_reset_timeout();
5315
5316 int from = m->get_source().num();
5317 assert(scrub_result.count(from) == 0);
5318 scrub_result[from] = m->result;
5319
5320 if (scrub_result.size() == quorum.size()) {
5321 scrub_check_results();
5322 scrub_result.clear();
5323 if (scrub_state->finished)
5324 scrub_finish();
5325 else
5326 scrub();
5327 }
5328 }
5329 break;
5330 }
5331 }
5332
5333 bool Monitor::_scrub(ScrubResult *r,
5334 pair<string,string> *start,
5335 int *num_keys)
5336 {
5337 assert(r != NULL);
5338 assert(start != NULL);
5339 assert(num_keys != NULL);
5340
5341 set<string> prefixes = get_sync_targets_names();
5342 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5343
5344 dout(10) << __func__ << " start (" << *start << ")"
5345 << " num_keys " << *num_keys << dendl;
5346
5347 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5348
5349 int scrubbed_keys = 0;
5350 pair<string,string> last_key;
5351
5352 while (it->has_next_chunk()) {
5353
5354 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5355 break;
5356
5357 pair<string,string> k = it->get_next_key();
5358 if (prefixes.count(k.first) == 0)
5359 continue;
5360
5361 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5362 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5363 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5364 << dendl;
5365 continue;
5366 }
5367
5368 bufferlist bl;
5369 int err = store->get(k.first, k.second, bl);
5370 assert(err == 0);
5371
5372 uint32_t key_crc = bl.crc32c(0);
5373 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5374 << " crc " << key_crc << dendl;
5375 r->prefix_keys[k.first]++;
5376 if (r->prefix_crc.count(k.first) == 0) {
5377 r->prefix_crc[k.first] = 0;
5378 }
5379 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5380
5381 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5382 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5383 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5384 r->prefix_crc[k.first] += 1;
5385 }
5386
5387 ++scrubbed_keys;
5388 last_key = k;
5389 }
5390
5391 dout(20) << __func__ << " last_key (" << last_key << ")"
5392 << " scrubbed_keys " << scrubbed_keys
5393 << " has_next " << it->has_next_chunk() << dendl;
5394
5395 *start = last_key;
5396 *num_keys = scrubbed_keys;
5397
5398 return it->has_next_chunk();
5399 }
5400
5401 void Monitor::scrub_check_results()
5402 {
5403 dout(10) << __func__ << dendl;
5404
5405 // compare
5406 int errors = 0;
5407 ScrubResult& mine = scrub_result[rank];
5408 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5409 p != scrub_result.end();
5410 ++p) {
5411 if (p->first == rank)
5412 continue;
5413 if (p->second != mine) {
5414 ++errors;
5415 clog->error() << "scrub mismatch";
5416 clog->error() << " mon." << rank << " " << mine;
5417 clog->error() << " mon." << p->first << " " << p->second;
5418 }
5419 }
5420 if (!errors)
5421 clog->debug() << "scrub ok on " << quorum << ": " << mine;
5422 }
5423
5424 inline void Monitor::scrub_timeout()
5425 {
5426 dout(1) << __func__ << " restarting scrub" << dendl;
5427 scrub_reset();
5428 scrub_start();
5429 }
5430
5431 void Monitor::scrub_finish()
5432 {
5433 dout(10) << __func__ << dendl;
5434 scrub_reset();
5435 scrub_event_start();
5436 }
5437
5438 void Monitor::scrub_reset()
5439 {
5440 dout(10) << __func__ << dendl;
5441 scrub_cancel_timeout();
5442 scrub_version = 0;
5443 scrub_result.clear();
5444 scrub_state.reset();
5445 }
5446
5447 inline void Monitor::scrub_update_interval(int secs)
5448 {
5449 // we don't care about changes if we are not the leader.
5450 // changes will be visible if we become the leader.
5451 if (!is_leader())
5452 return;
5453
5454 dout(1) << __func__ << " new interval = " << secs << dendl;
5455
5456 // if scrub already in progress, all changes will already be visible during
5457 // the next round. Nothing to do.
5458 if (scrub_state != NULL)
5459 return;
5460
5461 scrub_event_cancel();
5462 scrub_event_start();
5463 }
5464
5465 void Monitor::scrub_event_start()
5466 {
5467 dout(10) << __func__ << dendl;
5468
5469 if (scrub_event)
5470 scrub_event_cancel();
5471
5472 if (cct->_conf->mon_scrub_interval <= 0) {
5473 dout(1) << __func__ << " scrub event is disabled"
5474 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5475 << ")" << dendl;
5476 return;
5477 }
5478
5479 scrub_event = timer.add_event_after(
5480 cct->_conf->mon_scrub_interval,
5481 new C_MonContext(this, [this](int) {
5482 scrub_start();
5483 }));
5484 }
5485
5486 void Monitor::scrub_event_cancel()
5487 {
5488 dout(10) << __func__ << dendl;
5489 if (scrub_event) {
5490 timer.cancel_event(scrub_event);
5491 scrub_event = NULL;
5492 }
5493 }
5494
5495 inline void Monitor::scrub_cancel_timeout()
5496 {
5497 if (scrub_timeout_event) {
5498 timer.cancel_event(scrub_timeout_event);
5499 scrub_timeout_event = NULL;
5500 }
5501 }
5502
5503 void Monitor::scrub_reset_timeout()
5504 {
5505 dout(15) << __func__ << " reset timeout event" << dendl;
5506 scrub_cancel_timeout();
5507 scrub_timeout_event = timer.add_event_after(
5508 g_conf->mon_scrub_timeout,
5509 new C_MonContext(this, [this](int) {
5510 scrub_timeout();
5511 }));
5512 }
5513
5514 /************ TICK ***************/
5515 void Monitor::new_tick()
5516 {
5517 timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5518 tick();
5519 }));
5520 }
5521
5522 void Monitor::tick()
5523 {
5524 // ok go.
5525 dout(11) << "tick" << dendl;
5526 const utime_t now = ceph_clock_now();
5527
5528 // Check if we need to emit any delayed health check updated messages
5529 if (is_leader()) {
5530 const auto min_period = g_conf->get_val<int64_t>(
5531 "mon_health_log_update_period");
5532 for (auto& svc : paxos_service) {
5533 auto health = svc->get_health_checks();
5534
5535 for (const auto &i : health.checks) {
5536 const std::string &code = i.first;
5537 const std::string &summary = i.second.summary;
5538 const health_status_t severity = i.second.severity;
5539
5540 auto status_iter = health_check_log_times.find(code);
5541 if (status_iter == health_check_log_times.end()) {
5542 continue;
5543 }
5544
5545 auto &log_status = status_iter->second;
5546 bool const changed = log_status.last_message != summary
5547 || log_status.severity != severity;
5548
5549 if (changed && now - log_status.updated_at > min_period) {
5550 log_status.last_message = summary;
5551 log_status.updated_at = now;
5552 log_status.severity = severity;
5553
5554 ostringstream ss;
5555 ss << "Health check update: " << summary << " (" << code << ")";
5556 clog->health(severity) << ss.str();
5557 }
5558 }
5559 }
5560 }
5561
5562
5563 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5564 (*p)->tick();
5565 (*p)->maybe_trim();
5566 }
5567
5568 // trim sessions
5569 {
5570 Mutex::Locker l(session_map_lock);
5571 auto p = session_map.sessions.begin();
5572
5573 bool out_for_too_long = (!exited_quorum.is_zero() &&
5574 now > (exited_quorum + 2*g_conf->mon_lease));
5575
5576 while (!p.end()) {
5577 MonSession *s = *p;
5578 ++p;
5579
5580 // don't trim monitors
5581 if (s->inst.name.is_mon())
5582 continue;
5583
5584 if (s->session_timeout < now && s->con) {
5585 // check keepalive, too
5586 s->session_timeout = s->con->get_last_keepalive();
5587 s->session_timeout += g_conf->mon_session_timeout;
5588 }
5589 if (s->session_timeout < now) {
5590 dout(10) << " trimming session " << s->con << " " << s->inst
5591 << " (timeout " << s->session_timeout
5592 << " < now " << now << ")" << dendl;
5593 } else if (out_for_too_long) {
5594 // boot the client Session because we've taken too long getting back in
5595 dout(10) << " trimming session " << s->con << " " << s->inst
5596 << " because we've been out of quorum too long" << dendl;
5597 } else {
5598 continue;
5599 }
5600
5601 s->con->mark_down();
5602 remove_session(s);
5603 logger->inc(l_mon_session_trim);
5604 }
5605 }
5606 sync_trim_providers();
5607
5608 if (!maybe_wait_for_quorum.empty()) {
5609 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5610 }
5611
5612 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5613 // this is only necessary on upgraded clusters.
5614 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5615 prepare_new_fingerprint(t);
5616 paxos->trigger_propose();
5617 }
5618
5619 new_tick();
5620 }
5621
5622 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5623 {
5624 uuid_d nf;
5625 nf.generate_random();
5626 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5627
5628 bufferlist bl;
5629 ::encode(nf, bl);
5630 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5631 }
5632
5633 int Monitor::check_fsid()
5634 {
5635 bufferlist ebl;
5636 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5637 if (r == -ENOENT)
5638 return r;
5639 assert(r == 0);
5640
5641 string es(ebl.c_str(), ebl.length());
5642
5643 // only keep the first line
5644 size_t pos = es.find_first_of('\n');
5645 if (pos != string::npos)
5646 es.resize(pos);
5647
5648 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5649 uuid_d ondisk;
5650 if (!ondisk.parse(es.c_str())) {
5651 derr << "error: unable to parse uuid" << dendl;
5652 return -EINVAL;
5653 }
5654
5655 if (monmap->get_fsid() != ondisk) {
5656 derr << "error: cluster_uuid file exists with value " << ondisk
5657 << ", != our uuid " << monmap->get_fsid() << dendl;
5658 return -EEXIST;
5659 }
5660
5661 return 0;
5662 }
5663
5664 int Monitor::write_fsid()
5665 {
5666 auto t(std::make_shared<MonitorDBStore::Transaction>());
5667 write_fsid(t);
5668 int r = store->apply_transaction(t);
5669 return r;
5670 }
5671
5672 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5673 {
5674 ostringstream ss;
5675 ss << monmap->get_fsid() << "\n";
5676 string us = ss.str();
5677
5678 bufferlist b;
5679 b.append(us);
5680
5681 t->put(MONITOR_NAME, "cluster_uuid", b);
5682 return 0;
5683 }
5684
5685 /*
5686 * this is the closest thing to a traditional 'mkfs' for ceph.
5687 * initialize the monitor state machines to their initial values.
5688 */
5689 int Monitor::mkfs(bufferlist& osdmapbl)
5690 {
5691 auto t(std::make_shared<MonitorDBStore::Transaction>());
5692
5693 // verify cluster fsid
5694 int r = check_fsid();
5695 if (r < 0 && r != -ENOENT)
5696 return r;
5697
5698 bufferlist magicbl;
5699 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5700 magicbl.append("\n");
5701 t->put(MONITOR_NAME, "magic", magicbl);
5702
5703
5704 features = get_initial_supported_features();
5705 write_features(t);
5706
5707 // save monmap, osdmap, keyring.
5708 bufferlist monmapbl;
5709 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5710 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5711 t->put("mkfs", "monmap", monmapbl);
5712
5713 if (osdmapbl.length()) {
5714 // make sure it's a valid osdmap
5715 try {
5716 OSDMap om;
5717 om.decode(osdmapbl);
5718 }
5719 catch (buffer::error& e) {
5720 derr << "error decoding provided osdmap: " << e.what() << dendl;
5721 return -EINVAL;
5722 }
5723 t->put("mkfs", "osdmap", osdmapbl);
5724 }
5725
5726 if (is_keyring_required()) {
5727 KeyRing keyring;
5728 string keyring_filename;
5729
5730 r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5731 if (r) {
5732 derr << "unable to find a keyring file on " << g_conf->keyring
5733 << ": " << cpp_strerror(r) << dendl;
5734 if (g_conf->key != "") {
5735 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5736 "\n\tcaps mon = \"allow *\"\n";
5737 bufferlist bl;
5738 bl.append(keyring_plaintext);
5739 try {
5740 bufferlist::iterator i = bl.begin();
5741 keyring.decode_plaintext(i);
5742 }
5743 catch (const buffer::error& e) {
5744 derr << "error decoding keyring " << keyring_plaintext
5745 << ": " << e.what() << dendl;
5746 return -EINVAL;
5747 }
5748 } else {
5749 return -ENOENT;
5750 }
5751 } else {
5752 r = keyring.load(g_ceph_context, keyring_filename);
5753 if (r < 0) {
5754 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5755 return r;
5756 }
5757 }
5758
5759 // put mon. key in external keyring; seed with everything else.
5760 extract_save_mon_key(keyring);
5761
5762 bufferlist keyringbl;
5763 keyring.encode_plaintext(keyringbl);
5764 t->put("mkfs", "keyring", keyringbl);
5765 }
5766 write_fsid(t);
5767 store->apply_transaction(t);
5768
5769 return 0;
5770 }
5771
5772 int Monitor::write_default_keyring(bufferlist& bl)
5773 {
5774 ostringstream os;
5775 os << g_conf->mon_data << "/keyring";
5776
5777 int err = 0;
5778 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600);
5779 if (fd < 0) {
5780 err = -errno;
5781 dout(0) << __func__ << " failed to open " << os.str()
5782 << ": " << cpp_strerror(err) << dendl;
5783 return err;
5784 }
5785
5786 err = bl.write_fd(fd);
5787 if (!err)
5788 ::fsync(fd);
5789 VOID_TEMP_FAILURE_RETRY(::close(fd));
5790
5791 return err;
5792 }
5793
5794 void Monitor::extract_save_mon_key(KeyRing& keyring)
5795 {
5796 EntityName mon_name;
5797 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5798 EntityAuth mon_key;
5799 if (keyring.get_auth(mon_name, mon_key)) {
5800 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5801 KeyRing pkey;
5802 pkey.add(mon_name, mon_key);
5803 bufferlist bl;
5804 pkey.encode_plaintext(bl);
5805 write_default_keyring(bl);
5806 keyring.remove(mon_name);
5807 }
5808 }
5809
5810 bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5811 bool force_new)
5812 {
5813 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5814 << dendl;
5815
5816 if (is_shutdown())
5817 return false;
5818
5819 // we only connect to other monitors and mgr; every else connects to us.
5820 if (service_id != CEPH_ENTITY_TYPE_MON &&
5821 service_id != CEPH_ENTITY_TYPE_MGR)
5822 return false;
5823
5824 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5825 // auth_none
5826 dout(20) << __func__ << " building auth_none authorizer" << dendl;
5827 AuthNoneClientHandler handler(g_ceph_context, nullptr);
5828 handler.set_global_id(0);
5829 *authorizer = handler.build_authorizer(service_id);
5830 return true;
5831 }
5832
5833 CephXServiceTicketInfo auth_ticket_info;
5834 CephXSessionAuthInfo info;
5835 int ret;
5836
5837 EntityName name;
5838 name.set_type(CEPH_ENTITY_TYPE_MON);
5839 auth_ticket_info.ticket.name = name;
5840 auth_ticket_info.ticket.global_id = 0;
5841
5842 if (service_id == CEPH_ENTITY_TYPE_MON) {
5843 // mon to mon authentication uses the private monitor shared key and not the
5844 // rotating key
5845 CryptoKey secret;
5846 if (!keyring.get_secret(name, secret) &&
5847 !key_server.get_secret(name, secret)) {
5848 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5849 << dendl;
5850 stringstream ss, ds;
5851 int err = key_server.list_secrets(ds);
5852 if (err < 0)
5853 ss << "no installed auth entries!";
5854 else
5855 ss << "installed auth entries:";
5856 dout(0) << ss.str() << "\n" << ds.str() << dendl;
5857 return false;
5858 }
5859
5860 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5861 secret, (uint64_t)-1);
5862 if (ret < 0) {
5863 dout(0) << __func__ << " failed to build mon session_auth_info "
5864 << cpp_strerror(ret) << dendl;
5865 return false;
5866 }
5867 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5868 // mgr
5869 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5870 if (ret < 0) {
5871 derr << __func__ << " failed to build mgr service session_auth_info "
5872 << cpp_strerror(ret) << dendl;
5873 return false;
5874 }
5875 } else {
5876 ceph_abort(); // see check at top of fn
5877 }
5878
5879 CephXTicketBlob blob;
5880 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5881 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5882 return false;
5883 }
5884 bufferlist ticket_data;
5885 ::encode(blob, ticket_data);
5886
5887 bufferlist::iterator iter = ticket_data.begin();
5888 CephXTicketHandler handler(g_ceph_context, service_id);
5889 ::decode(handler.ticket, iter);
5890
5891 handler.session_key = info.session_key;
5892
5893 *authorizer = handler.build_authorizer(0);
5894
5895 return true;
5896 }
5897
5898 bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5899 int protocol, bufferlist& authorizer_data,
5900 bufferlist& authorizer_reply,
5901 bool& isvalid, CryptoKey& session_key,
5902 std::unique_ptr<AuthAuthorizerChallenge> *challenge)
5903 {
5904 dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5905 << " " << ceph_entity_type_name(peer_type)
5906 << " protocol " << protocol << dendl;
5907
5908 if (is_shutdown())
5909 return false;
5910
5911 if (peer_type == CEPH_ENTITY_TYPE_MON &&
5912 auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5913 // monitor, and cephx is enabled
5914 isvalid = false;
5915 if (protocol == CEPH_AUTH_CEPHX) {
5916 bufferlist::iterator iter = authorizer_data.begin();
5917 CephXServiceTicketInfo auth_ticket_info;
5918
5919 if (authorizer_data.length()) {
5920 bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
5921 auth_ticket_info, challenge, authorizer_reply);
5922 if (ret) {
5923 session_key = auth_ticket_info.session_key;
5924 isvalid = true;
5925 } else {
5926 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5927 }
5928 }
5929 } else {
5930 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
5931 }
5932 } else {
5933 // who cares.
5934 isvalid = true;
5935 }
5936 return true;
5937 }