]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Monitor.cc
9a7304fcab41be15130b3469e32d32c8733de436
[ceph.git] / ceph / src / mon / Monitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include <sstream>
17 #include <stdlib.h>
18 #include <signal.h>
19 #include <limits.h>
20 #include <cstring>
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
23
24 #include "Monitor.h"
25 #include "common/version.h"
26
27 #include "osd/OSDMap.h"
28
29 #include "MonitorDBStore.h"
30
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
48
49 #include "messages/MMonSubscribe.h"
50 #include "messages/MMonSubscribeAck.h"
51
52 #include "messages/MAuthReply.h"
53
54 #include "messages/MTimeCheck.h"
55 #include "messages/MPing.h"
56
57 #include "common/strtol.h"
58 #include "common/ceph_argparse.h"
59 #include "common/Timer.h"
60 #include "common/Clock.h"
61 #include "common/errno.h"
62 #include "common/perf_counters.h"
63 #include "common/admin_socket.h"
64 #include "global/signal_handler.h"
65 #include "common/Formatter.h"
66 #include "include/stringify.h"
67 #include "include/color.h"
68 #include "include/ceph_fs.h"
69 #include "include/str_list.h"
70
71 #include "OSDMonitor.h"
72 #include "MDSMonitor.h"
73 #include "MonmapMonitor.h"
74 #include "PGMonitor.h"
75 #include "LogMonitor.h"
76 #include "AuthMonitor.h"
77 #include "MgrMonitor.h"
78 #include "MgrStatMonitor.h"
79 #include "mon/QuorumService.h"
80 #include "mon/HealthMonitor.h"
81 #include "mon/ConfigKeyService.h"
82 #include "common/config.h"
83 #include "common/cmdparse.h"
84 #include "include/assert.h"
85 #include "include/compat.h"
86 #include "perfglue/heap_profiler.h"
87
88 #include "auth/none/AuthNoneClientHandler.h"
89
90 #define dout_subsys ceph_subsys_mon
91 #undef dout_prefix
92 #define dout_prefix _prefix(_dout, this)
93 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
94 return *_dout << "mon." << mon->name << "@" << mon->rank
95 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
96 }
97
98 const string Monitor::MONITOR_NAME = "monitor";
99 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
100
101
102 #undef FLAG
103 #undef COMMAND
104 #undef COMMAND_WITH_FLAG
105 MonCommand mon_commands[] = {
106 #define FLAG(f) (MonCommand::FLAG_##f)
107 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
108 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
109 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
110 {parsesig, helptext, modulename, req_perms, avail, flags},
111 #include <mon/MonCommands.h>
112 #undef COMMAND
113 #undef COMMAND_WITH_FLAG
114
115 // FIXME: slurp up the Mgr commands too
116
117 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
118 {parsesig, helptext, modulename, req_perms, avail, FLAG(MGR)},
119 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
120 {parsesig, helptext, modulename, req_perms, avail, flags | FLAG(MGR)},
121 #include <mgr/MgrCommands.h>
122 #undef COMMAND
123 #undef COMMAND_WITH_FLAG
124
125 };
126
127
128 void C_MonContext::finish(int r) {
129 if (mon->is_shutdown())
130 return;
131 FunctionContext::finish(r);
132 }
133
134 Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
135 Messenger *m, Messenger *mgr_m, MonMap *map) :
136 Dispatcher(cct_),
137 name(nm),
138 rank(-1),
139 messenger(m),
140 con_self(m ? m->get_loopback_connection() : NULL),
141 lock("Monitor::lock"),
142 timer(cct_, lock),
143 finisher(cct_, "mon_finisher", "fin"),
144 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
145 has_ever_joined(false),
146 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
147 monmap(map),
148 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
149 key_server(cct, &keyring),
150 auth_cluster_required(cct,
151 cct->_conf->auth_supported.empty() ?
152 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
153 auth_service_required(cct,
154 cct->_conf->auth_supported.empty() ?
155 cct->_conf->auth_service_required : cct->_conf->auth_supported ),
156 leader_supported_mon_commands(NULL),
157 leader_supported_mon_commands_size(0),
158 mgr_messenger(mgr_m),
159 mgr_client(cct_, mgr_m),
160 pgservice(nullptr),
161 store(s),
162
163 state(STATE_PROBING),
164
165 elector(this),
166 required_features(0),
167 leader(0),
168 quorum_con_features(0),
169 // scrub
170 scrub_version(0),
171 scrub_event(NULL),
172 scrub_timeout_event(NULL),
173
174 // sync state
175 sync_provider_count(0),
176 sync_cookie(0),
177 sync_full(false),
178 sync_start_version(0),
179 sync_timeout_event(NULL),
180 sync_last_committed_floor(0),
181
182 timecheck_round(0),
183 timecheck_acks(0),
184 timecheck_rounds_since_clean(0),
185 timecheck_event(NULL),
186
187 paxos_service(PAXOS_NUM),
188 admin_hook(NULL),
189 routed_request_tid(0),
190 op_tracker(cct, true, 1)
191 {
192 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
193 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
194
195 update_log_clients();
196
197 paxos = new Paxos(this, "paxos");
198
199 paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
200 paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
201 paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
202 paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
203 paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
204 paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
205 paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
206 paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
207
208 health_monitor = new HealthMonitor(this);
209 config_key_service = new ConfigKeyService(this, paxos);
210
211 mon_caps = new MonCap();
212 bool r = mon_caps->parse("allow *", NULL);
213 assert(r);
214
215 exited_quorum = ceph_clock_now();
216
217 // assume our commands until we have an election. this only means
218 // we won't reply with EINVAL before the election; any command that
219 // actually matters will wait until we have quorum etc and then
220 // retry (and revalidate).
221 const MonCommand *cmds;
222 int cmdsize;
223 get_locally_supported_monitor_commands(&cmds, &cmdsize);
224 set_leader_supported_commands(cmds, cmdsize);
225
226 // note: OSDMonitor may update this based on the luminous flag.
227 pgservice = mgrstatmon()->get_pg_stat_service();
228 }
229
230 PaxosService *Monitor::get_paxos_service_by_name(const string& name)
231 {
232 if (name == "mdsmap")
233 return paxos_service[PAXOS_MDSMAP];
234 if (name == "monmap")
235 return paxos_service[PAXOS_MONMAP];
236 if (name == "osdmap")
237 return paxos_service[PAXOS_OSDMAP];
238 if (name == "pgmap")
239 return paxos_service[PAXOS_PGMAP];
240 if (name == "logm")
241 return paxos_service[PAXOS_LOG];
242 if (name == "auth")
243 return paxos_service[PAXOS_AUTH];
244 if (name == "mgr")
245 return paxos_service[PAXOS_MGR];
246
247 assert(0 == "given name does not match known paxos service");
248 return NULL;
249 }
250
251 Monitor::~Monitor()
252 {
253 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
254 delete *p;
255 delete health_monitor;
256 delete config_key_service;
257 delete paxos;
258 assert(session_map.sessions.empty());
259 delete mon_caps;
260 if (leader_supported_mon_commands != mon_commands)
261 delete[] leader_supported_mon_commands;
262 }
263
264
265 class AdminHook : public AdminSocketHook {
266 Monitor *mon;
267 public:
268 explicit AdminHook(Monitor *m) : mon(m) {}
269 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
270 bufferlist& out) override {
271 stringstream ss;
272 mon->do_admin_command(command, cmdmap, format, ss);
273 out.append(ss);
274 return true;
275 }
276 };
277
278 void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
279 ostream& ss)
280 {
281 Mutex::Locker l(lock);
282
283 boost::scoped_ptr<Formatter> f(Formatter::create(format));
284
285 string args;
286 for (cmdmap_t::iterator p = cmdmap.begin();
287 p != cmdmap.end(); ++p) {
288 if (p->first == "prefix")
289 continue;
290 if (!args.empty())
291 args += ", ";
292 args += cmd_vartype_stringify(p->second);
293 }
294 args = "[" + args + "]";
295
296 bool read_only = (command == "mon_status" ||
297 command == "mon metadata" ||
298 command == "quorum_status" ||
299 command == "ops");
300
301 (read_only ? audit_clog->debug() : audit_clog->info())
302 << "from='admin socket' entity='admin socket' "
303 << "cmd='" << command << "' args=" << args << ": dispatch";
304
305 if (command == "mon_status") {
306 get_mon_status(f.get(), ss);
307 if (f)
308 f->flush(ss);
309 } else if (command == "quorum_status") {
310 _quorum_status(f.get(), ss);
311 } else if (command == "sync_force") {
312 string validate;
313 if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
314 (validate != "--yes-i-really-mean-it")) {
315 ss << "are you SURE? this will mean the monitor store will be erased "
316 "the next time the monitor is restarted. pass "
317 "'--yes-i-really-mean-it' if you really do.";
318 goto abort;
319 }
320 sync_force(f.get(), ss);
321 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
322 if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
323 goto abort;
324 } else if (command == "quorum enter") {
325 elector.start_participating();
326 start_election();
327 ss << "started responding to quorum, initiated new election";
328 } else if (command == "quorum exit") {
329 start_election();
330 elector.stop_participating();
331 ss << "stopped responding to quorum, initiated new election";
332 } else if (command == "ops") {
333 (void)op_tracker.dump_ops_in_flight(f.get());
334 if (f) {
335 f->flush(ss);
336 }
337 } else {
338 assert(0 == "bad AdminSocket command binding");
339 }
340 (read_only ? audit_clog->debug() : audit_clog->info())
341 << "from='admin socket' "
342 << "entity='admin socket' "
343 << "cmd=" << command << " "
344 << "args=" << args << ": finished";
345 return;
346
347 abort:
348 (read_only ? audit_clog->debug() : audit_clog->info())
349 << "from='admin socket' "
350 << "entity='admin socket' "
351 << "cmd=" << command << " "
352 << "args=" << args << ": aborted";
353 }
354
355 void Monitor::handle_signal(int signum)
356 {
357 assert(signum == SIGINT || signum == SIGTERM);
358 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
359 shutdown();
360 }
361
362 CompatSet Monitor::get_initial_supported_features()
363 {
364 CompatSet::FeatureSet ceph_mon_feature_compat;
365 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
366 CompatSet::FeatureSet ceph_mon_feature_incompat;
367 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
368 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
369 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
370 ceph_mon_feature_incompat);
371 }
372
373 CompatSet Monitor::get_supported_features()
374 {
375 CompatSet compat = get_initial_supported_features();
376 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
377 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
378 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
379 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
380 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
381 return compat;
382 }
383
384 CompatSet Monitor::get_legacy_features()
385 {
386 CompatSet::FeatureSet ceph_mon_feature_compat;
387 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
388 CompatSet::FeatureSet ceph_mon_feature_incompat;
389 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
390 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
391 ceph_mon_feature_incompat);
392 }
393
394 int Monitor::check_features(MonitorDBStore *store)
395 {
396 CompatSet required = get_supported_features();
397 CompatSet ondisk;
398
399 read_features_off_disk(store, &ondisk);
400
401 if (!required.writeable(ondisk)) {
402 CompatSet diff = required.unsupported(ondisk);
403 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
404 return -EPERM;
405 }
406
407 return 0;
408 }
409
410 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
411 {
412 bufferlist featuresbl;
413 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
414 if (featuresbl.length() == 0) {
415 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
416 << "Assuming it is old-style and introducing one." << dendl;
417 //we only want the baseline ~v.18 features assumed to be on disk.
418 //If new features are introduced this code needs to disappear or
419 //be made smarter.
420 *features = get_legacy_features();
421
422 features->encode(featuresbl);
423 auto t(std::make_shared<MonitorDBStore::Transaction>());
424 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
425 store->apply_transaction(t);
426 } else {
427 bufferlist::iterator it = featuresbl.begin();
428 features->decode(it);
429 }
430 }
431
432 void Monitor::read_features()
433 {
434 read_features_off_disk(store, &features);
435 dout(10) << "features " << features << dendl;
436
437 calc_quorum_requirements();
438 dout(10) << "required_features " << required_features << dendl;
439 }
440
441 void Monitor::write_features(MonitorDBStore::TransactionRef t)
442 {
443 bufferlist bl;
444 features.encode(bl);
445 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
446 }
447
448 const char** Monitor::get_tracked_conf_keys() const
449 {
450 static const char* KEYS[] = {
451 "crushtool", // helpful for testing
452 "mon_election_timeout",
453 "mon_lease",
454 "mon_lease_renew_interval_factor",
455 "mon_lease_ack_timeout_factor",
456 "mon_accept_timeout_factor",
457 // clog & admin clog
458 "clog_to_monitors",
459 "clog_to_syslog",
460 "clog_to_syslog_facility",
461 "clog_to_syslog_level",
462 "clog_to_graylog",
463 "clog_to_graylog_host",
464 "clog_to_graylog_port",
465 "host",
466 "fsid",
467 // periodic health to clog
468 "mon_health_to_clog",
469 "mon_health_to_clog_interval",
470 "mon_health_to_clog_tick_interval",
471 // scrub interval
472 "mon_scrub_interval",
473 NULL
474 };
475 return KEYS;
476 }
477
478 void Monitor::handle_conf_change(const struct md_config_t *conf,
479 const std::set<std::string> &changed)
480 {
481 sanitize_options();
482
483 dout(10) << __func__ << " " << changed << dendl;
484
485 if (changed.count("clog_to_monitors") ||
486 changed.count("clog_to_syslog") ||
487 changed.count("clog_to_syslog_level") ||
488 changed.count("clog_to_syslog_facility") ||
489 changed.count("clog_to_graylog") ||
490 changed.count("clog_to_graylog_host") ||
491 changed.count("clog_to_graylog_port") ||
492 changed.count("host") ||
493 changed.count("fsid")) {
494 update_log_clients();
495 }
496
497 if (changed.count("mon_health_to_clog") ||
498 changed.count("mon_health_to_clog_interval") ||
499 changed.count("mon_health_to_clog_tick_interval")) {
500 health_to_clog_update_conf(changed);
501 }
502
503 if (changed.count("mon_scrub_interval")) {
504 scrub_update_interval(conf->mon_scrub_interval);
505 }
506 }
507
508 void Monitor::update_log_clients()
509 {
510 map<string,string> log_to_monitors;
511 map<string,string> log_to_syslog;
512 map<string,string> log_channel;
513 map<string,string> log_prio;
514 map<string,string> log_to_graylog;
515 map<string,string> log_to_graylog_host;
516 map<string,string> log_to_graylog_port;
517 uuid_d fsid;
518 string host;
519
520 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
521 log_channel, log_prio, log_to_graylog,
522 log_to_graylog_host, log_to_graylog_port,
523 fsid, host))
524 return;
525
526 clog->update_config(log_to_monitors, log_to_syslog,
527 log_channel, log_prio, log_to_graylog,
528 log_to_graylog_host, log_to_graylog_port,
529 fsid, host);
530
531 audit_clog->update_config(log_to_monitors, log_to_syslog,
532 log_channel, log_prio, log_to_graylog,
533 log_to_graylog_host, log_to_graylog_port,
534 fsid, host);
535 }
536
537 int Monitor::sanitize_options()
538 {
539 int r = 0;
540
541 // mon_lease must be greater than mon_lease_renewal; otherwise we
542 // may incur in leases expiring before they are renewed.
543 if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
544 clog->error() << "mon_lease_renew_interval_factor ("
545 << g_conf->mon_lease_renew_interval_factor
546 << ") must be less than 1.0";
547 r = -EINVAL;
548 }
549
550 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
551 // got time to renew the lease and get an ack for it. Having both options
552 // with the same value, for a given small vale, could mean timing out if
553 // the monitors happened to be overloaded -- or even under normal load for
554 // a small enough value.
555 if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
556 clog->error() << "mon_lease_ack_timeout_factor ("
557 << g_conf->mon_lease_ack_timeout_factor
558 << ") must be greater than 1.0";
559 r = -EINVAL;
560 }
561
562 return r;
563 }
564
565 int Monitor::preinit()
566 {
567 lock.Lock();
568
569 dout(1) << "preinit fsid " << monmap->fsid << dendl;
570
571 int r = sanitize_options();
572 if (r < 0) {
573 derr << "option sanitization failed!" << dendl;
574 lock.Unlock();
575 return r;
576 }
577
578 assert(!logger);
579 {
580 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
581 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess");
582 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", "sadd");
583 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", "srm");
584 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions");
585 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in");
586 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started");
587 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won");
588 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost");
589 logger = pcb.create_perf_counters();
590 cct->get_perfcounters_collection()->add(logger);
591 }
592
593 assert(!cluster_logger);
594 {
595 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
596 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
597 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
598 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
599 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
600 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
601 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
602 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster");
603 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space");
604 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space");
605 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
606 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
607 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
608 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
609 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
610 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
611 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
612 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
613 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
614 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
615 pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
616 pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
617 pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
618 pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
619 cluster_logger = pcb.create_perf_counters();
620 }
621
622 paxos->init_logger();
623
624 // verify cluster_uuid
625 {
626 int r = check_fsid();
627 if (r == -ENOENT)
628 r = write_fsid();
629 if (r < 0) {
630 lock.Unlock();
631 return r;
632 }
633 }
634
635 // open compatset
636 read_features();
637
638 // have we ever joined a quorum?
639 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
640 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
641
642 if (!has_ever_joined) {
643 // impose initial quorum restrictions?
644 list<string> initial_members;
645 get_str_list(g_conf->mon_initial_members, initial_members);
646
647 if (!initial_members.empty()) {
648 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
649
650 monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
651 &extra_probe_peers);
652
653 dout(10) << " monmap is " << *monmap << dendl;
654 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
655 }
656 } else if (!monmap->contains(name)) {
657 derr << "not in monmap and have been in a quorum before; "
658 << "must have been removed" << dendl;
659 if (g_conf->mon_force_quorum_join) {
660 dout(0) << "we should have died but "
661 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
662 } else {
663 derr << "commit suicide!" << dendl;
664 lock.Unlock();
665 return -ENOENT;
666 }
667 }
668
669 {
670 // We have a potentially inconsistent store state in hands. Get rid of it
671 // and start fresh.
672 bool clear_store = false;
673 if (store->exists("mon_sync", "in_sync")) {
674 dout(1) << __func__ << " clean up potentially inconsistent store state"
675 << dendl;
676 clear_store = true;
677 }
678
679 if (store->get("mon_sync", "force_sync") > 0) {
680 dout(1) << __func__ << " force sync by clearing store state" << dendl;
681 clear_store = true;
682 }
683
684 if (clear_store) {
685 set<string> sync_prefixes = get_sync_targets_names();
686 store->clear(sync_prefixes);
687 }
688 }
689
690 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
691 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
692
693 init_paxos();
694 health_monitor->init();
695
696 if (is_keyring_required()) {
697 // we need to bootstrap authentication keys so we can form an
698 // initial quorum.
699 if (authmon()->get_last_committed() == 0) {
700 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
701 bufferlist bl;
702 int err = store->get("mkfs", "keyring", bl);
703 if (err == 0 && bl.length() > 0) {
704 // Attempt to decode and extract keyring only if it is found.
705 KeyRing keyring;
706 bufferlist::iterator p = bl.begin();
707 ::decode(keyring, p);
708 extract_save_mon_key(keyring);
709 }
710 }
711
712 string keyring_loc = g_conf->mon_data + "/keyring";
713
714 r = keyring.load(cct, keyring_loc);
715 if (r < 0) {
716 EntityName mon_name;
717 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
718 EntityAuth mon_key;
719 if (key_server.get_auth(mon_name, mon_key)) {
720 dout(1) << "copying mon. key from old db to external keyring" << dendl;
721 keyring.add(mon_name, mon_key);
722 bufferlist bl;
723 keyring.encode_plaintext(bl);
724 write_default_keyring(bl);
725 } else {
726 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
727 lock.Unlock();
728 return r;
729 }
730 }
731 }
732
733 admin_hook = new AdminHook(this);
734 AdminSocket* admin_socket = cct->get_admin_socket();
735
736 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
737 lock.Unlock();
738 r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
739 "show current monitor status");
740 assert(r == 0);
741 r = admin_socket->register_command("quorum_status", "quorum_status",
742 admin_hook, "show current quorum status");
743 assert(r == 0);
744 r = admin_socket->register_command("sync_force",
745 "sync_force name=validate,"
746 "type=CephChoices,"
747 "strings=--yes-i-really-mean-it",
748 admin_hook,
749 "force sync of and clear monitor store");
750 assert(r == 0);
751 r = admin_socket->register_command("add_bootstrap_peer_hint",
752 "add_bootstrap_peer_hint name=addr,"
753 "type=CephIPAddr",
754 admin_hook,
755 "add peer address as potential bootstrap"
756 " peer for cluster bringup");
757 assert(r == 0);
758 r = admin_socket->register_command("quorum enter", "quorum enter",
759 admin_hook,
760 "force monitor back into quorum");
761 assert(r == 0);
762 r = admin_socket->register_command("quorum exit", "quorum exit",
763 admin_hook,
764 "force monitor out of the quorum");
765 assert(r == 0);
766 r = admin_socket->register_command("ops",
767 "ops",
768 admin_hook,
769 "show the ops currently in flight");
770 assert(r == 0);
771
772 lock.Lock();
773
774 // add ourselves as a conf observer
775 g_conf->add_observer(this);
776
777 lock.Unlock();
778 return 0;
779 }
780
781 int Monitor::init()
782 {
783 dout(2) << "init" << dendl;
784 Mutex::Locker l(lock);
785
786 finisher.start();
787
788 // start ticker
789 timer.init();
790 new_tick();
791
792 cpu_tp.start();
793
794 // i'm ready!
795 messenger->add_dispatcher_tail(this);
796
797 mgr_client.init();
798 mgr_messenger->add_dispatcher_tail(&mgr_client);
799 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
800
801 bootstrap();
802
803 // encode command sets
804 const MonCommand *cmds;
805 int cmdsize;
806 get_locally_supported_monitor_commands(&cmds, &cmdsize);
807 MonCommand::encode_array(cmds, cmdsize, supported_commands_bl);
808
809 return 0;
810 }
811
812 void Monitor::init_paxos()
813 {
814 dout(10) << __func__ << dendl;
815 paxos->init();
816
817 // init services
818 for (int i = 0; i < PAXOS_NUM; ++i) {
819 paxos_service[i]->init();
820 }
821
822 refresh_from_paxos(NULL);
823 }
824
825 void Monitor::refresh_from_paxos(bool *need_bootstrap)
826 {
827 dout(10) << __func__ << dendl;
828
829 bufferlist bl;
830 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
831 if (r >= 0) {
832 try {
833 bufferlist::iterator p = bl.begin();
834 ::decode(fingerprint, p);
835 }
836 catch (buffer::error& e) {
837 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
838 }
839 } else {
840 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
841 }
842
843 for (int i = 0; i < PAXOS_NUM; ++i) {
844 paxos_service[i]->refresh(need_bootstrap);
845 }
846 for (int i = 0; i < PAXOS_NUM; ++i) {
847 paxos_service[i]->post_refresh();
848 }
849 }
850
851 void Monitor::register_cluster_logger()
852 {
853 if (!cluster_logger_registered) {
854 dout(10) << "register_cluster_logger" << dendl;
855 cluster_logger_registered = true;
856 cct->get_perfcounters_collection()->add(cluster_logger);
857 } else {
858 dout(10) << "register_cluster_logger - already registered" << dendl;
859 }
860 }
861
862 void Monitor::unregister_cluster_logger()
863 {
864 if (cluster_logger_registered) {
865 dout(10) << "unregister_cluster_logger" << dendl;
866 cluster_logger_registered = false;
867 cct->get_perfcounters_collection()->remove(cluster_logger);
868 } else {
869 dout(10) << "unregister_cluster_logger - not registered" << dendl;
870 }
871 }
872
873 void Monitor::update_logger()
874 {
875 cluster_logger->set(l_cluster_num_mon, monmap->size());
876 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
877 }
878
879 void Monitor::shutdown()
880 {
881 dout(1) << "shutdown" << dendl;
882
883 lock.Lock();
884
885 wait_for_paxos_write();
886
887 state = STATE_SHUTDOWN;
888
889 g_conf->remove_observer(this);
890
891 if (admin_hook) {
892 AdminSocket* admin_socket = cct->get_admin_socket();
893 admin_socket->unregister_command("mon_status");
894 admin_socket->unregister_command("quorum_status");
895 admin_socket->unregister_command("sync_force");
896 admin_socket->unregister_command("add_bootstrap_peer_hint");
897 admin_socket->unregister_command("quorum enter");
898 admin_socket->unregister_command("quorum exit");
899 admin_socket->unregister_command("ops");
900 delete admin_hook;
901 admin_hook = NULL;
902 }
903
904 elector.shutdown();
905
906 mgr_client.shutdown();
907
908 lock.Unlock();
909 finisher.wait_for_empty();
910 finisher.stop();
911 lock.Lock();
912
913 // clean up
914 paxos->shutdown();
915 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
916 (*p)->shutdown();
917 health_monitor->shutdown();
918
919 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
920 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
921
922 timer.shutdown();
923
924 cpu_tp.stop();
925
926 remove_all_sessions();
927
928 if (logger) {
929 cct->get_perfcounters_collection()->remove(logger);
930 delete logger;
931 logger = NULL;
932 }
933 if (cluster_logger) {
934 if (cluster_logger_registered)
935 cct->get_perfcounters_collection()->remove(cluster_logger);
936 delete cluster_logger;
937 cluster_logger = NULL;
938 }
939
940 log_client.shutdown();
941
942 // unlock before msgr shutdown...
943 lock.Unlock();
944
945 messenger->shutdown(); // last thing! ceph_mon.cc will delete mon.
946 mgr_messenger->shutdown();
947 }
948
949 void Monitor::wait_for_paxos_write()
950 {
951 if (paxos->is_writing() || paxos->is_writing_previous()) {
952 dout(10) << __func__ << " flushing pending write" << dendl;
953 lock.Unlock();
954 store->flush();
955 lock.Lock();
956 dout(10) << __func__ << " flushed pending write" << dendl;
957 }
958 }
959
/**
 * (Re)start the probing phase from scratch.
 *
 * Flushes any in-flight paxos write, drops requester-side sync state,
 * unregisters the cluster logger and cancels the probe timeout. It then:
 * - recomputes our rank from our address in the monmap;
 * - exits the process if we have been removed from a monmap we had
 *   previously joined;
 * - moves to STATE_PROBING and resets per-quorum state via _reset().
 *
 * A single-member monmap in which we are rank 0 wins its election
 * immediately. Otherwise the probe timeout is re-armed and OP_PROBE is sent
 * to every other monmap member and to every extra probe peer.
 */
