]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Monitor.cc
update sources to v12.2.1
[ceph.git] / ceph / src / mon / Monitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include <sstream>
17 #include <stdlib.h>
18 #include <signal.h>
19 #include <limits.h>
20 #include <cstring>
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
23
24 #include "Monitor.h"
25 #include "common/version.h"
26
27 #include "osd/OSDMap.h"
28
29 #include "MonitorDBStore.h"
30
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
48
49 #include "messages/MMonSubscribe.h"
50 #include "messages/MMonSubscribeAck.h"
51
52 #include "messages/MAuthReply.h"
53
54 #include "messages/MTimeCheck.h"
55 #include "messages/MPing.h"
56
57 #include "common/strtol.h"
58 #include "common/ceph_argparse.h"
59 #include "common/Timer.h"
60 #include "common/Clock.h"
61 #include "common/errno.h"
62 #include "common/perf_counters.h"
63 #include "common/admin_socket.h"
64 #include "global/signal_handler.h"
65 #include "common/Formatter.h"
66 #include "include/stringify.h"
67 #include "include/color.h"
68 #include "include/ceph_fs.h"
69 #include "include/str_list.h"
70
71 #include "OSDMonitor.h"
72 #include "MDSMonitor.h"
73 #include "MonmapMonitor.h"
74 #include "PGMonitor.h"
75 #include "LogMonitor.h"
76 #include "AuthMonitor.h"
77 #include "MgrMonitor.h"
78 #include "MgrStatMonitor.h"
79 #include "mon/QuorumService.h"
80 #include "mon/OldHealthMonitor.h"
81 #include "mon/HealthMonitor.h"
82 #include "mon/ConfigKeyService.h"
83 #include "common/config.h"
84 #include "common/cmdparse.h"
85 #include "include/assert.h"
86 #include "include/compat.h"
87 #include "perfglue/heap_profiler.h"
88
89 #include "auth/none/AuthNoneClientHandler.h"
90
91 #define dout_subsys ceph_subsys_mon
92 #undef dout_prefix
93 #define dout_prefix _prefix(_dout, this)
94 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
95 return *_dout << "mon." << mon->name << "@" << mon->rank
96 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
97 }
98
99 const string Monitor::MONITOR_NAME = "monitor";
100 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
101
102
// Build the static command tables by including the command description
// headers with COMMAND / COMMAND_WITH_FLAG defined to expand into
// MonCommand aggregate initializers. Any previous definitions are dropped
// first so these expansions are the ones in effect.
#undef FLAG
#undef COMMAND
#undef COMMAND_WITH_FLAG
// FLAG(X) names MonCommand::FLAG_X; COMMAND() entries default to FLAG(NONE).
#define FLAG(f) (MonCommand::FLAG_##f)
#define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
  {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
  {parsesig, helptext, modulename, req_perms, avail, flags},
// Commands handled by this monitor (copied into local_mon_commands in the
// constructor).
MonCommand mon_commands[] = {
#include <mon/MonCommands.h>
};
// PGMonitor commands; the constructor appends these only to
// local_upgrading_mon_commands.
MonCommand pgmonitor_commands[] = {
#include <mon/PGMonitorCommands.h>
};
// FLAG is intentionally left defined; only the table-building macros are
// removed here.
#undef COMMAND
#undef COMMAND_WITH_FLAG
119
120
121 void C_MonContext::finish(int r) {
122 if (mon->is_shutdown())
123 return;
124 FunctionContext::finish(r);
125 }
126
// Construct a Monitor around its store, messengers and seed monmap.
//
// The member-initializer list sets up locking, timers, thread pools,
// logging and auth handler registries, and puts the monitor in
// STATE_PROBING with all scrub/sync/timecheck bookkeeping zeroed. The body
// creates the Paxos instance, one PaxosService per PAXOS_* slot, the
// OldHealthMonitor and ConfigKeyService, and the encoded command tables.
//
// @param cct_   Ceph context (config, logging, perf counters)
// @param nm     this monitor's name (later matched against the monmap)
// @param s      backing MonitorDBStore
// @param m      cluster messenger; if NULL, con_self is left NULL
// @param mgr_m  messenger used by the mgr client
// @param map    seed monmap, kept as `monmap`
Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
                 Messenger *m, Messenger *mgr_m, MonMap *map) :
  Dispatcher(cct_),
  name(nm),
  rank(-1),
  messenger(m),
  con_self(m ? m->get_loopback_connection() : NULL),
  lock("Monitor::lock"),
  timer(cct_, lock),
  finisher(cct_, "mon_finisher", "fin"),
  // NOTE(review): uses `cct` (presumably the Dispatcher member, already
  // initialized from cct_) rather than cct_ directly.
  cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
  has_ever_joined(false),
  logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
  monmap(map),
  log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
  key_server(cct, &keyring),
  // A non-empty legacy `auth_supported` overrides both the cluster and the
  // service auth requirements.
  auth_cluster_required(cct,
                        cct->_conf->auth_supported.empty() ?
                        cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
  auth_service_required(cct,
                        cct->_conf->auth_supported.empty() ?
                        cct->_conf->auth_service_required : cct->_conf->auth_supported ),
  mgr_messenger(mgr_m),
  mgr_client(cct_, mgr_m),
  pgservice(nullptr),
  store(s),

  state(STATE_PROBING),

  elector(this),
  required_features(0),
  leader(0),
  quorum_con_features(0),
  // scrub
  scrub_version(0),
  scrub_event(NULL),
  scrub_timeout_event(NULL),

  // sync state
  sync_provider_count(0),
  sync_cookie(0),
  sync_full(false),
  sync_start_version(0),
  sync_timeout_event(NULL),
  sync_last_committed_floor(0),

  timecheck_round(0),
  timecheck_acks(0),
  timecheck_rounds_since_clean(0),
  timecheck_event(NULL),

  // One (initially null) slot per PAXOS_* service; filled in the body.
  paxos_service(PAXOS_NUM),
  admin_hook(NULL),
  routed_request_tid(0),
  op_tracker(cct, true, 1)
{
  // Cluster and audit log channels, then apply the current clog config.
  clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
  audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);

  update_log_clients();

  paxos = new Paxos(this, "paxos");

  // Every service shares the single Paxos instance; the string is the
  // service name passed to each constructor.
  paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
  paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
  paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
  paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
  paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
  paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
  paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
  paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
  paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");

  health_monitor = new OldHealthMonitor(this);
  config_key_service = new ConfigKeyService(this, paxos);

  // Capabilities for the monitor itself: everything allowed.
  mon_caps = new MonCap();
  bool r = mon_caps->parse("allow *", NULL);
  assert(r);

  exited_quorum = ceph_clock_now();

  // prepare local commands
  local_mon_commands.resize(ARRAY_SIZE(mon_commands));
  for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
    local_mon_commands[i] = mon_commands[i];
  }
  MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);

  // The "upgrading" command set is the local set plus the PGMonitor
  // commands, pre-encoded separately.
  local_upgrading_mon_commands = local_mon_commands;
  for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
    local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
  }
  MonCommand::encode_vector(local_upgrading_mon_commands,
                            local_upgrading_mon_commands_bl);

  // assume our commands until we have an election.  this only means
  // we won't reply with EINVAL before the election; any command that
  // actually matters will wait until we have quorum etc and then
  // retry (and revalidate).
  leader_mon_commands = local_mon_commands;

  // note: OSDMonitor may update this based on the luminous flag.
  pgservice = mgrstatmon()->get_pg_stat_service();
}
232
233 Monitor::~Monitor()
234 {
235 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
236 delete *p;
237 delete health_monitor;
238 delete config_key_service;
239 delete paxos;
240 assert(session_map.sessions.empty());
241 delete mon_caps;
242 }
243
244
245 class AdminHook : public AdminSocketHook {
246 Monitor *mon;
247 public:
248 explicit AdminHook(Monitor *m) : mon(m) {}
249 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
250 bufferlist& out) override {
251 stringstream ss;
252 mon->do_admin_command(command, cmdmap, format, ss);
253 out.append(ss);
254 return true;
255 }
256 };
257
// Execute one admin-socket command under `lock`, writing human- or
// Formatter-readable output to `ss`.
//
// Every command is audit-logged twice: a "dispatch" entry before it runs
// and a "finished" or "aborted" entry afterwards. Read-only commands are
// logged at debug level and all others at info level.
//
// @param command  registered command prefix (see preinit())
// @param cmdmap   parsed arguments; the "prefix" key is skipped when
//                 rendering args for the audit log
// @param format   output format; Formatter::create() may yield null, which
//                 the branches below check with `if (f)`
// @param ss       destination for output and error text
void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
                               ostream& ss)
{
  Mutex::Locker l(lock);

  boost::scoped_ptr<Formatter> f(Formatter::create(format));

  // Render all non-"prefix" arguments as "[a, b, ...]" for the audit log.
  string args;
  for (cmdmap_t::iterator p = cmdmap.begin();
       p != cmdmap.end(); ++p) {
    if (p->first == "prefix")
      continue;
    if (!args.empty())
      args += ", ";
    args += cmd_vartype_stringify(p->second);
  }
  args = "[" + args + "]";

  // NOTE(review): "mon metadata" is listed here but has no branch below and
  // is not registered in preinit().
  bool read_only = (command == "mon_status" ||
                    command == "mon metadata" ||
                    command == "quorum_status" ||
                    command == "ops" ||
                    command == "sessions");

  (read_only ? audit_clog->debug() : audit_clog->info())
    << "from='admin socket' entity='admin socket' "
    << "cmd='" << command << "' args=" << args << ": dispatch";

  if (command == "mon_status") {
    get_mon_status(f.get(), ss);
    if (f)
      f->flush(ss);
  } else if (command == "quorum_status") {
    _quorum_status(f.get(), ss);
  } else if (command == "sync_force") {
    // Destructive: requires the explicit confirmation token.
    string validate;
    if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
        (validate != "--yes-i-really-mean-it")) {
      ss << "are you SURE? this will mean the monitor store will be erased "
            "the next time the monitor is restarted. pass "
            "'--yes-i-really-mean-it' if you really do.";
      goto abort;
    }
    sync_force(f.get(), ss);
  } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
    // 23 == strlen("add_bootstrap_peer_hint"): prefix match.
    if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
      goto abort;
  } else if (command == "quorum enter") {
    elector.start_participating();
    start_election();
    ss << "started responding to quorum, initiated new election";
  } else if (command == "quorum exit") {
    // The election is started before participation is switched off.
    start_election();
    elector.stop_participating();
    ss << "stopped responding to quorum, initiated new election";
  } else if (command == "ops") {
    (void)op_tracker.dump_ops_in_flight(f.get());
    if (f) {
      f->flush(ss);
    }
  } else if (command == "sessions") {

    // NOTE(review): produces no output when no Formatter was created.
    if (f) {
      f->open_array_section("sessions");
      for (auto p : session_map.sessions) {
        f->dump_stream("session") << *p;
      }
      f->close_section();
      f->flush(ss);
    }

  } else {
    assert(0 == "bad AdminSocket command binding");
  }
  // NOTE(review): unlike the "dispatch" entry, the finished/aborted entries
  // do not quote the command.
  (read_only ? audit_clog->debug() : audit_clog->info())
    << "from='admin socket' "
    << "entity='admin socket' "
    << "cmd=" << command << " "
    << "args=" << args << ": finished";
  return;

