]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Monitor.cc
update sources to 12.2.2
[ceph.git] / ceph / src / mon / Monitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include <sstream>
17 #include <stdlib.h>
18 #include <signal.h>
19 #include <limits.h>
20 #include <cstring>
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
23
24 #include "Monitor.h"
25 #include "common/version.h"
26
27 #include "osd/OSDMap.h"
28
29 #include "MonitorDBStore.h"
30
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
48
49 #include "messages/MMonSubscribe.h"
50 #include "messages/MMonSubscribeAck.h"
51
52 #include "messages/MAuthReply.h"
53
54 #include "messages/MTimeCheck.h"
55 #include "messages/MPing.h"
56
57 #include "common/strtol.h"
58 #include "common/ceph_argparse.h"
59 #include "common/Timer.h"
60 #include "common/Clock.h"
61 #include "common/errno.h"
62 #include "common/perf_counters.h"
63 #include "common/admin_socket.h"
64 #include "global/signal_handler.h"
65 #include "common/Formatter.h"
66 #include "include/stringify.h"
67 #include "include/color.h"
68 #include "include/ceph_fs.h"
69 #include "include/str_list.h"
70
71 #include "OSDMonitor.h"
72 #include "MDSMonitor.h"
73 #include "MonmapMonitor.h"
74 #include "PGMonitor.h"
75 #include "LogMonitor.h"
76 #include "AuthMonitor.h"
77 #include "MgrMonitor.h"
78 #include "MgrStatMonitor.h"
79 #include "mon/QuorumService.h"
80 #include "mon/OldHealthMonitor.h"
81 #include "mon/HealthMonitor.h"
82 #include "mon/ConfigKeyService.h"
83 #include "common/config.h"
84 #include "common/cmdparse.h"
85 #include "include/assert.h"
86 #include "include/compat.h"
87 #include "perfglue/heap_profiler.h"
88
89 #include "auth/none/AuthNoneClientHandler.h"
90
91 #define dout_subsys ceph_subsys_mon
92 #undef dout_prefix
93 #define dout_prefix _prefix(_dout, this)
94 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
95 return *_dout << "mon." << mon->name << "@" << mon->rank
96 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
97 }
98
99 const string Monitor::MONITOR_NAME = "monitor";
100 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
101
102
103 #undef FLAG
104 #undef COMMAND
105 #undef COMMAND_WITH_FLAG
106 #define FLAG(f) (MonCommand::FLAG_##f)
107 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
108 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
109 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
110 {parsesig, helptext, modulename, req_perms, avail, flags},
111 MonCommand mon_commands[] = {
112 #include <mon/MonCommands.h>
113 };
114 MonCommand pgmonitor_commands[] = {
115 #include <mon/PGMonitorCommands.h>
116 };
117 #undef COMMAND
118 #undef COMMAND_WITH_FLAG
119
120
121 void C_MonContext::finish(int r) {
122 if (mon->is_shutdown())
123 return;
124 FunctionContext::finish(r);
125 }
126
127 Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
128 Messenger *m, Messenger *mgr_m, MonMap *map) :
129 Dispatcher(cct_),
130 name(nm),
131 rank(-1),
132 messenger(m),
133 con_self(m ? m->get_loopback_connection() : NULL),
134 lock("Monitor::lock"),
135 timer(cct_, lock),
136 finisher(cct_, "mon_finisher", "fin"),
137 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
138 has_ever_joined(false),
139 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
140 monmap(map),
141 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
142 key_server(cct, &keyring),
143 auth_cluster_required(cct,
144 cct->_conf->auth_supported.empty() ?
145 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
146 auth_service_required(cct,
147 cct->_conf->auth_supported.empty() ?
148 cct->_conf->auth_service_required : cct->_conf->auth_supported ),
149 mgr_messenger(mgr_m),
150 mgr_client(cct_, mgr_m),
151 pgservice(nullptr),
152 store(s),
153
154 state(STATE_PROBING),
155
156 elector(this),
157 required_features(0),
158 leader(0),
159 quorum_con_features(0),
160 // scrub
161 scrub_version(0),
162 scrub_event(NULL),
163 scrub_timeout_event(NULL),
164
165 // sync state
166 sync_provider_count(0),
167 sync_cookie(0),
168 sync_full(false),
169 sync_start_version(0),
170 sync_timeout_event(NULL),
171 sync_last_committed_floor(0),
172
173 timecheck_round(0),
174 timecheck_acks(0),
175 timecheck_rounds_since_clean(0),
176 timecheck_event(NULL),
177
178 paxos_service(PAXOS_NUM),
179 admin_hook(NULL),
180 routed_request_tid(0),
181 op_tracker(cct, true, 1)
182 {
183 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
184 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
185
186 update_log_clients();
187
188 paxos = new Paxos(this, "paxos");
189
190 paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
191 paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
192 paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
193 paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
194 paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
195 paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
196 paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
197 paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
198 paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
199
200 health_monitor = new OldHealthMonitor(this);
201 config_key_service = new ConfigKeyService(this, paxos);
202
203 mon_caps = new MonCap();
204 bool r = mon_caps->parse("allow *", NULL);
205 assert(r);
206
207 exited_quorum = ceph_clock_now();
208
209 // prepare local commands
210 local_mon_commands.resize(ARRAY_SIZE(mon_commands));
211 for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
212 local_mon_commands[i] = mon_commands[i];
213 }
214 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
215
216 local_upgrading_mon_commands = local_mon_commands;
217 for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
218 local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
219 }
220 MonCommand::encode_vector(local_upgrading_mon_commands,
221 local_upgrading_mon_commands_bl);
222
223 // assume our commands until we have an election. this only means
224 // we won't reply with EINVAL before the election; any command that
225 // actually matters will wait until we have quorum etc and then
226 // retry (and revalidate).
227 leader_mon_commands = local_mon_commands;
228
229 // note: OSDMonitor may update this based on the luminous flag.
230 pgservice = mgrstatmon()->get_pg_stat_service();
231 }
232
233 Monitor::~Monitor()
234 {
235 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
236 delete *p;
237 delete health_monitor;
238 delete config_key_service;
239 delete paxos;
240 assert(session_map.sessions.empty());
241 delete mon_caps;
242 }
243
244
245 class AdminHook : public AdminSocketHook {
246 Monitor *mon;
247 public:
248 explicit AdminHook(Monitor *m) : mon(m) {}
249 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
250 bufferlist& out) override {
251 stringstream ss;
252 mon->do_admin_command(command, cmdmap, format, ss);
253 out.append(ss);
254 return true;
255 }
256 };
257
258 void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
259 ostream& ss)
260 {
261 Mutex::Locker l(lock);
262
263 boost::scoped_ptr<Formatter> f(Formatter::create(format));
264
265 string args;
266 for (cmdmap_t::iterator p = cmdmap.begin();
267 p != cmdmap.end(); ++p) {
268 if (p->first == "prefix")
269 continue;
270 if (!args.empty())
271 args += ", ";
272 args += cmd_vartype_stringify(p->second);
273 }
274 args = "[" + args + "]";
275
276 bool read_only = (command == "mon_status" ||
277 command == "mon metadata" ||
278 command == "quorum_status" ||
279 command == "ops" ||
280 command == "sessions");
281
282 (read_only ? audit_clog->debug() : audit_clog->info())
283 << "from='admin socket' entity='admin socket' "
284 << "cmd='" << command << "' args=" << args << ": dispatch";
285
286 if (command == "mon_status") {
287 get_mon_status(f.get(), ss);
288 if (f)
289 f->flush(ss);
290 } else if (command == "quorum_status") {
291 _quorum_status(f.get(), ss);
292 } else if (command == "sync_force") {
293 string validate;
294 if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
295 (validate != "--yes-i-really-mean-it")) {
296 ss << "are you SURE? this will mean the monitor store will be erased "
297 "the next time the monitor is restarted. pass "
298 "'--yes-i-really-mean-it' if you really do.";
299 goto abort;
300 }
301 sync_force(f.get(), ss);
302 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
303 if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
304 goto abort;
305 } else if (command == "quorum enter") {
306 elector.start_participating();
307 start_election();
308 ss << "started responding to quorum, initiated new election";
309 } else if (command == "quorum exit") {
310 start_election();
311 elector.stop_participating();
312 ss << "stopped responding to quorum, initiated new election";
313 } else if (command == "ops") {
314 (void)op_tracker.dump_ops_in_flight(f.get());
315 if (f) {
316 f->flush(ss);
317 }
318 } else if (command == "sessions") {
319
320 if (f) {
321 f->open_array_section("sessions");
322 for (auto p : session_map.sessions) {
323 f->dump_stream("session") << *p;
324 }
325 f->close_section();
326 f->flush(ss);
327 }
328
329 } else {
330 assert(0 == "bad AdminSocket command binding");
331 }
332 (read_only ? audit_clog->debug() : audit_clog->info())
333 << "from='admin socket' "
334 << "entity='admin socket' "
335 << "cmd=" << command << " "
336 << "args=" << args << ": finished";
337 return;
338
339 abort:
340 (read_only ? audit_clog->debug() : audit_clog->info())
341 << "from='admin socket' "
342 << "entity='admin socket' "
343 << "cmd=" << command << " "
344 << "args=" << args << ": aborted";
345 }
346
347 void Monitor::handle_signal(int signum)
348 {
349 assert(signum == SIGINT || signum == SIGTERM);
350 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
351 shutdown();
352 }
353
354 CompatSet Monitor::get_initial_supported_features()
355 {
356 CompatSet::FeatureSet ceph_mon_feature_compat;
357 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
358 CompatSet::FeatureSet ceph_mon_feature_incompat;
359 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
360 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
361 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
362 ceph_mon_feature_incompat);
363 }
364
365 CompatSet Monitor::get_supported_features()
366 {
367 CompatSet compat = get_initial_supported_features();
368 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
369 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
370 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
371 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
372 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
373 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
374 return compat;
375 }
376
377 CompatSet Monitor::get_legacy_features()
378 {
379 CompatSet::FeatureSet ceph_mon_feature_compat;
380 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
381 CompatSet::FeatureSet ceph_mon_feature_incompat;
382 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
383 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
384 ceph_mon_feature_incompat);
385 }
386
387 int Monitor::check_features(MonitorDBStore *store)
388 {
389 CompatSet required = get_supported_features();
390 CompatSet ondisk;
391
392 read_features_off_disk(store, &ondisk);
393
394 if (!required.writeable(ondisk)) {
395 CompatSet diff = required.unsupported(ondisk);
396 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
397 return -EPERM;
398 }
399
400 return 0;
401 }
402
403 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
404 {
405 bufferlist featuresbl;
406 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
407 if (featuresbl.length() == 0) {
408 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
409 << "Assuming it is old-style and introducing one." << dendl;
410 //we only want the baseline ~v.18 features assumed to be on disk.
411 //If new features are introduced this code needs to disappear or
412 //be made smarter.
413 *features = get_legacy_features();
414
415 features->encode(featuresbl);
416 auto t(std::make_shared<MonitorDBStore::Transaction>());
417 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
418 store->apply_transaction(t);
419 } else {
420 bufferlist::iterator it = featuresbl.begin();
421 features->decode(it);
422 }
423 }
424
425 void Monitor::read_features()
426 {
427 read_features_off_disk(store, &features);
428 dout(10) << "features " << features << dendl;
429
430 calc_quorum_requirements();
431 dout(10) << "required_features " << required_features << dendl;
432 }
433
434 void Monitor::write_features(MonitorDBStore::TransactionRef t)
435 {
436 bufferlist bl;
437 features.encode(bl);
438 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
439 }
440
441 const char** Monitor::get_tracked_conf_keys() const
442 {
443 static const char* KEYS[] = {
444 "crushtool", // helpful for testing
445 "mon_election_timeout",
446 "mon_lease",
447 "mon_lease_renew_interval_factor",
448 "mon_lease_ack_timeout_factor",
449 "mon_accept_timeout_factor",
450 // clog & admin clog
451 "clog_to_monitors",
452 "clog_to_syslog",
453 "clog_to_syslog_facility",
454 "clog_to_syslog_level",
455 "clog_to_graylog",
456 "clog_to_graylog_host",
457 "clog_to_graylog_port",
458 "host",
459 "fsid",
460 // periodic health to clog
461 "mon_health_to_clog",
462 "mon_health_to_clog_interval",
463 "mon_health_to_clog_tick_interval",
464 // scrub interval
465 "mon_scrub_interval",
466 NULL
467 };
468 return KEYS;
469 }
470
471 void Monitor::handle_conf_change(const struct md_config_t *conf,
472 const std::set<std::string> &changed)
473 {
474 sanitize_options();
475
476 dout(10) << __func__ << " " << changed << dendl;
477
478 if (changed.count("clog_to_monitors") ||
479 changed.count("clog_to_syslog") ||
480 changed.count("clog_to_syslog_level") ||
481 changed.count("clog_to_syslog_facility") ||
482 changed.count("clog_to_graylog") ||
483 changed.count("clog_to_graylog_host") ||
484 changed.count("clog_to_graylog_port") ||
485 changed.count("host") ||
486 changed.count("fsid")) {
487 update_log_clients();
488 }
489
490 if (changed.count("mon_health_to_clog") ||
491 changed.count("mon_health_to_clog_interval") ||
492 changed.count("mon_health_to_clog_tick_interval")) {
493 health_to_clog_update_conf(changed);
494 }
495
496 if (changed.count("mon_scrub_interval")) {
497 scrub_update_interval(conf->mon_scrub_interval);
498 }
499 }
500
501 void Monitor::update_log_clients()
502 {
503 map<string,string> log_to_monitors;
504 map<string,string> log_to_syslog;
505 map<string,string> log_channel;
506 map<string,string> log_prio;
507 map<string,string> log_to_graylog;
508 map<string,string> log_to_graylog_host;
509 map<string,string> log_to_graylog_port;
510 uuid_d fsid;
511 string host;
512
513 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
514 log_channel, log_prio, log_to_graylog,
515 log_to_graylog_host, log_to_graylog_port,
516 fsid, host))
517 return;
518
519 clog->update_config(log_to_monitors, log_to_syslog,
520 log_channel, log_prio, log_to_graylog,
521 log_to_graylog_host, log_to_graylog_port,
522 fsid, host);
523
524 audit_clog->update_config(log_to_monitors, log_to_syslog,
525 log_channel, log_prio, log_to_graylog,
526 log_to_graylog_host, log_to_graylog_port,
527 fsid, host);
528 }
529
530 int Monitor::sanitize_options()
531 {
532 int r = 0;
533
534 // mon_lease must be greater than mon_lease_renewal; otherwise we
535 // may incur in leases expiring before they are renewed.
536 if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
537 clog->error() << "mon_lease_renew_interval_factor ("
538 << g_conf->mon_lease_renew_interval_factor
539 << ") must be less than 1.0";
540 r = -EINVAL;
541 }
542
543 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
544 // got time to renew the lease and get an ack for it. Having both options
545 // with the same value, for a given small vale, could mean timing out if
546 // the monitors happened to be overloaded -- or even under normal load for
547 // a small enough value.
548 if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
549 clog->error() << "mon_lease_ack_timeout_factor ("
550 << g_conf->mon_lease_ack_timeout_factor
551 << ") must be greater than 1.0";
552 r = -EINVAL;
553 }
554
555 return r;
556 }
557
558 int Monitor::preinit()
559 {
560 lock.Lock();
561
562 dout(1) << "preinit fsid " << monmap->fsid << dendl;
563
564 int r = sanitize_options();
565 if (r < 0) {
566 derr << "option sanitization failed!" << dendl;
567 lock.Unlock();
568 return r;
569 }
570
571 assert(!logger);
572 {
573 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
574 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
575 PerfCountersBuilder::PRIO_USEFUL);
576 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
577 "sadd", PerfCountersBuilder::PRIO_INTERESTING);
578 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
579 "srm", PerfCountersBuilder::PRIO_INTERESTING);
580 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
581 "strm", PerfCountersBuilder::PRIO_USEFUL);
582 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
583 "ecnt", PerfCountersBuilder::PRIO_USEFUL);
584 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
585 "estt", PerfCountersBuilder::PRIO_INTERESTING);
586 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
587 "ewon", PerfCountersBuilder::PRIO_INTERESTING);
588 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
589 "elst", PerfCountersBuilder::PRIO_INTERESTING);
590 logger = pcb.create_perf_counters();
591 cct->get_perfcounters_collection()->add(logger);
592 }
593
594 assert(!cluster_logger);
595 {
596 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
597 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
598 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
599 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
600 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
601 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
602 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
603 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster");
604 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space");
605 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space");
606 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
607 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
608 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
609 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
610 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
611 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
612 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
613 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
614 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
615 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
616 pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
617 pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
618 pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
619 pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
620 cluster_logger = pcb.create_perf_counters();
621 }
622
623 paxos->init_logger();
624
625 // verify cluster_uuid
626 {
627 int r = check_fsid();
628 if (r == -ENOENT)
629 r = write_fsid();
630 if (r < 0) {
631 lock.Unlock();
632 return r;
633 }
634 }
635
636 // open compatset
637 read_features();
638
639 // have we ever joined a quorum?
640 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
641 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
642
643 if (!has_ever_joined) {
644 // impose initial quorum restrictions?
645 list<string> initial_members;
646 get_str_list(g_conf->mon_initial_members, initial_members);
647
648 if (!initial_members.empty()) {
649 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
650
651 monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
652 &extra_probe_peers);
653
654 dout(10) << " monmap is " << *monmap << dendl;
655 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
656 }
657 } else if (!monmap->contains(name)) {
658 derr << "not in monmap and have been in a quorum before; "
659 << "must have been removed" << dendl;
660 if (g_conf->mon_force_quorum_join) {
661 dout(0) << "we should have died but "
662 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
663 } else {
664 derr << "commit suicide!" << dendl;
665 lock.Unlock();
666 return -ENOENT;
667 }
668 }
669
670 {
671 // We have a potentially inconsistent store state in hands. Get rid of it
672 // and start fresh.
673 bool clear_store = false;
674 if (store->exists("mon_sync", "in_sync")) {
675 dout(1) << __func__ << " clean up potentially inconsistent store state"
676 << dendl;
677 clear_store = true;
678 }
679
680 if (store->get("mon_sync", "force_sync") > 0) {
681 dout(1) << __func__ << " force sync by clearing store state" << dendl;
682 clear_store = true;
683 }
684
685 if (clear_store) {
686 set<string> sync_prefixes = get_sync_targets_names();
687 store->clear(sync_prefixes);
688 }
689 }
690
691 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
692 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
693
694 init_paxos();
695 health_monitor->init();
696
697 if (is_keyring_required()) {
698 // we need to bootstrap authentication keys so we can form an
699 // initial quorum.
700 if (authmon()->get_last_committed() == 0) {
701 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
702 bufferlist bl;
703 int err = store->get("mkfs", "keyring", bl);
704 if (err == 0 && bl.length() > 0) {
705 // Attempt to decode and extract keyring only if it is found.
706 KeyRing keyring;
707 bufferlist::iterator p = bl.begin();
708 ::decode(keyring, p);
709 extract_save_mon_key(keyring);
710 }
711 }
712
713 string keyring_loc = g_conf->mon_data + "/keyring";
714
715 r = keyring.load(cct, keyring_loc);
716 if (r < 0) {
717 EntityName mon_name;
718 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
719 EntityAuth mon_key;
720 if (key_server.get_auth(mon_name, mon_key)) {
721 dout(1) << "copying mon. key from old db to external keyring" << dendl;
722 keyring.add(mon_name, mon_key);
723 bufferlist bl;
724 keyring.encode_plaintext(bl);
725 write_default_keyring(bl);
726 } else {
727 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
728 lock.Unlock();
729 return r;
730 }
731 }
732 }
733
734 admin_hook = new AdminHook(this);
735 AdminSocket* admin_socket = cct->get_admin_socket();
736
737 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
738 lock.Unlock();
739 r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
740 "show current monitor status");
741 assert(r == 0);
742 r = admin_socket->register_command("quorum_status", "quorum_status",
743 admin_hook, "show current quorum status");
744 assert(r == 0);
745 r = admin_socket->register_command("sync_force",
746 "sync_force name=validate,"
747 "type=CephChoices,"
748 "strings=--yes-i-really-mean-it",
749 admin_hook,
750 "force sync of and clear monitor store");
751 assert(r == 0);
752 r = admin_socket->register_command("add_bootstrap_peer_hint",
753 "add_bootstrap_peer_hint name=addr,"
754 "type=CephIPAddr",
755 admin_hook,
756 "add peer address as potential bootstrap"
757 " peer for cluster bringup");
758 assert(r == 0);
759 r = admin_socket->register_command("quorum enter", "quorum enter",
760 admin_hook,
761 "force monitor back into quorum");
762 assert(r == 0);
763 r = admin_socket->register_command("quorum exit", "quorum exit",
764 admin_hook,
765 "force monitor out of the quorum");
766 assert(r == 0);
767 r = admin_socket->register_command("ops",
768 "ops",
769 admin_hook,
770 "show the ops currently in flight");
771 assert(r == 0);
772 r = admin_socket->register_command("sessions",
773 "sessions",
774 admin_hook,
775 "list existing sessions");
776 assert(r == 0);
777
778 lock.Lock();
779
780 // add ourselves as a conf observer
781 g_conf->add_observer(this);
782
783 lock.Unlock();
784 return 0;
785 }
786
787 int Monitor::init()
788 {
789 dout(2) << "init" << dendl;
790 Mutex::Locker l(lock);
791
792 finisher.start();
793
794 // start ticker
795 timer.init();
796 new_tick();
797
798 cpu_tp.start();
799
800 // i'm ready!
801 messenger->add_dispatcher_tail(this);
802
803 mgr_client.init();
804 mgr_messenger->add_dispatcher_tail(&mgr_client);
805 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
806
807 bootstrap();
808 // add features of myself into feature_map
809 session_map.feature_map.add_mon(con_self->get_features());
810 return 0;
811 }
812
813 void Monitor::init_paxos()
814 {
815 dout(10) << __func__ << dendl;
816 paxos->init();
817
818 // init services
819 for (int i = 0; i < PAXOS_NUM; ++i) {
820 paxos_service[i]->init();
821 }
822
823 refresh_from_paxos(NULL);
824 }
825
826 void Monitor::refresh_from_paxos(bool *need_bootstrap)
827 {
828 dout(10) << __func__ << dendl;
829
830 bufferlist bl;
831 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
832 if (r >= 0) {
833 try {
834 bufferlist::iterator p = bl.begin();
835 ::decode(fingerprint, p);
836 }
837 catch (buffer::error& e) {
838 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
839 }
840 } else {
841 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
842 }
843
844 for (int i = 0; i < PAXOS_NUM; ++i) {
845 paxos_service[i]->refresh(need_bootstrap);
846 }
847 for (int i = 0; i < PAXOS_NUM; ++i) {
848 paxos_service[i]->post_refresh();
849 }
850 load_metadata();
851 }
852
853 void Monitor::register_cluster_logger()
854 {
855 if (!cluster_logger_registered) {
856 dout(10) << "register_cluster_logger" << dendl;
857 cluster_logger_registered = true;
858 cct->get_perfcounters_collection()->add(cluster_logger);
859 } else {
860 dout(10) << "register_cluster_logger - already registered" << dendl;
861 }
862 }
863
864 void Monitor::unregister_cluster_logger()
865 {
866 if (cluster_logger_registered) {
867 dout(10) << "unregister_cluster_logger" << dendl;
868 cluster_logger_registered = false;
869 cct->get_perfcounters_collection()->remove(cluster_logger);
870 } else {
871 dout(10) << "unregister_cluster_logger - not registered" << dendl;
872 }
873 }
874
875 void Monitor::update_logger()
876 {
877 cluster_logger->set(l_cluster_num_mon, monmap->size());
878 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
879 }
880
881 void Monitor::shutdown()
882 {
883 dout(1) << "shutdown" << dendl;
884
885 lock.Lock();
886
887 wait_for_paxos_write();
888
889 state = STATE_SHUTDOWN;
890
891 g_conf->remove_observer(this);
892
893 if (admin_hook) {
894 AdminSocket* admin_socket = cct->get_admin_socket();
895 admin_socket->unregister_command("mon_status");
896 admin_socket->unregister_command("quorum_status");
897 admin_socket->unregister_command("sync_force");
898 admin_socket->unregister_command("add_bootstrap_peer_hint");
899 admin_socket->unregister_command("quorum enter");
900 admin_socket->unregister_command("quorum exit");
901 admin_socket->unregister_command("ops");
902 admin_socket->unregister_command("sessions");
903 delete admin_hook;
904 admin_hook = NULL;
905 }
906
907 elector.shutdown();
908
909 mgr_client.shutdown();
910
911 lock.Unlock();
912 finisher.wait_for_empty();
913 finisher.stop();
914 lock.Lock();
915
916 // clean up
917 paxos->shutdown();
918 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
919 (*p)->shutdown();
920 health_monitor->shutdown();
921
922 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
923 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
924
925 timer.shutdown();
926
927 cpu_tp.stop();
928
929 remove_all_sessions();
930
931 if (logger) {
932 cct->get_perfcounters_collection()->remove(logger);
933 delete logger;
934 logger = NULL;
935 }
936 if (cluster_logger) {
937 if (cluster_logger_registered)
938 cct->get_perfcounters_collection()->remove(cluster_logger);
939 delete cluster_logger;
940 cluster_logger = NULL;
941 }
942
943 log_client.shutdown();
944
945 // unlock before msgr shutdown...
946 lock.Unlock();
947
948 messenger->shutdown(); // last thing! ceph_mon.cc will delete mon.
949 mgr_messenger->shutdown();
950 }
951
952 void Monitor::wait_for_paxos_write()
953 {
954 if (paxos->is_writing() || paxos->is_writing_previous()) {
955 dout(10) << __func__ << " flushing pending write" << dendl;
956 lock.Unlock();
957 store->flush();
958 lock.Lock();
959 dout(10) << __func__ << " flushed pending write" << dendl;
960 }
961 }
962
963 void Monitor::bootstrap()
964 {
965 dout(10) << "bootstrap" << dendl;
966 wait_for_paxos_write();
967
968 sync_reset_requester();
969 unregister_cluster_logger();
970 cancel_probe_timeout();
971
972 // note my rank
973 int newrank = monmap->get_rank(messenger->get_myaddr());
974 if (newrank < 0 && rank >= 0) {
975 // was i ever part of the quorum?
976 if (has_ever_joined) {
977 dout(0) << " removed from monmap, suicide." << dendl;
978 exit(0);
979 }
980 }
981 if (newrank != rank) {
982 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
983 messenger->set_myname(entity_name_t::MON(newrank));
984 rank = newrank;
985
986 // reset all connections, or else our peers will think we are someone else.
987 messenger->mark_down_all();
988 }
989
990 // reset
991 state = STATE_PROBING;
992
993 _reset();
994
995 // sync store
996 if (g_conf->mon_compact_on_bootstrap) {
997 dout(10) << "bootstrap -- triggering compaction" << dendl;
998 store->compact();
999 dout(10) << "bootstrap -- finished compaction" << dendl;
1000 }
1001
1002 // singleton monitor?
1003 if (monmap->size() == 1 && rank == 0) {
1004 win_standalone_election();
1005 return;
1006 }
1007
1008 reset_probe_timeout();
1009
1010 // i'm outside the quorum
1011 if (monmap->contains(name))
1012 outside_quorum.insert(name);
1013
1014 // probe monitors
1015 dout(10) << "probing other monitors" << dendl;
1016 for (unsigned i = 0; i < monmap->size(); i++) {
1017 if ((int)i != rank)
1018 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
1019 monmap->get_inst(i));
1020 }
1021 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
1022 p != extra_probe_peers.end();
1023 ++p) {
1024 if (*p != messenger->get_myaddr()) {
1025 entity_inst_t i;
1026 i.name = entity_name_t::MON(-1);
1027 i.addr = *p;
1028 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
1029 }
1030 }
1031 }
1032
1033 bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1034 {
1035 string addrstr;
1036 if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1037 ss << "unable to parse address string value '"
1038 << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1039 return false;
1040 }
1041 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1042 << addrstr << "'" << dendl;
1043
1044 entity_addr_t addr;
1045 const char *end = 0;
1046 if (!addr.parse(addrstr.c_str(), &end)) {
1047 ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1048 return false;
1049 }
1050
1051 if (is_leader() || is_peon()) {
1052 ss << "mon already active; ignoring bootstrap hint";
1053 return true;
1054 }
1055
1056 if (addr.get_port() == 0)
1057 addr.set_port(CEPH_MON_PORT);
1058
1059 extra_probe_peers.insert(addr);
1060 ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1061 return true;
1062 }
1063
1064 // called by bootstrap(), or on leader|peon -> electing
1065 void Monitor::_reset()
1066 {
1067 dout(10) << __func__ << dendl;
1068
1069 cancel_probe_timeout();
1070 timecheck_finish();
1071 health_events_cleanup();
1072 health_check_log_times.clear();
1073 scrub_event_cancel();
1074
1075 leader_since = utime_t();
1076 if (!quorum.empty()) {
1077 exited_quorum = ceph_clock_now();
1078 }
1079 quorum.clear();
1080 outside_quorum.clear();
1081 quorum_feature_map.clear();
1082
1083 scrub_reset();
1084
1085 paxos->restart();
1086
1087 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1088 (*p)->restart();
1089 health_monitor->finish();
1090 }
1091
1092
1093 // -----------------------------------------------------------
1094 // sync
1095
1096 set<string> Monitor::get_sync_targets_names()
1097 {
1098 set<string> targets;
1099 targets.insert(paxos->get_name());
1100 for (int i = 0; i < PAXOS_NUM; ++i)
1101 paxos_service[i]->get_store_prefixes(targets);
1102 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1103 assert(config_key_service_ptr);
1104 config_key_service_ptr->get_store_prefixes(targets);
1105 return targets;
1106 }
1107
1108
1109 void Monitor::sync_timeout()
1110 {
1111 dout(10) << __func__ << dendl;
1112 assert(state == STATE_SYNCHRONIZING);
1113 bootstrap();
1114 }
1115
1116 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1117 {
1118 dout(1) << __func__ << dendl;
1119
1120 MonMap latest_monmap;
1121
1122 // Grab latest monmap from MonmapMonitor
1123 bufferlist monmon_bl;
1124 int err = monmon()->get_monmap(monmon_bl);
1125 if (err < 0) {
1126 if (err != -ENOENT) {
1127 derr << __func__
1128 << " something wrong happened while reading the store: "
1129 << cpp_strerror(err) << dendl;
1130 assert(0 == "error reading the store");
1131 }
1132 } else {
1133 latest_monmap.decode(monmon_bl);
1134 }
1135
1136 // Grab last backed up monmap (if any) and compare epochs
1137 if (store->exists("mon_sync", "latest_monmap")) {
1138 bufferlist backup_bl;
1139 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1140 if (err < 0) {
1141 derr << __func__
1142 << " something wrong happened while reading the store: "
1143 << cpp_strerror(err) << dendl;
1144 assert(0 == "error reading the store");
1145 }
1146 assert(backup_bl.length() > 0);
1147
1148 MonMap backup_monmap;
1149 backup_monmap.decode(backup_bl);
1150
1151 if (backup_monmap.epoch > latest_monmap.epoch)
1152 latest_monmap = backup_monmap;
1153 }
1154
1155 // Check if our current monmap's epoch is greater than the one we've
1156 // got so far.
1157 if (monmap->epoch > latest_monmap.epoch)
1158 latest_monmap = *monmap;
1159
1160 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1161
1162 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1163 }
1164
1165 void Monitor::sync_reset_requester()
1166 {
1167 dout(10) << __func__ << dendl;
1168
1169 if (sync_timeout_event) {
1170 timer.cancel_event(sync_timeout_event);
1171 sync_timeout_event = NULL;
1172 }
1173
1174 sync_provider = entity_inst_t();
1175 sync_cookie = 0;
1176 sync_full = false;
1177 sync_start_version = 0;
1178 }
1179
1180 void Monitor::sync_reset_provider()
1181 {
1182 dout(10) << __func__ << dendl;
1183 sync_providers.clear();
1184 }
1185
1186 void Monitor::sync_start(entity_inst_t &other, bool full)
1187 {
1188 dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
1189
1190 assert(state == STATE_PROBING ||
1191 state == STATE_SYNCHRONIZING);
1192 state = STATE_SYNCHRONIZING;
1193
1194 // make sure are not a provider for anyone!
1195 sync_reset_provider();
1196
1197 sync_full = full;
1198
1199 if (sync_full) {
1200 // stash key state, and mark that we are syncing
1201 auto t(std::make_shared<MonitorDBStore::Transaction>());
1202 sync_stash_critical_state(t);
1203 t->put("mon_sync", "in_sync", 1);
1204
1205 sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
1206 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1207 << sync_last_committed_floor << dendl;
1208 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1209
1210 store->apply_transaction(t);
1211
1212 assert(g_conf->mon_sync_requester_kill_at != 1);
1213
1214 // clear the underlying store
1215 set<string> targets = get_sync_targets_names();
1216 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1217 store->clear(targets);
1218
1219 // make sure paxos knows it has been reset. this prevents a
1220 // bootstrap and then different probe reply order from possibly
1221 // deciding a partial or no sync is needed.
1222 paxos->init();
1223
1224 assert(g_conf->mon_sync_requester_kill_at != 2);
1225 }
1226
1227 // assume 'other' as the leader. We will update the leader once we receive
1228 // a reply to the sync start.
1229 sync_provider = other;
1230
1231 sync_reset_timeout();
1232
1233 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1234 if (!sync_full)
1235 m->last_committed = paxos->get_version();
1236 messenger->send_message(m, sync_provider);
1237 }
1238
1239 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1240 {
1241 dout(10) << __func__ << dendl;
1242 bufferlist backup_monmap;
1243 sync_obtain_latest_monmap(backup_monmap);
1244 assert(backup_monmap.length() > 0);
1245 t->put("mon_sync", "latest_monmap", backup_monmap);
1246 }
1247
1248 void Monitor::sync_reset_timeout()
1249 {
1250 dout(10) << __func__ << dendl;
1251 if (sync_timeout_event)
1252 timer.cancel_event(sync_timeout_event);
1253 sync_timeout_event = timer.add_event_after(
1254 g_conf->mon_sync_timeout,
1255 new C_MonContext(this, [this](int) {
1256 sync_timeout();
1257 }));
1258 }
1259
1260 void Monitor::sync_finish(version_t last_committed)
1261 {
1262 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1263
1264 assert(g_conf->mon_sync_requester_kill_at != 7);
1265
1266 if (sync_full) {
1267 // finalize the paxos commits
1268 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1269 paxos->read_and_prepare_transactions(tx, sync_start_version,
1270 last_committed);
1271 tx->put(paxos->get_name(), "last_committed", last_committed);
1272
1273 dout(30) << __func__ << " final tx dump:\n";
1274 JSONFormatter f(true);
1275 tx->dump(&f);
1276 f.flush(*_dout);
1277 *_dout << dendl;
1278
1279 store->apply_transaction(tx);
1280 }
1281
1282 assert(g_conf->mon_sync_requester_kill_at != 8);
1283
1284 auto t(std::make_shared<MonitorDBStore::Transaction>());
1285 t->erase("mon_sync", "in_sync");
1286 t->erase("mon_sync", "force_sync");
1287 t->erase("mon_sync", "last_committed_floor");
1288 store->apply_transaction(t);
1289
1290 assert(g_conf->mon_sync_requester_kill_at != 9);
1291
1292 init_paxos();
1293
1294 assert(g_conf->mon_sync_requester_kill_at != 10);
1295
1296 bootstrap();
1297 }
1298
1299 void Monitor::handle_sync(MonOpRequestRef op)
1300 {
1301 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1302 dout(10) << __func__ << " " << *m << dendl;
1303 switch (m->op) {
1304
1305 // provider ---------
1306
1307 case MMonSync::OP_GET_COOKIE_FULL:
1308 case MMonSync::OP_GET_COOKIE_RECENT:
1309 handle_sync_get_cookie(op);
1310 break;
1311 case MMonSync::OP_GET_CHUNK:
1312 handle_sync_get_chunk(op);
1313 break;
1314
1315 // client -----------
1316
1317 case MMonSync::OP_COOKIE:
1318 handle_sync_cookie(op);
1319 break;
1320
1321 case MMonSync::OP_CHUNK:
1322 case MMonSync::OP_LAST_CHUNK:
1323 handle_sync_chunk(op);
1324 break;
1325 case MMonSync::OP_NO_COOKIE:
1326 handle_sync_no_cookie(op);
1327 break;
1328
1329 default:
1330 dout(0) << __func__ << " unknown op " << m->op << dendl;
1331 assert(0 == "unknown op");
1332 }
1333 }
1334
1335 // leader
1336
1337 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1338 {
1339 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1340 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1341 m->get_connection()->send_message(reply);
1342 }
1343
1344 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1345 {
1346 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1347 if (is_synchronizing()) {
1348 _sync_reply_no_cookie(op);
1349 return;
1350 }
1351
1352 assert(g_conf->mon_sync_provider_kill_at != 1);
1353
1354 // make sure they can understand us.
1355 if ((required_features ^ m->get_connection()->get_features()) &
1356 required_features) {
1357 dout(5) << " ignoring peer mon." << m->get_source().num()
1358 << " has features " << std::hex
1359 << m->get_connection()->get_features()
1360 << " but we require " << required_features << std::dec << dendl;
1361 return;
1362 }
1363
1364 // make up a unique cookie. include election epoch (which persists
1365 // across restarts for the whole cluster) and a counter for this
1366 // process instance. there is no need to be unique *across*
1367 // monitors, though.
1368 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1369 assert(sync_providers.count(cookie) == 0);
1370
1371 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1372
1373 SyncProvider& sp = sync_providers[cookie];
1374 sp.cookie = cookie;
1375 sp.entity = m->get_source_inst();
1376 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1377
1378 set<string> sync_targets;
1379 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1380 // full scan
1381 sync_targets = get_sync_targets_names();
1382 sp.last_committed = paxos->get_version();
1383 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1384 sp.full = true;
1385 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1386 } else {
1387 // just catch up paxos
1388 sp.last_committed = m->last_committed;
1389 }
1390 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1391
1392 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1393 reply->last_committed = sp.last_committed;
1394 m->get_connection()->send_message(reply);
1395 }
1396
1397 void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1398 {
1399 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1400 dout(10) << __func__ << " " << *m << dendl;
1401
1402 if (sync_providers.count(m->cookie) == 0) {
1403 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1404 _sync_reply_no_cookie(op);
1405 return;
1406 }
1407
1408 assert(g_conf->mon_sync_provider_kill_at != 2);
1409
1410 SyncProvider& sp = sync_providers[m->cookie];
1411 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1412
1413 if (sp.last_committed < paxos->get_first_committed() &&
1414 paxos->get_first_committed() > 1) {
1415 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1416 << " < our fc " << paxos->get_first_committed() << dendl;
1417 sync_providers.erase(m->cookie);
1418 _sync_reply_no_cookie(op);
1419 return;
1420 }
1421
1422 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1423 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1424
1425 int left = g_conf->mon_sync_max_payload_size;
1426 while (sp.last_committed < paxos->get_version() && left > 0) {
1427 bufferlist bl;
1428 sp.last_committed++;
1429
1430 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1431 assert(err == 0);
1432
1433 tx->put(paxos->get_name(), sp.last_committed, bl);
1434 left -= bl.length();
1435 dout(20) << __func__ << " including paxos state " << sp.last_committed
1436 << dendl;
1437 }
1438 reply->last_committed = sp.last_committed;
1439
1440 if (sp.full && left > 0) {
1441 sp.synchronizer->get_chunk_tx(tx, left);
1442 sp.last_key = sp.synchronizer->get_last_key();
1443 reply->last_key = sp.last_key;
1444 }
1445
1446 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1447 sp.last_committed < paxos->get_version()) {
1448 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1449 << " key " << sp.last_key << dendl;
1450 } else {
1451 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1452 << " key " << sp.last_key << dendl;
1453 reply->op = MMonSync::OP_LAST_CHUNK;
1454
1455 assert(g_conf->mon_sync_provider_kill_at != 3);
1456
1457 // clean up our local state
1458 sync_providers.erase(sp.cookie);
1459 }
1460
1461 ::encode(*tx, reply->chunk_bl);
1462
1463 m->get_connection()->send_message(reply);
1464 }
1465
1466 // requester
1467
1468 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1469 {
1470 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1471 dout(10) << __func__ << " " << *m << dendl;
1472 if (sync_cookie) {
1473 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1474 return;
1475 }
1476 if (m->get_source_inst() != sync_provider) {
1477 dout(10) << __func__ << " source does not match, discarding" << dendl;
1478 return;
1479 }
1480 sync_cookie = m->cookie;
1481 sync_start_version = m->last_committed;
1482
1483 sync_reset_timeout();
1484 sync_get_next_chunk();
1485
1486 assert(g_conf->mon_sync_requester_kill_at != 3);
1487 }
1488
1489 void Monitor::sync_get_next_chunk()
1490 {
1491 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1492 if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1493 dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1494 usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1495 }
1496 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1497 messenger->send_message(r, sync_provider);
1498
1499 assert(g_conf->mon_sync_requester_kill_at != 4);
1500 }
1501
1502 void Monitor::handle_sync_chunk(MonOpRequestRef op)
1503 {
1504 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1505 dout(10) << __func__ << " " << *m << dendl;
1506
1507 if (m->cookie != sync_cookie) {
1508 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1509 return;
1510 }
1511 if (m->get_source_inst() != sync_provider) {
1512 dout(10) << __func__ << " source does not match, discarding" << dendl;
1513 return;
1514 }
1515
1516 assert(state == STATE_SYNCHRONIZING);
1517 assert(g_conf->mon_sync_requester_kill_at != 5);
1518
1519 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1520 tx->append_from_encoded(m->chunk_bl);
1521
1522 dout(30) << __func__ << " tx dump:\n";
1523 JSONFormatter f(true);
1524 tx->dump(&f);
1525 f.flush(*_dout);
1526 *_dout << dendl;
1527
1528 store->apply_transaction(tx);
1529
1530 assert(g_conf->mon_sync_requester_kill_at != 6);
1531
1532 if (!sync_full) {
1533 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1534 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1535 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1536 m->last_committed);
1537 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1538
1539 dout(30) << __func__ << " tx dump:\n";
1540 JSONFormatter f(true);
1541 tx->dump(&f);
1542 f.flush(*_dout);
1543 *_dout << dendl;
1544
1545 store->apply_transaction(tx);
1546 paxos->init(); // to refresh what we just wrote
1547 }
1548
1549 if (m->op == MMonSync::OP_CHUNK) {
1550 sync_reset_timeout();
1551 sync_get_next_chunk();
1552 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1553 sync_finish(m->last_committed);
1554 }
1555 }
1556
1557 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1558 {
1559 dout(10) << __func__ << dendl;
1560 bootstrap();
1561 }
1562
1563 void Monitor::sync_trim_providers()
1564 {
1565 dout(20) << __func__ << dendl;
1566
1567 utime_t now = ceph_clock_now();
1568 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1569 while (p != sync_providers.end()) {
1570 if (now > p->second.timeout) {
1571 dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
1572 sync_providers.erase(p++);
1573 } else {
1574 ++p;
1575 }
1576 }
1577 }
1578
1579 // ---------------------------------------------------
1580 // probe
1581
1582 void Monitor::cancel_probe_timeout()
1583 {
1584 if (probe_timeout_event) {
1585 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1586 timer.cancel_event(probe_timeout_event);
1587 probe_timeout_event = NULL;
1588 } else {
1589 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1590 }
1591 }
1592
1593 void Monitor::reset_probe_timeout()
1594 {
1595 cancel_probe_timeout();
1596 probe_timeout_event = new C_MonContext(this, [this](int r) {
1597 probe_timeout(r);
1598 });
1599 double t = g_conf->mon_probe_timeout;
1600 if (timer.add_event_after(t, probe_timeout_event)) {
1601 dout(10) << "reset_probe_timeout " << probe_timeout_event
1602 << " after " << t << " seconds" << dendl;
1603 } else {
1604 probe_timeout_event = nullptr;
1605 }
1606 }
1607
1608 void Monitor::probe_timeout(int r)
1609 {
1610 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1611 assert(is_probing() || is_synchronizing());
1612 assert(probe_timeout_event);
1613 probe_timeout_event = NULL;
1614 bootstrap();
1615 }
1616
1617 void Monitor::handle_probe(MonOpRequestRef op)
1618 {
1619 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1620 dout(10) << "handle_probe " << *m << dendl;
1621
1622 if (m->fsid != monmap->fsid) {
1623 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1624 return;
1625 }
1626
1627 switch (m->op) {
1628 case MMonProbe::OP_PROBE:
1629 handle_probe_probe(op);
1630 break;
1631
1632 case MMonProbe::OP_REPLY:
1633 handle_probe_reply(op);
1634 break;
1635
1636 case MMonProbe::OP_MISSING_FEATURES:
1637 derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1638 << ", required " << m->required_features
1639 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
1640 << dendl;
1641 break;
1642 }
1643 }
1644
1645 void Monitor::handle_probe_probe(MonOpRequestRef op)
1646 {
1647 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1648
1649 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1650 << " features " << m->get_connection()->get_features() << dendl;
1651 uint64_t missing = required_features & ~m->get_connection()->get_features();
1652 if (missing) {
1653 dout(1) << " peer " << m->get_source_addr() << " missing features "
1654 << missing << dendl;
1655 if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1656 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1657 name, has_ever_joined);
1658 m->required_features = required_features;
1659 m->get_connection()->send_message(r);
1660 }
1661 goto out;
1662 }
1663
1664 if (!is_probing() && !is_synchronizing()) {
1665 // If the probing mon is way ahead of us, we need to re-bootstrap.
1666 // Normally we capture this case when we initially bootstrap, but
1667 // it is possible we pass those checks (we overlap with
1668 // quorum-to-be) but fail to join a quorum before it moves past
1669 // us. We need to be kicked back to bootstrap so we can
1670 // synchonize, not keep calling elections.
1671 if (paxos->get_version() + 1 < m->paxos_first_version) {
1672 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1673 << "ahead of us, re-bootstrapping" << dendl;
1674 bootstrap();
1675 goto out;
1676
1677 }
1678 }
1679
1680 MMonProbe *r;
1681 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1682 r->name = name;
1683 r->quorum = quorum;
1684 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1685 r->paxos_first_version = paxos->get_first_committed();
1686 r->paxos_last_version = paxos->get_version();
1687 m->get_connection()->send_message(r);
1688
1689 // did we discover a peer here?
1690 if (!monmap->contains(m->get_source_addr())) {
1691 dout(1) << " adding peer " << m->get_source_addr()
1692 << " to list of hints" << dendl;
1693 extra_probe_peers.insert(m->get_source_addr());
1694 }
1695
1696 out:
1697 return;
1698 }
1699
1700 void Monitor::handle_probe_reply(MonOpRequestRef op)
1701 {
1702 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1703 dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1704 dout(10) << " monmap is " << *monmap << dendl;
1705
1706 // discover name and addrs during probing or electing states.
1707 if (!is_probing() && !is_electing()) {
1708 return;
1709 }
1710
1711 // newer map, or they've joined a quorum and we haven't?
1712 bufferlist mybl;
1713 monmap->encode(mybl, m->get_connection()->get_features());
1714 // make sure it's actually different; the checks below err toward
1715 // taking the other guy's map, which could cause us to loop.
1716 if (!mybl.contents_equal(m->monmap_bl)) {
1717 MonMap *newmap = new MonMap;
1718 newmap->decode(m->monmap_bl);
1719 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1720 !has_ever_joined)) {
1721 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1722 << ", mine was " << monmap->get_epoch() << dendl;
1723 delete newmap;
1724 monmap->decode(m->monmap_bl);
1725
1726 bootstrap();
1727 return;
1728 }
1729 delete newmap;
1730 }
1731
1732 // rename peer?
1733 string peer_name = monmap->get_name(m->get_source_addr());
1734 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1735 dout(10) << " renaming peer " << m->get_source_addr() << " "
1736 << peer_name << " -> " << m->name << " in my monmap"
1737 << dendl;
1738 monmap->rename(peer_name, m->name);
1739
1740 if (is_electing()) {
1741 bootstrap();
1742 return;
1743 }
1744 } else {
1745 dout(10) << " peer name is " << peer_name << dendl;
1746 }
1747
1748 // new initial peer?
1749 if (monmap->get_epoch() == 0 &&
1750 monmap->contains(m->name) &&
1751 monmap->get_addr(m->name).is_blank_ip()) {
1752 dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1753 monmap->set_addr(m->name, m->get_source_addr());
1754
1755 bootstrap();
1756 return;
1757 }
1758
1759 // end discover phase
1760 if (!is_probing()) {
1761 return;
1762 }
1763
1764 assert(paxos != NULL);
1765
1766 if (is_synchronizing()) {
1767 dout(10) << " currently syncing" << dendl;
1768 return;
1769 }
1770
1771 entity_inst_t other = m->get_source_inst();
1772
1773 if (m->paxos_last_version < sync_last_committed_floor) {
1774 dout(10) << " peer paxos versions [" << m->paxos_first_version
1775 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1776 << sync_last_committed_floor << ", ignoring"
1777 << dendl;
1778 } else {
1779 if (paxos->get_version() < m->paxos_first_version &&
1780 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1781 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1782 << "," << m->paxos_last_version << "]"
1783 << " vs my version " << paxos->get_version()
1784 << " (too far ahead)"
1785 << dendl;
1786 cancel_probe_timeout();
1787 sync_start(other, true);
1788 return;
1789 }
1790 if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1791 dout(10) << " peer paxos last version " << m->paxos_last_version
1792 << " vs my version " << paxos->get_version()
1793 << " (too far ahead)"
1794 << dendl;
1795 cancel_probe_timeout();
1796 sync_start(other, false);
1797 return;
1798 }
1799 }
1800
1801 // is there an existing quorum?
1802 if (m->quorum.size()) {
1803 dout(10) << " existing quorum " << m->quorum << dendl;
1804
1805 dout(10) << " peer paxos version " << m->paxos_last_version
1806 << " vs my version " << paxos->get_version()
1807 << " (ok)"
1808 << dendl;
1809
1810 if (monmap->contains(name) &&
1811 !monmap->get_addr(name).is_blank_ip()) {
1812 // i'm part of the cluster; just initiate a new election
1813 start_election();
1814 } else {
1815 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1816 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1817 monmap->get_inst(*m->quorum.begin()));
1818 }
1819 } else {
1820 if (monmap->contains(m->name)) {
1821 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1822 outside_quorum.insert(m->name);
1823 } else {
1824 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
1825 return;
1826 }
1827
1828 unsigned need = monmap->size() / 2 + 1;
1829 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1830 if (outside_quorum.size() >= need) {
1831 if (outside_quorum.count(name)) {
1832 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1833 start_election();
1834 } else {
1835 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1836 }
1837 } else {
1838 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
1839 }
1840 }
1841 }
1842
1843 void Monitor::join_election()
1844 {
1845 dout(10) << __func__ << dendl;
1846 wait_for_paxos_write();
1847 _reset();
1848 state = STATE_ELECTING;
1849
1850 logger->inc(l_mon_num_elections);
1851 }
1852
1853 void Monitor::start_election()
1854 {
1855 dout(10) << "start_election" << dendl;
1856 wait_for_paxos_write();
1857 _reset();
1858 state = STATE_ELECTING;
1859
1860 logger->inc(l_mon_num_elections);
1861 logger->inc(l_mon_election_call);
1862
1863 clog->info() << "mon." << name << " calling new monitor election";
1864 elector.call_election();
1865 }
1866
1867 void Monitor::win_standalone_election()
1868 {
1869 dout(1) << "win_standalone_election" << dendl;
1870
1871 // bump election epoch, in case the previous epoch included other
1872 // monitors; we need to be able to make the distinction.
1873 elector.init();
1874 elector.advance_epoch();
1875
1876 rank = monmap->get_rank(name);
1877 assert(rank == 0);
1878 set<int> q;
1879 q.insert(rank);
1880
1881 map<int,Metadata> metadata;
1882 collect_metadata(&metadata[0]);
1883
1884 win_election(elector.get_epoch(), q,
1885 CEPH_FEATURES_ALL,
1886 ceph::features::mon::get_supported(),
1887 metadata);
1888 }
1889
1890 const utime_t& Monitor::get_leader_since() const
1891 {
1892 assert(state == STATE_LEADER);
1893 return leader_since;
1894 }
1895
1896 epoch_t Monitor::get_epoch()
1897 {
1898 return elector.get_epoch();
1899 }
1900
1901 void Monitor::_finish_svc_election()
1902 {
1903 assert(state == STATE_LEADER || state == STATE_PEON);
1904
1905 for (auto p : paxos_service) {
1906 // we already called election_finished() on monmon(); avoid callig twice
1907 if (state == STATE_LEADER && p == monmon())
1908 continue;
1909 p->election_finished();
1910 }
1911 }
1912
1913 void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
1914 const mon_feature_t& mon_features,
1915 const map<int,Metadata>& metadata)
1916 {
1917 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
1918 << " features " << features
1919 << " mon_features " << mon_features
1920 << dendl;
1921 assert(is_electing());
1922 state = STATE_LEADER;
1923 leader_since = ceph_clock_now();
1924 leader = rank;
1925 quorum = active;
1926 quorum_con_features = features;
1927 quorum_mon_features = mon_features;
1928 pending_metadata = metadata;
1929 outside_quorum.clear();
1930
1931 clog->info() << "mon." << name << "@" << rank
1932 << " won leader election with quorum " << quorum;
1933
1934 set_leader_commands(get_local_commands(mon_features));
1935
1936 paxos->leader_init();
1937 // NOTE: tell monmap monitor first. This is important for the
1938 // bootstrap case to ensure that the very first paxos proposal
1939 // codifies the monmap. Otherwise any manner of chaos can ensue
1940 // when monitors are call elections or participating in a paxos
1941 // round without agreeing on who the participants are.
1942 monmon()->election_finished();
1943 _finish_svc_election();
1944 health_monitor->start(epoch);
1945
1946 logger->inc(l_mon_election_win);
1947
1948 // inject new metadata in first transaction.
1949 {
1950 // include previous metadata for missing mons (that aren't part of
1951 // the current quorum).
1952 map<int,Metadata> m = metadata;
1953 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
1954 if (m.count(rank) == 0 &&
1955 mon_metadata.count(rank)) {
1956 m[rank] = mon_metadata[rank];
1957 }
1958 }
1959
1960 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1961 // a new transaction immediately after the election finishes. We should
1962 // do that anyway for other reasons, though.
1963 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1964 bufferlist bl;
1965 ::encode(m, bl);
1966 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
1967 }
1968
1969 finish_election();
1970 if (monmap->size() > 1 &&
1971 monmap->get_epoch() > 0) {
1972 timecheck_start();
1973 health_tick_start();
1974 do_health_to_clog_interval();
1975 scrub_event_start();
1976 }
1977 }
1978
1979 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
1980 uint64_t features,
1981 const mon_feature_t& mon_features)
1982 {
1983 state = STATE_PEON;
1984 leader_since = utime_t();
1985 leader = l;
1986 quorum = q;
1987 outside_quorum.clear();
1988 quorum_con_features = features;
1989 quorum_mon_features = mon_features;
1990 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
1991 << " quorum is " << quorum << " features are " << quorum_con_features
1992 << " mon_features are " << quorum_mon_features
1993 << dendl;
1994
1995 paxos->peon_init();
1996 _finish_svc_election();
1997 health_monitor->start(epoch);
1998
1999 logger->inc(l_mon_election_lose);
2000
2001 finish_election();
2002
2003 if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
2004 !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
2005 // for pre-luminous mons only
2006 Metadata sys_info;
2007 collect_metadata(&sys_info);
2008 messenger->send_message(new MMonMetadata(sys_info),
2009 monmap->get_inst(get_leader()));
2010 }
2011 }
2012
2013 void Monitor::collect_metadata(Metadata *m)
2014 {
2015 collect_sys_info(m, g_ceph_context);
2016 (*m)["addr"] = stringify(messenger->get_myaddr());
2017 }
2018
2019 void Monitor::finish_election()
2020 {
2021 apply_quorum_to_compatset_features();
2022 apply_monmap_to_compatset_features();
2023 timecheck_finish();
2024 exited_quorum = utime_t();
2025 finish_contexts(g_ceph_context, waitfor_quorum);
2026 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2027 resend_routed_requests();
2028 update_logger();
2029 register_cluster_logger();
2030
2031 // am i named properly?
2032 string cur_name = monmap->get_name(messenger->get_myaddr());
2033 if (cur_name != name) {
2034 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2035 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
2036 monmap->get_inst(*quorum.begin()));
2037 }
2038 }
2039
2040 void Monitor::_apply_compatset_features(CompatSet &new_features)
2041 {
2042 if (new_features.compare(features) != 0) {
2043 CompatSet diff = features.unsupported(new_features);
2044 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2045 features = new_features;
2046
2047 auto t = std::make_shared<MonitorDBStore::Transaction>();
2048 write_features(t);
2049 store->apply_transaction(t);
2050
2051 calc_quorum_requirements();
2052 }
2053 }
2054
2055 void Monitor::apply_quorum_to_compatset_features()
2056 {
2057 CompatSet new_features(features);
2058 if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2059 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2060 }
2061 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2062 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2063 }
2064 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2065 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2066 }
2067 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2068 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2069 }
2070 dout(5) << __func__ << dendl;
2071 _apply_compatset_features(new_features);
2072 }
2073
2074 void Monitor::apply_monmap_to_compatset_features()
2075 {
2076 CompatSet new_features(features);
2077 mon_feature_t monmap_features = monmap->get_required_features();
2078
2079 /* persistent monmap features may go into the compatset.
2080 * optional monmap features may not - why?
2081 * because optional monmap features may be set/unset by the admin,
2082 * and possibly by other means that haven't yet been thought out,
2083 * so we can't make the monitor enforce them on start - because they
2084 * may go away.
2085 * this, of course, does not invalidate setting a compatset feature
2086 * for an optional feature - as long as you make sure to clean it up
2087 * once you unset it.
2088 */
2089 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2090 assert(ceph::features::mon::get_persistent().contains_all(
2091 ceph::features::mon::FEATURE_KRAKEN));
2092 // this feature should only ever be set if the quorum supports it.
2093 assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2094 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2095 }
2096 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2097 assert(ceph::features::mon::get_persistent().contains_all(
2098 ceph::features::mon::FEATURE_LUMINOUS));
2099 // this feature should only ever be set if the quorum supports it.
2100 assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
2101 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2102 }
2103
2104 dout(5) << __func__ << dendl;
2105 _apply_compatset_features(new_features);
2106 }
2107
2108 void Monitor::calc_quorum_requirements()
2109 {
2110 required_features = 0;
2111
2112 // compatset
2113 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2114 required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2115 }
2116 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2117 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2118 }
2119 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2120 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2121 }
2122 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2123 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2124 }
2125 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2126 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2127 }
2128 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2129 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2130 }
2131
2132 // monmap
2133 if (monmap->get_required_features().contains_all(
2134 ceph::features::mon::FEATURE_KRAKEN)) {
2135 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2136 }
2137 if (monmap->get_required_features().contains_all(
2138 ceph::features::mon::FEATURE_LUMINOUS)) {
2139 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2140 }
2141 dout(10) << __func__ << " required_features " << required_features << dendl;
2142 }
2143
2144 void Monitor::get_combined_feature_map(FeatureMap *fm)
2145 {
2146 *fm += session_map.feature_map;
2147 for (auto id : quorum) {
2148 if (id != rank) {
2149 *fm += quorum_feature_map[id];
2150 }
2151 }
2152 }
2153
2154 void Monitor::sync_force(Formatter *f, ostream& ss)
2155 {
2156 bool free_formatter = false;
2157
2158 if (!f) {
2159 // louzy/lazy hack: default to json if no formatter has been defined
2160 f = new JSONFormatter();
2161 free_formatter = true;
2162 }
2163
2164 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2165 sync_stash_critical_state(tx);
2166 tx->put("mon_sync", "force_sync", 1);
2167 store->apply_transaction(tx);
2168
2169 f->open_object_section("sync_force");
2170 f->dump_int("ret", 0);
2171 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2172 f->close_section(); // sync_force
2173 f->flush(ss);
2174 if (free_formatter)
2175 delete f;
2176 }
2177
2178 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2179 {
2180 bool free_formatter = false;
2181
2182 if (!f) {
2183 // louzy/lazy hack: default to json if no formatter has been defined
2184 f = new JSONFormatter();
2185 free_formatter = true;
2186 }
2187 f->open_object_section("quorum_status");
2188 f->dump_int("election_epoch", get_epoch());
2189
2190 f->open_array_section("quorum");
2191 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2192 f->dump_int("mon", *p);
2193 f->close_section(); // quorum
2194
2195 list<string> quorum_names = get_quorum_names();
2196 f->open_array_section("quorum_names");
2197 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2198 f->dump_string("mon", *p);
2199 f->close_section(); // quorum_names
2200
2201 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2202
2203 f->open_object_section("monmap");
2204 monmap->dump(f);
2205 f->close_section(); // monmap
2206
2207 f->close_section(); // quorum_status
2208 f->flush(ss);
2209 if (free_formatter)
2210 delete f;
2211 }
2212
2213 void Monitor::get_mon_status(Formatter *f, ostream& ss)
2214 {
2215 bool free_formatter = false;
2216
2217 if (!f) {
2218 // louzy/lazy hack: default to json if no formatter has been defined
2219 f = new JSONFormatter();
2220 free_formatter = true;
2221 }
2222
2223 f->open_object_section("mon_status");
2224 f->dump_string("name", name);
2225 f->dump_int("rank", rank);
2226 f->dump_string("state", get_state_name());
2227 f->dump_int("election_epoch", get_epoch());
2228
2229 f->open_array_section("quorum");
2230 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2231 f->dump_int("mon", *p);
2232 }
2233
2234 f->close_section(); // quorum
2235
2236 f->open_object_section("features");
2237 f->dump_stream("required_con") << required_features;
2238 mon_feature_t req_mon_features = get_required_mon_features();
2239 req_mon_features.dump(f, "required_mon");
2240 f->dump_stream("quorum_con") << quorum_con_features;
2241 quorum_mon_features.dump(f, "quorum_mon");
2242 f->close_section(); // features
2243
2244 f->open_array_section("outside_quorum");
2245 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2246 f->dump_string("mon", *p);
2247 f->close_section(); // outside_quorum
2248
2249 f->open_array_section("extra_probe_peers");
2250 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2251 p != extra_probe_peers.end();
2252 ++p)
2253 f->dump_stream("peer") << *p;
2254 f->close_section(); // extra_probe_peers
2255
2256 f->open_array_section("sync_provider");
2257 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2258 p != sync_providers.end();
2259 ++p) {
2260 f->dump_unsigned("cookie", p->second.cookie);
2261 f->dump_stream("entity") << p->second.entity;
2262 f->dump_stream("timeout") << p->second.timeout;
2263 f->dump_unsigned("last_committed", p->second.last_committed);
2264 f->dump_stream("last_key") << p->second.last_key;
2265 }
2266 f->close_section();
2267
2268 if (is_synchronizing()) {
2269 f->open_object_section("sync");
2270 f->dump_stream("sync_provider") << sync_provider;
2271 f->dump_unsigned("sync_cookie", sync_cookie);
2272 f->dump_unsigned("sync_start_version", sync_start_version);
2273 f->close_section();
2274 }
2275
2276 if (g_conf->mon_sync_provider_kill_at > 0)
2277 f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2278 if (g_conf->mon_sync_requester_kill_at > 0)
2279 f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2280
2281 f->open_object_section("monmap");
2282 monmap->dump(f);
2283 f->close_section();
2284
2285 f->dump_object("feature_map", session_map.feature_map);
2286 f->close_section(); // mon_status
2287
2288 if (free_formatter) {
2289 // flush formatter to ss and delete it iff we created the formatter
2290 f->flush(ss);
2291 delete f;
2292 }
2293 }
2294
2295
2296 // health status to clog
2297
2298 void Monitor::health_tick_start()
2299 {
2300 if (!cct->_conf->mon_health_to_clog ||
2301 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2302 return;
2303
2304 dout(15) << __func__ << dendl;
2305
2306 health_tick_stop();
2307 health_tick_event = timer.add_event_after(
2308 cct->_conf->mon_health_to_clog_tick_interval,
2309 new C_MonContext(this, [this](int r) {
2310 if (r < 0)
2311 return;
2312 do_health_to_clog();
2313 health_tick_start();
2314 }));
2315 }
2316
2317 void Monitor::health_tick_stop()
2318 {
2319 dout(15) << __func__ << dendl;
2320
2321 if (health_tick_event) {
2322 timer.cancel_event(health_tick_event);
2323 health_tick_event = NULL;
2324 }
2325 }
2326
2327 utime_t Monitor::health_interval_calc_next_update()
2328 {
2329 utime_t now = ceph_clock_now();
2330
2331 time_t secs = now.sec();
2332 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2333 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2334 utime_t next = utime_t(secs + adjustment, 0);
2335
2336 dout(20) << __func__
2337 << " now: " << now << ","
2338 << " next: " << next << ","
2339 << " interval: " << cct->_conf->mon_health_to_clog_interval
2340 << dendl;
2341
2342 return next;
2343 }
2344
2345 void Monitor::health_interval_start()
2346 {
2347 dout(15) << __func__ << dendl;
2348
2349 if (!cct->_conf->mon_health_to_clog ||
2350 cct->_conf->mon_health_to_clog_interval <= 0) {
2351 return;
2352 }
2353
2354 health_interval_stop();
2355 utime_t next = health_interval_calc_next_update();
2356 health_interval_event = new C_MonContext(this, [this](int r) {
2357 if (r < 0)
2358 return;
2359 do_health_to_clog_interval();
2360 });
2361 if (!timer.add_event_at(next, health_interval_event)) {
2362 health_interval_event = nullptr;
2363 }
2364 }
2365
2366 void Monitor::health_interval_stop()
2367 {
2368 dout(15) << __func__ << dendl;
2369 if (health_interval_event) {
2370 timer.cancel_event(health_interval_event);
2371 }
2372 health_interval_event = NULL;
2373 }
2374
2375 void Monitor::health_events_cleanup()
2376 {
2377 health_tick_stop();
2378 health_interval_stop();
2379 health_status_cache.reset();
2380 }
2381
2382 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2383 {
2384 dout(20) << __func__ << dendl;
2385
2386 if (changed.count("mon_health_to_clog")) {
2387 if (!cct->_conf->mon_health_to_clog) {
2388 health_events_cleanup();
2389 } else {
2390 if (!health_tick_event) {
2391 health_tick_start();
2392 }
2393 if (!health_interval_event) {
2394 health_interval_start();
2395 }
2396 }
2397 }
2398
2399 if (changed.count("mon_health_to_clog_interval")) {
2400 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2401 health_interval_stop();
2402 } else {
2403 health_interval_start();
2404 }
2405 }
2406
2407 if (changed.count("mon_health_to_clog_tick_interval")) {
2408 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2409 health_tick_stop();
2410 } else {
2411 health_tick_start();
2412 }
2413 }
2414 }
2415
2416 void Monitor::do_health_to_clog_interval()
2417 {
2418 // outputting to clog may have been disabled in the conf
2419 // since we were scheduled.
2420 if (!cct->_conf->mon_health_to_clog ||
2421 cct->_conf->mon_health_to_clog_interval <= 0)
2422 return;
2423
2424 dout(10) << __func__ << dendl;
2425
2426 // do we have a cached value for next_clog_update? if not,
2427 // do we know when the last update was?
2428
2429 do_health_to_clog(true);
2430 health_interval_start();
2431 }
2432
2433 void Monitor::do_health_to_clog(bool force)
2434 {
2435 // outputting to clog may have been disabled in the conf
2436 // since we were scheduled.
2437 if (!cct->_conf->mon_health_to_clog ||
2438 cct->_conf->mon_health_to_clog_interval <= 0)
2439 return;
2440
2441 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2442
2443 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2444 string summary;
2445 health_status_t level = get_health_status(false, nullptr, &summary);
2446 if (!force &&
2447 summary == health_status_cache.summary &&
2448 level == health_status_cache.overall)
2449 return;
2450 clog->health(level) << "overall " << summary;
2451 health_status_cache.summary = summary;
2452 health_status_cache.overall = level;
2453 } else {
2454 // for jewel only
2455 list<string> status;
2456 health_status_t overall = get_health(status, NULL, NULL);
2457 dout(25) << __func__
2458 << (force ? " (force)" : "")
2459 << dendl;
2460
2461 string summary = joinify(status.begin(), status.end(), string("; "));
2462
2463 if (!force &&
2464 overall == health_status_cache.overall &&
2465 !health_status_cache.summary.empty() &&
2466 health_status_cache.summary == summary) {
2467 // we got a dup!
2468 return;
2469 }
2470
2471 clog->info() << summary;
2472
2473 health_status_cache.overall = overall;
2474 health_status_cache.summary = summary;
2475 }
2476 }
2477
2478 health_status_t Monitor::get_health_status(
2479 bool want_detail,
2480 Formatter *f,
2481 std::string *plain,
2482 const char *sep1,
2483 const char *sep2)
2484 {
2485 health_status_t r = HEALTH_OK;
2486 bool compat = g_conf->mon_health_preluminous_compat;
2487 bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
2488 if (f) {
2489 f->open_object_section("health");
2490 f->open_object_section("checks");
2491 }
2492
2493 string summary;
2494 string *psummary = f ? nullptr : &summary;
2495 for (auto& svc : paxos_service) {
2496 r = std::min(r, svc->get_health_checks().dump_summary(
2497 f, psummary, sep2, want_detail));
2498 }
2499
2500 if (f) {
2501 f->close_section();
2502 f->dump_stream("status") << r;
2503 } else {
2504 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2505 *plain = stringify(r);
2506 if (summary.size()) {
2507 *plain += sep1;
2508 *plain += summary;
2509 }
2510 *plain += "\n";
2511 }
2512
2513 const std::string old_fields_message = "'ceph health' JSON format has "
2514 "changed in luminous. If you see this your monitoring system is "
2515 "scraping the wrong fields. Disable this with 'mon health preluminous "
2516 "compat warning = false'";
2517
2518 if (f && (compat || compat_warn)) {
2519 health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
2520 f->open_array_section("summary");
2521 if (compat_warn) {
2522 f->open_object_section("item");
2523 f->dump_stream("severity") << HEALTH_WARN;
2524 f->dump_string("summary", old_fields_message);
2525 f->close_section();
2526 }
2527 if (compat) {
2528 for (auto& svc : paxos_service) {
2529 svc->get_health_checks().dump_summary_compat(f);
2530 }
2531 }
2532 f->close_section();
2533 f->dump_stream("overall_status") << cr;
2534 }
2535
2536 if (want_detail) {
2537 if (f && (compat || compat_warn)) {
2538 f->open_array_section("detail");
2539 if (compat_warn) {
2540 f->dump_string("item", old_fields_message);
2541 }
2542 }
2543
2544 for (auto& svc : paxos_service) {
2545 svc->get_health_checks().dump_detail(f, plain, compat);
2546 }
2547
2548 if (f && (compat || compat_warn)) {
2549 f->close_section();
2550 }
2551 }
2552 if (f) {
2553 f->close_section();
2554 }
2555 return r;
2556 }
2557
2558 void Monitor::log_health(
2559 const health_check_map_t& updated,
2560 const health_check_map_t& previous,
2561 MonitorDBStore::TransactionRef t)
2562 {
2563 if (!g_conf->mon_health_to_clog) {
2564 return;
2565 }
2566
2567 const utime_t now = ceph_clock_now();
2568
2569 // FIXME: log atomically as part of @t instead of using clog.
2570 dout(10) << __func__ << " updated " << updated.checks.size()
2571 << " previous " << previous.checks.size()
2572 << dendl;
2573 const auto min_log_period = g_conf->get_val<int64_t>(
2574 "mon_health_log_update_period");
2575 for (auto& p : updated.checks) {
2576 auto q = previous.checks.find(p.first);
2577 bool logged = false;
2578 if (q == previous.checks.end()) {
2579 // new
2580 ostringstream ss;
2581 ss << "Health check failed: " << p.second.summary << " ("
2582 << p.first << ")";
2583 clog->health(p.second.severity) << ss.str();
2584
2585 logged = true;
2586 } else {
2587 if (p.second.summary != q->second.summary ||
2588 p.second.severity != q->second.severity) {
2589
2590 auto status_iter = health_check_log_times.find(p.first);
2591 if (status_iter != health_check_log_times.end()) {
2592 if (p.second.severity == q->second.severity &&
2593 now - status_iter->second.updated_at < min_log_period) {
2594 // We already logged this recently and the severity is unchanged,
2595 // so skip emitting an update of the summary string.
2596 // We'll get an update out of tick() later if the check
2597 // is still failing.
2598 continue;
2599 }
2600 }
2601
2602 // summary or severity changed (ignore detail changes at this level)
2603 ostringstream ss;
2604 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2605 clog->health(p.second.severity) << ss.str();
2606
2607 logged = true;
2608 }
2609 }
2610 // Record the time at which we last logged, so that we can check this
2611 // when considering whether/when to print update messages.
2612 if (logged) {
2613 auto iter = health_check_log_times.find(p.first);
2614 if (iter == health_check_log_times.end()) {
2615 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2616 p.second.severity, p.second.summary, now));
2617 } else {
2618 iter->second = HealthCheckLogStatus(
2619 p.second.severity, p.second.summary, now);
2620 }
2621 }
2622 }
2623 for (auto& p : previous.checks) {
2624 if (!updated.checks.count(p.first)) {
2625 // cleared
2626 ostringstream ss;
2627 if (p.first == "DEGRADED_OBJECTS") {
2628 clog->info() << "All degraded objects recovered";
2629 } else if (p.first == "OSD_FLAGS") {
2630 clog->info() << "OSD flags cleared";
2631 } else {
2632 clog->info() << "Health check cleared: " << p.first << " (was: "
2633 << p.second.summary << ")";
2634 }
2635
2636 if (health_check_log_times.count(p.first)) {
2637 health_check_log_times.erase(p.first);
2638 }
2639 }
2640 }
2641
2642 if (previous.checks.size() && updated.checks.size() == 0) {
2643 // We might be going into a fully healthy state, check
2644 // other subsystems
2645 bool any_checks = false;
2646 for (auto& svc : paxos_service) {
2647 if (&(svc->get_health_checks()) == &(previous)) {
2648 // Ignore the ones we're clearing right now
2649 continue;
2650 }
2651
2652 if (svc->get_health_checks().checks.size() > 0) {
2653 any_checks = true;
2654 break;
2655 }
2656 }
2657 if (!any_checks) {
2658 clog->info() << "Cluster is now healthy";
2659 }
2660 }
2661 }
2662
2663 health_status_t Monitor::get_health(list<string>& status,
2664 bufferlist *detailbl,
2665 Formatter *f)
2666 {
2667 list<pair<health_status_t,string> > summary;
2668 list<pair<health_status_t,string> > detail;
2669
2670 if (f)
2671 f->open_object_section("health");
2672
2673 for (vector<PaxosService*>::iterator p = paxos_service.begin();
2674 p != paxos_service.end();
2675 ++p) {
2676 PaxosService *s = *p;
2677 s->get_health(summary, detailbl ? &detail : NULL, cct);
2678 }
2679
2680 health_monitor->get_health(summary, (detailbl ? &detail : NULL));
2681
2682 health_status_t overall = HEALTH_OK;
2683 if (!timecheck_skews.empty()) {
2684 list<string> warns;
2685 for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2686 i != timecheck_skews.end(); ++i) {
2687 entity_inst_t inst = i->first;
2688 double skew = i->second;
2689 double latency = timecheck_latencies[inst];
2690 string name = monmap->get_name(inst.addr);
2691 ostringstream tcss;
2692 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2693 if (tcstatus != HEALTH_OK) {
2694 if (overall > tcstatus)
2695 overall = tcstatus;
2696 warns.push_back(name);
2697 ostringstream tmp_ss;
2698 tmp_ss << "mon." << name
2699 << " addr " << inst.addr << " " << tcss.str()
2700 << " (latency " << latency << "s)";
2701 detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2702 }
2703 }
2704 if (!warns.empty()) {
2705 ostringstream ss;
2706 ss << "clock skew detected on";
2707 while (!warns.empty()) {
2708 ss << " mon." << warns.front();
2709 warns.pop_front();
2710 if (!warns.empty())
2711 ss << ",";
2712 }
2713 status.push_back(ss.str());
2714 summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2715 }
2716 }
2717
2718 if (f)
2719 f->open_array_section("summary");
2720 if (!summary.empty()) {
2721 while (!summary.empty()) {
2722 if (overall > summary.front().first)
2723 overall = summary.front().first;
2724 status.push_back(summary.front().second);
2725 if (f) {
2726 f->open_object_section("item");
2727 f->dump_stream("severity") << summary.front().first;
2728 f->dump_string("summary", summary.front().second);
2729 f->close_section();
2730 }
2731 summary.pop_front();
2732 }
2733 }
2734 if (f)
2735 f->close_section();
2736
2737 stringstream fss;
2738 fss << overall;
2739 status.push_front(fss.str());
2740 if (f)
2741 f->dump_stream("overall_status") << overall;
2742
2743 if (f)
2744 f->open_array_section("detail");
2745 while (!detail.empty()) {
2746 if (f)
2747 f->dump_string("item", detail.front().second);
2748 else if (detailbl != NULL) {
2749 detailbl->append(detail.front().second);
2750 detailbl->append('\n');
2751 }
2752 detail.pop_front();
2753 }
2754 if (f)
2755 f->close_section();
2756
2757 if (f)
2758 f->close_section();
2759
2760 return overall;
2761 }
2762
2763 void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2764 {
2765 if (f)
2766 f->open_object_section("status");
2767
2768 if (f) {
2769 f->dump_stream("fsid") << monmap->get_fsid();
2770 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2771 get_health_status(false, f, nullptr);
2772 } else {
2773 list<string> health_str;
2774 get_health(health_str, nullptr, f);
2775 }
2776 f->dump_unsigned("election_epoch", get_epoch());
2777 {
2778 f->open_array_section("quorum");
2779 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2780 f->dump_int("rank", *p);
2781 f->close_section();
2782 f->open_array_section("quorum_names");
2783 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2784 f->dump_string("id", monmap->get_name(*p));
2785 f->close_section();
2786 }
2787 f->open_object_section("monmap");
2788 monmap->dump(f);
2789 f->close_section();
2790 f->open_object_section("osdmap");
2791 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
2792 f->close_section();
2793 f->open_object_section("pgmap");
2794 pgservice->print_summary(f, NULL);
2795 f->close_section();
2796 f->open_object_section("fsmap");
2797 mdsmon()->get_fsmap().print_summary(f, NULL);
2798 f->close_section();
2799 f->open_object_section("mgrmap");
2800 mgrmon()->get_map().print_summary(f, nullptr);
2801 f->close_section();
2802
2803 f->dump_object("servicemap", mgrstatmon()->get_service_map());
2804 f->close_section();
2805 } else {
2806 ss << " cluster:\n";
2807 ss << " id: " << monmap->get_fsid() << "\n";
2808
2809 string health;
2810 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2811 get_health_status(false, nullptr, &health,
2812 "\n ", "\n ");
2813 } else {
2814 list<string> ls;
2815 get_health(ls, NULL, f);
2816 health = joinify(ls.begin(), ls.end(),
2817 string("\n "));
2818 }
2819 ss << " health: " << health << "\n";
2820
2821 ss << "\n \n services:\n";
2822 {
2823 size_t maxlen = 3;
2824 auto& service_map = mgrstatmon()->get_service_map();
2825 for (auto& p : service_map.services) {
2826 maxlen = std::max(maxlen, p.first.size());
2827 }
2828 string spacing(maxlen - 3, ' ');
2829 const auto quorum_names = get_quorum_names();
2830 const auto mon_count = monmap->mon_info.size();
2831 ss << " mon: " << spacing << mon_count << " daemons, quorum "
2832 << quorum_names;
2833 if (quorum_names.size() != mon_count) {
2834 std::list<std::string> out_of_q;
2835 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2836 if (quorum.count(i) == 0) {
2837 out_of_q.push_back(monmap->ranks[i]);
2838 }
2839 }
2840 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2841 out_of_q.end(), std::string(", "));
2842 }
2843 ss << "\n";
2844 if (mgrmon()->in_use()) {
2845 ss << " mgr: " << spacing;
2846 mgrmon()->get_map().print_summary(nullptr, &ss);
2847 ss << "\n";
2848 }
2849 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2850 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2851 }
2852 ss << " osd: " << spacing;
2853 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2854 ss << "\n";
2855 for (auto& p : service_map.services) {
2856 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2857 << p.second.get_summary() << "\n";
2858 }
2859 }
2860
2861 ss << "\n \n data:\n";
2862 pgservice->print_summary(NULL, &ss);
2863 ss << "\n ";
2864 }
2865 }
2866
2867 void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2868 map<string,string> &param_str_map)
2869 {
2870 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2871 p != cmdmap.end(); ++p) {
2872 if (p->first == "prefix")
2873 continue;
2874 if (p->first == "caps") {
2875 vector<string> cv;
2876 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2877 cv.size() % 2 == 0) {
2878 for (unsigned i = 0; i < cv.size(); i += 2) {
2879 string k = string("caps_") + cv[i];
2880 param_str_map[k] = cv[i + 1];
2881 }
2882 continue;
2883 }
2884 }
2885 param_str_map[p->first] = cmd_vartype_stringify(p->second);
2886 }
2887 }
2888
2889 const MonCommand *Monitor::_get_moncommand(
2890 const string &cmd_prefix,
2891 const vector<MonCommand>& cmds)
2892 {
2893 for (auto& c : cmds) {
2894 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2895 return &c;
2896 }
2897 }
2898 return nullptr;
2899 }
2900
2901 bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2902 const map<string,cmd_vartype>& cmdmap,
2903 const map<string,string>& param_str_map,
2904 const MonCommand *this_cmd) {
2905
2906 bool cmd_r = this_cmd->requires_perm('r');
2907 bool cmd_w = this_cmd->requires_perm('w');
2908 bool cmd_x = this_cmd->requires_perm('x');
2909
2910 bool capable = s->caps.is_capable(
2911 g_ceph_context,
2912 CEPH_ENTITY_TYPE_MON,
2913 s->entity_name,
2914 module, prefix, param_str_map,
2915 cmd_r, cmd_w, cmd_x);
2916
2917 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2918 return capable;
2919 }
2920
2921 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
2922 Formatter *f,
2923 bufferlist *rdata,
2924 bool hide_mgr_flag)
2925 {
2926 int cmdnum = 0;
2927 f->open_object_section("command_descriptions");
2928 for (const auto &cmd : commands) {
2929 unsigned flags = cmd.flags;
2930 if (hide_mgr_flag) {
2931 flags &= ~MonCommand::FLAG_MGR;
2932 }
2933 ostringstream secname;
2934 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2935 dump_cmddesc_to_json(f, secname.str(),
2936 cmd.cmdstring, cmd.helpstring, cmd.module,
2937 cmd.req_perms, cmd.availability, flags);
2938 cmdnum++;
2939 }
2940 f->close_section(); // command_descriptions
2941
2942 f->flush(*rdata);
2943 }
2944
2945 bool Monitor::is_keyring_required()
2946 {
2947 string auth_cluster_required = g_conf->auth_supported.empty() ?
2948 g_conf->auth_cluster_required : g_conf->auth_supported;
2949 string auth_service_required = g_conf->auth_supported.empty() ?
2950 g_conf->auth_service_required : g_conf->auth_supported;
2951
2952 return auth_service_required == "cephx" ||
2953 auth_cluster_required == "cephx";
2954 }
2955
2956 struct C_MgrProxyCommand : public Context {
2957 Monitor *mon;
2958 MonOpRequestRef op;
2959 uint64_t size;
2960 bufferlist outbl;
2961 string outs;
2962 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2963 : mon(mon), op(op), size(s) { }
2964 void finish(int r) {
2965 Mutex::Locker l(mon->lock);
2966 mon->mgr_proxy_bytes -= size;
2967 mon->reply_command(op, r, outs, outbl, 0);
2968 }
2969 };
2970
2971 void Monitor::handle_command(MonOpRequestRef op)
2972 {
2973 assert(op->is_type_command());
2974 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
2975 if (m->fsid != monmap->fsid) {
2976 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2977 reply_command(op, -EPERM, "wrong fsid", 0);
2978 return;
2979 }
2980
2981 MonSession *session = static_cast<MonSession *>(
2982 m->get_connection()->get_priv());
2983 if (!session) {
2984 dout(5) << __func__ << " dropping stray message " << *m << dendl;
2985 return;
2986 }
2987 BOOST_SCOPE_EXIT_ALL(=) {
2988 session->put();
2989 };
2990
2991 if (m->cmd.empty()) {
2992 string rs = "No command supplied";
2993 reply_command(op, -EINVAL, rs, 0);
2994 return;
2995 }
2996
2997 string prefix;
2998 vector<string> fullcmd;
2999 map<string, cmd_vartype> cmdmap;
3000 stringstream ss, ds;
3001 bufferlist rdata;
3002 string rs;
3003 int r = -EINVAL;
3004 rs = "unrecognized command";
3005
3006 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3007 // ss has reason for failure
3008 r = -EINVAL;
3009 rs = ss.str();
3010 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3011 reply_command(op, r, rs, 0);
3012 return;
3013 }
3014
3015 // check return value. If no prefix parameter provided,
3016 // return value will be false, then return error info.
3017 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
3018 reply_command(op, -EINVAL, "command prefix not found", 0);
3019 return;
3020 }
3021
3022 // check prefix is empty
3023 if (prefix.empty()) {
3024 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3025 return;
3026 }
3027
3028 if (prefix == "get_command_descriptions") {
3029 bufferlist rdata;
3030 Formatter *f = Formatter::create("json");
3031 // hide mgr commands until luminous upgrade is complete
3032 bool hide_mgr_flag =
3033 osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
3034
3035 std::vector<MonCommand> commands;
3036
3037 // only include mgr commands once all mons are upgrade (and we've dropped
3038 // the hard-coded PGMonitor commands)
3039 if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
3040 commands = static_cast<MgrMonitor*>(
3041 paxos_service[PAXOS_MGR])->get_command_descs();
3042 }
3043
3044 for (auto& c : leader_mon_commands) {
3045 commands.push_back(c);
3046 }
3047
3048 format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
3049 delete f;
3050 reply_command(op, 0, "", rdata, 0);
3051 return;
3052 }
3053
3054 string module;
3055 string err;
3056
3057 dout(0) << "handle_command " << *m << dendl;
3058
3059 string format;
3060 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
3061 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3062
3063 get_str_vec(prefix, fullcmd);
3064
3065 // make sure fullcmd is not empty.
3066 // invalid prefix will cause empty vector fullcmd.
3067 // such as, prefix=";,,;"
3068 if (fullcmd.empty()) {
3069 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3070 return;
3071 }
3072
3073 module = fullcmd[0];
3074
3075 // validate command is in leader map
3076
3077 const MonCommand *leader_cmd;
3078 const auto& mgr_cmds = mgrmon()->get_command_descs();
3079 const MonCommand *mgr_cmd = nullptr;
3080 if (!mgr_cmds.empty()) {
3081 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
3082 }
3083 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
3084 if (!leader_cmd) {
3085 leader_cmd = mgr_cmd;
3086 if (!leader_cmd) {
3087 reply_command(op, -EINVAL, "command not known", 0);
3088 return;
3089 }
3090 }
3091 // validate command is in our map & matches, or forward if it is allowed
3092 const MonCommand *mon_cmd = _get_moncommand(
3093 prefix,
3094 get_local_commands(quorum_mon_features));
3095 if (!mon_cmd) {
3096 mon_cmd = mgr_cmd;
3097 }
3098 if (!is_leader()) {
3099 if (!mon_cmd) {
3100 if (leader_cmd->is_noforward()) {
3101 reply_command(op, -EINVAL,
3102 "command not locally supported and not allowed to forward",
3103 0);
3104 return;
3105 }
3106 dout(10) << "Command not locally supported, forwarding request "
3107 << m << dendl;
3108 forward_request_leader(op);
3109 return;
3110 } else if (!mon_cmd->is_compat(leader_cmd)) {
3111 if (mon_cmd->is_noforward()) {
3112 reply_command(op, -EINVAL,
3113 "command not compatible with leader and not allowed to forward",
3114 0);
3115 return;
3116 }
3117 dout(10) << "Command not compatible with leader, forwarding request "
3118 << m << dendl;
3119 forward_request_leader(op);
3120 return;
3121 }
3122 }
3123
3124 if (mon_cmd->is_obsolete() ||
3125 (cct->_conf->mon_debug_deprecated_as_obsolete
3126 && mon_cmd->is_deprecated())) {
3127 reply_command(op, -ENOTSUP,
3128 "command is obsolete; please check usage and/or man page",
3129 0);
3130 return;
3131 }
3132
3133 if (session->proxy_con && mon_cmd->is_noforward()) {
3134 dout(10) << "Got forward for noforward command " << m << dendl;
3135 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3136 return;
3137 }
3138
3139 /* what we perceive as being the service the command falls under */
3140 string service(mon_cmd->module);
3141
3142 dout(25) << __func__ << " prefix='" << prefix
3143 << "' module='" << module
3144 << "' service='" << service << "'" << dendl;
3145
3146 bool cmd_is_rw =
3147 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3148
3149 // validate user's permissions for requested command
3150 map<string,string> param_str_map;
3151 _generate_command_map(cmdmap, param_str_map);
3152 if (!_allowed_command(session, service, prefix, cmdmap,
3153 param_str_map, mon_cmd)) {
3154 dout(1) << __func__ << " access denied" << dendl;
3155 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3156 << "from='" << session->inst << "' "
3157 << "entity='" << session->entity_name << "' "
3158 << "cmd=" << m->cmd << ": access denied";
3159 reply_command(op, -EACCES, "access denied", 0);
3160 return;
3161 }
3162
3163 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3164 << "from='" << session->inst << "' "
3165 << "entity='" << session->entity_name << "' "
3166 << "cmd=" << m->cmd << ": dispatch";
3167
3168 if (mon_cmd->is_mgr() &&
3169 osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3170 const auto& hdr = m->get_header();
3171 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3172 uint64_t max = g_conf->get_val<uint64_t>("mon_client_bytes")
3173 * g_conf->get_val<double>("mon_mgr_proxy_client_bytes_ratio");
3174 if (mgr_proxy_bytes + size > max) {
3175 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3176 << " + " << size << " > max " << max << dendl;
3177 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3178 return;
3179 }
3180 mgr_proxy_bytes += size;
3181 dout(10) << __func__ << " proxying mgr command (+" << size
3182 << " -> " << mgr_proxy_bytes << ")" << dendl;
3183 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3184 mgr_client.start_command(m->cmd,
3185 m->get_data(),
3186 &fin->outbl,
3187 &fin->outs,
3188 new C_OnFinisher(fin, &finisher));
3189 return;
3190 }
3191
3192 if ((module == "mds" || module == "fs") &&
3193 prefix != "fs authorize") {
3194 mdsmon()->dispatch(op);
3195 return;
3196 }
3197 if ((module == "osd" || prefix == "pg map") &&
3198 prefix != "osd last-stat-seq") {
3199 osdmon()->dispatch(op);
3200 return;
3201 }
3202
3203 if (module == "pg") {
3204 pgmon()->dispatch(op);
3205 return;
3206 }
3207 if (module == "mon" &&
3208 /* Let the Monitor class handle the following commands:
3209 * 'mon compact'
3210 * 'mon scrub'
3211 * 'mon sync force'
3212 */
3213 prefix != "mon compact" &&
3214 prefix != "mon scrub" &&
3215 prefix != "mon sync force" &&
3216 prefix != "mon metadata" &&
3217 prefix != "mon versions" &&
3218 prefix != "mon count-metadata") {
3219 monmon()->dispatch(op);
3220 return;
3221 }
3222 if (module == "auth" || prefix == "fs authorize") {
3223 authmon()->dispatch(op);
3224 return;
3225 }
3226 if (module == "log") {
3227 logmon()->dispatch(op);
3228 return;
3229 }
3230
3231 if (module == "config-key") {
3232 config_key_service->dispatch(op);
3233 return;
3234 }
3235
3236 if (module == "mgr") {
3237 mgrmon()->dispatch(op);
3238 return;
3239 }
3240
3241 if (prefix == "fsid") {
3242 if (f) {
3243 f->open_object_section("fsid");
3244 f->dump_stream("fsid") << monmap->fsid;
3245 f->close_section();
3246 f->flush(rdata);
3247 } else {
3248 ds << monmap->fsid;
3249 rdata.append(ds);
3250 }
3251 reply_command(op, 0, "", rdata, 0);
3252 return;
3253 }
3254
3255 if (prefix == "scrub" || prefix == "mon scrub") {
3256 wait_for_paxos_write();
3257 if (is_leader()) {
3258 int r = scrub_start();
3259 reply_command(op, r, "", rdata, 0);
3260 } else if (is_peon()) {
3261 forward_request_leader(op);
3262 } else {
3263 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3264 }
3265 return;
3266 }
3267
3268 if (prefix == "compact" || prefix == "mon compact") {
3269 dout(1) << "triggering manual compaction" << dendl;
3270 utime_t start = ceph_clock_now();
3271 store->compact();
3272 utime_t end = ceph_clock_now();
3273 end -= start;
3274 dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3275 ostringstream oss;
3276 oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
3277 rs = oss.str();
3278 r = 0;
3279 }
3280 else if (prefix == "injectargs") {
3281 vector<string> injected_args;
3282 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3283 if (!injected_args.empty()) {
3284 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3285 ostringstream oss;
3286 r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3287 ss << "injectargs:" << oss.str();
3288 rs = ss.str();
3289 goto out;
3290 } else {
3291 rs = "must supply options to be parsed in a single string";
3292 r = -EINVAL;
3293 }
3294 } else if (prefix == "time-sync-status") {
3295 if (!f)
3296 f.reset(Formatter::create("json-pretty"));
3297 f->open_object_section("time_sync");
3298 if (!timecheck_skews.empty()) {
3299 f->open_object_section("time_skew_status");
3300 for (auto& i : timecheck_skews) {
3301 entity_inst_t inst = i.first;
3302 double skew = i.second;
3303 double latency = timecheck_latencies[inst];
3304 string name = monmap->get_name(inst.addr);
3305 ostringstream tcss;
3306 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3307 f->open_object_section(name.c_str());
3308 f->dump_float("skew", skew);
3309 f->dump_float("latency", latency);
3310 f->dump_stream("health") << tcstatus;
3311 if (tcstatus != HEALTH_OK) {
3312 f->dump_stream("details") << tcss.str();
3313 }
3314 f->close_section();
3315 }
3316 f->close_section();
3317 }
3318 f->open_object_section("timechecks");
3319 f->dump_unsigned("epoch", get_epoch());
3320 f->dump_int("round", timecheck_round);
3321 f->dump_stream("round_status") << ((timecheck_round%2) ?
3322 "on-going" : "finished");
3323 f->close_section();
3324 f->close_section();
3325 f->flush(rdata);
3326 r = 0;
3327 rs = "";
3328 } else if (prefix == "config set") {
3329 std::string key;
3330 cmd_getval(cct, cmdmap, "key", key);
3331 std::string val;
3332 cmd_getval(cct, cmdmap, "value", val);
3333 r = g_conf->set_val(key, val, true, &ss);
3334 if (r == 0) {
3335 g_conf->apply_changes(nullptr);
3336 }
3337 rs = ss.str();
3338 goto out;
3339 } else if (prefix == "status" ||
3340 prefix == "health" ||
3341 prefix == "df") {
3342 string detail;
3343 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3344
3345 if (prefix == "status") {
3346 // get_cluster_status handles f == NULL
3347 get_cluster_status(ds, f.get());
3348
3349 if (f) {
3350 f->flush(ds);
3351 ds << '\n';
3352 }
3353 rdata.append(ds);
3354 } else if (prefix == "health") {
3355 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3356 string plain;
3357 get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3358 if (f) {
3359 f->flush(rdata);
3360 } else {
3361 rdata.append(plain);
3362 }
3363 } else {
3364 list<string> health_str;
3365 get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
3366 if (f) {
3367 f->flush(ds);
3368 ds << '\n';
3369 } else {
3370 assert(!health_str.empty());
3371 ds << health_str.front();
3372 health_str.pop_front();
3373 if (!health_str.empty()) {
3374 ds << ' ';
3375 ds << joinify(health_str.begin(), health_str.end(), string("; "));
3376 }
3377 }
3378 bufferlist comb;
3379 comb.append(ds);
3380 if (detail == "detail")
3381 comb.append(rdata);
3382 rdata = comb;
3383 }
3384 } else if (prefix == "df") {
3385 bool verbose = (detail == "detail");
3386 if (f)
3387 f->open_object_section("stats");
3388
3389 pgservice->dump_fs_stats(&ds, f.get(), verbose);
3390 if (!f)
3391 ds << '\n';
3392 pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3393
3394 if (f) {
3395 f->close_section();
3396 f->flush(ds);
3397 ds << '\n';
3398 }
3399 } else {
3400 assert(0 == "We should never get here!");
3401 return;
3402 }
3403 rdata.append(ds);
3404 rs = "";
3405 r = 0;
3406 } else if (prefix == "report") {
3407
3408 // this must be formatted, in its current form
3409 if (!f)
3410 f.reset(Formatter::create("json-pretty"));
3411 f->open_object_section("report");
3412 f->dump_stream("cluster_fingerprint") << fingerprint;
3413 f->dump_string("version", ceph_version_to_str());
3414 f->dump_string("commit", git_version_to_str());
3415 f->dump_stream("timestamp") << ceph_clock_now();
3416
3417 vector<string> tagsvec;
3418 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3419 string tagstr = str_join(tagsvec, " ");
3420 if (!tagstr.empty())
3421 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3422 f->dump_string("tag", tagstr);
3423
3424 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3425 get_health_status(true, f.get(), nullptr);
3426 } else {
3427 list<string> health_str;
3428 get_health(health_str, nullptr, f.get());
3429 }
3430
3431 monmon()->dump_info(f.get());
3432 osdmon()->dump_info(f.get());
3433 mdsmon()->dump_info(f.get());
3434 authmon()->dump_info(f.get());
3435 pgservice->dump_info(f.get());
3436
3437 paxos->dump_info(f.get());
3438
3439 f->close_section();
3440 f->flush(rdata);
3441
3442 ostringstream ss2;
3443 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
3444 rs = ss2.str();
3445 r = 0;
3446 } else if (prefix == "osd last-stat-seq") {
3447 int64_t osd;
3448 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3449 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3450 if (f) {
3451 f->dump_unsigned("seq", seq);
3452 f->flush(ds);
3453 } else {
3454 ds << seq;
3455 rdata.append(ds);
3456 }
3457 rs = "";
3458 r = 0;
3459 } else if (prefix == "node ls") {
3460 string node_type("all");
3461 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3462 if (!f)
3463 f.reset(Formatter::create("json-pretty"));
3464 if (node_type == "all") {
3465 f->open_object_section("nodes");
3466 print_nodes(f.get(), ds);
3467 osdmon()->print_nodes(f.get());
3468 mdsmon()->print_nodes(f.get());
3469 f->close_section();
3470 } else if (node_type == "mon") {
3471 print_nodes(f.get(), ds);
3472 } else if (node_type == "osd") {
3473 osdmon()->print_nodes(f.get());
3474 } else if (node_type == "mds") {
3475 mdsmon()->print_nodes(f.get());
3476 }
3477 f->flush(ds);
3478 rdata.append(ds);
3479 rs = "";
3480 r = 0;
3481 } else if (prefix == "features") {
3482 if (!is_leader() && !is_peon()) {
3483 dout(10) << " waiting for quorum" << dendl;
3484 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3485 return;
3486 }
3487 if (!is_leader()) {
3488 forward_request_leader(op);
3489 return;
3490 }
3491 if (!f)
3492 f.reset(Formatter::create("json-pretty"));
3493 FeatureMap fm;
3494 get_combined_feature_map(&fm);
3495 f->dump_object("features", fm);
3496 f->flush(rdata);
3497 rs = "";
3498 r = 0;
3499 } else if (prefix == "mon metadata") {
3500 if (!f)
3501 f.reset(Formatter::create("json-pretty"));
3502
3503 string name;
3504 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3505 if (!all) {
3506 // Dump a single mon's metadata
3507 int mon = monmap->get_rank(name);
3508 if (mon < 0) {
3509 rs = "requested mon not found";
3510 r = -ENOENT;
3511 goto out;
3512 }
3513 f->open_object_section("mon_metadata");
3514 r = get_mon_metadata(mon, f.get(), ds);
3515 f->close_section();
3516 } else {
3517 // Dump all mons' metadata
3518 r = 0;
3519 f->open_array_section("mon_metadata");
3520 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3521 std::ostringstream get_err;
3522 f->open_object_section("mon");
3523 f->dump_string("name", monmap->get_name(rank));
3524 r = get_mon_metadata(rank, f.get(), get_err);
3525 f->close_section();
3526 if (r == -ENOENT || r == -EINVAL) {
3527 dout(1) << get_err.str() << dendl;
3528 // Drop error, list what metadata we do have
3529 r = 0;
3530 } else if (r != 0) {
3531 derr << "Unexpected error from get_mon_metadata: "
3532 << cpp_strerror(r) << dendl;
3533 ds << get_err.str();
3534 break;
3535 }
3536 }
3537 f->close_section();
3538 }
3539
3540 f->flush(ds);
3541 rdata.append(ds);
3542 rs = "";
3543 } else if (prefix == "mon versions") {
3544 if (!f)
3545 f.reset(Formatter::create("json-pretty"));
3546 count_metadata("ceph_version", f.get());
3547 f->flush(ds);
3548 rdata.append(ds);
3549 rs = "";
3550 r = 0;
3551 } else if (prefix == "mon count-metadata") {
3552 if (!f)
3553 f.reset(Formatter::create("json-pretty"));
3554 string field;
3555 cmd_getval(g_ceph_context, cmdmap, "property", field);
3556 count_metadata(field, f.get());
3557 f->flush(ds);
3558 rdata.append(ds);
3559 rs = "";
3560 r = 0;
3561 } else if (prefix == "quorum_status") {
3562 // make sure our map is readable and up to date
3563 if (!is_leader() && !is_peon()) {
3564 dout(10) << " waiting for quorum" << dendl;
3565 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3566 return;
3567 }
3568 _quorum_status(f.get(), ds);
3569 rdata.append(ds);
3570 rs = "";
3571 r = 0;
3572 } else if (prefix == "mon_status") {
3573 get_mon_status(f.get(), ds);
3574 if (f)
3575 f->flush(ds);
3576 rdata.append(ds);
3577 rs = "";
3578 r = 0;
3579 } else if (prefix == "sync force" ||
3580 prefix == "mon sync force") {
3581 string validate1, validate2;
3582 cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3583 cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3584 if (validate1 != "--yes-i-really-mean-it" ||
3585 validate2 != "--i-know-what-i-am-doing") {
3586 r = -EINVAL;
3587 rs = "are you SURE? this will mean the monitor store will be "
3588 "erased. pass '--yes-i-really-mean-it "
3589 "--i-know-what-i-am-doing' if you really do.";
3590 goto out;
3591 }
3592 sync_force(f.get(), ds);
3593 rs = ds.str();
3594 r = 0;
3595 } else if (prefix == "heap") {
3596 if (!ceph_using_tcmalloc())
3597 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3598 else {
3599 string heapcmd;
3600 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3601 // XXX 1-element vector, change at callee or make vector here?
3602 vector<string> heapcmd_vec;
3603 get_str_vec(heapcmd, heapcmd_vec);
3604 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3605 rdata.append(ds);
3606 rs = "";
3607 r = 0;
3608 }
3609 } else if (prefix == "quorum") {
3610 string quorumcmd;
3611 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3612 if (quorumcmd == "exit") {
3613 start_election();
3614 elector.stop_participating();
3615 rs = "stopped responding to quorum, initiated new election";
3616 r = 0;
3617 } else if (quorumcmd == "enter") {
3618 elector.start_participating();
3619 start_election();
3620 rs = "started responding to quorum, initiated new election";
3621 r = 0;
3622 } else {
3623 rs = "needs a valid 'quorum' command";
3624 r = -EINVAL;
3625 }
3626 } else if (prefix == "version") {
3627 if (f) {
3628 f->open_object_section("version");
3629 f->dump_string("version", pretty_version_to_str());
3630 f->close_section();
3631 f->flush(ds);
3632 } else {
3633 ds << pretty_version_to_str();
3634 }
3635 rdata.append(ds);
3636 rs = "";
3637 r = 0;
3638 } else if (prefix == "versions") {
3639 if (!f)
3640 f.reset(Formatter::create("json-pretty"));
3641 map<string,int> overall;
3642 f->open_object_section("version");
3643 map<string,int> mon, mgr, osd, mds;
3644
3645 count_metadata("ceph_version", &mon);
3646 f->open_object_section("mon");
3647 for (auto& p : mon) {
3648 f->dump_int(p.first.c_str(), p.second);
3649 overall[p.first] += p.second;
3650 }
3651 f->close_section();
3652
3653 mgrmon()->count_metadata("ceph_version", &mgr);
3654 f->open_object_section("mgr");
3655 for (auto& p : mgr) {
3656 f->dump_int(p.first.c_str(), p.second);
3657 overall[p.first] += p.second;
3658 }
3659 f->close_section();
3660
3661 osdmon()->count_metadata("ceph_version", &osd);
3662 f->open_object_section("osd");
3663 for (auto& p : osd) {
3664 f->dump_int(p.first.c_str(), p.second);
3665 overall[p.first] += p.second;
3666 }
3667 f->close_section();
3668
3669 mdsmon()->count_metadata("ceph_version", &mds);
3670 f->open_object_section("mds");
3671 for (auto& p : mds) {
3672 f->dump_int(p.first.c_str(), p.second);
3673 overall[p.first] += p.second;
3674 }
3675 f->close_section();
3676
3677 for (auto& p : mgrstatmon()->get_service_map().services) {
3678 f->open_object_section(p.first.c_str());
3679 map<string,int> m;
3680 p.second.count_metadata("ceph_version", &m);
3681 for (auto& q : m) {
3682 f->dump_int(q.first.c_str(), q.second);
3683 overall[q.first] += q.second;
3684 }
3685 f->close_section();
3686 }
3687
3688 f->open_object_section("overall");
3689 for (auto& p : overall) {
3690 f->dump_int(p.first.c_str(), p.second);
3691 }
3692 f->close_section();
3693 f->close_section();
3694 f->flush(rdata);
3695 rs = "";
3696 r = 0;
3697 }
3698
3699 out:
3700 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3701 reply_command(op, r, rs, rdata, 0);
3702 }
3703
3704 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3705 {
3706 bufferlist rdata;
3707 reply_command(op, rc, rs, rdata, version);
3708 }
3709
3710 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3711 bufferlist& rdata, version_t version)
3712 {
3713 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3714 assert(m->get_type() == MSG_MON_COMMAND);
3715 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3716 reply->set_tid(m->get_tid());
3717 reply->set_data(rdata);
3718 send_reply(op, reply);
3719 }
3720
3721
3722 // ------------------------
3723 // request/reply routing
3724 //
3725 // a client/mds/osd will connect to a random monitor. we need to forward any
3726 // messages requiring state updates to the leader, and then route any replies
3727 // back via the correct monitor and back to them. (the monitor will not
3728 // initiate any connections.)
3729
3730 void Monitor::forward_request_leader(MonOpRequestRef op)
3731 {
3732 op->mark_event(__func__);
3733
3734 int mon = get_leader();
3735 MonSession *session = op->get_session();
3736 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3737
3738 if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3739 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3740 } else if (session->proxy_con) {
3741 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3742 } else if (!session->closed) {
3743 RoutedRequest *rr = new RoutedRequest;
3744 rr->tid = ++routed_request_tid;
3745 rr->client_inst = req->get_source_inst();
3746 rr->con = req->get_connection();
3747 rr->con_features = rr->con->get_features();
3748 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3749 rr->session = static_cast<MonSession *>(session->get());
3750 rr->op = op;
3751 routed_requests[rr->tid] = rr;
3752 session->routed_request_tids.insert(rr->tid);
3753
3754 dout(10) << "forward_request " << rr->tid << " request " << *req
3755 << " features " << rr->con_features << dendl;
3756
3757 MForward *forward = new MForward(rr->tid,
3758 req,
3759 rr->con_features,
3760 rr->session->caps);
3761 forward->set_priority(req->get_priority());
3762 if (session->auth_handler) {
3763 forward->entity_name = session->entity_name;
3764 } else if (req->get_source().is_mon()) {
3765 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3766 }
3767 messenger->send_message(forward, monmap->get_inst(mon));
3768 op->mark_forwarded();
3769 assert(op->get_req()->get_type() != 0);
3770 } else {
3771 dout(10) << "forward_request no session for request " << *req << dendl;
3772 }
3773 }
3774
3775 // fake connection attached to forwarded messages
3776 struct AnonConnection : public Connection {
3777 explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
3778
3779 int send_message(Message *m) override {
3780 assert(!"send_message on anonymous connection");
3781 }
3782 void send_keepalive() override {
3783 assert(!"send_keepalive on anonymous connection");
3784 }
3785 void mark_down() override {
3786 // silently ignore
3787 }
3788 void mark_disposable() override {
3789 // silengtly ignore
3790 }
3791 bool is_connected() override { return false; }
3792 };
3793
3794 //extract the original message and put it into the regular dispatch function
3795 void Monitor::handle_forward(MonOpRequestRef op)
3796 {
3797 MForward *m = static_cast<MForward*>(op->get_req());
3798 dout(10) << "received forwarded message from " << m->client
3799 << " via " << m->get_source_inst() << dendl;
3800 MonSession *session = op->get_session();
3801 assert(session);
3802
3803 if (!session->is_capable("mon", MON_CAP_X)) {
3804 dout(0) << "forward from entity with insufficient caps! "
3805 << session->caps << dendl;
3806 } else {
3807 // see PaxosService::dispatch(); we rely on this being anon
3808 // (c->msgr == NULL)
3809 PaxosServiceMessage *req = m->claim_message();
3810 assert(req != NULL);
3811
3812 ConnectionRef c(new AnonConnection(cct));
3813 MonSession *s = new MonSession(req->get_source_inst(),
3814 static_cast<Connection*>(c.get()));
3815 c->set_priv(s->get());
3816 c->set_peer_addr(m->client.addr);
3817 c->set_peer_type(m->client.name.type());
3818 c->set_features(m->con_features);
3819
3820 s->caps = m->client_caps;
3821 dout(10) << " caps are " << s->caps << dendl;
3822 s->entity_name = m->entity_name;
3823 dout(10) << " entity name '" << s->entity_name << "' type "
3824 << s->entity_name.get_type() << dendl;
3825 s->proxy_con = m->get_connection();
3826 s->proxy_tid = m->tid;
3827
3828 req->set_connection(c);
3829
3830 // not super accurate, but better than nothing.
3831 req->set_recv_stamp(m->get_recv_stamp());
3832
3833 /*
3834 * note which election epoch this is; we will drop the message if
3835 * there is a future election since our peers will resend routed
3836 * requests in that case.
3837 */
3838 req->rx_election_epoch = get_epoch();
3839
3840 /* Because this is a special fake connection, we need to break
3841 the ref loop between Connection and MonSession differently
3842 than we normally do. Here, the Message refers to the Connection
3843 which refers to the Session, and nobody else refers to the Connection
3844 or the Session. And due to the special nature of this message,
3845 nobody refers to the Connection via the Session. So, clear out that
3846 half of the ref loop.*/
3847 s->con.reset(NULL);
3848
3849 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
3850
3851 _ms_dispatch(req);
3852 s->put();
3853 }
3854 }
3855
3856 void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3857 {
3858 dout(10) << "try_send_message " << *m << " to " << to << dendl;
3859
3860 bufferlist bl;
3861 encode_message(m, quorum_con_features, bl);
3862
3863 messenger->send_message(m, to);
3864
3865 for (int i=0; i<(int)monmap->size(); i++) {
3866 if (i != rank)
3867 messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
3868 }
3869 }
3870
3871 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3872 {
3873 op->mark_event(__func__);
3874
3875 MonSession *session = op->get_session();
3876 assert(session);
3877 Message *req = op->get_req();
3878 ConnectionRef con = op->get_connection();
3879
3880 reply->set_cct(g_ceph_context);
3881 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3882
3883 if (!con) {
3884 dout(2) << "send_reply no connection, dropping reply " << *reply
3885 << " to " << req << " " << *req << dendl;
3886 reply->put();
3887 op->mark_event("reply: no connection");
3888 return;
3889 }
3890
3891 if (!session->con && !session->proxy_con) {
3892 dout(2) << "send_reply no connection, dropping reply " << *reply
3893 << " to " << req << " " << *req << dendl;
3894 reply->put();
3895 op->mark_event("reply: no connection");
3896 return;
3897 }
3898
3899 if (session->proxy_con) {
3900 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3901 << " via " << session->proxy_con->get_peer_addr()
3902 << " for request " << *req << dendl;
3903 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3904 op->mark_event("reply: send routed request");
3905 } else {
3906 session->con->send_message(reply);
3907 op->mark_event("reply: send");
3908 }
3909 }
3910
3911 void Monitor::no_reply(MonOpRequestRef op)
3912 {
3913 MonSession *session = op->get_session();
3914 Message *req = op->get_req();
3915
3916 if (session->proxy_con) {
3917 dout(10) << "no_reply to " << req->get_source_inst()
3918 << " via " << session->proxy_con->get_peer_addr()
3919 << " for request " << *req << dendl;
3920 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3921 op->mark_event("no_reply: send routed request");
3922 } else {
3923 dout(10) << "no_reply to " << req->get_source_inst()
3924 << " " << *req << dendl;
3925 op->mark_event("no_reply");
3926 }
3927 }
3928
3929 void Monitor::handle_route(MonOpRequestRef op)
3930 {
3931 MRoute *m = static_cast<MRoute*>(op->get_req());
3932 MonSession *session = op->get_session();
3933 //check privileges
3934 if (!session->is_capable("mon", MON_CAP_X)) {
3935 dout(0) << "MRoute received from entity without appropriate perms! "
3936 << dendl;
3937 return;
3938 }
3939 if (m->msg)
3940 dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3941 else
3942 dout(10) << "handle_route null to " << m->dest << dendl;
3943
3944 // look it up
3945 if (m->session_mon_tid) {
3946 if (routed_requests.count(m->session_mon_tid)) {
3947 RoutedRequest *rr = routed_requests[m->session_mon_tid];
3948
3949 // reset payload, in case encoding is dependent on target features
3950 if (m->msg) {
3951 m->msg->clear_payload();
3952 rr->con->send_message(m->msg);
3953 m->msg = NULL;
3954 }
3955 if (m->send_osdmap_first) {
3956 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3957 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3958 true, MonOpRequestRef());
3959 }
3960 assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3961 routed_requests.erase(m->session_mon_tid);
3962 rr->session->routed_request_tids.erase(m->session_mon_tid);
3963 delete rr;
3964 } else {
3965 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3966 }
3967 } else {
3968 dout(10) << " not a routed request, trying to send anyway" << dendl;
3969 if (m->msg) {
3970 messenger->send_message(m->msg, m->dest);
3971 m->msg = NULL;
3972 }
3973 }
3974 }
3975
3976 void Monitor::resend_routed_requests()
3977 {
3978 dout(10) << "resend_routed_requests" << dendl;
3979 int mon = get_leader();
3980 list<Context*> retry;
3981 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
3982 p != routed_requests.end();
3983 ++p) {
3984 RoutedRequest *rr = p->second;
3985
3986 if (mon == rank) {
3987 dout(10) << " requeue for self tid " << rr->tid << dendl;
3988 rr->op->mark_event("retry routed request");
3989 retry.push_back(new C_RetryMessage(this, rr->op));
3990 if (rr->session) {
3991 assert(rr->session->routed_request_tids.count(p->first));
3992 rr->session->routed_request_tids.erase(p->first);
3993 }
3994 delete rr;
3995 } else {
3996 bufferlist::iterator q = rr->request_bl.begin();
3997 PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
3998 rr->op->mark_event("resend forwarded message to leader");
3999 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
4000 MForward *forward = new MForward(rr->tid, req, rr->con_features,
4001 rr->session->caps);
4002 req->put(); // forward takes its own ref; drop ours.
4003 forward->client = rr->client_inst;
4004 forward->set_priority(req->get_priority());
4005 messenger->send_message(forward, monmap->get_inst(mon));
4006 }
4007 }
4008 if (mon == rank) {
4009 routed_requests.clear();
4010 finish_contexts(g_ceph_context, retry);
4011 }
4012 }
4013
4014 void Monitor::remove_session(MonSession *s)
4015 {
4016 dout(10) << "remove_session " << s << " " << s->inst
4017 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
4018 assert(s->con);
4019 assert(!s->closed);
4020 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4021 p != s->routed_request_tids.end();
4022 ++p) {
4023 assert(routed_requests.count(*p));
4024 RoutedRequest *rr = routed_requests[*p];
4025 dout(10) << " dropping routed request " << rr->tid << dendl;
4026 delete rr;
4027 routed_requests.erase(*p);
4028 }
4029 s->routed_request_tids.clear();
4030 s->con->set_priv(NULL);
4031 session_map.remove_session(s);
4032 logger->set(l_mon_num_sessions, session_map.get_size());
4033 logger->inc(l_mon_session_rm);
4034 }
4035
4036 void Monitor::remove_all_sessions()
4037 {
4038 Mutex::Locker l(session_map_lock);
4039 while (!session_map.sessions.empty()) {
4040 MonSession *s = session_map.sessions.front();
4041 remove_session(s);
4042 if (logger)
4043 logger->inc(l_mon_session_rm);
4044 }
4045 if (logger)
4046 logger->set(l_mon_num_sessions, session_map.get_size());
4047 }
4048
4049 void Monitor::send_command(const entity_inst_t& inst,
4050 const vector<string>& com)
4051 {
4052 dout(10) << "send_command " << inst << "" << com << dendl;
4053 MMonCommand *c = new MMonCommand(monmap->fsid);
4054 c->cmd = com;
4055 try_send_message(c, inst);
4056 }
4057
4058 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4059 {
4060 /**
4061 * Wait list the new session until we're in the quorum, assuming it's
4062 * sufficiently new.
4063 * tick() will periodically send them back through so we can send
4064 * the client elsewhere if we don't think we're getting back in.
4065 *
4066 * But we whitelist a few sorts of messages:
4067 * 1) Monitors can talk to us at any time, of course.
4068 * 2) auth messages. It's unlikely to go through much faster, but
4069 * it's possible we've just lost our quorum status and we want to take...
4070 * 3) command messages. We want to accept these under all possible
4071 * circumstances.
4072 */
4073 Message *m = op->get_req();
4074 MonSession *s = op->get_session();
4075 ConnectionRef con = op->get_connection();
4076 utime_t too_old = ceph_clock_now();
4077 too_old -= g_ceph_context->_conf->mon_lease;
4078 if (m->get_recv_stamp() > too_old &&
4079 con->is_connected()) {
4080 dout(5) << "waitlisting message " << *m << dendl;
4081 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4082 op->mark_wait_for_quorum();
4083 } else {
4084 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4085 con->mark_down();
4086 // proxied sessions aren't registered and don't have a con; don't remove
4087 // those.
4088 if (!s->proxy_con) {
4089 Mutex::Locker l(session_map_lock);
4090 remove_session(s);
4091 }
4092 op->mark_zap();
4093 }
4094 }
4095
4096 void Monitor::_ms_dispatch(Message *m)
4097 {
4098 if (is_shutdown()) {
4099 m->put();
4100 return;
4101 }
4102
4103 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4104 bool src_is_mon = op->is_src_mon();
4105 op->mark_event("mon:_ms_dispatch");
4106 MonSession *s = op->get_session();
4107 if (s && s->closed) {
4108 return;
4109 }
4110
4111 if (src_is_mon && s) {
4112 ConnectionRef con = m->get_connection();
4113 if (con->get_messenger() && con->get_features() != s->con_features) {
4114 // only update features if this is a non-anonymous connection
4115 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4116 << " (was " << s->con_features
4117 << ", now " << con->get_features() << ")" << dendl;
4118 // connection features changed - recreate session.
4119 if (s->con && s->con != con) {
4120 dout(10) << __func__ << " connection for " << m->get_source_inst()
4121 << " changed from session; mark down and replace" << dendl;
4122 s->con->mark_down();
4123 }
4124 if (s->item.is_on_list()) {
4125 // forwarded messages' sessions are not in the sessions map and
4126 // exist only while the op is being handled.
4127 remove_session(s);
4128 }
4129 s->put();
4130 s = nullptr;
4131 }
4132 }
4133
4134 if (!s) {
4135 // if the sender is not a monitor, make sure their first message for a
4136 // session is an MAuth. If it is not, assume it's a stray message,
4137 // and considering that we are creating a new session it is safe to
4138 // assume that the sender hasn't authenticated yet, so we have no way
4139 // of assessing whether we should handle it or not.
4140 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4141 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4142 m->get_type() != CEPH_MSG_PING)) {
4143 dout(1) << __func__ << " dropping stray message " << *m
4144 << " from " << m->get_source_inst() << dendl;
4145 return;
4146 }
4147
4148 ConnectionRef con = m->get_connection();
4149 {
4150 Mutex::Locker l(session_map_lock);
4151 s = session_map.new_session(m->get_source_inst(), con.get());
4152 }
4153 assert(s);
4154 con->set_priv(s->get());
4155 dout(10) << __func__ << " new session " << s << " " << *s
4156 << " features 0x" << std::hex
4157 << s->con_features << std::dec << dendl;
4158 op->set_session(s);
4159
4160 logger->set(l_mon_num_sessions, session_map.get_size());
4161 logger->inc(l_mon_session_add);
4162
4163 if (src_is_mon) {
4164 // give it monitor caps; the peer type has been authenticated
4165 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4166 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4167 s->caps = *mon_caps;
4168 }
4169 s->put();
4170 } else {
4171 dout(20) << __func__ << " existing session " << s << " for " << s->inst
4172 << dendl;
4173 }
4174
4175 assert(s);
4176
4177 s->session_timeout = ceph_clock_now();
4178 s->session_timeout += g_conf->mon_session_timeout;
4179
4180 if (s->auth_handler) {
4181 s->entity_name = s->auth_handler->get_entity_name();
4182 }
4183 dout(20) << " caps " << s->caps.get_str() << dendl;
4184
4185 if ((is_synchronizing() ||
4186 (s->global_id == 0 && !exited_quorum.is_zero())) &&
4187 !src_is_mon &&
4188 m->get_type() != CEPH_MSG_PING) {
4189 waitlist_or_zap_client(op);
4190 } else {
4191 dispatch_op(op);
4192 }
4193 return;
4194 }
4195
4196 void Monitor::dispatch_op(MonOpRequestRef op)
4197 {
4198 op->mark_event("mon:dispatch_op");
4199 MonSession *s = op->get_session();
4200 assert(s);
4201 if (s->closed) {
4202 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4203 return;
4204 }
4205
4206 /* we will consider the default type as being 'monitor' until proven wrong */
4207 op->set_type_monitor();
4208 /* deal with all messages that do not necessarily need caps */
4209 bool dealt_with = true;
4210 switch (op->get_req()->get_type()) {
4211 // auth
4212 case MSG_MON_GLOBAL_ID:
4213 case CEPH_MSG_AUTH:
4214 op->set_type_service();
4215 /* no need to check caps here */
4216 paxos_service[PAXOS_AUTH]->dispatch(op);
4217 break;
4218
4219 case CEPH_MSG_PING:
4220 handle_ping(op);
4221 break;
4222
4223 /* MMonGetMap may be used by clients to obtain a monmap *before*
4224 * authenticating with the monitor. We need to handle these without
4225 * checking caps because, even on a cluster without cephx, we only set
4226 * session caps *after* the auth handshake. A good example of this
4227 * is when a client calls MonClient::get_monmap_privately(), which does
4228 * not authenticate when obtaining a monmap.
4229 */
4230 case CEPH_MSG_MON_GET_MAP:
4231 handle_mon_get_map(op);
4232 break;
4233
4234 case CEPH_MSG_MON_METADATA:
4235 return handle_mon_metadata(op);
4236
4237 default:
4238 dealt_with = false;
4239 break;
4240 }
4241 if (dealt_with)
4242 return;
4243
4244 /* well, maybe the op belongs to a service... */
4245 op->set_type_service();
4246 /* deal with all messages which caps should be checked somewhere else */
4247 dealt_with = true;
4248 switch (op->get_req()->get_type()) {
4249
4250 // OSDs
4251 case CEPH_MSG_MON_GET_OSDMAP:
4252 case CEPH_MSG_POOLOP:
4253 case MSG_OSD_BEACON:
4254 case MSG_OSD_MARK_ME_DOWN:
4255 case MSG_OSD_FULL:
4256 case MSG_OSD_FAILURE:
4257 case MSG_OSD_BOOT:
4258 case MSG_OSD_ALIVE:
4259 case MSG_OSD_PGTEMP:
4260 case MSG_OSD_PG_CREATED:
4261 case MSG_REMOVE_SNAPS:
4262 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4263 break;
4264
4265 // MDSs
4266 case MSG_MDS_BEACON:
4267 case MSG_MDS_OFFLOAD_TARGETS:
4268 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4269 break;
4270
4271 // Mgrs
4272 case MSG_MGR_BEACON:
4273 paxos_service[PAXOS_MGR]->dispatch(op);
4274 break;
4275
4276 // MgrStat
4277 case MSG_MON_MGR_REPORT:
4278 case CEPH_MSG_STATFS:
4279 case MSG_GETPOOLSTATS:
4280 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4281 break;
4282
4283 // pg
4284 case MSG_PGSTATS:
4285 paxos_service[PAXOS_PGMAP]->dispatch(op);
4286 break;
4287
4288 // log
4289 case MSG_LOG:
4290 paxos_service[PAXOS_LOG]->dispatch(op);
4291 break;
4292
4293 // handle_command() does its own caps checking
4294 case MSG_MON_COMMAND:
4295 op->set_type_command();
4296 handle_command(op);
4297 break;
4298
4299 default:
4300 dealt_with = false;
4301 break;
4302 }
4303 if (dealt_with)
4304 return;
4305
4306 /* nop, looks like it's not a service message; revert back to monitor */
4307 op->set_type_monitor();
4308
4309 /* messages we, the Monitor class, need to deal with
4310 * but may be sent by clients. */
4311
4312 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4313 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4314 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4315 << dendl;
4316 goto drop;
4317 }
4318
4319 dealt_with = true;
4320 switch (op->get_req()->get_type()) {
4321
4322 // misc
4323 case CEPH_MSG_MON_GET_VERSION:
4324 handle_get_version(op);
4325 break;
4326
4327 case CEPH_MSG_MON_SUBSCRIBE:
4328 /* FIXME: check what's being subscribed, filter accordingly */
4329 handle_subscribe(op);
4330 break;
4331
4332 default:
4333 dealt_with = false;
4334 break;
4335 }
4336 if (dealt_with)
4337 return;
4338
4339 if (!op->is_src_mon()) {
4340 dout(1) << __func__ << " unexpected monitor message from"
4341 << " non-monitor entity " << op->get_req()->get_source_inst()
4342 << " " << *(op->get_req()) << " -- dropping" << dendl;
4343 goto drop;
4344 }
4345
4346 /* messages that should only be sent by another monitor */
4347 dealt_with = true;
4348 switch (op->get_req()->get_type()) {
4349
4350 case MSG_ROUTE:
4351 handle_route(op);
4352 break;
4353
4354 case MSG_MON_PROBE:
4355 handle_probe(op);
4356 break;
4357
4358 // Sync (i.e., the new slurp, but on steroids)
4359 case MSG_MON_SYNC:
4360 handle_sync(op);
4361 break;
4362 case MSG_MON_SCRUB:
4363 handle_scrub(op);
4364 break;
4365
4366 /* log acks are sent from a monitor we sent the MLog to, and are
4367 never sent by clients to us. */
4368 case MSG_LOGACK:
4369 log_client.handle_log_ack((MLogAck*)op->get_req());
4370 break;
4371
4372 // monmap
4373 case MSG_MON_JOIN:
4374 op->set_type_service();
4375 paxos_service[PAXOS_MONMAP]->dispatch(op);
4376 break;
4377
4378 // paxos
4379 case MSG_MON_PAXOS:
4380 {
4381 op->set_type_paxos();
4382 MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
4383 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4384 //can't send these!
4385 break;
4386 }
4387
4388 if (state == STATE_SYNCHRONIZING) {
4389 // we are synchronizing. These messages would do us no
4390 // good, thus just drop them and ignore them.
4391 dout(10) << __func__ << " ignore paxos msg from "
4392 << pm->get_source_inst() << dendl;
4393 break;
4394 }
4395
4396 // sanitize
4397 if (pm->epoch > get_epoch()) {
4398 bootstrap();
4399 break;
4400 }
4401 if (pm->epoch != get_epoch()) {
4402 break;
4403 }
4404
4405 paxos->dispatch(op);
4406 }
4407 break;
4408
4409 // elector messages
4410 case MSG_MON_ELECTION:
4411 op->set_type_election();
4412 //check privileges here for simplicity
4413 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4414 dout(0) << "MMonElection received from entity without enough caps!"
4415 << op->get_session()->caps << dendl;
4416 break;
4417 }
4418 if (!is_probing() && !is_synchronizing()) {
4419 elector.dispatch(op);
4420 }
4421 break;
4422
4423 case MSG_FORWARD:
4424 handle_forward(op);
4425 break;
4426
4427 case MSG_TIMECHECK:
4428 handle_timecheck(op);
4429 break;
4430
4431 case MSG_MON_HEALTH:
4432 health_monitor->dispatch(op);
4433 break;
4434
4435 case MSG_MON_HEALTH_CHECKS:
4436 op->set_type_service();
4437 paxos_service[PAXOS_HEALTH]->dispatch(op);
4438 break;
4439
4440 default:
4441 dealt_with = false;
4442 break;
4443 }
4444 if (!dealt_with) {
4445 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4446 goto drop;
4447 }
4448 return;
4449
4450 drop:
4451 return;
4452 }
4453
4454 void Monitor::handle_ping(MonOpRequestRef op)
4455 {
4456 MPing *m = static_cast<MPing*>(op->get_req());
4457 dout(10) << __func__ << " " << *m << dendl;
4458 MPing *reply = new MPing;
4459 entity_inst_t inst = m->get_source_inst();
4460 bufferlist payload;
4461 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4462 f->open_object_section("pong");
4463
4464 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
4465 get_health_status(false, f.get(), nullptr);
4466 } else {
4467 list<string> health_str;
4468 get_health(health_str, nullptr, f.get());
4469 }
4470
4471 {
4472 stringstream ss;
4473 get_mon_status(f.get(), ss);
4474 }
4475
4476 f->close_section();
4477 stringstream ss;
4478 f->flush(ss);
4479 ::encode(ss.str(), payload);
4480 reply->set_payload(payload);
4481 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4482 messenger->send_message(reply, inst);
4483 }
4484
4485 void Monitor::timecheck_start()
4486 {
4487 dout(10) << __func__ << dendl;
4488 timecheck_cleanup();
4489 timecheck_start_round();
4490 }
4491
4492 void Monitor::timecheck_finish()
4493 {
4494 dout(10) << __func__ << dendl;
4495 timecheck_cleanup();
4496 }
4497
4498 void Monitor::timecheck_start_round()
4499 {
4500 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4501 assert(is_leader());
4502
4503 if (monmap->size() == 1) {
4504 assert(0 == "We are alone; this shouldn't have been scheduled!");
4505 return;
4506 }
4507
4508 if (timecheck_round % 2) {
4509 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4510 utime_t curr_time = ceph_clock_now();
4511 double max = g_conf->mon_timecheck_interval*3;
4512 if (curr_time - timecheck_round_start < max) {
4513 dout(10) << __func__ << " keep current round going" << dendl;
4514 goto out;
4515 } else {
4516 dout(10) << __func__
4517 << " finish current timecheck and start new" << dendl;
4518 timecheck_cancel_round();
4519 }
4520 }
4521
4522 assert(timecheck_round % 2 == 0);
4523 timecheck_acks = 0;
4524 timecheck_round ++;
4525 timecheck_round_start = ceph_clock_now();
4526 dout(10) << __func__ << " new " << timecheck_round << dendl;
4527
4528 timecheck();
4529 out:
4530 dout(10) << __func__ << " setting up next event" << dendl;
4531 timecheck_reset_event();
4532 }
4533
4534 void Monitor::timecheck_finish_round(bool success)
4535 {
4536 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4537 assert(timecheck_round % 2);
4538 timecheck_round ++;
4539 timecheck_round_start = utime_t();
4540
4541 if (success) {
4542 assert(timecheck_waiting.empty());
4543 assert(timecheck_acks == quorum.size());
4544 timecheck_report();
4545 timecheck_check_skews();
4546 return;
4547 }
4548
4549 dout(10) << __func__ << " " << timecheck_waiting.size()
4550 << " peers still waiting:";
4551 for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4552 p != timecheck_waiting.end(); ++p) {
4553 *_dout << " " << p->first.name;
4554 }
4555 *_dout << dendl;
4556 timecheck_waiting.clear();
4557
4558 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4559 }
4560
4561 void Monitor::timecheck_cancel_round()
4562 {
4563 timecheck_finish_round(false);
4564 }
4565
4566 void Monitor::timecheck_cleanup()
4567 {
4568 timecheck_round = 0;
4569 timecheck_acks = 0;
4570 timecheck_round_start = utime_t();
4571
4572 if (timecheck_event) {
4573 timer.cancel_event(timecheck_event);
4574 timecheck_event = NULL;
4575 }
4576 timecheck_waiting.clear();
4577 timecheck_skews.clear();
4578 timecheck_latencies.clear();
4579
4580 timecheck_rounds_since_clean = 0;
4581 }
4582
4583 void Monitor::timecheck_reset_event()
4584 {
4585 if (timecheck_event) {
4586 timer.cancel_event(timecheck_event);
4587 timecheck_event = NULL;
4588 }
4589
4590 double delay =
4591 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4592
4593 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4594 delay = cct->_conf->mon_timecheck_interval;
4595 }
4596
4597 dout(10) << __func__ << " delay " << delay
4598 << " rounds_since_clean " << timecheck_rounds_since_clean
4599 << dendl;
4600
4601 timecheck_event = timer.add_event_after(
4602 delay,
4603 new C_MonContext(this, [this](int) {
4604 timecheck_start_round();
4605 }));
4606 }
4607
4608 void Monitor::timecheck_check_skews()
4609 {
4610 dout(10) << __func__ << dendl;
4611 assert(is_leader());
4612 assert((timecheck_round % 2) == 0);
4613 if (monmap->size() == 1) {
4614 assert(0 == "We are alone; we shouldn't have gotten here!");
4615 return;
4616 }
4617 assert(timecheck_latencies.size() == timecheck_skews.size());
4618
4619 bool found_skew = false;
4620 for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4621 p != timecheck_skews.end(); ++p) {
4622
4623 double abs_skew;
4624 if (timecheck_has_skew(p->second, &abs_skew)) {
4625 dout(10) << __func__
4626 << " " << p->first << " skew " << abs_skew << dendl;
4627 found_skew = true;
4628 }
4629 }
4630
4631 if (found_skew) {
4632 ++timecheck_rounds_since_clean;
4633 timecheck_reset_event();
4634 } else if (timecheck_rounds_since_clean > 0) {
4635 dout(1) << __func__
4636 << " no clock skews found after " << timecheck_rounds_since_clean
4637 << " rounds" << dendl;
4638 // make sure the skews are really gone and not just a transient success
4639 // this will run just once if not in the presence of skews again.
4640 timecheck_rounds_since_clean = 1;
4641 timecheck_reset_event();
4642 timecheck_rounds_since_clean = 0;
4643 }
4644
4645 }
4646
4647 void Monitor::timecheck_report()
4648 {
4649 dout(10) << __func__ << dendl;
4650 assert(is_leader());
4651 assert((timecheck_round % 2) == 0);
4652 if (monmap->size() == 1) {
4653 assert(0 == "We are alone; we shouldn't have gotten here!");
4654 return;
4655 }
4656
4657 assert(timecheck_latencies.size() == timecheck_skews.size());
4658 bool do_output = true; // only output report once
4659 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4660 if (monmap->get_name(*q) == name)
4661 continue;
4662
4663 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4664 m->epoch = get_epoch();
4665 m->round = timecheck_round;
4666
4667 for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4668 it != timecheck_skews.end(); ++it) {
4669 double skew = it->second;
4670 double latency = timecheck_latencies[it->first];
4671
4672 m->skews[it->first] = skew;
4673 m->latencies[it->first] = latency;
4674
4675 if (do_output) {
4676 dout(25) << __func__ << " " << it->first
4677 << " latency " << latency
4678 << " skew " << skew << dendl;
4679 }
4680 }
4681 do_output = false;
4682 entity_inst_t inst = monmap->get_inst(*q);
4683 dout(10) << __func__ << " send report to " << inst << dendl;
4684 messenger->send_message(m, inst);
4685 }
4686 }
4687
4688 void Monitor::timecheck()
4689 {
4690 dout(10) << __func__ << dendl;
4691 assert(is_leader());
4692 if (monmap->size() == 1) {
4693 assert(0 == "We are alone; we shouldn't have gotten here!");
4694 return;
4695 }
4696 assert(timecheck_round % 2 != 0);
4697
4698 timecheck_acks = 1; // we ack ourselves
4699
4700 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4701 << " round " << timecheck_round << dendl;
4702
4703 // we are at the eye of the storm; the point of reference
4704 timecheck_skews[messenger->get_myinst()] = 0.0;
4705 timecheck_latencies[messenger->get_myinst()] = 0.0;
4706
4707 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4708 if (monmap->get_name(*it) == name)
4709 continue;
4710
4711 entity_inst_t inst = monmap->get_inst(*it);
4712 utime_t curr_time = ceph_clock_now();
4713 timecheck_waiting[inst] = curr_time;
4714 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4715 m->epoch = get_epoch();
4716 m->round = timecheck_round;
4717 dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4718 messenger->send_message(m, inst);
4719 }
4720 }
4721
4722 health_status_t Monitor::timecheck_status(ostringstream &ss,
4723 const double skew_bound,
4724 const double latency)
4725 {
4726 health_status_t status = HEALTH_OK;
4727 assert(latency >= 0);
4728
4729 double abs_skew;
4730 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4731 status = HEALTH_WARN;
4732 ss << "clock skew " << abs_skew << "s"
4733 << " > max " << g_conf->mon_clock_drift_allowed << "s";
4734 }
4735
4736 return status;
4737 }
4738
4739 void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4740 {
4741 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4742 dout(10) << __func__ << " " << *m << dendl;
4743 /* handles PONG's */
4744 assert(m->op == MTimeCheck::OP_PONG);
4745
4746 entity_inst_t other = m->get_source_inst();
4747 if (m->epoch < get_epoch()) {
4748 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4749 << " from " << other
4750 << " curr " << get_epoch()
4751 << " -- severely lagged? discard" << dendl;
4752 return;
4753 }
4754 assert(m->epoch == get_epoch());
4755
4756 if (m->round < timecheck_round) {
4757 dout(1) << __func__ << " got old round " << m->round
4758 << " from " << other
4759 << " curr " << timecheck_round << " -- discard" << dendl;
4760 return;
4761 }
4762
4763 utime_t curr_time = ceph_clock_now();
4764
4765 assert(timecheck_waiting.count(other) > 0);
4766 utime_t timecheck_sent = timecheck_waiting[other];
4767 timecheck_waiting.erase(other);
4768 if (curr_time < timecheck_sent) {
4769 // our clock was readjusted -- drop everything until it all makes sense.
4770 dout(1) << __func__ << " our clock was readjusted --"
4771 << " bump round and drop current check"
4772 << dendl;
4773 timecheck_cancel_round();
4774 return;
4775 }
4776
4777 /* update peer latencies */
4778 double latency = (double)(curr_time - timecheck_sent);
4779
4780 if (timecheck_latencies.count(other) == 0)
4781 timecheck_latencies[other] = latency;
4782 else {
4783 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4784 timecheck_latencies[other] = avg_latency;
4785 }
4786
4787 /*
4788 * update skews
4789 *
4790 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4791 * and 'a' happens to be lower than 'b'; so we use double instead.
4792 *
4793 * latency is always expected to be >= 0.
4794 *
4795 * delta, the difference between theirs timestamp and ours, may either be
4796 * lower or higher than 0; will hardly ever be 0.
4797 *
4798 * The absolute skew is the absolute delta minus the latency, which is
4799 * taken as a whole instead of an rtt given that there is some queueing
4800 * and dispatch times involved and it's hard to assess how long exactly
4801 * it took for the message to travel to the other side and be handled. So
4802 * we call it a bounded skew, the worst case scenario.
4803 *
4804 * Now, to math!
4805 *
4806 * Given that the latency is always positive, we can establish that the
4807 * bounded skew will be:
4808 *
4809 * 1. positive if the absolute delta is higher than the latency and
4810 * delta is positive
4811 * 2. negative if the absolute delta is higher than the latency and
4812 * delta is negative.
4813 * 3. zero if the absolute delta is lower than the latency.
4814 *
4815 * On 3. we make a judgement call and treat the skew as non-existent.
4816 * This is because that, if the absolute delta is lower than the
4817 * latency, then the apparently existing skew is nothing more than a
4818 * side-effect of the high latency at work.
4819 *
4820 * This may not be entirely true though, as a severely skewed clock
4821 * may be masked by an even higher latency, but with high latencies
4822 * we probably have worse issues to deal with than just skewed clocks.
4823 */
4824 assert(latency >= 0);
4825
4826 double delta = ((double) m->timestamp) - ((double) curr_time);
4827 double abs_delta = (delta > 0 ? delta : -delta);
4828 double skew_bound = abs_delta - latency;
4829 if (skew_bound < 0)
4830 skew_bound = 0;
4831 else if (delta < 0)
4832 skew_bound = -skew_bound;
4833
4834 ostringstream ss;
4835 health_status_t status = timecheck_status(ss, skew_bound, latency);
4836 clog->health(status) << other << " " << ss.str();
4837
4838 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4839 << " delta " << delta << " skew_bound " << skew_bound
4840 << " latency " << latency << dendl;
4841
4842 timecheck_skews[other] = skew_bound;
4843
4844 timecheck_acks++;
4845 if (timecheck_acks == quorum.size()) {
4846 dout(10) << __func__ << " got pongs from everybody ("
4847 << timecheck_acks << " total)" << dendl;
4848 assert(timecheck_skews.size() == timecheck_acks);
4849 assert(timecheck_waiting.empty());
4850 // everyone has acked, so bump the round to finish it.
4851 timecheck_finish_round();
4852 }
4853 }
4854
4855 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4856 {
4857 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4858 dout(10) << __func__ << " " << *m << dendl;
4859
4860 assert(is_peon());
4861 assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4862
4863 if (m->epoch != get_epoch()) {
4864 dout(1) << __func__ << " got wrong epoch "
4865 << "(ours " << get_epoch()
4866 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4867 return;
4868 }
4869
4870 if (m->round < timecheck_round) {
4871 dout(1) << __func__ << " got old round " << m->round
4872 << " current " << timecheck_round
4873 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4874 return;
4875 }
4876
4877 timecheck_round = m->round;
4878
4879 if (m->op == MTimeCheck::OP_REPORT) {
4880 assert((timecheck_round % 2) == 0);
4881 timecheck_latencies.swap(m->latencies);
4882 timecheck_skews.swap(m->skews);
4883 return;
4884 }
4885
4886 assert((timecheck_round % 2) != 0);
4887 MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4888 utime_t curr_time = ceph_clock_now();
4889 reply->timestamp = curr_time;
4890 reply->epoch = m->epoch;
4891 reply->round = m->round;
4892 dout(10) << __func__ << " send " << *m
4893 << " to " << m->get_source_inst() << dendl;
4894 m->get_connection()->send_message(reply);
4895 }
4896
4897 void Monitor::handle_timecheck(MonOpRequestRef op)
4898 {
4899 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4900 dout(10) << __func__ << " " << *m << dendl;
4901
4902 if (is_leader()) {
4903 if (m->op != MTimeCheck::OP_PONG) {
4904 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4905 } else {
4906 handle_timecheck_leader(op);
4907 }
4908 } else if (is_peon()) {
4909 if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4910 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4911 } else {
4912 handle_timecheck_peon(op);
4913 }
4914 } else {
4915 dout(1) << __func__ << " drop unexpected msg" << dendl;
4916 }
4917 }
4918
4919 void Monitor::handle_subscribe(MonOpRequestRef op)
4920 {
4921 MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
4922 dout(10) << "handle_subscribe " << *m << dendl;
4923
4924 bool reply = false;
4925
4926 MonSession *s = op->get_session();
4927 assert(s);
4928
4929 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
4930 p != m->what.end();
4931 ++p) {
4932 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4933 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
4934 reply = true;
4935
4936 // remove conflicting subscribes
4937 if (logmon()->sub_name_to_id(p->first) >= 0) {
4938 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
4939 it != s->sub_map.end(); ) {
4940 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
4941 Mutex::Locker l(session_map_lock);
4942 session_map.remove_sub((it++)->second);
4943 } else {
4944 ++it;
4945 }
4946 }
4947 }
4948
4949 {
4950 Mutex::Locker l(session_map_lock);
4951 session_map.add_update_sub(s, p->first, p->second.start,
4952 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
4953 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
4954 }
4955
4956 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
4957 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
4958 if ((int)s->is_capable("mds", MON_CAP_R)) {
4959 Subscription *sub = s->sub_map[p->first];
4960 assert(sub != nullptr);
4961 mdsmon()->check_sub(sub);
4962 }
4963 } else if (p->first == "osdmap") {
4964 if ((int)s->is_capable("osd", MON_CAP_R)) {
4965 if (s->osd_epoch > p->second.start) {
4966 // client needs earlier osdmaps on purpose, so reset the sent epoch
4967 s->osd_epoch = 0;
4968 }
4969 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
4970 }
4971 } else if (p->first == "osd_pg_creates") {
4972 if ((int)s->is_capable("osd", MON_CAP_W)) {
4973 if (monmap->get_required_features().contains_all(
4974 ceph::features::mon::FEATURE_LUMINOUS)) {
4975 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
4976 } else {
4977 pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
4978 }
4979 }
4980 } else if (p->first == "monmap") {
4981 monmon()->check_sub(s->sub_map[p->first]);
4982 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
4983 logmon()->check_sub(s->sub_map[p->first]);
4984 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
4985 mgrmon()->check_sub(s->sub_map[p->first]);
4986 } else if (p->first == "servicemap") {
4987 mgrstatmon()->check_sub(s->sub_map[p->first]);
4988 }
4989 }
4990
4991 if (reply) {
4992 // we only need to reply if the client is old enough to think it
4993 // has to send renewals.
4994 ConnectionRef con = m->get_connection();
4995 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
4996 m->get_connection()->send_message(new MMonSubscribeAck(
4997 monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
4998 }
4999
5000 }
5001
5002 void Monitor::handle_get_version(MonOpRequestRef op)
5003 {
5004 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
5005 dout(10) << "handle_get_version " << *m << dendl;
5006 PaxosService *svc = NULL;
5007
5008 MonSession *s = op->get_session();
5009 assert(s);
5010
5011 if (!is_leader() && !is_peon()) {
5012 dout(10) << " waiting for quorum" << dendl;
5013 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5014 goto out;
5015 }
5016
5017 if (m->what == "mdsmap") {
5018 svc = mdsmon();
5019 } else if (m->what == "fsmap") {
5020 svc = mdsmon();
5021 } else if (m->what == "osdmap") {
5022 svc = osdmon();
5023 } else if (m->what == "monmap") {
5024 svc = monmon();
5025 } else {
5026 derr << "invalid map type " << m->what << dendl;
5027 }
5028
5029 if (svc) {
5030 if (!svc->is_readable()) {
5031 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5032 goto out;
5033 }
5034
5035 MMonGetVersionReply *reply = new MMonGetVersionReply();
5036 reply->handle = m->handle;
5037 reply->version = svc->get_last_committed();
5038 reply->oldest_version = svc->get_first_committed();
5039 reply->set_tid(m->get_tid());
5040
5041 m->get_connection()->send_message(reply);
5042 }
5043 out:
5044 return;
5045 }
5046
5047 bool Monitor::ms_handle_reset(Connection *con)
5048 {
5049 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5050
5051 // ignore lossless monitor sessions
5052 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5053 return false;
5054
5055 MonSession *s = static_cast<MonSession *>(con->get_priv());
5056 if (!s)
5057 return false;
5058
5059 // break any con <-> session ref cycle
5060 s->con->set_priv(NULL);
5061
5062 if (is_shutdown())
5063 return false;
5064
5065 Mutex::Locker l(lock);
5066
5067 dout(10) << "reset/close on session " << s->inst << dendl;
5068 if (!s->closed) {
5069 Mutex::Locker l(session_map_lock);
5070 remove_session(s);
5071 }
5072 s->put();
5073 return true;
5074 }
5075
5076 bool Monitor::ms_handle_refused(Connection *con)
5077 {
5078 // just log for now...
5079 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5080 return false;
5081 }
5082
5083 // -----
5084
5085 void Monitor::send_latest_monmap(Connection *con)
5086 {
5087 bufferlist bl;
5088 monmap->encode(bl, con->get_features());
5089 con->send_message(new MMonMap(bl));
5090 }
5091
5092 void Monitor::handle_mon_get_map(MonOpRequestRef op)
5093 {
5094 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
5095 dout(10) << "handle_mon_get_map" << dendl;
5096 send_latest_monmap(m->get_connection().get());
5097 }
5098
5099 void Monitor::handle_mon_metadata(MonOpRequestRef op)
5100 {
5101 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
5102 if (is_leader()) {
5103 dout(10) << __func__ << dendl;
5104 update_mon_metadata(m->get_source().num(), std::move(m->data));
5105 }
5106 }
5107
5108 void Monitor::update_mon_metadata(int from, Metadata&& m)
5109 {
5110 // NOTE: this is now for legacy (kraken or jewel) mons only.
5111 pending_metadata[from] = std::move(m);
5112
5113 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5114 bufferlist bl;
5115 ::encode(pending_metadata, bl);
5116 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5117 paxos->trigger_propose();
5118 }
5119
5120 int Monitor::load_metadata()
5121 {
5122 bufferlist bl;
5123 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5124 if (r)
5125 return r;
5126 bufferlist::iterator it = bl.begin();
5127 ::decode(mon_metadata, it);
5128
5129 pending_metadata = mon_metadata;
5130 return 0;
5131 }
5132
5133 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5134 {
5135 assert(f);
5136 if (!mon_metadata.count(mon)) {
5137 err << "mon." << mon << " not found";
5138 return -EINVAL;
5139 }
5140 const Metadata& m = mon_metadata[mon];
5141 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5142 f->dump_string(p->first.c_str(), p->second);
5143 }
5144 return 0;
5145 }
5146
5147 void Monitor::count_metadata(const string& field, map<string,int> *out)
5148 {
5149 for (auto& p : mon_metadata) {
5150 auto q = p.second.find(field);
5151 if (q == p.second.end()) {
5152 (*out)["unknown"]++;
5153 } else {
5154 (*out)[q->second]++;
5155 }
5156 }
5157 }
5158
5159 void Monitor::count_metadata(const string& field, Formatter *f)
5160 {
5161 map<string,int> by_val;
5162 count_metadata(field, &by_val);
5163 f->open_object_section(field.c_str());
5164 for (auto& p : by_val) {
5165 f->dump_int(p.first.c_str(), p.second);
5166 }
5167 f->close_section();
5168 }
5169
5170 int Monitor::print_nodes(Formatter *f, ostream& err)
5171 {
5172 map<string, list<int> > mons; // hostname => mon
5173 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5174 it != mon_metadata.end(); ++it) {
5175 const Metadata& m = it->second;
5176 Metadata::const_iterator hostname = m.find("hostname");
5177 if (hostname == m.end()) {
5178 // not likely though
5179 continue;
5180 }
5181 mons[hostname->second].push_back(it->first);
5182 }
5183
5184 dump_services(f, mons, "mon");
5185 return 0;
5186 }
5187
5188 // ----------------------------------------------
5189 // scrub
5190
5191 int Monitor::scrub_start()
5192 {
5193 dout(10) << __func__ << dendl;
5194 assert(is_leader());
5195
5196 if (!scrub_result.empty()) {
5197 clog->info() << "scrub already in progress";
5198 return -EBUSY;
5199 }
5200
5201 scrub_event_cancel();
5202 scrub_result.clear();
5203 scrub_state.reset(new ScrubState);
5204
5205 scrub();
5206 return 0;
5207 }
5208
5209 int Monitor::scrub()
5210 {
5211 assert(is_leader());
5212 assert(scrub_state);
5213
5214 scrub_cancel_timeout();
5215 wait_for_paxos_write();
5216 scrub_version = paxos->get_version();
5217
5218
5219 // scrub all keys if we're the only monitor in the quorum
5220 int32_t num_keys =
5221 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5222
5223 for (set<int>::iterator p = quorum.begin();
5224 p != quorum.end();
5225 ++p) {
5226 if (*p == rank)
5227 continue;
5228 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5229 num_keys);
5230 r->key = scrub_state->last_key;
5231 messenger->send_message(r, monmap->get_inst(*p));
5232 }
5233
5234 // scrub my keys
5235 bool r = _scrub(&scrub_result[rank],
5236 &scrub_state->last_key,
5237 &num_keys);
5238
5239 scrub_state->finished = !r;
5240
5241 // only after we got our scrub results do we really care whether the
5242 // other monitors are late on their results. Also, this way we avoid
5243 // triggering the timeout if we end up getting stuck in _scrub() for
5244 // longer than the duration of the timeout.
5245 scrub_reset_timeout();
5246
5247 if (quorum.size() == 1) {
5248 assert(scrub_state->finished == true);
5249 scrub_finish();
5250 }
5251 return 0;
5252 }
5253
5254 void Monitor::handle_scrub(MonOpRequestRef op)
5255 {
5256 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5257 dout(10) << __func__ << " " << *m << dendl;
5258 switch (m->op) {
5259 case MMonScrub::OP_SCRUB:
5260 {
5261 if (!is_peon())
5262 break;
5263
5264 wait_for_paxos_write();
5265
5266 if (m->version != paxos->get_version())
5267 break;
5268
5269 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5270 m->version,
5271 m->num_keys);
5272
5273 reply->key = m->key;
5274 _scrub(&reply->result, &reply->key, &reply->num_keys);
5275 m->get_connection()->send_message(reply);
5276 }
5277 break;
5278
5279 case MMonScrub::OP_RESULT:
5280 {
5281 if (!is_leader())
5282 break;
5283 if (m->version != scrub_version)
5284 break;
5285 // reset the timeout each time we get a result
5286 scrub_reset_timeout();
5287
5288 int from = m->get_source().num();
5289 assert(scrub_result.count(from) == 0);
5290 scrub_result[from] = m->result;
5291
5292 if (scrub_result.size() == quorum.size()) {
5293 scrub_check_results();
5294 scrub_result.clear();
5295 if (scrub_state->finished)
5296 scrub_finish();
5297 else
5298 scrub();
5299 }
5300 }
5301 break;
5302 }
5303 }
5304
5305 bool Monitor::_scrub(ScrubResult *r,
5306 pair<string,string> *start,
5307 int *num_keys)
5308 {
5309 assert(r != NULL);
5310 assert(start != NULL);
5311 assert(num_keys != NULL);
5312
5313 set<string> prefixes = get_sync_targets_names();
5314 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5315
5316 dout(10) << __func__ << " start (" << *start << ")"
5317 << " num_keys " << *num_keys << dendl;
5318
5319 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5320
5321 int scrubbed_keys = 0;
5322 pair<string,string> last_key;
5323
5324 while (it->has_next_chunk()) {
5325
5326 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5327 break;
5328
5329 pair<string,string> k = it->get_next_key();
5330 if (prefixes.count(k.first) == 0)
5331 continue;
5332
5333 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5334 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5335 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5336 << dendl;
5337 continue;
5338 }
5339
5340 bufferlist bl;
5341 int err = store->get(k.first, k.second, bl);
5342 assert(err == 0);
5343
5344 uint32_t key_crc = bl.crc32c(0);
5345 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5346 << " crc " << key_crc << dendl;
5347 r->prefix_keys[k.first]++;
5348 if (r->prefix_crc.count(k.first) == 0) {
5349 r->prefix_crc[k.first] = 0;
5350 }
5351 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5352
5353 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5354 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5355 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5356 r->prefix_crc[k.first] += 1;
5357 }
5358
5359 ++scrubbed_keys;
5360 last_key = k;
5361 }
5362
5363 dout(20) << __func__ << " last_key (" << last_key << ")"
5364 << " scrubbed_keys " << scrubbed_keys
5365 << " has_next " << it->has_next_chunk() << dendl;
5366
5367 *start = last_key;
5368 *num_keys = scrubbed_keys;
5369
5370 return it->has_next_chunk();
5371 }
5372
5373 void Monitor::scrub_check_results()
5374 {
5375 dout(10) << __func__ << dendl;
5376
5377 // compare
5378 int errors = 0;
5379 ScrubResult& mine = scrub_result[rank];
5380 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5381 p != scrub_result.end();
5382 ++p) {
5383 if (p->first == rank)
5384 continue;
5385 if (p->second != mine) {
5386 ++errors;
5387 clog->error() << "scrub mismatch";
5388 clog->error() << " mon." << rank << " " << mine;
5389 clog->error() << " mon." << p->first << " " << p->second;
5390 }
5391 }
5392 if (!errors)
5393 clog->debug() << "scrub ok on " << quorum << ": " << mine;
5394 }
5395
5396 inline void Monitor::scrub_timeout()
5397 {
5398 dout(1) << __func__ << " restarting scrub" << dendl;
5399 scrub_reset();
5400 scrub_start();
5401 }
5402
5403 void Monitor::scrub_finish()
5404 {
5405 dout(10) << __func__ << dendl;
5406 scrub_reset();
5407 scrub_event_start();
5408 }
5409
5410 void Monitor::scrub_reset()
5411 {
5412 dout(10) << __func__ << dendl;
5413 scrub_cancel_timeout();
5414 scrub_version = 0;
5415 scrub_result.clear();
5416 scrub_state.reset();
5417 }
5418
5419 inline void Monitor::scrub_update_interval(int secs)
5420 {
5421 // we don't care about changes if we are not the leader.
5422 // changes will be visible if we become the leader.
5423 if (!is_leader())
5424 return;
5425
5426 dout(1) << __func__ << " new interval = " << secs << dendl;
5427
5428 // if scrub already in progress, all changes will already be visible during
5429 // the next round. Nothing to do.
5430 if (scrub_state != NULL)
5431 return;
5432
5433 scrub_event_cancel();
5434 scrub_event_start();
5435 }
5436
5437 void Monitor::scrub_event_start()
5438 {
5439 dout(10) << __func__ << dendl;
5440
5441 if (scrub_event)
5442 scrub_event_cancel();
5443
5444 if (cct->_conf->mon_scrub_interval <= 0) {
5445 dout(1) << __func__ << " scrub event is disabled"
5446 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5447 << ")" << dendl;
5448 return;
5449 }
5450
5451 scrub_event = timer.add_event_after(
5452 cct->_conf->mon_scrub_interval,
5453 new C_MonContext(this, [this](int) {
5454 scrub_start();
5455 }));
5456 }
5457
5458 void Monitor::scrub_event_cancel()
5459 {
5460 dout(10) << __func__ << dendl;
5461 if (scrub_event) {
5462 timer.cancel_event(scrub_event);
5463 scrub_event = NULL;
5464 }
5465 }
5466
5467 inline void Monitor::scrub_cancel_timeout()
5468 {
5469 if (scrub_timeout_event) {
5470 timer.cancel_event(scrub_timeout_event);
5471 scrub_timeout_event = NULL;
5472 }
5473 }
5474
5475 void Monitor::scrub_reset_timeout()
5476 {
5477 dout(15) << __func__ << " reset timeout event" << dendl;
5478 scrub_cancel_timeout();
5479 scrub_timeout_event = timer.add_event_after(
5480 g_conf->mon_scrub_timeout,
5481 new C_MonContext(this, [this](int) {
5482 scrub_timeout();
5483 }));
5484 }
5485
5486 /************ TICK ***************/
5487 void Monitor::new_tick()
5488 {
5489 timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5490 tick();
5491 }));
5492 }
5493
5494 void Monitor::tick()
5495 {
5496 // ok go.
5497 dout(11) << "tick" << dendl;
5498 const utime_t now = ceph_clock_now();
5499
5500 // Check if we need to emit any delayed health check updated messages
5501 if (is_leader()) {
5502 const auto min_period = g_conf->get_val<int64_t>(
5503 "mon_health_log_update_period");
5504 for (auto& svc : paxos_service) {
5505 auto health = svc->get_health_checks();
5506
5507 for (const auto &i : health.checks) {
5508 const std::string &code = i.first;
5509 const std::string &summary = i.second.summary;
5510 const health_status_t severity = i.second.severity;
5511
5512 auto status_iter = health_check_log_times.find(code);
5513 if (status_iter == health_check_log_times.end()) {
5514 continue;
5515 }
5516
5517 auto &log_status = status_iter->second;
5518 bool const changed = log_status.last_message != summary
5519 || log_status.severity != severity;
5520
5521 if (changed && now - log_status.updated_at > min_period) {
5522 log_status.last_message = summary;
5523 log_status.updated_at = now;
5524 log_status.severity = severity;
5525
5526 ostringstream ss;
5527 ss << "Health check update: " << summary << " (" << code << ")";
5528 clog->health(severity) << ss.str();
5529 }
5530 }
5531 }
5532 }
5533
5534
5535 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5536 (*p)->tick();
5537 (*p)->maybe_trim();
5538 }
5539
5540 // trim sessions
5541 {
5542 Mutex::Locker l(session_map_lock);
5543 auto p = session_map.sessions.begin();
5544
5545 bool out_for_too_long = (!exited_quorum.is_zero() &&
5546 now > (exited_quorum + 2*g_conf->mon_lease));
5547
5548 while (!p.end()) {
5549 MonSession *s = *p;
5550 ++p;
5551
5552 // don't trim monitors
5553 if (s->inst.name.is_mon())
5554 continue;
5555
5556 if (s->session_timeout < now && s->con) {
5557 // check keepalive, too
5558 s->session_timeout = s->con->get_last_keepalive();
5559 s->session_timeout += g_conf->mon_session_timeout;
5560 }
5561 if (s->session_timeout < now) {
5562 dout(10) << " trimming session " << s->con << " " << s->inst
5563 << " (timeout " << s->session_timeout
5564 << " < now " << now << ")" << dendl;
5565 } else if (out_for_too_long) {
5566 // boot the client Session because we've taken too long getting back in
5567 dout(10) << " trimming session " << s->con << " " << s->inst
5568 << " because we've been out of quorum too long" << dendl;
5569 } else {
5570 continue;
5571 }
5572
5573 s->con->mark_down();
5574 remove_session(s);
5575 logger->inc(l_mon_session_trim);
5576 }
5577 }
5578 sync_trim_providers();
5579
5580 if (!maybe_wait_for_quorum.empty()) {
5581 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5582 }
5583
5584 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5585 // this is only necessary on upgraded clusters.
5586 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5587 prepare_new_fingerprint(t);
5588 paxos->trigger_propose();
5589 }
5590
5591 new_tick();
5592 }
5593
5594 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5595 {
5596 uuid_d nf;
5597 nf.generate_random();
5598 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5599
5600 bufferlist bl;
5601 ::encode(nf, bl);
5602 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5603 }
5604
5605 int Monitor::check_fsid()
5606 {
5607 bufferlist ebl;
5608 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5609 if (r == -ENOENT)
5610 return r;
5611 assert(r == 0);
5612
5613 string es(ebl.c_str(), ebl.length());
5614
5615 // only keep the first line
5616 size_t pos = es.find_first_of('\n');
5617 if (pos != string::npos)
5618 es.resize(pos);
5619
5620 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5621 uuid_d ondisk;
5622 if (!ondisk.parse(es.c_str())) {
5623 derr << "error: unable to parse uuid" << dendl;
5624 return -EINVAL;
5625 }
5626
5627 if (monmap->get_fsid() != ondisk) {
5628 derr << "error: cluster_uuid file exists with value " << ondisk
5629 << ", != our uuid " << monmap->get_fsid() << dendl;
5630 return -EEXIST;
5631 }
5632
5633 return 0;
5634 }
5635
5636 int Monitor::write_fsid()
5637 {
5638 auto t(std::make_shared<MonitorDBStore::Transaction>());
5639 write_fsid(t);
5640 int r = store->apply_transaction(t);
5641 return r;
5642 }
5643
5644 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5645 {
5646 ostringstream ss;
5647 ss << monmap->get_fsid() << "\n";
5648 string us = ss.str();
5649
5650 bufferlist b;
5651 b.append(us);
5652
5653 t->put(MONITOR_NAME, "cluster_uuid", b);
5654 return 0;
5655 }
5656
5657 /*
5658 * this is the closest thing to a traditional 'mkfs' for ceph.
5659 * initialize the monitor state machines to their initial values.
5660 */
5661 int Monitor::mkfs(bufferlist& osdmapbl)
5662 {
5663 auto t(std::make_shared<MonitorDBStore::Transaction>());
5664
5665 // verify cluster fsid
5666 int r = check_fsid();
5667 if (r < 0 && r != -ENOENT)
5668 return r;
5669
5670 bufferlist magicbl;
5671 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5672 magicbl.append("\n");
5673 t->put(MONITOR_NAME, "magic", magicbl);
5674
5675
5676 features = get_initial_supported_features();
5677 write_features(t);
5678
5679 // save monmap, osdmap, keyring.
5680 bufferlist monmapbl;
5681 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5682 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5683 t->put("mkfs", "monmap", monmapbl);
5684
5685 if (osdmapbl.length()) {
5686 // make sure it's a valid osdmap
5687 try {
5688 OSDMap om;
5689 om.decode(osdmapbl);
5690 }
5691 catch (buffer::error& e) {
5692 derr << "error decoding provided osdmap: " << e.what() << dendl;
5693 return -EINVAL;
5694 }
5695 t->put("mkfs", "osdmap", osdmapbl);
5696 }
5697
5698 if (is_keyring_required()) {
5699 KeyRing keyring;
5700 string keyring_filename;
5701
5702 r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5703 if (r) {
5704 derr << "unable to find a keyring file on " << g_conf->keyring
5705 << ": " << cpp_strerror(r) << dendl;
5706 if (g_conf->key != "") {
5707 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5708 "\n\tcaps mon = \"allow *\"\n";
5709 bufferlist bl;
5710 bl.append(keyring_plaintext);
5711 try {
5712 bufferlist::iterator i = bl.begin();
5713 keyring.decode_plaintext(i);
5714 }
5715 catch (const buffer::error& e) {
5716 derr << "error decoding keyring " << keyring_plaintext
5717 << ": " << e.what() << dendl;
5718 return -EINVAL;
5719 }
5720 } else {
5721 return -ENOENT;
5722 }
5723 } else {
5724 r = keyring.load(g_ceph_context, keyring_filename);
5725 if (r < 0) {
5726 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5727 return r;
5728 }
5729 }
5730
5731 // put mon. key in external keyring; seed with everything else.
5732 extract_save_mon_key(keyring);
5733
5734 bufferlist keyringbl;
5735 keyring.encode_plaintext(keyringbl);
5736 t->put("mkfs", "keyring", keyringbl);
5737 }
5738 write_fsid(t);
5739 store->apply_transaction(t);
5740
5741 return 0;
5742 }
5743
5744 int Monitor::write_default_keyring(bufferlist& bl)
5745 {
5746 ostringstream os;
5747 os << g_conf->mon_data << "/keyring";
5748
5749 int err = 0;
5750 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600);
5751 if (fd < 0) {
5752 err = -errno;
5753 dout(0) << __func__ << " failed to open " << os.str()
5754 << ": " << cpp_strerror(err) << dendl;
5755 return err;
5756 }
5757
5758 err = bl.write_fd(fd);
5759 if (!err)
5760 ::fsync(fd);
5761 VOID_TEMP_FAILURE_RETRY(::close(fd));
5762
5763 return err;
5764 }
5765
5766 void Monitor::extract_save_mon_key(KeyRing& keyring)
5767 {
5768 EntityName mon_name;
5769 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5770 EntityAuth mon_key;
5771 if (keyring.get_auth(mon_name, mon_key)) {
5772 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5773 KeyRing pkey;
5774 pkey.add(mon_name, mon_key);
5775 bufferlist bl;
5776 pkey.encode_plaintext(bl);
5777 write_default_keyring(bl);
5778 keyring.remove(mon_name);
5779 }
5780 }
5781
5782 bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5783 bool force_new)
5784 {
5785 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5786 << dendl;
5787
5788 if (is_shutdown())
5789 return false;
5790
5791 // we only connect to other monitors and mgr; every else connects to us.
5792 if (service_id != CEPH_ENTITY_TYPE_MON &&
5793 service_id != CEPH_ENTITY_TYPE_MGR)
5794 return false;
5795
5796 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5797 // auth_none
5798 dout(20) << __func__ << " building auth_none authorizer" << dendl;
5799 AuthNoneClientHandler handler(g_ceph_context, nullptr);
5800 handler.set_global_id(0);
5801 *authorizer = handler.build_authorizer(service_id);
5802 return true;
5803 }
5804
5805 CephXServiceTicketInfo auth_ticket_info;
5806 CephXSessionAuthInfo info;
5807 int ret;
5808
5809 EntityName name;
5810 name.set_type(CEPH_ENTITY_TYPE_MON);
5811 auth_ticket_info.ticket.name = name;
5812 auth_ticket_info.ticket.global_id = 0;
5813
5814 if (service_id == CEPH_ENTITY_TYPE_MON) {
5815 // mon to mon authentication uses the private monitor shared key and not the
5816 // rotating key
5817 CryptoKey secret;
5818 if (!keyring.get_secret(name, secret) &&
5819 !key_server.get_secret(name, secret)) {
5820 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5821 << dendl;
5822 stringstream ss, ds;
5823 int err = key_server.list_secrets(ds);
5824 if (err < 0)
5825 ss << "no installed auth entries!";
5826 else
5827 ss << "installed auth entries:";
5828 dout(0) << ss.str() << "\n" << ds.str() << dendl;
5829 return false;
5830 }
5831
5832 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5833 secret, (uint64_t)-1);
5834 if (ret < 0) {
5835 dout(0) << __func__ << " failed to build mon session_auth_info "
5836 << cpp_strerror(ret) << dendl;
5837 return false;
5838 }
5839 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5840 // mgr
5841 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5842 if (ret < 0) {
5843 derr << __func__ << " failed to build mgr service session_auth_info "
5844 << cpp_strerror(ret) << dendl;
5845 return false;
5846 }
5847 } else {
5848 ceph_abort(); // see check at top of fn
5849 }
5850
5851 CephXTicketBlob blob;
5852 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5853 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5854 return false;
5855 }
5856 bufferlist ticket_data;
5857 ::encode(blob, ticket_data);
5858
5859 bufferlist::iterator iter = ticket_data.begin();
5860 CephXTicketHandler handler(g_ceph_context, service_id);
5861 ::decode(handler.ticket, iter);
5862
5863 handler.session_key = info.session_key;
5864
5865 *authorizer = handler.build_authorizer(0);
5866
5867 return true;
5868 }
5869
5870 bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5871 int protocol, bufferlist& authorizer_data,
5872 bufferlist& authorizer_reply,
5873 bool& isvalid, CryptoKey& session_key)
5874 {
5875 dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5876 << " " << ceph_entity_type_name(peer_type)
5877 << " protocol " << protocol << dendl;
5878
5879 if (is_shutdown())
5880 return false;
5881
5882 if (peer_type == CEPH_ENTITY_TYPE_MON &&
5883 auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5884 // monitor, and cephx is enabled
5885 isvalid = false;
5886 if (protocol == CEPH_AUTH_CEPHX) {
5887 bufferlist::iterator iter = authorizer_data.begin();
5888 CephXServiceTicketInfo auth_ticket_info;
5889
5890 if (authorizer_data.length()) {
5891 bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
5892 auth_ticket_info, authorizer_reply);
5893 if (ret) {
5894 session_key = auth_ticket_info.session_key;
5895 isvalid = true;
5896 } else {
5897 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5898 }
5899 }
5900 } else {
5901 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
5902 }
5903 } else {
5904 // who cares.
5905 isvalid = true;
5906 }
5907 return true;
5908 }