void Monitor::bootstrap()
{
  dout(10) << "bootstrap" << dendl;
  wait_for_paxos_write();

  sync_reset_requester();
  unregister_cluster_logger();
  cancel_probe_timeout();

  // note my rank
  int newrank = monmap->get_rank(messenger->get_myaddr());
  if (newrank < 0 && rank >= 0) {
    // was i ever part of the quorum?
    // (our address is no longer in the monmap, but we had a rank before)
    if (has_ever_joined) {
      dout(0) << " removed from monmap, suicide." << dendl;
      exit(0);
    }
  }
  if (newrank != rank) {
    dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
    messenger->set_myname(entity_name_t::MON(newrank));
    rank = newrank;

    // reset all connections, or else our peers will think we are someone else.
    messenger->mark_down_all();
  }

  // reset
  state = STATE_PROBING;

  _reset();

  // sync store
  // (optionally compact the backing store before probing; controlled by
  // mon_compact_on_bootstrap)
  if (g_conf->mon_compact_on_bootstrap) {
    dout(10) << "bootstrap -- triggering compaction" << dendl;
    store->compact();
    dout(10) << "bootstrap -- finished compaction" << dendl;
  }

  // singleton monitor?
  if (monmap->size() == 1 && rank == 0) {
    win_standalone_election();
    return;
  }

  reset_probe_timeout();

  // i'm outside the quorum
  if (monmap->contains(name))
    outside_quorum.insert(name);

  // probe monitors
  dout(10) << "probing other monitors" << dendl;
  for (unsigned i = 0; i < monmap->size(); i++) {
    if ((int)i != rank)
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
                              monmap->get_inst(i));
  }
  // Also probe addresses we were given as hints or that probed us without
  // being in our monmap; their rank is unknown, hence MON(-1).
  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
       p != extra_probe_peers.end();
       ++p) {
    if (*p != messenger->get_myaddr()) {
      entity_inst_t i;
      i.name = entity_name_t::MON(-1);
      i.addr = *p;
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
    }
  }
}
1029
1030 bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1031 {
1032 string addrstr;
1033 if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1034 ss << "unable to parse address string value '"
1035 << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1036 return false;
1037 }
1038 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1039 << addrstr << "'" << dendl;
1040
1041 entity_addr_t addr;
1042 const char *end = 0;
1043 if (!addr.parse(addrstr.c_str(), &end)) {
1044 ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1045 return false;
1046 }
1047
1048 if (is_leader() || is_peon()) {
1049 ss << "mon already active; ignoring bootstrap hint";
1050 return true;
1051 }
1052
1053 if (addr.get_port() == 0)
1054 addr.set_port(CEPH_MON_PORT);
1055
1056 extra_probe_peers.insert(addr);
1057 ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1058 return true;
1059 }
1060
1061 // called by bootstrap(), or on leader|peon -> electing
1062 void Monitor::_reset()
1063 {
1064 dout(10) << __func__ << dendl;
1065
1066 cancel_probe_timeout();
1067 timecheck_finish();
1068 health_events_cleanup();
1069 scrub_event_cancel();
1070
1071 leader_since = utime_t();
1072 if (!quorum.empty()) {
1073 exited_quorum = ceph_clock_now();
1074 }
1075 quorum.clear();
1076 outside_quorum.clear();
1077 quorum_feature_map.clear();
1078
1079 scrub_reset();
1080
1081 paxos->restart();
1082
1083 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1084 (*p)->restart();
1085 health_monitor->finish();
1086 }
1087
1088
1089 // -----------------------------------------------------------
1090 // sync
1091
1092 set<string> Monitor::get_sync_targets_names()
1093 {
1094 set<string> targets;
1095 targets.insert(paxos->get_name());
1096 for (int i = 0; i < PAXOS_NUM; ++i)
1097 paxos_service[i]->get_store_prefixes(targets);
1098 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1099 assert(config_key_service_ptr);
1100 config_key_service_ptr->get_store_prefixes(targets);
1101 return targets;
1102 }
1103
1104
1105 void Monitor::sync_timeout()
1106 {
1107 dout(10) << __func__ << dendl;
1108 assert(state == STATE_SYNCHRONIZING);
1109 bootstrap();
1110 }
1111
1112 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1113 {
1114 dout(1) << __func__ << dendl;
1115
1116 MonMap latest_monmap;
1117
1118 // Grab latest monmap from MonmapMonitor
1119 bufferlist monmon_bl;
1120 int err = monmon()->get_monmap(monmon_bl);
1121 if (err < 0) {
1122 if (err != -ENOENT) {
1123 derr << __func__
1124 << " something wrong happened while reading the store: "
1125 << cpp_strerror(err) << dendl;
1126 assert(0 == "error reading the store");
1127 }
1128 } else {
1129 latest_monmap.decode(monmon_bl);
1130 }
1131
1132 // Grab last backed up monmap (if any) and compare epochs
1133 if (store->exists("mon_sync", "latest_monmap")) {
1134 bufferlist backup_bl;
1135 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1136 if (err < 0) {
1137 derr << __func__
1138 << " something wrong happened while reading the store: "
1139 << cpp_strerror(err) << dendl;
1140 assert(0 == "error reading the store");
1141 }
1142 assert(backup_bl.length() > 0);
1143
1144 MonMap backup_monmap;
1145 backup_monmap.decode(backup_bl);
1146
1147 if (backup_monmap.epoch > latest_monmap.epoch)
1148 latest_monmap = backup_monmap;
1149 }
1150
1151 // Check if our current monmap's epoch is greater than the one we've
1152 // got so far.
1153 if (monmap->epoch > latest_monmap.epoch)
1154 latest_monmap = *monmap;
1155
1156 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1157
1158 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1159 }
1160
1161 void Monitor::sync_reset_requester()
1162 {
1163 dout(10) << __func__ << dendl;
1164
1165 if (sync_timeout_event) {
1166 timer.cancel_event(sync_timeout_event);
1167 sync_timeout_event = NULL;
1168 }
1169
1170 sync_provider = entity_inst_t();
1171 sync_cookie = 0;
1172 sync_full = false;
1173 sync_start_version = 0;
1174 }
1175
1176 void Monitor::sync_reset_provider()
1177 {
1178 dout(10) << __func__ << dendl;
1179 sync_providers.clear();
1180 }
1181
/**
 * Requester side: begin synchronizing from @p other.
 *
 * Moves to STATE_SYNCHRONIZING and stops serving as a sync provider.
 *
 * For a full sync (@p full == true):
 * - one transaction persists a backup of the latest monmap, the "in_sync"
 *   marker and sync_last_committed_floor (raised to at least our current
 *   paxos version);
 * - every sync target prefix is then cleared from the store;
 * - paxos is re-initialized.
 *
 * For a recent sync the store is kept, and the request carries our current
 * paxos version.
 *
 * In both cases @p other is recorded as the provider, the sync timeout is
 * armed, and OP_GET_COOKIE_FULL or OP_GET_COOKIE_RECENT is sent to it.
 *
 * The mon_sync_requester_kill_at asserts abort the monitor at specific
 * steps when the option equals the given value (fault injection).
 */
void Monitor::sync_start(entity_inst_t &other, bool full)
{
  dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;

  assert(state == STATE_PROBING ||
         state == STATE_SYNCHRONIZING);
  state = STATE_SYNCHRONIZING;

  // make sure are not a provider for anyone!
  sync_reset_provider();

  sync_full = full;

  if (sync_full) {
    // stash key state, and mark that we are syncing
    auto t(std::make_shared<MonitorDBStore::Transaction>());
    sync_stash_critical_state(t);
    t->put("mon_sync", "in_sync", 1);

    // never let the floor go backwards across repeated sync attempts
    sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
    dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
             << sync_last_committed_floor << dendl;
    t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);

    store->apply_transaction(t);

    assert(g_conf->mon_sync_requester_kill_at != 1);

    // clear the underlying store
    set<string> targets = get_sync_targets_names();
    dout(10) << __func__ << " clearing prefixes " << targets << dendl;
    store->clear(targets);

    // make sure paxos knows it has been reset. this prevents a
    // bootstrap and then different probe reply order from possibly
    // deciding a partial or no sync is needed.
    paxos->init();

    assert(g_conf->mon_sync_requester_kill_at != 2);
  }

  // assume 'other' as the leader. We will update the leader once we receive
  // a reply to the sync start.
  sync_provider = other;

  sync_reset_timeout();

  MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
  // a recent sync tells the provider where our paxos log currently ends
  if (!sync_full)
    m->last_committed = paxos->get_version();
  messenger->send_message(m, sync_provider);
}
1234
1235 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1236 {
1237 dout(10) << __func__ << dendl;
1238 bufferlist backup_monmap;
1239 sync_obtain_latest_monmap(backup_monmap);
1240 assert(backup_monmap.length() > 0);
1241 t->put("mon_sync", "latest_monmap", backup_monmap);
1242 }
1243
1244 void Monitor::sync_reset_timeout()
1245 {
1246 dout(10) << __func__ << dendl;
1247 if (sync_timeout_event)
1248 timer.cancel_event(sync_timeout_event);
1249 sync_timeout_event = new C_MonContext(this, [this](int) {
1250 sync_timeout();
1251 });
1252 timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event);
1253 }
1254
/**
 * Requester side: complete a sync whose last chunk brought us to
 * @p last_committed.
 *
 * For a full sync, the paxos versions sync_start_version..@p last_committed
 * are read and prepared, together with the new "last_committed" value, into
 * one transaction, which is then applied.
 *
 * In all cases:
 * - the "mon_sync" markers (in_sync, force_sync, last_committed_floor) are
 *   erased;
 * - paxos is re-initialized via init_paxos();
 * - we bootstrap again.
 *
 * The mon_sync_requester_kill_at asserts are fault-injection points.
 */
void Monitor::sync_finish(version_t last_committed)
{
  dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;

  assert(g_conf->mon_sync_requester_kill_at != 7);

  if (sync_full) {
    // finalize the paxos commits
    auto tx(std::make_shared<MonitorDBStore::Transaction>());
    paxos->read_and_prepare_transactions(tx, sync_start_version,
                                         last_committed);
    tx->put(paxos->get_name(), "last_committed", last_committed);

    dout(30) << __func__ << " final tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    store->apply_transaction(tx);
  }

  assert(g_conf->mon_sync_requester_kill_at != 8);

  // clear the markers written when the sync started
  auto t(std::make_shared<MonitorDBStore::Transaction>());
  t->erase("mon_sync", "in_sync");
  t->erase("mon_sync", "force_sync");
  t->erase("mon_sync", "last_committed_floor");
  store->apply_transaction(t);

  assert(g_conf->mon_sync_requester_kill_at != 9);

  init_paxos();

  assert(g_conf->mon_sync_requester_kill_at != 10);

  bootstrap();
}
1293
1294 void Monitor::handle_sync(MonOpRequestRef op)
1295 {
1296 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1297 dout(10) << __func__ << " " << *m << dendl;
1298 switch (m->op) {
1299
1300 // provider ---------
1301
1302 case MMonSync::OP_GET_COOKIE_FULL:
1303 case MMonSync::OP_GET_COOKIE_RECENT:
1304 handle_sync_get_cookie(op);
1305 break;
1306 case MMonSync::OP_GET_CHUNK:
1307 handle_sync_get_chunk(op);
1308 break;
1309
1310 // client -----------
1311
1312 case MMonSync::OP_COOKIE:
1313 handle_sync_cookie(op);
1314 break;
1315
1316 case MMonSync::OP_CHUNK:
1317 case MMonSync::OP_LAST_CHUNK:
1318 handle_sync_chunk(op);
1319 break;
1320 case MMonSync::OP_NO_COOKIE:
1321 handle_sync_no_cookie(op);
1322 break;
1323
1324 default:
1325 dout(0) << __func__ << " unknown op " << m->op << dendl;
1326 assert(0 == "unknown op");
1327 }
1328 }
1329
// provider
1331
1332 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1333 {
1334 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1335 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1336 m->get_connection()->send_message(reply);
1337 }
1338
1339 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1340 {
1341 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1342 if (is_synchronizing()) {
1343 _sync_reply_no_cookie(op);
1344 return;
1345 }
1346
1347 assert(g_conf->mon_sync_provider_kill_at != 1);
1348
1349 // make sure they can understand us.
1350 if ((required_features ^ m->get_connection()->get_features()) &
1351 required_features) {
1352 dout(5) << " ignoring peer mon." << m->get_source().num()
1353 << " has features " << std::hex
1354 << m->get_connection()->get_features()
1355 << " but we require " << required_features << std::dec << dendl;
1356 return;
1357 }
1358
1359 // make up a unique cookie. include election epoch (which persists
1360 // across restarts for the whole cluster) and a counter for this
1361 // process instance. there is no need to be unique *across*
1362 // monitors, though.
1363 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1364 assert(sync_providers.count(cookie) == 0);
1365
1366 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1367
1368 SyncProvider& sp = sync_providers[cookie];
1369 sp.cookie = cookie;
1370 sp.entity = m->get_source_inst();
1371 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1372
1373 set<string> sync_targets;
1374 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1375 // full scan
1376 sync_targets = get_sync_targets_names();
1377 sp.last_committed = paxos->get_version();
1378 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1379 sp.full = true;
1380 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1381 } else {
1382 // just catch up paxos
1383 sp.last_committed = m->last_committed;
1384 }
1385 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1386
1387 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1388 reply->last_committed = sp.last_committed;
1389 m->get_connection()->send_message(reply);
1390 }
1391
/**
 * Provider side: send the next chunk for an established sync cookie.
 *
 * Replies OP_NO_COOKIE if the cookie is unknown. It also replies
 * OP_NO_COOKIE, and drops the cookie, if the requester's last_committed has
 * fallen below our first committed paxos version (when that is > 1).
 *
 * Otherwise the reply transaction is filled with:
 * - the successive paxos versions after sp.last_committed;
 * - for full syncs, store data from the synchronizer.
 * Filling stops once mon_sync_max_payload_size bytes have been used; the
 * check runs before each item, so the budget can be overshot by one item.
 *
 * The reply is OP_LAST_CHUNK, and the cookie is dropped, once paxos and
 * (for full syncs) the synchronizer are exhausted. Otherwise it is OP_CHUNK.
 */