abort:
  (read_only ? audit_clog->debug() : audit_clog->info())
    << "from='admin socket' "
    << "entity='admin socket' "
    << "cmd=" << command << " "
    << "args=" << args << ": aborted";
}
346
347 void Monitor::handle_signal(int signum)
348 {
349 assert(signum == SIGINT || signum == SIGTERM);
350 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
351 shutdown();
352 }
353
354 CompatSet Monitor::get_initial_supported_features()
355 {
356 CompatSet::FeatureSet ceph_mon_feature_compat;
357 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
358 CompatSet::FeatureSet ceph_mon_feature_incompat;
359 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
360 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
361 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
362 ceph_mon_feature_incompat);
363 }
364
365 CompatSet Monitor::get_supported_features()
366 {
367 CompatSet compat = get_initial_supported_features();
368 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
369 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
370 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
371 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
372 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
373 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
374 return compat;
375 }
376
377 CompatSet Monitor::get_legacy_features()
378 {
379 CompatSet::FeatureSet ceph_mon_feature_compat;
380 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
381 CompatSet::FeatureSet ceph_mon_feature_incompat;
382 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
383 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
384 ceph_mon_feature_incompat);
385 }
386
387 int Monitor::check_features(MonitorDBStore *store)
388 {
389 CompatSet required = get_supported_features();
390 CompatSet ondisk;
391
392 read_features_off_disk(store, &ondisk);
393
394 if (!required.writeable(ondisk)) {
395 CompatSet diff = required.unsupported(ondisk);
396 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
397 return -EPERM;
398 }
399
400 return 0;
401 }
402
403 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
404 {
405 bufferlist featuresbl;
406 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
407 if (featuresbl.length() == 0) {
408 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
409 << "Assuming it is old-style and introducing one." << dendl;
410 //we only want the baseline ~v.18 features assumed to be on disk.
411 //If new features are introduced this code needs to disappear or
412 //be made smarter.
413 *features = get_legacy_features();
414
415 features->encode(featuresbl);
416 auto t(std::make_shared<MonitorDBStore::Transaction>());
417 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
418 store->apply_transaction(t);
419 } else {
420 bufferlist::iterator it = featuresbl.begin();
421 features->decode(it);
422 }
423 }
424
425 void Monitor::read_features()
426 {
427 read_features_off_disk(store, &features);
428 dout(10) << "features " << features << dendl;
429
430 calc_quorum_requirements();
431 dout(10) << "required_features " << required_features << dendl;
432 }
433
434 void Monitor::write_features(MonitorDBStore::TransactionRef t)
435 {
436 bufferlist bl;
437 features.encode(bl);
438 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
439 }
440
441 const char** Monitor::get_tracked_conf_keys() const
442 {
443 static const char* KEYS[] = {
444 "crushtool", // helpful for testing
445 "mon_election_timeout",
446 "mon_lease",
447 "mon_lease_renew_interval_factor",
448 "mon_lease_ack_timeout_factor",
449 "mon_accept_timeout_factor",
450 // clog & admin clog
451 "clog_to_monitors",
452 "clog_to_syslog",
453 "clog_to_syslog_facility",
454 "clog_to_syslog_level",
455 "clog_to_graylog",
456 "clog_to_graylog_host",
457 "clog_to_graylog_port",
458 "host",
459 "fsid",
460 // periodic health to clog
461 "mon_health_to_clog",
462 "mon_health_to_clog_interval",
463 "mon_health_to_clog_tick_interval",
464 // scrub interval
465 "mon_scrub_interval",
466 NULL
467 };
468 return KEYS;
469 }
470
471 void Monitor::handle_conf_change(const struct md_config_t *conf,
472 const std::set<std::string> &changed)
473 {
474 sanitize_options();
475
476 dout(10) << __func__ << " " << changed << dendl;
477
478 if (changed.count("clog_to_monitors") ||
479 changed.count("clog_to_syslog") ||
480 changed.count("clog_to_syslog_level") ||
481 changed.count("clog_to_syslog_facility") ||
482 changed.count("clog_to_graylog") ||
483 changed.count("clog_to_graylog_host") ||
484 changed.count("clog_to_graylog_port") ||
485 changed.count("host") ||
486 changed.count("fsid")) {
487 update_log_clients();
488 }
489
490 if (changed.count("mon_health_to_clog") ||
491 changed.count("mon_health_to_clog_interval") ||
492 changed.count("mon_health_to_clog_tick_interval")) {
493 health_to_clog_update_conf(changed);
494 }
495
496 if (changed.count("mon_scrub_interval")) {
497 scrub_update_interval(conf->mon_scrub_interval);
498 }
499 }
500
501 void Monitor::update_log_clients()
502 {
503 map<string,string> log_to_monitors;
504 map<string,string> log_to_syslog;
505 map<string,string> log_channel;
506 map<string,string> log_prio;
507 map<string,string> log_to_graylog;
508 map<string,string> log_to_graylog_host;
509 map<string,string> log_to_graylog_port;
510 uuid_d fsid;
511 string host;
512
513 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
514 log_channel, log_prio, log_to_graylog,
515 log_to_graylog_host, log_to_graylog_port,
516 fsid, host))
517 return;
518
519 clog->update_config(log_to_monitors, log_to_syslog,
520 log_channel, log_prio, log_to_graylog,
521 log_to_graylog_host, log_to_graylog_port,
522 fsid, host);
523
524 audit_clog->update_config(log_to_monitors, log_to_syslog,
525 log_channel, log_prio, log_to_graylog,
526 log_to_graylog_host, log_to_graylog_port,
527 fsid, host);
528 }
529
530 int Monitor::sanitize_options()
531 {
532 int r = 0;
533
534 // mon_lease must be greater than mon_lease_renewal; otherwise we
535 // may incur in leases expiring before they are renewed.
536 if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
537 clog->error() << "mon_lease_renew_interval_factor ("
538 << g_conf->mon_lease_renew_interval_factor
539 << ") must be less than 1.0";
540 r = -EINVAL;
541 }
542
543 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
544 // got time to renew the lease and get an ack for it. Having both options
545 // with the same value, for a given small vale, could mean timing out if
546 // the monitors happened to be overloaded -- or even under normal load for
547 // a small enough value.
548 if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
549 clog->error() << "mon_lease_ack_timeout_factor ("
550 << g_conf->mon_lease_ack_timeout_factor
551 << ") must be greater than 1.0";
552 r = -EINVAL;
553 }
554
555 return r;
556 }
557
// First-stage startup, run before init(). In order, it:
//   1. validates lease options;
//   2. creates the "mon" and "cluster" perf counters;
//   3. checks (or writes) the fsid;
//   4. loads the compat set and decides whether this mon may start;
//   5. discards a possibly half-synced store;
//   6. initializes Paxos and its services;
//   7. bootstraps the mon. keyring when auth requires one;
//   8. registers the admin-socket commands and the config observer.
//
// Takes `lock` itself and releases it on every return path.
// @return 0 on success, or a negative error code from the failing step.
int Monitor::preinit()
{
  lock.Lock();

  dout(1) << "preinit fsid " << monmap->fsid << dendl;

  int r = sanitize_options();
  if (r < 0) {
    derr << "option sanitization failed!" << dendl;
    lock.Unlock();
    return r;
  }

  // Per-monitor counters, registered with the collection immediately.
  assert(!logger);
  {
    PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
    pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess");
    pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", "sadd");
    pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", "srm");
    pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions");
    pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in");
    pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started");
    pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won");
    pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost");
    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  // Cluster-wide counters: built now, but only added to the collection
  // later by register_cluster_logger().
  assert(!cluster_logger);
  {
    PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
    pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
    pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
    pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
    pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
    pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
    pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
    pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster");
    pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space");
    pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space");
    pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
    pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
    pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
    pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
    pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
    pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
    pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
    pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
    pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
    pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
    pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
    pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
    pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
    pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
    cluster_logger = pcb.create_perf_counters();
  }

  paxos->init_logger();

  // verify cluster_uuid
  {
    // NOTE: this scoped `r` shadows the outer one on purpose.
    int r = check_fsid();
    if (r == -ENOENT)
      r = write_fsid();
    if (r < 0) {
      lock.Unlock();
      return r;
    }
  }

  // open compatset
  read_features();

  // have we ever joined a quorum?
  has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
  dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;

  if (!has_ever_joined) {
    // impose initial quorum restrictions?
    list<string> initial_members;
    get_str_list(g_conf->mon_initial_members, initial_members);

    if (!initial_members.empty()) {
      dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;

      monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
                                  &extra_probe_peers);

      dout(10) << " monmap is " << *monmap << dendl;
      dout(10) << " extra probe peers " << extra_probe_peers << dendl;
    }
  } else if (!monmap->contains(name)) {
    // Previously joined but absent from the monmap: refuse to start with
    // -ENOENT unless mon_force_quorum_join overrides this.
    derr << "not in monmap and have been in a quorum before; "
         << "must have been removed" << dendl;
    if (g_conf->mon_force_quorum_join) {
      dout(0) << "we should have died but "
              << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
    } else {
      derr << "commit suicide!" << dendl;
      lock.Unlock();
      return -ENOENT;
    }
  }

  {
    // We have a potentially inconsistent store state in hands. Get rid of it
    // and start fresh.
    // The store is cleared if a sync was left in progress ("in_sync"
    // marker) or a forced sync was requested.
    bool clear_store = false;
    if (store->exists("mon_sync", "in_sync")) {
      dout(1) << __func__ << " clean up potentially inconsistent store state"
              << dendl;
      clear_store = true;
    }

    if (store->get("mon_sync", "force_sync") > 0) {
      dout(1) << __func__ << " force sync by clearing store state" << dendl;
      clear_store = true;
    }

    if (clear_store) {
      set<string> sync_prefixes = get_sync_targets_names();
      store->clear(sync_prefixes);
    }
  }

  sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
  dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;

  init_paxos();
  health_monitor->init();

  if (is_keyring_required()) {
    // we need to bootstrap authentication keys so we can form an
    // initial quorum.
    if (authmon()->get_last_committed() == 0) {
      dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
      bufferlist bl;
      int err = store->get("mkfs", "keyring", bl);
      if (err == 0 && bl.length() > 0) {
        // Attempt to decode and extract keyring only if it is found.
        // NOTE: this local `keyring` shadows the member of the same name.
        KeyRing keyring;
        bufferlist::iterator p = bl.begin();
        ::decode(keyring, p);
        extract_save_mon_key(keyring);
      }
    }

    string keyring_loc = g_conf->mon_data + "/keyring";

    // If the on-disk keyring cannot be loaded, fall back to the mon. key
    // held by key_server and write it out as the default keyring.
    r = keyring.load(cct, keyring_loc);
    if (r < 0) {
      EntityName mon_name;
      mon_name.set_type(CEPH_ENTITY_TYPE_MON);
      EntityAuth mon_key;
      if (key_server.get_auth(mon_name, mon_key)) {
        dout(1) << "copying mon. key from old db to external keyring" << dendl;
        keyring.add(mon_name, mon_key);
        bufferlist bl;
        keyring.encode_plaintext(bl);
        write_default_keyring(bl);
      } else {
        // NOTE(review): this logs g_conf->keyring, but the path that just
        // failed to load is keyring_loc (mon_data + "/keyring").
        derr << "unable to load initial keyring " << g_conf->keyring << dendl;
        lock.Unlock();
        return r;
      }
    }
  }

  admin_hook = new AdminHook(this);
  AdminSocket* admin_socket = cct->get_admin_socket();

  // unlock while registering to avoid mon_lock -> admin socket lock dependency.
  // The set of commands registered here must match those unregistered in
  // shutdown().
  lock.Unlock();
  r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
                                     "show current monitor status");
  assert(r == 0);
  r = admin_socket->register_command("quorum_status", "quorum_status",
                                     admin_hook, "show current quorum status");
  assert(r == 0);
  r = admin_socket->register_command("sync_force",
                                     "sync_force name=validate,"
                                     "type=CephChoices,"
                                     "strings=--yes-i-really-mean-it",
                                     admin_hook,
                                     "force sync of and clear monitor store");
  assert(r == 0);
  r = admin_socket->register_command("add_bootstrap_peer_hint",
                                     "add_bootstrap_peer_hint name=addr,"
                                     "type=CephIPAddr",
                                     admin_hook,
                                     "add peer address as potential bootstrap"
                                     " peer for cluster bringup");
  assert(r == 0);
  r = admin_socket->register_command("quorum enter", "quorum enter",
                                     admin_hook,
                                     "force monitor back into quorum");
  assert(r == 0);
  r = admin_socket->register_command("quorum exit", "quorum exit",
                                     admin_hook,
                                     "force monitor out of the quorum");
  assert(r == 0);
  r = admin_socket->register_command("ops",
                                     "ops",
                                     admin_hook,
                                     "show the ops currently in flight");
  assert(r == 0);
  r = admin_socket->register_command("sessions",
                                     "sessions",
                                     admin_hook,
                                     "list existing sessions");
  assert(r == 0);

  lock.Lock();

  // add ourselves as a conf observer
  g_conf->add_observer(this);

  lock.Unlock();
  return 0;
}
778
// Second-stage startup (after preinit()). Starts the finisher, the timer
// and periodic tick, and the CPU thread pool; attaches this Monitor as a
// dispatcher on both messengers; initializes the mgr client; then enters
// bootstrap(). Runs entirely under `lock`. Always returns 0.
int Monitor::init()
{
  dout(2) << "init" << dendl;
  Mutex::Locker l(lock);

  finisher.start();

  // start ticker
  timer.init();
  new_tick();

  cpu_tp.start();

  // i'm ready!
  messenger->add_dispatcher_tail(this);

  // The mgr client is added before this Monitor so it sees mgr messages
  // first.
  mgr_client.init();
  mgr_messenger->add_dispatcher_tail(&mgr_client);
  mgr_messenger->add_dispatcher_tail(this);  // for auth ms_* calls

  bootstrap();
  // add features of myself into feature_map
  // NOTE(review): con_self is NULL if the constructor got a NULL messenger.
  session_map.feature_map.add_mon(con_self->get_features());
  return 0;
}
804
805 void Monitor::init_paxos()
806 {
807 dout(10) << __func__ << dendl;
808 paxos->init();
809
810 // init services
811 for (int i = 0; i < PAXOS_NUM; ++i) {
812 paxos_service[i]->init();
813 }
814
815 refresh_from_paxos(NULL);
816 }
817
818 void Monitor::refresh_from_paxos(bool *need_bootstrap)
819 {
820 dout(10) << __func__ << dendl;
821
822 bufferlist bl;
823 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
824 if (r >= 0) {
825 try {
826 bufferlist::iterator p = bl.begin();
827 ::decode(fingerprint, p);
828 }
829 catch (buffer::error& e) {
830 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
831 }
832 } else {
833 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
834 }
835
836 for (int i = 0; i < PAXOS_NUM; ++i) {
837 paxos_service[i]->refresh(need_bootstrap);
838 }
839 for (int i = 0; i < PAXOS_NUM; ++i) {
840 paxos_service[i]->post_refresh();
841 }
842 load_metadata();
843 }
844
845 void Monitor::register_cluster_logger()
846 {
847 if (!cluster_logger_registered) {
848 dout(10) << "register_cluster_logger" << dendl;
849 cluster_logger_registered = true;
850 cct->get_perfcounters_collection()->add(cluster_logger);
851 } else {
852 dout(10) << "register_cluster_logger - already registered" << dendl;
853 }
854 }
855
856 void Monitor::unregister_cluster_logger()
857 {
858 if (cluster_logger_registered) {
859 dout(10) << "unregister_cluster_logger" << dendl;
860 cluster_logger_registered = false;
861 cct->get_perfcounters_collection()->remove(cluster_logger);
862 } else {
863 dout(10) << "unregister_cluster_logger - not registered" << dendl;
864 }
865 }
866
867 void Monitor::update_logger()
868 {
869 cluster_logger->set(l_cluster_num_mon, monmap->size());
870 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
871 }
872
// Orderly teardown. In order, it:
//   1. flushes any in-flight Paxos write and marks the mon STATE_SHUTDOWN;
//   2. removes the config observer and the admin-socket commands;
//   3. stops the elector, mgr client and finisher;
//   4. shuts down Paxos, its services, the health monitor, timer and
//      CPU pool, cancelling quorum waiters with -ECANCELED;
//   5. drops all sessions and perf counters;
//   6. shuts down both messengers last.
// Acquires `lock` itself. It is released while the finisher drains and
// before messenger shutdown.
void Monitor::shutdown()
{
  dout(1) << "shutdown" << dendl;

  lock.Lock();

  wait_for_paxos_write();

  // Set before the finisher is drained below, so still-queued
  // C_MonContext callbacks become no-ops (see C_MonContext::finish).
  state = STATE_SHUTDOWN;

  g_conf->remove_observer(this);

  // Unregister exactly the commands registered in preinit().
  // NOTE(review): this runs with `lock` held, whereas preinit() registers
  // without it to avoid a mon_lock -> admin socket lock dependency, and
  // do_admin_command() takes `lock`. Confirm this cannot deadlock.
  if (admin_hook) {
    AdminSocket* admin_socket = cct->get_admin_socket();
    admin_socket->unregister_command("mon_status");
    admin_socket->unregister_command("quorum_status");
    admin_socket->unregister_command("sync_force");
    admin_socket->unregister_command("add_bootstrap_peer_hint");
    admin_socket->unregister_command("quorum enter");
    admin_socket->unregister_command("quorum exit");
    admin_socket->unregister_command("ops");
    admin_socket->unregister_command("sessions");
    delete admin_hook;
    admin_hook = NULL;
  }

  elector.shutdown();

  mgr_client.shutdown();

  // Drain and stop the finisher without holding `lock`.
  lock.Unlock();
  finisher.wait_for_empty();
  finisher.stop();
  lock.Lock();

  // clean up
  paxos->shutdown();
  for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
    (*p)->shutdown();
  health_monitor->shutdown();

  finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
  finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);

  timer.shutdown();

  cpu_tp.stop();

  remove_all_sessions();

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }
  // cluster_logger is only in the collection if it was registered.
  if (cluster_logger) {
    if (cluster_logger_registered)
      cct->get_perfcounters_collection()->remove(cluster_logger);
    delete cluster_logger;
    cluster_logger = NULL;
  }

  log_client.shutdown();

  // unlock before msgr shutdown...
  lock.Unlock();

  messenger->shutdown();  // last thing!  ceph_mon.cc will delete mon.
  mgr_messenger->shutdown();
}
943
944 void Monitor::wait_for_paxos_write()
945 {
946 if (paxos->is_writing() || paxos->is_writing_previous()) {
947 dout(10) << __func__ << " flushing pending write" << dendl;
948 lock.Unlock();
949 store->flush();
950 lock.Lock();
951 dout(10) << __func__ << " flushed pending write" << dendl;
952 }
953 }
954
// (Re)enter the probing phase. It flushes pending Paxos writes, resets
// sync/probe state and recomputes our rank from the monmap. Then, as a
// single-member cluster at rank 0, it wins a standalone election;
// otherwise it sends OP_PROBE to every other monmap member and to every
// extra probe peer except our own address.
void Monitor::bootstrap()
{
  dout(10) << "bootstrap" << dendl;
  wait_for_paxos_write();

  sync_reset_requester();
  unregister_cluster_logger();
  cancel_probe_timeout();

  // note my rank
  int newrank = monmap->get_rank(messenger->get_myaddr());
  if (newrank < 0 && rank >= 0) {
    // was i ever part of the quorum?
    // Dropped from the monmap after having joined: the process exits with
    // status 0. If we never joined, we keep going with a negative rank.
    if (has_ever_joined) {
      dout(0) << " removed from monmap, suicide." << dendl;
      exit(0);
    }
  }
  if (newrank != rank) {
    dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
    messenger->set_myname(entity_name_t::MON(newrank));
    rank = newrank;

    // reset all connections, or else our peers will think we are someone else.
    messenger->mark_down_all();
  }

  // reset
  state = STATE_PROBING;

  _reset();

  // sync store
  if (g_conf->mon_compact_on_bootstrap) {
    dout(10) << "bootstrap -- triggering compaction" << dendl;
    store->compact();
    dout(10) << "bootstrap -- finished compaction" << dendl;
  }

  // singleton monitor?
  if (monmap->size() == 1 && rank == 0) {
    win_standalone_election();
    return;
  }

  reset_probe_timeout();

  // i'm outside the quorum
  if (monmap->contains(name))
    outside_quorum.insert(name);

  // probe monitors
  dout(10) << "probing other monitors" << dendl;
  for (unsigned i = 0; i < monmap->size(); i++) {
    if ((int)i != rank)
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
                              monmap->get_inst(i));
  }
  // Extra peers (from mon_initial_members filtering or bootstrap hints) are
  // addressed as MON(-1) since their rank is unknown.
  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
       p != extra_probe_peers.end();
       ++p) {
    if (*p != messenger->get_myaddr()) {
      entity_inst_t i;
      i.name = entity_name_t::MON(-1);
      i.addr = *p;
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
    }
  }
}
1024
1025 bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1026 {
1027 string addrstr;
1028 if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1029 ss << "unable to parse address string value '"
1030 << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1031 return false;
1032 }
1033 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1034 << addrstr << "'" << dendl;
1035
1036 entity_addr_t addr;
1037 const char *end = 0;
1038 if (!addr.parse(addrstr.c_str(), &end)) {
1039 ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1040 return false;
1041 }
1042
1043 if (is_leader() || is_peon()) {
1044 ss << "mon already active; ignoring bootstrap hint";
1045 return true;
1046 }
1047
1048 if (addr.get_port() == 0)
1049 addr.set_port(CEPH_MON_PORT);
1050
1051 extra_probe_peers.insert(addr);
1052 ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1053 return true;
1054 }
1055
1056 // called by bootstrap(), or on leader|peon -> electing
1057 void Monitor::_reset()
1058 {
1059 dout(10) << __func__ << dendl;
1060
1061 cancel_probe_timeout();
1062 timecheck_finish();
1063 health_events_cleanup();
1064 health_check_log_times.clear();
1065 scrub_event_cancel();
1066
1067 leader_since = utime_t();
1068 if (!quorum.empty()) {
1069 exited_quorum = ceph_clock_now();
1070 }
1071 quorum.clear();
1072 outside_quorum.clear();
1073 quorum_feature_map.clear();
1074
1075 scrub_reset();
1076
1077 paxos->restart();
1078
1079 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1080 (*p)->restart();
1081 health_monitor->finish();
1082 }
1083
1084
1085 // -----------------------------------------------------------
1086 // sync
1087
1088 set<string> Monitor::get_sync_targets_names()
1089 {
1090 set<string> targets;
1091 targets.insert(paxos->get_name());
1092 for (int i = 0; i < PAXOS_NUM; ++i)
1093 paxos_service[i]->get_store_prefixes(targets);
1094 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1095 assert(config_key_service_ptr);
1096 config_key_service_ptr->get_store_prefixes(targets);
1097 return targets;
1098 }
1099
1100
1101 void Monitor::sync_timeout()
1102 {
1103 dout(10) << __func__ << dendl;
1104 assert(state == STATE_SYNCHRONIZING);
1105 bootstrap();
1106 }
1107
1108 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1109 {
1110 dout(1) << __func__ << dendl;
1111
1112 MonMap latest_monmap;
1113
1114 // Grab latest monmap from MonmapMonitor
1115 bufferlist monmon_bl;
1116 int err = monmon()->get_monmap(monmon_bl);
1117 if (err < 0) {
1118 if (err != -ENOENT) {
1119 derr << __func__
1120 << " something wrong happened while reading the store: "
1121 << cpp_strerror(err) << dendl;
1122 assert(0 == "error reading the store");
1123 }
1124 } else {
1125 latest_monmap.decode(monmon_bl);
1126 }
1127
1128 // Grab last backed up monmap (if any) and compare epochs
1129 if (store->exists("mon_sync", "latest_monmap")) {
1130 bufferlist backup_bl;
1131 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1132 if (err < 0) {
1133 derr << __func__
1134 << " something wrong happened while reading the store: "
1135 << cpp_strerror(err) << dendl;
1136 assert(0 == "error reading the store");
1137 }
1138 assert(backup_bl.length() > 0);
1139
1140 MonMap backup_monmap;
1141 backup_monmap.decode(backup_bl);
1142
1143 if (backup_monmap.epoch > latest_monmap.epoch)
1144 latest_monmap = backup_monmap;
1145 }
1146
1147 // Check if our current monmap's epoch is greater than the one we've
1148 // got so far.
1149 if (monmap->epoch > latest_monmap.epoch)
1150 latest_monmap = *monmap;
1151
1152 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1153
1154 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1155 }
1156
1157 void Monitor::sync_reset_requester()
1158 {
1159 dout(10) << __func__ << dendl;
1160
1161 if (sync_timeout_event) {
1162 timer.cancel_event(sync_timeout_event);
1163 sync_timeout_event = NULL;
1164 }
1165
1166 sync_provider = entity_inst_t();
1167 sync_cookie = 0;
1168 sync_full = false;
1169 sync_start_version = 0;
1170 }
1171
1172 void Monitor::sync_reset_provider()
1173 {
1174 dout(10) << __func__ << dendl;
1175 sync_providers.clear();
1176 }
1177
1178 void Monitor::sync_start(entity_inst_t &other, bool full)
1179 {
1180 dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
1181
1182 assert(state == STATE_PROBING ||
1183 state == STATE_SYNCHRONIZING);
1184 state = STATE_SYNCHRONIZING;
1185
1186 // make sure are not a provider for anyone!
1187 sync_reset_provider();
1188
1189 sync_full = full;
1190
1191 if (sync_full) {
1192 // stash key state, and mark that we are syncing
1193 auto t(std::make_shared<MonitorDBStore::Transaction>());
1194 sync_stash_critical_state(t);
1195 t->put("mon_sync", "in_sync", 1);
1196
1197 sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
1198 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1199 << sync_last_committed_floor << dendl;
1200 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1201
1202 store->apply_transaction(t);
1203
1204 assert(g_conf->mon_sync_requester_kill_at != 1);
1205
1206 // clear the underlying store
1207 set<string> targets = get_sync_targets_names();
1208 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1209 store->clear(targets);
1210
1211 // make sure paxos knows it has been reset. this prevents a
1212 // bootstrap and then different probe reply order from possibly
1213 // deciding a partial or no sync is needed.
1214 paxos->init();
1215
1216 assert(g_conf->mon_sync_requester_kill_at != 2);
1217 }
1218
1219 // assume 'other' as the leader. We will update the leader once we receive
1220 // a reply to the sync start.
1221 sync_provider = other;
1222
1223 sync_reset_timeout();
1224
1225 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1226 if (!sync_full)
1227 m->last_committed = paxos->get_version();
1228 messenger->send_message(m, sync_provider);
1229 }
1230
1231 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1232 {
1233 dout(10) << __func__ << dendl;
1234 bufferlist backup_monmap;
1235 sync_obtain_latest_monmap(backup_monmap);
1236 assert(backup_monmap.length() > 0);
1237 t->put("mon_sync", "latest_monmap", backup_monmap);
1238 }
1239
1240 void Monitor::sync_reset_timeout()
1241 {
1242 dout(10) << __func__ << dendl;
1243 if (sync_timeout_event)
1244 timer.cancel_event(sync_timeout_event);
1245 sync_timeout_event = new C_MonContext(this, [this](int) {
1246 sync_timeout();
1247 });
1248 timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event);
1249 }
1250
1251 void Monitor::sync_finish(version_t last_committed)
1252 {
1253 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1254
1255 assert(g_conf->mon_sync_requester_kill_at != 7);
1256
1257 if (sync_full) {
1258 // finalize the paxos commits
1259 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1260 paxos->read_and_prepare_transactions(tx, sync_start_version,
1261 last_committed);
1262 tx->put(paxos->get_name(), "last_committed", last_committed);
1263
1264 dout(30) << __func__ << " final tx dump:\n";
1265 JSONFormatter f(true);
1266 tx->dump(&f);
1267 f.flush(*_dout);
1268 *_dout << dendl;
1269
1270 store->apply_transaction(tx);
1271 }
1272
1273 assert(g_conf->mon_sync_requester_kill_at != 8);
1274
1275 auto t(std::make_shared<MonitorDBStore::Transaction>());
1276 t->erase("mon_sync", "in_sync");
1277 t->erase("mon_sync", "force_sync");
1278 t->erase("mon_sync", "last_committed_floor");
1279 store->apply_transaction(t);
1280
1281 assert(g_conf->mon_sync_requester_kill_at != 9);
1282
1283 init_paxos();
1284
1285 assert(g_conf->mon_sync_requester_kill_at != 10);
1286
1287 bootstrap();
1288 }
1289
1290 void Monitor::handle_sync(MonOpRequestRef op)
1291 {
1292 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1293 dout(10) << __func__ << " " << *m << dendl;
1294 switch (m->op) {
1295
1296 // provider ---------
1297
1298 case MMonSync::OP_GET_COOKIE_FULL:
1299 case MMonSync::OP_GET_COOKIE_RECENT:
1300 handle_sync_get_cookie(op);
1301 break;
1302 case MMonSync::OP_GET_CHUNK:
1303 handle_sync_get_chunk(op);
1304 break;
1305
1306 // client -----------
1307
1308 case MMonSync::OP_COOKIE:
1309 handle_sync_cookie(op);
1310 break;
1311
1312 case MMonSync::OP_CHUNK:
1313 case MMonSync::OP_LAST_CHUNK:
1314 handle_sync_chunk(op);
1315 break;
1316 case MMonSync::OP_NO_COOKIE:
1317 handle_sync_no_cookie(op);
1318 break;
1319
1320 default:
1321 dout(0) << __func__ << " unknown op " << m->op << dendl;
1322 assert(0 == "unknown op");
1323 }
1324 }
1325
1326 // leader
1327
1328 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1329 {
1330 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1331 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1332 m->get_connection()->send_message(reply);
1333 }
1334
1335 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1336 {
1337 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1338 if (is_synchronizing()) {
1339 _sync_reply_no_cookie(op);
1340 return;
1341 }
1342
1343 assert(g_conf->mon_sync_provider_kill_at != 1);
1344
1345 // make sure they can understand us.
1346 if ((required_features ^ m->get_connection()->get_features()) &
1347 required_features) {
1348 dout(5) << " ignoring peer mon." << m->get_source().num()
1349 << " has features " << std::hex
1350 << m->get_connection()->get_features()
1351 << " but we require " << required_features << std::dec << dendl;
1352 return;
1353 }
1354
1355 // make up a unique cookie. include election epoch (which persists
1356 // across restarts for the whole cluster) and a counter for this
1357 // process instance. there is no need to be unique *across*
1358 // monitors, though.
1359 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1360 assert(sync_providers.count(cookie) == 0);
1361
1362 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1363
1364 SyncProvider& sp = sync_providers[cookie];
1365 sp.cookie = cookie;
1366 sp.entity = m->get_source_inst();
1367 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1368
1369 set<string> sync_targets;
1370 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1371 // full scan
1372 sync_targets = get_sync_targets_names();
1373 sp.last_committed = paxos->get_version();
1374 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1375 sp.full = true;
1376 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1377 } else {
1378 // just catch up paxos
1379 sp.last_committed = m->last_committed;
1380 }
1381 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1382
1383 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1384 reply->last_committed = sp.last_committed;
1385 m->get_connection()->send_message(reply);
1386 }
1387
1388 void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1389 {
1390 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1391 dout(10) << __func__ << " " << *m << dendl;
1392
1393 if (sync_providers.count(m->cookie) == 0) {
1394 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1395 _sync_reply_no_cookie(op);
1396 return;
1397 }
1398
1399 assert(g_conf->mon_sync_provider_kill_at != 2);
1400
1401 SyncProvider& sp = sync_providers[m->cookie];
1402 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1403
1404 if (sp.last_committed < paxos->get_first_committed() &&
1405 paxos->get_first_committed() > 1) {
1406 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1407 << " < our fc " << paxos->get_first_committed() << dendl;
1408 sync_providers.erase(m->cookie);
1409 _sync_reply_no_cookie(op);
1410 return;
1411 }
1412
1413 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1414 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1415
1416 int left = g_conf->mon_sync_max_payload_size;
1417 while (sp.last_committed < paxos->get_version() && left > 0) {
1418 bufferlist bl;
1419 sp.last_committed++;
1420
1421 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1422 assert(err == 0);
1423
1424 tx->put(paxos->get_name(), sp.last_committed, bl);
1425 left -= bl.length();
1426 dout(20) << __func__ << " including paxos state " << sp.last_committed
1427 << dendl;
1428 }
1429 reply->last_committed = sp.last_committed;
1430
1431 if (sp.full && left > 0) {
1432 sp.synchronizer->get_chunk_tx(tx, left);
1433 sp.last_key = sp.synchronizer->get_last_key();
1434 reply->last_key = sp.last_key;
1435 }
1436
1437 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1438 sp.last_committed < paxos->get_version()) {
1439 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1440 << " key " << sp.last_key << dendl;
1441 } else {
1442 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1443 << " key " << sp.last_key << dendl;
1444 reply->op = MMonSync::OP_LAST_CHUNK;
1445
1446 assert(g_conf->mon_sync_provider_kill_at != 3);
1447
1448 // clean up our local state
1449 sync_providers.erase(sp.cookie);
1450 }
1451
1452 ::encode(*tx, reply->chunk_bl);
1453
1454 m->get_connection()->send_message(reply);
1455 }
1456
1457 // requester
1458
1459 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1460 {
1461 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1462 dout(10) << __func__ << " " << *m << dendl;
1463 if (sync_cookie) {
1464 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1465 return;
1466 }
1467 if (m->get_source_inst() != sync_provider) {
1468 dout(10) << __func__ << " source does not match, discarding" << dendl;
1469 return;
1470 }
1471 sync_cookie = m->cookie;
1472 sync_start_version = m->last_committed;
1473
1474 sync_reset_timeout();
1475 sync_get_next_chunk();
1476
1477 assert(g_conf->mon_sync_requester_kill_at != 3);
1478 }
1479
1480 void Monitor::sync_get_next_chunk()
1481 {
1482 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1483 if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1484 dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1485 usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1486 }
1487 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1488 messenger->send_message(r, sync_provider);
1489
1490 assert(g_conf->mon_sync_requester_kill_at != 4);
1491 }
1492
1493 void Monitor::handle_sync_chunk(MonOpRequestRef op)
1494 {
1495 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1496 dout(10) << __func__ << " " << *m << dendl;
1497
1498 if (m->cookie != sync_cookie) {
1499 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1500 return;
1501 }
1502 if (m->get_source_inst() != sync_provider) {
1503 dout(10) << __func__ << " source does not match, discarding" << dendl;
1504 return;
1505 }
1506
1507 assert(state == STATE_SYNCHRONIZING);
1508 assert(g_conf->mon_sync_requester_kill_at != 5);
1509
1510 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1511 tx->append_from_encoded(m->chunk_bl);
1512
1513 dout(30) << __func__ << " tx dump:\n";
1514 JSONFormatter f(true);
1515 tx->dump(&f);
1516 f.flush(*_dout);
1517 *_dout << dendl;
1518
1519 store->apply_transaction(tx);
1520
1521 assert(g_conf->mon_sync_requester_kill_at != 6);
1522
1523 if (!sync_full) {
1524 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1525 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1526 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1527 m->last_committed);
1528 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1529
1530 dout(30) << __func__ << " tx dump:\n";
1531 JSONFormatter f(true);
1532 tx->dump(&f);
1533 f.flush(*_dout);
1534 *_dout << dendl;
1535
1536 store->apply_transaction(tx);
1537 paxos->init(); // to refresh what we just wrote
1538 }
1539
1540 if (m->op == MMonSync::OP_CHUNK) {
1541 sync_reset_timeout();
1542 sync_get_next_chunk();
1543 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1544 sync_finish(m->last_committed);
1545 }
1546 }
1547
1548 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1549 {
1550 dout(10) << __func__ << dendl;
1551 bootstrap();
1552 }
1553
1554 void Monitor::sync_trim_providers()
1555 {
1556 dout(20) << __func__ << dendl;
1557
1558 utime_t now = ceph_clock_now();
1559 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1560 while (p != sync_providers.end()) {
1561 if (now > p->second.timeout) {
1562 dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
1563 sync_providers.erase(p++);
1564 } else {
1565 ++p;
1566 }
1567 }
1568 }
1569
1570 // ---------------------------------------------------
1571 // probe
1572
1573 void Monitor::cancel_probe_timeout()
1574 {
1575 if (probe_timeout_event) {
1576 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1577 timer.cancel_event(probe_timeout_event);
1578 probe_timeout_event = NULL;
1579 } else {
1580 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1581 }
1582 }
1583
1584 void Monitor::reset_probe_timeout()
1585 {
1586 cancel_probe_timeout();
1587 probe_timeout_event = new C_MonContext(this, [this](int r) {
1588 probe_timeout(r);
1589 });
1590 double t = g_conf->mon_probe_timeout;
1591 timer.add_event_after(t, probe_timeout_event);
1592 dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
1593 }
1594
1595 void Monitor::probe_timeout(int r)
1596 {
1597 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1598 assert(is_probing() || is_synchronizing());
1599 assert(probe_timeout_event);
1600 probe_timeout_event = NULL;
1601 bootstrap();
1602 }
1603
1604 void Monitor::handle_probe(MonOpRequestRef op)
1605 {
1606 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1607 dout(10) << "handle_probe " << *m << dendl;
1608
1609 if (m->fsid != monmap->fsid) {
1610 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1611 return;
1612 }
1613
1614 switch (m->op) {
1615 case MMonProbe::OP_PROBE:
1616 handle_probe_probe(op);
1617 break;
1618
1619 case MMonProbe::OP_REPLY:
1620 handle_probe_reply(op);
1621 break;
1622
1623 case MMonProbe::OP_MISSING_FEATURES:
1624 derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1625 << ", required " << m->required_features
1626 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
1627 << dendl;
1628 break;
1629 }
1630 }
1631
1632 void Monitor::handle_probe_probe(MonOpRequestRef op)
1633 {
1634 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1635
1636 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1637 << " features " << m->get_connection()->get_features() << dendl;
1638 uint64_t missing = required_features & ~m->get_connection()->get_features();
1639 if (missing) {
1640 dout(1) << " peer " << m->get_source_addr() << " missing features "
1641 << missing << dendl;
1642 if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1643 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1644 name, has_ever_joined);
1645 m->required_features = required_features;
1646 m->get_connection()->send_message(r);
1647 }
1648 goto out;
1649 }
1650
1651 if (!is_probing() && !is_synchronizing()) {
1652 // If the probing mon is way ahead of us, we need to re-bootstrap.
1653 // Normally we capture this case when we initially bootstrap, but
1654 // it is possible we pass those checks (we overlap with
1655 // quorum-to-be) but fail to join a quorum before it moves past
1656 // us. We need to be kicked back to bootstrap so we can
1657 // synchonize, not keep calling elections.
1658 if (paxos->get_version() + 1 < m->paxos_first_version) {
1659 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1660 << "ahead of us, re-bootstrapping" << dendl;
1661 bootstrap();
1662 goto out;
1663
1664 }
1665 }
1666
1667 MMonProbe *r;
1668 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1669 r->name = name;
1670 r->quorum = quorum;
1671 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1672 r->paxos_first_version = paxos->get_first_committed();
1673 r->paxos_last_version = paxos->get_version();
1674 m->get_connection()->send_message(r);
1675
1676 // did we discover a peer here?
1677 if (!monmap->contains(m->get_source_addr())) {
1678 dout(1) << " adding peer " << m->get_source_addr()
1679 << " to list of hints" << dendl;
1680 extra_probe_peers.insert(m->get_source_addr());
1681 }
1682
1683 out:
1684 return;
1685 }
1686
1687 void Monitor::handle_probe_reply(MonOpRequestRef op)
1688 {
1689 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1690 dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1691 dout(10) << " monmap is " << *monmap << dendl;
1692
1693 // discover name and addrs during probing or electing states.
1694 if (!is_probing() && !is_electing()) {
1695 return;
1696 }
1697
1698 // newer map, or they've joined a quorum and we haven't?
1699 bufferlist mybl;
1700 monmap->encode(mybl, m->get_connection()->get_features());
1701 // make sure it's actually different; the checks below err toward
1702 // taking the other guy's map, which could cause us to loop.
1703 if (!mybl.contents_equal(m->monmap_bl)) {
1704 MonMap *newmap = new MonMap;
1705 newmap->decode(m->monmap_bl);
1706 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1707 !has_ever_joined)) {
1708 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1709 << ", mine was " << monmap->get_epoch() << dendl;
1710 delete newmap;
1711 monmap->decode(m->monmap_bl);
1712
1713 bootstrap();
1714 return;
1715 }
1716 delete newmap;
1717 }
1718
1719 // rename peer?
1720 string peer_name = monmap->get_name(m->get_source_addr());
1721 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1722 dout(10) << " renaming peer " << m->get_source_addr() << " "
1723 << peer_name << " -> " << m->name << " in my monmap"
1724 << dendl;
1725 monmap->rename(peer_name, m->name);
1726
1727 if (is_electing()) {
1728 bootstrap();
1729 return;
1730 }
1731 } else {
1732 dout(10) << " peer name is " << peer_name << dendl;
1733 }
1734
1735 // new initial peer?
1736 if (monmap->get_epoch() == 0 &&
1737 monmap->contains(m->name) &&
1738 monmap->get_addr(m->name).is_blank_ip()) {
1739 dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1740 monmap->set_addr(m->name, m->get_source_addr());
1741
1742 bootstrap();
1743 return;
1744 }
1745
1746 // end discover phase
1747 if (!is_probing()) {
1748 return;
1749 }
1750
1751 assert(paxos != NULL);
1752
1753 if (is_synchronizing()) {
1754 dout(10) << " currently syncing" << dendl;
1755 return;
1756 }
1757
1758 entity_inst_t other = m->get_source_inst();
1759
1760 if (m->paxos_last_version < sync_last_committed_floor) {
1761 dout(10) << " peer paxos versions [" << m->paxos_first_version
1762 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1763 << sync_last_committed_floor << ", ignoring"
1764 << dendl;
1765 } else {
1766 if (paxos->get_version() < m->paxos_first_version &&
1767 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1768 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1769 << "," << m->paxos_last_version << "]"
1770 << " vs my version " << paxos->get_version()
1771 << " (too far ahead)"
1772 << dendl;
1773 cancel_probe_timeout();
1774 sync_start(other, true);
1775 return;
1776 }
1777 if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1778 dout(10) << " peer paxos last version " << m->paxos_last_version
1779 << " vs my version " << paxos->get_version()
1780 << " (too far ahead)"
1781 << dendl;
1782 cancel_probe_timeout();
1783 sync_start(other, false);
1784 return;
1785 }
1786 }
1787
1788 // is there an existing quorum?
1789 if (m->quorum.size()) {
1790 dout(10) << " existing quorum " << m->quorum << dendl;
1791
1792 dout(10) << " peer paxos version " << m->paxos_last_version
1793 << " vs my version " << paxos->get_version()
1794 << " (ok)"
1795 << dendl;
1796
1797 if (monmap->contains(name) &&
1798 !monmap->get_addr(name).is_blank_ip()) {
1799 // i'm part of the cluster; just initiate a new election
1800 start_election();
1801 } else {
1802 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1803 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1804 monmap->get_inst(*m->quorum.begin()));
1805 }
1806 } else {
1807 if (monmap->contains(m->name)) {
1808 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1809 outside_quorum.insert(m->name);
1810 } else {
1811 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
1812 return;
1813 }
1814
1815 unsigned need = monmap->size() / 2 + 1;
1816 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1817 if (outside_quorum.size() >= need) {
1818 if (outside_quorum.count(name)) {
1819 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1820 start_election();
1821 } else {
1822 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1823 }
1824 } else {
1825 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
1826 }
1827 }
1828 }
1829
1830 void Monitor::join_election()
1831 {
1832 dout(10) << __func__ << dendl;
1833 wait_for_paxos_write();
1834 _reset();
1835 state = STATE_ELECTING;
1836
1837 logger->inc(l_mon_num_elections);
1838 }
1839
1840 void Monitor::start_election()
1841 {
1842 dout(10) << "start_election" << dendl;
1843 wait_for_paxos_write();
1844 _reset();
1845 state = STATE_ELECTING;
1846
1847 logger->inc(l_mon_num_elections);
1848 logger->inc(l_mon_election_call);
1849
1850 clog->info() << "mon." << name << " calling new monitor election";
1851 elector.call_election();
1852 }
1853
1854 void Monitor::win_standalone_election()
1855 {
1856 dout(1) << "win_standalone_election" << dendl;
1857
1858 // bump election epoch, in case the previous epoch included other
1859 // monitors; we need to be able to make the distinction.
1860 elector.init();
1861 elector.advance_epoch();
1862
1863 rank = monmap->get_rank(name);
1864 assert(rank == 0);
1865 set<int> q;
1866 q.insert(rank);
1867
1868 map<int,Metadata> metadata;
1869 collect_metadata(&metadata[0]);
1870
1871 win_election(elector.get_epoch(), q,
1872 CEPH_FEATURES_ALL,
1873 ceph::features::mon::get_supported(),
1874 metadata);
1875 }
1876
1877 const utime_t& Monitor::get_leader_since() const
1878 {
1879 assert(state == STATE_LEADER);
1880 return leader_since;
1881 }
1882
1883 epoch_t Monitor::get_epoch()
1884 {
1885 return elector.get_epoch();
1886 }
1887
1888 void Monitor::_finish_svc_election()
1889 {
1890 assert(state == STATE_LEADER || state == STATE_PEON);
1891
1892 for (auto p : paxos_service) {
1893 // we already called election_finished() on monmon(); avoid callig twice
1894 if (state == STATE_LEADER && p == monmon())
1895 continue;
1896 p->election_finished();
1897 }
1898 }
1899
1900 void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
1901 const mon_feature_t& mon_features,
1902 const map<int,Metadata>& metadata)
1903 {
1904 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
1905 << " features " << features
1906 << " mon_features " << mon_features
1907 << dendl;
1908 assert(is_electing());
1909 state = STATE_LEADER;
1910 leader_since = ceph_clock_now();
1911 leader = rank;
1912 quorum = active;
1913 quorum_con_features = features;
1914 quorum_mon_features = mon_features;
1915 pending_metadata = metadata;
1916 outside_quorum.clear();
1917
1918 clog->info() << "mon." << name << "@" << rank
1919 << " won leader election with quorum " << quorum;
1920
1921 set_leader_commands(get_local_commands(mon_features));
1922
1923 paxos->leader_init();
1924 // NOTE: tell monmap monitor first. This is important for the
1925 // bootstrap case to ensure that the very first paxos proposal
1926 // codifies the monmap. Otherwise any manner of chaos can ensue
1927 // when monitors are call elections or participating in a paxos
1928 // round without agreeing on who the participants are.
1929 monmon()->election_finished();
1930 _finish_svc_election();
1931 health_monitor->start(epoch);
1932
1933 logger->inc(l_mon_election_win);
1934
1935 // inject new metadata in first transaction.
1936 {
1937 // include previous metadata for missing mons (that aren't part of
1938 // the current quorum).
1939 map<int,Metadata> m = metadata;
1940 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
1941 if (m.count(rank) == 0 &&
1942 mon_metadata.count(rank)) {
1943 m[rank] = mon_metadata[rank];
1944 }
1945 }
1946
1947 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1948 // a new transaction immediately after the election finishes. We should
1949 // do that anyway for other reasons, though.
1950 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1951 bufferlist bl;
1952 ::encode(m, bl);
1953 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
1954 }
1955
1956 finish_election();
1957 if (monmap->size() > 1 &&
1958 monmap->get_epoch() > 0) {
1959 timecheck_start();
1960 health_tick_start();
1961 do_health_to_clog_interval();
1962 scrub_event_start();
1963 }
1964 }
1965
1966 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
1967 uint64_t features,
1968 const mon_feature_t& mon_features)
1969 {
1970 state = STATE_PEON;
1971 leader_since = utime_t();
1972 leader = l;
1973 quorum = q;
1974 outside_quorum.clear();
1975 quorum_con_features = features;
1976 quorum_mon_features = mon_features;
1977 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
1978 << " quorum is " << quorum << " features are " << quorum_con_features
1979 << " mon_features are " << quorum_mon_features
1980 << dendl;
1981
1982 paxos->peon_init();
1983 _finish_svc_election();
1984 health_monitor->start(epoch);
1985
1986 logger->inc(l_mon_election_lose);
1987
1988 finish_election();
1989
1990 if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
1991 !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
1992 // for pre-luminous mons only
1993 Metadata sys_info;
1994 collect_metadata(&sys_info);
1995 messenger->send_message(new MMonMetadata(sys_info),
1996 monmap->get_inst(get_leader()));
1997 }
1998 }
1999
2000 void Monitor::collect_metadata(Metadata *m)
2001 {
2002 collect_sys_info(m, g_ceph_context);
2003 (*m)["addr"] = stringify(messenger->get_myaddr());
2004 }
2005
2006 void Monitor::finish_election()
2007 {
2008 apply_quorum_to_compatset_features();
2009 apply_monmap_to_compatset_features();
2010 timecheck_finish();
2011 exited_quorum = utime_t();
2012 finish_contexts(g_ceph_context, waitfor_quorum);
2013 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2014 resend_routed_requests();
2015 update_logger();
2016 register_cluster_logger();
2017
2018 // am i named properly?
2019 string cur_name = monmap->get_name(messenger->get_myaddr());
2020 if (cur_name != name) {
2021 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2022 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
2023 monmap->get_inst(*quorum.begin()));
2024 }
2025 }
2026
2027 void Monitor::_apply_compatset_features(CompatSet &new_features)
2028 {
2029 if (new_features.compare(features) != 0) {
2030 CompatSet diff = features.unsupported(new_features);
2031 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2032 features = new_features;
2033
2034 auto t = std::make_shared<MonitorDBStore::Transaction>();
2035 write_features(t);
2036 store->apply_transaction(t);
2037
2038 calc_quorum_requirements();
2039 }
2040 }
2041
2042 void Monitor::apply_quorum_to_compatset_features()
2043 {
2044 CompatSet new_features(features);
2045 if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2046 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2047 }
2048 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2049 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2050 }
2051 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2052 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2053 }
2054 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2055 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2056 }
2057 dout(5) << __func__ << dendl;
2058 _apply_compatset_features(new_features);
2059 }
2060
2061 void Monitor::apply_monmap_to_compatset_features()
2062 {
2063 CompatSet new_features(features);
2064 mon_feature_t monmap_features = monmap->get_required_features();
2065
2066 /* persistent monmap features may go into the compatset.
2067 * optional monmap features may not - why?
2068 * because optional monmap features may be set/unset by the admin,
2069 * and possibly by other means that haven't yet been thought out,
2070 * so we can't make the monitor enforce them on start - because they
2071 * may go away.
2072 * this, of course, does not invalidate setting a compatset feature
2073 * for an optional feature - as long as you make sure to clean it up
2074 * once you unset it.
2075 */
2076 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2077 assert(ceph::features::mon::get_persistent().contains_all(
2078 ceph::features::mon::FEATURE_KRAKEN));
2079 // this feature should only ever be set if the quorum supports it.
2080 assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2081 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2082 }
2083 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2084 assert(ceph::features::mon::get_persistent().contains_all(
2085 ceph::features::mon::FEATURE_LUMINOUS));
2086 // this feature should only ever be set if the quorum supports it.
2087 assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
2088 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2089 }
2090
2091 dout(5) << __func__ << dendl;
2092 _apply_compatset_features(new_features);
2093 }
2094
2095 void Monitor::calc_quorum_requirements()
2096 {
2097 required_features = 0;
2098
2099 // compatset
2100 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2101 required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2102 }
2103 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2104 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2105 }
2106 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2107 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2108 }
2109 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2110 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2111 }
2112 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2113 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2114 }
2115 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2116 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2117 }
2118
2119 // monmap
2120 if (monmap->get_required_features().contains_all(
2121 ceph::features::mon::FEATURE_KRAKEN)) {
2122 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2123 }
2124 if (monmap->get_required_features().contains_all(
2125 ceph::features::mon::FEATURE_LUMINOUS)) {
2126 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2127 }
2128 dout(10) << __func__ << " required_features " << required_features << dendl;
2129 }
2130
2131 void Monitor::get_combined_feature_map(FeatureMap *fm)
2132 {
2133 *fm += session_map.feature_map;
2134 for (auto id : quorum) {
2135 if (id != rank) {
2136 *fm += quorum_feature_map[id];
2137 }
2138 }
2139 }
2140
2141 void Monitor::sync_force(Formatter *f, ostream& ss)
2142 {
2143 bool free_formatter = false;
2144
2145 if (!f) {
2146 // louzy/lazy hack: default to json if no formatter has been defined
2147 f = new JSONFormatter();
2148 free_formatter = true;
2149 }
2150
2151 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2152 sync_stash_critical_state(tx);
2153 tx->put("mon_sync", "force_sync", 1);
2154 store->apply_transaction(tx);
2155
2156 f->open_object_section("sync_force");
2157 f->dump_int("ret", 0);
2158 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2159 f->close_section(); // sync_force
2160 f->flush(ss);
2161 if (free_formatter)
2162 delete f;
2163 }
2164
2165 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2166 {
2167 bool free_formatter = false;
2168
2169 if (!f) {
2170 // louzy/lazy hack: default to json if no formatter has been defined
2171 f = new JSONFormatter();
2172 free_formatter = true;
2173 }
2174 f->open_object_section("quorum_status");
2175 f->dump_int("election_epoch", get_epoch());
2176
2177 f->open_array_section("quorum");
2178 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2179 f->dump_int("mon", *p);
2180 f->close_section(); // quorum
2181
2182 list<string> quorum_names = get_quorum_names();
2183 f->open_array_section("quorum_names");
2184 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2185 f->dump_string("mon", *p);
2186 f->close_section(); // quorum_names
2187
2188 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2189
2190 f->open_object_section("monmap");
2191 monmap->dump(f);
2192 f->close_section(); // monmap
2193
2194 f->close_section(); // quorum_status
2195 f->flush(ss);
2196 if (free_formatter)
2197 delete f;
2198 }
2199
2200 void Monitor::get_mon_status(Formatter *f, ostream& ss)
2201 {
2202 bool free_formatter = false;
2203
2204 if (!f) {
2205 // louzy/lazy hack: default to json if no formatter has been defined
2206 f = new JSONFormatter();
2207 free_formatter = true;
2208 }
2209
2210 f->open_object_section("mon_status");
2211 f->dump_string("name", name);
2212 f->dump_int("rank", rank);
2213 f->dump_string("state", get_state_name());
2214 f->dump_int("election_epoch", get_epoch());
2215
2216 f->open_array_section("quorum");
2217 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2218 f->dump_int("mon", *p);
2219 }
2220
2221 f->close_section(); // quorum
2222
2223 f->open_object_section("features");
2224 f->dump_stream("required_con") << required_features;
2225 mon_feature_t req_mon_features = get_required_mon_features();
2226 req_mon_features.dump(f, "required_mon");
2227 f->dump_stream("quorum_con") << quorum_con_features;
2228 quorum_mon_features.dump(f, "quorum_mon");
2229 f->close_section(); // features
2230
2231 f->open_array_section("outside_quorum");
2232 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2233 f->dump_string("mon", *p);
2234 f->close_section(); // outside_quorum
2235
2236 f->open_array_section("extra_probe_peers");
2237 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2238 p != extra_probe_peers.end();
2239 ++p)
2240 f->dump_stream("peer") << *p;
2241 f->close_section(); // extra_probe_peers
2242
2243 f->open_array_section("sync_provider");
2244 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2245 p != sync_providers.end();
2246 ++p) {
2247 f->dump_unsigned("cookie", p->second.cookie);
2248 f->dump_stream("entity") << p->second.entity;
2249 f->dump_stream("timeout") << p->second.timeout;
2250 f->dump_unsigned("last_committed", p->second.last_committed);
2251 f->dump_stream("last_key") << p->second.last_key;
2252 }
2253 f->close_section();
2254
2255 if (is_synchronizing()) {
2256 f->open_object_section("sync");
2257 f->dump_stream("sync_provider") << sync_provider;
2258 f->dump_unsigned("sync_cookie", sync_cookie);
2259 f->dump_unsigned("sync_start_version", sync_start_version);
2260 f->close_section();
2261 }
2262
2263 if (g_conf->mon_sync_provider_kill_at > 0)
2264 f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2265 if (g_conf->mon_sync_requester_kill_at > 0)
2266 f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2267
2268 f->open_object_section("monmap");
2269 monmap->dump(f);
2270 f->close_section();
2271
2272 f->dump_object("feature_map", session_map.feature_map);
2273 f->close_section(); // mon_status
2274
2275 if (free_formatter) {
2276 // flush formatter to ss and delete it iff we created the formatter
2277 f->flush(ss);
2278 delete f;
2279 }
2280 }
2281
2282
2283 // health status to clog
2284
2285 void Monitor::health_tick_start()
2286 {
2287 if (!cct->_conf->mon_health_to_clog ||
2288 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2289 return;
2290
2291 dout(15) << __func__ << dendl;
2292
2293 health_tick_stop();
2294 health_tick_event = new C_MonContext(this, [this](int r) {
2295 if (r < 0)
2296 return;
2297 do_health_to_clog();
2298 health_tick_start();
2299 });
2300 timer.add_event_after(cct->_conf->mon_health_to_clog_tick_interval,
2301 health_tick_event);
2302 }
2303
2304 void Monitor::health_tick_stop()
2305 {
2306 dout(15) << __func__ << dendl;
2307
2308 if (health_tick_event) {
2309 timer.cancel_event(health_tick_event);
2310 health_tick_event = NULL;
2311 }
2312 }
2313
2314 utime_t Monitor::health_interval_calc_next_update()
2315 {
2316 utime_t now = ceph_clock_now();
2317
2318 time_t secs = now.sec();
2319 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2320 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2321 utime_t next = utime_t(secs + adjustment, 0);
2322
2323 dout(20) << __func__
2324 << " now: " << now << ","
2325 << " next: " << next << ","
2326 << " interval: " << cct->_conf->mon_health_to_clog_interval
2327 << dendl;
2328
2329 return next;
2330 }
2331
2332 void Monitor::health_interval_start()
2333 {
2334 dout(15) << __func__ << dendl;
2335
2336 if (!cct->_conf->mon_health_to_clog ||
2337 cct->_conf->mon_health_to_clog_interval <= 0) {
2338 return;
2339 }
2340
2341 health_interval_stop();
2342 utime_t next = health_interval_calc_next_update();
2343 health_interval_event = new C_MonContext(this, [this](int r) {
2344 if (r < 0)
2345 return;
2346 do_health_to_clog_interval();
2347 });
2348 timer.add_event_at(next, health_interval_event);
2349 }
2350
2351 void Monitor::health_interval_stop()
2352 {
2353 dout(15) << __func__ << dendl;
2354 if (health_interval_event) {
2355 timer.cancel_event(health_interval_event);
2356 }
2357 health_interval_event = NULL;
2358 }
2359
2360 void Monitor::health_events_cleanup()
2361 {
2362 health_tick_stop();
2363 health_interval_stop();
2364 health_status_cache.reset();
2365 }
2366
2367 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2368 {
2369 dout(20) << __func__ << dendl;
2370
2371 if (changed.count("mon_health_to_clog")) {
2372 if (!cct->_conf->mon_health_to_clog) {
2373 health_events_cleanup();
2374 } else {
2375 if (!health_tick_event) {
2376 health_tick_start();
2377 }
2378 if (!health_interval_event) {
2379 health_interval_start();
2380 }
2381 }
2382 }
2383
2384 if (changed.count("mon_health_to_clog_interval")) {
2385 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2386 health_interval_stop();
2387 } else {
2388 health_interval_start();
2389 }
2390 }
2391
2392 if (changed.count("mon_health_to_clog_tick_interval")) {
2393 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2394 health_tick_stop();
2395 } else {
2396 health_tick_start();
2397 }
2398 }
2399 }
2400
2401 void Monitor::do_health_to_clog_interval()
2402 {
2403 // outputting to clog may have been disabled in the conf
2404 // since we were scheduled.
2405 if (!cct->_conf->mon_health_to_clog ||
2406 cct->_conf->mon_health_to_clog_interval <= 0)
2407 return;
2408
2409 dout(10) << __func__ << dendl;
2410
2411 // do we have a cached value for next_clog_update? if not,
2412 // do we know when the last update was?
2413
2414 do_health_to_clog(true);
2415 health_interval_start();
2416 }
2417
2418 void Monitor::do_health_to_clog(bool force)
2419 {
2420 // outputting to clog may have been disabled in the conf
2421 // since we were scheduled.
2422 if (!cct->_conf->mon_health_to_clog ||
2423 cct->_conf->mon_health_to_clog_interval <= 0)
2424 return;
2425
2426 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2427
2428 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2429 string summary;
2430 health_status_t level = get_health_status(false, nullptr, &summary);
2431 if (!force &&
2432 summary == health_status_cache.summary &&
2433 level == health_status_cache.overall)
2434 return;
2435 clog->health(level) << "overall " << summary;
2436 health_status_cache.summary = summary;
2437 health_status_cache.overall = level;
2438 } else {
2439 // for jewel only
2440 list<string> status;
2441 health_status_t overall = get_health(status, NULL, NULL);
2442 dout(25) << __func__
2443 << (force ? " (force)" : "")
2444 << dendl;
2445
2446 string summary = joinify(status.begin(), status.end(), string("; "));
2447
2448 if (!force &&
2449 overall == health_status_cache.overall &&
2450 !health_status_cache.summary.empty() &&
2451 health_status_cache.summary == summary) {
2452 // we got a dup!
2453 return;
2454 }
2455
2456 clog->info() << summary;
2457
2458 health_status_cache.overall = overall;
2459 health_status_cache.summary = summary;
2460 }
2461 }
2462
2463 health_status_t Monitor::get_health_status(
2464 bool want_detail,
2465 Formatter *f,
2466 std::string *plain,
2467 const char *sep1,
2468 const char *sep2)
2469 {
2470 health_status_t r = HEALTH_OK;
2471 bool compat = g_conf->mon_health_preluminous_compat;
2472 bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
2473 if (f) {
2474 f->open_object_section("health");
2475 f->open_object_section("checks");
2476 }
2477
2478 string summary;
2479 string *psummary = f ? nullptr : &summary;
2480 for (auto& svc : paxos_service) {
2481 r = std::min(r, svc->get_health_checks().dump_summary(
2482 f, psummary, sep2, want_detail));
2483 }
2484
2485 if (f) {
2486 f->close_section();
2487 f->dump_stream("status") << r;
2488 } else {
2489 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2490 *plain = stringify(r);
2491 if (summary.size()) {
2492 *plain += sep1;
2493 *plain += summary;
2494 }
2495 *plain += "\n";
2496 }
2497
2498 if (f && (compat || compat_warn)) {
2499 health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
2500 if (compat) {
2501 f->open_array_section("summary");
2502 if (compat_warn) {
2503 f->open_object_section("item");
2504 f->dump_stream("severity") << HEALTH_WARN;
2505 f->dump_string("summary", "'ceph health' JSON format has changed in luminous; update your health monitoring scripts");
2506 f->close_section();
2507 }
2508 for (auto& svc : paxos_service) {
2509 svc->get_health_checks().dump_summary_compat(f);
2510 }
2511 f->close_section();
2512 }
2513 f->dump_stream("overall_status") << cr;
2514 }
2515
2516 if (want_detail) {
2517 if (f && (compat || compat_warn)) {
2518 f->open_array_section("detail");
2519 if (compat_warn) {
2520 f->dump_string("item", "'ceph health' JSON format has changed in luminous. If you see this your monitoring system is scraping the wrong fields. Disable this with 'mon health preluminous compat warning = false'");
2521 }
2522 }
2523
2524 for (auto& svc : paxos_service) {
2525 svc->get_health_checks().dump_detail(f, plain, compat);
2526 }
2527
2528 if (f && (compat || compat_warn)) {
2529 f->close_section();
2530 }
2531 }
2532 if (f) {
2533 f->close_section();
2534 }
2535 return r;
2536 }
2537
2538 void Monitor::log_health(
2539 const health_check_map_t& updated,
2540 const health_check_map_t& previous,
2541 MonitorDBStore::TransactionRef t)
2542 {
2543 if (!g_conf->mon_health_to_clog) {
2544 return;
2545 }
2546
2547 const utime_t now = ceph_clock_now();
2548
2549 // FIXME: log atomically as part of @t instead of using clog.
2550 dout(10) << __func__ << " updated " << updated.checks.size()
2551 << " previous " << previous.checks.size()
2552 << dendl;
2553 const auto min_log_period = g_conf->get_val<int64_t>(
2554 "mon_health_log_update_period");
2555 for (auto& p : updated.checks) {
2556 auto q = previous.checks.find(p.first);
2557 bool logged = false;
2558 if (q == previous.checks.end()) {
2559 // new
2560 ostringstream ss;
2561 ss << "Health check failed: " << p.second.summary << " ("
2562 << p.first << ")";
2563 clog->health(p.second.severity) << ss.str();
2564
2565 logged = true;
2566 } else {
2567 if (p.second.summary != q->second.summary ||
2568 p.second.severity != q->second.severity) {
2569
2570 auto status_iter = health_check_log_times.find(p.first);
2571 if (status_iter != health_check_log_times.end()) {
2572 if (p.second.severity == q->second.severity &&
2573 now - status_iter->second.updated_at < min_log_period) {
2574 // We already logged this recently and the severity is unchanged,
2575 // so skip emitting an update of the summary string.
2576 // We'll get an update out of tick() later if the check
2577 // is still failing.
2578 continue;
2579 }
2580 }
2581
2582 // summary or severity changed (ignore detail changes at this level)
2583 ostringstream ss;
2584 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2585 clog->health(p.second.severity) << ss.str();
2586
2587 logged = true;
2588 }
2589 }
2590 // Record the time at which we last logged, so that we can check this
2591 // when considering whether/when to print update messages.
2592 if (logged) {
2593 auto iter = health_check_log_times.find(p.first);
2594 if (iter == health_check_log_times.end()) {
2595 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2596 p.second.severity, p.second.summary, now));
2597 } else {
2598 iter->second = HealthCheckLogStatus(
2599 p.second.severity, p.second.summary, now);
2600 }
2601 }
2602 }
2603 for (auto& p : previous.checks) {
2604 if (!updated.checks.count(p.first)) {
2605 // cleared
2606 ostringstream ss;
2607 if (p.first == "DEGRADED_OBJECTS") {
2608 clog->info() << "All degraded objects recovered";
2609 } else if (p.first == "OSD_FLAGS") {
2610 clog->info() << "OSD flags cleared";
2611 } else {
2612 clog->info() << "Health check cleared: " << p.first << " (was: "
2613 << p.second.summary << ")";
2614 }
2615
2616 if (health_check_log_times.count(p.first)) {
2617 health_check_log_times.erase(p.first);
2618 }
2619 }
2620 }
2621
2622 if (previous.checks.size() && updated.checks.size() == 0) {
2623 // We might be going into a fully healthy state, check
2624 // other subsystems
2625 bool any_checks = false;
2626 for (auto& svc : paxos_service) {
2627 if (&(svc->get_health_checks()) == &(previous)) {
2628 // Ignore the ones we're clearing right now
2629 continue;
2630 }
2631
2632 if (svc->get_health_checks().checks.size() > 0) {
2633 any_checks = true;
2634 break;
2635 }
2636 }
2637 if (!any_checks) {
2638 clog->info() << "Cluster is now healthy";
2639 }
2640 }
2641 }
2642
2643 health_status_t Monitor::get_health(list<string>& status,
2644 bufferlist *detailbl,
2645 Formatter *f)
2646 {
2647 list<pair<health_status_t,string> > summary;
2648 list<pair<health_status_t,string> > detail;
2649
2650 if (f)
2651 f->open_object_section("health");
2652
2653 for (vector<PaxosService*>::iterator p = paxos_service.begin();
2654 p != paxos_service.end();
2655 ++p) {
2656 PaxosService *s = *p;
2657 s->get_health(summary, detailbl ? &detail : NULL, cct);
2658 }
2659
2660 health_monitor->get_health(summary, (detailbl ? &detail : NULL));
2661
2662 health_status_t overall = HEALTH_OK;
2663 if (!timecheck_skews.empty()) {
2664 list<string> warns;
2665 for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2666 i != timecheck_skews.end(); ++i) {
2667 entity_inst_t inst = i->first;
2668 double skew = i->second;
2669 double latency = timecheck_latencies[inst];
2670 string name = monmap->get_name(inst.addr);
2671 ostringstream tcss;
2672 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2673 if (tcstatus != HEALTH_OK) {
2674 if (overall > tcstatus)
2675 overall = tcstatus;
2676 warns.push_back(name);
2677 ostringstream tmp_ss;
2678 tmp_ss << "mon." << name
2679 << " addr " << inst.addr << " " << tcss.str()
2680 << " (latency " << latency << "s)";
2681 detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2682 }
2683 }
2684 if (!warns.empty()) {
2685 ostringstream ss;
2686 ss << "clock skew detected on";
2687 while (!warns.empty()) {
2688 ss << " mon." << warns.front();
2689 warns.pop_front();
2690 if (!warns.empty())
2691 ss << ",";
2692 }
2693 status.push_back(ss.str());
2694 summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2695 }
2696 }
2697
2698 if (f)
2699 f->open_array_section("summary");
2700 if (!summary.empty()) {
2701 while (!summary.empty()) {
2702 if (overall > summary.front().first)
2703 overall = summary.front().first;
2704 status.push_back(summary.front().second);
2705 if (f) {
2706 f->open_object_section("item");
2707 f->dump_stream("severity") << summary.front().first;
2708 f->dump_string("summary", summary.front().second);
2709 f->close_section();
2710 }
2711 summary.pop_front();
2712 }
2713 }
2714 if (f)
2715 f->close_section();
2716
2717 stringstream fss;
2718 fss << overall;
2719 status.push_front(fss.str());
2720 if (f)
2721 f->dump_stream("overall_status") << overall;
2722
2723 if (f)
2724 f->open_array_section("detail");
2725 while (!detail.empty()) {
2726 if (f)
2727 f->dump_string("item", detail.front().second);
2728 else if (detailbl != NULL) {
2729 detailbl->append(detail.front().second);
2730 detailbl->append('\n');
2731 }
2732 detail.pop_front();
2733 }
2734 if (f)
2735 f->close_section();
2736
2737 if (f)
2738 f->close_section();
2739
2740 return overall;
2741 }
2742
2743 void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2744 {
2745 if (f)
2746 f->open_object_section("status");
2747
2748 if (f) {
2749 f->dump_stream("fsid") << monmap->get_fsid();
2750 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2751 get_health_status(false, f, nullptr);
2752 } else {
2753 list<string> health_str;
2754 get_health(health_str, nullptr, f);
2755 }
2756 f->dump_unsigned("election_epoch", get_epoch());
2757 {
2758 f->open_array_section("quorum");
2759 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2760 f->dump_int("rank", *p);
2761 f->close_section();
2762 f->open_array_section("quorum_names");
2763 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2764 f->dump_string("id", monmap->get_name(*p));
2765 f->close_section();
2766 }
2767 f->open_object_section("monmap");
2768 monmap->dump(f);
2769 f->close_section();
2770 f->open_object_section("osdmap");
2771 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
2772 f->close_section();
2773 f->open_object_section("pgmap");
2774 pgservice->print_summary(f, NULL);
2775 f->close_section();
2776 f->open_object_section("fsmap");
2777 mdsmon()->get_fsmap().print_summary(f, NULL);
2778 f->close_section();
2779 f->open_object_section("mgrmap");
2780 mgrmon()->get_map().print_summary(f, nullptr);
2781 f->close_section();
2782
2783 f->dump_object("servicemap", mgrstatmon()->get_service_map());
2784 f->close_section();
2785 } else {
2786 ss << " cluster:\n";
2787 ss << " id: " << monmap->get_fsid() << "\n";
2788
2789 string health;
2790 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2791 get_health_status(false, nullptr, &health,
2792 "\n ", "\n ");
2793 } else {
2794 list<string> ls;
2795 get_health(ls, NULL, f);
2796 health = joinify(ls.begin(), ls.end(),
2797 string("\n "));
2798 }
2799 ss << " health: " << health << "\n";
2800
2801 ss << "\n \n services:\n";
2802 {
2803 size_t maxlen = 3;
2804 auto& service_map = mgrstatmon()->get_service_map();
2805 for (auto& p : service_map.services) {
2806 maxlen = std::max(maxlen, p.first.size());
2807 }
2808 string spacing(maxlen - 3, ' ');
2809 const auto quorum_names = get_quorum_names();
2810 const auto mon_count = monmap->mon_info.size();
2811 ss << " mon: " << spacing << mon_count << " daemons, quorum "
2812 << quorum_names;
2813 if (quorum_names.size() != mon_count) {
2814 std::list<std::string> out_of_q;
2815 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2816 if (quorum.count(i) == 0) {
2817 out_of_q.push_back(monmap->ranks[i]);
2818 }
2819 }
2820 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2821 out_of_q.end(), std::string(", "));
2822 }
2823 ss << "\n";
2824 if (mgrmon()->in_use()) {
2825 ss << " mgr: " << spacing;
2826 mgrmon()->get_map().print_summary(nullptr, &ss);
2827 ss << "\n";
2828 }
2829 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2830 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2831 }
2832 ss << " osd: " << spacing;
2833 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2834 ss << "\n";
2835 for (auto& p : service_map.services) {
2836 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2837 << p.second.get_summary() << "\n";
2838 }
2839 }
2840
2841 ss << "\n \n data:\n";
2842 pgservice->print_summary(NULL, &ss);
2843 ss << "\n ";
2844 }
2845 }
2846
2847 void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2848 map<string,string> &param_str_map)
2849 {
2850 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2851 p != cmdmap.end(); ++p) {
2852 if (p->first == "prefix")
2853 continue;
2854 if (p->first == "caps") {
2855 vector<string> cv;
2856 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2857 cv.size() % 2 == 0) {
2858 for (unsigned i = 0; i < cv.size(); i += 2) {
2859 string k = string("caps_") + cv[i];
2860 param_str_map[k] = cv[i + 1];
2861 }
2862 continue;
2863 }
2864 }
2865 param_str_map[p->first] = cmd_vartype_stringify(p->second);
2866 }
2867 }
2868
2869 const MonCommand *Monitor::_get_moncommand(
2870 const string &cmd_prefix,
2871 const vector<MonCommand>& cmds)
2872 {
2873 for (auto& c : cmds) {
2874 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2875 return &c;
2876 }
2877 }
2878 return nullptr;
2879 }
2880
2881 bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2882 const map<string,cmd_vartype>& cmdmap,
2883 const map<string,string>& param_str_map,
2884 const MonCommand *this_cmd) {
2885
2886 bool cmd_r = this_cmd->requires_perm('r');
2887 bool cmd_w = this_cmd->requires_perm('w');
2888 bool cmd_x = this_cmd->requires_perm('x');
2889
2890 bool capable = s->caps.is_capable(
2891 g_ceph_context,
2892 CEPH_ENTITY_TYPE_MON,
2893 s->entity_name,
2894 module, prefix, param_str_map,
2895 cmd_r, cmd_w, cmd_x);
2896
2897 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2898 return capable;
2899 }
2900
2901 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
2902 Formatter *f,
2903 bufferlist *rdata,
2904 bool hide_mgr_flag)
2905 {
2906 int cmdnum = 0;
2907 f->open_object_section("command_descriptions");
2908 for (const auto &cmd : commands) {
2909 unsigned flags = cmd.flags;
2910 if (hide_mgr_flag) {
2911 flags &= ~MonCommand::FLAG_MGR;
2912 }
2913 ostringstream secname;
2914 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2915 dump_cmddesc_to_json(f, secname.str(),
2916 cmd.cmdstring, cmd.helpstring, cmd.module,
2917 cmd.req_perms, cmd.availability, flags);
2918 cmdnum++;
2919 }
2920 f->close_section(); // command_descriptions
2921
2922 f->flush(*rdata);
2923 }
2924
2925 bool Monitor::is_keyring_required()
2926 {
2927 string auth_cluster_required = g_conf->auth_supported.empty() ?
2928 g_conf->auth_cluster_required : g_conf->auth_supported;
2929 string auth_service_required = g_conf->auth_supported.empty() ?
2930 g_conf->auth_service_required : g_conf->auth_supported;
2931
2932 return auth_service_required == "cephx" ||
2933 auth_cluster_required == "cephx";
2934 }
2935
2936 struct C_MgrProxyCommand : public Context {
2937 Monitor *mon;
2938 MonOpRequestRef op;
2939 uint64_t size;
2940 bufferlist outbl;
2941 string outs;
2942 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2943 : mon(mon), op(op), size(s) { }
2944 void finish(int r) {
2945 Mutex::Locker l(mon->lock);
2946 mon->mgr_proxy_bytes -= size;
2947 mon->reply_command(op, r, outs, outbl, 0);
2948 }
2949 };
2950
2951 void Monitor::handle_command(MonOpRequestRef op)
2952 {
2953 assert(op->is_type_command());
2954 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
2955 if (m->fsid != monmap->fsid) {
2956 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2957 reply_command(op, -EPERM, "wrong fsid", 0);
2958 return;
2959 }
2960
2961 MonSession *session = static_cast<MonSession *>(
2962 m->get_connection()->get_priv());
2963 if (!session) {
2964 dout(5) << __func__ << " dropping stray message " << *m << dendl;
2965 return;
2966 }
2967 BOOST_SCOPE_EXIT_ALL(=) {
2968 session->put();
2969 };
2970
2971 if (m->cmd.empty()) {
2972 string rs = "No command supplied";
2973 reply_command(op, -EINVAL, rs, 0);
2974 return;
2975 }
2976
2977 string prefix;
2978 vector<string> fullcmd;
2979 map<string, cmd_vartype> cmdmap;
2980 stringstream ss, ds;
2981 bufferlist rdata;
2982 string rs;
2983 int r = -EINVAL;
2984 rs = "unrecognized command";
2985
2986 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
2987 // ss has reason for failure
2988 r = -EINVAL;
2989 rs = ss.str();
2990 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
2991 reply_command(op, r, rs, 0);
2992 return;
2993 }
2994
2995 // check return value. If no prefix parameter provided,
2996 // return value will be false, then return error info.
2997 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
2998 reply_command(op, -EINVAL, "command prefix not found", 0);
2999 return;
3000 }
3001
3002 // check prefix is empty
3003 if (prefix.empty()) {
3004 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3005 return;
3006 }
3007
3008 if (prefix == "get_command_descriptions") {
3009 bufferlist rdata;
3010 Formatter *f = Formatter::create("json");
3011 // hide mgr commands until luminous upgrade is complete
3012 bool hide_mgr_flag =
3013 osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
3014
3015 std::vector<MonCommand> commands;
3016
3017 // only include mgr commands once all mons are upgrade (and we've dropped
3018 // the hard-coded PGMonitor commands)
3019 if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
3020 commands = static_cast<MgrMonitor*>(
3021 paxos_service[PAXOS_MGR])->get_command_descs();
3022 }
3023
3024 for (auto& c : leader_mon_commands) {
3025 commands.push_back(c);
3026 }
3027
3028 format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
3029 delete f;
3030 reply_command(op, 0, "", rdata, 0);
3031 return;
3032 }
3033
3034 string module;
3035 string err;
3036
3037 dout(0) << "handle_command " << *m << dendl;
3038
3039 string format;
3040 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
3041 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3042
3043 get_str_vec(prefix, fullcmd);
3044
3045 // make sure fullcmd is not empty.
3046 // invalid prefix will cause empty vector fullcmd.
3047 // such as, prefix=";,,;"
3048 if (fullcmd.empty()) {
3049 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3050 return;
3051 }
3052
3053 module = fullcmd[0];
3054
3055 // validate command is in leader map
3056
3057 const MonCommand *leader_cmd;
3058 const auto& mgr_cmds = mgrmon()->get_command_descs();
3059 const MonCommand *mgr_cmd = nullptr;
3060 if (!mgr_cmds.empty()) {
3061 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
3062 }
3063 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
3064 if (!leader_cmd) {
3065 leader_cmd = mgr_cmd;
3066 if (!leader_cmd) {
3067 reply_command(op, -EINVAL, "command not known", 0);
3068 return;
3069 }
3070 }
3071 // validate command is in our map & matches, or forward if it is allowed
3072 const MonCommand *mon_cmd = _get_moncommand(
3073 prefix,
3074 get_local_commands(quorum_mon_features));
3075 if (!mon_cmd) {
3076 mon_cmd = mgr_cmd;
3077 }
3078 if (!is_leader()) {
3079 if (!mon_cmd) {
3080 if (leader_cmd->is_noforward()) {
3081 reply_command(op, -EINVAL,
3082 "command not locally supported and not allowed to forward",
3083 0);
3084 return;
3085 }
3086 dout(10) << "Command not locally supported, forwarding request "
3087 << m << dendl;
3088 forward_request_leader(op);
3089 return;
3090 } else if (!mon_cmd->is_compat(leader_cmd)) {
3091 if (mon_cmd->is_noforward()) {
3092 reply_command(op, -EINVAL,
3093 "command not compatible with leader and not allowed to forward",
3094 0);
3095 return;
3096 }
3097 dout(10) << "Command not compatible with leader, forwarding request "
3098 << m << dendl;
3099 forward_request_leader(op);
3100 return;
3101 }
3102 }
3103
3104 if (mon_cmd->is_obsolete() ||
3105 (cct->_conf->mon_debug_deprecated_as_obsolete
3106 && mon_cmd->is_deprecated())) {
3107 reply_command(op, -ENOTSUP,
3108 "command is obsolete; please check usage and/or man page",
3109 0);
3110 return;
3111 }
3112
3113 if (session->proxy_con && mon_cmd->is_noforward()) {
3114 dout(10) << "Got forward for noforward command " << m << dendl;
3115 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3116 return;
3117 }
3118
3119 /* what we perceive as being the service the command falls under */
3120 string service(mon_cmd->module);
3121
3122 dout(25) << __func__ << " prefix='" << prefix
3123 << "' module='" << module
3124 << "' service='" << service << "'" << dendl;
3125
3126 bool cmd_is_rw =
3127 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3128
3129 // validate user's permissions for requested command
3130 map<string,string> param_str_map;
3131 _generate_command_map(cmdmap, param_str_map);
3132 if (!_allowed_command(session, service, prefix, cmdmap,
3133 param_str_map, mon_cmd)) {
3134 dout(1) << __func__ << " access denied" << dendl;
3135 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3136 << "from='" << session->inst << "' "
3137 << "entity='" << session->entity_name << "' "
3138 << "cmd=" << m->cmd << ": access denied";
3139 reply_command(op, -EACCES, "access denied", 0);
3140 return;
3141 }
3142
3143 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3144 << "from='" << session->inst << "' "
3145 << "entity='" << session->entity_name << "' "
3146 << "cmd=" << m->cmd << ": dispatch";
3147
3148 if (mon_cmd->is_mgr() &&
3149 osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3150 const auto& hdr = m->get_header();
3151 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3152 uint64_t max =
3153 g_conf->mon_client_bytes * g_conf->mon_mgr_proxy_client_bytes_ratio;
3154 if (mgr_proxy_bytes + size > max) {
3155 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3156 << " + " << size << " > max " << max << dendl;
3157 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3158 return;
3159 }
3160 mgr_proxy_bytes += size;
3161 dout(10) << __func__ << " proxying mgr command (+" << size
3162 << " -> " << mgr_proxy_bytes << ")" << dendl;
3163 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3164 mgr_client.start_command(m->cmd,
3165 m->get_data(),
3166 &fin->outbl,
3167 &fin->outs,
3168 new C_OnFinisher(fin, &finisher));
3169 return;
3170 }
3171
3172 if ((module == "mds" || module == "fs") &&
3173 prefix != "fs authorize") {
3174 mdsmon()->dispatch(op);
3175 return;
3176 }
3177 if ((module == "osd" || prefix == "pg map") &&
3178 prefix != "osd last-stat-seq") {
3179 osdmon()->dispatch(op);
3180 return;
3181 }
3182
3183 if (module == "pg") {
3184 pgmon()->dispatch(op);
3185 return;
3186 }
3187 if (module == "mon" &&
3188 /* Let the Monitor class handle the following commands:
3189 * 'mon compact'
3190 * 'mon scrub'
3191 * 'mon sync force'
3192 */
3193 prefix != "mon compact" &&
3194 prefix != "mon scrub" &&
3195 prefix != "mon sync force" &&
3196 prefix != "mon metadata" &&
3197 prefix != "mon versions" &&
3198 prefix != "mon count-metadata") {
3199 monmon()->dispatch(op);
3200 return;
3201 }
3202 if (module == "auth" || prefix == "fs authorize") {
3203 authmon()->dispatch(op);
3204 return;
3205 }
3206 if (module == "log") {
3207 logmon()->dispatch(op);
3208 return;
3209 }
3210
3211 if (module == "config-key") {
3212 config_key_service->dispatch(op);
3213 return;
3214 }
3215
3216 if (module == "mgr") {
3217 mgrmon()->dispatch(op);
3218 return;
3219 }
3220
3221 if (prefix == "fsid") {
3222 if (f) {
3223 f->open_object_section("fsid");
3224 f->dump_stream("fsid") << monmap->fsid;
3225 f->close_section();
3226 f->flush(rdata);
3227 } else {
3228 ds << monmap->fsid;
3229 rdata.append(ds);
3230 }
3231 reply_command(op, 0, "", rdata, 0);
3232 return;
3233 }
3234
3235 if (prefix == "scrub" || prefix == "mon scrub") {
3236 wait_for_paxos_write();
3237 if (is_leader()) {
3238 int r = scrub_start();
3239 reply_command(op, r, "", rdata, 0);
3240 } else if (is_peon()) {
3241 forward_request_leader(op);
3242 } else {
3243 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3244 }
3245 return;
3246 }
3247
3248 if (prefix == "compact" || prefix == "mon compact") {
3249 dout(1) << "triggering manual compaction" << dendl;
3250 utime_t start = ceph_clock_now();
3251 store->compact();
3252 utime_t end = ceph_clock_now();
3253 end -= start;
3254 dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3255 ostringstream oss;
3256 oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
3257 rs = oss.str();
3258 r = 0;
3259 }
3260 else if (prefix == "injectargs") {
3261 vector<string> injected_args;
3262 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3263 if (!injected_args.empty()) {
3264 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3265 ostringstream oss;
3266 r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3267 ss << "injectargs:" << oss.str();
3268 rs = ss.str();
3269 goto out;
3270 } else {
3271 rs = "must supply options to be parsed in a single string";
3272 r = -EINVAL;
3273 }
3274 } else if (prefix == "time-sync-status") {
3275 if (!f)
3276 f.reset(Formatter::create("json-pretty"));
3277 f->open_object_section("time_sync");
3278 if (!timecheck_skews.empty()) {
3279 f->open_object_section("time_skew_status");
3280 for (auto& i : timecheck_skews) {
3281 entity_inst_t inst = i.first;
3282 double skew = i.second;
3283 double latency = timecheck_latencies[inst];
3284 string name = monmap->get_name(inst.addr);
3285 ostringstream tcss;
3286 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3287 f->open_object_section(name.c_str());
3288 f->dump_float("skew", skew);
3289 f->dump_float("latency", latency);
3290 f->dump_stream("health") << tcstatus;
3291 if (tcstatus != HEALTH_OK) {
3292 f->dump_stream("details") << tcss.str();
3293 }
3294 f->close_section();
3295 }
3296 f->close_section();
3297 }
3298 f->open_object_section("timechecks");
3299 f->dump_unsigned("epoch", get_epoch());
3300 f->dump_int("round", timecheck_round);
3301 f->dump_stream("round_status") << ((timecheck_round%2) ?
3302 "on-going" : "finished");
3303 f->close_section();
3304 f->close_section();
3305 f->flush(rdata);
3306 r = 0;
3307 rs = "";
3308 } else if (prefix == "config set") {
3309 std::string key;
3310 cmd_getval(cct, cmdmap, "key", key);
3311 std::string val;
3312 cmd_getval(cct, cmdmap, "value", val);
3313 r = g_conf->set_val(key, val, true, &ss);
3314 if (r == 0) {
3315 g_conf->apply_changes(nullptr);
3316 }
3317 rs = ss.str();
3318 goto out;
3319 } else if (prefix == "status" ||
3320 prefix == "health" ||
3321 prefix == "df") {
3322 string detail;
3323 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3324
3325 if (prefix == "status") {
3326 // get_cluster_status handles f == NULL
3327 get_cluster_status(ds, f.get());
3328
3329 if (f) {
3330 f->flush(ds);
3331 ds << '\n';
3332 }
3333 rdata.append(ds);
3334 } else if (prefix == "health") {
3335 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3336 string plain;
3337 get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3338 if (f) {
3339 f->flush(rdata);
3340 } else {
3341 rdata.append(plain);
3342 }
3343 } else {
3344 list<string> health_str;
3345 get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
3346 if (f) {
3347 f->flush(ds);
3348 ds << '\n';
3349 } else {
3350 assert(!health_str.empty());
3351 ds << health_str.front();
3352 health_str.pop_front();
3353 if (!health_str.empty()) {
3354 ds << ' ';
3355 ds << joinify(health_str.begin(), health_str.end(), string("; "));
3356 }
3357 }
3358 bufferlist comb;
3359 comb.append(ds);
3360 if (detail == "detail")
3361 comb.append(rdata);
3362 rdata = comb;
3363 }
3364 } else if (prefix == "df") {
3365 bool verbose = (detail == "detail");
3366 if (f)
3367 f->open_object_section("stats");
3368
3369 pgservice->dump_fs_stats(&ds, f.get(), verbose);
3370 if (!f)
3371 ds << '\n';
3372 pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3373
3374 if (f) {
3375 f->close_section();
3376 f->flush(ds);
3377 ds << '\n';
3378 }
3379 } else {
3380 assert(0 == "We should never get here!");
3381 return;
3382 }
3383 rdata.append(ds);
3384 rs = "";
3385 r = 0;
3386 } else if (prefix == "report") {
3387
3388 // this must be formatted, in its current form
3389 if (!f)
3390 f.reset(Formatter::create("json-pretty"));
3391 f->open_object_section("report");
3392 f->dump_stream("cluster_fingerprint") << fingerprint;
3393 f->dump_string("version", ceph_version_to_str());
3394 f->dump_string("commit", git_version_to_str());
3395 f->dump_stream("timestamp") << ceph_clock_now();
3396
3397 vector<string> tagsvec;
3398 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3399 string tagstr = str_join(tagsvec, " ");
3400 if (!tagstr.empty())
3401 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3402 f->dump_string("tag", tagstr);
3403
3404 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3405 get_health_status(true, f.get(), nullptr);
3406 } else {
3407 list<string> health_str;
3408 get_health(health_str, nullptr, f.get());
3409 }
3410
3411 monmon()->dump_info(f.get());
3412 osdmon()->dump_info(f.get());
3413 mdsmon()->dump_info(f.get());
3414 authmon()->dump_info(f.get());
3415 pgservice->dump_info(f.get());
3416
3417 paxos->dump_info(f.get());
3418
3419 f->close_section();
3420 f->flush(rdata);
3421
3422 ostringstream ss2;
3423 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
3424 rs = ss2.str();
3425 r = 0;
3426 } else if (prefix == "osd last-stat-seq") {
3427 int64_t osd;
3428 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3429 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3430 if (f) {
3431 f->dump_unsigned("seq", seq);
3432 f->flush(ds);
3433 } else {
3434 ds << seq;
3435 rdata.append(ds);
3436 }
3437 rs = "";
3438 r = 0;
3439 } else if (prefix == "node ls") {
3440 string node_type("all");
3441 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3442 if (!f)
3443 f.reset(Formatter::create("json-pretty"));
3444 if (node_type == "all") {
3445 f->open_object_section("nodes");
3446 print_nodes(f.get(), ds);
3447 osdmon()->print_nodes(f.get());
3448 mdsmon()->print_nodes(f.get());
3449 f->close_section();
3450 } else if (node_type == "mon") {
3451 print_nodes(f.get(), ds);
3452 } else if (node_type == "osd") {
3453 osdmon()->print_nodes(f.get());
3454 } else if (node_type == "mds") {
3455 mdsmon()->print_nodes(f.get());
3456 }
3457 f->flush(ds);
3458 rdata.append(ds);
3459 rs = "";
3460 r = 0;
3461 } else if (prefix == "features") {
3462 if (!is_leader() && !is_peon()) {
3463 dout(10) << " waiting for quorum" << dendl;
3464 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3465 return;
3466 }
3467 if (!is_leader()) {
3468 forward_request_leader(op);
3469 return;
3470 }
3471 if (!f)
3472 f.reset(Formatter::create("json-pretty"));
3473 FeatureMap fm;
3474 get_combined_feature_map(&fm);
3475 f->dump_object("features", fm);
3476 f->flush(rdata);
3477 rs = "";
3478 r = 0;
3479 } else if (prefix == "mon metadata") {
3480 if (!f)
3481 f.reset(Formatter::create("json-pretty"));
3482
3483 string name;
3484 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3485 if (!all) {
3486 // Dump a single mon's metadata
3487 int mon = monmap->get_rank(name);
3488 if (mon < 0) {
3489 rs = "requested mon not found";
3490 r = -ENOENT;
3491 goto out;
3492 }
3493 f->open_object_section("mon_metadata");
3494 r = get_mon_metadata(mon, f.get(), ds);
3495 f->close_section();
3496 } else {
3497 // Dump all mons' metadata
3498 r = 0;
3499 f->open_array_section("mon_metadata");
3500 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3501 std::ostringstream get_err;
3502 f->open_object_section("mon");
3503 f->dump_string("name", monmap->get_name(rank));
3504 r = get_mon_metadata(rank, f.get(), get_err);
3505 f->close_section();
3506 if (r == -ENOENT || r == -EINVAL) {
3507 dout(1) << get_err.str() << dendl;
3508 // Drop error, list what metadata we do have
3509 r = 0;
3510 } else if (r != 0) {
3511 derr << "Unexpected error from get_mon_metadata: "
3512 << cpp_strerror(r) << dendl;
3513 ds << get_err.str();
3514 break;
3515 }
3516 }
3517 f->close_section();
3518 }
3519
3520 f->flush(ds);
3521 rdata.append(ds);
3522 rs = "";
3523 } else if (prefix == "mon versions") {
3524 if (!f)
3525 f.reset(Formatter::create("json-pretty"));
3526 count_metadata("ceph_version", f.get());
3527 f->flush(ds);
3528 rdata.append(ds);
3529 rs = "";
3530 r = 0;
3531 } else if (prefix == "mon count-metadata") {
3532 if (!f)
3533 f.reset(Formatter::create("json-pretty"));
3534 string field;
3535 cmd_getval(g_ceph_context, cmdmap, "property", field);
3536 count_metadata(field, f.get());
3537 f->flush(ds);
3538 rdata.append(ds);
3539 rs = "";
3540 r = 0;
3541 } else if (prefix == "quorum_status") {
3542 // make sure our map is readable and up to date
3543 if (!is_leader() && !is_peon()) {
3544 dout(10) << " waiting for quorum" << dendl;
3545 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3546 return;
3547 }
3548 _quorum_status(f.get(), ds);
3549 rdata.append(ds);
3550 rs = "";
3551 r = 0;
3552 } else if (prefix == "mon_status") {
3553 get_mon_status(f.get(), ds);
3554 if (f)
3555 f->flush(ds);
3556 rdata.append(ds);
3557 rs = "";
3558 r = 0;
3559 } else if (prefix == "sync force" ||
3560 prefix == "mon sync force") {
3561 string validate1, validate2;
3562 cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3563 cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3564 if (validate1 != "--yes-i-really-mean-it" ||
3565 validate2 != "--i-know-what-i-am-doing") {
3566 r = -EINVAL;
3567 rs = "are you SURE? this will mean the monitor store will be "
3568 "erased. pass '--yes-i-really-mean-it "
3569 "--i-know-what-i-am-doing' if you really do.";
3570 goto out;
3571 }
3572 sync_force(f.get(), ds);
3573 rs = ds.str();
3574 r = 0;
3575 } else if (prefix == "heap") {
3576 if (!ceph_using_tcmalloc())
3577 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3578 else {
3579 string heapcmd;
3580 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3581 // XXX 1-element vector, change at callee or make vector here?
3582 vector<string> heapcmd_vec;
3583 get_str_vec(heapcmd, heapcmd_vec);
3584 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3585 rdata.append(ds);
3586 rs = "";
3587 r = 0;
3588 }
3589 } else if (prefix == "quorum") {
3590 string quorumcmd;
3591 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3592 if (quorumcmd == "exit") {
3593 start_election();
3594 elector.stop_participating();
3595 rs = "stopped responding to quorum, initiated new election";
3596 r = 0;
3597 } else if (quorumcmd == "enter") {
3598 elector.start_participating();
3599 start_election();
3600 rs = "started responding to quorum, initiated new election";
3601 r = 0;
3602 } else {
3603 rs = "needs a valid 'quorum' command";
3604 r = -EINVAL;
3605 }
3606 } else if (prefix == "version") {
3607 if (f) {
3608 f->open_object_section("version");
3609 f->dump_string("version", pretty_version_to_str());
3610 f->close_section();
3611 f->flush(ds);
3612 } else {
3613 ds << pretty_version_to_str();
3614 }
3615 rdata.append(ds);
3616 rs = "";
3617 r = 0;
3618 } else if (prefix == "versions") {
3619 if (!f)
3620 f.reset(Formatter::create("json-pretty"));
3621 map<string,int> overall;
3622 f->open_object_section("version");
3623 map<string,int> mon, mgr, osd, mds;
3624
3625 count_metadata("ceph_version", &mon);
3626 f->open_object_section("mon");
3627 for (auto& p : mon) {
3628 f->dump_int(p.first.c_str(), p.second);
3629 overall[p.first] += p.second;
3630 }
3631 f->close_section();
3632
3633 mgrmon()->count_metadata("ceph_version", &mgr);
3634 f->open_object_section("mgr");
3635 for (auto& p : mgr) {
3636 f->dump_int(p.first.c_str(), p.second);
3637 overall[p.first] += p.second;
3638 }
3639 f->close_section();
3640
3641 osdmon()->count_metadata("ceph_version", &osd);
3642 f->open_object_section("osd");
3643 for (auto& p : osd) {
3644 f->dump_int(p.first.c_str(), p.second);
3645 overall[p.first] += p.second;
3646 }
3647 f->close_section();
3648
3649 mdsmon()->count_metadata("ceph_version", &mds);
3650 f->open_object_section("mds");
3651 for (auto& p : mds) {
3652 f->dump_int(p.first.c_str(), p.second);
3653 overall[p.first] += p.second;
3654 }
3655 f->close_section();
3656
3657 for (auto& p : mgrstatmon()->get_service_map().services) {
3658 f->open_object_section(p.first.c_str());
3659 map<string,int> m;
3660 p.second.count_metadata("ceph_version", &m);
3661 for (auto& q : m) {
3662 f->dump_int(q.first.c_str(), q.second);
3663 overall[q.first] += q.second;
3664 }
3665 f->close_section();
3666 }
3667
3668 f->open_object_section("overall");
3669 for (auto& p : overall) {
3670 f->dump_int(p.first.c_str(), p.second);
3671 }
3672 f->close_section();
3673 f->close_section();
3674 f->flush(rdata);
3675 rs = "";
3676 r = 0;
3677 }
3678
3679 out:
3680 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3681 reply_command(op, r, rs, rdata, 0);
3682 }
3683
3684 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3685 {
3686 bufferlist rdata;
3687 reply_command(op, rc, rs, rdata, version);
3688 }
3689
3690 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3691 bufferlist& rdata, version_t version)
3692 {
3693 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3694 assert(m->get_type() == MSG_MON_COMMAND);
3695 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3696 reply->set_tid(m->get_tid());
3697 reply->set_data(rdata);
3698 send_reply(op, reply);
3699 }
3700
3701
// ------------------------
// request/reply routing
//
// a client/mds/osd will connect to a random monitor. we need to forward any
// messages requiring state updates to the leader, and then route any replies
// from the leader back through the monitor the client is connected to. (the
// monitor will not initiate any connections.)
3709
3710 void Monitor::forward_request_leader(MonOpRequestRef op)
3711 {
3712 op->mark_event(__func__);
3713
3714 int mon = get_leader();
3715 MonSession *session = op->get_session();
3716 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3717
3718 if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3719 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3720 } else if (session->proxy_con) {
3721 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3722 } else if (!session->closed) {
3723 RoutedRequest *rr = new RoutedRequest;
3724 rr->tid = ++routed_request_tid;
3725 rr->client_inst = req->get_source_inst();
3726 rr->con = req->get_connection();
3727 rr->con_features = rr->con->get_features();
3728 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3729 rr->session = static_cast<MonSession *>(session->get());
3730 rr->op = op;
3731 routed_requests[rr->tid] = rr;
3732 session->routed_request_tids.insert(rr->tid);
3733
3734 dout(10) << "forward_request " << rr->tid << " request " << *req
3735 << " features " << rr->con_features << dendl;
3736
3737 MForward *forward = new MForward(rr->tid,
3738 req,
3739 rr->con_features,
3740 rr->session->caps);
3741 forward->set_priority(req->get_priority());
3742 if (session->auth_handler) {
3743 forward->entity_name = session->entity_name;
3744 } else if (req->get_source().is_mon()) {
3745 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3746 }
3747 messenger->send_message(forward, monmap->get_inst(mon));
3748 op->mark_forwarded();
3749 assert(op->get_req()->get_type() != 0);
3750 } else {
3751 dout(10) << "forward_request no session for request " << *req << dendl;
3752 }
3753 }
3754
3755 // fake connection attached to forwarded messages
3756 struct AnonConnection : public Connection {
3757 explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
3758
3759 int send_message(Message *m) override {
3760 assert(!"send_message on anonymous connection");
3761 }
3762 void send_keepalive() override {
3763 assert(!"send_keepalive on anonymous connection");
3764 }
3765 void mark_down() override {
3766 // silently ignore
3767 }
3768 void mark_disposable() override {
3769 // silengtly ignore
3770 }
3771 bool is_connected() override { return false; }
3772 };
3773
/**
 * Handle an MForward from a peer monitor: extract the embedded request and
 * put it through the regular dispatch path (_ms_dispatch) as if it had
 * arrived directly.
 *
 * The extracted request is given a fresh AnonConnection and a temporary
 * MonSession that carries the forwarded caps and entity name.  The session's
 * proxy_con/proxy_tid point back at the forwarding monitor, which is what
 * send_reply()/no_reply() use to route the answer back.
 *
 * A forward from an entity lacking the mon 'x' capability is only logged;
 * the embedded request is not claimed or dispatched.
 */
