]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Monitor.cc
def5571c3d134275a6a8fb0be2810cbb807921a4
[ceph.git] / ceph / src / mon / Monitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include <sstream>
17 #include <stdlib.h>
18 #include <signal.h>
19 #include <limits.h>
20 #include <cstring>
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
23
24 #include "Monitor.h"
25 #include "common/version.h"
26
27 #include "osd/OSDMap.h"
28
29 #include "MonitorDBStore.h"
30
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
48
49 #include "messages/MMonSubscribe.h"
50 #include "messages/MMonSubscribeAck.h"
51
52 #include "messages/MAuthReply.h"
53
54 #include "messages/MTimeCheck.h"
55 #include "messages/MPing.h"
56
57 #include "common/strtol.h"
58 #include "common/ceph_argparse.h"
59 #include "common/Timer.h"
60 #include "common/Clock.h"
61 #include "common/errno.h"
62 #include "common/perf_counters.h"
63 #include "common/admin_socket.h"
64 #include "global/signal_handler.h"
65 #include "common/Formatter.h"
66 #include "include/stringify.h"
67 #include "include/color.h"
68 #include "include/ceph_fs.h"
69 #include "include/str_list.h"
70
71 #include "OSDMonitor.h"
72 #include "MDSMonitor.h"
73 #include "MonmapMonitor.h"
74 #include "PGMonitor.h"
75 #include "LogMonitor.h"
76 #include "AuthMonitor.h"
77 #include "MgrMonitor.h"
78 #include "MgrStatMonitor.h"
79 #include "mon/QuorumService.h"
80 #include "mon/OldHealthMonitor.h"
81 #include "mon/HealthMonitor.h"
82 #include "mon/ConfigKeyService.h"
83 #include "common/config.h"
84 #include "common/cmdparse.h"
85 #include "include/assert.h"
86 #include "include/compat.h"
87 #include "perfglue/heap_profiler.h"
88
89 #include "auth/none/AuthNoneClientHandler.h"
90
91 #define dout_subsys ceph_subsys_mon
92 #undef dout_prefix
93 #define dout_prefix _prefix(_dout, this)
94 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
95 return *_dout << "mon." << mon->name << "@" << mon->rank
96 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
97 }
98
99 const string Monitor::MONITOR_NAME = "monitor";
100 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
101
102
// Table of every monitor command, built with the X-macro pattern: the
// FLAG/COMMAND/COMMAND_WITH_FLAG macros are (re)defined here so that each
// COMMAND(...) entry in <mon/MonCommands.h> expands to one MonCommand
// initializer. Plain COMMAND entries get FLAG(NONE).
#undef FLAG
#undef COMMAND
#undef COMMAND_WITH_FLAG
MonCommand mon_commands[] = {
#define FLAG(f) (MonCommand::FLAG_##f)
#define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
  {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
  {parsesig, helptext, modulename, req_perms, avail, flags},
#include <mon/MonCommands.h>
// NOTE(review): FLAG is intentionally(?) left defined after this point;
// only COMMAND and COMMAND_WITH_FLAG are undefined here.
#undef COMMAND
#undef COMMAND_WITH_FLAG
};
116
117
118 void C_MonContext::finish(int r) {
119 if (mon->is_shutdown())
120 return;
121 FunctionContext::finish(r);
122 }
123
/**
 * Construct a monitor.
 *
 * @param cct_  context, also handed to the Dispatcher base
 * @param nm    this monitor's name
 * @param s     backing store (not deleted by ~Monitor)
 * @param m     cluster messenger; con_self tolerates a NULL m
 * @param mgr_m messenger used by the embedded mgr client
 * @param map   initial monmap
 *
 * The auth requirements use the legacy auth_supported option when it is set,
 * and fall back to auth_cluster_required / auth_service_required otherwise.
 */
Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
		 Messenger *m, Messenger *mgr_m, MonMap *map) :
  Dispatcher(cct_),
  name(nm),
  rank(-1),
  messenger(m),
  con_self(m ? m->get_loopback_connection() : NULL),
  lock("Monitor::lock"),
  timer(cct_, lock),
  finisher(cct_, "mon_finisher", "fin"),
  cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
  has_ever_joined(false),
  logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
  monmap(map),
  log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
  key_server(cct, &keyring),
  auth_cluster_required(cct,
			cct->_conf->auth_supported.empty() ?
			cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
  auth_service_required(cct,
			cct->_conf->auth_supported.empty() ?
			cct->_conf->auth_service_required : cct->_conf->auth_supported ),
  leader_supported_mon_commands(NULL),
  leader_supported_mon_commands_size(0),
  mgr_messenger(mgr_m),
  mgr_client(cct_, mgr_m),
  pgservice(nullptr),
  store(s),

  state(STATE_PROBING),

  elector(this),
  required_features(0),
  leader(0),
  quorum_con_features(0),
  // scrub
  scrub_version(0),
  scrub_event(NULL),
  scrub_timeout_event(NULL),

  // sync state
  sync_provider_count(0),
  sync_cookie(0),
  sync_full(false),
  sync_start_version(0),
  sync_timeout_event(NULL),
  sync_last_committed_floor(0),

  timecheck_round(0),
  timecheck_acks(0),
  timecheck_rounds_since_clean(0),
  timecheck_event(NULL),

  // One slot per paxos service; filled in the constructor body.
  paxos_service(PAXOS_NUM),
  admin_hook(NULL),
  routed_request_tid(0),
  op_tracker(cct, true, 1)
{
  // Cluster and audit log channels, then apply the current clog options.
  clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
  audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);

  update_log_clients();

  paxos = new Paxos(this, "paxos");

  // Every paxos service shares the single Paxos instance; the string is the
  // name passed to each service. All of these are deleted in ~Monitor().
  paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
  paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
  paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
  paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
  paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
  paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
  paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
  paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
  paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");

  health_monitor = new OldHealthMonitor(this);
  config_key_service = new ConfigKeyService(this, paxos);

  // The monitor's own capabilities: unrestricted.
  mon_caps = new MonCap();
  bool r = mon_caps->parse("allow *", NULL);
  assert(r);

  exited_quorum = ceph_clock_now();

  // assume our commands until we have an election.  this only means
  // we won't reply with EINVAL before the election; any command that
  // actually matters will wait until we have quorum etc and then
  // retry (and revalidate).
  const MonCommand *cmds;
  int cmdsize;
  get_locally_supported_monitor_commands(&cmds, &cmdsize);
  set_leader_supported_commands(cmds, cmdsize);

  // note: OSDMonitor may update this based on the luminous flag.
  pgservice = mgrstatmon()->get_pg_stat_service();
}
220
221 Monitor::~Monitor()
222 {
223 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
224 delete *p;
225 delete health_monitor;
226 delete config_key_service;
227 delete paxos;
228 assert(session_map.sessions.empty());
229 delete mon_caps;
230 if (leader_supported_mon_commands != mon_commands)
231 delete[] leader_supported_mon_commands;
232 }
233
234
235 class AdminHook : public AdminSocketHook {
236 Monitor *mon;
237 public:
238 explicit AdminHook(Monitor *m) : mon(m) {}
239 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
240 bufferlist& out) override {
241 stringstream ss;
242 mon->do_admin_command(command, cmdmap, format, ss);
243 out.append(ss);
244 return true;
245 }
246 };
247
/**
 * Execute an admin-socket command under the monitor lock.
 *
 * Each invocation is audited twice: a "dispatch" entry up front and a
 * "finished" or "aborted" entry at the end. Read-only commands are audited at
 * debug level; everything else at info.
 *
 * @param command registered command prefix
 * @param cmdmap  parsed arguments ("prefix" is excluded from the audit args)
 * @param format  output format passed to Formatter::create()
 * @param ss      destination for human-readable / formatted output
 */