void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
{
  MMonSync *m = static_cast<MMonSync*>(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;

  if (sync_providers.count(m->cookie) == 0) {
    dout(10) << __func__ << " no cookie " << m->cookie << dendl;
    _sync_reply_no_cookie(op);
    return;
  }

  assert(g_conf->mon_sync_provider_kill_at != 2);

  SyncProvider& sp = sync_providers[m->cookie];
  sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);

  // the versions the requester still needs may already have been trimmed
  if (sp.last_committed < paxos->get_first_committed() &&
      paxos->get_first_committed() > 1) {
    dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
             << " < our fc " << paxos->get_first_committed() << dendl;
    sync_providers.erase(m->cookie);
    _sync_reply_no_cookie(op);
    return;
  }

  MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
  auto tx(std::make_shared<MonitorDBStore::Transaction>());

  // remaining payload budget, in bytes
  int left = g_conf->mon_sync_max_payload_size;
  while (sp.last_committed < paxos->get_version() && left > 0) {
    bufferlist bl;
    sp.last_committed++;
    store->get(paxos->get_name(), sp.last_committed, bl);
    // TODO: what if store->get returns error or empty bl?
    // NOTE(review): the return value is ignored, so whatever ends up in bl
    // is forwarded unchanged.
    tx->put(paxos->get_name(), sp.last_committed, bl);
    left -= bl.length();
    dout(20) << __func__ << " including paxos state " << sp.last_committed
             << dendl;
  }
  reply->last_committed = sp.last_committed;

  // full syncs use any remaining budget for store data
  if (sp.full && left > 0) {
    sp.synchronizer->get_chunk_tx(tx, left);
    sp.last_key = sp.synchronizer->get_last_key();
    reply->last_key = sp.last_key;
  }

  if ((sp.full && sp.synchronizer->has_next_chunk()) ||
      sp.last_committed < paxos->get_version()) {
    dout(10) << __func__ << " chunk, through version " << sp.last_committed
             << " key " << sp.last_key << dendl;
  } else {
    dout(10) << __func__ << " last chunk, through version " << sp.last_committed
             << " key " << sp.last_key << dendl;
    reply->op = MMonSync::OP_LAST_CHUNK;

    assert(g_conf->mon_sync_provider_kill_at != 3);

    // clean up our local state
    sync_providers.erase(sp.cookie);
  }

  ::encode(*tx, reply->chunk_bl);

  m->get_connection()->send_message(reply);
}
1458
1459 // requester
1460
1461 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1462 {
1463 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1464 dout(10) << __func__ << " " << *m << dendl;
1465 if (sync_cookie) {
1466 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1467 return;
1468 }
1469 if (m->get_source_inst() != sync_provider) {
1470 dout(10) << __func__ << " source does not match, discarding" << dendl;
1471 return;
1472 }
1473 sync_cookie = m->cookie;
1474 sync_start_version = m->last_committed;
1475
1476 sync_reset_timeout();
1477 sync_get_next_chunk();
1478
1479 assert(g_conf->mon_sync_requester_kill_at != 3);
1480 }
1481
1482 void Monitor::sync_get_next_chunk()
1483 {
1484 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1485 if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1486 dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1487 usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1488 }
1489 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1490 messenger->send_message(r, sync_provider);
1491
1492 assert(g_conf->mon_sync_requester_kill_at != 4);
1493 }
1494
/**
 * Requester side: apply a chunk received from the provider.
 *
 * Chunks whose cookie or sender do not match the current sync are dropped.
 * The encoded transaction in the chunk is applied to the store as-is.
 *
 * For a recent (non-full) sync, the paxos versions after our current
 * version through m->last_committed are also applied right away, and paxos
 * is re-initialized to pick them up.
 *
 * Finally, either the next chunk is requested (OP_CHUNK) or the sync is
 * finished (OP_LAST_CHUNK).
 */
void Monitor::handle_sync_chunk(MonOpRequestRef op)
{
  MMonSync *m = static_cast<MMonSync*>(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;

  if (m->cookie != sync_cookie) {
    dout(10) << __func__ << " cookie does not match, discarding" << dendl;
    return;
  }
  if (m->get_source_inst() != sync_provider) {
    dout(10) << __func__ << " source does not match, discarding" << dendl;
    return;
  }

  assert(state == STATE_SYNCHRONIZING);
  assert(g_conf->mon_sync_requester_kill_at != 5);

  auto tx(std::make_shared<MonitorDBStore::Transaction>());
  tx->append_from_encoded(m->chunk_bl);

  dout(30) << __func__ << " tx dump:\n";
  JSONFormatter f(true);
  tx->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  store->apply_transaction(tx);

  assert(g_conf->mon_sync_requester_kill_at != 6);

  if (!sync_full) {
    dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
    // (intentionally shadows the outer tx/f: a second, separate transaction)
    auto tx(std::make_shared<MonitorDBStore::Transaction>());
    paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
                                         m->last_committed);
    tx->put(paxos->get_name(), "last_committed", m->last_committed);

    dout(30) << __func__ << " tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    store->apply_transaction(tx);
    paxos->init(); // to refresh what we just wrote
  }

  if (m->op == MMonSync::OP_CHUNK) {
    sync_reset_timeout();
    sync_get_next_chunk();
  } else if (m->op == MMonSync::OP_LAST_CHUNK) {
    sync_finish(m->last_committed);
  }
}
1549
1550 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1551 {
1552 dout(10) << __func__ << dendl;
1553 bootstrap();
1554 }
1555
1556 void Monitor::sync_trim_providers()
1557 {
1558 dout(20) << __func__ << dendl;
1559
1560 utime_t now = ceph_clock_now();
1561 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1562 while (p != sync_providers.end()) {
1563 if (now > p->second.timeout) {
1564 dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
1565 sync_providers.erase(p++);
1566 } else {
1567 ++p;
1568 }
1569 }
1570 }
1571
1572 // ---------------------------------------------------
1573 // probe
1574
1575 void Monitor::cancel_probe_timeout()
1576 {
1577 if (probe_timeout_event) {
1578 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1579 timer.cancel_event(probe_timeout_event);
1580 probe_timeout_event = NULL;
1581 } else {
1582 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1583 }
1584 }
1585
1586 void Monitor::reset_probe_timeout()
1587 {
1588 cancel_probe_timeout();
1589 probe_timeout_event = new C_MonContext(this, [this](int r) {
1590 probe_timeout(r);
1591 });
1592 double t = g_conf->mon_probe_timeout;
1593 timer.add_event_after(t, probe_timeout_event);
1594 dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
1595 }
1596
1597 void Monitor::probe_timeout(int r)
1598 {
1599 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1600 assert(is_probing() || is_synchronizing());
1601 assert(probe_timeout_event);
1602 probe_timeout_event = NULL;
1603 bootstrap();
1604 }
1605
1606 void Monitor::handle_probe(MonOpRequestRef op)
1607 {
1608 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1609 dout(10) << "handle_probe " << *m << dendl;
1610
1611 if (m->fsid != monmap->fsid) {
1612 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1613 return;
1614 }
1615
1616 switch (m->op) {
1617 case MMonProbe::OP_PROBE:
1618 handle_probe_probe(op);
1619 break;
1620
1621 case MMonProbe::OP_REPLY:
1622 handle_probe_reply(op);
1623 break;
1624
1625 case MMonProbe::OP_MISSING_FEATURES:
1626 derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1627 << ", required " << m->required_features
1628 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
1629 << dendl;
1630 break;
1631 }
1632 }
1633
1634 void Monitor::handle_probe_probe(MonOpRequestRef op)
1635 {
1636 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1637
1638 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1639 << " features " << m->get_connection()->get_features() << dendl;
1640 uint64_t missing = required_features & ~m->get_connection()->get_features();
1641 if (missing) {
1642 dout(1) << " peer " << m->get_source_addr() << " missing features "
1643 << missing << dendl;
1644 if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1645 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1646 name, has_ever_joined);
1647 m->required_features = required_features;
1648 m->get_connection()->send_message(r);
1649 }
1650 goto out;
1651 }
1652
1653 if (!is_probing() && !is_synchronizing()) {
1654 // If the probing mon is way ahead of us, we need to re-bootstrap.
1655 // Normally we capture this case when we initially bootstrap, but
1656 // it is possible we pass those checks (we overlap with
1657 // quorum-to-be) but fail to join a quorum before it moves past
1658 // us. We need to be kicked back to bootstrap so we can
1659 // synchonize, not keep calling elections.
1660 if (paxos->get_version() + 1 < m->paxos_first_version) {
1661 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1662 << "ahead of us, re-bootstrapping" << dendl;
1663 bootstrap();
1664 goto out;
1665
1666 }
1667 }
1668
1669 MMonProbe *r;
1670 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1671 r->name = name;
1672 r->quorum = quorum;
1673 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1674 r->paxos_first_version = paxos->get_first_committed();
1675 r->paxos_last_version = paxos->get_version();
1676 m->get_connection()->send_message(r);
1677
1678 // did we discover a peer here?
1679 if (!monmap->contains(m->get_source_addr())) {
1680 dout(1) << " adding peer " << m->get_source_addr()
1681 << " to list of hints" << dendl;
1682 extra_probe_peers.insert(m->get_source_addr());
1683 }
1684
1685 out:
1686 return;
1687 }
1688
1689 void Monitor::handle_probe_reply(MonOpRequestRef op)
1690 {
1691 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1692 dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1693 dout(10) << " monmap is " << *monmap << dendl;
1694
1695 // discover name and addrs during probing or electing states.
1696 if (!is_probing() && !is_electing()) {
1697 return;
1698 }
1699
1700 // newer map, or they've joined a quorum and we haven't?
1701 bufferlist mybl;
1702 monmap->encode(mybl, m->get_connection()->get_features());
1703 // make sure it's actually different; the checks below err toward
1704 // taking the other guy's map, which could cause us to loop.
1705 if (!mybl.contents_equal(m->monmap_bl)) {
1706 MonMap *newmap = new MonMap;
1707 newmap->decode(m->monmap_bl);
1708 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1709 !has_ever_joined)) {
1710 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1711 << ", mine was " << monmap->get_epoch() << dendl;
1712 delete newmap;
1713 monmap->decode(m->monmap_bl);
1714
1715 bootstrap();
1716 return;
1717 }
1718 delete newmap;
1719 }
1720
1721 // rename peer?
1722 string peer_name = monmap->get_name(m->get_source_addr());
1723 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1724 dout(10) << " renaming peer " << m->get_source_addr() << " "
1725 << peer_name << " -> " << m->name << " in my monmap"
1726 << dendl;
1727 monmap->rename(peer_name, m->name);
1728
1729 if (is_electing()) {
1730 bootstrap();
1731 return;
1732 }
1733 } else {
1734 dout(10) << " peer name is " << peer_name << dendl;
1735 }
1736
1737 // new initial peer?
1738 if (monmap->get_epoch() == 0 &&
1739 monmap->contains(m->name) &&
1740 monmap->get_addr(m->name).is_blank_ip()) {
1741 dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1742 monmap->set_addr(m->name, m->get_source_addr());
1743
1744 bootstrap();
1745 return;
1746 }
1747
1748 // end discover phase
1749 if (!is_probing()) {
1750 return;
1751 }
1752
1753 assert(paxos != NULL);
1754
1755 if (is_synchronizing()) {
1756 dout(10) << " currently syncing" << dendl;
1757 return;
1758 }
1759
1760 entity_inst_t other = m->get_source_inst();
1761
1762 if (m->paxos_last_version < sync_last_committed_floor) {
1763 dout(10) << " peer paxos versions [" << m->paxos_first_version
1764 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1765 << sync_last_committed_floor << ", ignoring"
1766 << dendl;
1767 } else {
1768 if (paxos->get_version() < m->paxos_first_version &&
1769 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1770 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1771 << "," << m->paxos_last_version << "]"
1772 << " vs my version " << paxos->get_version()
1773 << " (too far ahead)"
1774 << dendl;
1775 cancel_probe_timeout();
1776 sync_start(other, true);
1777 return;
1778 }
1779 if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1780 dout(10) << " peer paxos last version " << m->paxos_last_version
1781 << " vs my version " << paxos->get_version()
1782 << " (too far ahead)"
1783 << dendl;
1784 cancel_probe_timeout();
1785 sync_start(other, false);
1786 return;
1787 }
1788 }
1789
1790 // is there an existing quorum?
1791 if (m->quorum.size()) {
1792 dout(10) << " existing quorum " << m->quorum << dendl;
1793
1794 dout(10) << " peer paxos version " << m->paxos_last_version
1795 << " vs my version " << paxos->get_version()
1796 << " (ok)"
1797 << dendl;
1798
1799 if (monmap->contains(name) &&
1800 !monmap->get_addr(name).is_blank_ip()) {
1801 // i'm part of the cluster; just initiate a new election
1802 start_election();
1803 } else {
1804 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1805 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1806 monmap->get_inst(*m->quorum.begin()));
1807 }
1808 } else {
1809 if (monmap->contains(m->name)) {
1810 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1811 outside_quorum.insert(m->name);
1812 } else {
1813 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
1814 return;
1815 }
1816
1817 unsigned need = monmap->size() / 2 + 1;
1818 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1819 if (outside_quorum.size() >= need) {
1820 if (outside_quorum.count(name)) {
1821 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1822 start_election();
1823 } else {
1824 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1825 }
1826 } else {
1827 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
1828 }
1829 }
1830 }
1831
1832 void Monitor::join_election()
1833 {
1834 dout(10) << __func__ << dendl;
1835 wait_for_paxos_write();
1836 _reset();
1837 state = STATE_ELECTING;
1838
1839 logger->inc(l_mon_num_elections);
1840 }
1841
1842 void Monitor::start_election()
1843 {
1844 dout(10) << "start_election" << dendl;
1845 wait_for_paxos_write();
1846 _reset();
1847 state = STATE_ELECTING;
1848
1849 logger->inc(l_mon_num_elections);
1850 logger->inc(l_mon_election_call);
1851
1852 clog->info() << "mon." << name << " calling new monitor election";
1853 elector.call_election();
1854 }
1855
1856 void Monitor::win_standalone_election()
1857 {
1858 dout(1) << "win_standalone_election" << dendl;
1859
1860 // bump election epoch, in case the previous epoch included other
1861 // monitors; we need to be able to make the distinction.
1862 elector.init();
1863 elector.advance_epoch();
1864
1865 rank = monmap->get_rank(name);
1866 assert(rank == 0);
1867 set<int> q;
1868 q.insert(rank);
1869
1870 const MonCommand *my_cmds;
1871 int cmdsize;
1872 get_locally_supported_monitor_commands(&my_cmds, &cmdsize);
1873 win_election(elector.get_epoch(), q,
1874 CEPH_FEATURES_ALL,
1875 ceph::features::mon::get_supported(),
1876 my_cmds, cmdsize);
1877 }
1878
1879 const utime_t& Monitor::get_leader_since() const
1880 {
1881 assert(state == STATE_LEADER);
1882 return leader_since;
1883 }
1884
1885 epoch_t Monitor::get_epoch()
1886 {
1887 return elector.get_epoch();
1888 }
1889
1890 void Monitor::_finish_svc_election()
1891 {
1892 assert(state == STATE_LEADER || state == STATE_PEON);
1893
1894 for (auto p : paxos_service) {
1895 // we already called election_finished() on monmon(); avoid callig twice
1896 if (state == STATE_LEADER && p == monmon())
1897 continue;
1898 p->election_finished();
1899 }
1900 }
1901
/**
 * Become leader after winning the election for @p epoch.
 *
 * In order, this:
 * - records leadership, the quorum (@p active) and its connection and
 *   monitor features;
 * - installs the leader's command set;
 * - initializes paxos as leader and notifies the paxos services (monmon()
 *   first, see below);
 * - starts the health monitor and runs finish_election().
 *
 * With more than one monitor and a monmap epoch > 0, it also starts
 * timechecks, the health tick, health-to-clog reporting and scrubbing.
 * Finally our own metadata is recorded for our rank.
 *
 * @param epoch        election epoch that was won
 * @param active       ranks forming the new quorum
 * @param features     quorum connection features (stored in quorum_con_features)
 * @param mon_features quorum monitor features (stored in quorum_mon_features)
 * @param cmdset       commands passed to set_leader_supported_commands()
 * @param cmdsize      number of entries in @p cmdset
 */
void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
                           const mon_feature_t& mon_features,
                           const MonCommand *cmdset, int cmdsize)
{
  dout(10) << __func__ << " epoch " << epoch << " quorum " << active
           << " features " << features
           << " mon_features " << mon_features
           << dendl;
  assert(is_electing());
  state = STATE_LEADER;
  leader_since = ceph_clock_now();
  leader = rank;
  quorum = active;
  quorum_con_features = features;
  quorum_mon_features = mon_features;
  outside_quorum.clear();

  clog->info() << "mon." << name << "@" << rank
               << " won leader election with quorum " << quorum;

  set_leader_supported_commands(cmdset, cmdsize);

  paxos->leader_init();
  // NOTE: tell monmap monitor first.  This is important for the
  // bootstrap case to ensure that the very first paxos proposal
  // codifies the monmap.  Otherwise any manner of chaos can ensue
  // when monitors are call elections or participating in a paxos
  // round without agreeing on who the participants are.
  monmon()->election_finished();
  _finish_svc_election();
  health_monitor->start(epoch);

  logger->inc(l_mon_election_win);

  finish_election();
  // periodic tasks are only started for multi-monitor clusters
  // with a monmap epoch > 0
  if (monmap->size() > 1 &&
      monmap->get_epoch() > 0) {
    timecheck_start();
    health_tick_start();
    do_health_to_clog_interval();
    scrub_event_start();
  }

  Metadata my_meta;
  collect_sys_info(&my_meta, g_ceph_context);
  my_meta["addr"] = stringify(messenger->get_myaddr());
  update_mon_metadata(rank, std::move(my_meta));
}
1950
/**
 * Become a peon after losing the election for @p epoch to rank @p l.
 *
 * In order, this:
 * - records the leader, the quorum @p q and the quorum connection/monitor
 *   features;
 * - initializes paxos as peon and notifies all paxos services;
 * - starts the health monitor and runs finish_election().
 *
 * If the quorum features include CEPH_FEATURE_MON_METADATA, our system
 * metadata is sent to the leader.
 *
 * @param epoch        election epoch
 * @param q            ranks forming the new quorum
 * @param l            rank of the winning leader
 * @param features     quorum connection features (stored in quorum_con_features)
 * @param mon_features quorum monitor features (stored in quorum_mon_features)
 */