void Monitor::handle_forward(MonOpRequestRef op)
{
  MForward *m = static_cast<MForward*>(op->get_req());
  dout(10) << "received forwarded message from " << m->client
           << " via " << m->get_source_inst() << dendl;
  MonSession *session = op->get_session();
  assert(session);

  if (!session->is_capable("mon", MON_CAP_X)) {
    dout(0) << "forward from entity with insufficient caps! "
            << session->caps << dendl;
  } else {
    // see PaxosService::dispatch(); we rely on this being anon
    // (c->msgr == NULL)
    PaxosServiceMessage *req = m->claim_message();
    assert(req != NULL);

    ConnectionRef c(new AnonConnection(cct));
    MonSession *s = new MonSession(req->get_source_inst(),
                                   static_cast<Connection*>(c.get()));
    // the connection holds its own reference to the session
    c->set_priv(s->get());
    // make the fake connection look like the original client's
    c->set_peer_addr(m->client.addr);
    c->set_peer_type(m->client.name.type());
    c->set_features(m->con_features);

    // authorize as the original client, using the caps the forwarder sent
    s->caps = m->client_caps;
    dout(10) << " caps are " << s->caps << dendl;
    s->entity_name = m->entity_name;
    dout(10) << " entity name '" << s->entity_name << "' type "
             << s->entity_name.get_type() << dendl;
    // replies are routed back through the forwarding monitor under its tid
    s->proxy_con = m->get_connection();
    s->proxy_tid = m->tid;

    req->set_connection(c);

    // not super accurate, but better than nothing.
    req->set_recv_stamp(m->get_recv_stamp());

    /*
     * note which election epoch this is; we will drop the message if
     * there is a future election since our peers will resend routed
     * requests in that case.
     */
    req->rx_election_epoch = get_epoch();

    /* Because this is a special fake connection, we need to break
       the ref loop between Connection and MonSession differently
       than we normally do. Here, the Message refers to the Connection
       which refers to the Session, and nobody else refers to the Connection
       or the Session. And due to the special nature of this message,
       nobody refers to the Connection via the Session. So, clear out that
       half of the ref loop.*/
    s->con.reset(NULL);

    dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;

    _ms_dispatch(req);
    // presumably drops the reference obtained from `new MonSession`; the
    // connection's reference taken via set_priv(s->get()) above remains
    s->put();
  }
}
3835
3836 void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3837 {
3838 dout(10) << "try_send_message " << *m << " to " << to << dendl;
3839
3840 bufferlist bl;
3841 encode_message(m, quorum_con_features, bl);
3842
3843 messenger->send_message(m, to);
3844
3845 for (int i=0; i<(int)monmap->size(); i++) {
3846 if (i != rank)
3847 messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
3848 }
3849 }
3850
3851 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3852 {
3853 op->mark_event(__func__);
3854
3855 MonSession *session = op->get_session();
3856 assert(session);
3857 Message *req = op->get_req();
3858 ConnectionRef con = op->get_connection();
3859
3860 reply->set_cct(g_ceph_context);
3861 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3862
3863 if (!con) {
3864 dout(2) << "send_reply no connection, dropping reply " << *reply
3865 << " to " << req << " " << *req << dendl;
3866 reply->put();
3867 op->mark_event("reply: no connection");
3868 return;
3869 }
3870
3871 if (!session->con && !session->proxy_con) {
3872 dout(2) << "send_reply no connection, dropping reply " << *reply
3873 << " to " << req << " " << *req << dendl;
3874 reply->put();
3875 op->mark_event("reply: no connection");
3876 return;
3877 }
3878
3879 if (session->proxy_con) {
3880 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3881 << " via " << session->proxy_con->get_peer_addr()
3882 << " for request " << *req << dendl;
3883 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3884 op->mark_event("reply: send routed request");
3885 } else {
3886 session->con->send_message(reply);
3887 op->mark_event("reply: send");
3888 }
3889 }
3890
3891 void Monitor::no_reply(MonOpRequestRef op)
3892 {
3893 MonSession *session = op->get_session();
3894 Message *req = op->get_req();
3895
3896 if (session->proxy_con) {
3897 dout(10) << "no_reply to " << req->get_source_inst()
3898 << " via " << session->proxy_con->get_peer_addr()
3899 << " for request " << *req << dendl;
3900 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3901 op->mark_event("no_reply: send routed request");
3902 } else {
3903 dout(10) << "no_reply to " << req->get_source_inst()
3904 << " " << *req << dendl;
3905 op->mark_event("no_reply");
3906 }
3907 }
3908
3909 void Monitor::handle_route(MonOpRequestRef op)
3910 {
3911 MRoute *m = static_cast<MRoute*>(op->get_req());
3912 MonSession *session = op->get_session();
3913 //check privileges
3914 if (!session->is_capable("mon", MON_CAP_X)) {
3915 dout(0) << "MRoute received from entity without appropriate perms! "
3916 << dendl;
3917 return;
3918 }
3919 if (m->msg)
3920 dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3921 else
3922 dout(10) << "handle_route null to " << m->dest << dendl;
3923
3924 // look it up
3925 if (m->session_mon_tid) {
3926 if (routed_requests.count(m->session_mon_tid)) {
3927 RoutedRequest *rr = routed_requests[m->session_mon_tid];
3928
3929 // reset payload, in case encoding is dependent on target features
3930 if (m->msg) {
3931 m->msg->clear_payload();
3932 rr->con->send_message(m->msg);
3933 m->msg = NULL;
3934 }
3935 if (m->send_osdmap_first) {
3936 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3937 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3938 true, MonOpRequestRef());
3939 }
3940 assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3941 routed_requests.erase(m->session_mon_tid);
3942 rr->session->routed_request_tids.erase(m->session_mon_tid);
3943 delete rr;
3944 } else {
3945 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3946 }
3947 } else {
3948 dout(10) << " not a routed request, trying to send anyway" << dendl;
3949 if (m->msg) {
3950 messenger->send_message(m->msg, m->dest);
3951 m->msg = NULL;
3952 }
3953 }
3954 }
3955
3956 void Monitor::resend_routed_requests()
3957 {
3958 dout(10) << "resend_routed_requests" << dendl;
3959 int mon = get_leader();
3960 list<Context*> retry;
3961 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
3962 p != routed_requests.end();
3963 ++p) {
3964 RoutedRequest *rr = p->second;
3965
3966 if (mon == rank) {
3967 dout(10) << " requeue for self tid " << rr->tid << dendl;
3968 rr->op->mark_event("retry routed request");
3969 retry.push_back(new C_RetryMessage(this, rr->op));
3970 if (rr->session) {
3971 assert(rr->session->routed_request_tids.count(p->first));
3972 rr->session->routed_request_tids.erase(p->first);
3973 }
3974 delete rr;
3975 } else {
3976 bufferlist::iterator q = rr->request_bl.begin();
3977 PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
3978 rr->op->mark_event("resend forwarded message to leader");
3979 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
3980 MForward *forward = new MForward(rr->tid, req, rr->con_features,
3981 rr->session->caps);
3982 req->put(); // forward takes its own ref; drop ours.
3983 forward->client = rr->client_inst;
3984 forward->set_priority(req->get_priority());
3985 messenger->send_message(forward, monmap->get_inst(mon));
3986 }
3987 }
3988 if (mon == rank) {
3989 routed_requests.clear();
3990 finish_contexts(g_ceph_context, retry);
3991 }
3992 }
3993
3994 void Monitor::remove_session(MonSession *s)
3995 {
3996 dout(10) << "remove_session " << s << " " << s->inst
3997 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
3998 assert(s->con);
3999 assert(!s->closed);
4000 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4001 p != s->routed_request_tids.end();
4002 ++p) {
4003 assert(routed_requests.count(*p));
4004 RoutedRequest *rr = routed_requests[*p];
4005 dout(10) << " dropping routed request " << rr->tid << dendl;
4006 delete rr;
4007 routed_requests.erase(*p);
4008 }
4009 s->routed_request_tids.clear();
4010 s->con->set_priv(NULL);
4011 session_map.remove_session(s);
4012 logger->set(l_mon_num_sessions, session_map.get_size());
4013 logger->inc(l_mon_session_rm);
4014 }
4015
4016 void Monitor::remove_all_sessions()
4017 {
4018 Mutex::Locker l(session_map_lock);
4019 while (!session_map.sessions.empty()) {
4020 MonSession *s = session_map.sessions.front();
4021 remove_session(s);
4022 if (logger)
4023 logger->inc(l_mon_session_rm);
4024 }
4025 if (logger)
4026 logger->set(l_mon_num_sessions, session_map.get_size());
4027 }
4028
4029 void Monitor::send_command(const entity_inst_t& inst,
4030 const vector<string>& com)
4031 {
4032 dout(10) << "send_command " << inst << "" << com << dendl;
4033 MMonCommand *c = new MMonCommand(monmap->fsid);
4034 c->cmd = com;
4035 try_send_message(c, inst);
4036 }
4037
4038 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4039 {
4040 /**
4041 * Wait list the new session until we're in the quorum, assuming it's
4042 * sufficiently new.
4043 * tick() will periodically send them back through so we can send
4044 * the client elsewhere if we don't think we're getting back in.
4045 *
4046 * But we whitelist a few sorts of messages:
4047 * 1) Monitors can talk to us at any time, of course.
4048 * 2) auth messages. It's unlikely to go through much faster, but
4049 * it's possible we've just lost our quorum status and we want to take...
4050 * 3) command messages. We want to accept these under all possible
4051 * circumstances.
4052 */
4053 Message *m = op->get_req();
4054 MonSession *s = op->get_session();
4055 ConnectionRef con = op->get_connection();
4056 utime_t too_old = ceph_clock_now();
4057 too_old -= g_ceph_context->_conf->mon_lease;
4058 if (m->get_recv_stamp() > too_old &&
4059 con->is_connected()) {
4060 dout(5) << "waitlisting message " << *m << dendl;
4061 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4062 op->mark_wait_for_quorum();
4063 } else {
4064 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4065 con->mark_down();
4066 // proxied sessions aren't registered and don't have a con; don't remove
4067 // those.
4068 if (!s->proxy_con) {
4069 Mutex::Locker l(session_map_lock);
4070 remove_session(s);
4071 }
4072 op->mark_zap();
4073 }
4074 }
4075
4076 void Monitor::_ms_dispatch(Message *m)
4077 {
4078 if (is_shutdown()) {
4079 m->put();
4080 return;
4081 }
4082
4083 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4084 bool src_is_mon = op->is_src_mon();
4085 op->mark_event("mon:_ms_dispatch");
4086 MonSession *s = op->get_session();
4087 if (s && s->closed) {
4088 return;
4089 }
4090
4091 if (src_is_mon && s) {
4092 ConnectionRef con = m->get_connection();
4093 if (con->get_messenger() && con->get_features() != s->con_features) {
4094 // only update features if this is a non-anonymous connection
4095 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4096 << " (was " << s->con_features
4097 << ", now " << con->get_features() << ")" << dendl;
4098 // connection features changed - recreate session.
4099 if (s->con && s->con != con) {
4100 dout(10) << __func__ << " connection for " << m->get_source_inst()
4101 << " changed from session; mark down and replace" << dendl;
4102 s->con->mark_down();
4103 }
4104 if (s->item.is_on_list()) {
4105 // forwarded messages' sessions are not in the sessions map and
4106 // exist only while the op is being handled.
4107 remove_session(s);
4108 }
4109 s->put();
4110 s = nullptr;
4111 }
4112 }
4113
4114 if (!s) {
4115 // if the sender is not a monitor, make sure their first message for a
4116 // session is an MAuth. If it is not, assume it's a stray message,
4117 // and considering that we are creating a new session it is safe to
4118 // assume that the sender hasn't authenticated yet, so we have no way
4119 // of assessing whether we should handle it or not.
4120 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4121 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4122 m->get_type() != CEPH_MSG_PING)) {
4123 dout(1) << __func__ << " dropping stray message " << *m
4124 << " from " << m->get_source_inst() << dendl;
4125 return;
4126 }
4127
4128 ConnectionRef con = m->get_connection();
4129 {
4130 Mutex::Locker l(session_map_lock);
4131 s = session_map.new_session(m->get_source_inst(), con.get());
4132 }
4133 assert(s);
4134 con->set_priv(s->get());
4135 dout(10) << __func__ << " new session " << s << " " << *s
4136 << " features 0x" << std::hex
4137 << s->con_features << std::dec << dendl;
4138 op->set_session(s);
4139
4140 logger->set(l_mon_num_sessions, session_map.get_size());
4141 logger->inc(l_mon_session_add);
4142
4143 if (src_is_mon) {
4144 // give it monitor caps; the peer type has been authenticated
4145 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4146 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4147 s->caps = *mon_caps;
4148 }
4149 s->put();
4150 } else {
4151 dout(20) << __func__ << " existing session " << s << " for " << s->inst
4152 << dendl;
4153 }
4154
4155 assert(s);
4156
4157 s->session_timeout = ceph_clock_now();
4158 s->session_timeout += g_conf->mon_session_timeout;
4159
4160 if (s->auth_handler) {
4161 s->entity_name = s->auth_handler->get_entity_name();
4162 }
4163 dout(20) << " caps " << s->caps.get_str() << dendl;
4164
4165 if ((is_synchronizing() ||
4166 (s->global_id == 0 && !exited_quorum.is_zero())) &&
4167 !src_is_mon &&
4168 m->get_type() != CEPH_MSG_PING) {
4169 waitlist_or_zap_client(op);
4170 } else {
4171 dispatch_op(op);
4172 }
4173 return;
4174 }
4175
/**
 * Route an op to the code that handles its message type.
 *
 * Dispatch proceeds in stages, each with different capability requirements:
 *  1. types handled without any cap check here (global id / auth, ping,
 *     get-map, mon metadata);
 *  2. types owned by a PaxosService, or by handle_command(), which do their
 *     own cap checks elsewhere;
 *  3. types the Monitor handles itself for any session with "mon" read caps
 *     (get-version, subscribe);
 *  4. types accepted only from other monitors.
 * Anything not claimed by a stage is logged and dropped. The op's recorded
 * type (monitor/service/command/paxos/election) is set according to the
 * stage and case that claims it.
 *
 * Ops whose session is already closed are dropped up front.
 */