void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
			       ostream& ss)
{
  Mutex::Locker l(lock);

  boost::scoped_ptr<Formatter> f(Formatter::create(format));

  // Render the arguments (minus the prefix) as "[a, b, ...]" for the audit log.
  string args;
  for (cmdmap_t::iterator p = cmdmap.begin();
       p != cmdmap.end(); ++p) {
    if (p->first == "prefix")
      continue;
    if (!args.empty())
      args += ", ";
    args += cmd_vartype_stringify(p->second);
  }
  args = "[" + args + "]";

  bool read_only = (command == "mon_status" ||
                    command == "mon metadata" ||
                    command == "quorum_status" ||
                    command == "ops" ||
                    command == "sessions");

  (read_only ? audit_clog->debug() : audit_clog->info())
    << "from='admin socket' entity='admin socket' "
    << "cmd='" << command << "' args=" << args << ": dispatch";

  // Cleared when a command rejects its input; selects the closing audit entry.
  bool completed = true;

  if (command == "mon_status") {
    get_mon_status(f.get(), ss);
    if (f)
      f->flush(ss);
  } else if (command == "quorum_status") {
    _quorum_status(f.get(), ss);
  } else if (command == "sync_force") {
    string validate;
    if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
        (validate != "--yes-i-really-mean-it")) {
      ss << "are you SURE? this will mean the monitor store will be erased "
            "the next time the monitor is restarted.  pass "
            "'--yes-i-really-mean-it' if you really do.";
      completed = false;
    } else {
      sync_force(f.get(), ss);
    }
  } else if (boost::starts_with(command, "add_bootstrap_peer_hint")) {
    completed = _add_bootstrap_peer_hint(command, cmdmap, ss);
  } else if (command == "quorum enter") {
    elector.start_participating();
    start_election();
    ss << "started responding to quorum, initiated new election";
  } else if (command == "quorum exit") {
    start_election();
    elector.stop_participating();
    ss << "stopped responding to quorum, initiated new election";
  } else if (command == "ops") {
    (void)op_tracker.dump_ops_in_flight(f.get());
    if (f) {
      f->flush(ss);
    }
  } else if (command == "sessions") {
    // Sessions are only reported through a formatter.
    if (f) {
      f->open_array_section("sessions");
      for (auto p : session_map.sessions) {
        f->dump_stream("session") << *p;
      }
      f->close_section();
      f->flush(ss);
    }
  } else {
    assert(0 == "bad AdminSocket command binding");
  }

  (read_only ? audit_clog->debug() : audit_clog->info())
    << "from='admin socket' "
    << "entity='admin socket' "
    << "cmd=" << command << " "
    << "args=" << args << ": " << (completed ? "finished" : "aborted");
}
336
337 void Monitor::handle_signal(int signum)
338 {
339 assert(signum == SIGINT || signum == SIGTERM);
340 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
341 shutdown();
342 }
343
344 CompatSet Monitor::get_initial_supported_features()
345 {
346 CompatSet::FeatureSet ceph_mon_feature_compat;
347 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
348 CompatSet::FeatureSet ceph_mon_feature_incompat;
349 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
350 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
351 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
352 ceph_mon_feature_incompat);
353 }
354
355 CompatSet Monitor::get_supported_features()
356 {
357 CompatSet compat = get_initial_supported_features();
358 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
359 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
360 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
361 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
362 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
363 return compat;
364 }
365
366 CompatSet Monitor::get_legacy_features()
367 {
368 CompatSet::FeatureSet ceph_mon_feature_compat;
369 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
370 CompatSet::FeatureSet ceph_mon_feature_incompat;
371 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
372 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
373 ceph_mon_feature_incompat);
374 }
375
376 int Monitor::check_features(MonitorDBStore *store)
377 {
378 CompatSet required = get_supported_features();
379 CompatSet ondisk;
380
381 read_features_off_disk(store, &ondisk);
382
383 if (!required.writeable(ondisk)) {
384 CompatSet diff = required.unsupported(ondisk);
385 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
386 return -EPERM;
387 }
388
389 return 0;
390 }
391
392 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
393 {
394 bufferlist featuresbl;
395 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
396 if (featuresbl.length() == 0) {
397 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
398 << "Assuming it is old-style and introducing one." << dendl;
399 //we only want the baseline ~v.18 features assumed to be on disk.
400 //If new features are introduced this code needs to disappear or
401 //be made smarter.
402 *features = get_legacy_features();
403
404 features->encode(featuresbl);
405 auto t(std::make_shared<MonitorDBStore::Transaction>());
406 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
407 store->apply_transaction(t);
408 } else {
409 bufferlist::iterator it = featuresbl.begin();
410 features->decode(it);
411 }
412 }
413
414 void Monitor::read_features()
415 {
416 read_features_off_disk(store, &features);
417 dout(10) << "features " << features << dendl;
418
419 calc_quorum_requirements();
420 dout(10) << "required_features " << required_features << dendl;
421 }
422
423 void Monitor::write_features(MonitorDBStore::TransactionRef t)
424 {
425 bufferlist bl;
426 features.encode(bl);
427 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
428 }
429
430 const char** Monitor::get_tracked_conf_keys() const
431 {
432 static const char* KEYS[] = {
433 "crushtool", // helpful for testing
434 "mon_election_timeout",
435 "mon_lease",
436 "mon_lease_renew_interval_factor",
437 "mon_lease_ack_timeout_factor",
438 "mon_accept_timeout_factor",
439 // clog & admin clog
440 "clog_to_monitors",
441 "clog_to_syslog",
442 "clog_to_syslog_facility",
443 "clog_to_syslog_level",
444 "clog_to_graylog",
445 "clog_to_graylog_host",
446 "clog_to_graylog_port",
447 "host",
448 "fsid",
449 // periodic health to clog
450 "mon_health_to_clog",
451 "mon_health_to_clog_interval",
452 "mon_health_to_clog_tick_interval",
453 // scrub interval
454 "mon_scrub_interval",
455 NULL
456 };
457 return KEYS;
458 }
459
460 void Monitor::handle_conf_change(const struct md_config_t *conf,
461 const std::set<std::string> &changed)
462 {
463 sanitize_options();
464
465 dout(10) << __func__ << " " << changed << dendl;
466
467 if (changed.count("clog_to_monitors") ||
468 changed.count("clog_to_syslog") ||
469 changed.count("clog_to_syslog_level") ||
470 changed.count("clog_to_syslog_facility") ||
471 changed.count("clog_to_graylog") ||
472 changed.count("clog_to_graylog_host") ||
473 changed.count("clog_to_graylog_port") ||
474 changed.count("host") ||
475 changed.count("fsid")) {
476 update_log_clients();
477 }
478
479 if (changed.count("mon_health_to_clog") ||
480 changed.count("mon_health_to_clog_interval") ||
481 changed.count("mon_health_to_clog_tick_interval")) {
482 health_to_clog_update_conf(changed);
483 }
484
485 if (changed.count("mon_scrub_interval")) {
486 scrub_update_interval(conf->mon_scrub_interval);
487 }
488 }
489
490 void Monitor::update_log_clients()
491 {
492 map<string,string> log_to_monitors;
493 map<string,string> log_to_syslog;
494 map<string,string> log_channel;
495 map<string,string> log_prio;
496 map<string,string> log_to_graylog;
497 map<string,string> log_to_graylog_host;
498 map<string,string> log_to_graylog_port;
499 uuid_d fsid;
500 string host;
501
502 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
503 log_channel, log_prio, log_to_graylog,
504 log_to_graylog_host, log_to_graylog_port,
505 fsid, host))
506 return;
507
508 clog->update_config(log_to_monitors, log_to_syslog,
509 log_channel, log_prio, log_to_graylog,
510 log_to_graylog_host, log_to_graylog_port,
511 fsid, host);
512
513 audit_clog->update_config(log_to_monitors, log_to_syslog,
514 log_channel, log_prio, log_to_graylog,
515 log_to_graylog_host, log_to_graylog_port,
516 fsid, host);
517 }
518
519 int Monitor::sanitize_options()
520 {
521 int r = 0;
522
523 // mon_lease must be greater than mon_lease_renewal; otherwise we
524 // may incur in leases expiring before they are renewed.
525 if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
526 clog->error() << "mon_lease_renew_interval_factor ("
527 << g_conf->mon_lease_renew_interval_factor
528 << ") must be less than 1.0";
529 r = -EINVAL;
530 }
531
532 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
533 // got time to renew the lease and get an ack for it. Having both options
534 // with the same value, for a given small vale, could mean timing out if
535 // the monitors happened to be overloaded -- or even under normal load for
536 // a small enough value.
537 if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
538 clog->error() << "mon_lease_ack_timeout_factor ("
539 << g_conf->mon_lease_ack_timeout_factor
540 << ") must be greater than 1.0";
541 r = -EINVAL;
542 }
543
544 return r;
545 }
546
547 int Monitor::preinit()
548 {
549 lock.Lock();
550
551 dout(1) << "preinit fsid " << monmap->fsid << dendl;
552
553 int r = sanitize_options();
554 if (r < 0) {
555 derr << "option sanitization failed!" << dendl;
556 lock.Unlock();
557 return r;
558 }
559
560 assert(!logger);
561 {
562 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
563 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess");
564 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", "sadd");
565 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", "srm");
566 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions");
567 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in");
568 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started");
569 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won");
570 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost");
571 logger = pcb.create_perf_counters();
572 cct->get_perfcounters_collection()->add(logger);
573 }
574
575 assert(!cluster_logger);
576 {
577 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
578 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
579 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
580 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
581 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
582 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
583 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
584 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster");
585 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space");
586 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space");
587 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
588 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
589 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
590 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
591 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
592 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
593 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
594 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
595 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
596 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
597 pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
598 pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
599 pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
600 pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
601 cluster_logger = pcb.create_perf_counters();
602 }
603
604 paxos->init_logger();
605
606 // verify cluster_uuid
607 {
608 int r = check_fsid();
609 if (r == -ENOENT)
610 r = write_fsid();
611 if (r < 0) {
612 lock.Unlock();
613 return r;
614 }
615 }
616
617 // open compatset
618 read_features();
619
620 // have we ever joined a quorum?
621 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
622 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
623
624 if (!has_ever_joined) {
625 // impose initial quorum restrictions?
626 list<string> initial_members;
627 get_str_list(g_conf->mon_initial_members, initial_members);
628
629 if (!initial_members.empty()) {
630 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
631
632 monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
633 &extra_probe_peers);
634
635 dout(10) << " monmap is " << *monmap << dendl;
636 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
637 }
638 } else if (!monmap->contains(name)) {
639 derr << "not in monmap and have been in a quorum before; "
640 << "must have been removed" << dendl;
641 if (g_conf->mon_force_quorum_join) {
642 dout(0) << "we should have died but "
643 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
644 } else {
645 derr << "commit suicide!" << dendl;
646 lock.Unlock();
647 return -ENOENT;
648 }
649 }
650
651 {
652 // We have a potentially inconsistent store state in hands. Get rid of it
653 // and start fresh.
654 bool clear_store = false;
655 if (store->exists("mon_sync", "in_sync")) {
656 dout(1) << __func__ << " clean up potentially inconsistent store state"
657 << dendl;
658 clear_store = true;
659 }
660
661 if (store->get("mon_sync", "force_sync") > 0) {
662 dout(1) << __func__ << " force sync by clearing store state" << dendl;
663 clear_store = true;
664 }
665
666 if (clear_store) {
667 set<string> sync_prefixes = get_sync_targets_names();
668 store->clear(sync_prefixes);
669 }
670 }
671
672 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
673 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
674
675 init_paxos();
676 health_monitor->init();
677
678 if (is_keyring_required()) {
679 // we need to bootstrap authentication keys so we can form an
680 // initial quorum.
681 if (authmon()->get_last_committed() == 0) {
682 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
683 bufferlist bl;
684 int err = store->get("mkfs", "keyring", bl);
685 if (err == 0 && bl.length() > 0) {
686 // Attempt to decode and extract keyring only if it is found.
687 KeyRing keyring;
688 bufferlist::iterator p = bl.begin();
689 ::decode(keyring, p);
690 extract_save_mon_key(keyring);
691 }
692 }
693
694 string keyring_loc = g_conf->mon_data + "/keyring";
695
696 r = keyring.load(cct, keyring_loc);
697 if (r < 0) {
698 EntityName mon_name;
699 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
700 EntityAuth mon_key;
701 if (key_server.get_auth(mon_name, mon_key)) {
702 dout(1) << "copying mon. key from old db to external keyring" << dendl;
703 keyring.add(mon_name, mon_key);
704 bufferlist bl;
705 keyring.encode_plaintext(bl);
706 write_default_keyring(bl);
707 } else {
708 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
709 lock.Unlock();
710 return r;
711 }
712 }
713 }
714
715 admin_hook = new AdminHook(this);
716 AdminSocket* admin_socket = cct->get_admin_socket();
717
718 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
719 lock.Unlock();
720 r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
721 "show current monitor status");
722 assert(r == 0);
723 r = admin_socket->register_command("quorum_status", "quorum_status",
724 admin_hook, "show current quorum status");
725 assert(r == 0);
726 r = admin_socket->register_command("sync_force",
727 "sync_force name=validate,"
728 "type=CephChoices,"
729 "strings=--yes-i-really-mean-it",
730 admin_hook,
731 "force sync of and clear monitor store");
732 assert(r == 0);
733 r = admin_socket->register_command("add_bootstrap_peer_hint",
734 "add_bootstrap_peer_hint name=addr,"
735 "type=CephIPAddr",
736 admin_hook,
737 "add peer address as potential bootstrap"
738 " peer for cluster bringup");
739 assert(r == 0);
740 r = admin_socket->register_command("quorum enter", "quorum enter",
741 admin_hook,
742 "force monitor back into quorum");
743 assert(r == 0);
744 r = admin_socket->register_command("quorum exit", "quorum exit",
745 admin_hook,
746 "force monitor out of the quorum");
747 assert(r == 0);
748 r = admin_socket->register_command("ops",
749 "ops",
750 admin_hook,
751 "show the ops currently in flight");
752 assert(r == 0);
753 r = admin_socket->register_command("sessions",
754 "sessions",
755 admin_hook,
756 "list existing sessions");
757 assert(r == 0);
758
759 lock.Lock();
760
761 // add ourselves as a conf observer
762 g_conf->add_observer(this);
763
764 lock.Unlock();
765 return 0;
766 }
767
768 int Monitor::init()
769 {
770 dout(2) << "init" << dendl;
771 Mutex::Locker l(lock);
772
773 finisher.start();
774
775 // start ticker
776 timer.init();
777 new_tick();
778
779 cpu_tp.start();
780
781 // i'm ready!
782 messenger->add_dispatcher_tail(this);
783
784 mgr_client.init();
785 mgr_messenger->add_dispatcher_tail(&mgr_client);
786 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
787
788 bootstrap();
789
790 // encode command sets
791 const MonCommand *cmds;
792 int cmdsize;
793 get_locally_supported_monitor_commands(&cmds, &cmdsize);
794 MonCommand::encode_array(cmds, cmdsize, supported_commands_bl);
795
796 return 0;
797 }
798
799 void Monitor::init_paxos()
800 {
801 dout(10) << __func__ << dendl;
802 paxos->init();
803
804 // init services
805 for (int i = 0; i < PAXOS_NUM; ++i) {
806 paxos_service[i]->init();
807 }
808
809 refresh_from_paxos(NULL);
810 }
811
812 void Monitor::refresh_from_paxos(bool *need_bootstrap)
813 {
814 dout(10) << __func__ << dendl;
815
816 bufferlist bl;
817 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
818 if (r >= 0) {
819 try {
820 bufferlist::iterator p = bl.begin();
821 ::decode(fingerprint, p);
822 }
823 catch (buffer::error& e) {
824 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
825 }
826 } else {
827 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
828 }
829
830 for (int i = 0; i < PAXOS_NUM; ++i) {
831 paxos_service[i]->refresh(need_bootstrap);
832 }
833 for (int i = 0; i < PAXOS_NUM; ++i) {
834 paxos_service[i]->post_refresh();
835 }
836 load_metadata();
837 }
838
839 void Monitor::register_cluster_logger()
840 {
841 if (!cluster_logger_registered) {
842 dout(10) << "register_cluster_logger" << dendl;
843 cluster_logger_registered = true;
844 cct->get_perfcounters_collection()->add(cluster_logger);
845 } else {
846 dout(10) << "register_cluster_logger - already registered" << dendl;
847 }
848 }
849
850 void Monitor::unregister_cluster_logger()
851 {
852 if (cluster_logger_registered) {
853 dout(10) << "unregister_cluster_logger" << dendl;
854 cluster_logger_registered = false;
855 cct->get_perfcounters_collection()->remove(cluster_logger);
856 } else {
857 dout(10) << "unregister_cluster_logger - not registered" << dendl;
858 }
859 }
860
861 void Monitor::update_logger()
862 {
863 cluster_logger->set(l_cluster_num_mon, monmap->size());
864 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
865 }
866
867 void Monitor::shutdown()
868 {
869 dout(1) << "shutdown" << dendl;
870
871 lock.Lock();
872
873 wait_for_paxos_write();
874
875 state = STATE_SHUTDOWN;
876
877 g_conf->remove_observer(this);
878
879 if (admin_hook) {
880 AdminSocket* admin_socket = cct->get_admin_socket();
881 admin_socket->unregister_command("mon_status");
882 admin_socket->unregister_command("quorum_status");
883 admin_socket->unregister_command("sync_force");
884 admin_socket->unregister_command("add_bootstrap_peer_hint");
885 admin_socket->unregister_command("quorum enter");
886 admin_socket->unregister_command("quorum exit");
887 admin_socket->unregister_command("ops");
888 admin_socket->unregister_command("sessions");
889 delete admin_hook;
890 admin_hook = NULL;
891 }
892
893 elector.shutdown();
894
895 mgr_client.shutdown();
896
897 lock.Unlock();
898 finisher.wait_for_empty();
899 finisher.stop();
900 lock.Lock();
901
902 // clean up
903 paxos->shutdown();
904 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
905 (*p)->shutdown();
906 health_monitor->shutdown();
907
908 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
909 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
910
911 timer.shutdown();
912
913 cpu_tp.stop();
914
915 remove_all_sessions();
916
917 if (logger) {
918 cct->get_perfcounters_collection()->remove(logger);
919 delete logger;
920 logger = NULL;
921 }
922 if (cluster_logger) {
923 if (cluster_logger_registered)
924 cct->get_perfcounters_collection()->remove(cluster_logger);
925 delete cluster_logger;
926 cluster_logger = NULL;
927 }
928
929 log_client.shutdown();
930
931 // unlock before msgr shutdown...
932 lock.Unlock();
933
934 messenger->shutdown(); // last thing! ceph_mon.cc will delete mon.
935 mgr_messenger->shutdown();
936 }
937
938 void Monitor::wait_for_paxos_write()
939 {
940 if (paxos->is_writing() || paxos->is_writing_previous()) {
941 dout(10) << __func__ << " flushing pending write" << dendl;
942 lock.Unlock();
943 store->flush();
944 lock.Lock();
945 dout(10) << __func__ << " flushed pending write" << dendl;
946 }
947 }
948
// (Re)start the monitor's discovery phase.
//
// Flushes any pending paxos write, resets sync-requester/logging/probe state,
// recomputes our rank from the monmap (exiting if we were removed after having
// joined), enters STATE_PROBING and resets per-quorum state.  A single-member
// monmap in which we are rank 0 wins its election immediately; otherwise we
// arm the probe timeout and send OP_PROBE to every other monmap member and to
// every extra probe peer hint.
void Monitor::bootstrap()
{
  dout(10) << "bootstrap" << dendl;
  wait_for_paxos_write();

  sync_reset_requester();
  unregister_cluster_logger();
  cancel_probe_timeout();

  // note my rank
  int newrank = monmap->get_rank(messenger->get_myaddr());
  if (newrank < 0 && rank >= 0) {
    // was i ever part of the quorum?
    // We had a rank but our address is no longer in the monmap.  If we
    // had ever joined, that means we were removed: terminate the process.
    if (has_ever_joined) {
      dout(0) << " removed from monmap, suicide." << dendl;
      exit(0);
    }
  }
  if (newrank != rank) {
    dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
    messenger->set_myname(entity_name_t::MON(newrank));
    rank = newrank;

    // reset all connections, or else our peers will think we are someone else.
    messenger->mark_down_all();
  }

  // reset
  state = STATE_PROBING;

  _reset();

  // sync store
  // Optionally compact the backing store before probing (config option
  // mon_compact_on_bootstrap).
  if (g_conf->mon_compact_on_bootstrap) {
    dout(10) << "bootstrap -- triggering compaction" << dendl;
    store->compact();
    dout(10) << "bootstrap -- finished compaction" << dendl;
  }

  // singleton monitor?
  // Nobody to probe: become leader of a one-member quorum right away.
  if (monmap->size() == 1 && rank == 0) {
    win_standalone_election();
    return;
  }

  reset_probe_timeout();

  // i'm outside the quorum
  if (monmap->contains(name))
    outside_quorum.insert(name);

  // probe monitors
  dout(10) << "probing other monitors" << dendl;
  for (unsigned i = 0; i < monmap->size(); i++) {
    if ((int)i != rank)
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
                              monmap->get_inst(i));
  }
  // Also probe hinted addresses (skipping our own).  They are addressed as
  // mon.-1 because no monmap rank is associated with them here.
  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
       p != extra_probe_peers.end();
       ++p) {
    if (*p != messenger->get_myaddr()) {
      entity_inst_t i;
      i.name = entity_name_t::MON(-1);
      i.addr = *p;
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
    }
  }
}
1018
1019 bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1020 {
1021 string addrstr;
1022 if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1023 ss << "unable to parse address string value '"
1024 << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1025 return false;
1026 }
1027 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1028 << addrstr << "'" << dendl;
1029
1030 entity_addr_t addr;
1031 const char *end = 0;
1032 if (!addr.parse(addrstr.c_str(), &end)) {
1033 ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1034 return false;
1035 }
1036
1037 if (is_leader() || is_peon()) {
1038 ss << "mon already active; ignoring bootstrap hint";
1039 return true;
1040 }
1041
1042 if (addr.get_port() == 0)
1043 addr.set_port(CEPH_MON_PORT);
1044
1045 extra_probe_peers.insert(addr);
1046 ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1047 return true;
1048 }
1049
1050 // called by bootstrap(), or on leader|peon -> electing
1051 void Monitor::_reset()
1052 {
1053 dout(10) << __func__ << dendl;
1054
1055 cancel_probe_timeout();
1056 timecheck_finish();
1057 health_events_cleanup();
1058 scrub_event_cancel();
1059
1060 leader_since = utime_t();
1061 if (!quorum.empty()) {
1062 exited_quorum = ceph_clock_now();
1063 }
1064 quorum.clear();
1065 outside_quorum.clear();
1066 quorum_feature_map.clear();
1067
1068 scrub_reset();
1069
1070 paxos->restart();
1071
1072 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1073 (*p)->restart();
1074 health_monitor->finish();
1075 }
1076
1077
1078 // -----------------------------------------------------------
1079 // sync
1080
1081 set<string> Monitor::get_sync_targets_names()
1082 {
1083 set<string> targets;
1084 targets.insert(paxos->get_name());
1085 for (int i = 0; i < PAXOS_NUM; ++i)
1086 paxos_service[i]->get_store_prefixes(targets);
1087 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1088 assert(config_key_service_ptr);
1089 config_key_service_ptr->get_store_prefixes(targets);
1090 return targets;
1091 }
1092
1093
1094 void Monitor::sync_timeout()
1095 {
1096 dout(10) << __func__ << dendl;
1097 assert(state == STATE_SYNCHRONIZING);
1098 bootstrap();
1099 }
1100
1101 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1102 {
1103 dout(1) << __func__ << dendl;
1104
1105 MonMap latest_monmap;
1106
1107 // Grab latest monmap from MonmapMonitor
1108 bufferlist monmon_bl;
1109 int err = monmon()->get_monmap(monmon_bl);
1110 if (err < 0) {
1111 if (err != -ENOENT) {
1112 derr << __func__
1113 << " something wrong happened while reading the store: "
1114 << cpp_strerror(err) << dendl;
1115 assert(0 == "error reading the store");
1116 }
1117 } else {
1118 latest_monmap.decode(monmon_bl);
1119 }
1120
1121 // Grab last backed up monmap (if any) and compare epochs
1122 if (store->exists("mon_sync", "latest_monmap")) {
1123 bufferlist backup_bl;
1124 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1125 if (err < 0) {
1126 derr << __func__
1127 << " something wrong happened while reading the store: "
1128 << cpp_strerror(err) << dendl;
1129 assert(0 == "error reading the store");
1130 }
1131 assert(backup_bl.length() > 0);
1132
1133 MonMap backup_monmap;
1134 backup_monmap.decode(backup_bl);
1135
1136 if (backup_monmap.epoch > latest_monmap.epoch)
1137 latest_monmap = backup_monmap;
1138 }
1139
1140 // Check if our current monmap's epoch is greater than the one we've
1141 // got so far.
1142 if (monmap->epoch > latest_monmap.epoch)
1143 latest_monmap = *monmap;
1144
1145 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1146
1147 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1148 }
1149
1150 void Monitor::sync_reset_requester()
1151 {
1152 dout(10) << __func__ << dendl;
1153
1154 if (sync_timeout_event) {
1155 timer.cancel_event(sync_timeout_event);
1156 sync_timeout_event = NULL;
1157 }
1158
1159 sync_provider = entity_inst_t();
1160 sync_cookie = 0;
1161 sync_full = false;
1162 sync_start_version = 0;
1163 }
1164
1165 void Monitor::sync_reset_provider()
1166 {
1167 dout(10) << __func__ << dendl;
1168 sync_providers.clear();
1169 }
1170
// Begin synchronizing our store from monitor `other`.
//
// Moves to STATE_SYNCHRONIZING and drops any sessions we were serving as a
// provider.  For a full sync, first persists the latest monmap plus the
// "mon_sync" in_sync / last_committed_floor markers, then clears every
// sync-target prefix from the store and re-initializes paxos.  Finally arms
// the sync timeout and asks `other` for a cookie (full or recent; a recent
// request carries our current paxos version).
//
// The mon_sync_requester_kill_at asserts abort on purpose at the numbered
// points (presumably for failure-injection testing).
void Monitor::sync_start(entity_inst_t &other, bool full)
{
  dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;

  assert(state == STATE_PROBING ||
	 state == STATE_SYNCHRONIZING);
  state = STATE_SYNCHRONIZING;

  // make sure are not a provider for anyone!
  sync_reset_provider();

  sync_full = full;

  if (sync_full) {
    // stash key state, and mark that we are syncing
    auto t(std::make_shared<MonitorDBStore::Transaction>());
    sync_stash_critical_state(t);
    t->put("mon_sync", "in_sync", 1);

    // Never let the floor move backwards across repeated sync attempts.
    sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
    dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
	     << sync_last_committed_floor << dendl;
    t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);

    // The markers are committed before the store is cleared below.
    store->apply_transaction(t);

    assert(g_conf->mon_sync_requester_kill_at != 1);

    // clear the underlying store
    set<string> targets = get_sync_targets_names();
    dout(10) << __func__ << " clearing prefixes " << targets << dendl;
    store->clear(targets);

    // make sure paxos knows it has been reset.  this prevents a
    // bootstrap and then different probe reply order from possibly
    // deciding a partial or no sync is needed.
    paxos->init();

    assert(g_conf->mon_sync_requester_kill_at != 2);
  }

  // assume 'other' as the leader. We will update the leader once we receive
  // a reply to the sync start.
  sync_provider = other;

  sync_reset_timeout();

  MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
  if (!sync_full)
    m->last_committed = paxos->get_version();
  messenger->send_message(m, sync_provider);
}
1223
1224 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1225 {
1226 dout(10) << __func__ << dendl;
1227 bufferlist backup_monmap;
1228 sync_obtain_latest_monmap(backup_monmap);
1229 assert(backup_monmap.length() > 0);
1230 t->put("mon_sync", "latest_monmap", backup_monmap);
1231 }
1232
1233 void Monitor::sync_reset_timeout()
1234 {
1235 dout(10) << __func__ << dendl;
1236 if (sync_timeout_event)
1237 timer.cancel_event(sync_timeout_event);
1238 sync_timeout_event = new C_MonContext(this, [this](int) {
1239 sync_timeout();
1240 });
1241 timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event);
1242 }
1243
// Complete a sync after the provider's last chunk has been applied.
//
// For a full sync, paxos->read_and_prepare_transactions() is asked to prepare
// versions sync_start_version..last_committed (presumably the paxos values
// received during the sync), and paxos' last_committed is set; that
// transaction is applied.  In every case the "mon_sync" in_sync, force_sync
// and last_committed_floor keys are then erased, paxos is re-initialized via
// init_paxos(), and we bootstrap again.
//
// The mon_sync_requester_kill_at asserts abort on purpose at the numbered
// points (presumably for failure-injection testing).
void Monitor::sync_finish(version_t last_committed)
{
  dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;

  assert(g_conf->mon_sync_requester_kill_at != 7);

  if (sync_full) {
    // finalize the paxos commits
    auto tx(std::make_shared<MonitorDBStore::Transaction>());
    paxos->read_and_prepare_transactions(tx, sync_start_version,
					 last_committed);
    tx->put(paxos->get_name(), "last_committed", last_committed);

    dout(30) << __func__ << " final tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    store->apply_transaction(tx);
  }

  assert(g_conf->mon_sync_requester_kill_at != 8);

  // Clear the markers so a restart no longer treats us as mid-sync.
  auto t(std::make_shared<MonitorDBStore::Transaction>());
  t->erase("mon_sync", "in_sync");
  t->erase("mon_sync", "force_sync");
  t->erase("mon_sync", "last_committed_floor");
  store->apply_transaction(t);

  assert(g_conf->mon_sync_requester_kill_at != 9);

  init_paxos();

  assert(g_conf->mon_sync_requester_kill_at != 10);

  bootstrap();
}
1282
1283 void Monitor::handle_sync(MonOpRequestRef op)
1284 {
1285 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1286 dout(10) << __func__ << " " << *m << dendl;
1287 switch (m->op) {
1288
1289 // provider ---------
1290
1291 case MMonSync::OP_GET_COOKIE_FULL:
1292 case MMonSync::OP_GET_COOKIE_RECENT:
1293 handle_sync_get_cookie(op);
1294 break;
1295 case MMonSync::OP_GET_CHUNK:
1296 handle_sync_get_chunk(op);
1297 break;
1298
1299 // client -----------
1300
1301 case MMonSync::OP_COOKIE:
1302 handle_sync_cookie(op);
1303 break;
1304
1305 case MMonSync::OP_CHUNK:
1306 case MMonSync::OP_LAST_CHUNK:
1307 handle_sync_chunk(op);
1308 break;
1309 case MMonSync::OP_NO_COOKIE:
1310 handle_sync_no_cookie(op);
1311 break;
1312
1313 default:
1314 dout(0) << __func__ << " unknown op " << m->op << dendl;
1315 assert(0 == "unknown op");
1316 }
1317 }
1318
1319 // leader
1320
1321 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1322 {
1323 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1324 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1325 m->get_connection()->send_message(reply);
1326 }
1327
1328 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1329 {
1330 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1331 if (is_synchronizing()) {
1332 _sync_reply_no_cookie(op);
1333 return;
1334 }
1335
1336 assert(g_conf->mon_sync_provider_kill_at != 1);
1337
1338 // make sure they can understand us.
1339 if ((required_features ^ m->get_connection()->get_features()) &
1340 required_features) {
1341 dout(5) << " ignoring peer mon." << m->get_source().num()
1342 << " has features " << std::hex
1343 << m->get_connection()->get_features()
1344 << " but we require " << required_features << std::dec << dendl;
1345 return;
1346 }
1347
1348 // make up a unique cookie. include election epoch (which persists
1349 // across restarts for the whole cluster) and a counter for this
1350 // process instance. there is no need to be unique *across*
1351 // monitors, though.
1352 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1353 assert(sync_providers.count(cookie) == 0);
1354
1355 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1356
1357 SyncProvider& sp = sync_providers[cookie];
1358 sp.cookie = cookie;
1359 sp.entity = m->get_source_inst();
1360 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1361
1362 set<string> sync_targets;
1363 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1364 // full scan
1365 sync_targets = get_sync_targets_names();
1366 sp.last_committed = paxos->get_version();
1367 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1368 sp.full = true;
1369 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1370 } else {
1371 // just catch up paxos
1372 sp.last_committed = m->last_committed;
1373 }
1374 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1375
1376 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1377 reply->last_committed = sp.last_committed;
1378 m->get_connection()->send_message(reply);
1379 }
1380
// Provider side: send the next chunk of a sync session.
//
// Unknown cookies get OP_NO_COOKIE.  If the requester's position has fallen
// below our first committed paxos version (and that is > 1), the session is
// dropped and OP_NO_COOKIE is sent.  Otherwise the chunk is filled, up to
// roughly mon_sync_max_payload_size bytes, first with successive paxos
// versions after sp.last_committed and then, for full syncs, with store keys
// from the synchronizer.  When nothing remains, the reply becomes
// OP_LAST_CHUNK and the session is erased.
void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
{
  MMonSync *m = static_cast<MMonSync*>(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;

  if (sync_providers.count(m->cookie) == 0) {
    dout(10) << __func__ << " no cookie " << m->cookie << dendl;
    _sync_reply_no_cookie(op);
    return;
  }

  assert(g_conf->mon_sync_provider_kill_at != 2);

  SyncProvider& sp = sync_providers[m->cookie];
  sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);

  // We no longer hold the versions the requester needs next.
  if (sp.last_committed < paxos->get_first_committed() &&
      paxos->get_first_committed() > 1) {
    dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
	     << " < our fc " << paxos->get_first_committed() << dendl;
    sync_providers.erase(m->cookie);
    _sync_reply_no_cookie(op);
    return;
  }

  MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
  auto tx(std::make_shared<MonitorDBStore::Transaction>());

  // Remaining payload budget in bytes; may go negative on the last value
  // added, which ends the loop.
  int left = g_conf->mon_sync_max_payload_size;
  while (sp.last_committed < paxos->get_version() && left > 0) {
    bufferlist bl;
    sp.last_committed++;

    int err = store->get(paxos->get_name(), sp.last_committed, bl);
    assert(err == 0);

    tx->put(paxos->get_name(), sp.last_committed, bl);
    left -= bl.length();
    dout(20) << __func__ << " including paxos state " << sp.last_committed
	     << dendl;
  }
  reply->last_committed = sp.last_committed;

  // Full syncs use whatever budget remains for store keys.
  if (sp.full && left > 0) {
    sp.synchronizer->get_chunk_tx(tx, left);
    sp.last_key = sp.synchronizer->get_last_key();
    reply->last_key = sp.last_key;
  }

  if ((sp.full && sp.synchronizer->has_next_chunk()) ||
      sp.last_committed < paxos->get_version()) {
    dout(10) << __func__ << " chunk, through version " << sp.last_committed
	     << " key " << sp.last_key << dendl;
  } else {
    dout(10) << __func__ << " last chunk, through version " << sp.last_committed
	     << " key " << sp.last_key << dendl;
    reply->op = MMonSync::OP_LAST_CHUNK;

    assert(g_conf->mon_sync_provider_kill_at != 3);

    // clean up our local state
    // NOTE(review): this erase destroys the entry `sp` refers to; `sp` must
    // not be used after this point (the code below only touches reply/tx/m).
    sync_providers.erase(sp.cookie);
  }

  ::encode(*tx, reply->chunk_bl);

  m->get_connection()->send_message(reply);
}
1449
1450 // requester
1451
1452 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1453 {
1454 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1455 dout(10) << __func__ << " " << *m << dendl;
1456 if (sync_cookie) {
1457 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1458 return;
1459 }
1460 if (m->get_source_inst() != sync_provider) {
1461 dout(10) << __func__ << " source does not match, discarding" << dendl;
1462 return;
1463 }
1464 sync_cookie = m->cookie;
1465 sync_start_version = m->last_committed;
1466
1467 sync_reset_timeout();
1468 sync_get_next_chunk();
1469
1470 assert(g_conf->mon_sync_requester_kill_at != 3);
1471 }
1472
1473 void Monitor::sync_get_next_chunk()
1474 {
1475 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1476 if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1477 dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1478 usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1479 }
1480 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1481 messenger->send_message(r, sync_provider);
1482
1483 assert(g_conf->mon_sync_requester_kill_at != 4);
1484 }
1485
// Requester side: apply a chunk received from the provider.
//
// Chunks whose cookie or sender do not match the current session are
// discarded.  The encoded transaction in the chunk is applied to the store.
// For a recent (non-full) sync, paxos versions from our current version + 1
// up to the chunk's last_committed are also prepared and committed right away,
// and paxos is re-initialized.  On OP_CHUNK, we re-arm the timeout and request
// more; on OP_LAST_CHUNK, we finish the sync.
void Monitor::handle_sync_chunk(MonOpRequestRef op)
{
  MMonSync *m = static_cast<MMonSync*>(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;

  if (m->cookie != sync_cookie) {
    dout(10) << __func__ << " cookie does not match, discarding" << dendl;
    return;
  }
  if (m->get_source_inst() != sync_provider) {
    dout(10) << __func__ << " source does not match, discarding" << dendl;
    return;
  }

  assert(state == STATE_SYNCHRONIZING);
  assert(g_conf->mon_sync_requester_kill_at != 5);

  auto tx(std::make_shared<MonitorDBStore::Transaction>());
  tx->append_from_encoded(m->chunk_bl);

  dout(30) << __func__ << " tx dump:\n";
  JSONFormatter f(true);
  tx->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  store->apply_transaction(tx);

  assert(g_conf->mon_sync_requester_kill_at != 6);

  if (!sync_full) {
    dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
    // NOTE(review): `tx` and `f` below shadow the outer variables of the
    // same names; they are separate objects.
    auto tx(std::make_shared<MonitorDBStore::Transaction>());
    paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
					 m->last_committed);
    tx->put(paxos->get_name(), "last_committed", m->last_committed);

    dout(30) << __func__ << " tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    store->apply_transaction(tx);
    paxos->init();  // to refresh what we just wrote
  }

  if (m->op == MMonSync::OP_CHUNK) {
    sync_reset_timeout();
    sync_get_next_chunk();
  } else if (m->op == MMonSync::OP_LAST_CHUNK) {
    sync_finish(m->last_committed);
  }
}
1540
1541 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1542 {
1543 dout(10) << __func__ << dendl;
1544 bootstrap();
1545 }
1546
1547 void Monitor::sync_trim_providers()
1548 {
1549 dout(20) << __func__ << dendl;
1550
1551 utime_t now = ceph_clock_now();
1552 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1553 while (p != sync_providers.end()) {
1554 if (now > p->second.timeout) {
1555 dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
1556 sync_providers.erase(p++);
1557 } else {
1558 ++p;
1559 }
1560 }
1561 }
1562
1563 // ---------------------------------------------------
1564 // probe
1565
1566 void Monitor::cancel_probe_timeout()
1567 {
1568 if (probe_timeout_event) {
1569 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1570 timer.cancel_event(probe_timeout_event);
1571 probe_timeout_event = NULL;
1572 } else {
1573 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1574 }
1575 }
1576
1577 void Monitor::reset_probe_timeout()
1578 {
1579 cancel_probe_timeout();
1580 probe_timeout_event = new C_MonContext(this, [this](int r) {
1581 probe_timeout(r);
1582 });
1583 double t = g_conf->mon_probe_timeout;
1584 timer.add_event_after(t, probe_timeout_event);
1585 dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
1586 }
1587
1588 void Monitor::probe_timeout(int r)
1589 {
1590 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1591 assert(is_probing() || is_synchronizing());
1592 assert(probe_timeout_event);
1593 probe_timeout_event = NULL;
1594 bootstrap();
1595 }
1596
1597 void Monitor::handle_probe(MonOpRequestRef op)
1598 {
1599 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1600 dout(10) << "handle_probe " << *m << dendl;
1601
1602 if (m->fsid != monmap->fsid) {
1603 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1604 return;
1605 }
1606
1607 switch (m->op) {
1608 case MMonProbe::OP_PROBE:
1609 handle_probe_probe(op);
1610 break;
1611
1612 case MMonProbe::OP_REPLY:
1613 handle_probe_reply(op);
1614 break;
1615
1616 case MMonProbe::OP_MISSING_FEATURES:
1617 derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1618 << ", required " << m->required_features
1619 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
1620 << dendl;
1621 break;
1622 }
1623 }
1624
1625 void Monitor::handle_probe_probe(MonOpRequestRef op)
1626 {
1627 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1628
1629 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1630 << " features " << m->get_connection()->get_features() << dendl;
1631 uint64_t missing = required_features & ~m->get_connection()->get_features();
1632 if (missing) {
1633 dout(1) << " peer " << m->get_source_addr() << " missing features "
1634 << missing << dendl;
1635 if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1636 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1637 name, has_ever_joined);
1638 m->required_features = required_features;
1639 m->get_connection()->send_message(r);
1640 }
1641 goto out;
1642 }
1643
1644 if (!is_probing() && !is_synchronizing()) {
1645 // If the probing mon is way ahead of us, we need to re-bootstrap.
1646 // Normally we capture this case when we initially bootstrap, but
1647 // it is possible we pass those checks (we overlap with
1648 // quorum-to-be) but fail to join a quorum before it moves past
1649 // us. We need to be kicked back to bootstrap so we can
1650 // synchonize, not keep calling elections.
1651 if (paxos->get_version() + 1 < m->paxos_first_version) {
1652 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1653 << "ahead of us, re-bootstrapping" << dendl;
1654 bootstrap();
1655 goto out;
1656
1657 }
1658 }
1659
1660 MMonProbe *r;
1661 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1662 r->name = name;
1663 r->quorum = quorum;
1664 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1665 r->paxos_first_version = paxos->get_first_committed();
1666 r->paxos_last_version = paxos->get_version();
1667 m->get_connection()->send_message(r);
1668
1669 // did we discover a peer here?
1670 if (!monmap->contains(m->get_source_addr())) {
1671 dout(1) << " adding peer " << m->get_source_addr()
1672 << " to list of hints" << dendl;
1673 extra_probe_peers.insert(m->get_source_addr());
1674 }
1675
1676 out:
1677 return;
1678 }
1679
1680 void Monitor::handle_probe_reply(MonOpRequestRef op)
1681 {
1682 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1683 dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1684 dout(10) << " monmap is " << *monmap << dendl;
1685
1686 // discover name and addrs during probing or electing states.
1687 if (!is_probing() && !is_electing()) {
1688 return;
1689 }
1690
1691 // newer map, or they've joined a quorum and we haven't?
1692 bufferlist mybl;
1693 monmap->encode(mybl, m->get_connection()->get_features());
1694 // make sure it's actually different; the checks below err toward
1695 // taking the other guy's map, which could cause us to loop.
1696 if (!mybl.contents_equal(m->monmap_bl)) {
1697 MonMap *newmap = new MonMap;
1698 newmap->decode(m->monmap_bl);
1699 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1700 !has_ever_joined)) {
1701 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1702 << ", mine was " << monmap->get_epoch() << dendl;
1703 delete newmap;
1704 monmap->decode(m->monmap_bl);
1705
1706 bootstrap();
1707 return;
1708 }
1709 delete newmap;
1710 }
1711
1712 // rename peer?
1713 string peer_name = monmap->get_name(m->get_source_addr());
1714 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1715 dout(10) << " renaming peer " << m->get_source_addr() << " "
1716 << peer_name << " -> " << m->name << " in my monmap"
1717 << dendl;
1718 monmap->rename(peer_name, m->name);
1719
1720 if (is_electing()) {
1721 bootstrap();
1722 return;
1723 }
1724 } else {
1725 dout(10) << " peer name is " << peer_name << dendl;
1726 }
1727
1728 // new initial peer?
1729 if (monmap->get_epoch() == 0 &&
1730 monmap->contains(m->name) &&
1731 monmap->get_addr(m->name).is_blank_ip()) {
1732 dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1733 monmap->set_addr(m->name, m->get_source_addr());
1734
1735 bootstrap();
1736 return;
1737 }
1738
1739 // end discover phase
1740 if (!is_probing()) {
1741 return;
1742 }
1743
1744 assert(paxos != NULL);
1745
1746 if (is_synchronizing()) {
1747 dout(10) << " currently syncing" << dendl;
1748 return;
1749 }
1750
1751 entity_inst_t other = m->get_source_inst();
1752
1753 if (m->paxos_last_version < sync_last_committed_floor) {
1754 dout(10) << " peer paxos versions [" << m->paxos_first_version
1755 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1756 << sync_last_committed_floor << ", ignoring"
1757 << dendl;
1758 } else {
1759 if (paxos->get_version() < m->paxos_first_version &&
1760 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1761 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1762 << "," << m->paxos_last_version << "]"
1763 << " vs my version " << paxos->get_version()
1764 << " (too far ahead)"
1765 << dendl;
1766 cancel_probe_timeout();
1767 sync_start(other, true);
1768 return;
1769 }
1770 if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1771 dout(10) << " peer paxos last version " << m->paxos_last_version
1772 << " vs my version " << paxos->get_version()
1773 << " (too far ahead)"
1774 << dendl;
1775 cancel_probe_timeout();
1776 sync_start(other, false);
1777 return;
1778 }
1779 }
1780
1781 // is there an existing quorum?
1782 if (m->quorum.size()) {
1783 dout(10) << " existing quorum " << m->quorum << dendl;
1784
1785 dout(10) << " peer paxos version " << m->paxos_last_version
1786 << " vs my version " << paxos->get_version()
1787 << " (ok)"
1788 << dendl;
1789
1790 if (monmap->contains(name) &&
1791 !monmap->get_addr(name).is_blank_ip()) {
1792 // i'm part of the cluster; just initiate a new election
1793 start_election();
1794 } else {
1795 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1796 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1797 monmap->get_inst(*m->quorum.begin()));
1798 }
1799 } else {
1800 if (monmap->contains(m->name)) {
1801 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1802 outside_quorum.insert(m->name);
1803 } else {
1804 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
1805 return;
1806 }
1807
1808 unsigned need = monmap->size() / 2 + 1;
1809 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1810 if (outside_quorum.size() >= need) {
1811 if (outside_quorum.count(name)) {
1812 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1813 start_election();
1814 } else {
1815 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1816 }
1817 } else {
1818 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
1819 }
1820 }
1821 }
1822
1823 void Monitor::join_election()
1824 {
1825 dout(10) << __func__ << dendl;
1826 wait_for_paxos_write();
1827 _reset();
1828 state = STATE_ELECTING;
1829
1830 logger->inc(l_mon_num_elections);
1831 }
1832
1833 void Monitor::start_election()
1834 {
1835 dout(10) << "start_election" << dendl;
1836 wait_for_paxos_write();
1837 _reset();
1838 state = STATE_ELECTING;
1839
1840 logger->inc(l_mon_num_elections);
1841 logger->inc(l_mon_election_call);
1842
1843 clog->info() << "mon." << name << " calling new monitor election";
1844 elector.call_election();
1845 }
1846
1847 void Monitor::win_standalone_election()
1848 {
1849 dout(1) << "win_standalone_election" << dendl;
1850
1851 // bump election epoch, in case the previous epoch included other
1852 // monitors; we need to be able to make the distinction.
1853 elector.init();
1854 elector.advance_epoch();
1855
1856 rank = monmap->get_rank(name);
1857 assert(rank == 0);
1858 set<int> q;
1859 q.insert(rank);
1860
1861 map<int,Metadata> metadata;
1862 collect_metadata(&metadata[0]);
1863
1864 const MonCommand *my_cmds = nullptr;
1865 int cmdsize = 0;
1866 get_locally_supported_monitor_commands(&my_cmds, &cmdsize);
1867 win_election(elector.get_epoch(), q,
1868 CEPH_FEATURES_ALL,
1869 ceph::features::mon::get_supported(),
1870 metadata,
1871 my_cmds, cmdsize);
1872 }
1873
1874 const utime_t& Monitor::get_leader_since() const
1875 {
1876 assert(state == STATE_LEADER);
1877 return leader_since;
1878 }
1879
1880 epoch_t Monitor::get_epoch()
1881 {
1882 return elector.get_epoch();
1883 }
1884
1885 void Monitor::_finish_svc_election()
1886 {
1887 assert(state == STATE_LEADER || state == STATE_PEON);
1888
1889 for (auto p : paxos_service) {
1890 // we already called election_finished() on monmon(); avoid callig twice
1891 if (state == STATE_LEADER && p == monmon())
1892 continue;
1893 p->election_finished();
1894 }
1895 }
1896
1897 void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
1898 const mon_feature_t& mon_features,
1899 const map<int,Metadata>& metadata,
1900 const MonCommand *cmdset, int cmdsize)
1901 {
1902 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
1903 << " features " << features
1904 << " mon_features " << mon_features
1905 << dendl;
1906 assert(is_electing());
1907 state = STATE_LEADER;
1908 leader_since = ceph_clock_now();
1909 leader = rank;
1910 quorum = active;
1911 quorum_con_features = features;
1912 quorum_mon_features = mon_features;
1913 pending_metadata = metadata;
1914 outside_quorum.clear();
1915
1916 clog->info() << "mon." << name << "@" << rank
1917 << " won leader election with quorum " << quorum;
1918
1919 set_leader_supported_commands(cmdset, cmdsize);
1920
1921 paxos->leader_init();
1922 // NOTE: tell monmap monitor first. This is important for the
1923 // bootstrap case to ensure that the very first paxos proposal
1924 // codifies the monmap. Otherwise any manner of chaos can ensue
1925 // when monitors are call elections or participating in a paxos
1926 // round without agreeing on who the participants are.
1927 monmon()->election_finished();
1928 _finish_svc_election();
1929 health_monitor->start(epoch);
1930
1931 logger->inc(l_mon_election_win);
1932
1933 // inject new metadata in first transaction.
1934 {
1935 // include previous metadata for missing mons (that aren't part of
1936 // the current quorum).
1937 map<int,Metadata> m = metadata;
1938 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
1939 if (m.count(rank) == 0 &&
1940 mon_metadata.count(rank)) {
1941 m[rank] = mon_metadata[rank];
1942 }
1943 }
1944
1945 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1946 // a new transaction immediately after the election finishes. We should
1947 // do that anyway for other reasons, though.
1948 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1949 bufferlist bl;
1950 ::encode(m, bl);
1951 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
1952 }
1953
1954 finish_election();
1955 if (monmap->size() > 1 &&
1956 monmap->get_epoch() > 0) {
1957 timecheck_start();
1958 health_tick_start();
1959 do_health_to_clog_interval();
1960 scrub_event_start();
1961 }
1962 }
1963
// Become a peon after losing the election for `epoch` to leader `l`.
//
// Records the quorum and its features, initializes paxos as peon, finishes
// the election on all services, starts the health monitor and runs
// finish_election().  When the quorum supports MON_METADATA but is not yet
// SERVER_LUMINOUS, our system metadata is also sent to the leader.
void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
			    uint64_t features,
                            const mon_feature_t& mon_features)
{
  state = STATE_PEON;
  // leader_since is only meaningful on the leader; clear it.
  leader_since = utime_t();
  leader = l;
  quorum = q;
  outside_quorum.clear();
  quorum_con_features = features;
  quorum_mon_features = mon_features;
  dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
	   << " quorum is " << quorum << " features are " << quorum_con_features
           << " mon_features are " << quorum_mon_features
           << dendl;

  paxos->peon_init();
  _finish_svc_election();
  health_monitor->start(epoch);

  logger->inc(l_mon_election_lose);

  finish_election();

  if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
      !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
    // for pre-luminous mons only
    Metadata sys_info;
    collect_metadata(&sys_info);
    messenger->send_message(new MMonMetadata(sys_info),
			    monmap->get_inst(get_leader()));
  }
}
1997
1998 void Monitor::collect_metadata(Metadata *m)
1999 {
2000 collect_sys_info(m, g_ceph_context);
2001 (*m)["addr"] = stringify(messenger->get_myaddr());
2002 }
2003
// Common tail of win_election() and lose_election().
//
// Folds quorum and monmap features into our CompatSet, stops timechecks,
// clears exited_quorum, completes the contexts waiting for quorum, resends
// routed requests, and updates and registers the cluster logger.  If the
// monmap lists our address under a different name than ours, an MMonJoin is
// sent to the first quorum member to correct it.
void Monitor::finish_election()
{
  apply_quorum_to_compatset_features();
  apply_monmap_to_compatset_features();
  timecheck_finish();
  // We are in a quorum again.
  exited_quorum = utime_t();
  finish_contexts(g_ceph_context, waitfor_quorum);
  finish_contexts(g_ceph_context, maybe_wait_for_quorum);
  resend_routed_requests();
  update_logger();
  register_cluster_logger();

  // am i named properly?
  string cur_name = monmap->get_name(messenger->get_myaddr());
  if (cur_name != name) {
    dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
    messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
			    monmap->get_inst(*quorum.begin()));
  }
}
2024
2025 void Monitor::_apply_compatset_features(CompatSet &new_features)
2026 {
2027 if (new_features.compare(features) != 0) {
2028 CompatSet diff = features.unsupported(new_features);
2029 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2030 features = new_features;
2031
2032 auto t = std::make_shared<MonitorDBStore::Transaction>();
2033 write_features(t);
2034 store->apply_transaction(t);
2035
2036 calc_quorum_requirements();
2037 }
2038 }
2039
2040 void Monitor::apply_quorum_to_compatset_features()
2041 {
2042 CompatSet new_features(features);
2043 if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2044 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2045 }
2046 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2047 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2048 }
2049 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2050 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2051 }
2052 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2053 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2054 }
2055 dout(5) << __func__ << dendl;
2056 _apply_compatset_features(new_features);
2057 }
2058
2059 void Monitor::apply_monmap_to_compatset_features()
2060 {
2061 CompatSet new_features(features);
2062 mon_feature_t monmap_features = monmap->get_required_features();
2063
2064 /* persistent monmap features may go into the compatset.
2065 * optional monmap features may not - why?
2066 * because optional monmap features may be set/unset by the admin,
2067 * and possibly by other means that haven't yet been thought out,
2068 * so we can't make the monitor enforce them on start - because they
2069 * may go away.
2070 * this, of course, does not invalidate setting a compatset feature
2071 * for an optional feature - as long as you make sure to clean it up
2072 * once you unset it.
2073 */
2074 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2075 assert(ceph::features::mon::get_persistent().contains_all(
2076 ceph::features::mon::FEATURE_KRAKEN));
2077 // this feature should only ever be set if the quorum supports it.
2078 assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2079 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2080 }
2081
2082 dout(5) << __func__ << dendl;
2083 _apply_compatset_features(new_features);
2084 }
2085
2086 void Monitor::calc_quorum_requirements()
2087 {
2088 required_features = 0;
2089
2090 // compatset
2091 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2092 required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2093 }
2094 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2095 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2096 }
2097 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2098 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2099 }
2100 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2101 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2102 }
2103 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2104 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2105 }
2106
2107 // monmap
2108 if (monmap->get_required_features().contains_all(
2109 ceph::features::mon::FEATURE_KRAKEN)) {
2110 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2111 }
2112 if (monmap->get_required_features().contains_all(
2113 ceph::features::mon::FEATURE_LUMINOUS)) {
2114 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2115 }
2116 dout(10) << __func__ << " required_features " << required_features << dendl;
2117 }
2118
2119 void Monitor::get_combined_feature_map(FeatureMap *fm)
2120 {
2121 *fm += session_map.feature_map;
2122 for (auto id : quorum) {
2123 if (id != rank) {
2124 *fm += quorum_feature_map[id];
2125 }
2126 }
2127 }
2128
2129 void Monitor::sync_force(Formatter *f, ostream& ss)
2130 {
2131 bool free_formatter = false;
2132
2133 if (!f) {
2134 // louzy/lazy hack: default to json if no formatter has been defined
2135 f = new JSONFormatter();
2136 free_formatter = true;
2137 }
2138
2139 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2140 sync_stash_critical_state(tx);
2141 tx->put("mon_sync", "force_sync", 1);
2142 store->apply_transaction(tx);
2143
2144 f->open_object_section("sync_force");
2145 f->dump_int("ret", 0);
2146 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2147 f->close_section(); // sync_force
2148 f->flush(ss);
2149 if (free_formatter)
2150 delete f;
2151 }
2152
2153 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2154 {
2155 bool free_formatter = false;
2156
2157 if (!f) {
2158 // louzy/lazy hack: default to json if no formatter has been defined
2159 f = new JSONFormatter();
2160 free_formatter = true;
2161 }
2162 f->open_object_section("quorum_status");
2163 f->dump_int("election_epoch", get_epoch());
2164
2165 f->open_array_section("quorum");
2166 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2167 f->dump_int("mon", *p);
2168 f->close_section(); // quorum
2169
2170 list<string> quorum_names = get_quorum_names();
2171 f->open_array_section("quorum_names");
2172 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2173 f->dump_string("mon", *p);
2174 f->close_section(); // quorum_names
2175
2176 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2177
2178 f->open_object_section("monmap");
2179 monmap->dump(f);
2180 f->close_section(); // monmap
2181
2182 f->close_section(); // quorum_status
2183 f->flush(ss);
2184 if (free_formatter)
2185 delete f;
2186 }
2187
2188 void Monitor::get_mon_status(Formatter *f, ostream& ss)
2189 {
2190 bool free_formatter = false;
2191
2192 if (!f) {
2193 // louzy/lazy hack: default to json if no formatter has been defined
2194 f = new JSONFormatter();
2195 free_formatter = true;
2196 }
2197
2198 f->open_object_section("mon_status");
2199 f->dump_string("name", name);
2200 f->dump_int("rank", rank);
2201 f->dump_string("state", get_state_name());
2202 f->dump_int("election_epoch", get_epoch());
2203
2204 f->open_array_section("quorum");
2205 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2206 f->dump_int("mon", *p);
2207 }
2208
2209 f->close_section(); // quorum
2210
2211 f->open_object_section("features");
2212 f->dump_stream("required_con") << required_features;
2213 mon_feature_t req_mon_features = get_required_mon_features();
2214 req_mon_features.dump(f, "required_mon");
2215 f->dump_stream("quorum_con") << quorum_con_features;
2216 quorum_mon_features.dump(f, "quorum_mon");
2217 f->close_section(); // features
2218
2219 f->open_array_section("outside_quorum");
2220 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2221 f->dump_string("mon", *p);
2222 f->close_section(); // outside_quorum
2223
2224 f->open_array_section("extra_probe_peers");
2225 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2226 p != extra_probe_peers.end();
2227 ++p)
2228 f->dump_stream("peer") << *p;
2229 f->close_section(); // extra_probe_peers
2230
2231 f->open_array_section("sync_provider");
2232 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2233 p != sync_providers.end();
2234 ++p) {
2235 f->dump_unsigned("cookie", p->second.cookie);
2236 f->dump_stream("entity") << p->second.entity;
2237 f->dump_stream("timeout") << p->second.timeout;
2238 f->dump_unsigned("last_committed", p->second.last_committed);
2239 f->dump_stream("last_key") << p->second.last_key;
2240 }
2241 f->close_section();
2242
2243 if (is_synchronizing()) {
2244 f->open_object_section("sync");
2245 f->dump_stream("sync_provider") << sync_provider;
2246 f->dump_unsigned("sync_cookie", sync_cookie);
2247 f->dump_unsigned("sync_start_version", sync_start_version);
2248 f->close_section();
2249 }
2250
2251 if (g_conf->mon_sync_provider_kill_at > 0)
2252 f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2253 if (g_conf->mon_sync_requester_kill_at > 0)
2254 f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2255
2256 f->open_object_section("monmap");
2257 monmap->dump(f);
2258 f->close_section();
2259
2260 f->dump_object("feature_map", session_map.feature_map);
2261 f->close_section(); // mon_status
2262
2263 if (free_formatter) {
2264 // flush formatter to ss and delete it iff we created the formatter
2265 f->flush(ss);
2266 delete f;
2267 }
2268 }
2269
2270
2271 // health status to clog
2272
2273 void Monitor::health_tick_start()
2274 {
2275 if (!cct->_conf->mon_health_to_clog ||
2276 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2277 return;
2278
2279 dout(15) << __func__ << dendl;
2280
2281 health_tick_stop();
2282 health_tick_event = new C_MonContext(this, [this](int r) {
2283 if (r < 0)
2284 return;
2285 do_health_to_clog();
2286 health_tick_start();
2287 });
2288 timer.add_event_after(cct->_conf->mon_health_to_clog_tick_interval,
2289 health_tick_event);
2290 }
2291
2292 void Monitor::health_tick_stop()
2293 {
2294 dout(15) << __func__ << dendl;
2295
2296 if (health_tick_event) {
2297 timer.cancel_event(health_tick_event);
2298 health_tick_event = NULL;
2299 }
2300 }
2301
2302 utime_t Monitor::health_interval_calc_next_update()
2303 {
2304 utime_t now = ceph_clock_now();
2305
2306 time_t secs = now.sec();
2307 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2308 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2309 utime_t next = utime_t(secs + adjustment, 0);
2310
2311 dout(20) << __func__
2312 << " now: " << now << ","
2313 << " next: " << next << ","
2314 << " interval: " << cct->_conf->mon_health_to_clog_interval
2315 << dendl;
2316
2317 return next;
2318 }
2319
2320 void Monitor::health_interval_start()
2321 {
2322 dout(15) << __func__ << dendl;
2323
2324 if (!cct->_conf->mon_health_to_clog ||
2325 cct->_conf->mon_health_to_clog_interval <= 0) {
2326 return;
2327 }
2328
2329 health_interval_stop();
2330 utime_t next = health_interval_calc_next_update();
2331 health_interval_event = new C_MonContext(this, [this](int r) {
2332 if (r < 0)
2333 return;
2334 do_health_to_clog_interval();
2335 });
2336 timer.add_event_at(next, health_interval_event);
2337 }
2338
2339 void Monitor::health_interval_stop()
2340 {
2341 dout(15) << __func__ << dendl;
2342 if (health_interval_event) {
2343 timer.cancel_event(health_interval_event);
2344 }
2345 health_interval_event = NULL;
2346 }
2347
2348 void Monitor::health_events_cleanup()
2349 {
2350 health_tick_stop();
2351 health_interval_stop();
2352 health_status_cache.reset();
2353 }
2354
2355 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2356 {
2357 dout(20) << __func__ << dendl;
2358
2359 if (changed.count("mon_health_to_clog")) {
2360 if (!cct->_conf->mon_health_to_clog) {
2361 health_events_cleanup();
2362 } else {
2363 if (!health_tick_event) {
2364 health_tick_start();
2365 }
2366 if (!health_interval_event) {
2367 health_interval_start();
2368 }
2369 }
2370 }
2371
2372 if (changed.count("mon_health_to_clog_interval")) {
2373 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2374 health_interval_stop();
2375 } else {
2376 health_interval_start();
2377 }
2378 }
2379
2380 if (changed.count("mon_health_to_clog_tick_interval")) {
2381 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2382 health_tick_stop();
2383 } else {
2384 health_tick_start();
2385 }
2386 }
2387 }
2388
2389 void Monitor::do_health_to_clog_interval()
2390 {
2391 // outputting to clog may have been disabled in the conf
2392 // since we were scheduled.
2393 if (!cct->_conf->mon_health_to_clog ||
2394 cct->_conf->mon_health_to_clog_interval <= 0)
2395 return;
2396
2397 dout(10) << __func__ << dendl;
2398
2399 // do we have a cached value for next_clog_update? if not,
2400 // do we know when the last update was?
2401
2402 do_health_to_clog(true);
2403 health_interval_start();
2404 }
2405
2406 void Monitor::do_health_to_clog(bool force)
2407 {
2408 // outputting to clog may have been disabled in the conf
2409 // since we were scheduled.
2410 if (!cct->_conf->mon_health_to_clog ||
2411 cct->_conf->mon_health_to_clog_interval <= 0)
2412 return;
2413
2414 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2415
2416 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2417 string summary;
2418 health_status_t level = get_health_status(false, nullptr, &summary);
2419 if (!force &&
2420 summary == health_status_cache.summary &&
2421 level == health_status_cache.overall)
2422 return;
2423 if (level == HEALTH_OK)
2424 clog->info() << "overall " << summary;
2425 else if (level == HEALTH_WARN)
2426 clog->warn() << "overall " << summary;
2427 else if (level == HEALTH_ERR)
2428 clog->error() << "overall " << summary;
2429 else
2430 ceph_abort();
2431 health_status_cache.summary = summary;
2432 health_status_cache.overall = level;
2433 } else {
2434 // for jewel only
2435 list<string> status;
2436 health_status_t overall = get_health(status, NULL, NULL);
2437 dout(25) << __func__
2438 << (force ? " (force)" : "")
2439 << dendl;
2440
2441 string summary = joinify(status.begin(), status.end(), string("; "));
2442
2443 if (!force &&
2444 overall == health_status_cache.overall &&
2445 !health_status_cache.summary.empty() &&
2446 health_status_cache.summary == summary) {
2447 // we got a dup!
2448 return;
2449 }
2450
2451 clog->info() << summary;
2452
2453 health_status_cache.overall = overall;
2454 health_status_cache.summary = summary;
2455 }
2456 }
2457
2458 health_status_t Monitor::get_health_status(
2459 bool want_detail,
2460 Formatter *f,
2461 std::string *plain,
2462 const char *sep1,
2463 const char *sep2)
2464 {
2465 health_status_t r = HEALTH_OK;
2466 bool compat = g_conf->mon_health_preluminous_compat;
2467 if (f) {
2468 f->open_object_section("health");
2469 f->open_object_section("checks");
2470 }
2471
2472 string summary;
2473 string *psummary = f ? nullptr : &summary;
2474 for (auto& svc : paxos_service) {
2475 r = std::min(r, svc->get_health_checks().dump_summary(
2476 f, psummary, sep2, want_detail));
2477 }
2478
2479 if (f) {
2480 f->close_section();
2481 f->dump_stream("status") << r;
2482 } else {
2483 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2484 *plain = stringify(r);
2485 if (summary.size()) {
2486 *plain += sep1;
2487 *plain += summary;
2488 }
2489 *plain += "\n";
2490 }
2491
2492 if (f && compat) {
2493 f->open_array_section("summary");
2494 for (auto& svc : paxos_service) {
2495 svc->get_health_checks().dump_summary_compat(f);
2496 }
2497 f->close_section();
2498 f->dump_stream("overall_status") << r;
2499 }
2500
2501 if (want_detail) {
2502 if (f && compat) {
2503 f->open_array_section("detail");
2504 }
2505
2506 for (auto& svc : paxos_service) {
2507 svc->get_health_checks().dump_detail(f, plain, compat);
2508 }
2509
2510 if (f && compat) {
2511 f->close_section();
2512 }
2513 }
2514 if (f) {
2515 f->close_section();
2516 }
2517 return r;
2518 }
2519
2520 void Monitor::log_health(
2521 const health_check_map_t& updated,
2522 const health_check_map_t& previous,
2523 MonitorDBStore::TransactionRef t)
2524 {
2525 if (!g_conf->mon_health_to_clog) {
2526 return;
2527 }
2528 // FIXME: log atomically as part of @t instead of using clog.
2529 dout(10) << __func__ << " updated " << updated.checks.size()
2530 << " previous " << previous.checks.size()
2531 << dendl;
2532 for (auto& p : updated.checks) {
2533 auto q = previous.checks.find(p.first);
2534 if (q == previous.checks.end()) {
2535 // new
2536 ostringstream ss;
2537 ss << "Health check failed: " << p.second.summary << " ("
2538 << p.first << ")";
2539 if (p.second.severity == HEALTH_WARN)
2540 clog->warn() << ss.str();
2541 else
2542 clog->error() << ss.str();
2543 } else {
2544 if (p.second.summary != q->second.summary ||
2545 p.second.severity != q->second.severity) {
2546 // summary or severity changed (ignore detail changes at this level)
2547 ostringstream ss;
2548 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2549 if (p.second.severity == HEALTH_WARN)
2550 clog->warn() << ss.str();
2551 else
2552 clog->error() << ss.str();
2553 }
2554 }
2555 }
2556 for (auto& p : previous.checks) {
2557 if (!updated.checks.count(p.first)) {
2558 // cleared
2559 ostringstream ss;
2560 if (p.first == "DEGRADED_OBJECTS") {
2561 clog->info() << "All degraded objects recovered";
2562 } else if (p.first == "OSD_FLAGS") {
2563 clog->info() << "OSD flags cleared";
2564 } else {
2565 clog->info() << "Health check cleared: " << p.first << " (was: "
2566 << p.second.summary << ")";
2567 }
2568 }
2569 }
2570
2571 if (previous.checks.size() && updated.checks.size() == 0) {
2572 // We might be going into a fully healthy state, check
2573 // other subsystems
2574 bool any_checks = false;
2575 for (auto& svc : paxos_service) {
2576 if (&(svc->get_health_checks()) == &(previous)) {
2577 // Ignore the ones we're clearing right now
2578 continue;
2579 }
2580
2581 if (svc->get_health_checks().checks.size() > 0) {
2582 any_checks = true;
2583 break;
2584 }
2585 }
2586 if (!any_checks) {
2587 clog->info() << "Cluster is now healthy";
2588 }
2589 }
2590 }
2591
2592 health_status_t Monitor::get_health(list<string>& status,
2593 bufferlist *detailbl,
2594 Formatter *f)
2595 {
2596 list<pair<health_status_t,string> > summary;
2597 list<pair<health_status_t,string> > detail;
2598
2599 if (f)
2600 f->open_object_section("health");
2601
2602 for (vector<PaxosService*>::iterator p = paxos_service.begin();
2603 p != paxos_service.end();
2604 ++p) {
2605 PaxosService *s = *p;
2606 s->get_health(summary, detailbl ? &detail : NULL, cct);
2607 }
2608
2609 health_monitor->get_health(summary, (detailbl ? &detail : NULL));
2610
2611 health_status_t overall = HEALTH_OK;
2612 if (!timecheck_skews.empty()) {
2613 list<string> warns;
2614 for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2615 i != timecheck_skews.end(); ++i) {
2616 entity_inst_t inst = i->first;
2617 double skew = i->second;
2618 double latency = timecheck_latencies[inst];
2619 string name = monmap->get_name(inst.addr);
2620 ostringstream tcss;
2621 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2622 if (tcstatus != HEALTH_OK) {
2623 if (overall > tcstatus)
2624 overall = tcstatus;
2625 warns.push_back(name);
2626 ostringstream tmp_ss;
2627 tmp_ss << "mon." << name
2628 << " addr " << inst.addr << " " << tcss.str()
2629 << " (latency " << latency << "s)";
2630 detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2631 }
2632 }
2633 if (!warns.empty()) {
2634 ostringstream ss;
2635 ss << "clock skew detected on";
2636 while (!warns.empty()) {
2637 ss << " mon." << warns.front();
2638 warns.pop_front();
2639 if (!warns.empty())
2640 ss << ",";
2641 }
2642 status.push_back(ss.str());
2643 summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2644 }
2645 }
2646
2647 if (f)
2648 f->open_array_section("summary");
2649 if (!summary.empty()) {
2650 while (!summary.empty()) {
2651 if (overall > summary.front().first)
2652 overall = summary.front().first;
2653 status.push_back(summary.front().second);
2654 if (f) {
2655 f->open_object_section("item");
2656 f->dump_stream("severity") << summary.front().first;
2657 f->dump_string("summary", summary.front().second);
2658 f->close_section();
2659 }
2660 summary.pop_front();
2661 }
2662 }
2663 if (f)
2664 f->close_section();
2665
2666 stringstream fss;
2667 fss << overall;
2668 status.push_front(fss.str());
2669 if (f)
2670 f->dump_stream("overall_status") << overall;
2671
2672 if (f)
2673 f->open_array_section("detail");
2674 while (!detail.empty()) {
2675 if (f)
2676 f->dump_string("item", detail.front().second);
2677 else if (detailbl != NULL) {
2678 detailbl->append(detail.front().second);
2679 detailbl->append('\n');
2680 }
2681 detail.pop_front();
2682 }
2683 if (f)
2684 f->close_section();
2685
2686 if (f)
2687 f->close_section();
2688
2689 return overall;
2690 }
2691
2692 void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2693 {
2694 if (f)
2695 f->open_object_section("status");
2696
2697 if (f) {
2698 f->dump_stream("fsid") << monmap->get_fsid();
2699 get_health_status(false, f, nullptr);
2700 f->dump_unsigned("election_epoch", get_epoch());
2701 {
2702 f->open_array_section("quorum");
2703 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2704 f->dump_int("rank", *p);
2705 f->close_section();
2706 f->open_array_section("quorum_names");
2707 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2708 f->dump_string("id", monmap->get_name(*p));
2709 f->close_section();
2710 }
2711 f->open_object_section("monmap");
2712 monmap->dump(f);
2713 f->close_section();
2714 f->open_object_section("osdmap");
2715 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
2716 f->close_section();
2717 f->open_object_section("pgmap");
2718 pgservice->print_summary(f, NULL);
2719 f->close_section();
2720 f->open_object_section("fsmap");
2721 mdsmon()->get_fsmap().print_summary(f, NULL);
2722 f->close_section();
2723 f->open_object_section("mgrmap");
2724 mgrmon()->get_map().print_summary(f, nullptr);
2725 f->close_section();
2726
2727 f->dump_object("servicemap", mgrstatmon()->get_service_map());
2728 f->close_section();
2729 } else {
2730 ss << " cluster:\n";
2731 ss << " id: " << monmap->get_fsid() << "\n";
2732
2733 string health;
2734 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2735 get_health_status(false, nullptr, &health,
2736 "\n ", "\n ");
2737 } else {
2738 list<string> ls;
2739 get_health(ls, NULL, f);
2740 health = joinify(ls.begin(), ls.end(),
2741 string("\n "));
2742 }
2743 ss << " health: " << health << "\n";
2744
2745 ss << "\n \n services:\n";
2746 {
2747 size_t maxlen = 3;
2748 auto& service_map = mgrstatmon()->get_service_map();
2749 for (auto& p : service_map.services) {
2750 maxlen = std::max(maxlen, p.first.size());
2751 }
2752 string spacing(maxlen - 3, ' ');
2753 const auto quorum_names = get_quorum_names();
2754 const auto mon_count = monmap->mon_info.size();
2755 ss << " mon: " << spacing << mon_count << " daemons, quorum "
2756 << quorum_names;
2757 if (quorum_names.size() != mon_count) {
2758 std::list<std::string> out_of_q;
2759 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2760 if (quorum.count(i) == 0) {
2761 out_of_q.push_back(monmap->ranks[i]);
2762 }
2763 }
2764 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2765 out_of_q.end(), std::string(", "));
2766 }
2767 ss << "\n";
2768 if (mgrmon()->in_use()) {
2769 ss << " mgr: " << spacing;
2770 mgrmon()->get_map().print_summary(nullptr, &ss);
2771 ss << "\n";
2772 }
2773 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2774 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2775 }
2776 ss << " osd: " << spacing;
2777 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2778 ss << "\n";
2779 for (auto& p : service_map.services) {
2780 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2781 << p.second.get_summary() << "\n";
2782 }
2783 }
2784
2785 ss << "\n \n data:\n";
2786 pgservice->print_summary(NULL, &ss);
2787 ss << "\n ";
2788 }
2789 }
2790
2791 void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2792 map<string,string> &param_str_map)
2793 {
2794 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2795 p != cmdmap.end(); ++p) {
2796 if (p->first == "prefix")
2797 continue;
2798 if (p->first == "caps") {
2799 vector<string> cv;
2800 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2801 cv.size() % 2 == 0) {
2802 for (unsigned i = 0; i < cv.size(); i += 2) {
2803 string k = string("caps_") + cv[i];
2804 param_str_map[k] = cv[i + 1];
2805 }
2806 continue;
2807 }
2808 }
2809 param_str_map[p->first] = cmd_vartype_stringify(p->second);
2810 }
2811 }
2812
2813 const MonCommand *Monitor::_get_moncommand(
2814 const string &cmd_prefix,
2815 const MonCommand *cmds,
2816 int cmds_size)
2817 {
2818 const MonCommand *this_cmd = NULL;
2819 for (const MonCommand *cp = cmds;
2820 cp < &cmds[cmds_size]; cp++) {
2821 if (cp->cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2822 this_cmd = cp;
2823 break;
2824 }
2825 }
2826 return this_cmd;
2827 }
2828
2829 bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2830 const map<string,cmd_vartype>& cmdmap,
2831 const map<string,string>& param_str_map,
2832 const MonCommand *this_cmd) {
2833
2834 bool cmd_r = this_cmd->requires_perm('r');
2835 bool cmd_w = this_cmd->requires_perm('w');
2836 bool cmd_x = this_cmd->requires_perm('x');
2837
2838 bool capable = s->caps.is_capable(
2839 g_ceph_context,
2840 CEPH_ENTITY_TYPE_MON,
2841 s->entity_name,
2842 module, prefix, param_str_map,
2843 cmd_r, cmd_w, cmd_x);
2844
2845 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2846 return capable;
2847 }
2848
2849 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
2850 Formatter *f,
2851 bufferlist *rdata,
2852 bool hide_mgr_flag)
2853 {
2854 int cmdnum = 0;
2855 f->open_object_section("command_descriptions");
2856 for (const auto &cmd : commands) {
2857 unsigned flags = cmd.flags;
2858 if (hide_mgr_flag) {
2859 flags &= ~MonCommand::FLAG_MGR;
2860 }
2861 ostringstream secname;
2862 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2863 dump_cmddesc_to_json(f, secname.str(),
2864 cmd.cmdstring, cmd.helpstring, cmd.module,
2865 cmd.req_perms, cmd.availability, flags);
2866 cmdnum++;
2867 }
2868 f->close_section(); // command_descriptions
2869
2870 f->flush(*rdata);
2871 }
2872
2873 void Monitor::get_locally_supported_monitor_commands(const MonCommand **cmds,
2874 int *count)
2875 {
2876 *cmds = mon_commands;
2877 *count = ARRAY_SIZE(mon_commands);
2878 }
2879 void Monitor::set_leader_supported_commands(const MonCommand *cmds, int size)
2880 {
2881 if (leader_supported_mon_commands != mon_commands)
2882 delete[] leader_supported_mon_commands;
2883 leader_supported_mon_commands = cmds;
2884 leader_supported_mon_commands_size = size;
2885 }
2886
2887 bool Monitor::is_keyring_required()
2888 {
2889 string auth_cluster_required = g_conf->auth_supported.empty() ?
2890 g_conf->auth_cluster_required : g_conf->auth_supported;
2891 string auth_service_required = g_conf->auth_supported.empty() ?
2892 g_conf->auth_service_required : g_conf->auth_supported;
2893
2894 return auth_service_required == "cephx" ||
2895 auth_cluster_required == "cephx";
2896 }
2897
2898 struct C_MgrProxyCommand : public Context {
2899 Monitor *mon;
2900 MonOpRequestRef op;
2901 uint64_t size;
2902 bufferlist outbl;
2903 string outs;
2904 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2905 : mon(mon), op(op), size(s) { }
2906 void finish(int r) {
2907 Mutex::Locker l(mon->lock);
2908 mon->mgr_proxy_bytes -= size;
2909 mon->reply_command(op, r, outs, outbl, 0);
2910 }
2911 };
2912
2913 void Monitor::handle_command(MonOpRequestRef op)
2914 {
2915 assert(op->is_type_command());
2916 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
2917 if (m->fsid != monmap->fsid) {
2918 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2919 reply_command(op, -EPERM, "wrong fsid", 0);
2920 return;
2921 }
2922
2923 MonSession *session = static_cast<MonSession *>(
2924 m->get_connection()->get_priv());
2925 if (!session) {
2926 dout(5) << __func__ << " dropping stray message " << *m << dendl;
2927 return;
2928 }
2929 BOOST_SCOPE_EXIT_ALL(=) {
2930 session->put();
2931 };
2932
2933 if (m->cmd.empty()) {
2934 string rs = "No command supplied";
2935 reply_command(op, -EINVAL, rs, 0);
2936 return;
2937 }
2938
2939 string prefix;
2940 vector<string> fullcmd;
2941 map<string, cmd_vartype> cmdmap;
2942 stringstream ss, ds;
2943 bufferlist rdata;
2944 string rs;
2945 int r = -EINVAL;
2946 rs = "unrecognized command";
2947
2948 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
2949 // ss has reason for failure
2950 r = -EINVAL;
2951 rs = ss.str();
2952 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
2953 reply_command(op, r, rs, 0);
2954 return;
2955 }
2956
2957 // check return value. If no prefix parameter provided,
2958 // return value will be false, then return error info.
2959 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
2960 reply_command(op, -EINVAL, "command prefix not found", 0);
2961 return;
2962 }
2963
2964 // check prefix is empty
2965 if (prefix.empty()) {
2966 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
2967 return;
2968 }
2969
2970 if (prefix == "get_command_descriptions") {
2971 bufferlist rdata;
2972 Formatter *f = Formatter::create("json");
2973 // hide mgr commands until luminous upgrade is complete
2974 bool hide_mgr_flag =
2975 osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
2976
2977 std::vector<MonCommand> commands;
2978 commands = static_cast<MgrMonitor*>(
2979 paxos_service[PAXOS_MGR])->get_command_descs();
2980
2981 for (int i = 0 ; i < leader_supported_mon_commands_size; ++i) {
2982 commands.push_back(leader_supported_mon_commands[i]);
2983 }
2984
2985 format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
2986 delete f;
2987 reply_command(op, 0, "", rdata, 0);
2988 return;
2989 }
2990
2991 string module;
2992 string err;
2993
2994 dout(0) << "handle_command " << *m << dendl;
2995
2996 string format;
2997 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
2998 boost::scoped_ptr<Formatter> f(Formatter::create(format));
2999
3000 get_str_vec(prefix, fullcmd);
3001
3002 // make sure fullcmd is not empty.
3003 // invalid prefix will cause empty vector fullcmd.
3004 // such as, prefix=";,,;"
3005 if (fullcmd.empty()) {
3006 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3007 return;
3008 }
3009
3010 module = fullcmd[0];
3011
3012 // validate command is in leader map
3013
3014 const MonCommand *leader_cmd;
3015 const auto& mgr_cmds = mgrmon()->get_command_descs();
3016 const MonCommand *mgr_cmd = nullptr;
3017 if (!mgr_cmds.empty()) {
3018 mgr_cmd = _get_moncommand(prefix, &mgr_cmds.at(0), mgr_cmds.size());
3019 }
3020 leader_cmd = _get_moncommand(prefix,
3021 // the boost underlying this isn't const for some reason
3022 const_cast<MonCommand*>(leader_supported_mon_commands),
3023 leader_supported_mon_commands_size);
3024 if (!leader_cmd) {
3025 leader_cmd = mgr_cmd;
3026 if (!leader_cmd) {
3027 reply_command(op, -EINVAL, "command not known", 0);
3028 return;
3029 }
3030 }
3031 // validate command is in our map & matches, or forward if it is allowed
3032 const MonCommand *mon_cmd = _get_moncommand(prefix, mon_commands,
3033 ARRAY_SIZE(mon_commands));
3034 if (!mon_cmd) {
3035 mon_cmd = mgr_cmd;
3036 }
3037 if (!is_leader()) {
3038 if (!mon_cmd) {
3039 if (leader_cmd->is_noforward()) {
3040 reply_command(op, -EINVAL,
3041 "command not locally supported and not allowed to forward",
3042 0);
3043 return;
3044 }
3045 dout(10) << "Command not locally supported, forwarding request "
3046 << m << dendl;
3047 forward_request_leader(op);
3048 return;
3049 } else if (!mon_cmd->is_compat(leader_cmd)) {
3050 if (mon_cmd->is_noforward()) {
3051 reply_command(op, -EINVAL,
3052 "command not compatible with leader and not allowed to forward",
3053 0);
3054 return;
3055 }
3056 dout(10) << "Command not compatible with leader, forwarding request "
3057 << m << dendl;
3058 forward_request_leader(op);
3059 return;
3060 }
3061 }
3062
3063 if (mon_cmd->is_obsolete() ||
3064 (cct->_conf->mon_debug_deprecated_as_obsolete
3065 && mon_cmd->is_deprecated())) {
3066 reply_command(op, -ENOTSUP,
3067 "command is obsolete; please check usage and/or man page",
3068 0);
3069 return;
3070 }
3071
3072 if (session->proxy_con && mon_cmd->is_noforward()) {
3073 dout(10) << "Got forward for noforward command " << m << dendl;
3074 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3075 return;
3076 }
3077
3078 /* what we perceive as being the service the command falls under */
3079 string service(mon_cmd->module);
3080
3081 dout(25) << __func__ << " prefix='" << prefix
3082 << "' module='" << module
3083 << "' service='" << service << "'" << dendl;
3084
3085 bool cmd_is_rw =
3086 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3087
3088 // validate user's permissions for requested command
3089 map<string,string> param_str_map;
3090 _generate_command_map(cmdmap, param_str_map);
3091 if (!_allowed_command(session, service, prefix, cmdmap,
3092 param_str_map, mon_cmd)) {
3093 dout(1) << __func__ << " access denied" << dendl;
3094 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3095 << "from='" << session->inst << "' "
3096 << "entity='" << session->entity_name << "' "
3097 << "cmd=" << m->cmd << ": access denied";
3098 reply_command(op, -EACCES, "access denied", 0);
3099 return;
3100 }
3101
3102 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3103 << "from='" << session->inst << "' "
3104 << "entity='" << session->entity_name << "' "
3105 << "cmd=" << m->cmd << ": dispatch";
3106
3107 if (mon_cmd->is_mgr() &&
3108 osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3109 const auto& hdr = m->get_header();
3110 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3111 uint64_t max =
3112 g_conf->mon_client_bytes * g_conf->mon_mgr_proxy_client_bytes_ratio;
3113 if (mgr_proxy_bytes + size > max) {
3114 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3115 << " + " << size << " > max " << max << dendl;
3116 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3117 return;
3118 }
3119 mgr_proxy_bytes += size;
3120 dout(10) << __func__ << " proxying mgr command (+" << size
3121 << " -> " << mgr_proxy_bytes << ")" << dendl;
3122 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3123 mgr_client.start_command(m->cmd,
3124 m->get_data(),
3125 &fin->outbl,
3126 &fin->outs,
3127 new C_OnFinisher(fin, &finisher));
3128 return;
3129 }
3130
3131 if (module == "mds" || module == "fs") {
3132 mdsmon()->dispatch(op);
3133 return;
3134 }
3135 if ((module == "osd" || prefix == "pg map") &&
3136 prefix != "osd last-stat-seq") {
3137 osdmon()->dispatch(op);
3138 return;
3139 }
3140
3141 if (module == "pg") {
3142 pgmon()->dispatch(op);
3143 return;
3144 }
3145 if (module == "mon" &&
3146 /* Let the Monitor class handle the following commands:
3147 * 'mon compact'
3148 * 'mon scrub'
3149 * 'mon sync force'
3150 */
3151 prefix != "mon compact" &&
3152 prefix != "mon scrub" &&
3153 prefix != "mon sync force" &&
3154 prefix != "mon metadata" &&
3155 prefix != "mon versions" &&
3156 prefix != "mon count-metadata") {
3157 monmon()->dispatch(op);
3158 return;
3159 }
3160 if (module == "auth") {
3161 authmon()->dispatch(op);
3162 return;
3163 }
3164 if (module == "log") {
3165 logmon()->dispatch(op);
3166 return;
3167 }
3168
3169 if (module == "config-key") {
3170 config_key_service->dispatch(op);
3171 return;
3172 }
3173
3174 if (module == "mgr") {
3175 mgrmon()->dispatch(op);
3176 return;
3177 }
3178
3179 if (prefix == "fsid") {
3180 if (f) {
3181 f->open_object_section("fsid");
3182 f->dump_stream("fsid") << monmap->fsid;
3183 f->close_section();
3184 f->flush(rdata);
3185 } else {
3186 ds << monmap->fsid;
3187 rdata.append(ds);
3188 }
3189 reply_command(op, 0, "", rdata, 0);
3190 return;
3191 }
3192
3193 if (prefix == "scrub" || prefix == "mon scrub") {
3194 wait_for_paxos_write();
3195 if (is_leader()) {
3196 int r = scrub_start();
3197 reply_command(op, r, "", rdata, 0);
3198 } else if (is_peon()) {
3199 forward_request_leader(op);
3200 } else {
3201 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3202 }
3203 return;
3204 }
3205
3206 if (prefix == "compact" || prefix == "mon compact") {
3207 dout(1) << "triggering manual compaction" << dendl;
3208 utime_t start = ceph_clock_now();
3209 store->compact();
3210 utime_t end = ceph_clock_now();
3211 end -= start;
3212 dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3213 ostringstream oss;
3214 oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
3215 rs = oss.str();
3216 r = 0;
3217 }
3218 else if (prefix == "injectargs") {
3219 vector<string> injected_args;
3220 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3221 if (!injected_args.empty()) {
3222 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3223 ostringstream oss;
3224 r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3225 ss << "injectargs:" << oss.str();
3226 rs = ss.str();
3227 goto out;
3228 } else {
3229 rs = "must supply options to be parsed in a single string";
3230 r = -EINVAL;
3231 }
3232 } else if (prefix == "time-sync-status") {
3233 if (!f)
3234 f.reset(Formatter::create("json-pretty"));
3235 f->open_object_section("time_sync");
3236 if (!timecheck_skews.empty()) {
3237 f->open_object_section("time_skew_status");
3238 for (auto& i : timecheck_skews) {
3239 entity_inst_t inst = i.first;
3240 double skew = i.second;
3241 double latency = timecheck_latencies[inst];
3242 string name = monmap->get_name(inst.addr);
3243 ostringstream tcss;
3244 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3245 f->open_object_section(name.c_str());
3246 f->dump_float("skew", skew);
3247 f->dump_float("latency", latency);
3248 f->dump_stream("health") << tcstatus;
3249 if (tcstatus != HEALTH_OK) {
3250 f->dump_stream("details") << tcss.str();
3251 }
3252 f->close_section();
3253 }
3254 f->close_section();
3255 }
3256 f->open_object_section("timechecks");
3257 f->dump_unsigned("epoch", get_epoch());
3258 f->dump_int("round", timecheck_round);
3259 f->dump_stream("round_status") << ((timecheck_round%2) ?
3260 "on-going" : "finished");
3261 f->close_section();
3262 f->close_section();
3263 f->flush(rdata);
3264 r = 0;
3265 rs = "";
3266 } else if (prefix == "config set") {
3267 std::string key;
3268 cmd_getval(cct, cmdmap, "key", key);
3269 std::string val;
3270 cmd_getval(cct, cmdmap, "value", val);
3271 r = g_conf->set_val(key, val, true, &ss);
3272 rs = ss.str();
3273 goto out;
3274 } else if (prefix == "status" ||
3275 prefix == "health" ||
3276 prefix == "df") {
3277 string detail;
3278 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3279
3280 if (prefix == "status") {
3281 // get_cluster_status handles f == NULL
3282 get_cluster_status(ds, f.get());
3283
3284 if (f) {
3285 f->flush(ds);
3286 ds << '\n';
3287 }
3288 rdata.append(ds);
3289 } else if (prefix == "health") {
3290 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3291 string plain;
3292 get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3293 if (f) {
3294 f->flush(rdata);
3295 } else {
3296 rdata.append(plain);
3297 }
3298 } else {
3299 list<string> health_str;
3300 get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
3301 if (f) {
3302 f->flush(ds);
3303 ds << '\n';
3304 } else {
3305 assert(!health_str.empty());
3306 ds << health_str.front();
3307 health_str.pop_front();
3308 if (!health_str.empty()) {
3309 ds << ' ';
3310 ds << joinify(health_str.begin(), health_str.end(), string("; "));
3311 }
3312 }
3313 bufferlist comb;
3314 comb.append(ds);
3315 if (detail == "detail")
3316 comb.append(rdata);
3317 rdata = comb;
3318 }
3319 } else if (prefix == "df") {
3320 bool verbose = (detail == "detail");
3321 if (f)
3322 f->open_object_section("stats");
3323
3324 pgservice->dump_fs_stats(&ds, f.get(), verbose);
3325 if (!f)
3326 ds << '\n';
3327 pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3328
3329 if (f) {
3330 f->close_section();
3331 f->flush(ds);
3332 ds << '\n';
3333 }
3334 } else {
3335 assert(0 == "We should never get here!");
3336 return;
3337 }
3338 rdata.append(ds);
3339 rs = "";
3340 r = 0;
3341 } else if (prefix == "report") {
3342
3343 // this must be formatted, in its current form
3344 if (!f)
3345 f.reset(Formatter::create("json-pretty"));
3346 f->open_object_section("report");
3347 f->dump_stream("cluster_fingerprint") << fingerprint;
3348 f->dump_string("version", ceph_version_to_str());
3349 f->dump_string("commit", git_version_to_str());
3350 f->dump_stream("timestamp") << ceph_clock_now();
3351
3352 vector<string> tagsvec;
3353 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3354 string tagstr = str_join(tagsvec, " ");
3355 if (!tagstr.empty())
3356 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3357 f->dump_string("tag", tagstr);
3358
3359 list<string> hs;
3360 get_health(hs, NULL, f.get());
3361
3362 monmon()->dump_info(f.get());
3363 osdmon()->dump_info(f.get());
3364 mdsmon()->dump_info(f.get());
3365 authmon()->dump_info(f.get());
3366 pgservice->dump_info(f.get());
3367
3368 paxos->dump_info(f.get());
3369
3370 f->close_section();
3371 f->flush(rdata);
3372
3373 ostringstream ss2;
3374 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
3375 rs = ss2.str();
3376 r = 0;
3377 } else if (prefix == "osd last-stat-seq") {
3378 int64_t osd;
3379 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3380 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3381 if (f) {
3382 f->dump_unsigned("seq", seq);
3383 f->flush(ds);
3384 } else {
3385 ds << seq;
3386 rdata.append(ds);
3387 }
3388 rs = "";
3389 r = 0;
3390 } else if (prefix == "node ls") {
3391 string node_type("all");
3392 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3393 if (!f)
3394 f.reset(Formatter::create("json-pretty"));
3395 if (node_type == "all") {
3396 f->open_object_section("nodes");
3397 print_nodes(f.get(), ds);
3398 osdmon()->print_nodes(f.get());
3399 mdsmon()->print_nodes(f.get());
3400 f->close_section();
3401 } else if (node_type == "mon") {
3402 print_nodes(f.get(), ds);
3403 } else if (node_type == "osd") {
3404 osdmon()->print_nodes(f.get());
3405 } else if (node_type == "mds") {
3406 mdsmon()->print_nodes(f.get());
3407 }
3408 f->flush(ds);
3409 rdata.append(ds);
3410 rs = "";
3411 r = 0;
3412 } else if (prefix == "features") {
3413 if (!is_leader() && !is_peon()) {
3414 dout(10) << " waiting for quorum" << dendl;
3415 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3416 return;
3417 }
3418 if (!is_leader()) {
3419 forward_request_leader(op);
3420 return;
3421 }
3422 if (!f)
3423 f.reset(Formatter::create("json-pretty"));
3424 FeatureMap fm;
3425 get_combined_feature_map(&fm);
3426 f->dump_object("features", fm);
3427 f->flush(rdata);
3428 rs = "";
3429 r = 0;
3430 } else if (prefix == "mon metadata") {
3431 if (!f)
3432 f.reset(Formatter::create("json-pretty"));
3433
3434 string name;
3435 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3436 if (!all) {
3437 // Dump a single mon's metadata
3438 int mon = monmap->get_rank(name);
3439 if (mon < 0) {
3440 rs = "requested mon not found";
3441 r = -ENOENT;
3442 goto out;
3443 }
3444 f->open_object_section("mon_metadata");
3445 r = get_mon_metadata(mon, f.get(), ds);
3446 f->close_section();
3447 } else {
3448 // Dump all mons' metadata
3449 r = 0;
3450 f->open_array_section("mon_metadata");
3451 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3452 std::ostringstream get_err;
3453 f->open_object_section("mon");
3454 f->dump_string("name", monmap->get_name(rank));
3455 r = get_mon_metadata(rank, f.get(), get_err);
3456 f->close_section();
3457 if (r == -ENOENT || r == -EINVAL) {
3458 dout(1) << get_err.str() << dendl;
3459 // Drop error, list what metadata we do have
3460 r = 0;
3461 } else if (r != 0) {
3462 derr << "Unexpected error from get_mon_metadata: "
3463 << cpp_strerror(r) << dendl;
3464 ds << get_err.str();
3465 break;
3466 }
3467 }
3468 f->close_section();
3469 }
3470
3471 f->flush(ds);
3472 rdata.append(ds);
3473 rs = "";
3474 } else if (prefix == "mon versions") {
3475 if (!f)
3476 f.reset(Formatter::create("json-pretty"));
3477 count_metadata("ceph_version", f.get());
3478 f->flush(ds);
3479 rdata.append(ds);
3480 rs = "";
3481 r = 0;
3482 } else if (prefix == "mon count-metadata") {
3483 if (!f)
3484 f.reset(Formatter::create("json-pretty"));
3485 string field;
3486 cmd_getval(g_ceph_context, cmdmap, "property", field);
3487 count_metadata(field, f.get());
3488 f->flush(ds);
3489 rdata.append(ds);
3490 rs = "";
3491 r = 0;
3492 } else if (prefix == "quorum_status") {
3493 // make sure our map is readable and up to date
3494 if (!is_leader() && !is_peon()) {
3495 dout(10) << " waiting for quorum" << dendl;
3496 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3497 return;
3498 }
3499 _quorum_status(f.get(), ds);
3500 rdata.append(ds);
3501 rs = "";
3502 r = 0;
3503 } else if (prefix == "mon_status") {
3504 get_mon_status(f.get(), ds);
3505 if (f)
3506 f->flush(ds);
3507 rdata.append(ds);
3508 rs = "";
3509 r = 0;
3510 } else if (prefix == "sync force" ||
3511 prefix == "mon sync force") {
3512 string validate1, validate2;
3513 cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3514 cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3515 if (validate1 != "--yes-i-really-mean-it" ||
3516 validate2 != "--i-know-what-i-am-doing") {
3517 r = -EINVAL;
3518 rs = "are you SURE? this will mean the monitor store will be "
3519 "erased. pass '--yes-i-really-mean-it "
3520 "--i-know-what-i-am-doing' if you really do.";
3521 goto out;
3522 }
3523 sync_force(f.get(), ds);
3524 rs = ds.str();
3525 r = 0;
3526 } else if (prefix == "heap") {
3527 if (!ceph_using_tcmalloc())
3528 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3529 else {
3530 string heapcmd;
3531 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3532 // XXX 1-element vector, change at callee or make vector here?
3533 vector<string> heapcmd_vec;
3534 get_str_vec(heapcmd, heapcmd_vec);
3535 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3536 rdata.append(ds);
3537 rs = "";
3538 r = 0;
3539 }
3540 } else if (prefix == "quorum") {
3541 string quorumcmd;
3542 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3543 if (quorumcmd == "exit") {
3544 start_election();
3545 elector.stop_participating();
3546 rs = "stopped responding to quorum, initiated new election";
3547 r = 0;
3548 } else if (quorumcmd == "enter") {
3549 elector.start_participating();
3550 start_election();
3551 rs = "started responding to quorum, initiated new election";
3552 r = 0;
3553 } else {
3554 rs = "needs a valid 'quorum' command";
3555 r = -EINVAL;
3556 }
3557 } else if (prefix == "version") {
3558 if (f) {
3559 f->open_object_section("version");
3560 f->dump_string("version", pretty_version_to_str());
3561 f->close_section();
3562 f->flush(ds);
3563 } else {
3564 ds << pretty_version_to_str();
3565 }
3566 rdata.append(ds);
3567 rs = "";
3568 r = 0;
3569 } else if (prefix == "versions") {
3570 if (!f)
3571 f.reset(Formatter::create("json-pretty"));
3572 map<string,int> overall;
3573 f->open_object_section("version");
3574 map<string,int> mon, mgr, osd, mds;
3575
3576 count_metadata("ceph_version", &mon);
3577 f->open_object_section("mon");
3578 for (auto& p : mon) {
3579 f->dump_int(p.first.c_str(), p.second);
3580 overall[p.first] += p.second;
3581 }
3582 f->close_section();
3583
3584 mgrmon()->count_metadata("ceph_version", &mgr);
3585 f->open_object_section("mgr");
3586 for (auto& p : mgr) {
3587 f->dump_int(p.first.c_str(), p.second);
3588 overall[p.first] += p.second;
3589 }
3590 f->close_section();
3591
3592 osdmon()->count_metadata("ceph_version", &osd);
3593 f->open_object_section("osd");
3594 for (auto& p : osd) {
3595 f->dump_int(p.first.c_str(), p.second);
3596 overall[p.first] += p.second;
3597 }
3598 f->close_section();
3599
3600 mdsmon()->count_metadata("ceph_version", &mds);
3601 f->open_object_section("mds");
3602 for (auto& p : mon) {
3603 f->dump_int(p.first.c_str(), p.second);
3604 overall[p.first] += p.second;
3605 }
3606 f->close_section();
3607
3608 for (auto& p : mgrstatmon()->get_service_map().services) {
3609 f->open_object_section(p.first.c_str());
3610 map<string,int> m;
3611 p.second.count_metadata("ceph_version", &m);
3612 for (auto& q : m) {
3613 f->dump_int(q.first.c_str(), q.second);
3614 overall[q.first] += q.second;
3615 }
3616 f->close_section();
3617 }
3618
3619 f->open_object_section("overall");
3620 for (auto& p : overall) {
3621 f->dump_int(p.first.c_str(), p.second);
3622 }
3623 f->close_section();
3624 f->close_section();
3625 f->flush(rdata);
3626 rs = "";
3627 r = 0;
3628 }
3629
3630 out:
3631 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3632 reply_command(op, r, rs, rdata, 0);
3633 }
3634
3635 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3636 {
3637 bufferlist rdata;
3638 reply_command(op, rc, rs, rdata, version);
3639 }
3640
3641 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3642 bufferlist& rdata, version_t version)
3643 {
3644 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3645 assert(m->get_type() == MSG_MON_COMMAND);
3646 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3647 reply->set_tid(m->get_tid());
3648 reply->set_data(rdata);
3649 send_reply(op, reply);
3650 }
3651
3652
3653 // ------------------------
3654 // request/reply routing
3655 //
3656 // a client/mds/osd will connect to a random monitor. we need to forward any
3657 // messages requiring state updates to the leader, and then route any replies
3658 // back via the correct monitor and back to them. (the monitor will not
3659 // initiate any connections.)
3660
3661 void Monitor::forward_request_leader(MonOpRequestRef op)
3662 {
3663 op->mark_event(__func__);
3664
3665 int mon = get_leader();
3666 MonSession *session = op->get_session();
3667 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3668
3669 if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3670 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3671 } else if (session->proxy_con) {
3672 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3673 } else if (!session->closed) {
3674 RoutedRequest *rr = new RoutedRequest;
3675 rr->tid = ++routed_request_tid;
3676 rr->client_inst = req->get_source_inst();
3677 rr->con = req->get_connection();
3678 rr->con_features = rr->con->get_features();
3679 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3680 rr->session = static_cast<MonSession *>(session->get());
3681 rr->op = op;
3682 routed_requests[rr->tid] = rr;
3683 session->routed_request_tids.insert(rr->tid);
3684
3685 dout(10) << "forward_request " << rr->tid << " request " << *req
3686 << " features " << rr->con_features << dendl;
3687
3688 MForward *forward = new MForward(rr->tid,
3689 req,
3690 rr->con_features,
3691 rr->session->caps);
3692 forward->set_priority(req->get_priority());
3693 if (session->auth_handler) {
3694 forward->entity_name = session->entity_name;
3695 } else if (req->get_source().is_mon()) {
3696 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3697 }
3698 messenger->send_message(forward, monmap->get_inst(mon));
3699 op->mark_forwarded();
3700 assert(op->get_req()->get_type() != 0);
3701 } else {
3702 dout(10) << "forward_request no session for request " << *req << dendl;
3703 }
3704 }
3705
3706 // fake connection attached to forwarded messages
3707 struct AnonConnection : public Connection {
3708 explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
3709
3710 int send_message(Message *m) override {
3711 assert(!"send_message on anonymous connection");
3712 }
3713 void send_keepalive() override {
3714 assert(!"send_keepalive on anonymous connection");
3715 }
3716 void mark_down() override {
3717 // silently ignore
3718 }
3719 void mark_disposable() override {
3720 // silengtly ignore
3721 }
3722 bool is_connected() override { return false; }
3723 };
3724
3725 //extract the original message and put it into the regular dispatch function
3726 void Monitor::handle_forward(MonOpRequestRef op)
3727 {
3728 MForward *m = static_cast<MForward*>(op->get_req());
3729 dout(10) << "received forwarded message from " << m->client
3730 << " via " << m->get_source_inst() << dendl;
3731 MonSession *session = op->get_session();
3732 assert(session);
3733
3734 if (!session->is_capable("mon", MON_CAP_X)) {
3735 dout(0) << "forward from entity with insufficient caps! "
3736 << session->caps << dendl;
3737 } else {
3738 // see PaxosService::dispatch(); we rely on this being anon
3739 // (c->msgr == NULL)
3740 PaxosServiceMessage *req = m->claim_message();
3741 assert(req != NULL);
3742
3743 ConnectionRef c(new AnonConnection(cct));
3744 MonSession *s = new MonSession(req->get_source_inst(),
3745 static_cast<Connection*>(c.get()));
3746 c->set_priv(s->get());
3747 c->set_peer_addr(m->client.addr);
3748 c->set_peer_type(m->client.name.type());
3749 c->set_features(m->con_features);
3750
3751 s->caps = m->client_caps;
3752 dout(10) << " caps are " << s->caps << dendl;
3753 s->entity_name = m->entity_name;
3754 dout(10) << " entity name '" << s->entity_name << "' type "
3755 << s->entity_name.get_type() << dendl;
3756 s->proxy_con = m->get_connection();
3757 s->proxy_tid = m->tid;
3758
3759 req->set_connection(c);
3760
3761 // not super accurate, but better than nothing.
3762 req->set_recv_stamp(m->get_recv_stamp());
3763
3764 /*
3765 * note which election epoch this is; we will drop the message if
3766 * there is a future election since our peers will resend routed
3767 * requests in that case.
3768 */
3769 req->rx_election_epoch = get_epoch();
3770
3771 /* Because this is a special fake connection, we need to break
3772 the ref loop between Connection and MonSession differently
3773 than we normally do. Here, the Message refers to the Connection
3774 which refers to the Session, and nobody else refers to the Connection
3775 or the Session. And due to the special nature of this message,
3776 nobody refers to the Connection via the Session. So, clear out that
3777 half of the ref loop.*/
3778 s->con.reset(NULL);
3779
3780 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
3781
3782 _ms_dispatch(req);
3783 s->put();
3784 }
3785 }
3786
3787 void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3788 {
3789 dout(10) << "try_send_message " << *m << " to " << to << dendl;
3790
3791 bufferlist bl;
3792 encode_message(m, quorum_con_features, bl);
3793
3794 messenger->send_message(m, to);
3795
3796 for (int i=0; i<(int)monmap->size(); i++) {
3797 if (i != rank)
3798 messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
3799 }
3800 }
3801
3802 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3803 {
3804 op->mark_event(__func__);
3805
3806 MonSession *session = op->get_session();
3807 assert(session);
3808 Message *req = op->get_req();
3809 ConnectionRef con = op->get_connection();
3810
3811 reply->set_cct(g_ceph_context);
3812 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3813
3814 if (!con) {
3815 dout(2) << "send_reply no connection, dropping reply " << *reply
3816 << " to " << req << " " << *req << dendl;
3817 reply->put();
3818 op->mark_event("reply: no connection");
3819 return;
3820 }
3821
3822 if (!session->con && !session->proxy_con) {
3823 dout(2) << "send_reply no connection, dropping reply " << *reply
3824 << " to " << req << " " << *req << dendl;
3825 reply->put();
3826 op->mark_event("reply: no connection");
3827 return;
3828 }
3829
3830 if (session->proxy_con) {
3831 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3832 << " via " << session->proxy_con->get_peer_addr()
3833 << " for request " << *req << dendl;
3834 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3835 op->mark_event("reply: send routed request");
3836 } else {
3837 session->con->send_message(reply);
3838 op->mark_event("reply: send");
3839 }
3840 }
3841
3842 void Monitor::no_reply(MonOpRequestRef op)
3843 {
3844 MonSession *session = op->get_session();
3845 Message *req = op->get_req();
3846
3847 if (session->proxy_con) {
3848 dout(10) << "no_reply to " << req->get_source_inst()
3849 << " via " << session->proxy_con->get_peer_addr()
3850 << " for request " << *req << dendl;
3851 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3852 op->mark_event("no_reply: send routed request");
3853 } else {
3854 dout(10) << "no_reply to " << req->get_source_inst()
3855 << " " << *req << dendl;
3856 op->mark_event("no_reply");
3857 }
3858 }
3859
3860 void Monitor::handle_route(MonOpRequestRef op)
3861 {
3862 MRoute *m = static_cast<MRoute*>(op->get_req());
3863 MonSession *session = op->get_session();
3864 //check privileges
3865 if (!session->is_capable("mon", MON_CAP_X)) {
3866 dout(0) << "MRoute received from entity without appropriate perms! "
3867 << dendl;
3868 return;
3869 }
3870 if (m->msg)
3871 dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3872 else
3873 dout(10) << "handle_route null to " << m->dest << dendl;
3874
3875 // look it up
3876 if (m->session_mon_tid) {
3877 if (routed_requests.count(m->session_mon_tid)) {
3878 RoutedRequest *rr = routed_requests[m->session_mon_tid];
3879
3880 // reset payload, in case encoding is dependent on target features
3881 if (m->msg) {
3882 m->msg->clear_payload();
3883 rr->con->send_message(m->msg);
3884 m->msg = NULL;
3885 }
3886 if (m->send_osdmap_first) {
3887 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3888 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3889 true, MonOpRequestRef());
3890 }
3891 assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3892 routed_requests.erase(m->session_mon_tid);
3893 rr->session->routed_request_tids.erase(m->session_mon_tid);
3894 delete rr;
3895 } else {
3896 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3897 }
3898 } else {
3899 dout(10) << " not a routed request, trying to send anyway" << dendl;
3900 if (m->msg) {
3901 messenger->send_message(m->msg, m->dest);
3902 m->msg = NULL;
3903 }
3904 }
3905 }
3906
3907 void Monitor::resend_routed_requests()
3908 {
3909 dout(10) << "resend_routed_requests" << dendl;
3910 int mon = get_leader();
3911 list<Context*> retry;
3912 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
3913 p != routed_requests.end();
3914 ++p) {
3915 RoutedRequest *rr = p->second;
3916
3917 if (mon == rank) {
3918 dout(10) << " requeue for self tid " << rr->tid << dendl;
3919 rr->op->mark_event("retry routed request");
3920 retry.push_back(new C_RetryMessage(this, rr->op));
3921 if (rr->session) {
3922 assert(rr->session->routed_request_tids.count(p->first));
3923 rr->session->routed_request_tids.erase(p->first);
3924 }
3925 delete rr;
3926 } else {
3927 bufferlist::iterator q = rr->request_bl.begin();
3928 PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
3929 rr->op->mark_event("resend forwarded message to leader");
3930 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
3931 MForward *forward = new MForward(rr->tid, req, rr->con_features,
3932 rr->session->caps);
3933 req->put(); // forward takes its own ref; drop ours.
3934 forward->client = rr->client_inst;
3935 forward->set_priority(req->get_priority());
3936 messenger->send_message(forward, monmap->get_inst(mon));
3937 }
3938 }
3939 if (mon == rank) {
3940 routed_requests.clear();
3941 finish_contexts(g_ceph_context, retry);
3942 }
3943 }
3944
3945 void Monitor::remove_session(MonSession *s)
3946 {
3947 dout(10) << "remove_session " << s << " " << s->inst
3948 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
3949 assert(s->con);
3950 assert(!s->closed);
3951 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
3952 p != s->routed_request_tids.end();
3953 ++p) {
3954 assert(routed_requests.count(*p));
3955 RoutedRequest *rr = routed_requests[*p];
3956 dout(10) << " dropping routed request " << rr->tid << dendl;
3957 delete rr;
3958 routed_requests.erase(*p);
3959 }
3960 s->routed_request_tids.clear();
3961 s->con->set_priv(NULL);
3962 session_map.remove_session(s);
3963 logger->set(l_mon_num_sessions, session_map.get_size());
3964 logger->inc(l_mon_session_rm);
3965 }
3966
3967 void Monitor::remove_all_sessions()
3968 {
3969 Mutex::Locker l(session_map_lock);
3970 while (!session_map.sessions.empty()) {
3971 MonSession *s = session_map.sessions.front();
3972 remove_session(s);
3973 if (logger)
3974 logger->inc(l_mon_session_rm);
3975 }
3976 if (logger)
3977 logger->set(l_mon_num_sessions, session_map.get_size());
3978 }
3979
3980 void Monitor::send_command(const entity_inst_t& inst,
3981 const vector<string>& com)
3982 {
3983 dout(10) << "send_command " << inst << "" << com << dendl;
3984 MMonCommand *c = new MMonCommand(monmap->fsid);
3985 c->cmd = com;
3986 try_send_message(c, inst);
3987 }
3988
3989 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
3990 {
3991 /**
3992 * Wait list the new session until we're in the quorum, assuming it's
3993 * sufficiently new.
3994 * tick() will periodically send them back through so we can send
3995 * the client elsewhere if we don't think we're getting back in.
3996 *
3997 * But we whitelist a few sorts of messages:
3998 * 1) Monitors can talk to us at any time, of course.
3999 * 2) auth messages. It's unlikely to go through much faster, but
4000 * it's possible we've just lost our quorum status and we want to take...
4001 * 3) command messages. We want to accept these under all possible
4002 * circumstances.
4003 */
4004 Message *m = op->get_req();
4005 MonSession *s = op->get_session();
4006 ConnectionRef con = op->get_connection();
4007 utime_t too_old = ceph_clock_now();
4008 too_old -= g_ceph_context->_conf->mon_lease;
4009 if (m->get_recv_stamp() > too_old &&
4010 con->is_connected()) {
4011 dout(5) << "waitlisting message " << *m << dendl;
4012 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4013 op->mark_wait_for_quorum();
4014 } else {
4015 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4016 con->mark_down();
4017 // proxied sessions aren't registered and don't have a con; don't remove
4018 // those.
4019 if (!s->proxy_con) {
4020 Mutex::Locker l(session_map_lock);
4021 remove_session(s);
4022 }
4023 op->mark_zap();
4024 }
4025 }
4026
4027 void Monitor::_ms_dispatch(Message *m)
4028 {
4029 if (is_shutdown()) {
4030 m->put();
4031 return;
4032 }
4033
4034 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4035 bool src_is_mon = op->is_src_mon();
4036 op->mark_event("mon:_ms_dispatch");
4037 MonSession *s = op->get_session();
4038 if (s && s->closed) {
4039 return;
4040 }
4041
4042 if (src_is_mon && s) {
4043 ConnectionRef con = m->get_connection();
4044 if (con->get_messenger() && con->get_features() != s->con_features) {
4045 // only update features if this is a non-anonymous connection
4046 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4047 << " (was " << s->con_features
4048 << ", now " << con->get_features() << ")" << dendl;
4049 // connection features changed - recreate session.
4050 if (s->con && s->con != con) {
4051 dout(10) << __func__ << " connection for " << m->get_source_inst()
4052 << " changed from session; mark down and replace" << dendl;
4053 s->con->mark_down();
4054 }
4055 if (s->item.is_on_list()) {
4056 // forwarded messages' sessions are not in the sessions map and
4057 // exist only while the op is being handled.
4058 remove_session(s);
4059 }
4060 s->put();
4061 s = nullptr;
4062 }
4063 }
4064
4065 if (!s) {
4066 // if the sender is not a monitor, make sure their first message for a
4067 // session is an MAuth. If it is not, assume it's a stray message,
4068 // and considering that we are creating a new session it is safe to
4069 // assume that the sender hasn't authenticated yet, so we have no way
4070 // of assessing whether we should handle it or not.
4071 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4072 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4073 m->get_type() != CEPH_MSG_PING)) {
4074 dout(1) << __func__ << " dropping stray message " << *m
4075 << " from " << m->get_source_inst() << dendl;
4076 return;
4077 }
4078
4079 ConnectionRef con = m->get_connection();
4080 {
4081 Mutex::Locker l(session_map_lock);
4082 s = session_map.new_session(m->get_source_inst(), con.get());
4083 }
4084 assert(s);
4085 con->set_priv(s->get());
4086 dout(10) << __func__ << " new session " << s << " " << *s
4087 << " features 0x" << std::hex
4088 << s->con_features << std::dec << dendl;
4089 op->set_session(s);
4090
4091 logger->set(l_mon_num_sessions, session_map.get_size());
4092 logger->inc(l_mon_session_add);
4093
4094 if (src_is_mon) {
4095 // give it monitor caps; the peer type has been authenticated
4096 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4097 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4098 s->caps = *mon_caps;
4099 }
4100 s->put();
4101 } else {
4102 dout(20) << __func__ << " existing session " << s << " for " << s->inst
4103 << dendl;
4104 }
4105
4106 assert(s);
4107
4108 s->session_timeout = ceph_clock_now();
4109 s->session_timeout += g_conf->mon_session_timeout;
4110
4111 if (s->auth_handler) {
4112 s->entity_name = s->auth_handler->get_entity_name();
4113 }
4114 dout(20) << " caps " << s->caps.get_str() << dendl;
4115
4116 if ((is_synchronizing() ||
4117 (s->global_id == 0 && !exited_quorum.is_zero())) &&
4118 !src_is_mon &&
4119 m->get_type() != CEPH_MSG_PING) {
4120 waitlist_or_zap_client(op);
4121 } else {
4122 dispatch_op(op);
4123 }
4124 return;
4125 }
4126
4127 void Monitor::dispatch_op(MonOpRequestRef op)
4128 {
4129 op->mark_event("mon:dispatch_op");
4130 MonSession *s = op->get_session();
4131 assert(s);
4132 if (s->closed) {
4133 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4134 return;
4135 }
4136
4137 /* we will consider the default type as being 'monitor' until proven wrong */
4138 op->set_type_monitor();
4139 /* deal with all messages that do not necessarily need caps */
4140 bool dealt_with = true;
4141 switch (op->get_req()->get_type()) {
4142 // auth
4143 case MSG_MON_GLOBAL_ID:
4144 case CEPH_MSG_AUTH:
4145 op->set_type_service();
4146 /* no need to check caps here */
4147 paxos_service[PAXOS_AUTH]->dispatch(op);
4148 break;
4149
4150 case CEPH_MSG_PING:
4151 handle_ping(op);
4152 break;
4153
4154 /* MMonGetMap may be used by clients to obtain a monmap *before*
4155 * authenticating with the monitor. We need to handle these without
4156 * checking caps because, even on a cluster without cephx, we only set
4157 * session caps *after* the auth handshake. A good example of this
4158 * is when a client calls MonClient::get_monmap_privately(), which does
4159 * not authenticate when obtaining a monmap.
4160 */
4161 case CEPH_MSG_MON_GET_MAP:
4162 handle_mon_get_map(op);
4163 break;
4164
4165 case CEPH_MSG_MON_METADATA:
4166 return handle_mon_metadata(op);
4167
4168 default:
4169 dealt_with = false;
4170 break;
4171 }
4172 if (dealt_with)
4173 return;
4174
4175 /* well, maybe the op belongs to a service... */
4176 op->set_type_service();
4177 /* deal with all messages which caps should be checked somewhere else */
4178 dealt_with = true;
4179 switch (op->get_req()->get_type()) {
4180
4181 // OSDs
4182 case CEPH_MSG_MON_GET_OSDMAP:
4183 case CEPH_MSG_POOLOP:
4184 case MSG_OSD_BEACON:
4185 case MSG_OSD_MARK_ME_DOWN:
4186 case MSG_OSD_FULL:
4187 case MSG_OSD_FAILURE:
4188 case MSG_OSD_BOOT:
4189 case MSG_OSD_ALIVE:
4190 case MSG_OSD_PGTEMP:
4191 case MSG_OSD_PG_CREATED:
4192 case MSG_REMOVE_SNAPS:
4193 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4194 break;
4195
4196 // MDSs
4197 case MSG_MDS_BEACON:
4198 case MSG_MDS_OFFLOAD_TARGETS:
4199 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4200 break;
4201
4202 // Mgrs
4203 case MSG_MGR_BEACON:
4204 paxos_service[PAXOS_MGR]->dispatch(op);
4205 break;
4206
4207 // MgrStat
4208 case MSG_MON_MGR_REPORT:
4209 case CEPH_MSG_STATFS:
4210 case MSG_GETPOOLSTATS:
4211 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4212 break;
4213
4214 // pg
4215 case MSG_PGSTATS:
4216 paxos_service[PAXOS_PGMAP]->dispatch(op);
4217 break;
4218
4219 // log
4220 case MSG_LOG:
4221 paxos_service[PAXOS_LOG]->dispatch(op);
4222 break;
4223
4224 // handle_command() does its own caps checking
4225 case MSG_MON_COMMAND:
4226 op->set_type_command();
4227 handle_command(op);
4228 break;
4229
4230 default:
4231 dealt_with = false;
4232 break;
4233 }
4234 if (dealt_with)
4235 return;
4236
4237 /* nop, looks like it's not a service message; revert back to monitor */
4238 op->set_type_monitor();
4239
4240 /* messages we, the Monitor class, need to deal with
4241 * but may be sent by clients. */
4242
4243 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4244 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4245 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4246 << dendl;
4247 goto drop;
4248 }
4249
4250 dealt_with = true;
4251 switch (op->get_req()->get_type()) {
4252
4253 // misc
4254 case CEPH_MSG_MON_GET_VERSION:
4255 handle_get_version(op);
4256 break;
4257
4258 case CEPH_MSG_MON_SUBSCRIBE:
4259 /* FIXME: check what's being subscribed, filter accordingly */
4260 handle_subscribe(op);
4261 break;
4262
4263 default:
4264 dealt_with = false;
4265 break;
4266 }
4267 if (dealt_with)
4268 return;
4269
4270 if (!op->is_src_mon()) {
4271 dout(1) << __func__ << " unexpected monitor message from"
4272 << " non-monitor entity " << op->get_req()->get_source_inst()
4273 << " " << *(op->get_req()) << " -- dropping" << dendl;
4274 goto drop;
4275 }
4276
4277 /* messages that should only be sent by another monitor */
4278 dealt_with = true;
4279 switch (op->get_req()->get_type()) {
4280
4281 case MSG_ROUTE:
4282 handle_route(op);
4283 break;
4284
4285 case MSG_MON_PROBE:
4286 handle_probe(op);
4287 break;
4288
4289 // Sync (i.e., the new slurp, but on steroids)
4290 case MSG_MON_SYNC:
4291 handle_sync(op);
4292 break;
4293 case MSG_MON_SCRUB:
4294 handle_scrub(op);
4295 break;
4296
4297 /* log acks are sent from a monitor we sent the MLog to, and are
4298 never sent by clients to us. */
4299 case MSG_LOGACK:
4300 log_client.handle_log_ack((MLogAck*)op->get_req());
4301 break;
4302
4303 // monmap
4304 case MSG_MON_JOIN:
4305 op->set_type_service();
4306 paxos_service[PAXOS_MONMAP]->dispatch(op);
4307 break;
4308
4309 // paxos
4310 case MSG_MON_PAXOS:
4311 {
4312 op->set_type_paxos();
4313 MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
4314 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4315 //can't send these!
4316 break;
4317 }
4318
4319 if (state == STATE_SYNCHRONIZING) {
4320 // we are synchronizing. These messages would do us no
4321 // good, thus just drop them and ignore them.
4322 dout(10) << __func__ << " ignore paxos msg from "
4323 << pm->get_source_inst() << dendl;
4324 break;
4325 }
4326
4327 // sanitize
4328 if (pm->epoch > get_epoch()) {
4329 bootstrap();
4330 break;
4331 }
4332 if (pm->epoch != get_epoch()) {
4333 break;
4334 }
4335
4336 paxos->dispatch(op);
4337 }
4338 break;
4339
4340 // elector messages
4341 case MSG_MON_ELECTION:
4342 op->set_type_election();
4343 //check privileges here for simplicity
4344 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4345 dout(0) << "MMonElection received from entity without enough caps!"
4346 << op->get_session()->caps << dendl;
4347 break;
4348 }
4349 if (!is_probing() && !is_synchronizing()) {
4350 elector.dispatch(op);
4351 }
4352 break;
4353
4354 case MSG_FORWARD:
4355 handle_forward(op);
4356 break;
4357
4358 case MSG_TIMECHECK:
4359 handle_timecheck(op);
4360 break;
4361
4362 case MSG_MON_HEALTH:
4363 health_monitor->dispatch(op);
4364 break;
4365
4366 case MSG_MON_HEALTH_CHECKS:
4367 op->set_type_service();
4368 paxos_service[PAXOS_HEALTH]->dispatch(op);
4369 break;
4370
4371 default:
4372 dealt_with = false;
4373 break;
4374 }
4375 if (!dealt_with) {
4376 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4377 goto drop;
4378 }
4379 return;
4380
4381 drop:
4382 return;
4383 }
4384
4385 void Monitor::handle_ping(MonOpRequestRef op)
4386 {
4387 MPing *m = static_cast<MPing*>(op->get_req());
4388 dout(10) << __func__ << " " << *m << dendl;
4389 MPing *reply = new MPing;
4390 entity_inst_t inst = m->get_source_inst();
4391 bufferlist payload;
4392 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4393 f->open_object_section("pong");
4394
4395 list<string> health_str;
4396 get_health(health_str, NULL, f.get());
4397 {
4398 stringstream ss;
4399 get_mon_status(f.get(), ss);
4400 }
4401
4402 f->close_section();
4403 stringstream ss;
4404 f->flush(ss);
4405 ::encode(ss.str(), payload);
4406 reply->set_payload(payload);
4407 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4408 messenger->send_message(reply, inst);
4409 }
4410
4411 void Monitor::timecheck_start()
4412 {
4413 dout(10) << __func__ << dendl;
4414 timecheck_cleanup();
4415 timecheck_start_round();
4416 }
4417
4418 void Monitor::timecheck_finish()
4419 {
4420 dout(10) << __func__ << dendl;
4421 timecheck_cleanup();
4422 }
4423
4424 void Monitor::timecheck_start_round()
4425 {
4426 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4427 assert(is_leader());
4428
4429 if (monmap->size() == 1) {
4430 assert(0 == "We are alone; this shouldn't have been scheduled!");
4431 return;
4432 }
4433
4434 if (timecheck_round % 2) {
4435 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4436 utime_t curr_time = ceph_clock_now();
4437 double max = g_conf->mon_timecheck_interval*3;
4438 if (curr_time - timecheck_round_start < max) {
4439 dout(10) << __func__ << " keep current round going" << dendl;
4440 goto out;
4441 } else {
4442 dout(10) << __func__
4443 << " finish current timecheck and start new" << dendl;
4444 timecheck_cancel_round();
4445 }
4446 }
4447
4448 assert(timecheck_round % 2 == 0);
4449 timecheck_acks = 0;
4450 timecheck_round ++;
4451 timecheck_round_start = ceph_clock_now();
4452 dout(10) << __func__ << " new " << timecheck_round << dendl;
4453
4454 timecheck();
4455 out:
4456 dout(10) << __func__ << " setting up next event" << dendl;
4457 timecheck_reset_event();
4458 }
4459
4460 void Monitor::timecheck_finish_round(bool success)
4461 {
4462 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4463 assert(timecheck_round % 2);
4464 timecheck_round ++;
4465 timecheck_round_start = utime_t();
4466
4467 if (success) {
4468 assert(timecheck_waiting.empty());
4469 assert(timecheck_acks == quorum.size());
4470 timecheck_report();
4471 timecheck_check_skews();
4472 return;
4473 }
4474
4475 dout(10) << __func__ << " " << timecheck_waiting.size()
4476 << " peers still waiting:";
4477 for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4478 p != timecheck_waiting.end(); ++p) {
4479 *_dout << " " << p->first.name;
4480 }
4481 *_dout << dendl;
4482 timecheck_waiting.clear();
4483
4484 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4485 }
4486
4487 void Monitor::timecheck_cancel_round()
4488 {
4489 timecheck_finish_round(false);
4490 }
4491
4492 void Monitor::timecheck_cleanup()
4493 {
4494 timecheck_round = 0;
4495 timecheck_acks = 0;
4496 timecheck_round_start = utime_t();
4497
4498 if (timecheck_event) {
4499 timer.cancel_event(timecheck_event);
4500 timecheck_event = NULL;
4501 }
4502 timecheck_waiting.clear();
4503 timecheck_skews.clear();
4504 timecheck_latencies.clear();
4505
4506 timecheck_rounds_since_clean = 0;
4507 }
4508
4509 void Monitor::timecheck_reset_event()
4510 {
4511 if (timecheck_event) {
4512 timer.cancel_event(timecheck_event);
4513 timecheck_event = NULL;
4514 }
4515
4516 double delay =
4517 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4518
4519 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4520 delay = cct->_conf->mon_timecheck_interval;
4521 }
4522
4523 dout(10) << __func__ << " delay " << delay
4524 << " rounds_since_clean " << timecheck_rounds_since_clean
4525 << dendl;
4526
4527 timecheck_event = new C_MonContext(this, [this](int) {
4528 timecheck_start_round();
4529 });
4530 timer.add_event_after(delay, timecheck_event);
4531 }
4532
4533 void Monitor::timecheck_check_skews()
4534 {
4535 dout(10) << __func__ << dendl;
4536 assert(is_leader());
4537 assert((timecheck_round % 2) == 0);
4538 if (monmap->size() == 1) {
4539 assert(0 == "We are alone; we shouldn't have gotten here!");
4540 return;
4541 }
4542 assert(timecheck_latencies.size() == timecheck_skews.size());
4543
4544 bool found_skew = false;
4545 for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4546 p != timecheck_skews.end(); ++p) {
4547
4548 double abs_skew;
4549 if (timecheck_has_skew(p->second, &abs_skew)) {
4550 dout(10) << __func__
4551 << " " << p->first << " skew " << abs_skew << dendl;
4552 found_skew = true;
4553 }
4554 }
4555
4556 if (found_skew) {
4557 ++timecheck_rounds_since_clean;
4558 timecheck_reset_event();
4559 } else if (timecheck_rounds_since_clean > 0) {
4560 dout(1) << __func__
4561 << " no clock skews found after " << timecheck_rounds_since_clean
4562 << " rounds" << dendl;
4563 // make sure the skews are really gone and not just a transient success
4564 // this will run just once if not in the presence of skews again.
4565 timecheck_rounds_since_clean = 1;
4566 timecheck_reset_event();
4567 timecheck_rounds_since_clean = 0;
4568 }
4569
4570 }
4571
4572 void Monitor::timecheck_report()
4573 {
4574 dout(10) << __func__ << dendl;
4575 assert(is_leader());
4576 assert((timecheck_round % 2) == 0);
4577 if (monmap->size() == 1) {
4578 assert(0 == "We are alone; we shouldn't have gotten here!");
4579 return;
4580 }
4581
4582 assert(timecheck_latencies.size() == timecheck_skews.size());
4583 bool do_output = true; // only output report once
4584 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4585 if (monmap->get_name(*q) == name)
4586 continue;
4587
4588 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4589 m->epoch = get_epoch();
4590 m->round = timecheck_round;
4591
4592 for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4593 it != timecheck_skews.end(); ++it) {
4594 double skew = it->second;
4595 double latency = timecheck_latencies[it->first];
4596
4597 m->skews[it->first] = skew;
4598 m->latencies[it->first] = latency;
4599
4600 if (do_output) {
4601 dout(25) << __func__ << " " << it->first
4602 << " latency " << latency
4603 << " skew " << skew << dendl;
4604 }
4605 }
4606 do_output = false;
4607 entity_inst_t inst = monmap->get_inst(*q);
4608 dout(10) << __func__ << " send report to " << inst << dendl;
4609 messenger->send_message(m, inst);
4610 }
4611 }
4612
4613 void Monitor::timecheck()
4614 {
4615 dout(10) << __func__ << dendl;
4616 assert(is_leader());
4617 if (monmap->size() == 1) {
4618 assert(0 == "We are alone; we shouldn't have gotten here!");
4619 return;
4620 }
4621 assert(timecheck_round % 2 != 0);
4622
4623 timecheck_acks = 1; // we ack ourselves
4624
4625 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4626 << " round " << timecheck_round << dendl;
4627
4628 // we are at the eye of the storm; the point of reference
4629 timecheck_skews[messenger->get_myinst()] = 0.0;
4630 timecheck_latencies[messenger->get_myinst()] = 0.0;
4631
4632 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4633 if (monmap->get_name(*it) == name)
4634 continue;
4635
4636 entity_inst_t inst = monmap->get_inst(*it);
4637 utime_t curr_time = ceph_clock_now();
4638 timecheck_waiting[inst] = curr_time;
4639 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4640 m->epoch = get_epoch();
4641 m->round = timecheck_round;
4642 dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4643 messenger->send_message(m, inst);
4644 }
4645 }
4646
4647 health_status_t Monitor::timecheck_status(ostringstream &ss,
4648 const double skew_bound,
4649 const double latency)
4650 {
4651 health_status_t status = HEALTH_OK;
4652 assert(latency >= 0);
4653
4654 double abs_skew;
4655 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4656 status = HEALTH_WARN;
4657 ss << "clock skew " << abs_skew << "s"
4658 << " > max " << g_conf->mon_clock_drift_allowed << "s";
4659 }
4660
4661 return status;
4662 }
4663
4664 void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4665 {
4666 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4667 dout(10) << __func__ << " " << *m << dendl;
4668 /* handles PONG's */
4669 assert(m->op == MTimeCheck::OP_PONG);
4670
4671 entity_inst_t other = m->get_source_inst();
4672 if (m->epoch < get_epoch()) {
4673 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4674 << " from " << other
4675 << " curr " << get_epoch()
4676 << " -- severely lagged? discard" << dendl;
4677 return;
4678 }
4679 assert(m->epoch == get_epoch());
4680
4681 if (m->round < timecheck_round) {
4682 dout(1) << __func__ << " got old round " << m->round
4683 << " from " << other
4684 << " curr " << timecheck_round << " -- discard" << dendl;
4685 return;
4686 }
4687
4688 utime_t curr_time = ceph_clock_now();
4689
4690 assert(timecheck_waiting.count(other) > 0);
4691 utime_t timecheck_sent = timecheck_waiting[other];
4692 timecheck_waiting.erase(other);
4693 if (curr_time < timecheck_sent) {
4694 // our clock was readjusted -- drop everything until it all makes sense.
4695 dout(1) << __func__ << " our clock was readjusted --"
4696 << " bump round and drop current check"
4697 << dendl;
4698 timecheck_cancel_round();
4699 return;
4700 }
4701
4702 /* update peer latencies */
4703 double latency = (double)(curr_time - timecheck_sent);
4704
4705 if (timecheck_latencies.count(other) == 0)
4706 timecheck_latencies[other] = latency;
4707 else {
4708 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4709 timecheck_latencies[other] = avg_latency;
4710 }
4711
4712 /*
4713 * update skews
4714 *
4715 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4716 * and 'a' happens to be lower than 'b'; so we use double instead.
4717 *
4718 * latency is always expected to be >= 0.
4719 *
4720 * delta, the difference between theirs timestamp and ours, may either be
4721 * lower or higher than 0; will hardly ever be 0.
4722 *
4723 * The absolute skew is the absolute delta minus the latency, which is
4724 * taken as a whole instead of an rtt given that there is some queueing
4725 * and dispatch times involved and it's hard to assess how long exactly
4726 * it took for the message to travel to the other side and be handled. So
4727 * we call it a bounded skew, the worst case scenario.
4728 *
4729 * Now, to math!
4730 *
4731 * Given that the latency is always positive, we can establish that the
4732 * bounded skew will be:
4733 *
4734 * 1. positive if the absolute delta is higher than the latency and
4735 * delta is positive
4736 * 2. negative if the absolute delta is higher than the latency and
4737 * delta is negative.
4738 * 3. zero if the absolute delta is lower than the latency.
4739 *
4740 * On 3. we make a judgement call and treat the skew as non-existent.
4741 * This is because that, if the absolute delta is lower than the
4742 * latency, then the apparently existing skew is nothing more than a
4743 * side-effect of the high latency at work.
4744 *
4745 * This may not be entirely true though, as a severely skewed clock
4746 * may be masked by an even higher latency, but with high latencies
4747 * we probably have worse issues to deal with than just skewed clocks.
4748 */
4749 assert(latency >= 0);
4750
4751 double delta = ((double) m->timestamp) - ((double) curr_time);
4752 double abs_delta = (delta > 0 ? delta : -delta);
4753 double skew_bound = abs_delta - latency;
4754 if (skew_bound < 0)
4755 skew_bound = 0;
4756 else if (delta < 0)
4757 skew_bound = -skew_bound;
4758
4759 ostringstream ss;
4760 health_status_t status = timecheck_status(ss, skew_bound, latency);
4761 if (status == HEALTH_ERR)
4762 clog->error() << other << " " << ss.str();
4763 else if (status == HEALTH_WARN)
4764 clog->warn() << other << " " << ss.str();
4765
4766 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4767 << " delta " << delta << " skew_bound " << skew_bound
4768 << " latency " << latency << dendl;
4769
4770 timecheck_skews[other] = skew_bound;
4771
4772 timecheck_acks++;
4773 if (timecheck_acks == quorum.size()) {
4774 dout(10) << __func__ << " got pongs from everybody ("
4775 << timecheck_acks << " total)" << dendl;
4776 assert(timecheck_skews.size() == timecheck_acks);
4777 assert(timecheck_waiting.empty());
4778 // everyone has acked, so bump the round to finish it.
4779 timecheck_finish_round();
4780 }
4781 }
4782
4783 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4784 {
4785 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4786 dout(10) << __func__ << " " << *m << dendl;
4787
4788 assert(is_peon());
4789 assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4790
4791 if (m->epoch != get_epoch()) {
4792 dout(1) << __func__ << " got wrong epoch "
4793 << "(ours " << get_epoch()
4794 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4795 return;
4796 }
4797
4798 if (m->round < timecheck_round) {
4799 dout(1) << __func__ << " got old round " << m->round
4800 << " current " << timecheck_round
4801 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4802 return;
4803 }
4804
4805 timecheck_round = m->round;
4806
4807 if (m->op == MTimeCheck::OP_REPORT) {
4808 assert((timecheck_round % 2) == 0);
4809 timecheck_latencies.swap(m->latencies);
4810 timecheck_skews.swap(m->skews);
4811 return;
4812 }
4813
4814 assert((timecheck_round % 2) != 0);
4815 MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4816 utime_t curr_time = ceph_clock_now();
4817 reply->timestamp = curr_time;
4818 reply->epoch = m->epoch;
4819 reply->round = m->round;
4820 dout(10) << __func__ << " send " << *m
4821 << " to " << m->get_source_inst() << dendl;
4822 m->get_connection()->send_message(reply);
4823 }
4824
4825 void Monitor::handle_timecheck(MonOpRequestRef op)
4826 {
4827 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4828 dout(10) << __func__ << " " << *m << dendl;
4829
4830 if (is_leader()) {
4831 if (m->op != MTimeCheck::OP_PONG) {
4832 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4833 } else {
4834 handle_timecheck_leader(op);
4835 }
4836 } else if (is_peon()) {
4837 if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4838 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4839 } else {
4840 handle_timecheck_peon(op);
4841 }
4842 } else {
4843 dout(1) << __func__ << " drop unexpected msg" << dendl;
4844 }
4845 }
4846
4847 void Monitor::handle_subscribe(MonOpRequestRef op)
4848 {
4849 MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
4850 dout(10) << "handle_subscribe " << *m << dendl;
4851
4852 bool reply = false;
4853
4854 MonSession *s = op->get_session();
4855 assert(s);
4856
4857 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
4858 p != m->what.end();
4859 ++p) {
4860 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4861 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
4862 reply = true;
4863
4864 // remove conflicting subscribes
4865 if (logmon()->sub_name_to_id(p->first) >= 0) {
4866 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
4867 it != s->sub_map.end(); ) {
4868 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
4869 Mutex::Locker l(session_map_lock);
4870 session_map.remove_sub((it++)->second);
4871 } else {
4872 ++it;
4873 }
4874 }
4875 }
4876
4877 {
4878 Mutex::Locker l(session_map_lock);
4879 session_map.add_update_sub(s, p->first, p->second.start,
4880 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
4881 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
4882 }
4883
4884 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
4885 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
4886 if ((int)s->is_capable("mds", MON_CAP_R)) {
4887 Subscription *sub = s->sub_map[p->first];
4888 assert(sub != nullptr);
4889 mdsmon()->check_sub(sub);
4890 }
4891 } else if (p->first == "osdmap") {
4892 if ((int)s->is_capable("osd", MON_CAP_R)) {
4893 if (s->osd_epoch > p->second.start) {
4894 // client needs earlier osdmaps on purpose, so reset the sent epoch
4895 s->osd_epoch = 0;
4896 }
4897 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
4898 }
4899 } else if (p->first == "osd_pg_creates") {
4900 if ((int)s->is_capable("osd", MON_CAP_W)) {
4901 if (monmap->get_required_features().contains_all(
4902 ceph::features::mon::FEATURE_LUMINOUS)) {
4903 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
4904 } else {
4905 pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
4906 }
4907 }
4908 } else if (p->first == "monmap") {
4909 monmon()->check_sub(s->sub_map[p->first]);
4910 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
4911 logmon()->check_sub(s->sub_map[p->first]);
4912 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
4913 mgrmon()->check_sub(s->sub_map[p->first]);
4914 } else if (p->first == "servicemap") {
4915 mgrstatmon()->check_sub(s->sub_map[p->first]);
4916 }
4917 }
4918
4919 if (reply) {
4920 // we only need to reply if the client is old enough to think it
4921 // has to send renewals.
4922 ConnectionRef con = m->get_connection();
4923 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
4924 m->get_connection()->send_message(new MMonSubscribeAck(
4925 monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
4926 }
4927
4928 }
4929
4930 void Monitor::handle_get_version(MonOpRequestRef op)
4931 {
4932 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
4933 dout(10) << "handle_get_version " << *m << dendl;
4934 PaxosService *svc = NULL;
4935
4936 MonSession *s = op->get_session();
4937 assert(s);
4938
4939 if (!is_leader() && !is_peon()) {
4940 dout(10) << " waiting for quorum" << dendl;
4941 waitfor_quorum.push_back(new C_RetryMessage(this, op));
4942 goto out;
4943 }
4944
4945 if (m->what == "mdsmap") {
4946 svc = mdsmon();
4947 } else if (m->what == "fsmap") {
4948 svc = mdsmon();
4949 } else if (m->what == "osdmap") {
4950 svc = osdmon();
4951 } else if (m->what == "monmap") {
4952 svc = monmon();
4953 } else {
4954 derr << "invalid map type " << m->what << dendl;
4955 }
4956
4957 if (svc) {
4958 if (!svc->is_readable()) {
4959 svc->wait_for_readable(op, new C_RetryMessage(this, op));
4960 goto out;
4961 }
4962
4963 MMonGetVersionReply *reply = new MMonGetVersionReply();
4964 reply->handle = m->handle;
4965 reply->version = svc->get_last_committed();
4966 reply->oldest_version = svc->get_first_committed();
4967 reply->set_tid(m->get_tid());
4968
4969 m->get_connection()->send_message(reply);
4970 }
4971 out:
4972 return;
4973 }
4974
4975 bool Monitor::ms_handle_reset(Connection *con)
4976 {
4977 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
4978
4979 // ignore lossless monitor sessions
4980 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4981 return false;
4982
4983 MonSession *s = static_cast<MonSession *>(con->get_priv());
4984 if (!s)
4985 return false;
4986
4987 // break any con <-> session ref cycle
4988 s->con->set_priv(NULL);
4989
4990 if (is_shutdown())
4991 return false;
4992
4993 Mutex::Locker l(lock);
4994
4995 dout(10) << "reset/close on session " << s->inst << dendl;
4996 if (!s->closed) {
4997 Mutex::Locker l(session_map_lock);
4998 remove_session(s);
4999 }
5000 s->put();
5001 return true;
5002 }
5003
5004 bool Monitor::ms_handle_refused(Connection *con)
5005 {
5006 // just log for now...
5007 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5008 return false;
5009 }
5010
5011 // -----
5012
5013 void Monitor::send_latest_monmap(Connection *con)
5014 {
5015 bufferlist bl;
5016 monmap->encode(bl, con->get_features());
5017 con->send_message(new MMonMap(bl));
5018 }
5019
5020 void Monitor::handle_mon_get_map(MonOpRequestRef op)
5021 {
5022 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
5023 dout(10) << "handle_mon_get_map" << dendl;
5024 send_latest_monmap(m->get_connection().get());
5025 }
5026
5027 void Monitor::handle_mon_metadata(MonOpRequestRef op)
5028 {
5029 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
5030 if (is_leader()) {
5031 dout(10) << __func__ << dendl;
5032 update_mon_metadata(m->get_source().num(), std::move(m->data));
5033 }
5034 }
5035
5036 void Monitor::update_mon_metadata(int from, Metadata&& m)
5037 {
5038 // NOTE: this is now for legacy (kraken or jewel) mons only.
5039 pending_metadata[from] = std::move(m);
5040
5041 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5042 bufferlist bl;
5043 ::encode(pending_metadata, bl);
5044 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5045 paxos->trigger_propose();
5046 }
5047
5048 int Monitor::load_metadata()
5049 {
5050 bufferlist bl;
5051 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5052 if (r)
5053 return r;
5054 bufferlist::iterator it = bl.begin();
5055 ::decode(mon_metadata, it);
5056
5057 pending_metadata = mon_metadata;
5058 return 0;
5059 }
5060
5061 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5062 {
5063 assert(f);
5064 if (!mon_metadata.count(mon)) {
5065 err << "mon." << mon << " not found";
5066 return -EINVAL;
5067 }
5068 const Metadata& m = mon_metadata[mon];
5069 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5070 f->dump_string(p->first.c_str(), p->second);
5071 }
5072 return 0;
5073 }
5074
5075 void Monitor::count_metadata(const string& field, map<string,int> *out)
5076 {
5077 for (auto& p : mon_metadata) {
5078 auto q = p.second.find(field);
5079 if (q == p.second.end()) {
5080 (*out)["unknown"]++;
5081 } else {
5082 (*out)[q->second]++;
5083 }
5084 }
5085 }
5086
5087 void Monitor::count_metadata(const string& field, Formatter *f)
5088 {
5089 map<string,int> by_val;
5090 count_metadata(field, &by_val);
5091 f->open_object_section(field.c_str());
5092 for (auto& p : by_val) {
5093 f->dump_int(p.first.c_str(), p.second);
5094 }
5095 f->close_section();
5096 }
5097
5098 int Monitor::print_nodes(Formatter *f, ostream& err)
5099 {
5100 map<string, list<int> > mons; // hostname => mon
5101 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5102 it != mon_metadata.end(); ++it) {
5103 const Metadata& m = it->second;
5104 Metadata::const_iterator hostname = m.find("hostname");
5105 if (hostname == m.end()) {
5106 // not likely though
5107 continue;
5108 }
5109 mons[hostname->second].push_back(it->first);
5110 }
5111
5112 dump_services(f, mons, "mon");
5113 return 0;
5114 }
5115
5116 // ----------------------------------------------
5117 // scrub
5118
5119 int Monitor::scrub_start()
5120 {
5121 dout(10) << __func__ << dendl;
5122 assert(is_leader());
5123
5124 if (!scrub_result.empty()) {
5125 clog->info() << "scrub already in progress";
5126 return -EBUSY;
5127 }
5128
5129 scrub_event_cancel();
5130 scrub_result.clear();
5131 scrub_state.reset(new ScrubState);
5132
5133 scrub();
5134 return 0;
5135 }
5136
5137 int Monitor::scrub()
5138 {
5139 assert(is_leader());
5140 assert(scrub_state);
5141
5142 scrub_cancel_timeout();
5143 wait_for_paxos_write();
5144 scrub_version = paxos->get_version();
5145
5146
5147 // scrub all keys if we're the only monitor in the quorum
5148 int32_t num_keys =
5149 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5150
5151 for (set<int>::iterator p = quorum.begin();
5152 p != quorum.end();
5153 ++p) {
5154 if (*p == rank)
5155 continue;
5156 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5157 num_keys);
5158 r->key = scrub_state->last_key;
5159 messenger->send_message(r, monmap->get_inst(*p));
5160 }
5161
5162 // scrub my keys
5163 bool r = _scrub(&scrub_result[rank],
5164 &scrub_state->last_key,
5165 &num_keys);
5166
5167 scrub_state->finished = !r;
5168
5169 // only after we got our scrub results do we really care whether the
5170 // other monitors are late on their results. Also, this way we avoid
5171 // triggering the timeout if we end up getting stuck in _scrub() for
5172 // longer than the duration of the timeout.
5173 scrub_reset_timeout();
5174
5175 if (quorum.size() == 1) {
5176 assert(scrub_state->finished == true);
5177 scrub_finish();
5178 }
5179 return 0;
5180 }
5181
5182 void Monitor::handle_scrub(MonOpRequestRef op)
5183 {
5184 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5185 dout(10) << __func__ << " " << *m << dendl;
5186 switch (m->op) {
5187 case MMonScrub::OP_SCRUB:
5188 {
5189 if (!is_peon())
5190 break;
5191
5192 wait_for_paxos_write();
5193
5194 if (m->version != paxos->get_version())
5195 break;
5196
5197 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5198 m->version,
5199 m->num_keys);
5200
5201 reply->key = m->key;
5202 _scrub(&reply->result, &reply->key, &reply->num_keys);
5203 m->get_connection()->send_message(reply);
5204 }
5205 break;
5206
5207 case MMonScrub::OP_RESULT:
5208 {
5209 if (!is_leader())
5210 break;
5211 if (m->version != scrub_version)
5212 break;
5213 // reset the timeout each time we get a result
5214 scrub_reset_timeout();
5215
5216 int from = m->get_source().num();
5217 assert(scrub_result.count(from) == 0);
5218 scrub_result[from] = m->result;
5219
5220 if (scrub_result.size() == quorum.size()) {
5221 scrub_check_results();
5222 scrub_result.clear();
5223 if (scrub_state->finished)
5224 scrub_finish();
5225 else
5226 scrub();
5227 }
5228 }
5229 break;
5230 }
5231 }
5232
5233 bool Monitor::_scrub(ScrubResult *r,
5234 pair<string,string> *start,
5235 int *num_keys)
5236 {
5237 assert(r != NULL);
5238 assert(start != NULL);
5239 assert(num_keys != NULL);
5240
5241 set<string> prefixes = get_sync_targets_names();
5242 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5243
5244 dout(10) << __func__ << " start (" << *start << ")"
5245 << " num_keys " << *num_keys << dendl;
5246
5247 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5248
5249 int scrubbed_keys = 0;
5250 pair<string,string> last_key;
5251
5252 while (it->has_next_chunk()) {
5253
5254 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5255 break;
5256
5257 pair<string,string> k = it->get_next_key();
5258 if (prefixes.count(k.first) == 0)
5259 continue;
5260
5261 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5262 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5263 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5264 << dendl;
5265 continue;
5266 }
5267
5268 bufferlist bl;
5269 int err = store->get(k.first, k.second, bl);
5270 assert(err == 0);
5271
5272 uint32_t key_crc = bl.crc32c(0);
5273 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5274 << " crc " << key_crc << dendl;
5275 r->prefix_keys[k.first]++;
5276 if (r->prefix_crc.count(k.first) == 0) {
5277 r->prefix_crc[k.first] = 0;
5278 }
5279 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5280
5281 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5282 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5283 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5284 r->prefix_crc[k.first] += 1;
5285 }
5286
5287 ++scrubbed_keys;
5288 last_key = k;
5289 }
5290
5291 dout(20) << __func__ << " last_key (" << last_key << ")"
5292 << " scrubbed_keys " << scrubbed_keys
5293 << " has_next " << it->has_next_chunk() << dendl;
5294
5295 *start = last_key;
5296 *num_keys = scrubbed_keys;
5297
5298 return it->has_next_chunk();
5299 }
5300
5301 void Monitor::scrub_check_results()
5302 {
5303 dout(10) << __func__ << dendl;
5304
5305 // compare
5306 int errors = 0;
5307 ScrubResult& mine = scrub_result[rank];
5308 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5309 p != scrub_result.end();
5310 ++p) {
5311 if (p->first == rank)
5312 continue;
5313 if (p->second != mine) {
5314 ++errors;
5315 clog->error() << "scrub mismatch";
5316 clog->error() << " mon." << rank << " " << mine;
5317 clog->error() << " mon." << p->first << " " << p->second;
5318 }
5319 }
5320 if (!errors)
5321 clog->info() << "scrub ok on " << quorum << ": " << mine;
5322 }
5323
5324 inline void Monitor::scrub_timeout()
5325 {
5326 dout(1) << __func__ << " restarting scrub" << dendl;
5327 scrub_reset();
5328 scrub_start();
5329 }
5330
5331 void Monitor::scrub_finish()
5332 {
5333 dout(10) << __func__ << dendl;
5334 scrub_reset();
5335 scrub_event_start();
5336 }
5337
5338 void Monitor::scrub_reset()
5339 {
5340 dout(10) << __func__ << dendl;
5341 scrub_cancel_timeout();
5342 scrub_version = 0;
5343 scrub_result.clear();
5344 scrub_state.reset();
5345 }
5346
5347 inline void Monitor::scrub_update_interval(int secs)
5348 {
5349 // we don't care about changes if we are not the leader.
5350 // changes will be visible if we become the leader.
5351 if (!is_leader())
5352 return;
5353
5354 dout(1) << __func__ << " new interval = " << secs << dendl;
5355
5356 // if scrub already in progress, all changes will already be visible during
5357 // the next round. Nothing to do.
5358 if (scrub_state != NULL)
5359 return;
5360
5361 scrub_event_cancel();
5362 scrub_event_start();
5363 }
5364
5365 void Monitor::scrub_event_start()
5366 {
5367 dout(10) << __func__ << dendl;
5368
5369 if (scrub_event)
5370 scrub_event_cancel();
5371
5372 if (cct->_conf->mon_scrub_interval <= 0) {
5373 dout(1) << __func__ << " scrub event is disabled"
5374 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5375 << ")" << dendl;
5376 return;
5377 }
5378
5379 scrub_event = new C_MonContext(this, [this](int) {
5380 scrub_start();
5381 });
5382 timer.add_event_after(cct->_conf->mon_scrub_interval, scrub_event);
5383 }
5384
5385 void Monitor::scrub_event_cancel()
5386 {
5387 dout(10) << __func__ << dendl;
5388 if (scrub_event) {
5389 timer.cancel_event(scrub_event);
5390 scrub_event = NULL;
5391 }
5392 }
5393
5394 inline void Monitor::scrub_cancel_timeout()
5395 {
5396 if (scrub_timeout_event) {
5397 timer.cancel_event(scrub_timeout_event);
5398 scrub_timeout_event = NULL;
5399 }
5400 }
5401
5402 void Monitor::scrub_reset_timeout()
5403 {
5404 dout(15) << __func__ << " reset timeout event" << dendl;
5405 scrub_cancel_timeout();
5406
5407 scrub_timeout_event = new C_MonContext(this, [this](int) {
5408 scrub_timeout();
5409 });
5410 timer.add_event_after(g_conf->mon_scrub_timeout, scrub_timeout_event);
5411 }
5412
5413 /************ TICK ***************/
5414 void Monitor::new_tick()
5415 {
5416 timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5417 tick();
5418 }));
5419 }
5420
5421 void Monitor::tick()
5422 {
5423 // ok go.
5424 dout(11) << "tick" << dendl;
5425
5426 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5427 (*p)->tick();
5428 (*p)->maybe_trim();
5429 }
5430
5431 // trim sessions
5432 utime_t now = ceph_clock_now();
5433 {
5434 Mutex::Locker l(session_map_lock);
5435 auto p = session_map.sessions.begin();
5436
5437 bool out_for_too_long = (!exited_quorum.is_zero() &&
5438 now > (exited_quorum + 2*g_conf->mon_lease));
5439
5440 while (!p.end()) {
5441 MonSession *s = *p;
5442 ++p;
5443
5444 // don't trim monitors
5445 if (s->inst.name.is_mon())
5446 continue;
5447
5448 if (s->session_timeout < now && s->con) {
5449 // check keepalive, too
5450 s->session_timeout = s->con->get_last_keepalive();
5451 s->session_timeout += g_conf->mon_session_timeout;
5452 }
5453 if (s->session_timeout < now) {
5454 dout(10) << " trimming session " << s->con << " " << s->inst
5455 << " (timeout " << s->session_timeout
5456 << " < now " << now << ")" << dendl;
5457 } else if (out_for_too_long) {
5458 // boot the client Session because we've taken too long getting back in
5459 dout(10) << " trimming session " << s->con << " " << s->inst
5460 << " because we've been out of quorum too long" << dendl;
5461 } else {
5462 continue;
5463 }
5464
5465 s->con->mark_down();
5466 remove_session(s);
5467 logger->inc(l_mon_session_trim);
5468 }
5469 }
5470 sync_trim_providers();
5471
5472 if (!maybe_wait_for_quorum.empty()) {
5473 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5474 }
5475
5476 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5477 // this is only necessary on upgraded clusters.
5478 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5479 prepare_new_fingerprint(t);
5480 paxos->trigger_propose();
5481 }
5482
5483 new_tick();
5484 }
5485
5486 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5487 {
5488 uuid_d nf;
5489 nf.generate_random();
5490 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5491
5492 bufferlist bl;
5493 ::encode(nf, bl);
5494 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5495 }
5496
5497 int Monitor::check_fsid()
5498 {
5499 bufferlist ebl;
5500 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5501 if (r == -ENOENT)
5502 return r;
5503 assert(r == 0);
5504
5505 string es(ebl.c_str(), ebl.length());
5506
5507 // only keep the first line
5508 size_t pos = es.find_first_of('\n');
5509 if (pos != string::npos)
5510 es.resize(pos);
5511
5512 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5513 uuid_d ondisk;
5514 if (!ondisk.parse(es.c_str())) {
5515 derr << "error: unable to parse uuid" << dendl;
5516 return -EINVAL;
5517 }
5518
5519 if (monmap->get_fsid() != ondisk) {
5520 derr << "error: cluster_uuid file exists with value " << ondisk
5521 << ", != our uuid " << monmap->get_fsid() << dendl;
5522 return -EEXIST;
5523 }
5524
5525 return 0;
5526 }
5527
5528 int Monitor::write_fsid()
5529 {
5530 auto t(std::make_shared<MonitorDBStore::Transaction>());
5531 write_fsid(t);
5532 int r = store->apply_transaction(t);
5533 return r;
5534 }
5535
5536 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5537 {
5538 ostringstream ss;
5539 ss << monmap->get_fsid() << "\n";
5540 string us = ss.str();
5541
5542 bufferlist b;
5543 b.append(us);
5544
5545 t->put(MONITOR_NAME, "cluster_uuid", b);
5546 return 0;
5547 }
5548
5549 /*
5550 * this is the closest thing to a traditional 'mkfs' for ceph.
5551 * initialize the monitor state machines to their initial values.
5552 */
5553 int Monitor::mkfs(bufferlist& osdmapbl)
5554 {
5555 auto t(std::make_shared<MonitorDBStore::Transaction>());
5556
5557 // verify cluster fsid
5558 int r = check_fsid();
5559 if (r < 0 && r != -ENOENT)
5560 return r;
5561
5562 bufferlist magicbl;
5563 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5564 magicbl.append("\n");
5565 t->put(MONITOR_NAME, "magic", magicbl);
5566
5567
5568 features = get_initial_supported_features();
5569 write_features(t);
5570
5571 // save monmap, osdmap, keyring.
5572 bufferlist monmapbl;
5573 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5574 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5575 t->put("mkfs", "monmap", monmapbl);
5576
5577 if (osdmapbl.length()) {
5578 // make sure it's a valid osdmap
5579 try {
5580 OSDMap om;
5581 om.decode(osdmapbl);
5582 }
5583 catch (buffer::error& e) {
5584 derr << "error decoding provided osdmap: " << e.what() << dendl;
5585 return -EINVAL;
5586 }
5587 t->put("mkfs", "osdmap", osdmapbl);
5588 }
5589
5590 if (is_keyring_required()) {
5591 KeyRing keyring;
5592 string keyring_filename;
5593
5594 r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5595 if (r) {
5596 derr << "unable to find a keyring file on " << g_conf->keyring
5597 << ": " << cpp_strerror(r) << dendl;
5598 if (g_conf->key != "") {
5599 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5600 "\n\tcaps mon = \"allow *\"\n";
5601 bufferlist bl;
5602 bl.append(keyring_plaintext);
5603 try {
5604 bufferlist::iterator i = bl.begin();
5605 keyring.decode_plaintext(i);
5606 }
5607 catch (const buffer::error& e) {
5608 derr << "error decoding keyring " << keyring_plaintext
5609 << ": " << e.what() << dendl;
5610 return -EINVAL;
5611 }
5612 } else {
5613 return -ENOENT;
5614 }
5615 } else {
5616 r = keyring.load(g_ceph_context, keyring_filename);
5617 if (r < 0) {
5618 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5619 return r;
5620 }
5621 }
5622
5623 // put mon. key in external keyring; seed with everything else.
5624 extract_save_mon_key(keyring);
5625
5626 bufferlist keyringbl;
5627 keyring.encode_plaintext(keyringbl);
5628 t->put("mkfs", "keyring", keyringbl);
5629 }
5630 write_fsid(t);
5631 store->apply_transaction(t);
5632
5633 return 0;
5634 }
5635
5636 int Monitor::write_default_keyring(bufferlist& bl)
5637 {
5638 ostringstream os;
5639 os << g_conf->mon_data << "/keyring";
5640
5641 int err = 0;
5642 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600);
5643 if (fd < 0) {
5644 err = -errno;
5645 dout(0) << __func__ << " failed to open " << os.str()
5646 << ": " << cpp_strerror(err) << dendl;
5647 return err;
5648 }
5649
5650 err = bl.write_fd(fd);
5651 if (!err)
5652 ::fsync(fd);
5653 VOID_TEMP_FAILURE_RETRY(::close(fd));
5654
5655 return err;
5656 }
5657
5658 void Monitor::extract_save_mon_key(KeyRing& keyring)
5659 {
5660 EntityName mon_name;
5661 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5662 EntityAuth mon_key;
5663 if (keyring.get_auth(mon_name, mon_key)) {
5664 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5665 KeyRing pkey;
5666 pkey.add(mon_name, mon_key);
5667 bufferlist bl;
5668 pkey.encode_plaintext(bl);
5669 write_default_keyring(bl);
5670 keyring.remove(mon_name);
5671 }
5672 }
5673
5674 bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5675 bool force_new)
5676 {
5677 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5678 << dendl;
5679
5680 if (is_shutdown())
5681 return false;
5682
5683 // we only connect to other monitors and mgr; every else connects to us.
5684 if (service_id != CEPH_ENTITY_TYPE_MON &&
5685 service_id != CEPH_ENTITY_TYPE_MGR)
5686 return false;
5687
5688 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5689 // auth_none
5690 dout(20) << __func__ << " building auth_none authorizer" << dendl;
5691 AuthNoneClientHandler handler(g_ceph_context, nullptr);
5692 handler.set_global_id(0);
5693 *authorizer = handler.build_authorizer(service_id);
5694 return true;
5695 }
5696
5697 CephXServiceTicketInfo auth_ticket_info;
5698 CephXSessionAuthInfo info;
5699 int ret;
5700
5701 EntityName name;
5702 name.set_type(CEPH_ENTITY_TYPE_MON);
5703 auth_ticket_info.ticket.name = name;
5704 auth_ticket_info.ticket.global_id = 0;
5705
5706 if (service_id == CEPH_ENTITY_TYPE_MON) {
5707 // mon to mon authentication uses the private monitor shared key and not the
5708 // rotating key
5709 CryptoKey secret;
5710 if (!keyring.get_secret(name, secret) &&
5711 !key_server.get_secret(name, secret)) {
5712 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5713 << dendl;
5714 stringstream ss, ds;
5715 int err = key_server.list_secrets(ds);
5716 if (err < 0)
5717 ss << "no installed auth entries!";
5718 else
5719 ss << "installed auth entries:";
5720 dout(0) << ss.str() << "\n" << ds.str() << dendl;
5721 return false;
5722 }
5723
5724 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5725 secret, (uint64_t)-1);
5726 if (ret < 0) {
5727 dout(0) << __func__ << " failed to build mon session_auth_info "
5728 << cpp_strerror(ret) << dendl;
5729 return false;
5730 }
5731 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5732 // mgr
5733 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5734 if (ret < 0) {
5735 derr << __func__ << " failed to build mgr service session_auth_info "
5736 << cpp_strerror(ret) << dendl;
5737 return false;
5738 }
5739 } else {
5740 ceph_abort(); // see check at top of fn
5741 }
5742
5743 CephXTicketBlob blob;
5744 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5745 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5746 return false;
5747 }
5748 bufferlist ticket_data;
5749 ::encode(blob, ticket_data);
5750
5751 bufferlist::iterator iter = ticket_data.begin();
5752 CephXTicketHandler handler(g_ceph_context, service_id);
5753 ::decode(handler.ticket, iter);
5754
5755 handler.session_key = info.session_key;
5756
5757 *authorizer = handler.build_authorizer(0);
5758
5759 return true;
5760 }
5761
5762 bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5763 int protocol, bufferlist& authorizer_data,
5764 bufferlist& authorizer_reply,
5765 bool& isvalid, CryptoKey& session_key)
5766 {
5767 dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5768 << " " << ceph_entity_type_name(peer_type)
5769 << " protocol " << protocol << dendl;
5770
5771 if (is_shutdown())
5772 return false;
5773
5774 if (peer_type == CEPH_ENTITY_TYPE_MON &&
5775 auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5776 // monitor, and cephx is enabled
5777 isvalid = false;
5778 if (protocol == CEPH_AUTH_CEPHX) {
5779 bufferlist::iterator iter = authorizer_data.begin();
5780 CephXServiceTicketInfo auth_ticket_info;
5781
5782 if (authorizer_data.length()) {
5783 bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
5784 auth_ticket_info, authorizer_reply);
5785 if (ret) {
5786 session_key = auth_ticket_info.session_key;
5787 isvalid = true;
5788 } else {
5789 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5790 }
5791 }
5792 } else {
5793 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
5794 }
5795 } else {
5796 // who cares.
5797 isvalid = true;
5798 }
5799 return true;
5800 }