void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
                            uint64_t features,
                            const mon_feature_t& mon_features)
{
  state = STATE_PEON;
  leader_since = utime_t();
  leader = l;
  quorum = q;
  outside_quorum.clear();
  quorum_con_features = features;
  quorum_mon_features = mon_features;
  dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
           << " quorum is " << quorum << " features are " << quorum_con_features
           << " mon_features are " << quorum_mon_features
           << dendl;

  paxos->peon_init();
  _finish_svc_election();
  health_monitor->start(epoch);

  logger->inc(l_mon_election_lose);

  finish_election();

  if (quorum_con_features & CEPH_FEATURE_MON_METADATA) {
    Metadata sys_info;
    collect_sys_info(&sys_info, g_ceph_context);
    messenger->send_message(new MMonMetadata(sys_info),
                            monmap->get_inst(get_leader()));
  }
}
1982
1983 void Monitor::finish_election()
1984 {
1985 apply_quorum_to_compatset_features();
1986 apply_monmap_to_compatset_features();
1987 timecheck_finish();
1988 exited_quorum = utime_t();
1989 finish_contexts(g_ceph_context, waitfor_quorum);
1990 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
1991 resend_routed_requests();
1992 update_logger();
1993 register_cluster_logger();
1994
1995 // am i named properly?
1996 string cur_name = monmap->get_name(messenger->get_myaddr());
1997 if (cur_name != name) {
1998 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
1999 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
2000 monmap->get_inst(*quorum.begin()));
2001 }
2002 }
2003
2004 void Monitor::_apply_compatset_features(CompatSet &new_features)
2005 {
2006 if (new_features.compare(features) != 0) {
2007 CompatSet diff = features.unsupported(new_features);
2008 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2009 features = new_features;
2010
2011 auto t = std::make_shared<MonitorDBStore::Transaction>();
2012 write_features(t);
2013 store->apply_transaction(t);
2014
2015 calc_quorum_requirements();
2016 }
2017 }
2018
2019 void Monitor::apply_quorum_to_compatset_features()
2020 {
2021 CompatSet new_features(features);
2022 if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2023 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2024 }
2025 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2026 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2027 }
2028 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2029 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2030 }
2031 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2032 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2033 }
2034 dout(5) << __func__ << dendl;
2035 _apply_compatset_features(new_features);
2036 }
2037
2038 void Monitor::apply_monmap_to_compatset_features()
2039 {
2040 CompatSet new_features(features);
2041 mon_feature_t monmap_features = monmap->get_required_features();
2042
2043 /* persistent monmap features may go into the compatset.
2044 * optional monmap features may not - why?
2045 * because optional monmap features may be set/unset by the admin,
2046 * and possibly by other means that haven't yet been thought out,
2047 * so we can't make the monitor enforce them on start - because they
2048 * may go away.
2049 * this, of course, does not invalidate setting a compatset feature
2050 * for an optional feature - as long as you make sure to clean it up
2051 * once you unset it.
2052 */
2053 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2054 assert(ceph::features::mon::get_persistent().contains_all(
2055 ceph::features::mon::FEATURE_KRAKEN));
2056 // this feature should only ever be set if the quorum supports it.
2057 assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2058 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2059 }
2060
2061 dout(5) << __func__ << dendl;
2062 _apply_compatset_features(new_features);
2063 }
2064
2065 void Monitor::calc_quorum_requirements()
2066 {
2067 required_features = 0;
2068
2069 // compatset
2070 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2071 required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2072 }
2073 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2074 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2075 }
2076 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2077 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2078 }
2079 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2080 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2081 }
2082 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2083 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2084 }
2085
2086 // monmap
2087 if (monmap->get_required_features().contains_all(
2088 ceph::features::mon::FEATURE_KRAKEN)) {
2089 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2090 }
2091 if (monmap->get_required_features().contains_all(
2092 ceph::features::mon::FEATURE_LUMINOUS)) {
2093 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2094 }
2095 dout(10) << __func__ << " required_features " << required_features << dendl;
2096 }
2097
2098 void Monitor::get_combined_feature_map(FeatureMap *fm)
2099 {
2100 *fm += session_map.feature_map;
2101 for (auto id : quorum) {
2102 if (id != rank) {
2103 *fm += quorum_feature_map[id];
2104 }
2105 }
2106 }
2107
2108 void Monitor::sync_force(Formatter *f, ostream& ss)
2109 {
2110 bool free_formatter = false;
2111
2112 if (!f) {
2113 // louzy/lazy hack: default to json if no formatter has been defined
2114 f = new JSONFormatter();
2115 free_formatter = true;
2116 }
2117
2118 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2119 sync_stash_critical_state(tx);
2120 tx->put("mon_sync", "force_sync", 1);
2121 store->apply_transaction(tx);
2122
2123 f->open_object_section("sync_force");
2124 f->dump_int("ret", 0);
2125 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2126 f->close_section(); // sync_force
2127 f->flush(ss);
2128 if (free_formatter)
2129 delete f;
2130 }
2131
2132 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2133 {
2134 bool free_formatter = false;
2135
2136 if (!f) {
2137 // louzy/lazy hack: default to json if no formatter has been defined
2138 f = new JSONFormatter();
2139 free_formatter = true;
2140 }
2141 f->open_object_section("quorum_status");
2142 f->dump_int("election_epoch", get_epoch());
2143
2144 f->open_array_section("quorum");
2145 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2146 f->dump_int("mon", *p);
2147 f->close_section(); // quorum
2148
2149 list<string> quorum_names = get_quorum_names();
2150 f->open_array_section("quorum_names");
2151 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2152 f->dump_string("mon", *p);
2153 f->close_section(); // quorum_names
2154
2155 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2156
2157 f->open_object_section("monmap");
2158 monmap->dump(f);
2159 f->close_section(); // monmap
2160
2161 f->close_section(); // quorum_status
2162 f->flush(ss);
2163 if (free_formatter)
2164 delete f;
2165 }
2166
2167 void Monitor::get_mon_status(Formatter *f, ostream& ss)
2168 {
2169 bool free_formatter = false;
2170
2171 if (!f) {
2172 // louzy/lazy hack: default to json if no formatter has been defined
2173 f = new JSONFormatter();
2174 free_formatter = true;
2175 }
2176
2177 f->open_object_section("mon_status");
2178 f->dump_string("name", name);
2179 f->dump_int("rank", rank);
2180 f->dump_string("state", get_state_name());
2181 f->dump_int("election_epoch", get_epoch());
2182
2183 f->open_array_section("quorum");
2184 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2185 f->dump_int("mon", *p);
2186 }
2187
2188 f->close_section(); // quorum
2189
2190 f->open_object_section("features");
2191 f->dump_stream("required_con") << required_features;
2192 mon_feature_t req_mon_features = get_required_mon_features();
2193 req_mon_features.dump(f, "required_mon");
2194 f->dump_stream("quorum_con") << quorum_con_features;
2195 quorum_mon_features.dump(f, "quorum_mon");
2196 f->close_section(); // features
2197
2198 f->open_array_section("outside_quorum");
2199 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2200 f->dump_string("mon", *p);
2201 f->close_section(); // outside_quorum
2202
2203 f->open_array_section("extra_probe_peers");
2204 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2205 p != extra_probe_peers.end();
2206 ++p)
2207 f->dump_stream("peer") << *p;
2208 f->close_section(); // extra_probe_peers
2209
2210 f->open_array_section("sync_provider");
2211 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2212 p != sync_providers.end();
2213 ++p) {
2214 f->dump_unsigned("cookie", p->second.cookie);
2215 f->dump_stream("entity") << p->second.entity;
2216 f->dump_stream("timeout") << p->second.timeout;
2217 f->dump_unsigned("last_committed", p->second.last_committed);
2218 f->dump_stream("last_key") << p->second.last_key;
2219 }
2220 f->close_section();
2221
2222 if (is_synchronizing()) {
2223 f->open_object_section("sync");
2224 f->dump_stream("sync_provider") << sync_provider;
2225 f->dump_unsigned("sync_cookie", sync_cookie);
2226 f->dump_unsigned("sync_start_version", sync_start_version);
2227 f->close_section();
2228 }
2229
2230 if (g_conf->mon_sync_provider_kill_at > 0)
2231 f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2232 if (g_conf->mon_sync_requester_kill_at > 0)
2233 f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2234
2235 f->open_object_section("monmap");
2236 monmap->dump(f);
2237 f->close_section();
2238
2239 f->dump_object("feature_map", session_map.feature_map);
2240 f->close_section(); // mon_status
2241
2242 if (free_formatter) {
2243 // flush formatter to ss and delete it iff we created the formatter
2244 f->flush(ss);
2245 delete f;
2246 }
2247 }
2248
2249
2250 // health status to clog
2251
2252 void Monitor::health_tick_start()
2253 {
2254 if (!cct->_conf->mon_health_to_clog ||
2255 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2256 return;
2257
2258 dout(15) << __func__ << dendl;
2259
2260 health_tick_stop();
2261 health_tick_event = new C_MonContext(this, [this](int r) {
2262 if (r < 0)
2263 return;
2264 do_health_to_clog();
2265 health_tick_start();
2266 });
2267 timer.add_event_after(cct->_conf->mon_health_to_clog_tick_interval,
2268 health_tick_event);
2269 }
2270
2271 void Monitor::health_tick_stop()
2272 {
2273 dout(15) << __func__ << dendl;
2274
2275 if (health_tick_event) {
2276 timer.cancel_event(health_tick_event);
2277 health_tick_event = NULL;
2278 }
2279 }
2280
2281 utime_t Monitor::health_interval_calc_next_update()
2282 {
2283 utime_t now = ceph_clock_now();
2284
2285 time_t secs = now.sec();
2286 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2287 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2288 utime_t next = utime_t(secs + adjustment, 0);
2289
2290 dout(20) << __func__
2291 << " now: " << now << ","
2292 << " next: " << next << ","
2293 << " interval: " << cct->_conf->mon_health_to_clog_interval
2294 << dendl;
2295
2296 return next;
2297 }
2298
2299 void Monitor::health_interval_start()
2300 {
2301 dout(15) << __func__ << dendl;
2302
2303 if (!cct->_conf->mon_health_to_clog ||
2304 cct->_conf->mon_health_to_clog_interval <= 0) {
2305 return;
2306 }
2307
2308 health_interval_stop();
2309 utime_t next = health_interval_calc_next_update();
2310 health_interval_event = new C_MonContext(this, [this](int r) {
2311 if (r < 0)
2312 return;
2313 do_health_to_clog_interval();
2314 });
2315 timer.add_event_at(next, health_interval_event);
2316 }
2317
2318 void Monitor::health_interval_stop()
2319 {
2320 dout(15) << __func__ << dendl;
2321 if (health_interval_event) {
2322 timer.cancel_event(health_interval_event);
2323 }
2324 health_interval_event = NULL;
2325 }
2326
2327 void Monitor::health_events_cleanup()
2328 {
2329 health_tick_stop();
2330 health_interval_stop();
2331 health_status_cache.reset();
2332 }
2333
2334 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2335 {
2336 dout(20) << __func__ << dendl;
2337
2338 if (changed.count("mon_health_to_clog")) {
2339 if (!cct->_conf->mon_health_to_clog) {
2340 health_events_cleanup();
2341 } else {
2342 if (!health_tick_event) {
2343 health_tick_start();
2344 }
2345 if (!health_interval_event) {
2346 health_interval_start();
2347 }
2348 }
2349 }
2350
2351 if (changed.count("mon_health_to_clog_interval")) {
2352 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2353 health_interval_stop();
2354 } else {
2355 health_interval_start();
2356 }
2357 }
2358
2359 if (changed.count("mon_health_to_clog_tick_interval")) {
2360 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2361 health_tick_stop();
2362 } else {
2363 health_tick_start();
2364 }
2365 }
2366 }
2367
2368 void Monitor::do_health_to_clog_interval()
2369 {
2370 // outputting to clog may have been disabled in the conf
2371 // since we were scheduled.
2372 if (!cct->_conf->mon_health_to_clog ||
2373 cct->_conf->mon_health_to_clog_interval <= 0)
2374 return;
2375
2376 dout(10) << __func__ << dendl;
2377
2378 // do we have a cached value for next_clog_update? if not,
2379 // do we know when the last update was?
2380
2381 do_health_to_clog(true);
2382 health_interval_start();
2383 }
2384
2385 void Monitor::do_health_to_clog(bool force)
2386 {
2387 // outputting to clog may have been disabled in the conf
2388 // since we were scheduled.
2389 if (!cct->_conf->mon_health_to_clog ||
2390 cct->_conf->mon_health_to_clog_interval <= 0)
2391 return;
2392
2393 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2394
2395 list<string> status;
2396 health_status_t overall = get_health(status, NULL, NULL);
2397
2398 dout(25) << __func__
2399 << (force ? " (force)" : "")
2400 << dendl;
2401
2402 string summary = joinify(status.begin(), status.end(), string("; "));
2403
2404 if (!force &&
2405 overall == health_status_cache.overall &&
2406 !health_status_cache.summary.empty() &&
2407 health_status_cache.summary == summary) {
2408 // we got a dup!
2409 return;
2410 }
2411
2412 clog->info() << summary;
2413
2414 health_status_cache.overall = overall;
2415 health_status_cache.summary = summary;
2416 }
2417
2418 health_status_t Monitor::get_health(list<string>& status,
2419 bufferlist *detailbl,
2420 Formatter *f)
2421 {
2422 list<pair<health_status_t,string> > summary;
2423 list<pair<health_status_t,string> > detail;
2424
2425 if (f)
2426 f->open_object_section("health");
2427
2428 for (vector<PaxosService*>::iterator p = paxos_service.begin();
2429 p != paxos_service.end();
2430 ++p) {
2431 PaxosService *s = *p;
2432 s->get_health(summary, detailbl ? &detail : NULL, cct);
2433 }
2434
2435 health_monitor->get_health(f, summary, (detailbl ? &detail : NULL));
2436
2437 if (f) {
2438 f->open_object_section("timechecks");
2439 f->dump_unsigned("epoch", get_epoch());
2440 f->dump_int("round", timecheck_round);
2441 f->dump_stream("round_status")
2442 << ((timecheck_round%2) ? "on-going" : "finished");
2443 }
2444
2445 health_status_t overall = HEALTH_OK;
2446 if (!timecheck_skews.empty()) {
2447 list<string> warns;
2448 if (f)
2449 f->open_array_section("mons");
2450 for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2451 i != timecheck_skews.end(); ++i) {
2452 entity_inst_t inst = i->first;
2453 double skew = i->second;
2454 double latency = timecheck_latencies[inst];
2455 string name = monmap->get_name(inst.addr);
2456
2457 ostringstream tcss;
2458 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2459 if (tcstatus != HEALTH_OK) {
2460 if (overall > tcstatus)
2461 overall = tcstatus;
2462 warns.push_back(name);
2463
2464 ostringstream tmp_ss;
2465 tmp_ss << "mon." << name
2466 << " addr " << inst.addr << " " << tcss.str()
2467 << " (latency " << latency << "s)";
2468 detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2469 }
2470
2471 if (f) {
2472 f->open_object_section("mon");
2473 f->dump_string("name", name.c_str());
2474 f->dump_float("skew", skew);
2475 f->dump_float("latency", latency);
2476 f->dump_stream("health") << tcstatus;
2477 if (tcstatus != HEALTH_OK)
2478 f->dump_stream("details") << tcss.str();
2479 f->close_section();
2480 }
2481 }
2482 if (!warns.empty()) {
2483 ostringstream ss;
2484 ss << "clock skew detected on";
2485 while (!warns.empty()) {
2486 ss << " mon." << warns.front();
2487 warns.pop_front();
2488 if (!warns.empty())
2489 ss << ",";
2490 }
2491 status.push_back(ss.str());
2492 summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2493 }
2494 if (f)
2495 f->close_section();
2496 }
2497 if (f)
2498 f->close_section();
2499
2500 if (f)
2501 f->open_array_section("summary");
2502 if (!summary.empty()) {
2503 while (!summary.empty()) {
2504 if (overall > summary.front().first)
2505 overall = summary.front().first;
2506 status.push_back(summary.front().second);
2507 if (f) {
2508 f->open_object_section("item");
2509 f->dump_stream("severity") << summary.front().first;
2510 f->dump_string("summary", summary.front().second);
2511 f->close_section();
2512 }
2513 summary.pop_front();
2514 }
2515 }
2516 if (f)
2517 f->close_section();
2518
2519 stringstream fss;
2520 fss << overall;
2521 status.push_front(fss.str());
2522 if (f)
2523 f->dump_stream("overall_status") << overall;
2524
2525 if (f)
2526 f->open_array_section("detail");
2527 while (!detail.empty()) {
2528 if (f)
2529 f->dump_string("item", detail.front().second);
2530 else if (detailbl != NULL) {
2531 detailbl->append(detail.front().second);
2532 detailbl->append('\n');
2533 }
2534 detail.pop_front();
2535 }
2536 if (f)
2537 f->close_section();
2538
2539 if (f)
2540 f->close_section();
2541
2542 return overall;
2543 }
2544
2545 void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2546 {
2547 if (f)
2548 f->open_object_section("status");
2549
2550 // reply with the status for all the components
2551 list<string> health;
2552 get_health(health, NULL, f);
2553
2554 if (f) {
2555 f->dump_stream("fsid") << monmap->get_fsid();
2556 f->dump_unsigned("election_epoch", get_epoch());
2557 {
2558 f->open_array_section("quorum");
2559 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2560 f->dump_int("rank", *p);
2561 f->close_section();
2562 f->open_array_section("quorum_names");
2563 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2564 f->dump_string("id", monmap->get_name(*p));
2565 f->close_section();
2566 }
2567 f->open_object_section("monmap");
2568 monmap->dump(f);
2569 f->close_section();
2570 f->open_object_section("osdmap");
2571 osdmon()->osdmap.print_summary(f, cout);
2572 f->close_section();
2573 f->open_object_section("pgmap");
2574 pgservice->print_summary(f, NULL);
2575 f->close_section();
2576 f->open_object_section("fsmap");
2577 mdsmon()->get_fsmap().print_summary(f, NULL);
2578 f->close_section();
2579
2580 f->open_object_section("mgrmap");
2581 mgrmon()->get_map().print_summary(f, nullptr);
2582 f->close_section();
2583 f->close_section();
2584 } else {
2585
2586 ss << " cluster:\n";
2587 ss << " id: " << monmap->get_fsid() << "\n";
2588 ss << " health: " << joinify(health.begin(), health.end(),
2589 string("\n ")) << "\n";
2590 ss << "\n \n services:\n";
2591 const auto quorum_names = get_quorum_names();
2592 const auto mon_count = monmap->mon_info.size();
2593 ss << " mon: " << mon_count << " daemons, quorum "
2594 << quorum_names;
2595 if (quorum_names.size() != mon_count) {
2596 std::list<std::string> out_of_q;
2597 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2598 if (quorum.count(i) == 0) {
2599 out_of_q.push_back(monmap->ranks[i]);
2600 }
2601 }
2602 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2603 out_of_q.end(), std::string(", "));
2604 }
2605 ss << "\n";
2606 if (mgrmon()->in_use()) {
2607 ss << " mgr: ";
2608 mgrmon()->get_map().print_summary(nullptr, &ss);
2609 ss << "\n";
2610 }
2611 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2612 ss << " mds: " << mdsmon()->get_fsmap() << "\n";
2613 }
2614 ss << " osd: ";
2615 osdmon()->osdmap.print_summary(NULL, ss);
2616
2617 ss << "\n \n data:\n";
2618 pgservice->print_summary(NULL, &ss);
2619 ss << "\n ";
2620 }
2621 }
2622
2623 void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2624 map<string,string> &param_str_map)
2625 {
2626 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2627 p != cmdmap.end(); ++p) {
2628 if (p->first == "prefix")
2629 continue;
2630 if (p->first == "caps") {
2631 vector<string> cv;
2632 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2633 cv.size() % 2 == 0) {
2634 for (unsigned i = 0; i < cv.size(); i += 2) {
2635 string k = string("caps_") + cv[i];
2636 param_str_map[k] = cv[i + 1];
2637 }
2638 continue;
2639 }
2640 }
2641 param_str_map[p->first] = cmd_vartype_stringify(p->second);
2642 }
2643 }
2644
2645 const MonCommand *Monitor::_get_moncommand(const string &cmd_prefix,
2646 MonCommand *cmds, int cmds_size)
2647 {
2648 MonCommand *this_cmd = NULL;
2649 for (MonCommand *cp = cmds;
2650 cp < &cmds[cmds_size]; cp++) {
2651 if (cp->cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2652 this_cmd = cp;
2653 break;
2654 }
2655 }
2656 return this_cmd;
2657 }
2658
2659 bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2660 const map<string,cmd_vartype>& cmdmap,
2661 const map<string,string>& param_str_map,
2662 const MonCommand *this_cmd) {
2663
2664 bool cmd_r = this_cmd->requires_perm('r');
2665 bool cmd_w = this_cmd->requires_perm('w');
2666 bool cmd_x = this_cmd->requires_perm('x');
2667
2668 bool capable = s->caps.is_capable(
2669 g_ceph_context,
2670 CEPH_ENTITY_TYPE_MON,
2671 s->entity_name,
2672 module, prefix, param_str_map,
2673 cmd_r, cmd_w, cmd_x);
2674
2675 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2676 return capable;
2677 }
2678
2679 void Monitor::format_command_descriptions(const MonCommand *commands,
2680 unsigned commands_size,
2681 Formatter *f,
2682 bufferlist *rdata,
2683 bool hide_mgr_flag)
2684 {
2685 int cmdnum = 0;
2686 f->open_object_section("command_descriptions");
2687 for (const MonCommand *cp = commands;
2688 cp < &commands[commands_size]; cp++) {
2689
2690 unsigned flags = cp->flags;
2691 if (hide_mgr_flag) {
2692 flags &= ~MonCommand::FLAG_MGR;
2693 }
2694 ostringstream secname;
2695 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2696 dump_cmddesc_to_json(f, secname.str(),
2697 cp->cmdstring, cp->helpstring, cp->module,
2698 cp->req_perms, cp->availability, flags);
2699 cmdnum++;
2700 }
2701 f->close_section(); // command_descriptions
2702
2703 f->flush(*rdata);
2704 }
2705
2706 void Monitor::get_locally_supported_monitor_commands(const MonCommand **cmds,
2707 int *count)
2708 {
2709 *cmds = mon_commands;
2710 *count = ARRAY_SIZE(mon_commands);
2711 }
2712 void Monitor::set_leader_supported_commands(const MonCommand *cmds, int size)
2713 {
2714 if (leader_supported_mon_commands != mon_commands)
2715 delete[] leader_supported_mon_commands;
2716 leader_supported_mon_commands = cmds;
2717 leader_supported_mon_commands_size = size;
2718 }
2719
2720 bool Monitor::is_keyring_required()
2721 {
2722 string auth_cluster_required = g_conf->auth_supported.empty() ?
2723 g_conf->auth_cluster_required : g_conf->auth_supported;
2724 string auth_service_required = g_conf->auth_supported.empty() ?
2725 g_conf->auth_service_required : g_conf->auth_supported;
2726
2727 return auth_service_required == "cephx" ||
2728 auth_cluster_required == "cephx";
2729 }
2730
2731 struct C_MgrProxyCommand : public Context {
2732 Monitor *mon;
2733 MonOpRequestRef op;
2734 uint64_t size;
2735 bufferlist outbl;
2736 string outs;
2737 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2738 : mon(mon), op(op), size(s) { }
2739 void finish(int r) {
2740 Mutex::Locker l(mon->lock);
2741 mon->mgr_proxy_bytes -= size;
2742 mon->reply_command(op, r, outs, outbl, 0);
2743 }
2744 };
2745
2746 void Monitor::handle_command(MonOpRequestRef op)
2747 {
2748 assert(op->is_type_command());
2749 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
2750 if (m->fsid != monmap->fsid) {
2751 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2752 reply_command(op, -EPERM, "wrong fsid", 0);
2753 return;
2754 }
2755
2756 MonSession *session = static_cast<MonSession *>(
2757 m->get_connection()->get_priv());
2758 if (!session) {
2759 dout(5) << __func__ << " dropping stray message " << *m << dendl;
2760 return;
2761 }
2762 BOOST_SCOPE_EXIT_ALL(=) {
2763 session->put();
2764 };
2765
2766 if (m->cmd.empty()) {
2767 string rs = "No command supplied";
2768 reply_command(op, -EINVAL, rs, 0);
2769 return;
2770 }
2771
2772 string prefix;
2773 vector<string> fullcmd;
2774 map<string, cmd_vartype> cmdmap;
2775 stringstream ss, ds;
2776 bufferlist rdata;
2777 string rs;
2778 int r = -EINVAL;
2779 rs = "unrecognized command";
2780
2781 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
2782 // ss has reason for failure
2783 r = -EINVAL;
2784 rs = ss.str();
2785 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
2786 reply_command(op, r, rs, 0);
2787 return;
2788 }
2789
2790 // check return value. If no prefix parameter provided,
2791 // return value will be false, then return error info.
2792 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
2793 reply_command(op, -EINVAL, "command prefix not found", 0);
2794 return;
2795 }
2796
2797 // check prefix is empty
2798 if (prefix.empty()) {
2799 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
2800 return;
2801 }
2802
2803 if (prefix == "get_command_descriptions") {
2804 bufferlist rdata;
2805 Formatter *f = Formatter::create("json");
2806 // hide mgr commands until luminous upgrade is complete
2807 bool hide_mgr_flag =
2808 osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
2809 format_command_descriptions(leader_supported_mon_commands,
2810 leader_supported_mon_commands_size, f, &rdata,
2811 hide_mgr_flag);
2812 delete f;
2813 reply_command(op, 0, "", rdata, 0);
2814 return;
2815 }
2816
2817 string module;
2818 string err;
2819
2820 dout(0) << "handle_command " << *m << dendl;
2821
2822 string format;
2823 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
2824 boost::scoped_ptr<Formatter> f(Formatter::create(format));
2825
2826 get_str_vec(prefix, fullcmd);
2827
2828 // make sure fullcmd is not empty.
2829 // invalid prefix will cause empty vector fullcmd.
2830 // such as, prefix=";,,;"
2831 if (fullcmd.empty()) {
2832 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
2833 return;
2834 }
2835
2836 module = fullcmd[0];
2837
2838 // validate command is in leader map
2839
2840 const MonCommand *leader_cmd;
2841 leader_cmd = _get_moncommand(prefix,
2842 // the boost underlying this isn't const for some reason
2843 const_cast<MonCommand*>(leader_supported_mon_commands),
2844 leader_supported_mon_commands_size);
2845 if (!leader_cmd) {
2846 reply_command(op, -EINVAL, "command not known", 0);
2847 return;
2848 }
2849 // validate command is in our map & matches, or forward if it is allowed
2850 const MonCommand *mon_cmd = _get_moncommand(prefix, mon_commands,
2851 ARRAY_SIZE(mon_commands));
2852 if (!is_leader()) {
2853 if (!mon_cmd) {
2854 if (leader_cmd->is_noforward()) {
2855 reply_command(op, -EINVAL,
2856 "command not locally supported and not allowed to forward",
2857 0);
2858 return;
2859 }
2860 dout(10) << "Command not locally supported, forwarding request "
2861 << m << dendl;
2862 forward_request_leader(op);
2863 return;
2864 } else if (!mon_cmd->is_compat(leader_cmd)) {
2865 if (mon_cmd->is_noforward()) {
2866 reply_command(op, -EINVAL,
2867 "command not compatible with leader and not allowed to forward",
2868 0);
2869 return;
2870 }
2871 dout(10) << "Command not compatible with leader, forwarding request "
2872 << m << dendl;
2873 forward_request_leader(op);
2874 return;
2875 }
2876 }
2877
2878 if (mon_cmd->is_obsolete() ||
2879 (cct->_conf->mon_debug_deprecated_as_obsolete
2880 && mon_cmd->is_deprecated())) {
2881 reply_command(op, -ENOTSUP,
2882 "command is obsolete; please check usage and/or man page",
2883 0);
2884 return;
2885 }
2886
2887 if (session->proxy_con && mon_cmd->is_noforward()) {
2888 dout(10) << "Got forward for noforward command " << m << dendl;
2889 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
2890 return;
2891 }
2892
2893 /* what we perceive as being the service the command falls under */
2894 string service(mon_cmd->module);
2895
2896 dout(25) << __func__ << " prefix='" << prefix
2897 << "' module='" << module
2898 << "' service='" << service << "'" << dendl;
2899
2900 bool cmd_is_rw =
2901 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
2902
2903 // validate user's permissions for requested command
2904 map<string,string> param_str_map;
2905 _generate_command_map(cmdmap, param_str_map);
2906 if (!_allowed_command(session, service, prefix, cmdmap,
2907 param_str_map, mon_cmd)) {
2908 dout(1) << __func__ << " access denied" << dendl;
2909 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
2910 << "from='" << session->inst << "' "
2911 << "entity='" << session->entity_name << "' "
2912 << "cmd=" << m->cmd << ": access denied";
2913 reply_command(op, -EACCES, "access denied", 0);
2914 return;
2915 }
2916
2917 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
2918 << "from='" << session->inst << "' "
2919 << "entity='" << session->entity_name << "' "
2920 << "cmd=" << m->cmd << ": dispatch";
2921
2922 if (mon_cmd->is_mgr() &&
2923 osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2924 const auto& hdr = m->get_header();
2925 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
2926 uint64_t max =
2927 g_conf->mon_client_bytes * g_conf->mon_mgr_proxy_client_bytes_ratio;
2928 if (mgr_proxy_bytes + size > max) {
2929 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
2930 << " + " << size << " > max " << max << dendl;
2931 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
2932 return;
2933 }
2934 mgr_proxy_bytes += size;
2935 dout(10) << __func__ << " proxying mgr command (+" << size
2936 << " -> " << mgr_proxy_bytes << ")" << dendl;
2937 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
2938 mgr_client.start_command(m->cmd,
2939 m->get_data(),
2940 &fin->outbl,
2941 &fin->outs,
2942 new C_OnFinisher(fin, &finisher));
2943 return;
2944 }
2945
2946 if (module == "mds" || module == "fs") {
2947 mdsmon()->dispatch(op);
2948 return;
2949 }
2950 if ((module == "osd" || prefix == "pg map") &&
2951 prefix != "osd last-stat-seq") {
2952 osdmon()->dispatch(op);
2953 return;
2954 }
2955
2956 if (module == "pg") {
2957 pgmon()->dispatch(op);
2958 return;
2959 }
2960 if (module == "mon" &&
2961 /* Let the Monitor class handle the following commands:
2962 * 'mon compact'
2963 * 'mon scrub'
2964 * 'mon sync force'
2965 */
2966 prefix != "mon compact" &&
2967 prefix != "mon scrub" &&
2968 prefix != "mon sync force" &&
2969 prefix != "mon metadata" &&
2970 prefix != "mon versions" &&
2971 prefix != "mon count-metadata") {
2972 monmon()->dispatch(op);
2973 return;
2974 }
2975 if (module == "auth") {
2976 authmon()->dispatch(op);
2977 return;
2978 }
2979 if (module == "log") {
2980 logmon()->dispatch(op);
2981 return;
2982 }
2983
2984 if (module == "config-key") {
2985 config_key_service->dispatch(op);
2986 return;
2987 }
2988
2989 if (module == "mgr") {
2990 mgrmon()->dispatch(op);
2991 return;
2992 }
2993
2994 if (prefix == "fsid") {
2995 if (f) {
2996 f->open_object_section("fsid");
2997 f->dump_stream("fsid") << monmap->fsid;
2998 f->close_section();
2999 f->flush(rdata);
3000 } else {
3001 ds << monmap->fsid;
3002 rdata.append(ds);
3003 }
3004 reply_command(op, 0, "", rdata, 0);
3005 return;
3006 }
3007
3008 if (prefix == "scrub" || prefix == "mon scrub") {
3009 wait_for_paxos_write();
3010 if (is_leader()) {
3011 int r = scrub_start();
3012 reply_command(op, r, "", rdata, 0);
3013 } else if (is_peon()) {
3014 forward_request_leader(op);
3015 } else {
3016 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3017 }
3018 return;
3019 }
3020
3021 if (prefix == "compact" || prefix == "mon compact") {
3022 dout(1) << "triggering manual compaction" << dendl;
3023 utime_t start = ceph_clock_now();
3024 store->compact();
3025 utime_t end = ceph_clock_now();
3026 end -= start;
3027 dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3028 ostringstream oss;
3029 oss << "compacted leveldb in " << end;
3030 rs = oss.str();
3031 r = 0;
3032 }
3033 else if (prefix == "injectargs") {
3034 vector<string> injected_args;
3035 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3036 if (!injected_args.empty()) {
3037 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3038 ostringstream oss;
3039 r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3040 ss << "injectargs:" << oss.str();
3041 rs = ss.str();
3042 goto out;
3043 } else {
3044 rs = "must supply options to be parsed in a single string";
3045 r = -EINVAL;
3046 }
3047 } else if (prefix == "status" ||
3048 prefix == "health" ||
3049 prefix == "df") {
3050 string detail;
3051 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3052
3053 if (prefix == "status") {
3054 // get_cluster_status handles f == NULL
3055 get_cluster_status(ds, f.get());
3056
3057 if (f) {
3058 f->flush(ds);
3059 ds << '\n';
3060 }
3061 rdata.append(ds);
3062 } else if (prefix == "health") {
3063 list<string> health_str;
3064 get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
3065 if (f) {
3066 f->flush(ds);
3067 ds << '\n';
3068 } else {
3069 assert(!health_str.empty());
3070 ds << health_str.front();
3071 health_str.pop_front();
3072 if (!health_str.empty()) {
3073 ds << ' ';
3074 ds << joinify(health_str.begin(), health_str.end(), string("; "));
3075 }
3076 }
3077 bufferlist comb;
3078 comb.append(ds);
3079 if (detail == "detail")
3080 comb.append(rdata);
3081 rdata = comb;
3082 } else if (prefix == "df") {
3083 bool verbose = (detail == "detail");
3084 if (f)
3085 f->open_object_section("stats");
3086
3087 pgservice->dump_fs_stats(&ds, f.get(), verbose);
3088 if (!f)
3089 ds << '\n';
3090 pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3091
3092 if (f) {
3093 f->close_section();
3094 f->flush(ds);
3095 ds << '\n';
3096 }
3097 } else {
3098 assert(0 == "We should never get here!");
3099 return;
3100 }
3101 rdata.append(ds);
3102 rs = "";
3103 r = 0;
3104 } else if (prefix == "report") {
3105
3106 // this must be formatted, in its current form
3107 if (!f)
3108 f.reset(Formatter::create("json-pretty"));
3109 f->open_object_section("report");
3110 f->dump_stream("cluster_fingerprint") << fingerprint;
3111 f->dump_string("version", ceph_version_to_str());
3112 f->dump_string("commit", git_version_to_str());
3113 f->dump_stream("timestamp") << ceph_clock_now();
3114
3115 vector<string> tagsvec;
3116 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3117 string tagstr = str_join(tagsvec, " ");
3118 if (!tagstr.empty())
3119 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3120 f->dump_string("tag", tagstr);
3121
3122 list<string> hs;
3123 get_health(hs, NULL, f.get());
3124
3125 monmon()->dump_info(f.get());
3126 osdmon()->dump_info(f.get());
3127 mdsmon()->dump_info(f.get());
3128 authmon()->dump_info(f.get());
3129 pgservice->dump_info(f.get());
3130
3131 paxos->dump_info(f.get());
3132
3133 f->close_section();
3134 f->flush(rdata);
3135
3136 ostringstream ss2;
3137 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
3138 rs = ss2.str();
3139 r = 0;
3140 } else if (prefix == "osd last-stat-seq") {
3141 int64_t osd;
3142 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3143 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3144 if (f) {
3145 f->dump_unsigned("seq", seq);
3146 f->flush(ds);
3147 } else {
3148 ds << seq;
3149 rdata.append(ds);
3150 }
3151 rs = "";
3152 r = 0;
3153 } else if (prefix == "node ls") {
3154 string node_type("all");
3155 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3156 if (!f)
3157 f.reset(Formatter::create("json-pretty"));
3158 if (node_type == "all") {
3159 f->open_object_section("nodes");
3160 print_nodes(f.get(), ds);
3161 osdmon()->print_nodes(f.get());
3162 mdsmon()->print_nodes(f.get());
3163 f->close_section();
3164 } else if (node_type == "mon") {
3165 print_nodes(f.get(), ds);
3166 } else if (node_type == "osd") {
3167 osdmon()->print_nodes(f.get());
3168 } else if (node_type == "mds") {
3169 mdsmon()->print_nodes(f.get());
3170 }
3171 f->flush(ds);
3172 rdata.append(ds);
3173 rs = "";
3174 r = 0;
3175 } else if (prefix == "features") {
3176 if (!is_leader() && !is_peon()) {
3177 dout(10) << " waiting for quorum" << dendl;
3178 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3179 return;
3180 }
3181 if (!is_leader()) {
3182 forward_request_leader(op);
3183 return;
3184 }
3185 if (!f)
3186 f.reset(Formatter::create("json-pretty"));
3187 FeatureMap fm;
3188 get_combined_feature_map(&fm);
3189 f->dump_object("features", fm);
3190 f->flush(rdata);
3191 rs = "";
3192 r = 0;
3193 } else if (prefix == "mon metadata") {
3194 if (!f)
3195 f.reset(Formatter::create("json-pretty"));
3196
3197 string name;
3198 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3199 if (!all) {
3200 // Dump a single mon's metadata
3201 int mon = monmap->get_rank(name);
3202 if (mon < 0) {
3203 rs = "requested mon not found";
3204 r = -ENOENT;
3205 goto out;
3206 }
3207 f->open_object_section("mon_metadata");
3208 r = get_mon_metadata(mon, f.get(), ds);
3209 f->close_section();
3210 } else {
3211 // Dump all mons' metadata
3212 r = 0;
3213 f->open_array_section("mon_metadata");
3214 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3215 std::ostringstream get_err;
3216 f->open_object_section("mon");
3217 f->dump_string("name", monmap->get_name(rank));
3218 r = get_mon_metadata(rank, f.get(), get_err);
3219 f->close_section();
3220 if (r == -ENOENT || r == -EINVAL) {
3221 dout(1) << get_err.str() << dendl;
3222 // Drop error, list what metadata we do have
3223 r = 0;
3224 } else if (r != 0) {
3225 derr << "Unexpected error from get_mon_metadata: "
3226 << cpp_strerror(r) << dendl;
3227 ds << get_err.str();
3228 break;
3229 }
3230 }
3231 f->close_section();
3232 }
3233
3234 f->flush(ds);
3235 rdata.append(ds);
3236 rs = "";
3237 } else if (prefix == "mon versions") {
3238 if (!f)
3239 f.reset(Formatter::create("json-pretty"));
3240 count_metadata("ceph_version", f.get());
3241 f->flush(ds);
3242 rdata.append(ds);
3243 rs = "";
3244 r = 0;
3245 } else if (prefix == "mon count-metadata") {
3246 if (!f)
3247 f.reset(Formatter::create("json-pretty"));
3248 string field;
3249 cmd_getval(g_ceph_context, cmdmap, "property", field);
3250 count_metadata(field, f.get());
3251 f->flush(ds);
3252 rdata.append(ds);
3253 rs = "";
3254 r = 0;
3255 } else if (prefix == "quorum_status") {
3256 // make sure our map is readable and up to date
3257 if (!is_leader() && !is_peon()) {
3258 dout(10) << " waiting for quorum" << dendl;
3259 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3260 return;
3261 }
3262 _quorum_status(f.get(), ds);
3263 rdata.append(ds);
3264 rs = "";
3265 r = 0;
3266 } else if (prefix == "mon_status") {
3267 get_mon_status(f.get(), ds);
3268 if (f)
3269 f->flush(ds);
3270 rdata.append(ds);
3271 rs = "";
3272 r = 0;
3273 } else if (prefix == "sync force" ||
3274 prefix == "mon sync force") {
3275 string validate1, validate2;
3276 cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3277 cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3278 if (validate1 != "--yes-i-really-mean-it" ||
3279 validate2 != "--i-know-what-i-am-doing") {
3280 r = -EINVAL;
3281 rs = "are you SURE? this will mean the monitor store will be "
3282 "erased. pass '--yes-i-really-mean-it "
3283 "--i-know-what-i-am-doing' if you really do.";
3284 goto out;
3285 }
3286 sync_force(f.get(), ds);
3287 rs = ds.str();
3288 r = 0;
3289 } else if (prefix == "heap") {
3290 if (!ceph_using_tcmalloc())
3291 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3292 else {
3293 string heapcmd;
3294 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3295 // XXX 1-element vector, change at callee or make vector here?
3296 vector<string> heapcmd_vec;
3297 get_str_vec(heapcmd, heapcmd_vec);
3298 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3299 rdata.append(ds);
3300 rs = "";
3301 r = 0;
3302 }
3303 } else if (prefix == "quorum") {
3304 string quorumcmd;
3305 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3306 if (quorumcmd == "exit") {
3307 start_election();
3308 elector.stop_participating();
3309 rs = "stopped responding to quorum, initiated new election";
3310 r = 0;
3311 } else if (quorumcmd == "enter") {
3312 elector.start_participating();
3313 start_election();
3314 rs = "started responding to quorum, initiated new election";
3315 r = 0;
3316 } else {
3317 rs = "needs a valid 'quorum' command";
3318 r = -EINVAL;
3319 }
3320 } else if (prefix == "version") {
3321 if (f) {
3322 f->open_object_section("version");
3323 f->dump_string("version", pretty_version_to_str());
3324 f->close_section();
3325 f->flush(ds);
3326 } else {
3327 ds << pretty_version_to_str();
3328 }
3329 rdata.append(ds);
3330 rs = "";
3331 r = 0;
3332 }
3333
3334 out:
3335 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3336 reply_command(op, r, rs, rdata, 0);
3337 }
3338
3339 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3340 {
3341 bufferlist rdata;
3342 reply_command(op, rc, rs, rdata, version);
3343 }
3344
3345 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3346 bufferlist& rdata, version_t version)
3347 {
3348 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3349 assert(m->get_type() == MSG_MON_COMMAND);
3350 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3351 reply->set_tid(m->get_tid());
3352 reply->set_data(rdata);
3353 send_reply(op, reply);
3354 }
3355
3356
3357 // ------------------------
3358 // request/reply routing
3359 //
3360 // a client/mds/osd will connect to a random monitor. we need to forward any
3361 // messages requiring state updates to the leader, and then route any replies
3362 // back via the correct monitor and back to them. (the monitor will not
3363 // initiate any connections.)
3364
3365 void Monitor::forward_request_leader(MonOpRequestRef op)
3366 {
3367 op->mark_event(__func__);
3368
3369 int mon = get_leader();
3370 MonSession *session = op->get_session();
3371 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3372
3373 if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3374 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3375 } else if (session->proxy_con) {
3376 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3377 } else if (!session->closed) {
3378 RoutedRequest *rr = new RoutedRequest;
3379 rr->tid = ++routed_request_tid;
3380 rr->client_inst = req->get_source_inst();
3381 rr->con = req->get_connection();
3382 rr->con_features = rr->con->get_features();
3383 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3384 rr->session = static_cast<MonSession *>(session->get());
3385 rr->op = op;
3386 routed_requests[rr->tid] = rr;
3387 session->routed_request_tids.insert(rr->tid);
3388
3389 dout(10) << "forward_request " << rr->tid << " request " << *req
3390 << " features " << rr->con_features << dendl;
3391
3392 MForward *forward = new MForward(rr->tid,
3393 req,
3394 rr->con_features,
3395 rr->session->caps);
3396 forward->set_priority(req->get_priority());
3397 if (session->auth_handler) {
3398 forward->entity_name = session->entity_name;
3399 } else if (req->get_source().is_mon()) {
3400 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3401 }
3402 messenger->send_message(forward, monmap->get_inst(mon));
3403 op->mark_forwarded();
3404 assert(op->get_req()->get_type() != 0);
3405 } else {
3406 dout(10) << "forward_request no session for request " << *req << dendl;
3407 }
3408 }
3409
3410 // fake connection attached to forwarded messages
3411 struct AnonConnection : public Connection {
3412 explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
3413
3414 int send_message(Message *m) override {
3415 assert(!"send_message on anonymous connection");
3416 }
3417 void send_keepalive() override {
3418 assert(!"send_keepalive on anonymous connection");
3419 }
3420 void mark_down() override {
3421 // silently ignore
3422 }
3423 void mark_disposable() override {
3424 // silengtly ignore
3425 }
3426 bool is_connected() override { return false; }
3427 };
3428
//extract the original message and put it into the regular dispatch function
//
// Handle an MForward sent by a peer monitor. If the forwarding session has
// mon 'x' caps, unwrap the client's request, attach it to a fresh
// AnonConnection + MonSession that stand in for the original client, and
// run it through _ms_dispatch(). Otherwise the forward is logged and dropped.
void Monitor::handle_forward(MonOpRequestRef op)
{
  MForward *m = static_cast<MForward*>(op->get_req());
  dout(10) << "received forwarded message from " << m->client
           << " via " << m->get_source_inst() << dendl;
  MonSession *session = op->get_session();
  assert(session);

  if (!session->is_capable("mon", MON_CAP_X)) {
    dout(0) << "forward from entity with insufficient caps! "
            << session->caps << dendl;
  } else {
    // see PaxosService::dispatch(); we rely on this being anon
    // (c->msgr == NULL)
    PaxosServiceMessage *req = m->claim_message();
    assert(req != NULL);

    // Build a stand-in connection and session for the original client,
    // populated with the address, type and features the forwarder sent.
    ConnectionRef c(new AnonConnection(cct));
    MonSession *s = new MonSession(req->get_source_inst(),
                                   static_cast<Connection*>(c.get()));
    c->set_priv(s->get());
    c->set_peer_addr(m->client.addr);
    c->set_peer_type(m->client.name.type());
    c->set_features(m->con_features);

    // The client's caps and entity name come from the forwarding monitor.
    s->caps = m->client_caps;
    dout(10) << " caps are " << s->caps << dendl;
    s->entity_name = m->entity_name;
    dout(10) << " entity name '" << s->entity_name << "' type "
             << s->entity_name.get_type() << dendl;
    // With proxy_con set, send_reply()/no_reply() route the answer back
    // through the forwarding monitor as an MRoute tagged with proxy_tid.
    s->proxy_con = m->get_connection();
    s->proxy_tid = m->tid;

    req->set_connection(c);

    // not super accurate, but better than nothing.
    req->set_recv_stamp(m->get_recv_stamp());

    /*
     * note which election epoch this is; we will drop the message if
     * there is a future election since our peers will resend routed
     * requests in that case.
     */
    req->rx_election_epoch = get_epoch();

    /* Because this is a special fake connection, we need to break
       the ref loop between Connection and MonSession differently
       than we normally do. Here, the Message refers to the Connection
       which refers to the Session, and nobody else refers to the Connection
       or the Session. And due to the special nature of this message,
       nobody refers to the Connection via the Session. So, clear out that
       half of the ref loop.*/
    s->con.reset(NULL);

    dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;

    _ms_dispatch(req);
    // Drop the reference from `new MonSession`; the connection's priv keeps
    // the one taken with s->get() above.
    s->put();
  }
}
3490
3491 void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3492 {
3493 dout(10) << "try_send_message " << *m << " to " << to << dendl;
3494
3495 bufferlist bl;
3496 encode_message(m, quorum_con_features, bl);
3497
3498 messenger->send_message(m, to);
3499
3500 for (int i=0; i<(int)monmap->size(); i++) {
3501 if (i != rank)
3502 messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
3503 }
3504 }
3505
3506 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3507 {
3508 op->mark_event(__func__);
3509
3510 MonSession *session = op->get_session();
3511 assert(session);
3512 Message *req = op->get_req();
3513 ConnectionRef con = op->get_connection();
3514
3515 reply->set_cct(g_ceph_context);
3516 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3517
3518 if (!con) {
3519 dout(2) << "send_reply no connection, dropping reply " << *reply
3520 << " to " << req << " " << *req << dendl;
3521 reply->put();
3522 op->mark_event("reply: no connection");
3523 return;
3524 }
3525
3526 if (!session->con && !session->proxy_con) {
3527 dout(2) << "send_reply no connection, dropping reply " << *reply
3528 << " to " << req << " " << *req << dendl;
3529 reply->put();
3530 op->mark_event("reply: no connection");
3531 return;
3532 }
3533
3534 if (session->proxy_con) {
3535 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3536 << " via " << session->proxy_con->get_peer_addr()
3537 << " for request " << *req << dendl;
3538 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3539 op->mark_event("reply: send routed request");
3540 } else {
3541 session->con->send_message(reply);
3542 op->mark_event("reply: send");
3543 }
3544 }
3545
3546 void Monitor::no_reply(MonOpRequestRef op)
3547 {
3548 MonSession *session = op->get_session();
3549 Message *req = op->get_req();
3550
3551 if (session->proxy_con) {
3552 dout(10) << "no_reply to " << req->get_source_inst()
3553 << " via " << session->proxy_con->get_peer_addr()
3554 << " for request " << *req << dendl;
3555 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3556 op->mark_event("no_reply: send routed request");
3557 } else {
3558 dout(10) << "no_reply to " << req->get_source_inst()
3559 << " " << *req << dendl;
3560 op->mark_event("no_reply");
3561 }
3562 }
3563
3564 void Monitor::handle_route(MonOpRequestRef op)
3565 {
3566 MRoute *m = static_cast<MRoute*>(op->get_req());
3567 MonSession *session = op->get_session();
3568 //check privileges
3569 if (!session->is_capable("mon", MON_CAP_X)) {
3570 dout(0) << "MRoute received from entity without appropriate perms! "
3571 << dendl;
3572 return;
3573 }
3574 if (m->msg)
3575 dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3576 else
3577 dout(10) << "handle_route null to " << m->dest << dendl;
3578
3579 // look it up
3580 if (m->session_mon_tid) {
3581 if (routed_requests.count(m->session_mon_tid)) {
3582 RoutedRequest *rr = routed_requests[m->session_mon_tid];
3583
3584 // reset payload, in case encoding is dependent on target features
3585 if (m->msg) {
3586 m->msg->clear_payload();
3587 rr->con->send_message(m->msg);
3588 m->msg = NULL;
3589 }
3590 if (m->send_osdmap_first) {
3591 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3592 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3593 true, MonOpRequestRef());
3594 }
3595 assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3596 routed_requests.erase(m->session_mon_tid);
3597 rr->session->routed_request_tids.erase(m->session_mon_tid);
3598 delete rr;
3599 } else {
3600 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3601 }
3602 } else {
3603 dout(10) << " not a routed request, trying to send anyway" << dendl;
3604 if (m->msg) {
3605 messenger->send_message(m->msg, m->dest);
3606 m->msg = NULL;
3607 }
3608 }
3609 }
3610
3611 void Monitor::resend_routed_requests()
3612 {
3613 dout(10) << "resend_routed_requests" << dendl;
3614 int mon = get_leader();
3615 list<Context*> retry;
3616 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
3617 p != routed_requests.end();
3618 ++p) {
3619 RoutedRequest *rr = p->second;
3620
3621 if (mon == rank) {
3622 dout(10) << " requeue for self tid " << rr->tid << dendl;
3623 rr->op->mark_event("retry routed request");
3624 retry.push_back(new C_RetryMessage(this, rr->op));
3625 if (rr->session) {
3626 assert(rr->session->routed_request_tids.count(p->first));
3627 rr->session->routed_request_tids.erase(p->first);
3628 }
3629 delete rr;
3630 } else {
3631 bufferlist::iterator q = rr->request_bl.begin();
3632 PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
3633 rr->op->mark_event("resend forwarded message to leader");
3634 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
3635 MForward *forward = new MForward(rr->tid, req, rr->con_features,
3636 rr->session->caps);
3637 req->put(); // forward takes its own ref; drop ours.
3638 forward->client = rr->client_inst;
3639 forward->set_priority(req->get_priority());
3640 messenger->send_message(forward, monmap->get_inst(mon));
3641 }
3642 }
3643 if (mon == rank) {
3644 routed_requests.clear();
3645 finish_contexts(g_ceph_context, retry);
3646 }
3647 }
3648
3649 void Monitor::remove_session(MonSession *s)
3650 {
3651 dout(10) << "remove_session " << s << " " << s->inst << dendl;
3652 assert(s->con);
3653 assert(!s->closed);
3654 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
3655 p != s->routed_request_tids.end();
3656 ++p) {
3657 assert(routed_requests.count(*p));
3658 RoutedRequest *rr = routed_requests[*p];
3659 dout(10) << " dropping routed request " << rr->tid << dendl;
3660 delete rr;
3661 routed_requests.erase(*p);
3662 }
3663 s->routed_request_tids.clear();
3664 s->con->set_priv(NULL);
3665 session_map.remove_session(s);
3666 logger->set(l_mon_num_sessions, session_map.get_size());
3667 logger->inc(l_mon_session_rm);
3668 }
3669
3670 void Monitor::remove_all_sessions()
3671 {
3672 Mutex::Locker l(session_map_lock);
3673 while (!session_map.sessions.empty()) {
3674 MonSession *s = session_map.sessions.front();
3675 remove_session(s);
3676 if (logger)
3677 logger->inc(l_mon_session_rm);
3678 }
3679 if (logger)
3680 logger->set(l_mon_num_sessions, session_map.get_size());
3681 }
3682
3683 void Monitor::send_command(const entity_inst_t& inst,
3684 const vector<string>& com)
3685 {
3686 dout(10) << "send_command " << inst << "" << com << dendl;
3687 MMonCommand *c = new MMonCommand(monmap->fsid);
3688 c->cmd = com;
3689 try_send_message(c, inst);
3690 }
3691
3692 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
3693 {
3694 /**
3695 * Wait list the new session until we're in the quorum, assuming it's
3696 * sufficiently new.
3697 * tick() will periodically send them back through so we can send
3698 * the client elsewhere if we don't think we're getting back in.
3699 *
3700 * But we whitelist a few sorts of messages:
3701 * 1) Monitors can talk to us at any time, of course.
3702 * 2) auth messages. It's unlikely to go through much faster, but
3703 * it's possible we've just lost our quorum status and we want to take...
3704 * 3) command messages. We want to accept these under all possible
3705 * circumstances.
3706 */
3707 Message *m = op->get_req();
3708 MonSession *s = op->get_session();
3709 ConnectionRef con = op->get_connection();
3710 utime_t too_old = ceph_clock_now();
3711 too_old -= g_ceph_context->_conf->mon_lease;
3712 if (m->get_recv_stamp() > too_old &&
3713 con->is_connected()) {
3714 dout(5) << "waitlisting message " << *m << dendl;
3715 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
3716 op->mark_wait_for_quorum();
3717 } else {
3718 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
3719 con->mark_down();
3720 // proxied sessions aren't registered and don't have a con; don't remove
3721 // those.
3722 if (!s->proxy_con) {
3723 Mutex::Locker l(session_map_lock);
3724 remove_session(s);
3725 }
3726 op->mark_zap();
3727 }
3728 }
3729
/*
 * Common entry point for incoming messages, including requests unwrapped
 * from an MForward by handle_forward().
 *
 * Wraps @m in a MonOpRequest and finds or creates the sender's MonSession.
 * It then refreshes the session timeout and the cached entity name.
 * Finally it either hands the op to dispatch_op() or, while synchronizing
 * (or for a session with global_id 0 once exited_quorum is set), to
 * waitlist_or_zap_client(). Messages from monitors and pings always go to
 * dispatch_op().
 */