void Monitor::dispatch_op(MonOpRequestRef op)
{
  op->mark_event("mon:dispatch_op");
  MonSession *s = op->get_session();
  assert(s);
  if (s->closed) {
    dout(10) << " session closed, dropping " << op->get_req() << dendl;
    return;
  }

  /* we will consider the default type as being 'monitor' until proven wrong */
  op->set_type_monitor();
  /* deal with all messages that do not necessarily need caps */
  bool dealt_with = true;
  switch (op->get_req()->get_type()) {
    // auth
    case MSG_MON_GLOBAL_ID:
    case CEPH_MSG_AUTH:
      op->set_type_service();
      /* no need to check caps here */
      paxos_service[PAXOS_AUTH]->dispatch(op);
      break;

    case CEPH_MSG_PING:
      handle_ping(op);
      break;

    /* MMonGetMap may be used by clients to obtain a monmap *before*
     * authenticating with the monitor. We need to handle these without
     * checking caps because, even on a cluster without cephx, we only set
     * session caps *after* the auth handshake. A good example of this
     * is when a client calls MonClient::get_monmap_privately(), which does
     * not authenticate when obtaining a monmap.
     */
    case CEPH_MSG_MON_GET_MAP:
      handle_mon_get_map(op);
      break;

    // handle_mon_metadata() only acts when we are the leader.
    case CEPH_MSG_MON_METADATA:
      return handle_mon_metadata(op);

    default:
      dealt_with = false;
      break;
  }
  if (dealt_with)
    return;

  /* well, maybe the op belongs to a service... */
  op->set_type_service();
  /* deal with all messages which caps should be checked somewhere else */
  dealt_with = true;
  switch (op->get_req()->get_type()) {

    // OSDs
    case CEPH_MSG_MON_GET_OSDMAP:
    case CEPH_MSG_POOLOP:
    case MSG_OSD_BEACON:
    case MSG_OSD_MARK_ME_DOWN:
    case MSG_OSD_FULL:
    case MSG_OSD_FAILURE:
    case MSG_OSD_BOOT:
    case MSG_OSD_ALIVE:
    case MSG_OSD_PGTEMP:
    case MSG_OSD_PG_CREATED:
    case MSG_REMOVE_SNAPS:
      paxos_service[PAXOS_OSDMAP]->dispatch(op);
      break;

    // MDSs
    case MSG_MDS_BEACON:
    case MSG_MDS_OFFLOAD_TARGETS:
      paxos_service[PAXOS_MDSMAP]->dispatch(op);
      break;

    // Mgrs
    case MSG_MGR_BEACON:
      paxos_service[PAXOS_MGR]->dispatch(op);
      break;

    // MgrStat
    case MSG_MON_MGR_REPORT:
    case CEPH_MSG_STATFS:
    case MSG_GETPOOLSTATS:
      paxos_service[PAXOS_MGRSTAT]->dispatch(op);
      break;

    // pg
    case MSG_PGSTATS:
      paxos_service[PAXOS_PGMAP]->dispatch(op);
      break;

    // log
    case MSG_LOG:
      paxos_service[PAXOS_LOG]->dispatch(op);
      break;

    // handle_command() does its own caps checking
    case MSG_MON_COMMAND:
      op->set_type_command();
      handle_command(op);
      break;

    default:
      dealt_with = false;
      break;
  }
  if (dealt_with)
    return;

  /* nop, looks like it's not a service message; revert back to monitor */
  op->set_type_monitor();

  /* messages we, the Monitor class, need to deal with
   * but may be sent by clients. */

  // Everything from here on requires at least "mon" read caps.
  if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
    dout(5) << __func__ << " " << op->get_req()->get_source_inst()
            << " not enough caps for " << *(op->get_req()) << " -- dropping"
            << dendl;
    goto drop;
  }

  dealt_with = true;
  switch (op->get_req()->get_type()) {

    // misc
    case CEPH_MSG_MON_GET_VERSION:
      handle_get_version(op);
      break;

    case CEPH_MSG_MON_SUBSCRIBE:
      /* FIXME: check what's being subscribed, filter accordingly */
      handle_subscribe(op);
      break;

    default:
      dealt_with = false;
      break;
  }
  if (dealt_with)
    return;

  // Remaining message types are only accepted from other monitors.
  if (!op->is_src_mon()) {
    dout(1) << __func__ << " unexpected monitor message from"
            << " non-monitor entity " << op->get_req()->get_source_inst()
            << " " << *(op->get_req()) << " -- dropping" << dendl;
    goto drop;
  }

  /* messages that should only be sent by another monitor */
  dealt_with = true;
  switch (op->get_req()->get_type()) {

    case MSG_ROUTE:
      handle_route(op);
      break;

    case MSG_MON_PROBE:
      handle_probe(op);
      break;

    // Sync (i.e., the new slurp, but on steroids)
    case MSG_MON_SYNC:
      handle_sync(op);
      break;
    case MSG_MON_SCRUB:
      handle_scrub(op);
      break;

    /* log acks are sent from a monitor we sent the MLog to, and are
       never sent by clients to us. */
    case MSG_LOGACK:
      log_client.handle_log_ack((MLogAck*)op->get_req());
      break;

    // monmap
    case MSG_MON_JOIN:
      op->set_type_service();
      paxos_service[PAXOS_MONMAP]->dispatch(op);
      break;

    // paxos
    case MSG_MON_PAXOS:
      {
        op->set_type_paxos();
        MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
        if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
          //can't send these!
          break;
        }

        if (state == STATE_SYNCHRONIZING) {
          // we are synchronizing. These messages would do us no
          // good, thus just drop them and ignore them.
          dout(10) << __func__ << " ignore paxos msg from "
            << pm->get_source_inst() << dendl;
          break;
        }

        // sanitize
        // A newer epoch than ours: bootstrap and drop this message.
        if (pm->epoch > get_epoch()) {
          bootstrap();
          break;
        }
        // An older epoch: drop silently.
        if (pm->epoch != get_epoch()) {
          break;
        }

        paxos->dispatch(op);
      }
      break;

    // elector messages
    case MSG_MON_ELECTION:
      op->set_type_election();
      //check privileges here for simplicity
      if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
        dout(0) << "MMonElection received from entity without enough caps!"
          << op->get_session()->caps << dendl;
        break;
      }
      // Election messages are ignored while probing or synchronizing.
      if (!is_probing() && !is_synchronizing()) {
        elector.dispatch(op);
      }
      break;

    case MSG_FORWARD:
      handle_forward(op);
      break;

    case MSG_TIMECHECK:
      handle_timecheck(op);
      break;

    case MSG_MON_HEALTH:
      health_monitor->dispatch(op);
      break;

    case MSG_MON_HEALTH_CHECKS:
      op->set_type_service();
      paxos_service[PAXOS_HEALTH]->dispatch(op);
      break;

    default:
      dealt_with = false;
      break;
  }
  if (!dealt_with) {
    dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
    goto drop;
  }
  return;