void Monitor::_ms_dispatch(Message *m)
{
  // Shutting down: just release the message.
  if (is_shutdown()) {
    m->put();
    return;
  }

  // NOTE(review): no put() on the early returns below, so presumably the op
  // now owns the message reference -- confirm against MonOpRequest.
  MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
  bool src_is_mon = op->is_src_mon();
  op->mark_event("mon:_ms_dispatch");
  MonSession *s = op->get_session();
  // Messages on a closed session are ignored.
  if (s && s->closed) {
    return;
  }
  if (!s) {
    // if the sender is not a monitor, make sure their first message for a
    // session is an MAuth (MMonGetMap and MPing are accepted as well). If it
    // is not, assume it's a stray message,
    // and considering that we are creating a new session it is safe to
    // assume that the sender hasn't authenticated yet, so we have no way
    // of assessing whether we should handle it or not.
    if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
                        m->get_type() != CEPH_MSG_MON_GET_MAP &&
                        m->get_type() != CEPH_MSG_PING)) {
      dout(1) << __func__ << " dropping stray message " << *m
              << " from " << m->get_source_inst() << dendl;
      return;
    }

    ConnectionRef con = m->get_connection();
    {
      Mutex::Locker l(session_map_lock);
      s = session_map.new_session(m->get_source_inst(), con.get());
    }
    assert(s);
    // The connection's priv holds its own reference to the new session.
    con->set_priv(s->get());
    dout(10) << __func__ << " new session " << s << " " << *s << dendl;
    op->set_session(s);

    logger->set(l_mon_num_sessions, session_map.get_size());
    logger->inc(l_mon_session_add);

    if (src_is_mon) {
      // give it monitor caps; the peer type has been authenticated
      dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
      if (!s->caps.is_allow_all()) // but no need to repeatedly copy
        s->caps = *mon_caps;
    }
    // Drop our local reference; con's priv took its own via s->get() above.
    s->put();
  } else {
    dout(20) << __func__ << " existing session " << s << " for " << s->inst
             << dendl;
  }

  assert(s);

  // Any message on the session pushes its timeout out by
  // mon_session_timeout from now.
  s->session_timeout = ceph_clock_now();
  s->session_timeout += g_conf->mon_session_timeout;

  if (s->auth_handler) {
    s->entity_name = s->auth_handler->get_entity_name();
  }
  dout(20) << " caps " << s->caps.get_str() << dendl;

  if ((is_synchronizing() ||
       (s->global_id == 0 && !exited_quorum.is_zero())) &&
      !src_is_mon &&
      m->get_type() != CEPH_MSG_PING) {
    waitlist_or_zap_client(op);
  } else {
    dispatch_op(op);
  }
  return;
}
3803
/*
 * Route an op to its handler. Dispatch runs in up to four stages, each a
 * switch on the message type; the first stage that recognizes the type
 * handles the op.
 *   1. messages handled without a caps check here (auth, ping, get-map,
 *      metadata);
 *   2. service messages, whose caps are checked by the receiving service
 *      (or by handle_command());
 *   3. monitor messages that clients may send, requiring mon 'r' caps;
 *   4. messages that only another monitor may send.
 * Anything left over, or failing a caps/source check, is dropped.
 */
void Monitor::dispatch_op(MonOpRequestRef op)
{
  op->mark_event("mon:dispatch_op");
  MonSession *s = op->get_session();
  assert(s);
  if (s->closed) {
    dout(10) << " session closed, dropping " << op->get_req() << dendl;
    return;
  }

  /* we will consider the default type as being 'monitor' until proven wrong */
  op->set_type_monitor();
  /* deal with all messages that do not necessarily need caps */
  bool dealt_with = true;
  switch (op->get_req()->get_type()) {
    // auth
    case MSG_MON_GLOBAL_ID:
    case CEPH_MSG_AUTH:
      op->set_type_service();
      /* no need to check caps here */
      paxos_service[PAXOS_AUTH]->dispatch(op);
      break;

    case CEPH_MSG_PING:
      handle_ping(op);
      break;

    /* MMonGetMap may be used by clients to obtain a monmap *before*
     * authenticating with the monitor. We need to handle these without
     * checking caps because, even on a cluster without cephx, we only set
     * session caps *after* the auth handshake. A good example of this
     * is when a client calls MonClient::get_monmap_privately(), which does
     * not authenticate when obtaining a monmap.
     */
    case CEPH_MSG_MON_GET_MAP:
      handle_mon_get_map(op);
      break;

    case CEPH_MSG_MON_METADATA:
      return handle_mon_metadata(op);

    default:
      dealt_with = false;
      break;
  }
  if (dealt_with)
    return;

  /* well, maybe the op belongs to a service... */
  op->set_type_service();
  /* deal with all messages which caps should be checked somewhere else */
  dealt_with = true;
  switch (op->get_req()->get_type()) {

    // OSDs
    case CEPH_MSG_MON_GET_OSDMAP:
    case CEPH_MSG_POOLOP:
    case MSG_OSD_BEACON:
    case MSG_OSD_MARK_ME_DOWN:
    case MSG_OSD_FULL:
    case MSG_OSD_FAILURE:
    case MSG_OSD_BOOT:
    case MSG_OSD_ALIVE:
    case MSG_OSD_PGTEMP:
    case MSG_OSD_PG_CREATED:
    case MSG_REMOVE_SNAPS:
      paxos_service[PAXOS_OSDMAP]->dispatch(op);
      break;

    // MDSs
    case MSG_MDS_BEACON:
    case MSG_MDS_OFFLOAD_TARGETS:
      paxos_service[PAXOS_MDSMAP]->dispatch(op);
      break;

    // Mgrs
    case MSG_MGR_BEACON:
      paxos_service[PAXOS_MGR]->dispatch(op);
      break;

    // MgrStat
    case MSG_MON_MGR_REPORT:
    case CEPH_MSG_STATFS:
    case MSG_GETPOOLSTATS:
      paxos_service[PAXOS_MGRSTAT]->dispatch(op);
      break;

    // pg
    case MSG_PGSTATS:
      paxos_service[PAXOS_PGMAP]->dispatch(op);
      break;

    // log
    case MSG_LOG:
      paxos_service[PAXOS_LOG]->dispatch(op);
      break;

    // handle_command() does its own caps checking
    case MSG_MON_COMMAND:
      op->set_type_command();
      handle_command(op);
      break;

    default:
      dealt_with = false;
      break;
  }
  if (dealt_with)
    return;

  /* nop, looks like it's not a service message; revert back to monitor */
  op->set_type_monitor();

  /* messages we, the Monitor class, need to deal with
   * but may be sent by clients. */

  // Everything past this point requires at least mon 'r' caps.
  if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
    dout(5) << __func__ << " " << op->get_req()->get_source_inst()
            << " not enough caps for " << *(op->get_req()) << " -- dropping"
            << dendl;
    goto drop;
  }

  dealt_with = true;
  switch (op->get_req()->get_type()) {

    // misc
    case CEPH_MSG_MON_GET_VERSION:
      handle_get_version(op);
      break;

    case CEPH_MSG_MON_SUBSCRIBE:
      /* FIXME: check what's being subscribed, filter accordingly */
      handle_subscribe(op);
      break;

    default:
      dealt_with = false;
      break;
  }
  if (dealt_with)
    return;

  // The remaining types may only come from another monitor.
  if (!op->is_src_mon()) {
    dout(1) << __func__ << " unexpected monitor message from"
            << " non-monitor entity " << op->get_req()->get_source_inst()
            << " " << *(op->get_req()) << " -- dropping" << dendl;
    goto drop;
  }

  /* messages that should only be sent by another monitor */
  dealt_with = true;
  switch (op->get_req()->get_type()) {

    case MSG_ROUTE:
      handle_route(op);
      break;

    case MSG_MON_PROBE:
      handle_probe(op);
      break;

    // Sync (i.e., the new slurp, but on steroids)
    case MSG_MON_SYNC:
      handle_sync(op);
      break;
    case MSG_MON_SCRUB:
      handle_scrub(op);
      break;

    /* log acks are sent from a monitor we sent the MLog to, and are
       never sent by clients to us. */
    case MSG_LOGACK:
      log_client.handle_log_ack((MLogAck*)op->get_req());
      break;

    // monmap
    case MSG_MON_JOIN:
      op->set_type_service();
      paxos_service[PAXOS_MONMAP]->dispatch(op);
      break;

    // paxos
    case MSG_MON_PAXOS:
      {
        op->set_type_paxos();
        MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
        // Paxos traffic needs mon 'x' caps; otherwise it is dropped silently.
        if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
          //can't send these!
          break;
        }

        if (state == STATE_SYNCHRONIZING) {
          // we are synchronizing. These messages would do us no
          // good, thus just drop them and ignore them.
          dout(10) << __func__ << " ignore paxos msg from "
                   << pm->get_source_inst() << dendl;
          break;
        }

        // sanitize
        // A newer epoch than ours means we are behind: bootstrap and drop
        // the message. Any other epoch mismatch is dropped as well.
        if (pm->epoch > get_epoch()) {
          bootstrap();
          break;
        }
        if (pm->epoch != get_epoch()) {
          break;
        }

        paxos->dispatch(op);
      }
      break;

    // elector messages
    case MSG_MON_ELECTION:
      op->set_type_election();
      //check privileges here for simplicity
      if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
        dout(0) << "MMonElection received from entity without enough caps!"
                << op->get_session()->caps << dendl;
        break;
      }
      // Election messages are ignored while probing or synchronizing.
      if (!is_probing() && !is_synchronizing()) {
        elector.dispatch(op);
      }
      break;

    case MSG_FORWARD:
      handle_forward(op);
      break;

    case MSG_TIMECHECK:
      handle_timecheck(op);
      break;

    case MSG_MON_HEALTH:
      health_monitor->dispatch(op);
      break;

    default:
      dealt_with = false;
      break;
  }
  if (!dealt_with) {
    dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
    goto drop;
  }
  return;