// Common exit for dropped ops; nothing to clean up here.
drop:
  return;
}
4433
4434 void Monitor::handle_ping(MonOpRequestRef op)
4435 {
4436 MPing *m = static_cast<MPing*>(op->get_req());
4437 dout(10) << __func__ << " " << *m << dendl;
4438 MPing *reply = new MPing;
4439 entity_inst_t inst = m->get_source_inst();
4440 bufferlist payload;
4441 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4442 f->open_object_section("pong");
4443
4444 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
4445 get_health_status(false, f.get(), nullptr);
4446 } else {
4447 list<string> health_str;
4448 get_health(health_str, nullptr, f.get());
4449 }
4450
4451 {
4452 stringstream ss;
4453 get_mon_status(f.get(), ss);
4454 }
4455
4456 f->close_section();
4457 stringstream ss;
4458 f->flush(ss);
4459 ::encode(ss.str(), payload);
4460 reply->set_payload(payload);
4461 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4462 messenger->send_message(reply, inst);
4463 }
4464
4465 void Monitor::timecheck_start()
4466 {
4467 dout(10) << __func__ << dendl;
4468 timecheck_cleanup();
4469 timecheck_start_round();
4470 }
4471
4472 void Monitor::timecheck_finish()
4473 {
4474 dout(10) << __func__ << dendl;
4475 timecheck_cleanup();
4476 }
4477
4478 void Monitor::timecheck_start_round()
4479 {
4480 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4481 assert(is_leader());
4482
4483 if (monmap->size() == 1) {
4484 assert(0 == "We are alone; this shouldn't have been scheduled!");
4485 return;
4486 }
4487
4488 if (timecheck_round % 2) {
4489 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4490 utime_t curr_time = ceph_clock_now();
4491 double max = g_conf->mon_timecheck_interval*3;
4492 if (curr_time - timecheck_round_start < max) {
4493 dout(10) << __func__ << " keep current round going" << dendl;
4494 goto out;
4495 } else {
4496 dout(10) << __func__
4497 << " finish current timecheck and start new" << dendl;
4498 timecheck_cancel_round();
4499 }
4500 }
4501
4502 assert(timecheck_round % 2 == 0);
4503 timecheck_acks = 0;
4504 timecheck_round ++;
4505 timecheck_round_start = ceph_clock_now();
4506 dout(10) << __func__ << " new " << timecheck_round << dendl;
4507
4508 timecheck();
4509 out:
4510 dout(10) << __func__ << " setting up next event" << dendl;
4511 timecheck_reset_event();
4512 }
4513
4514 void Monitor::timecheck_finish_round(bool success)
4515 {
4516 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4517 assert(timecheck_round % 2);
4518 timecheck_round ++;
4519 timecheck_round_start = utime_t();
4520
4521 if (success) {
4522 assert(timecheck_waiting.empty());
4523 assert(timecheck_acks == quorum.size());
4524 timecheck_report();
4525 timecheck_check_skews();
4526 return;
4527 }
4528
4529 dout(10) << __func__ << " " << timecheck_waiting.size()
4530 << " peers still waiting:";
4531 for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4532 p != timecheck_waiting.end(); ++p) {
4533 *_dout << " " << p->first.name;
4534 }
4535 *_dout << dendl;
4536 timecheck_waiting.clear();
4537
4538 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4539 }
4540
4541 void Monitor::timecheck_cancel_round()
4542 {
4543 timecheck_finish_round(false);
4544 }
4545
4546 void Monitor::timecheck_cleanup()
4547 {
4548 timecheck_round = 0;
4549 timecheck_acks = 0;
4550 timecheck_round_start = utime_t();
4551
4552 if (timecheck_event) {
4553 timer.cancel_event(timecheck_event);
4554 timecheck_event = NULL;
4555 }
4556 timecheck_waiting.clear();
4557 timecheck_skews.clear();
4558 timecheck_latencies.clear();
4559
4560 timecheck_rounds_since_clean = 0;
4561 }
4562
4563 void Monitor::timecheck_reset_event()
4564 {
4565 if (timecheck_event) {
4566 timer.cancel_event(timecheck_event);
4567 timecheck_event = NULL;
4568 }
4569
4570 double delay =
4571 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4572
4573 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4574 delay = cct->_conf->mon_timecheck_interval;
4575 }
4576
4577 dout(10) << __func__ << " delay " << delay
4578 << " rounds_since_clean " << timecheck_rounds_since_clean
4579 << dendl;
4580
4581 timecheck_event = new C_MonContext(this, [this](int) {
4582 timecheck_start_round();
4583 });
4584 timer.add_event_after(delay, timecheck_event);
4585 }
4586
4587 void Monitor::timecheck_check_skews()
4588 {
4589 dout(10) << __func__ << dendl;
4590 assert(is_leader());
4591 assert((timecheck_round % 2) == 0);
4592 if (monmap->size() == 1) {
4593 assert(0 == "We are alone; we shouldn't have gotten here!");
4594 return;
4595 }
4596 assert(timecheck_latencies.size() == timecheck_skews.size());
4597
4598 bool found_skew = false;
4599 for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4600 p != timecheck_skews.end(); ++p) {
4601
4602 double abs_skew;
4603 if (timecheck_has_skew(p->second, &abs_skew)) {
4604 dout(10) << __func__
4605 << " " << p->first << " skew " << abs_skew << dendl;
4606 found_skew = true;
4607 }
4608 }
4609
4610 if (found_skew) {
4611 ++timecheck_rounds_since_clean;
4612 timecheck_reset_event();
4613 } else if (timecheck_rounds_since_clean > 0) {
4614 dout(1) << __func__
4615 << " no clock skews found after " << timecheck_rounds_since_clean
4616 << " rounds" << dendl;
4617 // make sure the skews are really gone and not just a transient success
4618 // this will run just once if not in the presence of skews again.
4619 timecheck_rounds_since_clean = 1;
4620 timecheck_reset_event();
4621 timecheck_rounds_since_clean = 0;
4622 }
4623
4624 }
4625
4626 void Monitor::timecheck_report()
4627 {
4628 dout(10) << __func__ << dendl;
4629 assert(is_leader());
4630 assert((timecheck_round % 2) == 0);
4631 if (monmap->size() == 1) {
4632 assert(0 == "We are alone; we shouldn't have gotten here!");
4633 return;
4634 }
4635
4636 assert(timecheck_latencies.size() == timecheck_skews.size());
4637 bool do_output = true; // only output report once
4638 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4639 if (monmap->get_name(*q) == name)
4640 continue;
4641
4642 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4643 m->epoch = get_epoch();
4644 m->round = timecheck_round;
4645
4646 for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4647 it != timecheck_skews.end(); ++it) {
4648 double skew = it->second;
4649 double latency = timecheck_latencies[it->first];
4650
4651 m->skews[it->first] = skew;
4652 m->latencies[it->first] = latency;
4653
4654 if (do_output) {
4655 dout(25) << __func__ << " " << it->first
4656 << " latency " << latency
4657 << " skew " << skew << dendl;
4658 }
4659 }
4660 do_output = false;
4661 entity_inst_t inst = monmap->get_inst(*q);
4662 dout(10) << __func__ << " send report to " << inst << dendl;
4663 messenger->send_message(m, inst);
4664 }
4665 }
4666
4667 void Monitor::timecheck()
4668 {
4669 dout(10) << __func__ << dendl;
4670 assert(is_leader());
4671 if (monmap->size() == 1) {
4672 assert(0 == "We are alone; we shouldn't have gotten here!");
4673 return;
4674 }
4675 assert(timecheck_round % 2 != 0);
4676
4677 timecheck_acks = 1; // we ack ourselves
4678
4679 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4680 << " round " << timecheck_round << dendl;
4681
4682 // we are at the eye of the storm; the point of reference
4683 timecheck_skews[messenger->get_myinst()] = 0.0;
4684 timecheck_latencies[messenger->get_myinst()] = 0.0;
4685
4686 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4687 if (monmap->get_name(*it) == name)
4688 continue;
4689
4690 entity_inst_t inst = monmap->get_inst(*it);
4691 utime_t curr_time = ceph_clock_now();
4692 timecheck_waiting[inst] = curr_time;
4693 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4694 m->epoch = get_epoch();
4695 m->round = timecheck_round;
4696 dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4697 messenger->send_message(m, inst);
4698 }
4699 }
4700
4701 health_status_t Monitor::timecheck_status(ostringstream &ss,
4702 const double skew_bound,
4703 const double latency)
4704 {
4705 health_status_t status = HEALTH_OK;
4706 assert(latency >= 0);
4707
4708 double abs_skew;
4709 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4710 status = HEALTH_WARN;
4711 ss << "clock skew " << abs_skew << "s"
4712 << " > max " << g_conf->mon_clock_drift_allowed << "s";
4713 }
4714
4715 return status;
4716 }
4717
4718 void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4719 {
4720 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4721 dout(10) << __func__ << " " << *m << dendl;
4722 /* handles PONG's */
4723 assert(m->op == MTimeCheck::OP_PONG);
4724
4725 entity_inst_t other = m->get_source_inst();
4726 if (m->epoch < get_epoch()) {
4727 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4728 << " from " << other
4729 << " curr " << get_epoch()
4730 << " -- severely lagged? discard" << dendl;
4731 return;
4732 }
4733 assert(m->epoch == get_epoch());
4734
4735 if (m->round < timecheck_round) {
4736 dout(1) << __func__ << " got old round " << m->round
4737 << " from " << other
4738 << " curr " << timecheck_round << " -- discard" << dendl;
4739 return;
4740 }
4741
4742 utime_t curr_time = ceph_clock_now();
4743
4744 assert(timecheck_waiting.count(other) > 0);
4745 utime_t timecheck_sent = timecheck_waiting[other];
4746 timecheck_waiting.erase(other);
4747 if (curr_time < timecheck_sent) {
4748 // our clock was readjusted -- drop everything until it all makes sense.
4749 dout(1) << __func__ << " our clock was readjusted --"
4750 << " bump round and drop current check"
4751 << dendl;
4752 timecheck_cancel_round();
4753 return;
4754 }
4755
4756 /* update peer latencies */
4757 double latency = (double)(curr_time - timecheck_sent);
4758
4759 if (timecheck_latencies.count(other) == 0)
4760 timecheck_latencies[other] = latency;
4761 else {
4762 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4763 timecheck_latencies[other] = avg_latency;
4764 }
4765
4766 /*
4767 * update skews
4768 *
4769 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4770 * and 'a' happens to be lower than 'b'; so we use double instead.
4771 *
4772 * latency is always expected to be >= 0.
4773 *
4774 * delta, the difference between theirs timestamp and ours, may either be
4775 * lower or higher than 0; will hardly ever be 0.
4776 *
4777 * The absolute skew is the absolute delta minus the latency, which is
4778 * taken as a whole instead of an rtt given that there is some queueing
4779 * and dispatch times involved and it's hard to assess how long exactly
4780 * it took for the message to travel to the other side and be handled. So
4781 * we call it a bounded skew, the worst case scenario.
4782 *
4783 * Now, to math!
4784 *
4785 * Given that the latency is always positive, we can establish that the
4786 * bounded skew will be:
4787 *
4788 * 1. positive if the absolute delta is higher than the latency and
4789 * delta is positive
4790 * 2. negative if the absolute delta is higher than the latency and
4791 * delta is negative.
4792 * 3. zero if the absolute delta is lower than the latency.
4793 *
4794 * On 3. we make a judgement call and treat the skew as non-existent.
4795 * This is because that, if the absolute delta is lower than the
4796 * latency, then the apparently existing skew is nothing more than a
4797 * side-effect of the high latency at work.
4798 *
4799 * This may not be entirely true though, as a severely skewed clock
4800 * may be masked by an even higher latency, but with high latencies
4801 * we probably have worse issues to deal with than just skewed clocks.
4802 */
4803 assert(latency >= 0);
4804
4805 double delta = ((double) m->timestamp) - ((double) curr_time);
4806 double abs_delta = (delta > 0 ? delta : -delta);
4807 double skew_bound = abs_delta - latency;
4808 if (skew_bound < 0)
4809 skew_bound = 0;
4810 else if (delta < 0)
4811 skew_bound = -skew_bound;
4812
4813 ostringstream ss;
4814 health_status_t status = timecheck_status(ss, skew_bound, latency);
4815 clog->health(status) << other << " " << ss.str();
4816
4817 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4818 << " delta " << delta << " skew_bound " << skew_bound
4819 << " latency " << latency << dendl;
4820
4821 timecheck_skews[other] = skew_bound;
4822
4823 timecheck_acks++;
4824 if (timecheck_acks == quorum.size()) {
4825 dout(10) << __func__ << " got pongs from everybody ("
4826 << timecheck_acks << " total)" << dendl;
4827 assert(timecheck_skews.size() == timecheck_acks);
4828 assert(timecheck_waiting.empty());
4829 // everyone has acked, so bump the round to finish it.
4830 timecheck_finish_round();
4831 }
4832 }
4833
4834 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4835 {
4836 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4837 dout(10) << __func__ << " " << *m << dendl;
4838
4839 assert(is_peon());
4840 assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4841
4842 if (m->epoch != get_epoch()) {
4843 dout(1) << __func__ << " got wrong epoch "
4844 << "(ours " << get_epoch()
4845 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4846 return;
4847 }
4848
4849 if (m->round < timecheck_round) {
4850 dout(1) << __func__ << " got old round " << m->round
4851 << " current " << timecheck_round
4852 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4853 return;
4854 }
4855
4856 timecheck_round = m->round;
4857
4858 if (m->op == MTimeCheck::OP_REPORT) {
4859 assert((timecheck_round % 2) == 0);
4860 timecheck_latencies.swap(m->latencies);
4861 timecheck_skews.swap(m->skews);
4862 return;
4863 }
4864
4865 assert((timecheck_round % 2) != 0);
4866 MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4867 utime_t curr_time = ceph_clock_now();
4868 reply->timestamp = curr_time;
4869 reply->epoch = m->epoch;
4870 reply->round = m->round;
4871 dout(10) << __func__ << " send " << *m
4872 << " to " << m->get_source_inst() << dendl;
4873 m->get_connection()->send_message(reply);
4874 }
4875
4876 void Monitor::handle_timecheck(MonOpRequestRef op)
4877 {
4878 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4879 dout(10) << __func__ << " " << *m << dendl;
4880
4881 if (is_leader()) {
4882 if (m->op != MTimeCheck::OP_PONG) {
4883 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4884 } else {
4885 handle_timecheck_leader(op);
4886 }
4887 } else if (is_peon()) {
4888 if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4889 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4890 } else {
4891 handle_timecheck_peon(op);
4892 }
4893 } else {
4894 dout(1) << __func__ << " drop unexpected msg" << dendl;
4895 }
4896 }
4897
4898 void Monitor::handle_subscribe(MonOpRequestRef op)
4899 {
4900 MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
4901 dout(10) << "handle_subscribe " << *m << dendl;
4902
4903 bool reply = false;
4904
4905 MonSession *s = op->get_session();
4906 assert(s);
4907
4908 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
4909 p != m->what.end();
4910 ++p) {
4911 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4912 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
4913 reply = true;
4914
4915 // remove conflicting subscribes
4916 if (logmon()->sub_name_to_id(p->first) >= 0) {
4917 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
4918 it != s->sub_map.end(); ) {
4919 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
4920 Mutex::Locker l(session_map_lock);
4921 session_map.remove_sub((it++)->second);
4922 } else {
4923 ++it;
4924 }
4925 }
4926 }
4927
4928 {
4929 Mutex::Locker l(session_map_lock);
4930 session_map.add_update_sub(s, p->first, p->second.start,
4931 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
4932 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
4933 }
4934
4935 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
4936 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
4937 if ((int)s->is_capable("mds", MON_CAP_R)) {
4938 Subscription *sub = s->sub_map[p->first];
4939 assert(sub != nullptr);
4940 mdsmon()->check_sub(sub);
4941 }
4942 } else if (p->first == "osdmap") {
4943 if ((int)s->is_capable("osd", MON_CAP_R)) {
4944 if (s->osd_epoch > p->second.start) {
4945 // client needs earlier osdmaps on purpose, so reset the sent epoch
4946 s->osd_epoch = 0;
4947 }
4948 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
4949 }
4950 } else if (p->first == "osd_pg_creates") {
4951 if ((int)s->is_capable("osd", MON_CAP_W)) {
4952 if (monmap->get_required_features().contains_all(
4953 ceph::features::mon::FEATURE_LUMINOUS)) {
4954 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
4955 } else {
4956 pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
4957 }
4958 }
4959 } else if (p->first == "monmap") {
4960 monmon()->check_sub(s->sub_map[p->first]);
4961 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
4962 logmon()->check_sub(s->sub_map[p->first]);
4963 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
4964 mgrmon()->check_sub(s->sub_map[p->first]);
4965 } else if (p->first == "servicemap") {
4966 mgrstatmon()->check_sub(s->sub_map[p->first]);
4967 }
4968 }
4969
4970 if (reply) {
4971 // we only need to reply if the client is old enough to think it
4972 // has to send renewals.
4973 ConnectionRef con = m->get_connection();
4974 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
4975 m->get_connection()->send_message(new MMonSubscribeAck(
4976 monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
4977 }
4978
4979 }
4980
4981 void Monitor::handle_get_version(MonOpRequestRef op)
4982 {
4983 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
4984 dout(10) << "handle_get_version " << *m << dendl;
4985 PaxosService *svc = NULL;
4986
4987 MonSession *s = op->get_session();
4988 assert(s);
4989
4990 if (!is_leader() && !is_peon()) {
4991 dout(10) << " waiting for quorum" << dendl;
4992 waitfor_quorum.push_back(new C_RetryMessage(this, op));
4993 goto out;
4994 }
4995
4996 if (m->what == "mdsmap") {
4997 svc = mdsmon();
4998 } else if (m->what == "fsmap") {
4999 svc = mdsmon();
5000 } else if (m->what == "osdmap") {
5001 svc = osdmon();
5002 } else if (m->what == "monmap") {
5003 svc = monmon();
5004 } else {
5005 derr << "invalid map type " << m->what << dendl;
5006 }
5007
5008 if (svc) {
5009 if (!svc->is_readable()) {
5010 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5011 goto out;
5012 }
5013
5014 MMonGetVersionReply *reply = new MMonGetVersionReply();
5015 reply->handle = m->handle;
5016 reply->version = svc->get_last_committed();
5017 reply->oldest_version = svc->get_first_committed();
5018 reply->set_tid(m->get_tid());
5019
5020 m->get_connection()->send_message(reply);
5021 }
5022 out:
5023 return;
5024 }
5025
5026 bool Monitor::ms_handle_reset(Connection *con)
5027 {
5028 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5029
5030 // ignore lossless monitor sessions
5031 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5032 return false;
5033
5034 MonSession *s = static_cast<MonSession *>(con->get_priv());
5035 if (!s)
5036 return false;
5037
5038 // break any con <-> session ref cycle
5039 s->con->set_priv(NULL);
5040
5041 if (is_shutdown())
5042 return false;
5043
5044 Mutex::Locker l(lock);
5045
5046 dout(10) << "reset/close on session " << s->inst << dendl;
5047 if (!s->closed) {
5048 Mutex::Locker l(session_map_lock);
5049 remove_session(s);
5050 }
5051 s->put();
5052 return true;
5053 }
5054
5055 bool Monitor::ms_handle_refused(Connection *con)
5056 {
5057 // just log for now...
5058 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5059 return false;
5060 }
5061
5062 // -----
5063
5064 void Monitor::send_latest_monmap(Connection *con)
5065 {
5066 bufferlist bl;
5067 monmap->encode(bl, con->get_features());
5068 con->send_message(new MMonMap(bl));
5069 }
5070
5071 void Monitor::handle_mon_get_map(MonOpRequestRef op)
5072 {
5073 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
5074 dout(10) << "handle_mon_get_map" << dendl;
5075 send_latest_monmap(m->get_connection().get());
5076 }
5077
5078 void Monitor::handle_mon_metadata(MonOpRequestRef op)
5079 {
5080 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
5081 if (is_leader()) {
5082 dout(10) << __func__ << dendl;
5083 update_mon_metadata(m->get_source().num(), std::move(m->data));
5084 }
5085 }
5086
5087 void Monitor::update_mon_metadata(int from, Metadata&& m)
5088 {
5089 // NOTE: this is now for legacy (kraken or jewel) mons only.
5090 pending_metadata[from] = std::move(m);
5091
5092 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5093 bufferlist bl;
5094 ::encode(pending_metadata, bl);
5095 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5096 paxos->trigger_propose();
5097 }
5098
5099 int Monitor::load_metadata()
5100 {
5101 bufferlist bl;
5102 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5103 if (r)
5104 return r;
5105 bufferlist::iterator it = bl.begin();
5106 ::decode(mon_metadata, it);
5107
5108 pending_metadata = mon_metadata;
5109 return 0;
5110 }
5111
5112 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5113 {
5114 assert(f);
5115 if (!mon_metadata.count(mon)) {
5116 err << "mon." << mon << " not found";
5117 return -EINVAL;
5118 }
5119 const Metadata& m = mon_metadata[mon];
5120 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5121 f->dump_string(p->first.c_str(), p->second);
5122 }
5123 return 0;
5124 }
5125
5126 void Monitor::count_metadata(const string& field, map<string,int> *out)
5127 {
5128 for (auto& p : mon_metadata) {
5129 auto q = p.second.find(field);
5130 if (q == p.second.end()) {
5131 (*out)["unknown"]++;
5132 } else {
5133 (*out)[q->second]++;
5134 }
5135 }
5136 }
5137
5138 void Monitor::count_metadata(const string& field, Formatter *f)
5139 {
5140 map<string,int> by_val;
5141 count_metadata(field, &by_val);
5142 f->open_object_section(field.c_str());
5143 for (auto& p : by_val) {
5144 f->dump_int(p.first.c_str(), p.second);
5145 }
5146 f->close_section();
5147 }
5148
5149 int Monitor::print_nodes(Formatter *f, ostream& err)
5150 {
5151 map<string, list<int> > mons; // hostname => mon
5152 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5153 it != mon_metadata.end(); ++it) {
5154 const Metadata& m = it->second;
5155 Metadata::const_iterator hostname = m.find("hostname");
5156 if (hostname == m.end()) {
5157 // not likely though
5158 continue;
5159 }
5160 mons[hostname->second].push_back(it->first);
5161 }
5162
5163 dump_services(f, mons, "mon");
5164 return 0;
5165 }
5166
5167 // ----------------------------------------------
5168 // scrub
5169
5170 int Monitor::scrub_start()
5171 {
5172 dout(10) << __func__ << dendl;
5173 assert(is_leader());
5174
5175 if (!scrub_result.empty()) {
5176 clog->info() << "scrub already in progress";
5177 return -EBUSY;
5178 }
5179
5180 scrub_event_cancel();
5181 scrub_result.clear();
5182 scrub_state.reset(new ScrubState);
5183
5184 scrub();
5185 return 0;
5186 }
5187
5188 int Monitor::scrub()
5189 {
5190 assert(is_leader());
5191 assert(scrub_state);
5192
5193 scrub_cancel_timeout();
5194 wait_for_paxos_write();
5195 scrub_version = paxos->get_version();
5196
5197
5198 // scrub all keys if we're the only monitor in the quorum
5199 int32_t num_keys =
5200 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5201
5202 for (set<int>::iterator p = quorum.begin();
5203 p != quorum.end();
5204 ++p) {
5205 if (*p == rank)
5206 continue;
5207 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5208 num_keys);
5209 r->key = scrub_state->last_key;
5210 messenger->send_message(r, monmap->get_inst(*p));
5211 }
5212
5213 // scrub my keys
5214 bool r = _scrub(&scrub_result[rank],
5215 &scrub_state->last_key,
5216 &num_keys);
5217
5218 scrub_state->finished = !r;
5219
5220 // only after we got our scrub results do we really care whether the
5221 // other monitors are late on their results. Also, this way we avoid
5222 // triggering the timeout if we end up getting stuck in _scrub() for
5223 // longer than the duration of the timeout.
5224 scrub_reset_timeout();
5225
5226 if (quorum.size() == 1) {
5227 assert(scrub_state->finished == true);
5228 scrub_finish();
5229 }
5230 return 0;
5231 }
5232
5233 void Monitor::handle_scrub(MonOpRequestRef op)
5234 {
5235 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5236 dout(10) << __func__ << " " << *m << dendl;
5237 switch (m->op) {
5238 case MMonScrub::OP_SCRUB:
5239 {
5240 if (!is_peon())
5241 break;
5242
5243 wait_for_paxos_write();
5244
5245 if (m->version != paxos->get_version())
5246 break;
5247
5248 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5249 m->version,
5250 m->num_keys);
5251
5252 reply->key = m->key;
5253 _scrub(&reply->result, &reply->key, &reply->num_keys);
5254 m->get_connection()->send_message(reply);
5255 }
5256 break;
5257
5258 case MMonScrub::OP_RESULT:
5259 {
5260 if (!is_leader())
5261 break;
5262 if (m->version != scrub_version)
5263 break;
5264 // reset the timeout each time we get a result
5265 scrub_reset_timeout();
5266
5267 int from = m->get_source().num();
5268 assert(scrub_result.count(from) == 0);
5269 scrub_result[from] = m->result;
5270
5271 if (scrub_result.size() == quorum.size()) {
5272 scrub_check_results();
5273 scrub_result.clear();
5274 if (scrub_state->finished)
5275 scrub_finish();
5276 else
5277 scrub();
5278 }
5279 }
5280 break;
5281 }
5282 }
5283
5284 bool Monitor::_scrub(ScrubResult *r,
5285 pair<string,string> *start,
5286 int *num_keys)
5287 {
5288 assert(r != NULL);
5289 assert(start != NULL);
5290 assert(num_keys != NULL);
5291
5292 set<string> prefixes = get_sync_targets_names();
5293 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5294
5295 dout(10) << __func__ << " start (" << *start << ")"
5296 << " num_keys " << *num_keys << dendl;
5297
5298 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5299
5300 int scrubbed_keys = 0;
5301 pair<string,string> last_key;
5302
5303 while (it->has_next_chunk()) {
5304
5305 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5306 break;
5307
5308 pair<string,string> k = it->get_next_key();
5309 if (prefixes.count(k.first) == 0)
5310 continue;
5311
5312 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5313 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5314 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5315 << dendl;
5316 continue;
5317 }
5318
5319 bufferlist bl;
5320 int err = store->get(k.first, k.second, bl);
5321 assert(err == 0);
5322
5323 uint32_t key_crc = bl.crc32c(0);
5324 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5325 << " crc " << key_crc << dendl;
5326 r->prefix_keys[k.first]++;
5327 if (r->prefix_crc.count(k.first) == 0) {
5328 r->prefix_crc[k.first] = 0;
5329 }
5330 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5331
5332 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5333 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5334 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5335 r->prefix_crc[k.first] += 1;
5336 }
5337
5338 ++scrubbed_keys;
5339 last_key = k;
5340 }
5341
5342 dout(20) << __func__ << " last_key (" << last_key << ")"
5343 << " scrubbed_keys " << scrubbed_keys
5344 << " has_next " << it->has_next_chunk() << dendl;
5345
5346 *start = last_key;
5347 *num_keys = scrubbed_keys;
5348
5349 return it->has_next_chunk();
5350 }
5351
5352 void Monitor::scrub_check_results()
5353 {
5354 dout(10) << __func__ << dendl;
5355
5356 // compare
5357 int errors = 0;
5358 ScrubResult& mine = scrub_result[rank];
5359 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5360 p != scrub_result.end();
5361 ++p) {
5362 if (p->first == rank)
5363 continue;
5364 if (p->second != mine) {
5365 ++errors;
5366 clog->error() << "scrub mismatch";
5367 clog->error() << " mon." << rank << " " << mine;
5368 clog->error() << " mon." << p->first << " " << p->second;
5369 }
5370 }
5371 if (!errors)
5372 clog->debug() << "scrub ok on " << quorum << ": " << mine;
5373 }
5374
5375 inline void Monitor::scrub_timeout()
5376 {
5377 dout(1) << __func__ << " restarting scrub" << dendl;
5378 scrub_reset();
5379 scrub_start();
5380 }
5381
5382 void Monitor::scrub_finish()
5383 {
5384 dout(10) << __func__ << dendl;
5385 scrub_reset();
5386 scrub_event_start();
5387 }
5388
5389 void Monitor::scrub_reset()
5390 {
5391 dout(10) << __func__ << dendl;
5392 scrub_cancel_timeout();
5393 scrub_version = 0;
5394 scrub_result.clear();
5395 scrub_state.reset();
5396 }
5397
5398 inline void Monitor::scrub_update_interval(int secs)
5399 {
5400 // we don't care about changes if we are not the leader.
5401 // changes will be visible if we become the leader.
5402 if (!is_leader())
5403 return;
5404
5405 dout(1) << __func__ << " new interval = " << secs << dendl;
5406
5407 // if scrub already in progress, all changes will already be visible during
5408 // the next round. Nothing to do.
5409 if (scrub_state != NULL)
5410 return;
5411
5412 scrub_event_cancel();
5413 scrub_event_start();
5414 }
5415
5416 void Monitor::scrub_event_start()
5417 {
5418 dout(10) << __func__ << dendl;
5419
5420 if (scrub_event)
5421 scrub_event_cancel();
5422
5423 if (cct->_conf->mon_scrub_interval <= 0) {
5424 dout(1) << __func__ << " scrub event is disabled"
5425 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5426 << ")" << dendl;
5427 return;
5428 }
5429
5430 scrub_event = new C_MonContext(this, [this](int) {
5431 scrub_start();
5432 });
5433 timer.add_event_after(cct->_conf->mon_scrub_interval, scrub_event);
5434 }
5435
5436 void Monitor::scrub_event_cancel()
5437 {
5438 dout(10) << __func__ << dendl;
5439 if (scrub_event) {
5440 timer.cancel_event(scrub_event);
5441 scrub_event = NULL;
5442 }
5443 }
5444
5445 inline void Monitor::scrub_cancel_timeout()
5446 {
5447 if (scrub_timeout_event) {
5448 timer.cancel_event(scrub_timeout_event);
5449 scrub_timeout_event = NULL;
5450 }
5451 }
5452
5453 void Monitor::scrub_reset_timeout()
5454 {
5455 dout(15) << __func__ << " reset timeout event" << dendl;
5456 scrub_cancel_timeout();
5457
5458 scrub_timeout_event = new C_MonContext(this, [this](int) {
5459 scrub_timeout();
5460 });
5461 timer.add_event_after(g_conf->mon_scrub_timeout, scrub_timeout_event);
5462 }
5463
5464 /************ TICK ***************/
5465 void Monitor::new_tick()
5466 {
5467 timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5468 tick();
5469 }));
5470 }
5471
5472 void Monitor::tick()
5473 {
5474 // ok go.
5475 dout(11) << "tick" << dendl;
5476 const utime_t now = ceph_clock_now();
5477
5478 // Check if we need to emit any delayed health check updated messages
5479 if (is_leader()) {
5480 const auto min_period = g_conf->get_val<int64_t>(
5481 "mon_health_log_update_period");
5482 for (auto& svc : paxos_service) {
5483 auto health = svc->get_health_checks();
5484
5485 for (const auto &i : health.checks) {
5486 const std::string &code = i.first;
5487 const std::string &summary = i.second.summary;
5488 const health_status_t severity = i.second.severity;
5489
5490 auto status_iter = health_check_log_times.find(code);
5491 if (status_iter == health_check_log_times.end()) {
5492 continue;
5493 }
5494
5495 auto &log_status = status_iter->second;
5496 bool const changed = log_status.last_message != summary
5497 || log_status.severity != severity;
5498
5499 if (changed && now - log_status.updated_at > min_period) {
5500 log_status.last_message = summary;
5501 log_status.updated_at = now;
5502 log_status.severity = severity;
5503
5504 ostringstream ss;
5505 ss << "Health check update: " << summary << " (" << code << ")";
5506 clog->health(severity) << ss.str();
5507 }
5508 }
5509 }
5510 }
5511
5512
5513 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5514 (*p)->tick();
5515 (*p)->maybe_trim();
5516 }
5517
5518 // trim sessions
5519 {
5520 Mutex::Locker l(session_map_lock);
5521 auto p = session_map.sessions.begin();
5522
5523 bool out_for_too_long = (!exited_quorum.is_zero() &&
5524 now > (exited_quorum + 2*g_conf->mon_lease));
5525
5526 while (!p.end()) {
5527 MonSession *s = *p;
5528 ++p;
5529
5530 // don't trim monitors
5531 if (s->inst.name.is_mon())
5532 continue;
5533
5534 if (s->session_timeout < now && s->con) {
5535 // check keepalive, too
5536 s->session_timeout = s->con->get_last_keepalive();
5537 s->session_timeout += g_conf->mon_session_timeout;
5538 }
5539 if (s->session_timeout < now) {
5540 dout(10) << " trimming session " << s->con << " " << s->inst
5541 << " (timeout " << s->session_timeout
5542 << " < now " << now << ")" << dendl;
5543 } else if (out_for_too_long) {
5544 // boot the client Session because we've taken too long getting back in
5545 dout(10) << " trimming session " << s->con << " " << s->inst
5546 << " because we've been out of quorum too long" << dendl;
5547 } else {
5548 continue;
5549 }
5550
5551 s->con->mark_down();
5552 remove_session(s);
5553 logger->inc(l_mon_session_trim);
5554 }
5555 }
5556 sync_trim_providers();
5557
5558 if (!maybe_wait_for_quorum.empty()) {
5559 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5560 }
5561
5562 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5563 // this is only necessary on upgraded clusters.
5564 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5565 prepare_new_fingerprint(t);
5566 paxos->trigger_propose();
5567 }
5568
5569 new_tick();
5570 }
5571
5572 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5573 {
5574 uuid_d nf;
5575 nf.generate_random();
5576 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5577
5578 bufferlist bl;
5579 ::encode(nf, bl);
5580 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5581 }
5582
5583 int Monitor::check_fsid()
5584 {
5585 bufferlist ebl;
5586 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5587 if (r == -ENOENT)
5588 return r;
5589 assert(r == 0);
5590
5591 string es(ebl.c_str(), ebl.length());
5592
5593 // only keep the first line
5594 size_t pos = es.find_first_of('\n');
5595 if (pos != string::npos)
5596 es.resize(pos);
5597
5598 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5599 uuid_d ondisk;
5600 if (!ondisk.parse(es.c_str())) {
5601 derr << "error: unable to parse uuid" << dendl;
5602 return -EINVAL;
5603 }
5604
5605 if (monmap->get_fsid() != ondisk) {
5606 derr << "error: cluster_uuid file exists with value " << ondisk
5607 << ", != our uuid " << monmap->get_fsid() << dendl;
5608 return -EEXIST;
5609 }
5610
5611 return 0;
5612 }
5613
5614 int Monitor::write_fsid()
5615 {
5616 auto t(std::make_shared<MonitorDBStore::Transaction>());
5617 write_fsid(t);
5618 int r = store->apply_transaction(t);
5619 return r;
5620 }
5621
5622 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5623 {
5624 ostringstream ss;
5625 ss << monmap->get_fsid() << "\n";
5626 string us = ss.str();
5627
5628 bufferlist b;
5629 b.append(us);
5630
5631 t->put(MONITOR_NAME, "cluster_uuid", b);
5632 return 0;
5633 }
5634
5635 /*
5636 * this is the closest thing to a traditional 'mkfs' for ceph.
5637 * initialize the monitor state machines to their initial values.
5638 */
5639 int Monitor::mkfs(bufferlist& osdmapbl)
5640 {
5641 auto t(std::make_shared<MonitorDBStore::Transaction>());
5642
5643 // verify cluster fsid
5644 int r = check_fsid();
5645 if (r < 0 && r != -ENOENT)
5646 return r;
5647
5648 bufferlist magicbl;
5649 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5650 magicbl.append("\n");
5651 t->put(MONITOR_NAME, "magic", magicbl);
5652
5653
5654 features = get_initial_supported_features();
5655 write_features(t);
5656
5657 // save monmap, osdmap, keyring.
5658 bufferlist monmapbl;
5659 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5660 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5661 t->put("mkfs", "monmap", monmapbl);
5662
5663 if (osdmapbl.length()) {
5664 // make sure it's a valid osdmap
5665 try {
5666 OSDMap om;
5667 om.decode(osdmapbl);
5668 }
5669 catch (buffer::error& e) {
5670 derr << "error decoding provided osdmap: " << e.what() << dendl;
5671 return -EINVAL;
5672 }
5673 t->put("mkfs", "osdmap", osdmapbl);
5674 }
5675
5676 if (is_keyring_required()) {
5677 KeyRing keyring;
5678 string keyring_filename;
5679
5680 r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5681 if (r) {
5682 derr << "unable to find a keyring file on " << g_conf->keyring
5683 << ": " << cpp_strerror(r) << dendl;
5684 if (g_conf->key != "") {
5685 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5686 "\n\tcaps mon = \"allow *\"\n";
5687 bufferlist bl;
5688 bl.append(keyring_plaintext);
5689 try {
5690 bufferlist::iterator i = bl.begin();
5691 keyring.decode_plaintext(i);
5692 }
5693 catch (const buffer::error& e) {
5694 derr << "error decoding keyring " << keyring_plaintext
5695 << ": " << e.what() << dendl;
5696 return -EINVAL;
5697 }
5698 } else {
5699 return -ENOENT;
5700 }
5701 } else {
5702 r = keyring.load(g_ceph_context, keyring_filename);
5703 if (r < 0) {
5704 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5705 return r;
5706 }
5707 }
5708
5709 // put mon. key in external keyring; seed with everything else.
5710 extract_save_mon_key(keyring);
5711
5712 bufferlist keyringbl;
5713 keyring.encode_plaintext(keyringbl);
5714 t->put("mkfs", "keyring", keyringbl);
5715 }
5716 write_fsid(t);
5717 store->apply_transaction(t);
5718
5719 return 0;
5720 }
5721
5722 int Monitor::write_default_keyring(bufferlist& bl)
5723 {
5724 ostringstream os;
5725 os << g_conf->mon_data << "/keyring";
5726
5727 int err = 0;
5728 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600);
5729 if (fd < 0) {
5730 err = -errno;
5731 dout(0) << __func__ << " failed to open " << os.str()
5732 << ": " << cpp_strerror(err) << dendl;
5733 return err;
5734 }
5735
5736 err = bl.write_fd(fd);
5737 if (!err)
5738 ::fsync(fd);
5739 VOID_TEMP_FAILURE_RETRY(::close(fd));
5740
5741 return err;
5742 }
5743
5744 void Monitor::extract_save_mon_key(KeyRing& keyring)
5745 {
5746 EntityName mon_name;
5747 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5748 EntityAuth mon_key;
5749 if (keyring.get_auth(mon_name, mon_key)) {
5750 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5751 KeyRing pkey;
5752 pkey.add(mon_name, mon_key);
5753 bufferlist bl;
5754 pkey.encode_plaintext(bl);
5755 write_default_keyring(bl);
5756 keyring.remove(mon_name);
5757 }
5758 }
5759
5760 bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5761 bool force_new)
5762 {
5763 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5764 << dendl;
5765
5766 if (is_shutdown())
5767 return false;
5768
5769 // we only connect to other monitors and mgr; every else connects to us.
5770 if (service_id != CEPH_ENTITY_TYPE_MON &&
5771 service_id != CEPH_ENTITY_TYPE_MGR)
5772 return false;
5773
5774 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5775 // auth_none
5776 dout(20) << __func__ << " building auth_none authorizer" << dendl;
5777 AuthNoneClientHandler handler(g_ceph_context, nullptr);
5778 handler.set_global_id(0);
5779 *authorizer = handler.build_authorizer(service_id);
5780 return true;
5781 }
5782
5783 CephXServiceTicketInfo auth_ticket_info;
5784 CephXSessionAuthInfo info;
5785 int ret;
5786
5787 EntityName name;
5788 name.set_type(CEPH_ENTITY_TYPE_MON);
5789 auth_ticket_info.ticket.name = name;
5790 auth_ticket_info.ticket.global_id = 0;
5791
5792 if (service_id == CEPH_ENTITY_TYPE_MON) {
5793 // mon to mon authentication uses the private monitor shared key and not the
5794 // rotating key
5795 CryptoKey secret;
5796 if (!keyring.get_secret(name, secret) &&
5797 !key_server.get_secret(name, secret)) {
5798 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5799 << dendl;
5800 stringstream ss, ds;
5801 int err = key_server.list_secrets(ds);
5802 if (err < 0)
5803 ss << "no installed auth entries!";
5804 else
5805 ss << "installed auth entries:";
5806 dout(0) << ss.str() << "\n" << ds.str() << dendl;
5807 return false;
5808 }
5809
5810 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5811 secret, (uint64_t)-1);
5812 if (ret < 0) {
5813 dout(0) << __func__ << " failed to build mon session_auth_info "
5814 << cpp_strerror(ret) << dendl;
5815 return false;
5816 }
5817 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5818 // mgr
5819 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5820 if (ret < 0) {
5821 derr << __func__ << " failed to build mgr service session_auth_info "
5822 << cpp_strerror(ret) << dendl;
5823 return false;
5824 }
5825 } else {
5826 ceph_abort(); // see check at top of fn
5827 }
5828
5829 CephXTicketBlob blob;
5830 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5831 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5832 return false;
5833 }
5834 bufferlist ticket_data;
5835 ::encode(blob, ticket_data);
5836
5837 bufferlist::iterator iter = ticket_data.begin();
5838 CephXTicketHandler handler(g_ceph_context, service_id);
5839 ::decode(handler.ticket, iter);
5840
5841 handler.session_key = info.session_key;
5842
5843 *authorizer = handler.build_authorizer(0);
5844
5845 return true;
5846 }
5847
5848 bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5849 int protocol, bufferlist& authorizer_data,
5850 bufferlist& authorizer_reply,
5851 bool& isvalid, CryptoKey& session_key)
5852 {
5853 dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5854 << " " << ceph_entity_type_name(peer_type)
5855 << " protocol " << protocol << dendl;
5856
5857 if (is_shutdown())
5858 return false;
5859
5860 if (peer_type == CEPH_ENTITY_TYPE_MON &&
5861 auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5862 // monitor, and cephx is enabled
5863 isvalid = false;
5864 if (protocol == CEPH_AUTH_CEPHX) {
5865 bufferlist::iterator iter = authorizer_data.begin();
5866 CephXServiceTicketInfo auth_ticket_info;
5867
5868 if (authorizer_data.length()) {
5869 bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
5870 auth_ticket_info, authorizer_reply);
5871 if (ret) {
5872 session_key = auth_ticket_info.session_key;
5873 isvalid = true;
5874 } else {
5875 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5876 }
5877 }
5878 } else {
5879 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
5880 }
5881 } else {
5882 // who cares.
5883 isvalid = true;
5884 }
5885 return true;
5886 }