// Common exit for dropped ops; currently just returns.
drop:
  return;
}
4056
4057 void Monitor::handle_ping(MonOpRequestRef op)
4058 {
4059 MPing *m = static_cast<MPing*>(op->get_req());
4060 dout(10) << __func__ << " " << *m << dendl;
4061 MPing *reply = new MPing;
4062 entity_inst_t inst = m->get_source_inst();
4063 bufferlist payload;
4064 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4065 f->open_object_section("pong");
4066
4067 list<string> health_str;
4068 get_health(health_str, NULL, f.get());
4069 {
4070 stringstream ss;
4071 get_mon_status(f.get(), ss);
4072 }
4073
4074 f->close_section();
4075 stringstream ss;
4076 f->flush(ss);
4077 ::encode(ss.str(), payload);
4078 reply->set_payload(payload);
4079 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4080 messenger->send_message(reply, inst);
4081 }
4082
4083 void Monitor::timecheck_start()
4084 {
4085 dout(10) << __func__ << dendl;
4086 timecheck_cleanup();
4087 timecheck_start_round();
4088 }
4089
4090 void Monitor::timecheck_finish()
4091 {
4092 dout(10) << __func__ << dendl;
4093 timecheck_cleanup();
4094 }
4095
4096 void Monitor::timecheck_start_round()
4097 {
4098 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4099 assert(is_leader());
4100
4101 if (monmap->size() == 1) {
4102 assert(0 == "We are alone; this shouldn't have been scheduled!");
4103 return;
4104 }
4105
4106 if (timecheck_round % 2) {
4107 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4108 utime_t curr_time = ceph_clock_now();
4109 double max = g_conf->mon_timecheck_interval*3;
4110 if (curr_time - timecheck_round_start < max) {
4111 dout(10) << __func__ << " keep current round going" << dendl;
4112 goto out;
4113 } else {
4114 dout(10) << __func__
4115 << " finish current timecheck and start new" << dendl;
4116 timecheck_cancel_round();
4117 }
4118 }
4119
4120 assert(timecheck_round % 2 == 0);
4121 timecheck_acks = 0;
4122 timecheck_round ++;
4123 timecheck_round_start = ceph_clock_now();
4124 dout(10) << __func__ << " new " << timecheck_round << dendl;
4125
4126 timecheck();
4127 out:
4128 dout(10) << __func__ << " setting up next event" << dendl;
4129 timecheck_reset_event();
4130 }
4131
4132 void Monitor::timecheck_finish_round(bool success)
4133 {
4134 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4135 assert(timecheck_round % 2);
4136 timecheck_round ++;
4137 timecheck_round_start = utime_t();
4138
4139 if (success) {
4140 assert(timecheck_waiting.empty());
4141 assert(timecheck_acks == quorum.size());
4142 timecheck_report();
4143 timecheck_check_skews();
4144 return;
4145 }
4146
4147 dout(10) << __func__ << " " << timecheck_waiting.size()
4148 << " peers still waiting:";
4149 for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4150 p != timecheck_waiting.end(); ++p) {
4151 *_dout << " " << p->first.name;
4152 }
4153 *_dout << dendl;
4154 timecheck_waiting.clear();
4155
4156 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4157 }
4158
4159 void Monitor::timecheck_cancel_round()
4160 {
4161 timecheck_finish_round(false);
4162 }
4163
4164 void Monitor::timecheck_cleanup()
4165 {
4166 timecheck_round = 0;
4167 timecheck_acks = 0;
4168 timecheck_round_start = utime_t();
4169
4170 if (timecheck_event) {
4171 timer.cancel_event(timecheck_event);
4172 timecheck_event = NULL;
4173 }
4174 timecheck_waiting.clear();
4175 timecheck_skews.clear();
4176 timecheck_latencies.clear();
4177
4178 timecheck_rounds_since_clean = 0;
4179 }
4180
4181 void Monitor::timecheck_reset_event()
4182 {
4183 if (timecheck_event) {
4184 timer.cancel_event(timecheck_event);
4185 timecheck_event = NULL;
4186 }
4187
4188 double delay =
4189 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4190
4191 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4192 delay = cct->_conf->mon_timecheck_interval;
4193 }
4194
4195 dout(10) << __func__ << " delay " << delay
4196 << " rounds_since_clean " << timecheck_rounds_since_clean
4197 << dendl;
4198
4199 timecheck_event = new C_MonContext(this, [this](int) {
4200 timecheck_start_round();
4201 });
4202 timer.add_event_after(delay, timecheck_event);
4203 }
4204
4205 void Monitor::timecheck_check_skews()
4206 {
4207 dout(10) << __func__ << dendl;
4208 assert(is_leader());
4209 assert((timecheck_round % 2) == 0);
4210 if (monmap->size() == 1) {
4211 assert(0 == "We are alone; we shouldn't have gotten here!");
4212 return;
4213 }
4214 assert(timecheck_latencies.size() == timecheck_skews.size());
4215
4216 bool found_skew = false;
4217 for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4218 p != timecheck_skews.end(); ++p) {
4219
4220 double abs_skew;
4221 if (timecheck_has_skew(p->second, &abs_skew)) {
4222 dout(10) << __func__
4223 << " " << p->first << " skew " << abs_skew << dendl;
4224 found_skew = true;
4225 }
4226 }
4227
4228 if (found_skew) {
4229 ++timecheck_rounds_since_clean;
4230 timecheck_reset_event();
4231 } else if (timecheck_rounds_since_clean > 0) {
4232 dout(1) << __func__
4233 << " no clock skews found after " << timecheck_rounds_since_clean
4234 << " rounds" << dendl;
4235 // make sure the skews are really gone and not just a transient success
4236 // this will run just once if not in the presence of skews again.
4237 timecheck_rounds_since_clean = 1;
4238 timecheck_reset_event();
4239 timecheck_rounds_since_clean = 0;
4240 }
4241
4242 }
4243
4244 void Monitor::timecheck_report()
4245 {
4246 dout(10) << __func__ << dendl;
4247 assert(is_leader());
4248 assert((timecheck_round % 2) == 0);
4249 if (monmap->size() == 1) {
4250 assert(0 == "We are alone; we shouldn't have gotten here!");
4251 return;
4252 }
4253
4254 assert(timecheck_latencies.size() == timecheck_skews.size());
4255 bool do_output = true; // only output report once
4256 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4257 if (monmap->get_name(*q) == name)
4258 continue;
4259
4260 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4261 m->epoch = get_epoch();
4262 m->round = timecheck_round;
4263
4264 for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4265 it != timecheck_skews.end(); ++it) {
4266 double skew = it->second;
4267 double latency = timecheck_latencies[it->first];
4268
4269 m->skews[it->first] = skew;
4270 m->latencies[it->first] = latency;
4271
4272 if (do_output) {
4273 dout(25) << __func__ << " " << it->first
4274 << " latency " << latency
4275 << " skew " << skew << dendl;
4276 }
4277 }
4278 do_output = false;
4279 entity_inst_t inst = monmap->get_inst(*q);
4280 dout(10) << __func__ << " send report to " << inst << dendl;
4281 messenger->send_message(m, inst);
4282 }
4283 }
4284
4285 void Monitor::timecheck()
4286 {
4287 dout(10) << __func__ << dendl;
4288 assert(is_leader());
4289 if (monmap->size() == 1) {
4290 assert(0 == "We are alone; we shouldn't have gotten here!");
4291 return;
4292 }
4293 assert(timecheck_round % 2 != 0);
4294
4295 timecheck_acks = 1; // we ack ourselves
4296
4297 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4298 << " round " << timecheck_round << dendl;
4299
4300 // we are at the eye of the storm; the point of reference
4301 timecheck_skews[messenger->get_myinst()] = 0.0;
4302 timecheck_latencies[messenger->get_myinst()] = 0.0;
4303
4304 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4305 if (monmap->get_name(*it) == name)
4306 continue;
4307
4308 entity_inst_t inst = monmap->get_inst(*it);
4309 utime_t curr_time = ceph_clock_now();
4310 timecheck_waiting[inst] = curr_time;
4311 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4312 m->epoch = get_epoch();
4313 m->round = timecheck_round;
4314 dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4315 messenger->send_message(m, inst);
4316 }
4317 }
4318
4319 health_status_t Monitor::timecheck_status(ostringstream &ss,
4320 const double skew_bound,
4321 const double latency)
4322 {
4323 health_status_t status = HEALTH_OK;
4324 assert(latency >= 0);
4325
4326 double abs_skew;
4327 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4328 status = HEALTH_WARN;
4329 ss << "clock skew " << abs_skew << "s"
4330 << " > max " << g_conf->mon_clock_drift_allowed << "s";
4331 }
4332
4333 return status;
4334 }
4335
4336 void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4337 {
4338 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4339 dout(10) << __func__ << " " << *m << dendl;
4340 /* handles PONG's */
4341 assert(m->op == MTimeCheck::OP_PONG);
4342
4343 entity_inst_t other = m->get_source_inst();
4344 if (m->epoch < get_epoch()) {
4345 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4346 << " from " << other
4347 << " curr " << get_epoch()
4348 << " -- severely lagged? discard" << dendl;
4349 return;
4350 }
4351 assert(m->epoch == get_epoch());
4352
4353 if (m->round < timecheck_round) {
4354 dout(1) << __func__ << " got old round " << m->round
4355 << " from " << other
4356 << " curr " << timecheck_round << " -- discard" << dendl;
4357 return;
4358 }
4359
4360 utime_t curr_time = ceph_clock_now();
4361
4362 assert(timecheck_waiting.count(other) > 0);
4363 utime_t timecheck_sent = timecheck_waiting[other];
4364 timecheck_waiting.erase(other);
4365 if (curr_time < timecheck_sent) {
4366 // our clock was readjusted -- drop everything until it all makes sense.
4367 dout(1) << __func__ << " our clock was readjusted --"
4368 << " bump round and drop current check"
4369 << dendl;
4370 timecheck_cancel_round();
4371 return;
4372 }
4373
4374 /* update peer latencies */
4375 double latency = (double)(curr_time - timecheck_sent);
4376
4377 if (timecheck_latencies.count(other) == 0)
4378 timecheck_latencies[other] = latency;
4379 else {
4380 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4381 timecheck_latencies[other] = avg_latency;
4382 }
4383
4384 /*
4385 * update skews
4386 *
4387 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4388 * and 'a' happens to be lower than 'b'; so we use double instead.
4389 *
4390 * latency is always expected to be >= 0.
4391 *
4392 * delta, the difference between theirs timestamp and ours, may either be
4393 * lower or higher than 0; will hardly ever be 0.
4394 *
4395 * The absolute skew is the absolute delta minus the latency, which is
4396 * taken as a whole instead of an rtt given that there is some queueing
4397 * and dispatch times involved and it's hard to assess how long exactly
4398 * it took for the message to travel to the other side and be handled. So
4399 * we call it a bounded skew, the worst case scenario.
4400 *
4401 * Now, to math!
4402 *
4403 * Given that the latency is always positive, we can establish that the
4404 * bounded skew will be:
4405 *
4406 * 1. positive if the absolute delta is higher than the latency and
4407 * delta is positive
4408 * 2. negative if the absolute delta is higher than the latency and
4409 * delta is negative.
4410 * 3. zero if the absolute delta is lower than the latency.
4411 *
4412 * On 3. we make a judgement call and treat the skew as non-existent.
4413 * This is because that, if the absolute delta is lower than the
4414 * latency, then the apparently existing skew is nothing more than a
4415 * side-effect of the high latency at work.
4416 *
4417 * This may not be entirely true though, as a severely skewed clock
4418 * may be masked by an even higher latency, but with high latencies
4419 * we probably have worse issues to deal with than just skewed clocks.
4420 */
4421 assert(latency >= 0);
4422
4423 double delta = ((double) m->timestamp) - ((double) curr_time);
4424 double abs_delta = (delta > 0 ? delta : -delta);
4425 double skew_bound = abs_delta - latency;
4426 if (skew_bound < 0)
4427 skew_bound = 0;
4428 else if (delta < 0)
4429 skew_bound = -skew_bound;
4430
4431 ostringstream ss;
4432 health_status_t status = timecheck_status(ss, skew_bound, latency);
4433 if (status == HEALTH_ERR)
4434 clog->error() << other << " " << ss.str();
4435 else if (status == HEALTH_WARN)
4436 clog->warn() << other << " " << ss.str();
4437
4438 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4439 << " delta " << delta << " skew_bound " << skew_bound
4440 << " latency " << latency << dendl;
4441
4442 timecheck_skews[other] = skew_bound;
4443
4444 timecheck_acks++;
4445 if (timecheck_acks == quorum.size()) {
4446 dout(10) << __func__ << " got pongs from everybody ("
4447 << timecheck_acks << " total)" << dendl;
4448 assert(timecheck_skews.size() == timecheck_acks);
4449 assert(timecheck_waiting.empty());
4450 // everyone has acked, so bump the round to finish it.
4451 timecheck_finish_round();
4452 }
4453 }
4454
4455 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4456 {
4457 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4458 dout(10) << __func__ << " " << *m << dendl;
4459
4460 assert(is_peon());
4461 assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4462
4463 if (m->epoch != get_epoch()) {
4464 dout(1) << __func__ << " got wrong epoch "
4465 << "(ours " << get_epoch()
4466 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4467 return;
4468 }
4469
4470 if (m->round < timecheck_round) {
4471 dout(1) << __func__ << " got old round " << m->round
4472 << " current " << timecheck_round
4473 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4474 return;
4475 }
4476
4477 timecheck_round = m->round;
4478
4479 if (m->op == MTimeCheck::OP_REPORT) {
4480 assert((timecheck_round % 2) == 0);
4481 timecheck_latencies.swap(m->latencies);
4482 timecheck_skews.swap(m->skews);
4483 return;
4484 }
4485
4486 assert((timecheck_round % 2) != 0);
4487 MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4488 utime_t curr_time = ceph_clock_now();
4489 reply->timestamp = curr_time;
4490 reply->epoch = m->epoch;
4491 reply->round = m->round;
4492 dout(10) << __func__ << " send " << *m
4493 << " to " << m->get_source_inst() << dendl;
4494 m->get_connection()->send_message(reply);
4495 }
4496
4497 void Monitor::handle_timecheck(MonOpRequestRef op)
4498 {
4499 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4500 dout(10) << __func__ << " " << *m << dendl;
4501
4502 if (is_leader()) {
4503 if (m->op != MTimeCheck::OP_PONG) {
4504 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4505 } else {
4506 handle_timecheck_leader(op);
4507 }
4508 } else if (is_peon()) {
4509 if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4510 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4511 } else {
4512 handle_timecheck_peon(op);
4513 }
4514 } else {
4515 dout(1) << __func__ << " drop unexpected msg" << dendl;
4516 }
4517 }
4518
4519 void Monitor::handle_subscribe(MonOpRequestRef op)
4520 {
4521 MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
4522 dout(10) << "handle_subscribe " << *m << dendl;
4523
4524 bool reply = false;
4525
4526 MonSession *s = op->get_session();
4527 assert(s);
4528
4529 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
4530 p != m->what.end();
4531 ++p) {
4532 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4533 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
4534 reply = true;
4535
4536 // remove conflicting subscribes
4537 if (logmon()->sub_name_to_id(p->first) >= 0) {
4538 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
4539 it != s->sub_map.end(); ) {
4540 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
4541 Mutex::Locker l(session_map_lock);
4542 session_map.remove_sub((it++)->second);
4543 } else {
4544 ++it;
4545 }
4546 }
4547 }
4548
4549 {
4550 Mutex::Locker l(session_map_lock);
4551 session_map.add_update_sub(s, p->first, p->second.start,
4552 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
4553 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
4554 }
4555
4556 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
4557 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
4558 if ((int)s->is_capable("mds", MON_CAP_R)) {
4559 Subscription *sub = s->sub_map[p->first];
4560 assert(sub != nullptr);
4561 mdsmon()->check_sub(sub);
4562 }
4563 } else if (p->first == "osdmap") {
4564 if ((int)s->is_capable("osd", MON_CAP_R)) {
4565 if (s->osd_epoch > p->second.start) {
4566 // client needs earlier osdmaps on purpose, so reset the sent epoch
4567 s->osd_epoch = 0;
4568 }
4569 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
4570 }
4571 } else if (p->first == "osd_pg_creates") {
4572 if ((int)s->is_capable("osd", MON_CAP_W)) {
4573 if (monmap->get_required_features().contains_all(
4574 ceph::features::mon::FEATURE_LUMINOUS)) {
4575 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
4576 } else {
4577 pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
4578 }
4579 }
4580 } else if (p->first == "monmap") {
4581 monmon()->check_sub(s->sub_map[p->first]);
4582 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
4583 logmon()->check_sub(s->sub_map[p->first]);
4584 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
4585 mgrmon()->check_sub(s->sub_map[p->first]);
4586 }
4587 }
4588
4589 if (reply) {
4590 // we only need to reply if the client is old enough to think it
4591 // has to send renewals.
4592 ConnectionRef con = m->get_connection();
4593 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
4594 m->get_connection()->send_message(new MMonSubscribeAck(
4595 monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
4596 }
4597
4598 }
4599
4600 void Monitor::handle_get_version(MonOpRequestRef op)
4601 {
4602 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
4603 dout(10) << "handle_get_version " << *m << dendl;
4604 PaxosService *svc = NULL;
4605
4606 MonSession *s = op->get_session();
4607 assert(s);
4608
4609 if (!is_leader() && !is_peon()) {
4610 dout(10) << " waiting for quorum" << dendl;
4611 waitfor_quorum.push_back(new C_RetryMessage(this, op));
4612 goto out;
4613 }
4614
4615 if (m->what == "mdsmap") {
4616 svc = mdsmon();
4617 } else if (m->what == "fsmap") {
4618 svc = mdsmon();
4619 } else if (m->what == "osdmap") {
4620 svc = osdmon();
4621 } else if (m->what == "monmap") {
4622 svc = monmon();
4623 } else {
4624 derr << "invalid map type " << m->what << dendl;
4625 }
4626
4627 if (svc) {
4628 if (!svc->is_readable()) {
4629 svc->wait_for_readable(op, new C_RetryMessage(this, op));
4630 goto out;
4631 }
4632
4633 MMonGetVersionReply *reply = new MMonGetVersionReply();
4634 reply->handle = m->handle;
4635 reply->version = svc->get_last_committed();
4636 reply->oldest_version = svc->get_first_committed();
4637 reply->set_tid(m->get_tid());
4638
4639 m->get_connection()->send_message(reply);
4640 }
4641 out:
4642 return;
4643 }
4644
4645 bool Monitor::ms_handle_reset(Connection *con)
4646 {
4647 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
4648
4649 // ignore lossless monitor sessions
4650 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4651 return false;
4652
4653 MonSession *s = static_cast<MonSession *>(con->get_priv());
4654 if (!s)
4655 return false;
4656
4657 // break any con <-> session ref cycle
4658 s->con->set_priv(NULL);
4659
4660 if (is_shutdown())
4661 return false;
4662
4663 Mutex::Locker l(lock);
4664
4665 dout(10) << "reset/close on session " << s->inst << dendl;
4666 if (!s->closed) {
4667 Mutex::Locker l(session_map_lock);
4668 remove_session(s);
4669 }
4670 s->put();
4671 return true;
4672 }
4673
4674 bool Monitor::ms_handle_refused(Connection *con)
4675 {
4676 // just log for now...
4677 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
4678 return false;
4679 }
4680
4681 // -----
4682
4683 void Monitor::send_latest_monmap(Connection *con)
4684 {
4685 bufferlist bl;
4686 monmap->encode(bl, con->get_features());
4687 con->send_message(new MMonMap(bl));
4688 }
4689
4690 void Monitor::handle_mon_get_map(MonOpRequestRef op)
4691 {
4692 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
4693 dout(10) << "handle_mon_get_map" << dendl;
4694 send_latest_monmap(m->get_connection().get());
4695 }
4696
4697 void Monitor::handle_mon_metadata(MonOpRequestRef op)
4698 {
4699 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
4700 if (is_leader()) {
4701 dout(10) << __func__ << dendl;
4702 update_mon_metadata(m->get_source().num(), std::move(m->data));
4703 }
4704 }
4705
4706 void Monitor::update_mon_metadata(int from, Metadata&& m)
4707 {
4708 pending_metadata.insert(make_pair(from, std::move(m)));
4709
4710 bufferlist bl;
4711 int err = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
4712 map<int, Metadata> last_metadata;
4713 if (!err) {
4714 bufferlist::iterator iter = bl.begin();
4715 ::decode(last_metadata, iter);
4716 pending_metadata.insert(last_metadata.begin(), last_metadata.end());
4717 }
4718
4719 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
4720 bl.clear();
4721 ::encode(pending_metadata, bl);
4722 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
4723 paxos->trigger_propose();
4724 }
4725
4726 int Monitor::load_metadata(map<int, Metadata>& metadata)
4727 {
4728 bufferlist bl;
4729 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
4730 if (r)
4731 return r;
4732 bufferlist::iterator it = bl.begin();
4733 ::decode(metadata, it);
4734 return 0;
4735 }
4736
4737 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
4738 {
4739 assert(f);
4740 map<int, Metadata> last_metadata;
4741 if (int r = load_metadata(last_metadata)) {
4742 err << "Unable to load metadata: " << cpp_strerror(r);
4743 return r;
4744 }
4745 if (!last_metadata.count(mon)) {
4746 err << "mon." << mon << " not found";
4747 return -EINVAL;
4748 }
4749 const Metadata& m = last_metadata[mon];
4750 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
4751 f->dump_string(p->first.c_str(), p->second);
4752 }
4753 return 0;
4754 }
4755
4756 void Monitor::count_metadata(const string& field, Formatter *f)
4757 {
4758 map<int, Metadata> meta;
4759 load_metadata(meta);
4760 map<string,int> by_val;
4761 for (auto& p : meta) {
4762 auto q = p.second.find(field);
4763 if (q == p.second.end()) {
4764 by_val["unknown"]++;
4765 } else {
4766 by_val[q->second]++;
4767 }
4768 }
4769 f->open_object_section(field.c_str());
4770 for (auto& p : by_val) {
4771 f->dump_int(p.first.c_str(), p.second);
4772 }
4773 f->close_section();
4774 }
4775
4776 int Monitor::print_nodes(Formatter *f, ostream& err)
4777 {
4778 map<int, Metadata> metadata;
4779 if (int r = load_metadata(metadata)) {
4780 err << "Unable to load metadata.\n";
4781 return r;
4782 }
4783
4784 map<string, list<int> > mons; // hostname => mon
4785 for (map<int, Metadata>::iterator it = metadata.begin();
4786 it != metadata.end(); ++it) {
4787 const Metadata& m = it->second;
4788 Metadata::const_iterator hostname = m.find("hostname");
4789 if (hostname == m.end()) {
4790 // not likely though
4791 continue;
4792 }
4793 mons[hostname->second].push_back(it->first);
4794 }
4795
4796 dump_services(f, mons, "mon");
4797 return 0;
4798 }
4799
4800 // ----------------------------------------------
4801 // scrub
4802
4803 int Monitor::scrub_start()
4804 {
4805 dout(10) << __func__ << dendl;
4806 assert(is_leader());
4807
4808 if (!scrub_result.empty()) {
4809 clog->info() << "scrub already in progress";
4810 return -EBUSY;
4811 }
4812
4813 scrub_event_cancel();
4814 scrub_result.clear();
4815 scrub_state.reset(new ScrubState);
4816
4817 scrub();
4818 return 0;
4819 }
4820
4821 int Monitor::scrub()
4822 {
4823 assert(is_leader());
4824 assert(scrub_state);
4825
4826 scrub_cancel_timeout();
4827 wait_for_paxos_write();
4828 scrub_version = paxos->get_version();
4829
4830
4831 // scrub all keys if we're the only monitor in the quorum
4832 int32_t num_keys =
4833 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
4834
4835 for (set<int>::iterator p = quorum.begin();
4836 p != quorum.end();
4837 ++p) {
4838 if (*p == rank)
4839 continue;
4840 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
4841 num_keys);
4842 r->key = scrub_state->last_key;
4843 messenger->send_message(r, monmap->get_inst(*p));
4844 }
4845
4846 // scrub my keys
4847 bool r = _scrub(&scrub_result[rank],
4848 &scrub_state->last_key,
4849 &num_keys);
4850
4851 scrub_state->finished = !r;
4852
4853 // only after we got our scrub results do we really care whether the
4854 // other monitors are late on their results. Also, this way we avoid
4855 // triggering the timeout if we end up getting stuck in _scrub() for
4856 // longer than the duration of the timeout.
4857 scrub_reset_timeout();
4858
4859 if (quorum.size() == 1) {
4860 assert(scrub_state->finished == true);
4861 scrub_finish();
4862 }
4863 return 0;
4864 }
4865
4866 void Monitor::handle_scrub(MonOpRequestRef op)
4867 {
4868 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
4869 dout(10) << __func__ << " " << *m << dendl;
4870 switch (m->op) {
4871 case MMonScrub::OP_SCRUB:
4872 {
4873 if (!is_peon())
4874 break;
4875
4876 wait_for_paxos_write();
4877
4878 if (m->version != paxos->get_version())
4879 break;
4880
4881 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
4882 m->version,
4883 m->num_keys);
4884
4885 reply->key = m->key;
4886 _scrub(&reply->result, &reply->key, &reply->num_keys);
4887 m->get_connection()->send_message(reply);
4888 }
4889 break;
4890
4891 case MMonScrub::OP_RESULT:
4892 {
4893 if (!is_leader())
4894 break;
4895 if (m->version != scrub_version)
4896 break;
4897 // reset the timeout each time we get a result
4898 scrub_reset_timeout();
4899
4900 int from = m->get_source().num();
4901 assert(scrub_result.count(from) == 0);
4902 scrub_result[from] = m->result;
4903
4904 if (scrub_result.size() == quorum.size()) {
4905 scrub_check_results();
4906 scrub_result.clear();
4907 if (scrub_state->finished)
4908 scrub_finish();
4909 else
4910 scrub();
4911 }
4912 }
4913 break;
4914 }
4915 }
4916
4917 bool Monitor::_scrub(ScrubResult *r,
4918 pair<string,string> *start,
4919 int *num_keys)
4920 {
4921 assert(r != NULL);
4922 assert(start != NULL);
4923 assert(num_keys != NULL);
4924
4925 set<string> prefixes = get_sync_targets_names();
4926 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
4927
4928 dout(10) << __func__ << " start (" << *start << ")"
4929 << " num_keys " << *num_keys << dendl;
4930
4931 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
4932
4933 int scrubbed_keys = 0;
4934 pair<string,string> last_key;
4935
4936 while (it->has_next_chunk()) {
4937
4938 if (*num_keys > 0 && scrubbed_keys == *num_keys)
4939 break;
4940
4941 pair<string,string> k = it->get_next_key();
4942 if (prefixes.count(k.first) == 0)
4943 continue;
4944
4945 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
4946 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
4947 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
4948 << dendl;
4949 continue;
4950 }
4951
4952 bufferlist bl;
4953 //TODO: what when store->get returns error or empty bl?
4954 store->get(k.first, k.second, bl);
4955 uint32_t key_crc = bl.crc32c(0);
4956 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
4957 << " crc " << key_crc << dendl;
4958 r->prefix_keys[k.first]++;
4959 if (r->prefix_crc.count(k.first) == 0)
4960 r->prefix_crc[k.first] = 0;
4961 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
4962
4963 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
4964 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
4965 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
4966 r->prefix_crc[k.first] += 1;
4967 }
4968
4969 ++scrubbed_keys;
4970 last_key = k;
4971 }
4972
4973 dout(20) << __func__ << " last_key (" << last_key << ")"
4974 << " scrubbed_keys " << scrubbed_keys
4975 << " has_next " << it->has_next_chunk() << dendl;
4976
4977 *start = last_key;
4978 *num_keys = scrubbed_keys;
4979
4980 return it->has_next_chunk();
4981 }
4982
4983 void Monitor::scrub_check_results()
4984 {
4985 dout(10) << __func__ << dendl;
4986
4987 // compare
4988 int errors = 0;
4989 ScrubResult& mine = scrub_result[rank];
4990 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
4991 p != scrub_result.end();
4992 ++p) {
4993 if (p->first == rank)
4994 continue;
4995 if (p->second != mine) {
4996 ++errors;
4997 clog->error() << "scrub mismatch";
4998 clog->error() << " mon." << rank << " " << mine;
4999 clog->error() << " mon." << p->first << " " << p->second;
5000 }
5001 }
5002 if (!errors)
5003 clog->info() << "scrub ok on " << quorum << ": " << mine;
5004 }
5005
5006 inline void Monitor::scrub_timeout()
5007 {
5008 dout(1) << __func__ << " restarting scrub" << dendl;
5009 scrub_reset();
5010 scrub_start();
5011 }
5012
5013 void Monitor::scrub_finish()
5014 {
5015 dout(10) << __func__ << dendl;
5016 scrub_reset();
5017 scrub_event_start();
5018 }
5019
5020 void Monitor::scrub_reset()
5021 {
5022 dout(10) << __func__ << dendl;
5023 scrub_cancel_timeout();
5024 scrub_version = 0;
5025 scrub_result.clear();
5026 scrub_state.reset();
5027 }
5028
5029 inline void Monitor::scrub_update_interval(int secs)
5030 {
5031 // we don't care about changes if we are not the leader.
5032 // changes will be visible if we become the leader.
5033 if (!is_leader())
5034 return;
5035
5036 dout(1) << __func__ << " new interval = " << secs << dendl;
5037
5038 // if scrub already in progress, all changes will already be visible during
5039 // the next round. Nothing to do.
5040 if (scrub_state != NULL)
5041 return;
5042
5043 scrub_event_cancel();
5044 scrub_event_start();
5045 }
5046
5047 void Monitor::scrub_event_start()
5048 {
5049 dout(10) << __func__ << dendl;
5050
5051 if (scrub_event)
5052 scrub_event_cancel();
5053
5054 if (cct->_conf->mon_scrub_interval <= 0) {
5055 dout(1) << __func__ << " scrub event is disabled"
5056 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5057 << ")" << dendl;
5058 return;
5059 }
5060
5061 scrub_event = new C_MonContext(this, [this](int) {
5062 scrub_start();
5063 });
5064 timer.add_event_after(cct->_conf->mon_scrub_interval, scrub_event);
5065 }
5066
5067 void Monitor::scrub_event_cancel()
5068 {
5069 dout(10) << __func__ << dendl;
5070 if (scrub_event) {
5071 timer.cancel_event(scrub_event);
5072 scrub_event = NULL;
5073 }
5074 }
5075
5076 inline void Monitor::scrub_cancel_timeout()
5077 {
5078 if (scrub_timeout_event) {
5079 timer.cancel_event(scrub_timeout_event);
5080 scrub_timeout_event = NULL;
5081 }
5082 }
5083
5084 void Monitor::scrub_reset_timeout()
5085 {
5086 dout(15) << __func__ << " reset timeout event" << dendl;
5087 scrub_cancel_timeout();
5088
5089 scrub_timeout_event = new C_MonContext(this, [this](int) {
5090 scrub_timeout();
5091 });
5092 timer.add_event_after(g_conf->mon_scrub_timeout, scrub_timeout_event);
5093 }
5094
5095 /************ TICK ***************/
5096 void Monitor::new_tick()
5097 {
5098 timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5099 tick();
5100 }));
5101 }
5102
5103 void Monitor::tick()
5104 {
5105 // ok go.
5106 dout(11) << "tick" << dendl;
5107
5108 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5109 (*p)->tick();
5110 (*p)->maybe_trim();
5111 }
5112
5113 // trim sessions
5114 utime_t now = ceph_clock_now();
5115 {
5116 Mutex::Locker l(session_map_lock);
5117 auto p = session_map.sessions.begin();
5118
5119 bool out_for_too_long = (!exited_quorum.is_zero() &&
5120 now > (exited_quorum + 2*g_conf->mon_lease));
5121
5122 while (!p.end()) {
5123 MonSession *s = *p;
5124 ++p;
5125
5126 // don't trim monitors
5127 if (s->inst.name.is_mon())
5128 continue;
5129
5130 if (s->session_timeout < now && s->con) {
5131 // check keepalive, too
5132 s->session_timeout = s->con->get_last_keepalive();
5133 s->session_timeout += g_conf->mon_session_timeout;
5134 }
5135 if (s->session_timeout < now) {
5136 dout(10) << " trimming session " << s->con << " " << s->inst
5137 << " (timeout " << s->session_timeout
5138 << " < now " << now << ")" << dendl;
5139 } else if (out_for_too_long) {
5140 // boot the client Session because we've taken too long getting back in
5141 dout(10) << " trimming session " << s->con << " " << s->inst
5142 << " because we've been out of quorum too long" << dendl;
5143 } else {
5144 continue;
5145 }
5146
5147 s->con->mark_down();
5148 remove_session(s);
5149 logger->inc(l_mon_session_trim);
5150 }
5151 }
5152 sync_trim_providers();
5153
5154 if (!maybe_wait_for_quorum.empty()) {
5155 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5156 }
5157
5158 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5159 // this is only necessary on upgraded clusters.
5160 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5161 prepare_new_fingerprint(t);
5162 paxos->trigger_propose();
5163 }
5164
5165 new_tick();
5166 }
5167
5168 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5169 {
5170 uuid_d nf;
5171 nf.generate_random();
5172 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5173
5174 bufferlist bl;
5175 ::encode(nf, bl);
5176 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5177 }
5178
5179 int Monitor::check_fsid()
5180 {
5181 bufferlist ebl;
5182 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5183 if (r == -ENOENT)
5184 return r;
5185 assert(r == 0);
5186
5187 string es(ebl.c_str(), ebl.length());
5188
5189 // only keep the first line
5190 size_t pos = es.find_first_of('\n');
5191 if (pos != string::npos)
5192 es.resize(pos);
5193
5194 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5195 uuid_d ondisk;
5196 if (!ondisk.parse(es.c_str())) {
5197 derr << "error: unable to parse uuid" << dendl;
5198 return -EINVAL;
5199 }
5200
5201 if (monmap->get_fsid() != ondisk) {
5202 derr << "error: cluster_uuid file exists with value " << ondisk
5203 << ", != our uuid " << monmap->get_fsid() << dendl;
5204 return -EEXIST;
5205 }
5206
5207 return 0;
5208 }
5209
5210 int Monitor::write_fsid()
5211 {
5212 auto t(std::make_shared<MonitorDBStore::Transaction>());
5213 write_fsid(t);
5214 int r = store->apply_transaction(t);
5215 return r;
5216 }
5217
5218 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5219 {
5220 ostringstream ss;
5221 ss << monmap->get_fsid() << "\n";
5222 string us = ss.str();
5223
5224 bufferlist b;
5225 b.append(us);
5226
5227 t->put(MONITOR_NAME, "cluster_uuid", b);
5228 return 0;
5229 }
5230
5231 /*
5232 * this is the closest thing to a traditional 'mkfs' for ceph.
5233 * initialize the monitor state machines to their initial values.
5234 */
5235 int Monitor::mkfs(bufferlist& osdmapbl)
5236 {
5237 auto t(std::make_shared<MonitorDBStore::Transaction>());
5238
5239 // verify cluster fsid
5240 int r = check_fsid();
5241 if (r < 0 && r != -ENOENT)
5242 return r;
5243
5244 bufferlist magicbl;
5245 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5246 magicbl.append("\n");
5247 t->put(MONITOR_NAME, "magic", magicbl);
5248
5249
5250 features = get_initial_supported_features();
5251 write_features(t);
5252
5253 // save monmap, osdmap, keyring.
5254 bufferlist monmapbl;
5255 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5256 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5257 t->put("mkfs", "monmap", monmapbl);
5258
5259 if (osdmapbl.length()) {
5260 // make sure it's a valid osdmap
5261 try {
5262 OSDMap om;
5263 om.decode(osdmapbl);
5264 }
5265 catch (buffer::error& e) {
5266 derr << "error decoding provided osdmap: " << e.what() << dendl;
5267 return -EINVAL;
5268 }
5269 t->put("mkfs", "osdmap", osdmapbl);
5270 }
5271
5272 if (is_keyring_required()) {
5273 KeyRing keyring;
5274 string keyring_filename;
5275
5276 r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5277 if (r) {
5278 derr << "unable to find a keyring file on " << g_conf->keyring
5279 << ": " << cpp_strerror(r) << dendl;
5280 if (g_conf->key != "") {
5281 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5282 "\n\tcaps mon = \"allow *\"\n";
5283 bufferlist bl;
5284 bl.append(keyring_plaintext);
5285 try {
5286 bufferlist::iterator i = bl.begin();
5287 keyring.decode_plaintext(i);
5288 }
5289 catch (const buffer::error& e) {
5290 derr << "error decoding keyring " << keyring_plaintext
5291 << ": " << e.what() << dendl;
5292 return -EINVAL;
5293 }
5294 } else {
5295 return -ENOENT;
5296 }
5297 } else {
5298 r = keyring.load(g_ceph_context, keyring_filename);
5299 if (r < 0) {
5300 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5301 return r;
5302 }
5303 }
5304
5305 // put mon. key in external keyring; seed with everything else.
5306 extract_save_mon_key(keyring);
5307
5308 bufferlist keyringbl;
5309 keyring.encode_plaintext(keyringbl);
5310 t->put("mkfs", "keyring", keyringbl);
5311 }
5312 write_fsid(t);
5313 store->apply_transaction(t);
5314
5315 return 0;
5316 }
5317
5318 int Monitor::write_default_keyring(bufferlist& bl)
5319 {
5320 ostringstream os;
5321 os << g_conf->mon_data << "/keyring";
5322
5323 int err = 0;
5324 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600);
5325 if (fd < 0) {
5326 err = -errno;
5327 dout(0) << __func__ << " failed to open " << os.str()
5328 << ": " << cpp_strerror(err) << dendl;
5329 return err;
5330 }
5331
5332 err = bl.write_fd(fd);
5333 if (!err)
5334 ::fsync(fd);
5335 VOID_TEMP_FAILURE_RETRY(::close(fd));
5336
5337 return err;
5338 }
5339
5340 void Monitor::extract_save_mon_key(KeyRing& keyring)
5341 {
5342 EntityName mon_name;
5343 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5344 EntityAuth mon_key;
5345 if (keyring.get_auth(mon_name, mon_key)) {
5346 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5347 KeyRing pkey;
5348 pkey.add(mon_name, mon_key);
5349 bufferlist bl;
5350 pkey.encode_plaintext(bl);
5351 write_default_keyring(bl);
5352 keyring.remove(mon_name);
5353 }
5354 }
5355
5356 bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5357 bool force_new)
5358 {
5359 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5360 << dendl;
5361
5362 if (is_shutdown())
5363 return false;
5364
5365 // we only connect to other monitors and mgr; every else connects to us.
5366 if (service_id != CEPH_ENTITY_TYPE_MON &&
5367 service_id != CEPH_ENTITY_TYPE_MGR)
5368 return false;
5369
5370 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5371 // auth_none
5372 dout(20) << __func__ << " building auth_none authorizer" << dendl;
5373 AuthNoneClientHandler handler(g_ceph_context, nullptr);
5374 handler.set_global_id(0);
5375 *authorizer = handler.build_authorizer(service_id);
5376 return true;
5377 }
5378
5379 CephXServiceTicketInfo auth_ticket_info;
5380 CephXSessionAuthInfo info;
5381 int ret;
5382
5383 EntityName name;
5384 name.set_type(CEPH_ENTITY_TYPE_MON);
5385 auth_ticket_info.ticket.name = name;
5386 auth_ticket_info.ticket.global_id = 0;
5387
5388 if (service_id == CEPH_ENTITY_TYPE_MON) {
5389 // mon to mon authentication uses the private monitor shared key and not the
5390 // rotating key
5391 CryptoKey secret;
5392 if (!keyring.get_secret(name, secret) &&
5393 !key_server.get_secret(name, secret)) {
5394 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5395 << dendl;
5396 stringstream ss, ds;
5397 int err = key_server.list_secrets(ds);
5398 if (err < 0)
5399 ss << "no installed auth entries!";
5400 else
5401 ss << "installed auth entries:";
5402 dout(0) << ss.str() << "\n" << ds.str() << dendl;
5403 return false;
5404 }
5405
5406 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5407 secret, (uint64_t)-1);
5408 if (ret < 0) {
5409 dout(0) << __func__ << " failed to build mon session_auth_info "
5410 << cpp_strerror(ret) << dendl;
5411 return false;
5412 }
5413 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5414 // mgr
5415 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5416 if (ret < 0) {
5417 derr << __func__ << " failed to build mgr service session_auth_info "
5418 << cpp_strerror(ret) << dendl;
5419 return false;
5420 }
5421 } else {
5422 ceph_abort(); // see check at top of fn
5423 }
5424
5425 CephXTicketBlob blob;
5426 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5427 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5428 return false;
5429 }
5430 bufferlist ticket_data;
5431 ::encode(blob, ticket_data);
5432
5433 bufferlist::iterator iter = ticket_data.begin();
5434 CephXTicketHandler handler(g_ceph_context, service_id);
5435 ::decode(handler.ticket, iter);
5436
5437 handler.session_key = info.session_key;
5438
5439 *authorizer = handler.build_authorizer(0);
5440
5441 return true;
5442 }
5443
5444 bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5445 int protocol, bufferlist& authorizer_data,
5446 bufferlist& authorizer_reply,
5447 bool& isvalid, CryptoKey& session_key)
5448 {
5449 dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5450 << " " << ceph_entity_type_name(peer_type)
5451 << " protocol " << protocol << dendl;
5452
5453 if (is_shutdown())
5454 return false;
5455
5456 if (peer_type == CEPH_ENTITY_TYPE_MON &&
5457 auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5458 // monitor, and cephx is enabled
5459 isvalid = false;
5460 if (protocol == CEPH_AUTH_CEPHX) {
5461 bufferlist::iterator iter = authorizer_data.begin();
5462 CephXServiceTicketInfo auth_ticket_info;
5463
5464 if (authorizer_data.length()) {
5465 bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
5466 auth_ticket_info, authorizer_reply);
5467 if (ret) {
5468 session_key = auth_ticket_info.session_key;
5469 isvalid = true;
5470 } else {
5471 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5472 }
5473 }
5474 } else {
5475 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
5476 }
5477 } else {
5478 // who cares.
5479 isvalid = true;
5480 }
5481 return true;
5482 }