]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Monitor.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mon / Monitor.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include <sstream>
17#include <stdlib.h>
18#include <signal.h>
19#include <limits.h>
20#include <cstring>
21#include <boost/scope_exit.hpp>
22#include <boost/algorithm/string/predicate.hpp>
23
24#include "Monitor.h"
25#include "common/version.h"
26
27#include "osd/OSDMap.h"
28
29#include "MonitorDBStore.h"
30
31#include "messages/PaxosServiceMessage.h"
32#include "messages/MMonMap.h"
33#include "messages/MMonGetMap.h"
34#include "messages/MMonGetVersion.h"
35#include "messages/MMonGetVersionReply.h"
36#include "messages/MGenericMessage.h"
37#include "messages/MMonCommand.h"
38#include "messages/MMonCommandAck.h"
31f18b77 39#include "messages/MMonHealth.h"
7c673cae
FG
40#include "messages/MMonMetadata.h"
41#include "messages/MMonSync.h"
42#include "messages/MMonScrub.h"
43#include "messages/MMonProbe.h"
44#include "messages/MMonJoin.h"
45#include "messages/MMonPaxos.h"
46#include "messages/MRoute.h"
47#include "messages/MForward.h"
48
49#include "messages/MMonSubscribe.h"
50#include "messages/MMonSubscribeAck.h"
51
52#include "messages/MAuthReply.h"
53
54#include "messages/MTimeCheck.h"
7c673cae
FG
55#include "messages/MPing.h"
56
57#include "common/strtol.h"
58#include "common/ceph_argparse.h"
59#include "common/Timer.h"
60#include "common/Clock.h"
61#include "common/errno.h"
62#include "common/perf_counters.h"
63#include "common/admin_socket.h"
64#include "global/signal_handler.h"
65#include "common/Formatter.h"
66#include "include/stringify.h"
67#include "include/color.h"
68#include "include/ceph_fs.h"
69#include "include/str_list.h"
70
71#include "OSDMonitor.h"
72#include "MDSMonitor.h"
73#include "MonmapMonitor.h"
74#include "PGMonitor.h"
75#include "LogMonitor.h"
76#include "AuthMonitor.h"
77#include "MgrMonitor.h"
31f18b77 78#include "MgrStatMonitor.h"
7c673cae 79#include "mon/QuorumService.h"
224ce89b 80#include "mon/OldHealthMonitor.h"
7c673cae
FG
81#include "mon/HealthMonitor.h"
82#include "mon/ConfigKeyService.h"
83#include "common/config.h"
84#include "common/cmdparse.h"
85#include "include/assert.h"
86#include "include/compat.h"
87#include "perfglue/heap_profiler.h"
88
89#include "auth/none/AuthNoneClientHandler.h"
90
91#define dout_subsys ceph_subsys_mon
92#undef dout_prefix
93#define dout_prefix _prefix(_dout, this)
94static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
95 return *_dout << "mon." << mon->name << "@" << mon->rank
96 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
97}
98
99const string Monitor::MONITOR_NAME = "monitor";
100const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
101
102
103#undef FLAG
104#undef COMMAND
105#undef COMMAND_WITH_FLAG
106MonCommand mon_commands[] = {
107#define FLAG(f) (MonCommand::FLAG_##f)
108#define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
109 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
110#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
111 {parsesig, helptext, modulename, req_perms, avail, flags},
112#include <mon/MonCommands.h>
113#undef COMMAND
114#undef COMMAND_WITH_FLAG
115
116 // FIXME: slurp up the Mgr commands too
117
118#define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
119 {parsesig, helptext, modulename, req_perms, avail, FLAG(MGR)},
120#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
121 {parsesig, helptext, modulename, req_perms, avail, flags | FLAG(MGR)},
122#include <mgr/MgrCommands.h>
123#undef COMMAND
124#undef COMMAND_WITH_FLAG
125
126};
127
128
7c673cae
FG
129void C_MonContext::finish(int r) {
130 if (mon->is_shutdown())
131 return;
132 FunctionContext::finish(r);
133}
134
135Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
136 Messenger *m, Messenger *mgr_m, MonMap *map) :
137 Dispatcher(cct_),
138 name(nm),
139 rank(-1),
140 messenger(m),
141 con_self(m ? m->get_loopback_connection() : NULL),
142 lock("Monitor::lock"),
143 timer(cct_, lock),
144 finisher(cct_, "mon_finisher", "fin"),
145 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
146 has_ever_joined(false),
147 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
148 monmap(map),
149 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
150 key_server(cct, &keyring),
151 auth_cluster_required(cct,
152 cct->_conf->auth_supported.empty() ?
153 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
154 auth_service_required(cct,
155 cct->_conf->auth_supported.empty() ?
156 cct->_conf->auth_service_required : cct->_conf->auth_supported ),
157 leader_supported_mon_commands(NULL),
158 leader_supported_mon_commands_size(0),
159 mgr_messenger(mgr_m),
160 mgr_client(cct_, mgr_m),
31f18b77 161 pgservice(nullptr),
7c673cae
FG
162 store(s),
163
164 state(STATE_PROBING),
165
166 elector(this),
167 required_features(0),
168 leader(0),
169 quorum_con_features(0),
170 // scrub
171 scrub_version(0),
172 scrub_event(NULL),
173 scrub_timeout_event(NULL),
174
175 // sync state
176 sync_provider_count(0),
177 sync_cookie(0),
178 sync_full(false),
179 sync_start_version(0),
180 sync_timeout_event(NULL),
181 sync_last_committed_floor(0),
182
183 timecheck_round(0),
184 timecheck_acks(0),
185 timecheck_rounds_since_clean(0),
186 timecheck_event(NULL),
187
188 paxos_service(PAXOS_NUM),
189 admin_hook(NULL),
190 routed_request_tid(0),
191 op_tracker(cct, true, 1)
192{
193 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
194 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
195
196 update_log_clients();
197
198 paxos = new Paxos(this, "paxos");
199
200 paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
201 paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
202 paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
203 paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
204 paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
205 paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
206 paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
31f18b77 207 paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
224ce89b 208 paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
7c673cae 209
224ce89b 210 health_monitor = new OldHealthMonitor(this);
7c673cae
FG
211 config_key_service = new ConfigKeyService(this, paxos);
212
213 mon_caps = new MonCap();
214 bool r = mon_caps->parse("allow *", NULL);
215 assert(r);
216
217 exited_quorum = ceph_clock_now();
218
219 // assume our commands until we have an election. this only means
220 // we won't reply with EINVAL before the election; any command that
221 // actually matters will wait until we have quorum etc and then
222 // retry (and revalidate).
223 const MonCommand *cmds;
224 int cmdsize;
225 get_locally_supported_monitor_commands(&cmds, &cmdsize);
226 set_leader_supported_commands(cmds, cmdsize);
31f18b77
FG
227
228 // note: OSDMonitor may update this based on the luminous flag.
229 pgservice = mgrstatmon()->get_pg_stat_service();
7c673cae
FG
230}
231
7c673cae
FG
232Monitor::~Monitor()
233{
234 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
235 delete *p;
236 delete health_monitor;
237 delete config_key_service;
238 delete paxos;
239 assert(session_map.sessions.empty());
240 delete mon_caps;
241 if (leader_supported_mon_commands != mon_commands)
242 delete[] leader_supported_mon_commands;
243}
244
245
246class AdminHook : public AdminSocketHook {
247 Monitor *mon;
248public:
249 explicit AdminHook(Monitor *m) : mon(m) {}
250 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
251 bufferlist& out) override {
252 stringstream ss;
253 mon->do_admin_command(command, cmdmap, format, ss);
254 out.append(ss);
255 return true;
256 }
257};
258
259void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
260 ostream& ss)
261{
262 Mutex::Locker l(lock);
263
264 boost::scoped_ptr<Formatter> f(Formatter::create(format));
265
266 string args;
267 for (cmdmap_t::iterator p = cmdmap.begin();
268 p != cmdmap.end(); ++p) {
269 if (p->first == "prefix")
270 continue;
271 if (!args.empty())
272 args += ", ";
273 args += cmd_vartype_stringify(p->second);
274 }
275 args = "[" + args + "]";
276
277 bool read_only = (command == "mon_status" ||
278 command == "mon metadata" ||
279 command == "quorum_status" ||
224ce89b
WB
280 command == "ops" ||
281 command == "sessions");
7c673cae
FG
282
283 (read_only ? audit_clog->debug() : audit_clog->info())
284 << "from='admin socket' entity='admin socket' "
285 << "cmd='" << command << "' args=" << args << ": dispatch";
286
287 if (command == "mon_status") {
288 get_mon_status(f.get(), ss);
289 if (f)
290 f->flush(ss);
291 } else if (command == "quorum_status") {
292 _quorum_status(f.get(), ss);
293 } else if (command == "sync_force") {
294 string validate;
295 if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
296 (validate != "--yes-i-really-mean-it")) {
297 ss << "are you SURE? this will mean the monitor store will be erased "
298 "the next time the monitor is restarted. pass "
299 "'--yes-i-really-mean-it' if you really do.";
300 goto abort;
301 }
302 sync_force(f.get(), ss);
303 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
304 if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
305 goto abort;
306 } else if (command == "quorum enter") {
307 elector.start_participating();
308 start_election();
309 ss << "started responding to quorum, initiated new election";
310 } else if (command == "quorum exit") {
311 start_election();
312 elector.stop_participating();
313 ss << "stopped responding to quorum, initiated new election";
314 } else if (command == "ops") {
315 (void)op_tracker.dump_ops_in_flight(f.get());
316 if (f) {
317 f->flush(ss);
318 }
224ce89b
WB
319 } else if (command == "sessions") {
320
321 if (f) {
322 f->open_array_section("sessions");
323 for (auto p : session_map.sessions) {
324 f->dump_stream("session") << *p;
325 }
326 f->close_section();
327 f->flush(ss);
328 }
329
7c673cae
FG
330 } else {
331 assert(0 == "bad AdminSocket command binding");
332 }
333 (read_only ? audit_clog->debug() : audit_clog->info())
334 << "from='admin socket' "
335 << "entity='admin socket' "
336 << "cmd=" << command << " "
337 << "args=" << args << ": finished";
338 return;
339
340abort:
341 (read_only ? audit_clog->debug() : audit_clog->info())
342 << "from='admin socket' "
343 << "entity='admin socket' "
344 << "cmd=" << command << " "
345 << "args=" << args << ": aborted";
346}
347
348void Monitor::handle_signal(int signum)
349{
350 assert(signum == SIGINT || signum == SIGTERM);
351 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
352 shutdown();
353}
354
355CompatSet Monitor::get_initial_supported_features()
356{
357 CompatSet::FeatureSet ceph_mon_feature_compat;
358 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
359 CompatSet::FeatureSet ceph_mon_feature_incompat;
360 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
361 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
362 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
363 ceph_mon_feature_incompat);
364}
365
366CompatSet Monitor::get_supported_features()
367{
368 CompatSet compat = get_initial_supported_features();
369 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
370 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
371 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
372 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
373 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
374 return compat;
375}
376
377CompatSet Monitor::get_legacy_features()
378{
379 CompatSet::FeatureSet ceph_mon_feature_compat;
380 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
381 CompatSet::FeatureSet ceph_mon_feature_incompat;
382 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
383 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
384 ceph_mon_feature_incompat);
385}
386
387int Monitor::check_features(MonitorDBStore *store)
388{
389 CompatSet required = get_supported_features();
390 CompatSet ondisk;
391
392 read_features_off_disk(store, &ondisk);
393
394 if (!required.writeable(ondisk)) {
395 CompatSet diff = required.unsupported(ondisk);
396 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
397 return -EPERM;
398 }
399
400 return 0;
401}
402
403void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
404{
405 bufferlist featuresbl;
406 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
407 if (featuresbl.length() == 0) {
408 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
409 << "Assuming it is old-style and introducing one." << dendl;
410 //we only want the baseline ~v.18 features assumed to be on disk.
411 //If new features are introduced this code needs to disappear or
412 //be made smarter.
413 *features = get_legacy_features();
414
415 features->encode(featuresbl);
416 auto t(std::make_shared<MonitorDBStore::Transaction>());
417 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
418 store->apply_transaction(t);
419 } else {
420 bufferlist::iterator it = featuresbl.begin();
421 features->decode(it);
422 }
423}
424
425void Monitor::read_features()
426{
427 read_features_off_disk(store, &features);
428 dout(10) << "features " << features << dendl;
429
430 calc_quorum_requirements();
431 dout(10) << "required_features " << required_features << dendl;
432}
433
434void Monitor::write_features(MonitorDBStore::TransactionRef t)
435{
436 bufferlist bl;
437 features.encode(bl);
438 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
439}
440
441const char** Monitor::get_tracked_conf_keys() const
442{
443 static const char* KEYS[] = {
444 "crushtool", // helpful for testing
445 "mon_election_timeout",
446 "mon_lease",
447 "mon_lease_renew_interval_factor",
448 "mon_lease_ack_timeout_factor",
449 "mon_accept_timeout_factor",
450 // clog & admin clog
451 "clog_to_monitors",
452 "clog_to_syslog",
453 "clog_to_syslog_facility",
454 "clog_to_syslog_level",
455 "clog_to_graylog",
456 "clog_to_graylog_host",
457 "clog_to_graylog_port",
458 "host",
459 "fsid",
460 // periodic health to clog
461 "mon_health_to_clog",
462 "mon_health_to_clog_interval",
463 "mon_health_to_clog_tick_interval",
464 // scrub interval
465 "mon_scrub_interval",
466 NULL
467 };
468 return KEYS;
469}
470
471void Monitor::handle_conf_change(const struct md_config_t *conf,
472 const std::set<std::string> &changed)
473{
474 sanitize_options();
475
476 dout(10) << __func__ << " " << changed << dendl;
477
478 if (changed.count("clog_to_monitors") ||
479 changed.count("clog_to_syslog") ||
480 changed.count("clog_to_syslog_level") ||
481 changed.count("clog_to_syslog_facility") ||
482 changed.count("clog_to_graylog") ||
483 changed.count("clog_to_graylog_host") ||
484 changed.count("clog_to_graylog_port") ||
485 changed.count("host") ||
486 changed.count("fsid")) {
487 update_log_clients();
488 }
489
490 if (changed.count("mon_health_to_clog") ||
491 changed.count("mon_health_to_clog_interval") ||
492 changed.count("mon_health_to_clog_tick_interval")) {
493 health_to_clog_update_conf(changed);
494 }
495
496 if (changed.count("mon_scrub_interval")) {
497 scrub_update_interval(conf->mon_scrub_interval);
498 }
499}
500
501void Monitor::update_log_clients()
502{
503 map<string,string> log_to_monitors;
504 map<string,string> log_to_syslog;
505 map<string,string> log_channel;
506 map<string,string> log_prio;
507 map<string,string> log_to_graylog;
508 map<string,string> log_to_graylog_host;
509 map<string,string> log_to_graylog_port;
510 uuid_d fsid;
511 string host;
512
513 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
514 log_channel, log_prio, log_to_graylog,
515 log_to_graylog_host, log_to_graylog_port,
516 fsid, host))
517 return;
518
519 clog->update_config(log_to_monitors, log_to_syslog,
520 log_channel, log_prio, log_to_graylog,
521 log_to_graylog_host, log_to_graylog_port,
522 fsid, host);
523
524 audit_clog->update_config(log_to_monitors, log_to_syslog,
525 log_channel, log_prio, log_to_graylog,
526 log_to_graylog_host, log_to_graylog_port,
527 fsid, host);
528}
529
530int Monitor::sanitize_options()
531{
532 int r = 0;
533
534 // mon_lease must be greater than mon_lease_renewal; otherwise we
535 // may incur in leases expiring before they are renewed.
536 if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
537 clog->error() << "mon_lease_renew_interval_factor ("
538 << g_conf->mon_lease_renew_interval_factor
539 << ") must be less than 1.0";
540 r = -EINVAL;
541 }
542
543 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
544 // got time to renew the lease and get an ack for it. Having both options
545 // with the same value, for a given small vale, could mean timing out if
546 // the monitors happened to be overloaded -- or even under normal load for
547 // a small enough value.
548 if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
549 clog->error() << "mon_lease_ack_timeout_factor ("
550 << g_conf->mon_lease_ack_timeout_factor
551 << ") must be greater than 1.0";
552 r = -EINVAL;
553 }
554
555 return r;
556}
557
558int Monitor::preinit()
559{
560 lock.Lock();
561
562 dout(1) << "preinit fsid " << monmap->fsid << dendl;
563
564 int r = sanitize_options();
565 if (r < 0) {
566 derr << "option sanitization failed!" << dendl;
567 lock.Unlock();
568 return r;
569 }
570
571 assert(!logger);
572 {
573 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
574 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess");
575 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", "sadd");
576 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", "srm");
577 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions");
578 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in");
579 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started");
580 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won");
581 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost");
582 logger = pcb.create_perf_counters();
583 cct->get_perfcounters_collection()->add(logger);
584 }
585
586 assert(!cluster_logger);
587 {
588 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
589 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
590 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
591 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
592 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
593 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
594 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
595 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster");
596 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space");
597 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space");
598 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
599 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
600 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
601 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
602 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
603 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
604 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
605 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
606 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
607 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
608 pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
609 pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
610 pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
611 pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
612 cluster_logger = pcb.create_perf_counters();
613 }
614
615 paxos->init_logger();
616
617 // verify cluster_uuid
618 {
619 int r = check_fsid();
620 if (r == -ENOENT)
621 r = write_fsid();
622 if (r < 0) {
623 lock.Unlock();
624 return r;
625 }
626 }
627
628 // open compatset
629 read_features();
630
631 // have we ever joined a quorum?
632 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
633 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
634
635 if (!has_ever_joined) {
636 // impose initial quorum restrictions?
637 list<string> initial_members;
638 get_str_list(g_conf->mon_initial_members, initial_members);
639
640 if (!initial_members.empty()) {
641 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
642
643 monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
644 &extra_probe_peers);
645
646 dout(10) << " monmap is " << *monmap << dendl;
647 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
648 }
649 } else if (!monmap->contains(name)) {
650 derr << "not in monmap and have been in a quorum before; "
651 << "must have been removed" << dendl;
652 if (g_conf->mon_force_quorum_join) {
653 dout(0) << "we should have died but "
654 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
655 } else {
656 derr << "commit suicide!" << dendl;
657 lock.Unlock();
658 return -ENOENT;
659 }
660 }
661
662 {
663 // We have a potentially inconsistent store state in hands. Get rid of it
664 // and start fresh.
665 bool clear_store = false;
666 if (store->exists("mon_sync", "in_sync")) {
667 dout(1) << __func__ << " clean up potentially inconsistent store state"
668 << dendl;
669 clear_store = true;
670 }
671
672 if (store->get("mon_sync", "force_sync") > 0) {
673 dout(1) << __func__ << " force sync by clearing store state" << dendl;
674 clear_store = true;
675 }
676
677 if (clear_store) {
678 set<string> sync_prefixes = get_sync_targets_names();
679 store->clear(sync_prefixes);
680 }
681 }
682
683 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
684 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
685
686 init_paxos();
687 health_monitor->init();
688
689 if (is_keyring_required()) {
690 // we need to bootstrap authentication keys so we can form an
691 // initial quorum.
692 if (authmon()->get_last_committed() == 0) {
693 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
694 bufferlist bl;
695 int err = store->get("mkfs", "keyring", bl);
696 if (err == 0 && bl.length() > 0) {
697 // Attempt to decode and extract keyring only if it is found.
698 KeyRing keyring;
699 bufferlist::iterator p = bl.begin();
700 ::decode(keyring, p);
701 extract_save_mon_key(keyring);
702 }
703 }
704
705 string keyring_loc = g_conf->mon_data + "/keyring";
706
707 r = keyring.load(cct, keyring_loc);
708 if (r < 0) {
709 EntityName mon_name;
710 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
711 EntityAuth mon_key;
712 if (key_server.get_auth(mon_name, mon_key)) {
713 dout(1) << "copying mon. key from old db to external keyring" << dendl;
714 keyring.add(mon_name, mon_key);
715 bufferlist bl;
716 keyring.encode_plaintext(bl);
717 write_default_keyring(bl);
718 } else {
719 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
720 lock.Unlock();
721 return r;
722 }
723 }
724 }
725
726 admin_hook = new AdminHook(this);
727 AdminSocket* admin_socket = cct->get_admin_socket();
728
729 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
730 lock.Unlock();
731 r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
732 "show current monitor status");
733 assert(r == 0);
734 r = admin_socket->register_command("quorum_status", "quorum_status",
735 admin_hook, "show current quorum status");
736 assert(r == 0);
737 r = admin_socket->register_command("sync_force",
738 "sync_force name=validate,"
739 "type=CephChoices,"
740 "strings=--yes-i-really-mean-it",
741 admin_hook,
742 "force sync of and clear monitor store");
743 assert(r == 0);
744 r = admin_socket->register_command("add_bootstrap_peer_hint",
745 "add_bootstrap_peer_hint name=addr,"
746 "type=CephIPAddr",
747 admin_hook,
748 "add peer address as potential bootstrap"
749 " peer for cluster bringup");
750 assert(r == 0);
751 r = admin_socket->register_command("quorum enter", "quorum enter",
752 admin_hook,
753 "force monitor back into quorum");
754 assert(r == 0);
755 r = admin_socket->register_command("quorum exit", "quorum exit",
756 admin_hook,
757 "force monitor out of the quorum");
758 assert(r == 0);
759 r = admin_socket->register_command("ops",
760 "ops",
761 admin_hook,
762 "show the ops currently in flight");
763 assert(r == 0);
224ce89b
WB
764 r = admin_socket->register_command("sessions",
765 "sessions",
766 admin_hook,
767 "list existing sessions");
768 assert(r == 0);
7c673cae
FG
769
770 lock.Lock();
771
772 // add ourselves as a conf observer
773 g_conf->add_observer(this);
774
775 lock.Unlock();
776 return 0;
777}
778
779int Monitor::init()
780{
781 dout(2) << "init" << dendl;
782 Mutex::Locker l(lock);
783
784 finisher.start();
785
786 // start ticker
787 timer.init();
788 new_tick();
789
790 cpu_tp.start();
791
792 // i'm ready!
793 messenger->add_dispatcher_tail(this);
794
795 mgr_client.init();
796 mgr_messenger->add_dispatcher_tail(&mgr_client);
797 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
798
799 bootstrap();
800
801 // encode command sets
802 const MonCommand *cmds;
803 int cmdsize;
804 get_locally_supported_monitor_commands(&cmds, &cmdsize);
805 MonCommand::encode_array(cmds, cmdsize, supported_commands_bl);
806
807 return 0;
808}
809
810void Monitor::init_paxos()
811{
812 dout(10) << __func__ << dendl;
813 paxos->init();
814
815 // init services
816 for (int i = 0; i < PAXOS_NUM; ++i) {
817 paxos_service[i]->init();
818 }
819
820 refresh_from_paxos(NULL);
821}
822
823void Monitor::refresh_from_paxos(bool *need_bootstrap)
824{
825 dout(10) << __func__ << dendl;
826
827 bufferlist bl;
828 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
829 if (r >= 0) {
830 try {
831 bufferlist::iterator p = bl.begin();
832 ::decode(fingerprint, p);
833 }
834 catch (buffer::error& e) {
835 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
836 }
837 } else {
838 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
839 }
840
841 for (int i = 0; i < PAXOS_NUM; ++i) {
842 paxos_service[i]->refresh(need_bootstrap);
843 }
844 for (int i = 0; i < PAXOS_NUM; ++i) {
845 paxos_service[i]->post_refresh();
846 }
224ce89b 847 load_metadata();
7c673cae
FG
848}
849
850void Monitor::register_cluster_logger()
851{
852 if (!cluster_logger_registered) {
853 dout(10) << "register_cluster_logger" << dendl;
854 cluster_logger_registered = true;
855 cct->get_perfcounters_collection()->add(cluster_logger);
856 } else {
857 dout(10) << "register_cluster_logger - already registered" << dendl;
858 }
859}
860
861void Monitor::unregister_cluster_logger()
862{
863 if (cluster_logger_registered) {
864 dout(10) << "unregister_cluster_logger" << dendl;
865 cluster_logger_registered = false;
866 cct->get_perfcounters_collection()->remove(cluster_logger);
867 } else {
868 dout(10) << "unregister_cluster_logger - not registered" << dendl;
869 }
870}
871
872void Monitor::update_logger()
873{
874 cluster_logger->set(l_cluster_num_mon, monmap->size());
875 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
876}
877
878void Monitor::shutdown()
879{
880 dout(1) << "shutdown" << dendl;
881
882 lock.Lock();
883
884 wait_for_paxos_write();
885
886 state = STATE_SHUTDOWN;
887
888 g_conf->remove_observer(this);
889
890 if (admin_hook) {
891 AdminSocket* admin_socket = cct->get_admin_socket();
892 admin_socket->unregister_command("mon_status");
893 admin_socket->unregister_command("quorum_status");
894 admin_socket->unregister_command("sync_force");
895 admin_socket->unregister_command("add_bootstrap_peer_hint");
896 admin_socket->unregister_command("quorum enter");
897 admin_socket->unregister_command("quorum exit");
898 admin_socket->unregister_command("ops");
224ce89b 899 admin_socket->unregister_command("sessions");
7c673cae
FG
900 delete admin_hook;
901 admin_hook = NULL;
902 }
903
904 elector.shutdown();
905
906 mgr_client.shutdown();
907
908 lock.Unlock();
909 finisher.wait_for_empty();
910 finisher.stop();
911 lock.Lock();
912
913 // clean up
914 paxos->shutdown();
915 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
916 (*p)->shutdown();
917 health_monitor->shutdown();
918
919 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
920 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
921
922 timer.shutdown();
923
924 cpu_tp.stop();
925
926 remove_all_sessions();
927
928 if (logger) {
929 cct->get_perfcounters_collection()->remove(logger);
930 delete logger;
931 logger = NULL;
932 }
933 if (cluster_logger) {
934 if (cluster_logger_registered)
935 cct->get_perfcounters_collection()->remove(cluster_logger);
936 delete cluster_logger;
937 cluster_logger = NULL;
938 }
939
940 log_client.shutdown();
941
942 // unlock before msgr shutdown...
943 lock.Unlock();
944
945 messenger->shutdown(); // last thing! ceph_mon.cc will delete mon.
946 mgr_messenger->shutdown();
947}
948
949void Monitor::wait_for_paxos_write()
950{
951 if (paxos->is_writing() || paxos->is_writing_previous()) {
952 dout(10) << __func__ << " flushing pending write" << dendl;
953 lock.Unlock();
954 store->flush();
955 lock.Lock();
956 dout(10) << __func__ << " flushed pending write" << dendl;
957 }
958}
959
960void Monitor::bootstrap()
961{
962 dout(10) << "bootstrap" << dendl;
963 wait_for_paxos_write();
964
965 sync_reset_requester();
966 unregister_cluster_logger();
967 cancel_probe_timeout();
968
969 // note my rank
970 int newrank = monmap->get_rank(messenger->get_myaddr());
971 if (newrank < 0 && rank >= 0) {
972 // was i ever part of the quorum?
973 if (has_ever_joined) {
974 dout(0) << " removed from monmap, suicide." << dendl;
975 exit(0);
976 }
977 }
978 if (newrank != rank) {
979 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
980 messenger->set_myname(entity_name_t::MON(newrank));
981 rank = newrank;
982
983 // reset all connections, or else our peers will think we are someone else.
984 messenger->mark_down_all();
985 }
986
987 // reset
988 state = STATE_PROBING;
989
990 _reset();
991
992 // sync store
993 if (g_conf->mon_compact_on_bootstrap) {
994 dout(10) << "bootstrap -- triggering compaction" << dendl;
995 store->compact();
996 dout(10) << "bootstrap -- finished compaction" << dendl;
997 }
998
999 // singleton monitor?
1000 if (monmap->size() == 1 && rank == 0) {
1001 win_standalone_election();
1002 return;
1003 }
1004
1005 reset_probe_timeout();
1006
1007 // i'm outside the quorum
1008 if (monmap->contains(name))
1009 outside_quorum.insert(name);
1010
1011 // probe monitors
1012 dout(10) << "probing other monitors" << dendl;
1013 for (unsigned i = 0; i < monmap->size(); i++) {
1014 if ((int)i != rank)
1015 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
1016 monmap->get_inst(i));
1017 }
1018 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
1019 p != extra_probe_peers.end();
1020 ++p) {
1021 if (*p != messenger->get_myaddr()) {
1022 entity_inst_t i;
1023 i.name = entity_name_t::MON(-1);
1024 i.addr = *p;
1025 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
1026 }
1027 }
1028}
1029
1030bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1031{
1032 string addrstr;
1033 if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1034 ss << "unable to parse address string value '"
1035 << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1036 return false;
1037 }
1038 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1039 << addrstr << "'" << dendl;
1040
1041 entity_addr_t addr;
1042 const char *end = 0;
1043 if (!addr.parse(addrstr.c_str(), &end)) {
1044 ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1045 return false;
1046 }
1047
1048 if (is_leader() || is_peon()) {
1049 ss << "mon already active; ignoring bootstrap hint";
1050 return true;
1051 }
1052
1053 if (addr.get_port() == 0)
1054 addr.set_port(CEPH_MON_PORT);
1055
1056 extra_probe_peers.insert(addr);
1057 ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1058 return true;
1059}
1060
1061// called by bootstrap(), or on leader|peon -> electing
1062void Monitor::_reset()
1063{
1064 dout(10) << __func__ << dendl;
1065
1066 cancel_probe_timeout();
1067 timecheck_finish();
1068 health_events_cleanup();
1069 scrub_event_cancel();
1070
1071 leader_since = utime_t();
1072 if (!quorum.empty()) {
1073 exited_quorum = ceph_clock_now();
1074 }
1075 quorum.clear();
1076 outside_quorum.clear();
31f18b77 1077 quorum_feature_map.clear();
7c673cae
FG
1078
1079 scrub_reset();
1080
1081 paxos->restart();
1082
1083 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1084 (*p)->restart();
1085 health_monitor->finish();
1086}
1087
1088
1089// -----------------------------------------------------------
1090// sync
1091
1092set<string> Monitor::get_sync_targets_names()
1093{
1094 set<string> targets;
1095 targets.insert(paxos->get_name());
1096 for (int i = 0; i < PAXOS_NUM; ++i)
1097 paxos_service[i]->get_store_prefixes(targets);
1098 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1099 assert(config_key_service_ptr);
1100 config_key_service_ptr->get_store_prefixes(targets);
1101 return targets;
1102}
1103
1104
1105void Monitor::sync_timeout()
1106{
1107 dout(10) << __func__ << dendl;
1108 assert(state == STATE_SYNCHRONIZING);
1109 bootstrap();
1110}
1111
1112void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1113{
1114 dout(1) << __func__ << dendl;
1115
1116 MonMap latest_monmap;
1117
1118 // Grab latest monmap from MonmapMonitor
1119 bufferlist monmon_bl;
1120 int err = monmon()->get_monmap(monmon_bl);
1121 if (err < 0) {
1122 if (err != -ENOENT) {
1123 derr << __func__
1124 << " something wrong happened while reading the store: "
1125 << cpp_strerror(err) << dendl;
1126 assert(0 == "error reading the store");
1127 }
1128 } else {
1129 latest_monmap.decode(monmon_bl);
1130 }
1131
1132 // Grab last backed up monmap (if any) and compare epochs
1133 if (store->exists("mon_sync", "latest_monmap")) {
1134 bufferlist backup_bl;
1135 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1136 if (err < 0) {
1137 derr << __func__
1138 << " something wrong happened while reading the store: "
1139 << cpp_strerror(err) << dendl;
1140 assert(0 == "error reading the store");
1141 }
1142 assert(backup_bl.length() > 0);
1143
1144 MonMap backup_monmap;
1145 backup_monmap.decode(backup_bl);
1146
1147 if (backup_monmap.epoch > latest_monmap.epoch)
1148 latest_monmap = backup_monmap;
1149 }
1150
1151 // Check if our current monmap's epoch is greater than the one we've
1152 // got so far.
1153 if (monmap->epoch > latest_monmap.epoch)
1154 latest_monmap = *monmap;
1155
1156 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1157
1158 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1159}
1160
1161void Monitor::sync_reset_requester()
1162{
1163 dout(10) << __func__ << dendl;
1164
1165 if (sync_timeout_event) {
1166 timer.cancel_event(sync_timeout_event);
1167 sync_timeout_event = NULL;
1168 }
1169
1170 sync_provider = entity_inst_t();
1171 sync_cookie = 0;
1172 sync_full = false;
1173 sync_start_version = 0;
1174}
1175
1176void Monitor::sync_reset_provider()
1177{
1178 dout(10) << __func__ << dendl;
1179 sync_providers.clear();
1180}
1181
1182void Monitor::sync_start(entity_inst_t &other, bool full)
1183{
1184 dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
1185
1186 assert(state == STATE_PROBING ||
1187 state == STATE_SYNCHRONIZING);
1188 state = STATE_SYNCHRONIZING;
1189
1190 // make sure are not a provider for anyone!
1191 sync_reset_provider();
1192
1193 sync_full = full;
1194
1195 if (sync_full) {
1196 // stash key state, and mark that we are syncing
1197 auto t(std::make_shared<MonitorDBStore::Transaction>());
1198 sync_stash_critical_state(t);
1199 t->put("mon_sync", "in_sync", 1);
1200
1201 sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
1202 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1203 << sync_last_committed_floor << dendl;
1204 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1205
1206 store->apply_transaction(t);
1207
1208 assert(g_conf->mon_sync_requester_kill_at != 1);
1209
1210 // clear the underlying store
1211 set<string> targets = get_sync_targets_names();
1212 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1213 store->clear(targets);
1214
1215 // make sure paxos knows it has been reset. this prevents a
1216 // bootstrap and then different probe reply order from possibly
1217 // deciding a partial or no sync is needed.
1218 paxos->init();
1219
1220 assert(g_conf->mon_sync_requester_kill_at != 2);
1221 }
1222
1223 // assume 'other' as the leader. We will update the leader once we receive
1224 // a reply to the sync start.
1225 sync_provider = other;
1226
1227 sync_reset_timeout();
1228
1229 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1230 if (!sync_full)
1231 m->last_committed = paxos->get_version();
1232 messenger->send_message(m, sync_provider);
1233}
1234
1235void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1236{
1237 dout(10) << __func__ << dendl;
1238 bufferlist backup_monmap;
1239 sync_obtain_latest_monmap(backup_monmap);
1240 assert(backup_monmap.length() > 0);
1241 t->put("mon_sync", "latest_monmap", backup_monmap);
1242}
1243
1244void Monitor::sync_reset_timeout()
1245{
1246 dout(10) << __func__ << dendl;
1247 if (sync_timeout_event)
1248 timer.cancel_event(sync_timeout_event);
1249 sync_timeout_event = new C_MonContext(this, [this](int) {
1250 sync_timeout();
1251 });
1252 timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event);
1253}
1254
1255void Monitor::sync_finish(version_t last_committed)
1256{
1257 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1258
1259 assert(g_conf->mon_sync_requester_kill_at != 7);
1260
1261 if (sync_full) {
1262 // finalize the paxos commits
1263 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1264 paxos->read_and_prepare_transactions(tx, sync_start_version,
1265 last_committed);
1266 tx->put(paxos->get_name(), "last_committed", last_committed);
1267
1268 dout(30) << __func__ << " final tx dump:\n";
1269 JSONFormatter f(true);
1270 tx->dump(&f);
1271 f.flush(*_dout);
1272 *_dout << dendl;
1273
1274 store->apply_transaction(tx);
1275 }
1276
1277 assert(g_conf->mon_sync_requester_kill_at != 8);
1278
1279 auto t(std::make_shared<MonitorDBStore::Transaction>());
1280 t->erase("mon_sync", "in_sync");
1281 t->erase("mon_sync", "force_sync");
1282 t->erase("mon_sync", "last_committed_floor");
1283 store->apply_transaction(t);
1284
1285 assert(g_conf->mon_sync_requester_kill_at != 9);
1286
1287 init_paxos();
1288
1289 assert(g_conf->mon_sync_requester_kill_at != 10);
1290
1291 bootstrap();
1292}
1293
1294void Monitor::handle_sync(MonOpRequestRef op)
1295{
1296 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1297 dout(10) << __func__ << " " << *m << dendl;
1298 switch (m->op) {
1299
1300 // provider ---------
1301
1302 case MMonSync::OP_GET_COOKIE_FULL:
1303 case MMonSync::OP_GET_COOKIE_RECENT:
1304 handle_sync_get_cookie(op);
1305 break;
1306 case MMonSync::OP_GET_CHUNK:
1307 handle_sync_get_chunk(op);
1308 break;
1309
1310 // client -----------
1311
1312 case MMonSync::OP_COOKIE:
1313 handle_sync_cookie(op);
1314 break;
1315
1316 case MMonSync::OP_CHUNK:
1317 case MMonSync::OP_LAST_CHUNK:
1318 handle_sync_chunk(op);
1319 break;
1320 case MMonSync::OP_NO_COOKIE:
1321 handle_sync_no_cookie(op);
1322 break;
1323
1324 default:
1325 dout(0) << __func__ << " unknown op " << m->op << dendl;
1326 assert(0 == "unknown op");
1327 }
1328}
1329
// sync provider side (any monitor may serve sync requests, not only the leader)
1331
1332void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1333{
1334 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1335 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1336 m->get_connection()->send_message(reply);
1337}
1338
1339void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1340{
1341 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1342 if (is_synchronizing()) {
1343 _sync_reply_no_cookie(op);
1344 return;
1345 }
1346
1347 assert(g_conf->mon_sync_provider_kill_at != 1);
1348
1349 // make sure they can understand us.
1350 if ((required_features ^ m->get_connection()->get_features()) &
1351 required_features) {
1352 dout(5) << " ignoring peer mon." << m->get_source().num()
1353 << " has features " << std::hex
1354 << m->get_connection()->get_features()
1355 << " but we require " << required_features << std::dec << dendl;
1356 return;
1357 }
1358
1359 // make up a unique cookie. include election epoch (which persists
1360 // across restarts for the whole cluster) and a counter for this
1361 // process instance. there is no need to be unique *across*
1362 // monitors, though.
1363 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1364 assert(sync_providers.count(cookie) == 0);
1365
1366 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1367
1368 SyncProvider& sp = sync_providers[cookie];
1369 sp.cookie = cookie;
1370 sp.entity = m->get_source_inst();
1371 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1372
1373 set<string> sync_targets;
1374 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1375 // full scan
1376 sync_targets = get_sync_targets_names();
1377 sp.last_committed = paxos->get_version();
1378 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1379 sp.full = true;
1380 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1381 } else {
1382 // just catch up paxos
1383 sp.last_committed = m->last_committed;
1384 }
1385 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1386
1387 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1388 reply->last_committed = sp.last_committed;
1389 m->get_connection()->send_message(reply);
1390}
1391
1392void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1393{
1394 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1395 dout(10) << __func__ << " " << *m << dendl;
1396
1397 if (sync_providers.count(m->cookie) == 0) {
1398 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1399 _sync_reply_no_cookie(op);
1400 return;
1401 }
1402
1403 assert(g_conf->mon_sync_provider_kill_at != 2);
1404
1405 SyncProvider& sp = sync_providers[m->cookie];
1406 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1407
1408 if (sp.last_committed < paxos->get_first_committed() &&
1409 paxos->get_first_committed() > 1) {
1410 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1411 << " < our fc " << paxos->get_first_committed() << dendl;
1412 sync_providers.erase(m->cookie);
1413 _sync_reply_no_cookie(op);
1414 return;
1415 }
1416
1417 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1418 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1419
1420 int left = g_conf->mon_sync_max_payload_size;
1421 while (sp.last_committed < paxos->get_version() && left > 0) {
1422 bufferlist bl;
1423 sp.last_committed++;
224ce89b
WB
1424
1425 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1426 assert(err == 0);
1427
7c673cae
FG
1428 tx->put(paxos->get_name(), sp.last_committed, bl);
1429 left -= bl.length();
1430 dout(20) << __func__ << " including paxos state " << sp.last_committed
1431 << dendl;
1432 }
1433 reply->last_committed = sp.last_committed;
1434
1435 if (sp.full && left > 0) {
1436 sp.synchronizer->get_chunk_tx(tx, left);
1437 sp.last_key = sp.synchronizer->get_last_key();
1438 reply->last_key = sp.last_key;
1439 }
1440
1441 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1442 sp.last_committed < paxos->get_version()) {
1443 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1444 << " key " << sp.last_key << dendl;
1445 } else {
1446 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1447 << " key " << sp.last_key << dendl;
1448 reply->op = MMonSync::OP_LAST_CHUNK;
1449
1450 assert(g_conf->mon_sync_provider_kill_at != 3);
1451
1452 // clean up our local state
1453 sync_providers.erase(sp.cookie);
1454 }
1455
1456 ::encode(*tx, reply->chunk_bl);
1457
1458 m->get_connection()->send_message(reply);
1459}
1460
1461// requester
1462
1463void Monitor::handle_sync_cookie(MonOpRequestRef op)
1464{
1465 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1466 dout(10) << __func__ << " " << *m << dendl;
1467 if (sync_cookie) {
1468 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1469 return;
1470 }
1471 if (m->get_source_inst() != sync_provider) {
1472 dout(10) << __func__ << " source does not match, discarding" << dendl;
1473 return;
1474 }
1475 sync_cookie = m->cookie;
1476 sync_start_version = m->last_committed;
1477
1478 sync_reset_timeout();
1479 sync_get_next_chunk();
1480
1481 assert(g_conf->mon_sync_requester_kill_at != 3);
1482}
1483
1484void Monitor::sync_get_next_chunk()
1485{
1486 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1487 if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1488 dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1489 usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1490 }
1491 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1492 messenger->send_message(r, sync_provider);
1493
1494 assert(g_conf->mon_sync_requester_kill_at != 4);
1495}
1496
1497void Monitor::handle_sync_chunk(MonOpRequestRef op)
1498{
1499 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1500 dout(10) << __func__ << " " << *m << dendl;
1501
1502 if (m->cookie != sync_cookie) {
1503 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1504 return;
1505 }
1506 if (m->get_source_inst() != sync_provider) {
1507 dout(10) << __func__ << " source does not match, discarding" << dendl;
1508 return;
1509 }
1510
1511 assert(state == STATE_SYNCHRONIZING);
1512 assert(g_conf->mon_sync_requester_kill_at != 5);
1513
1514 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1515 tx->append_from_encoded(m->chunk_bl);
1516
1517 dout(30) << __func__ << " tx dump:\n";
1518 JSONFormatter f(true);
1519 tx->dump(&f);
1520 f.flush(*_dout);
1521 *_dout << dendl;
1522
1523 store->apply_transaction(tx);
1524
1525 assert(g_conf->mon_sync_requester_kill_at != 6);
1526
1527 if (!sync_full) {
1528 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1529 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1530 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1531 m->last_committed);
1532 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1533
1534 dout(30) << __func__ << " tx dump:\n";
1535 JSONFormatter f(true);
1536 tx->dump(&f);
1537 f.flush(*_dout);
1538 *_dout << dendl;
1539
1540 store->apply_transaction(tx);
1541 paxos->init(); // to refresh what we just wrote
1542 }
1543
1544 if (m->op == MMonSync::OP_CHUNK) {
1545 sync_reset_timeout();
1546 sync_get_next_chunk();
1547 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1548 sync_finish(m->last_committed);
1549 }
1550}
1551
1552void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1553{
1554 dout(10) << __func__ << dendl;
1555 bootstrap();
1556}
1557
1558void Monitor::sync_trim_providers()
1559{
1560 dout(20) << __func__ << dendl;
1561
1562 utime_t now = ceph_clock_now();
1563 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1564 while (p != sync_providers.end()) {
1565 if (now > p->second.timeout) {
1566 dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
1567 sync_providers.erase(p++);
1568 } else {
1569 ++p;
1570 }
1571 }
1572}
1573
1574// ---------------------------------------------------
1575// probe
1576
1577void Monitor::cancel_probe_timeout()
1578{
1579 if (probe_timeout_event) {
1580 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1581 timer.cancel_event(probe_timeout_event);
1582 probe_timeout_event = NULL;
1583 } else {
1584 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1585 }
1586}
1587
1588void Monitor::reset_probe_timeout()
1589{
1590 cancel_probe_timeout();
1591 probe_timeout_event = new C_MonContext(this, [this](int r) {
1592 probe_timeout(r);
1593 });
1594 double t = g_conf->mon_probe_timeout;
1595 timer.add_event_after(t, probe_timeout_event);
1596 dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
1597}
1598
1599void Monitor::probe_timeout(int r)
1600{
1601 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1602 assert(is_probing() || is_synchronizing());
1603 assert(probe_timeout_event);
1604 probe_timeout_event = NULL;
1605 bootstrap();
1606}
1607
1608void Monitor::handle_probe(MonOpRequestRef op)
1609{
1610 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1611 dout(10) << "handle_probe " << *m << dendl;
1612
1613 if (m->fsid != monmap->fsid) {
1614 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1615 return;
1616 }
1617
1618 switch (m->op) {
1619 case MMonProbe::OP_PROBE:
1620 handle_probe_probe(op);
1621 break;
1622
1623 case MMonProbe::OP_REPLY:
1624 handle_probe_reply(op);
1625 break;
1626
1627 case MMonProbe::OP_MISSING_FEATURES:
1628 derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1629 << ", required " << m->required_features
1630 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
1631 << dendl;
1632 break;
1633 }
1634}
1635
1636void Monitor::handle_probe_probe(MonOpRequestRef op)
1637{
1638 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1639
1640 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1641 << " features " << m->get_connection()->get_features() << dendl;
1642 uint64_t missing = required_features & ~m->get_connection()->get_features();
1643 if (missing) {
1644 dout(1) << " peer " << m->get_source_addr() << " missing features "
1645 << missing << dendl;
1646 if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1647 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1648 name, has_ever_joined);
1649 m->required_features = required_features;
1650 m->get_connection()->send_message(r);
1651 }
1652 goto out;
1653 }
1654
1655 if (!is_probing() && !is_synchronizing()) {
1656 // If the probing mon is way ahead of us, we need to re-bootstrap.
1657 // Normally we capture this case when we initially bootstrap, but
1658 // it is possible we pass those checks (we overlap with
1659 // quorum-to-be) but fail to join a quorum before it moves past
1660 // us. We need to be kicked back to bootstrap so we can
1661 // synchonize, not keep calling elections.
1662 if (paxos->get_version() + 1 < m->paxos_first_version) {
1663 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1664 << "ahead of us, re-bootstrapping" << dendl;
1665 bootstrap();
1666 goto out;
1667
1668 }
1669 }
1670
1671 MMonProbe *r;
1672 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1673 r->name = name;
1674 r->quorum = quorum;
1675 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1676 r->paxos_first_version = paxos->get_first_committed();
1677 r->paxos_last_version = paxos->get_version();
1678 m->get_connection()->send_message(r);
1679
1680 // did we discover a peer here?
1681 if (!monmap->contains(m->get_source_addr())) {
1682 dout(1) << " adding peer " << m->get_source_addr()
1683 << " to list of hints" << dendl;
1684 extra_probe_peers.insert(m->get_source_addr());
1685 }
1686
1687 out:
1688 return;
1689}
1690
1691void Monitor::handle_probe_reply(MonOpRequestRef op)
1692{
1693 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1694 dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1695 dout(10) << " monmap is " << *monmap << dendl;
1696
1697 // discover name and addrs during probing or electing states.
1698 if (!is_probing() && !is_electing()) {
1699 return;
1700 }
1701
1702 // newer map, or they've joined a quorum and we haven't?
1703 bufferlist mybl;
1704 monmap->encode(mybl, m->get_connection()->get_features());
1705 // make sure it's actually different; the checks below err toward
1706 // taking the other guy's map, which could cause us to loop.
1707 if (!mybl.contents_equal(m->monmap_bl)) {
1708 MonMap *newmap = new MonMap;
1709 newmap->decode(m->monmap_bl);
1710 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1711 !has_ever_joined)) {
1712 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1713 << ", mine was " << monmap->get_epoch() << dendl;
1714 delete newmap;
1715 monmap->decode(m->monmap_bl);
1716
1717 bootstrap();
1718 return;
1719 }
1720 delete newmap;
1721 }
1722
1723 // rename peer?
1724 string peer_name = monmap->get_name(m->get_source_addr());
1725 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1726 dout(10) << " renaming peer " << m->get_source_addr() << " "
1727 << peer_name << " -> " << m->name << " in my monmap"
1728 << dendl;
1729 monmap->rename(peer_name, m->name);
1730
1731 if (is_electing()) {
1732 bootstrap();
1733 return;
1734 }
1735 } else {
1736 dout(10) << " peer name is " << peer_name << dendl;
1737 }
1738
1739 // new initial peer?
1740 if (monmap->get_epoch() == 0 &&
1741 monmap->contains(m->name) &&
1742 monmap->get_addr(m->name).is_blank_ip()) {
1743 dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1744 monmap->set_addr(m->name, m->get_source_addr());
1745
1746 bootstrap();
1747 return;
1748 }
1749
1750 // end discover phase
1751 if (!is_probing()) {
1752 return;
1753 }
1754
1755 assert(paxos != NULL);
1756
1757 if (is_synchronizing()) {
1758 dout(10) << " currently syncing" << dendl;
1759 return;
1760 }
1761
1762 entity_inst_t other = m->get_source_inst();
1763
1764 if (m->paxos_last_version < sync_last_committed_floor) {
1765 dout(10) << " peer paxos versions [" << m->paxos_first_version
1766 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1767 << sync_last_committed_floor << ", ignoring"
1768 << dendl;
1769 } else {
1770 if (paxos->get_version() < m->paxos_first_version &&
1771 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1772 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1773 << "," << m->paxos_last_version << "]"
1774 << " vs my version " << paxos->get_version()
1775 << " (too far ahead)"
1776 << dendl;
1777 cancel_probe_timeout();
1778 sync_start(other, true);
1779 return;
1780 }
1781 if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1782 dout(10) << " peer paxos last version " << m->paxos_last_version
1783 << " vs my version " << paxos->get_version()
1784 << " (too far ahead)"
1785 << dendl;
1786 cancel_probe_timeout();
1787 sync_start(other, false);
1788 return;
1789 }
1790 }
1791
1792 // is there an existing quorum?
1793 if (m->quorum.size()) {
1794 dout(10) << " existing quorum " << m->quorum << dendl;
1795
1796 dout(10) << " peer paxos version " << m->paxos_last_version
1797 << " vs my version " << paxos->get_version()
1798 << " (ok)"
1799 << dendl;
1800
1801 if (monmap->contains(name) &&
1802 !monmap->get_addr(name).is_blank_ip()) {
1803 // i'm part of the cluster; just initiate a new election
1804 start_election();
1805 } else {
1806 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1807 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1808 monmap->get_inst(*m->quorum.begin()));
1809 }
1810 } else {
1811 if (monmap->contains(m->name)) {
1812 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1813 outside_quorum.insert(m->name);
1814 } else {
1815 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
1816 return;
1817 }
1818
1819 unsigned need = monmap->size() / 2 + 1;
1820 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1821 if (outside_quorum.size() >= need) {
1822 if (outside_quorum.count(name)) {
1823 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1824 start_election();
1825 } else {
1826 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1827 }
1828 } else {
1829 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
1830 }
1831 }
1832}
1833
1834void Monitor::join_election()
1835{
1836 dout(10) << __func__ << dendl;
1837 wait_for_paxos_write();
1838 _reset();
1839 state = STATE_ELECTING;
1840
1841 logger->inc(l_mon_num_elections);
1842}
1843
1844void Monitor::start_election()
1845{
1846 dout(10) << "start_election" << dendl;
1847 wait_for_paxos_write();
1848 _reset();
1849 state = STATE_ELECTING;
1850
1851 logger->inc(l_mon_num_elections);
1852 logger->inc(l_mon_election_call);
1853
1854 clog->info() << "mon." << name << " calling new monitor election";
1855 elector.call_election();
1856}
1857
1858void Monitor::win_standalone_election()
1859{
1860 dout(1) << "win_standalone_election" << dendl;
1861
1862 // bump election epoch, in case the previous epoch included other
1863 // monitors; we need to be able to make the distinction.
1864 elector.init();
1865 elector.advance_epoch();
1866
1867 rank = monmap->get_rank(name);
1868 assert(rank == 0);
1869 set<int> q;
1870 q.insert(rank);
1871
224ce89b
WB
1872 map<int,Metadata> metadata;
1873 collect_metadata(&metadata[0]);
1874
1875 const MonCommand *my_cmds = nullptr;
1876 int cmdsize = 0;
7c673cae
FG
1877 get_locally_supported_monitor_commands(&my_cmds, &cmdsize);
1878 win_election(elector.get_epoch(), q,
1879 CEPH_FEATURES_ALL,
1880 ceph::features::mon::get_supported(),
224ce89b 1881 metadata,
7c673cae
FG
1882 my_cmds, cmdsize);
1883}
1884
1885const utime_t& Monitor::get_leader_since() const
1886{
1887 assert(state == STATE_LEADER);
1888 return leader_since;
1889}
1890
1891epoch_t Monitor::get_epoch()
1892{
1893 return elector.get_epoch();
1894}
1895
1896void Monitor::_finish_svc_election()
1897{
1898 assert(state == STATE_LEADER || state == STATE_PEON);
1899
1900 for (auto p : paxos_service) {
1901 // we already called election_finished() on monmon(); avoid callig twice
1902 if (state == STATE_LEADER && p == monmon())
1903 continue;
1904 p->election_finished();
1905 }
1906}
1907
1908void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
1909 const mon_feature_t& mon_features,
224ce89b 1910 const map<int,Metadata>& metadata,
7c673cae
FG
1911 const MonCommand *cmdset, int cmdsize)
1912{
1913 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
1914 << " features " << features
1915 << " mon_features " << mon_features
1916 << dendl;
1917 assert(is_electing());
1918 state = STATE_LEADER;
1919 leader_since = ceph_clock_now();
1920 leader = rank;
1921 quorum = active;
1922 quorum_con_features = features;
1923 quorum_mon_features = mon_features;
224ce89b 1924 pending_metadata = metadata;
7c673cae
FG
1925 outside_quorum.clear();
1926
1927 clog->info() << "mon." << name << "@" << rank
1928 << " won leader election with quorum " << quorum;
1929
1930 set_leader_supported_commands(cmdset, cmdsize);
1931
1932 paxos->leader_init();
1933 // NOTE: tell monmap monitor first. This is important for the
1934 // bootstrap case to ensure that the very first paxos proposal
1935 // codifies the monmap. Otherwise any manner of chaos can ensue
1936 // when monitors are call elections or participating in a paxos
1937 // round without agreeing on who the participants are.
1938 monmon()->election_finished();
1939 _finish_svc_election();
1940 health_monitor->start(epoch);
1941
1942 logger->inc(l_mon_election_win);
1943
224ce89b
WB
1944 // inject new metadata in first transaction.
1945 {
1946 // include previous metadata for missing mons (that aren't part of
1947 // the current quorum).
1948 map<int,Metadata> m = metadata;
1949 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
1950 if (m.count(rank) == 0 &&
1951 mon_metadata.count(rank)) {
1952 m[rank] = mon_metadata[rank];
1953 }
1954 }
1955
1956 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1957 // a new transaction immediately after the election finishes. We should
1958 // do that anyway for other reasons, though.
1959 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1960 bufferlist bl;
1961 ::encode(m, bl);
1962 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
1963 }
1964
7c673cae
FG
1965 finish_election();
1966 if (monmap->size() > 1 &&
1967 monmap->get_epoch() > 0) {
1968 timecheck_start();
1969 health_tick_start();
1970 do_health_to_clog_interval();
1971 scrub_event_start();
1972 }
7c673cae
FG
1973}
1974
1975void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
1976 uint64_t features,
1977 const mon_feature_t& mon_features)
1978{
1979 state = STATE_PEON;
1980 leader_since = utime_t();
1981 leader = l;
1982 quorum = q;
1983 outside_quorum.clear();
1984 quorum_con_features = features;
1985 quorum_mon_features = mon_features;
1986 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
1987 << " quorum is " << quorum << " features are " << quorum_con_features
1988 << " mon_features are " << quorum_mon_features
1989 << dendl;
1990
1991 paxos->peon_init();
1992 _finish_svc_election();
1993 health_monitor->start(epoch);
1994
1995 logger->inc(l_mon_election_lose);
1996
1997 finish_election();
1998
224ce89b
WB
1999 if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
2000 !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
2001 // for pre-luminous mons only
7c673cae 2002 Metadata sys_info;
224ce89b 2003 collect_metadata(&sys_info);
7c673cae
FG
2004 messenger->send_message(new MMonMetadata(sys_info),
2005 monmap->get_inst(get_leader()));
2006 }
2007}
2008
224ce89b
WB
2009void Monitor::collect_metadata(Metadata *m)
2010{
2011 collect_sys_info(m, g_ceph_context);
2012 (*m)["addr"] = stringify(messenger->get_myaddr());
2013}
2014
7c673cae
FG
2015void Monitor::finish_election()
2016{
2017 apply_quorum_to_compatset_features();
2018 apply_monmap_to_compatset_features();
2019 timecheck_finish();
2020 exited_quorum = utime_t();
2021 finish_contexts(g_ceph_context, waitfor_quorum);
2022 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2023 resend_routed_requests();
2024 update_logger();
2025 register_cluster_logger();
2026
2027 // am i named properly?
2028 string cur_name = monmap->get_name(messenger->get_myaddr());
2029 if (cur_name != name) {
2030 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2031 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
2032 monmap->get_inst(*quorum.begin()));
2033 }
2034}
2035
2036void Monitor::_apply_compatset_features(CompatSet &new_features)
2037{
2038 if (new_features.compare(features) != 0) {
2039 CompatSet diff = features.unsupported(new_features);
2040 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2041 features = new_features;
2042
2043 auto t = std::make_shared<MonitorDBStore::Transaction>();
2044 write_features(t);
2045 store->apply_transaction(t);
2046
2047 calc_quorum_requirements();
2048 }
2049}
2050
2051void Monitor::apply_quorum_to_compatset_features()
2052{
2053 CompatSet new_features(features);
2054 if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2055 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2056 }
2057 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2058 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2059 }
2060 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2061 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2062 }
2063 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2064 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2065 }
2066 dout(5) << __func__ << dendl;
2067 _apply_compatset_features(new_features);
2068}
2069
2070void Monitor::apply_monmap_to_compatset_features()
2071{
2072 CompatSet new_features(features);
2073 mon_feature_t monmap_features = monmap->get_required_features();
2074
2075 /* persistent monmap features may go into the compatset.
2076 * optional monmap features may not - why?
2077 * because optional monmap features may be set/unset by the admin,
2078 * and possibly by other means that haven't yet been thought out,
2079 * so we can't make the monitor enforce them on start - because they
2080 * may go away.
2081 * this, of course, does not invalidate setting a compatset feature
2082 * for an optional feature - as long as you make sure to clean it up
2083 * once you unset it.
2084 */
2085 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2086 assert(ceph::features::mon::get_persistent().contains_all(
2087 ceph::features::mon::FEATURE_KRAKEN));
2088 // this feature should only ever be set if the quorum supports it.
2089 assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2090 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2091 }
2092
2093 dout(5) << __func__ << dendl;
2094 _apply_compatset_features(new_features);
2095}
2096
2097void Monitor::calc_quorum_requirements()
2098{
2099 required_features = 0;
2100
2101 // compatset
2102 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2103 required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2104 }
2105 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2106 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2107 }
2108 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2109 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2110 }
2111 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2112 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2113 }
2114 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2115 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2116 }
2117
2118 // monmap
2119 if (monmap->get_required_features().contains_all(
2120 ceph::features::mon::FEATURE_KRAKEN)) {
2121 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2122 }
2123 if (monmap->get_required_features().contains_all(
2124 ceph::features::mon::FEATURE_LUMINOUS)) {
2125 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2126 }
2127 dout(10) << __func__ << " required_features " << required_features << dendl;
2128}
2129
31f18b77
FG
2130void Monitor::get_combined_feature_map(FeatureMap *fm)
2131{
2132 *fm += session_map.feature_map;
2133 for (auto id : quorum) {
2134 if (id != rank) {
2135 *fm += quorum_feature_map[id];
2136 }
2137 }
2138}
2139
7c673cae
FG
2140void Monitor::sync_force(Formatter *f, ostream& ss)
2141{
2142 bool free_formatter = false;
2143
2144 if (!f) {
2145 // louzy/lazy hack: default to json if no formatter has been defined
2146 f = new JSONFormatter();
2147 free_formatter = true;
2148 }
2149
2150 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2151 sync_stash_critical_state(tx);
2152 tx->put("mon_sync", "force_sync", 1);
2153 store->apply_transaction(tx);
2154
2155 f->open_object_section("sync_force");
2156 f->dump_int("ret", 0);
2157 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2158 f->close_section(); // sync_force
2159 f->flush(ss);
2160 if (free_formatter)
2161 delete f;
2162}
2163
2164void Monitor::_quorum_status(Formatter *f, ostream& ss)
2165{
2166 bool free_formatter = false;
2167
2168 if (!f) {
2169 // louzy/lazy hack: default to json if no formatter has been defined
2170 f = new JSONFormatter();
2171 free_formatter = true;
2172 }
2173 f->open_object_section("quorum_status");
2174 f->dump_int("election_epoch", get_epoch());
2175
2176 f->open_array_section("quorum");
2177 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2178 f->dump_int("mon", *p);
2179 f->close_section(); // quorum
2180
2181 list<string> quorum_names = get_quorum_names();
2182 f->open_array_section("quorum_names");
2183 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2184 f->dump_string("mon", *p);
2185 f->close_section(); // quorum_names
2186
2187 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2188
2189 f->open_object_section("monmap");
2190 monmap->dump(f);
2191 f->close_section(); // monmap
2192
2193 f->close_section(); // quorum_status
2194 f->flush(ss);
2195 if (free_formatter)
2196 delete f;
2197}
2198
2199void Monitor::get_mon_status(Formatter *f, ostream& ss)
2200{
2201 bool free_formatter = false;
2202
2203 if (!f) {
2204 // louzy/lazy hack: default to json if no formatter has been defined
2205 f = new JSONFormatter();
2206 free_formatter = true;
2207 }
2208
2209 f->open_object_section("mon_status");
2210 f->dump_string("name", name);
2211 f->dump_int("rank", rank);
2212 f->dump_string("state", get_state_name());
2213 f->dump_int("election_epoch", get_epoch());
2214
2215 f->open_array_section("quorum");
2216 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2217 f->dump_int("mon", *p);
2218 }
2219
2220 f->close_section(); // quorum
2221
2222 f->open_object_section("features");
2223 f->dump_stream("required_con") << required_features;
2224 mon_feature_t req_mon_features = get_required_mon_features();
2225 req_mon_features.dump(f, "required_mon");
2226 f->dump_stream("quorum_con") << quorum_con_features;
2227 quorum_mon_features.dump(f, "quorum_mon");
2228 f->close_section(); // features
2229
2230 f->open_array_section("outside_quorum");
2231 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2232 f->dump_string("mon", *p);
2233 f->close_section(); // outside_quorum
2234
2235 f->open_array_section("extra_probe_peers");
2236 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2237 p != extra_probe_peers.end();
2238 ++p)
2239 f->dump_stream("peer") << *p;
2240 f->close_section(); // extra_probe_peers
2241
2242 f->open_array_section("sync_provider");
2243 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2244 p != sync_providers.end();
2245 ++p) {
2246 f->dump_unsigned("cookie", p->second.cookie);
2247 f->dump_stream("entity") << p->second.entity;
2248 f->dump_stream("timeout") << p->second.timeout;
2249 f->dump_unsigned("last_committed", p->second.last_committed);
2250 f->dump_stream("last_key") << p->second.last_key;
2251 }
2252 f->close_section();
2253
2254 if (is_synchronizing()) {
2255 f->open_object_section("sync");
2256 f->dump_stream("sync_provider") << sync_provider;
2257 f->dump_unsigned("sync_cookie", sync_cookie);
2258 f->dump_unsigned("sync_start_version", sync_start_version);
2259 f->close_section();
2260 }
2261
2262 if (g_conf->mon_sync_provider_kill_at > 0)
2263 f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2264 if (g_conf->mon_sync_requester_kill_at > 0)
2265 f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2266
2267 f->open_object_section("monmap");
2268 monmap->dump(f);
2269 f->close_section();
2270
31f18b77 2271 f->dump_object("feature_map", session_map.feature_map);
7c673cae
FG
2272 f->close_section(); // mon_status
2273
2274 if (free_formatter) {
2275 // flush formatter to ss and delete it iff we created the formatter
2276 f->flush(ss);
2277 delete f;
2278 }
2279}
2280
2281
2282// health status to clog
2283
2284void Monitor::health_tick_start()
2285{
2286 if (!cct->_conf->mon_health_to_clog ||
2287 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2288 return;
2289
2290 dout(15) << __func__ << dendl;
2291
2292 health_tick_stop();
2293 health_tick_event = new C_MonContext(this, [this](int r) {
2294 if (r < 0)
2295 return;
2296 do_health_to_clog();
2297 health_tick_start();
2298 });
2299 timer.add_event_after(cct->_conf->mon_health_to_clog_tick_interval,
2300 health_tick_event);
2301}
2302
2303void Monitor::health_tick_stop()
2304{
2305 dout(15) << __func__ << dendl;
2306
2307 if (health_tick_event) {
2308 timer.cancel_event(health_tick_event);
2309 health_tick_event = NULL;
2310 }
2311}
2312
2313utime_t Monitor::health_interval_calc_next_update()
2314{
2315 utime_t now = ceph_clock_now();
2316
2317 time_t secs = now.sec();
2318 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2319 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2320 utime_t next = utime_t(secs + adjustment, 0);
2321
2322 dout(20) << __func__
2323 << " now: " << now << ","
2324 << " next: " << next << ","
2325 << " interval: " << cct->_conf->mon_health_to_clog_interval
2326 << dendl;
2327
2328 return next;
2329}
2330
2331void Monitor::health_interval_start()
2332{
2333 dout(15) << __func__ << dendl;
2334
2335 if (!cct->_conf->mon_health_to_clog ||
2336 cct->_conf->mon_health_to_clog_interval <= 0) {
2337 return;
2338 }
2339
2340 health_interval_stop();
2341 utime_t next = health_interval_calc_next_update();
2342 health_interval_event = new C_MonContext(this, [this](int r) {
2343 if (r < 0)
2344 return;
2345 do_health_to_clog_interval();
2346 });
2347 timer.add_event_at(next, health_interval_event);
2348}
2349
2350void Monitor::health_interval_stop()
2351{
2352 dout(15) << __func__ << dendl;
2353 if (health_interval_event) {
2354 timer.cancel_event(health_interval_event);
2355 }
2356 health_interval_event = NULL;
2357}
2358
2359void Monitor::health_events_cleanup()
2360{
2361 health_tick_stop();
2362 health_interval_stop();
2363 health_status_cache.reset();
2364}
2365
2366void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2367{
2368 dout(20) << __func__ << dendl;
2369
2370 if (changed.count("mon_health_to_clog")) {
2371 if (!cct->_conf->mon_health_to_clog) {
2372 health_events_cleanup();
2373 } else {
2374 if (!health_tick_event) {
2375 health_tick_start();
2376 }
2377 if (!health_interval_event) {
2378 health_interval_start();
2379 }
2380 }
2381 }
2382
2383 if (changed.count("mon_health_to_clog_interval")) {
2384 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2385 health_interval_stop();
2386 } else {
2387 health_interval_start();
2388 }
2389 }
2390
2391 if (changed.count("mon_health_to_clog_tick_interval")) {
2392 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2393 health_tick_stop();
2394 } else {
2395 health_tick_start();
2396 }
2397 }
2398}
2399
2400void Monitor::do_health_to_clog_interval()
2401{
2402 // outputting to clog may have been disabled in the conf
2403 // since we were scheduled.
2404 if (!cct->_conf->mon_health_to_clog ||
2405 cct->_conf->mon_health_to_clog_interval <= 0)
2406 return;
2407
2408 dout(10) << __func__ << dendl;
2409
2410 // do we have a cached value for next_clog_update? if not,
2411 // do we know when the last update was?
2412
2413 do_health_to_clog(true);
2414 health_interval_start();
2415}
2416
2417void Monitor::do_health_to_clog(bool force)
2418{
2419 // outputting to clog may have been disabled in the conf
2420 // since we were scheduled.
2421 if (!cct->_conf->mon_health_to_clog ||
2422 cct->_conf->mon_health_to_clog_interval <= 0)
2423 return;
2424
2425 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2426
224ce89b
WB
2427 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2428 string summary;
2429 health_status_t level = get_health_status(false, nullptr, &summary);
2430 if (!force &&
2431 summary == health_status_cache.summary &&
2432 level == health_status_cache.overall)
2433 return;
2434 if (level == HEALTH_OK)
2435 clog->info() << "overall " << summary;
2436 else if (level == HEALTH_WARN)
2437 clog->warn() << "overall " << summary;
2438 else if (level == HEALTH_ERR)
2439 clog->error() << "overall " << summary;
2440 else
2441 ceph_abort();
2442 health_status_cache.summary = summary;
2443 health_status_cache.overall = level;
2444 } else {
2445 // for jewel only
2446 list<string> status;
2447 health_status_t overall = get_health(status, NULL, NULL);
2448 dout(25) << __func__
2449 << (force ? " (force)" : "")
2450 << dendl;
2451
2452 string summary = joinify(status.begin(), status.end(), string("; "));
7c673cae 2453
224ce89b
WB
2454 if (!force &&
2455 overall == health_status_cache.overall &&
2456 !health_status_cache.summary.empty() &&
2457 health_status_cache.summary == summary) {
2458 // we got a dup!
2459 return;
2460 }
2461
2462 clog->info() << summary;
2463
2464 health_status_cache.overall = overall;
2465 health_status_cache.summary = summary;
2466 }
2467}
2468
2469health_status_t Monitor::get_health_status(
2470 bool want_detail,
2471 Formatter *f,
2472 std::string *plain,
2473 const char *sep1,
2474 const char *sep2)
2475{
2476 health_status_t r = HEALTH_OK;
2477 bool compat = g_conf->mon_health_preluminous_compat;
2478 if (f) {
2479 f->open_object_section("health");
2480 f->open_object_section("checks");
2481 }
7c673cae 2482
224ce89b
WB
2483 string summary;
2484 string *psummary = f ? nullptr : &summary;
2485 for (auto& svc : paxos_service) {
2486 r = std::min(r, svc->get_health_checks().dump_summary(
2487 f, psummary, sep2, want_detail));
2488 }
7c673cae 2489
224ce89b
WB
2490 if (f) {
2491 f->close_section();
2492 f->dump_stream("status") << r;
2493 } else {
2494 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2495 *plain = stringify(r);
2496 if (summary.size()) {
2497 *plain += sep1;
2498 *plain += summary;
2499 }
2500 *plain += "\n";
2501 }
2502
2503 if (f && compat) {
2504 f->open_array_section("summary");
2505 for (auto& svc : paxos_service) {
2506 svc->get_health_checks().dump_summary_compat(f);
2507 }
2508 f->close_section();
2509 f->dump_stream("overall_status") << r;
2510 }
2511
2512 if (want_detail) {
2513 if (f && compat) {
2514 f->open_array_section("detail");
2515 }
2516
2517 for (auto& svc : paxos_service) {
2518 svc->get_health_checks().dump_detail(f, plain, compat);
2519 }
2520
2521 if (f && compat) {
2522 f->close_section();
2523 }
2524 }
2525 if (f) {
2526 f->close_section();
2527 }
2528 return r;
2529}
2530
2531void Monitor::log_health(
2532 const health_check_map_t& updated,
2533 const health_check_map_t& previous,
2534 MonitorDBStore::TransactionRef t)
2535{
2536 if (!g_conf->mon_health_to_clog) {
7c673cae
FG
2537 return;
2538 }
224ce89b
WB
2539 // FIXME: log atomically as part of @t instead of using clog.
2540 dout(10) << __func__ << " updated " << updated.checks.size()
2541 << " previous " << previous.checks.size()
2542 << dendl;
2543 for (auto& p : updated.checks) {
2544 auto q = previous.checks.find(p.first);
2545 if (q == previous.checks.end()) {
2546 // new
2547 ostringstream ss;
2548 ss << "Health check failed: " << p.second.summary << " ("
2549 << p.first << ")";
2550 if (p.second.severity == HEALTH_WARN)
2551 clog->warn() << ss.str();
2552 else
2553 clog->error() << ss.str();
2554 } else {
2555 if (p.second.summary != q->second.summary ||
2556 p.second.severity != q->second.severity) {
2557 // summary or severity changed (ignore detail changes at this level)
2558 ostringstream ss;
2559 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2560 if (p.second.severity == HEALTH_WARN)
2561 clog->warn() << ss.str();
2562 else
2563 clog->error() << ss.str();
2564 }
2565 }
2566 }
2567 for (auto& p : previous.checks) {
2568 if (!updated.checks.count(p.first)) {
2569 // cleared
2570 ostringstream ss;
2571 if (p.first == "DEGRADED_OBJECTS") {
2572 clog->info() << "All degraded objects recovered";
2573 } else if (p.first == "OSD_FLAGS") {
2574 clog->info() << "OSD flags cleared";
2575 } else {
2576 clog->info() << "Health check cleared: " << p.first << " (was: "
2577 << p.second.summary << ")";
2578 }
2579 }
2580 }
7c673cae 2581
224ce89b
WB
2582 if (previous.checks.size() && updated.checks.size() == 0) {
2583 // We might be going into a fully healthy state, check
2584 // other subsystems
2585 bool any_checks = false;
2586 for (auto& svc : paxos_service) {
2587 if (&(svc->get_health_checks()) == &(previous)) {
2588 // Ignore the ones we're clearing right now
2589 continue;
2590 }
7c673cae 2591
224ce89b
WB
2592 if (svc->get_health_checks().checks.size() > 0) {
2593 any_checks = true;
2594 break;
2595 }
2596 }
2597 if (!any_checks) {
2598 clog->info() << "Cluster is now healthy";
2599 }
2600 }
7c673cae
FG
2601}
2602
2603health_status_t Monitor::get_health(list<string>& status,
2604 bufferlist *detailbl,
2605 Formatter *f)
2606{
2607 list<pair<health_status_t,string> > summary;
2608 list<pair<health_status_t,string> > detail;
2609
2610 if (f)
2611 f->open_object_section("health");
2612
2613 for (vector<PaxosService*>::iterator p = paxos_service.begin();
2614 p != paxos_service.end();
2615 ++p) {
2616 PaxosService *s = *p;
2617 s->get_health(summary, detailbl ? &detail : NULL, cct);
2618 }
2619
224ce89b 2620 health_monitor->get_health(summary, (detailbl ? &detail : NULL));
7c673cae
FG
2621
2622 health_status_t overall = HEALTH_OK;
2623 if (!timecheck_skews.empty()) {
2624 list<string> warns;
7c673cae
FG
2625 for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2626 i != timecheck_skews.end(); ++i) {
2627 entity_inst_t inst = i->first;
2628 double skew = i->second;
2629 double latency = timecheck_latencies[inst];
2630 string name = monmap->get_name(inst.addr);
7c673cae
FG
2631 ostringstream tcss;
2632 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2633 if (tcstatus != HEALTH_OK) {
2634 if (overall > tcstatus)
2635 overall = tcstatus;
2636 warns.push_back(name);
7c673cae
FG
2637 ostringstream tmp_ss;
2638 tmp_ss << "mon." << name
2639 << " addr " << inst.addr << " " << tcss.str()
2640 << " (latency " << latency << "s)";
2641 detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2642 }
7c673cae
FG
2643 }
2644 if (!warns.empty()) {
2645 ostringstream ss;
2646 ss << "clock skew detected on";
2647 while (!warns.empty()) {
2648 ss << " mon." << warns.front();
2649 warns.pop_front();
2650 if (!warns.empty())
2651 ss << ",";
2652 }
2653 status.push_back(ss.str());
2654 summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2655 }
7c673cae 2656 }
7c673cae
FG
2657
2658 if (f)
2659 f->open_array_section("summary");
2660 if (!summary.empty()) {
2661 while (!summary.empty()) {
2662 if (overall > summary.front().first)
2663 overall = summary.front().first;
2664 status.push_back(summary.front().second);
2665 if (f) {
2666 f->open_object_section("item");
2667 f->dump_stream("severity") << summary.front().first;
2668 f->dump_string("summary", summary.front().second);
2669 f->close_section();
2670 }
2671 summary.pop_front();
2672 }
2673 }
2674 if (f)
2675 f->close_section();
2676
2677 stringstream fss;
2678 fss << overall;
2679 status.push_front(fss.str());
2680 if (f)
2681 f->dump_stream("overall_status") << overall;
2682
2683 if (f)
2684 f->open_array_section("detail");
2685 while (!detail.empty()) {
2686 if (f)
2687 f->dump_string("item", detail.front().second);
2688 else if (detailbl != NULL) {
2689 detailbl->append(detail.front().second);
2690 detailbl->append('\n');
2691 }
2692 detail.pop_front();
2693 }
2694 if (f)
2695 f->close_section();
2696
2697 if (f)
2698 f->close_section();
2699
2700 return overall;
2701}
2702
2703void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2704{
2705 if (f)
2706 f->open_object_section("status");
2707
7c673cae
FG
2708 if (f) {
2709 f->dump_stream("fsid") << monmap->get_fsid();
224ce89b 2710 get_health_status(false, f, nullptr);
7c673cae
FG
2711 f->dump_unsigned("election_epoch", get_epoch());
2712 {
2713 f->open_array_section("quorum");
2714 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2715 f->dump_int("rank", *p);
2716 f->close_section();
2717 f->open_array_section("quorum_names");
2718 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2719 f->dump_string("id", monmap->get_name(*p));
2720 f->close_section();
2721 }
2722 f->open_object_section("monmap");
2723 monmap->dump(f);
2724 f->close_section();
2725 f->open_object_section("osdmap");
224ce89b 2726 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
7c673cae
FG
2727 f->close_section();
2728 f->open_object_section("pgmap");
31f18b77 2729 pgservice->print_summary(f, NULL);
7c673cae
FG
2730 f->close_section();
2731 f->open_object_section("fsmap");
2732 mdsmon()->get_fsmap().print_summary(f, NULL);
2733 f->close_section();
7c673cae
FG
2734 f->open_object_section("mgrmap");
2735 mgrmon()->get_map().print_summary(f, nullptr);
2736 f->close_section();
224ce89b
WB
2737
2738 f->dump_object("servicemap", mgrstatmon()->get_service_map());
7c673cae
FG
2739 f->close_section();
2740 } else {
31f18b77
FG
2741 ss << " cluster:\n";
2742 ss << " id: " << monmap->get_fsid() << "\n";
224ce89b
WB
2743
2744 string health;
2745 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2746 get_health_status(false, nullptr, &health,
2747 "\n ", "\n ");
2748 } else {
2749 list<string> ls;
2750 get_health(ls, NULL, f);
2751 health = joinify(ls.begin(), ls.end(),
2752 string("\n "));
2753 }
2754 ss << " health: " << health << "\n";
2755
31f18b77 2756 ss << "\n \n services:\n";
224ce89b
WB
2757 {
2758 size_t maxlen = 3;
2759 auto& service_map = mgrstatmon()->get_service_map();
2760 for (auto& p : service_map.services) {
2761 maxlen = std::max(maxlen, p.first.size());
2762 }
2763 string spacing(maxlen - 3, ' ');
2764 const auto quorum_names = get_quorum_names();
2765 const auto mon_count = monmap->mon_info.size();
2766 ss << " mon: " << spacing << mon_count << " daemons, quorum "
2767 << quorum_names;
2768 if (quorum_names.size() != mon_count) {
2769 std::list<std::string> out_of_q;
2770 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2771 if (quorum.count(i) == 0) {
2772 out_of_q.push_back(monmap->ranks[i]);
2773 }
2774 }
2775 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2776 out_of_q.end(), std::string(", "));
31f18b77 2777 }
7c673cae 2778 ss << "\n";
224ce89b
WB
2779 if (mgrmon()->in_use()) {
2780 ss << " mgr: " << spacing;
2781 mgrmon()->get_map().print_summary(nullptr, &ss);
2782 ss << "\n";
2783 }
2784 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2785 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2786 }
2787 ss << " osd: " << spacing;
2788 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2789 ss << "\n";
2790 for (auto& p : service_map.services) {
2791 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2792 << p.second.get_summary() << "\n";
2793 }
7c673cae 2794 }
31f18b77
FG
2795
2796 ss << "\n \n data:\n";
2797 pgservice->print_summary(NULL, &ss);
2798 ss << "\n ";
7c673cae
FG
2799 }
2800}
2801
2802void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2803 map<string,string> &param_str_map)
2804{
2805 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2806 p != cmdmap.end(); ++p) {
2807 if (p->first == "prefix")
2808 continue;
2809 if (p->first == "caps") {
2810 vector<string> cv;
2811 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2812 cv.size() % 2 == 0) {
2813 for (unsigned i = 0; i < cv.size(); i += 2) {
2814 string k = string("caps_") + cv[i];
2815 param_str_map[k] = cv[i + 1];
2816 }
2817 continue;
2818 }
2819 }
2820 param_str_map[p->first] = cmd_vartype_stringify(p->second);
2821 }
2822}
2823
2824const MonCommand *Monitor::_get_moncommand(const string &cmd_prefix,
2825 MonCommand *cmds, int cmds_size)
2826{
2827 MonCommand *this_cmd = NULL;
2828 for (MonCommand *cp = cmds;
2829 cp < &cmds[cmds_size]; cp++) {
2830 if (cp->cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2831 this_cmd = cp;
2832 break;
2833 }
2834 }
2835 return this_cmd;
2836}
2837
2838bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2839 const map<string,cmd_vartype>& cmdmap,
2840 const map<string,string>& param_str_map,
2841 const MonCommand *this_cmd) {
2842
2843 bool cmd_r = this_cmd->requires_perm('r');
2844 bool cmd_w = this_cmd->requires_perm('w');
2845 bool cmd_x = this_cmd->requires_perm('x');
2846
2847 bool capable = s->caps.is_capable(
2848 g_ceph_context,
2849 CEPH_ENTITY_TYPE_MON,
2850 s->entity_name,
2851 module, prefix, param_str_map,
2852 cmd_r, cmd_w, cmd_x);
2853
2854 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2855 return capable;
2856}
2857
2858void Monitor::format_command_descriptions(const MonCommand *commands,
2859 unsigned commands_size,
2860 Formatter *f,
2861 bufferlist *rdata,
2862 bool hide_mgr_flag)
2863{
2864 int cmdnum = 0;
2865 f->open_object_section("command_descriptions");
2866 for (const MonCommand *cp = commands;
2867 cp < &commands[commands_size]; cp++) {
2868
2869 unsigned flags = cp->flags;
2870 if (hide_mgr_flag) {
2871 flags &= ~MonCommand::FLAG_MGR;
2872 }
2873 ostringstream secname;
2874 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2875 dump_cmddesc_to_json(f, secname.str(),
2876 cp->cmdstring, cp->helpstring, cp->module,
2877 cp->req_perms, cp->availability, flags);
2878 cmdnum++;
2879 }
2880 f->close_section(); // command_descriptions
2881
2882 f->flush(*rdata);
2883}
2884
2885void Monitor::get_locally_supported_monitor_commands(const MonCommand **cmds,
2886 int *count)
2887{
2888 *cmds = mon_commands;
2889 *count = ARRAY_SIZE(mon_commands);
2890}
2891void Monitor::set_leader_supported_commands(const MonCommand *cmds, int size)
2892{
2893 if (leader_supported_mon_commands != mon_commands)
2894 delete[] leader_supported_mon_commands;
2895 leader_supported_mon_commands = cmds;
2896 leader_supported_mon_commands_size = size;
2897}
2898
2899bool Monitor::is_keyring_required()
2900{
2901 string auth_cluster_required = g_conf->auth_supported.empty() ?
2902 g_conf->auth_cluster_required : g_conf->auth_supported;
2903 string auth_service_required = g_conf->auth_supported.empty() ?
2904 g_conf->auth_service_required : g_conf->auth_supported;
2905
2906 return auth_service_required == "cephx" ||
2907 auth_cluster_required == "cephx";
2908}
2909
2910struct C_MgrProxyCommand : public Context {
2911 Monitor *mon;
2912 MonOpRequestRef op;
2913 uint64_t size;
2914 bufferlist outbl;
2915 string outs;
2916 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2917 : mon(mon), op(op), size(s) { }
2918 void finish(int r) {
2919 Mutex::Locker l(mon->lock);
2920 mon->mgr_proxy_bytes -= size;
2921 mon->reply_command(op, r, outs, outbl, 0);
2922 }
2923};
2924
2925void Monitor::handle_command(MonOpRequestRef op)
2926{
2927 assert(op->is_type_command());
2928 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
2929 if (m->fsid != monmap->fsid) {
2930 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2931 reply_command(op, -EPERM, "wrong fsid", 0);
2932 return;
2933 }
2934
2935 MonSession *session = static_cast<MonSession *>(
2936 m->get_connection()->get_priv());
2937 if (!session) {
2938 dout(5) << __func__ << " dropping stray message " << *m << dendl;
2939 return;
2940 }
2941 BOOST_SCOPE_EXIT_ALL(=) {
2942 session->put();
2943 };
2944
2945 if (m->cmd.empty()) {
2946 string rs = "No command supplied";
2947 reply_command(op, -EINVAL, rs, 0);
2948 return;
2949 }
2950
2951 string prefix;
2952 vector<string> fullcmd;
2953 map<string, cmd_vartype> cmdmap;
2954 stringstream ss, ds;
2955 bufferlist rdata;
2956 string rs;
2957 int r = -EINVAL;
2958 rs = "unrecognized command";
2959
2960 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
2961 // ss has reason for failure
2962 r = -EINVAL;
2963 rs = ss.str();
2964 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
2965 reply_command(op, r, rs, 0);
2966 return;
2967 }
2968
2969 // check return value. If no prefix parameter provided,
2970 // return value will be false, then return error info.
2971 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
2972 reply_command(op, -EINVAL, "command prefix not found", 0);
2973 return;
2974 }
2975
2976 // check prefix is empty
2977 if (prefix.empty()) {
2978 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
2979 return;
2980 }
2981
2982 if (prefix == "get_command_descriptions") {
2983 bufferlist rdata;
2984 Formatter *f = Formatter::create("json");
2985 // hide mgr commands until luminous upgrade is complete
2986 bool hide_mgr_flag =
31f18b77 2987 osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
7c673cae
FG
2988 format_command_descriptions(leader_supported_mon_commands,
2989 leader_supported_mon_commands_size, f, &rdata,
2990 hide_mgr_flag);
2991 delete f;
2992 reply_command(op, 0, "", rdata, 0);
2993 return;
2994 }
2995
2996 string module;
2997 string err;
2998
2999 dout(0) << "handle_command " << *m << dendl;
3000
3001 string format;
3002 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
3003 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3004
3005 get_str_vec(prefix, fullcmd);
3006
3007 // make sure fullcmd is not empty.
3008 // invalid prefix will cause empty vector fullcmd.
3009 // such as, prefix=";,,;"
3010 if (fullcmd.empty()) {
3011 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3012 return;
3013 }
3014
3015 module = fullcmd[0];
3016
3017 // validate command is in leader map
3018
3019 const MonCommand *leader_cmd;
3020 leader_cmd = _get_moncommand(prefix,
3021 // the boost underlying this isn't const for some reason
3022 const_cast<MonCommand*>(leader_supported_mon_commands),
3023 leader_supported_mon_commands_size);
3024 if (!leader_cmd) {
3025 reply_command(op, -EINVAL, "command not known", 0);
3026 return;
3027 }
3028 // validate command is in our map & matches, or forward if it is allowed
3029 const MonCommand *mon_cmd = _get_moncommand(prefix, mon_commands,
3030 ARRAY_SIZE(mon_commands));
3031 if (!is_leader()) {
3032 if (!mon_cmd) {
3033 if (leader_cmd->is_noforward()) {
3034 reply_command(op, -EINVAL,
3035 "command not locally supported and not allowed to forward",
3036 0);
3037 return;
3038 }
3039 dout(10) << "Command not locally supported, forwarding request "
3040 << m << dendl;
3041 forward_request_leader(op);
3042 return;
3043 } else if (!mon_cmd->is_compat(leader_cmd)) {
3044 if (mon_cmd->is_noforward()) {
3045 reply_command(op, -EINVAL,
3046 "command not compatible with leader and not allowed to forward",
3047 0);
3048 return;
3049 }
3050 dout(10) << "Command not compatible with leader, forwarding request "
3051 << m << dendl;
3052 forward_request_leader(op);
3053 return;
3054 }
3055 }
3056
3057 if (mon_cmd->is_obsolete() ||
3058 (cct->_conf->mon_debug_deprecated_as_obsolete
3059 && mon_cmd->is_deprecated())) {
3060 reply_command(op, -ENOTSUP,
3061 "command is obsolete; please check usage and/or man page",
3062 0);
3063 return;
3064 }
3065
3066 if (session->proxy_con && mon_cmd->is_noforward()) {
3067 dout(10) << "Got forward for noforward command " << m << dendl;
3068 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3069 return;
3070 }
3071
3072 /* what we perceive as being the service the command falls under */
3073 string service(mon_cmd->module);
3074
3075 dout(25) << __func__ << " prefix='" << prefix
3076 << "' module='" << module
3077 << "' service='" << service << "'" << dendl;
3078
3079 bool cmd_is_rw =
3080 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3081
3082 // validate user's permissions for requested command
3083 map<string,string> param_str_map;
3084 _generate_command_map(cmdmap, param_str_map);
3085 if (!_allowed_command(session, service, prefix, cmdmap,
3086 param_str_map, mon_cmd)) {
3087 dout(1) << __func__ << " access denied" << dendl;
3088 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3089 << "from='" << session->inst << "' "
3090 << "entity='" << session->entity_name << "' "
3091 << "cmd=" << m->cmd << ": access denied";
3092 reply_command(op, -EACCES, "access denied", 0);
3093 return;
3094 }
3095
3096 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3097 << "from='" << session->inst << "' "
3098 << "entity='" << session->entity_name << "' "
3099 << "cmd=" << m->cmd << ": dispatch";
3100
3101 if (mon_cmd->is_mgr() &&
31f18b77 3102 osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
7c673cae
FG
3103 const auto& hdr = m->get_header();
3104 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3105 uint64_t max =
3106 g_conf->mon_client_bytes * g_conf->mon_mgr_proxy_client_bytes_ratio;
3107 if (mgr_proxy_bytes + size > max) {
3108 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3109 << " + " << size << " > max " << max << dendl;
3110 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3111 return;
3112 }
3113 mgr_proxy_bytes += size;
3114 dout(10) << __func__ << " proxying mgr command (+" << size
3115 << " -> " << mgr_proxy_bytes << ")" << dendl;
3116 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3117 mgr_client.start_command(m->cmd,
3118 m->get_data(),
3119 &fin->outbl,
3120 &fin->outs,
3121 new C_OnFinisher(fin, &finisher));
3122 return;
3123 }
3124
3125 if (module == "mds" || module == "fs") {
3126 mdsmon()->dispatch(op);
3127 return;
3128 }
31f18b77
FG
3129 if ((module == "osd" || prefix == "pg map") &&
3130 prefix != "osd last-stat-seq") {
7c673cae
FG
3131 osdmon()->dispatch(op);
3132 return;
3133 }
3134
3135 if (module == "pg") {
3136 pgmon()->dispatch(op);
3137 return;
3138 }
3139 if (module == "mon" &&
3140 /* Let the Monitor class handle the following commands:
3141 * 'mon compact'
3142 * 'mon scrub'
3143 * 'mon sync force'
3144 */
3145 prefix != "mon compact" &&
3146 prefix != "mon scrub" &&
3147 prefix != "mon sync force" &&
31f18b77
FG
3148 prefix != "mon metadata" &&
3149 prefix != "mon versions" &&
3150 prefix != "mon count-metadata") {
7c673cae
FG
3151 monmon()->dispatch(op);
3152 return;
3153 }
3154 if (module == "auth") {
3155 authmon()->dispatch(op);
3156 return;
3157 }
3158 if (module == "log") {
3159 logmon()->dispatch(op);
3160 return;
3161 }
3162
3163 if (module == "config-key") {
3164 config_key_service->dispatch(op);
3165 return;
3166 }
3167
3168 if (module == "mgr") {
3169 mgrmon()->dispatch(op);
3170 return;
3171 }
3172
3173 if (prefix == "fsid") {
3174 if (f) {
3175 f->open_object_section("fsid");
3176 f->dump_stream("fsid") << monmap->fsid;
3177 f->close_section();
3178 f->flush(rdata);
3179 } else {
3180 ds << monmap->fsid;
3181 rdata.append(ds);
3182 }
3183 reply_command(op, 0, "", rdata, 0);
3184 return;
3185 }
3186
3187 if (prefix == "scrub" || prefix == "mon scrub") {
3188 wait_for_paxos_write();
3189 if (is_leader()) {
3190 int r = scrub_start();
3191 reply_command(op, r, "", rdata, 0);
3192 } else if (is_peon()) {
3193 forward_request_leader(op);
3194 } else {
3195 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3196 }
3197 return;
3198 }
3199
3200 if (prefix == "compact" || prefix == "mon compact") {
3201 dout(1) << "triggering manual compaction" << dendl;
3202 utime_t start = ceph_clock_now();
3203 store->compact();
3204 utime_t end = ceph_clock_now();
3205 end -= start;
3206 dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3207 ostringstream oss;
224ce89b 3208 oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
7c673cae
FG
3209 rs = oss.str();
3210 r = 0;
3211 }
3212 else if (prefix == "injectargs") {
3213 vector<string> injected_args;
3214 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3215 if (!injected_args.empty()) {
3216 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3217 ostringstream oss;
3218 r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3219 ss << "injectargs:" << oss.str();
3220 rs = ss.str();
3221 goto out;
3222 } else {
3223 rs = "must supply options to be parsed in a single string";
3224 r = -EINVAL;
3225 }
224ce89b
WB
3226 } else if (prefix == "time-sync-status") {
3227 if (!f)
3228 f.reset(Formatter::create("json-pretty"));
3229 f->open_object_section("time_sync");
3230 if (!timecheck_skews.empty()) {
3231 f->open_object_section("time_skew_status");
3232 for (auto& i : timecheck_skews) {
3233 entity_inst_t inst = i.first;
3234 double skew = i.second;
3235 double latency = timecheck_latencies[inst];
3236 string name = monmap->get_name(inst.addr);
3237 ostringstream tcss;
3238 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3239 f->open_object_section(name.c_str());
3240 f->dump_float("skew", skew);
3241 f->dump_float("latency", latency);
3242 f->dump_stream("health") << tcstatus;
3243 if (tcstatus != HEALTH_OK) {
3244 f->dump_stream("details") << tcss.str();
3245 }
3246 f->close_section();
3247 }
3248 f->close_section();
3249 }
3250 f->open_object_section("timechecks");
3251 f->dump_unsigned("epoch", get_epoch());
3252 f->dump_int("round", timecheck_round);
3253 f->dump_stream("round_status") << ((timecheck_round%2) ?
3254 "on-going" : "finished");
3255 f->close_section();
3256 f->close_section();
3257 f->flush(rdata);
3258 r = 0;
3259 rs = "";
7c673cae
FG
3260 } else if (prefix == "status" ||
3261 prefix == "health" ||
3262 prefix == "df") {
3263 string detail;
3264 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3265
3266 if (prefix == "status") {
3267 // get_cluster_status handles f == NULL
3268 get_cluster_status(ds, f.get());
3269
3270 if (f) {
3271 f->flush(ds);
3272 ds << '\n';
3273 }
3274 rdata.append(ds);
3275 } else if (prefix == "health") {
224ce89b
WB
3276 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3277 string plain;
3278 get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3279 if (f) {
3280 f->flush(rdata);
3281 } else {
3282 rdata.append(plain);
3283 }
7c673cae 3284 } else {
224ce89b
WB
3285 list<string> health_str;
3286 get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
3287 if (f) {
3288 f->flush(ds);
3289 ds << '\n';
3290 } else {
3291 assert(!health_str.empty());
3292 ds << health_str.front();
3293 health_str.pop_front();
3294 if (!health_str.empty()) {
3295 ds << ' ';
3296 ds << joinify(health_str.begin(), health_str.end(), string("; "));
3297 }
7c673cae 3298 }
224ce89b
WB
3299 bufferlist comb;
3300 comb.append(ds);
3301 if (detail == "detail")
3302 comb.append(rdata);
3303 rdata = comb;
7c673cae 3304 }
7c673cae
FG
3305 } else if (prefix == "df") {
3306 bool verbose = (detail == "detail");
3307 if (f)
3308 f->open_object_section("stats");
3309
31f18b77 3310 pgservice->dump_fs_stats(&ds, f.get(), verbose);
7c673cae
FG
3311 if (!f)
3312 ds << '\n';
31f18b77 3313 pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
7c673cae
FG
3314
3315 if (f) {
3316 f->close_section();
3317 f->flush(ds);
3318 ds << '\n';
3319 }
3320 } else {
3321 assert(0 == "We should never get here!");
3322 return;
3323 }
3324 rdata.append(ds);
3325 rs = "";
3326 r = 0;
3327 } else if (prefix == "report") {
3328
3329 // this must be formatted, in its current form
3330 if (!f)
3331 f.reset(Formatter::create("json-pretty"));
3332 f->open_object_section("report");
3333 f->dump_stream("cluster_fingerprint") << fingerprint;
3334 f->dump_string("version", ceph_version_to_str());
3335 f->dump_string("commit", git_version_to_str());
3336 f->dump_stream("timestamp") << ceph_clock_now();
3337
3338 vector<string> tagsvec;
3339 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3340 string tagstr = str_join(tagsvec, " ");
3341 if (!tagstr.empty())
3342 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3343 f->dump_string("tag", tagstr);
3344
3345 list<string> hs;
3346 get_health(hs, NULL, f.get());
3347
3348 monmon()->dump_info(f.get());
3349 osdmon()->dump_info(f.get());
3350 mdsmon()->dump_info(f.get());
7c673cae 3351 authmon()->dump_info(f.get());
31f18b77 3352 pgservice->dump_info(f.get());
7c673cae
FG
3353
3354 paxos->dump_info(f.get());
3355
3356 f->close_section();
3357 f->flush(rdata);
3358
3359 ostringstream ss2;
3360 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
3361 rs = ss2.str();
3362 r = 0;
31f18b77
FG
3363 } else if (prefix == "osd last-stat-seq") {
3364 int64_t osd;
3365 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3366 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3367 if (f) {
3368 f->dump_unsigned("seq", seq);
3369 f->flush(ds);
3370 } else {
3371 ds << seq;
3372 rdata.append(ds);
3373 }
3374 rs = "";
3375 r = 0;
7c673cae
FG
3376 } else if (prefix == "node ls") {
3377 string node_type("all");
3378 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3379 if (!f)
3380 f.reset(Formatter::create("json-pretty"));
3381 if (node_type == "all") {
3382 f->open_object_section("nodes");
3383 print_nodes(f.get(), ds);
3384 osdmon()->print_nodes(f.get());
3385 mdsmon()->print_nodes(f.get());
3386 f->close_section();
3387 } else if (node_type == "mon") {
3388 print_nodes(f.get(), ds);
3389 } else if (node_type == "osd") {
3390 osdmon()->print_nodes(f.get());
3391 } else if (node_type == "mds") {
3392 mdsmon()->print_nodes(f.get());
3393 }
3394 f->flush(ds);
3395 rdata.append(ds);
3396 rs = "";
3397 r = 0;
31f18b77
FG
3398 } else if (prefix == "features") {
3399 if (!is_leader() && !is_peon()) {
3400 dout(10) << " waiting for quorum" << dendl;
3401 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3402 return;
3403 }
3404 if (!is_leader()) {
3405 forward_request_leader(op);
3406 return;
3407 }
3408 if (!f)
3409 f.reset(Formatter::create("json-pretty"));
3410 FeatureMap fm;
3411 get_combined_feature_map(&fm);
3412 f->dump_object("features", fm);
3413 f->flush(rdata);
3414 rs = "";
3415 r = 0;
7c673cae
FG
3416 } else if (prefix == "mon metadata") {
3417 if (!f)
3418 f.reset(Formatter::create("json-pretty"));
3419
3420 string name;
3421 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3422 if (!all) {
3423 // Dump a single mon's metadata
3424 int mon = monmap->get_rank(name);
3425 if (mon < 0) {
3426 rs = "requested mon not found";
3427 r = -ENOENT;
3428 goto out;
3429 }
3430 f->open_object_section("mon_metadata");
3431 r = get_mon_metadata(mon, f.get(), ds);
3432 f->close_section();
3433 } else {
3434 // Dump all mons' metadata
3435 r = 0;
3436 f->open_array_section("mon_metadata");
3437 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3438 std::ostringstream get_err;
3439 f->open_object_section("mon");
3440 f->dump_string("name", monmap->get_name(rank));
3441 r = get_mon_metadata(rank, f.get(), get_err);
3442 f->close_section();
3443 if (r == -ENOENT || r == -EINVAL) {
3444 dout(1) << get_err.str() << dendl;
3445 // Drop error, list what metadata we do have
3446 r = 0;
3447 } else if (r != 0) {
3448 derr << "Unexpected error from get_mon_metadata: "
3449 << cpp_strerror(r) << dendl;
3450 ds << get_err.str();
3451 break;
3452 }
3453 }
3454 f->close_section();
3455 }
3456
3457 f->flush(ds);
3458 rdata.append(ds);
3459 rs = "";
31f18b77
FG
3460 } else if (prefix == "mon versions") {
3461 if (!f)
3462 f.reset(Formatter::create("json-pretty"));
3463 count_metadata("ceph_version", f.get());
3464 f->flush(ds);
3465 rdata.append(ds);
3466 rs = "";
3467 r = 0;
3468 } else if (prefix == "mon count-metadata") {
3469 if (!f)
3470 f.reset(Formatter::create("json-pretty"));
3471 string field;
3472 cmd_getval(g_ceph_context, cmdmap, "property", field);
3473 count_metadata(field, f.get());
3474 f->flush(ds);
3475 rdata.append(ds);
3476 rs = "";
3477 r = 0;
7c673cae
FG
3478 } else if (prefix == "quorum_status") {
3479 // make sure our map is readable and up to date
3480 if (!is_leader() && !is_peon()) {
3481 dout(10) << " waiting for quorum" << dendl;
3482 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3483 return;
3484 }
3485 _quorum_status(f.get(), ds);
3486 rdata.append(ds);
3487 rs = "";
3488 r = 0;
3489 } else if (prefix == "mon_status") {
3490 get_mon_status(f.get(), ds);
3491 if (f)
3492 f->flush(ds);
3493 rdata.append(ds);
3494 rs = "";
3495 r = 0;
3496 } else if (prefix == "sync force" ||
3497 prefix == "mon sync force") {
3498 string validate1, validate2;
3499 cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3500 cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3501 if (validate1 != "--yes-i-really-mean-it" ||
3502 validate2 != "--i-know-what-i-am-doing") {
3503 r = -EINVAL;
3504 rs = "are you SURE? this will mean the monitor store will be "
3505 "erased. pass '--yes-i-really-mean-it "
3506 "--i-know-what-i-am-doing' if you really do.";
3507 goto out;
3508 }
3509 sync_force(f.get(), ds);
3510 rs = ds.str();
3511 r = 0;
3512 } else if (prefix == "heap") {
3513 if (!ceph_using_tcmalloc())
3514 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3515 else {
3516 string heapcmd;
3517 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3518 // XXX 1-element vector, change at callee or make vector here?
3519 vector<string> heapcmd_vec;
3520 get_str_vec(heapcmd, heapcmd_vec);
3521 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3522 rdata.append(ds);
3523 rs = "";
3524 r = 0;
3525 }
3526 } else if (prefix == "quorum") {
3527 string quorumcmd;
3528 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3529 if (quorumcmd == "exit") {
3530 start_election();
3531 elector.stop_participating();
3532 rs = "stopped responding to quorum, initiated new election";
3533 r = 0;
3534 } else if (quorumcmd == "enter") {
3535 elector.start_participating();
3536 start_election();
3537 rs = "started responding to quorum, initiated new election";
3538 r = 0;
3539 } else {
3540 rs = "needs a valid 'quorum' command";
3541 r = -EINVAL;
3542 }
3543 } else if (prefix == "version") {
3544 if (f) {
3545 f->open_object_section("version");
3546 f->dump_string("version", pretty_version_to_str());
3547 f->close_section();
3548 f->flush(ds);
3549 } else {
3550 ds << pretty_version_to_str();
3551 }
3552 rdata.append(ds);
3553 rs = "";
3554 r = 0;
3555 }
3556
3557 out:
3558 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3559 reply_command(op, r, rs, rdata, 0);
3560}
3561
3562void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3563{
3564 bufferlist rdata;
3565 reply_command(op, rc, rs, rdata, version);
3566}
3567
3568void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3569 bufferlist& rdata, version_t version)
3570{
3571 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3572 assert(m->get_type() == MSG_MON_COMMAND);
3573 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3574 reply->set_tid(m->get_tid());
3575 reply->set_data(rdata);
3576 send_reply(op, reply);
3577}
3578
3579
3580// ------------------------
3581// request/reply routing
3582//
3583// a client/mds/osd will connect to a random monitor. we need to forward any
3584// messages requiring state updates to the leader, and then route any replies
3585// back via the correct monitor and back to them. (the monitor will not
3586// initiate any connections.)
3587
3588void Monitor::forward_request_leader(MonOpRequestRef op)
3589{
3590 op->mark_event(__func__);
3591
3592 int mon = get_leader();
3593 MonSession *session = op->get_session();
3594 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3595
3596 if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3597 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3598 } else if (session->proxy_con) {
3599 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3600 } else if (!session->closed) {
3601 RoutedRequest *rr = new RoutedRequest;
3602 rr->tid = ++routed_request_tid;
3603 rr->client_inst = req->get_source_inst();
3604 rr->con = req->get_connection();
3605 rr->con_features = rr->con->get_features();
3606 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3607 rr->session = static_cast<MonSession *>(session->get());
3608 rr->op = op;
3609 routed_requests[rr->tid] = rr;
3610 session->routed_request_tids.insert(rr->tid);
3611
3612 dout(10) << "forward_request " << rr->tid << " request " << *req
3613 << " features " << rr->con_features << dendl;
3614
3615 MForward *forward = new MForward(rr->tid,
3616 req,
3617 rr->con_features,
3618 rr->session->caps);
3619 forward->set_priority(req->get_priority());
3620 if (session->auth_handler) {
3621 forward->entity_name = session->entity_name;
3622 } else if (req->get_source().is_mon()) {
3623 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3624 }
3625 messenger->send_message(forward, monmap->get_inst(mon));
3626 op->mark_forwarded();
3627 assert(op->get_req()->get_type() != 0);
3628 } else {
3629 dout(10) << "forward_request no session for request " << *req << dendl;
3630 }
3631}
3632
3633// fake connection attached to forwarded messages
3634struct AnonConnection : public Connection {
3635 explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
3636
3637 int send_message(Message *m) override {
3638 assert(!"send_message on anonymous connection");
3639 }
3640 void send_keepalive() override {
3641 assert(!"send_keepalive on anonymous connection");
3642 }
3643 void mark_down() override {
3644 // silently ignore
3645 }
3646 void mark_disposable() override {
3647 // silengtly ignore
3648 }
3649 bool is_connected() override { return false; }
3650};
3651
3652//extract the original message and put it into the regular dispatch function
3653void Monitor::handle_forward(MonOpRequestRef op)
3654{
3655 MForward *m = static_cast<MForward*>(op->get_req());
3656 dout(10) << "received forwarded message from " << m->client
3657 << " via " << m->get_source_inst() << dendl;
3658 MonSession *session = op->get_session();
3659 assert(session);
3660
3661 if (!session->is_capable("mon", MON_CAP_X)) {
3662 dout(0) << "forward from entity with insufficient caps! "
3663 << session->caps << dendl;
3664 } else {
3665 // see PaxosService::dispatch(); we rely on this being anon
3666 // (c->msgr == NULL)
3667 PaxosServiceMessage *req = m->claim_message();
3668 assert(req != NULL);
3669
3670 ConnectionRef c(new AnonConnection(cct));
3671 MonSession *s = new MonSession(req->get_source_inst(),
3672 static_cast<Connection*>(c.get()));
3673 c->set_priv(s->get());
3674 c->set_peer_addr(m->client.addr);
3675 c->set_peer_type(m->client.name.type());
3676 c->set_features(m->con_features);
3677
3678 s->caps = m->client_caps;
3679 dout(10) << " caps are " << s->caps << dendl;
3680 s->entity_name = m->entity_name;
3681 dout(10) << " entity name '" << s->entity_name << "' type "
3682 << s->entity_name.get_type() << dendl;
3683 s->proxy_con = m->get_connection();
3684 s->proxy_tid = m->tid;
3685
3686 req->set_connection(c);
3687
3688 // not super accurate, but better than nothing.
3689 req->set_recv_stamp(m->get_recv_stamp());
3690
3691 /*
3692 * note which election epoch this is; we will drop the message if
3693 * there is a future election since our peers will resend routed
3694 * requests in that case.
3695 */
3696 req->rx_election_epoch = get_epoch();
3697
3698 /* Because this is a special fake connection, we need to break
3699 the ref loop between Connection and MonSession differently
3700 than we normally do. Here, the Message refers to the Connection
3701 which refers to the Session, and nobody else refers to the Connection
3702 or the Session. And due to the special nature of this message,
3703 nobody refers to the Connection via the Session. So, clear out that
3704 half of the ref loop.*/
3705 s->con.reset(NULL);
3706
3707 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
3708
3709 _ms_dispatch(req);
3710 s->put();
3711 }
3712}
3713
3714void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3715{
3716 dout(10) << "try_send_message " << *m << " to " << to << dendl;
3717
3718 bufferlist bl;
3719 encode_message(m, quorum_con_features, bl);
3720
3721 messenger->send_message(m, to);
3722
3723 for (int i=0; i<(int)monmap->size(); i++) {
3724 if (i != rank)
3725 messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
3726 }
3727}
3728
3729void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3730{
3731 op->mark_event(__func__);
3732
3733 MonSession *session = op->get_session();
3734 assert(session);
3735 Message *req = op->get_req();
3736 ConnectionRef con = op->get_connection();
3737
3738 reply->set_cct(g_ceph_context);
3739 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3740
3741 if (!con) {
3742 dout(2) << "send_reply no connection, dropping reply " << *reply
3743 << " to " << req << " " << *req << dendl;
3744 reply->put();
3745 op->mark_event("reply: no connection");
3746 return;
3747 }
3748
3749 if (!session->con && !session->proxy_con) {
3750 dout(2) << "send_reply no connection, dropping reply " << *reply
3751 << " to " << req << " " << *req << dendl;
3752 reply->put();
3753 op->mark_event("reply: no connection");
3754 return;
3755 }
3756
3757 if (session->proxy_con) {
3758 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3759 << " via " << session->proxy_con->get_peer_addr()
3760 << " for request " << *req << dendl;
3761 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3762 op->mark_event("reply: send routed request");
3763 } else {
3764 session->con->send_message(reply);
3765 op->mark_event("reply: send");
3766 }
3767}
3768
3769void Monitor::no_reply(MonOpRequestRef op)
3770{
3771 MonSession *session = op->get_session();
3772 Message *req = op->get_req();
3773
3774 if (session->proxy_con) {
3775 dout(10) << "no_reply to " << req->get_source_inst()
3776 << " via " << session->proxy_con->get_peer_addr()
3777 << " for request " << *req << dendl;
3778 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3779 op->mark_event("no_reply: send routed request");
3780 } else {
3781 dout(10) << "no_reply to " << req->get_source_inst()
3782 << " " << *req << dendl;
3783 op->mark_event("no_reply");
3784 }
3785}
3786
3787void Monitor::handle_route(MonOpRequestRef op)
3788{
3789 MRoute *m = static_cast<MRoute*>(op->get_req());
3790 MonSession *session = op->get_session();
3791 //check privileges
3792 if (!session->is_capable("mon", MON_CAP_X)) {
3793 dout(0) << "MRoute received from entity without appropriate perms! "
3794 << dendl;
3795 return;
3796 }
3797 if (m->msg)
3798 dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3799 else
3800 dout(10) << "handle_route null to " << m->dest << dendl;
3801
3802 // look it up
3803 if (m->session_mon_tid) {
3804 if (routed_requests.count(m->session_mon_tid)) {
3805 RoutedRequest *rr = routed_requests[m->session_mon_tid];
3806
3807 // reset payload, in case encoding is dependent on target features
3808 if (m->msg) {
3809 m->msg->clear_payload();
3810 rr->con->send_message(m->msg);
3811 m->msg = NULL;
3812 }
3813 if (m->send_osdmap_first) {
3814 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3815 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3816 true, MonOpRequestRef());
3817 }
3818 assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3819 routed_requests.erase(m->session_mon_tid);
3820 rr->session->routed_request_tids.erase(m->session_mon_tid);
3821 delete rr;
3822 } else {
3823 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3824 }
3825 } else {
3826 dout(10) << " not a routed request, trying to send anyway" << dendl;
3827 if (m->msg) {
3828 messenger->send_message(m->msg, m->dest);
3829 m->msg = NULL;
3830 }
3831 }
3832}
3833
3834void Monitor::resend_routed_requests()
3835{
3836 dout(10) << "resend_routed_requests" << dendl;
3837 int mon = get_leader();
3838 list<Context*> retry;
3839 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
3840 p != routed_requests.end();
3841 ++p) {
3842 RoutedRequest *rr = p->second;
3843
3844 if (mon == rank) {
3845 dout(10) << " requeue for self tid " << rr->tid << dendl;
3846 rr->op->mark_event("retry routed request");
3847 retry.push_back(new C_RetryMessage(this, rr->op));
3848 if (rr->session) {
3849 assert(rr->session->routed_request_tids.count(p->first));
3850 rr->session->routed_request_tids.erase(p->first);
3851 }
3852 delete rr;
3853 } else {
3854 bufferlist::iterator q = rr->request_bl.begin();
3855 PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
3856 rr->op->mark_event("resend forwarded message to leader");
3857 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
3858 MForward *forward = new MForward(rr->tid, req, rr->con_features,
3859 rr->session->caps);
3860 req->put(); // forward takes its own ref; drop ours.
3861 forward->client = rr->client_inst;
3862 forward->set_priority(req->get_priority());
3863 messenger->send_message(forward, monmap->get_inst(mon));
3864 }
3865 }
3866 if (mon == rank) {
3867 routed_requests.clear();
3868 finish_contexts(g_ceph_context, retry);
3869 }
3870}
3871
3872void Monitor::remove_session(MonSession *s)
3873{
224ce89b
WB
3874 dout(10) << "remove_session " << s << " " << s->inst
3875 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
7c673cae
FG
3876 assert(s->con);
3877 assert(!s->closed);
3878 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
3879 p != s->routed_request_tids.end();
3880 ++p) {
3881 assert(routed_requests.count(*p));
3882 RoutedRequest *rr = routed_requests[*p];
3883 dout(10) << " dropping routed request " << rr->tid << dendl;
3884 delete rr;
3885 routed_requests.erase(*p);
3886 }
3887 s->routed_request_tids.clear();
3888 s->con->set_priv(NULL);
3889 session_map.remove_session(s);
3890 logger->set(l_mon_num_sessions, session_map.get_size());
3891 logger->inc(l_mon_session_rm);
3892}
3893
3894void Monitor::remove_all_sessions()
3895{
3896 Mutex::Locker l(session_map_lock);
3897 while (!session_map.sessions.empty()) {
3898 MonSession *s = session_map.sessions.front();
3899 remove_session(s);
3900 if (logger)
3901 logger->inc(l_mon_session_rm);
3902 }
3903 if (logger)
3904 logger->set(l_mon_num_sessions, session_map.get_size());
3905}
3906
3907void Monitor::send_command(const entity_inst_t& inst,
3908 const vector<string>& com)
3909{
3910 dout(10) << "send_command " << inst << "" << com << dendl;
3911 MMonCommand *c = new MMonCommand(monmap->fsid);
3912 c->cmd = com;
3913 try_send_message(c, inst);
3914}
3915
3916void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
3917{
3918 /**
3919 * Wait list the new session until we're in the quorum, assuming it's
3920 * sufficiently new.
3921 * tick() will periodically send them back through so we can send
3922 * the client elsewhere if we don't think we're getting back in.
3923 *
3924 * But we whitelist a few sorts of messages:
3925 * 1) Monitors can talk to us at any time, of course.
3926 * 2) auth messages. It's unlikely to go through much faster, but
3927 * it's possible we've just lost our quorum status and we want to take...
3928 * 3) command messages. We want to accept these under all possible
3929 * circumstances.
3930 */
3931 Message *m = op->get_req();
3932 MonSession *s = op->get_session();
3933 ConnectionRef con = op->get_connection();
3934 utime_t too_old = ceph_clock_now();
3935 too_old -= g_ceph_context->_conf->mon_lease;
3936 if (m->get_recv_stamp() > too_old &&
3937 con->is_connected()) {
3938 dout(5) << "waitlisting message " << *m << dendl;
3939 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
3940 op->mark_wait_for_quorum();
3941 } else {
3942 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
3943 con->mark_down();
3944 // proxied sessions aren't registered and don't have a con; don't remove
3945 // those.
3946 if (!s->proxy_con) {
3947 Mutex::Locker l(session_map_lock);
3948 remove_session(s);
3949 }
3950 op->mark_zap();
3951 }
3952}
3953
3954void Monitor::_ms_dispatch(Message *m)
3955{
3956 if (is_shutdown()) {
3957 m->put();
3958 return;
3959 }
3960
3961 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
3962 bool src_is_mon = op->is_src_mon();
3963 op->mark_event("mon:_ms_dispatch");
3964 MonSession *s = op->get_session();
3965 if (s && s->closed) {
3966 return;
3967 }
224ce89b
WB
3968
3969 if (src_is_mon && s) {
3970 ConnectionRef con = m->get_connection();
3971 if (con->get_messenger() && con->get_features() != s->con_features) {
3972 // only update features if this is a non-anonymous connection
3973 dout(10) << __func__ << " feature change for " << m->get_source_inst()
3974 << " (was " << s->con_features
3975 << ", now " << con->get_features() << ")" << dendl;
3976 // connection features changed - recreate session.
3977 if (s->con && s->con != con) {
3978 dout(10) << __func__ << " connection for " << m->get_source_inst()
3979 << " changed from session; mark down and replace" << dendl;
3980 s->con->mark_down();
3981 }
3982 if (s->item.is_on_list()) {
3983 // forwarded messages' sessions are not in the sessions map and
3984 // exist only while the op is being handled.
3985 remove_session(s);
3986 }
3987 s->put();
3988 s = nullptr;
3989 }
3990 }
3991
7c673cae
FG
3992 if (!s) {
3993 // if the sender is not a monitor, make sure their first message for a
3994 // session is an MAuth. If it is not, assume it's a stray message,
3995 // and considering that we are creating a new session it is safe to
3996 // assume that the sender hasn't authenticated yet, so we have no way
3997 // of assessing whether we should handle it or not.
3998 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
3999 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4000 m->get_type() != CEPH_MSG_PING)) {
4001 dout(1) << __func__ << " dropping stray message " << *m
4002 << " from " << m->get_source_inst() << dendl;
4003 return;
4004 }
4005
4006 ConnectionRef con = m->get_connection();
4007 {
4008 Mutex::Locker l(session_map_lock);
4009 s = session_map.new_session(m->get_source_inst(), con.get());
4010 }
4011 assert(s);
4012 con->set_priv(s->get());
224ce89b
WB
4013 dout(10) << __func__ << " new session " << s << " " << *s
4014 << " features 0x" << std::hex
4015 << s->con_features << std::dec << dendl;
7c673cae
FG
4016 op->set_session(s);
4017
4018 logger->set(l_mon_num_sessions, session_map.get_size());
4019 logger->inc(l_mon_session_add);
4020
4021 if (src_is_mon) {
4022 // give it monitor caps; the peer type has been authenticated
4023 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4024 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4025 s->caps = *mon_caps;
4026 }
4027 s->put();
4028 } else {
4029 dout(20) << __func__ << " existing session " << s << " for " << s->inst
4030 << dendl;
4031 }
4032
4033 assert(s);
4034
4035 s->session_timeout = ceph_clock_now();
4036 s->session_timeout += g_conf->mon_session_timeout;
4037
4038 if (s->auth_handler) {
4039 s->entity_name = s->auth_handler->get_entity_name();
4040 }
4041 dout(20) << " caps " << s->caps.get_str() << dendl;
4042
4043 if ((is_synchronizing() ||
4044 (s->global_id == 0 && !exited_quorum.is_zero())) &&
4045 !src_is_mon &&
4046 m->get_type() != CEPH_MSG_PING) {
4047 waitlist_or_zap_client(op);
4048 } else {
4049 dispatch_op(op);
4050 }
4051 return;
4052}
4053
4054void Monitor::dispatch_op(MonOpRequestRef op)
4055{
4056 op->mark_event("mon:dispatch_op");
4057 MonSession *s = op->get_session();
4058 assert(s);
4059 if (s->closed) {
4060 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4061 return;
4062 }
4063
4064 /* we will consider the default type as being 'monitor' until proven wrong */
4065 op->set_type_monitor();
4066 /* deal with all messages that do not necessarily need caps */
4067 bool dealt_with = true;
4068 switch (op->get_req()->get_type()) {
4069 // auth
4070 case MSG_MON_GLOBAL_ID:
4071 case CEPH_MSG_AUTH:
4072 op->set_type_service();
4073 /* no need to check caps here */
4074 paxos_service[PAXOS_AUTH]->dispatch(op);
4075 break;
4076
4077 case CEPH_MSG_PING:
4078 handle_ping(op);
4079 break;
4080
4081 /* MMonGetMap may be used by clients to obtain a monmap *before*
4082 * authenticating with the monitor. We need to handle these without
4083 * checking caps because, even on a cluster without cephx, we only set
4084 * session caps *after* the auth handshake. A good example of this
4085 * is when a client calls MonClient::get_monmap_privately(), which does
4086 * not authenticate when obtaining a monmap.
4087 */
4088 case CEPH_MSG_MON_GET_MAP:
4089 handle_mon_get_map(op);
4090 break;
4091
4092 case CEPH_MSG_MON_METADATA:
4093 return handle_mon_metadata(op);
4094
4095 default:
4096 dealt_with = false;
4097 break;
4098 }
4099 if (dealt_with)
4100 return;
4101
4102 /* well, maybe the op belongs to a service... */
4103 op->set_type_service();
4104 /* deal with all messages which caps should be checked somewhere else */
4105 dealt_with = true;
4106 switch (op->get_req()->get_type()) {
4107
4108 // OSDs
4109 case CEPH_MSG_MON_GET_OSDMAP:
4110 case CEPH_MSG_POOLOP:
4111 case MSG_OSD_BEACON:
4112 case MSG_OSD_MARK_ME_DOWN:
4113 case MSG_OSD_FULL:
4114 case MSG_OSD_FAILURE:
4115 case MSG_OSD_BOOT:
4116 case MSG_OSD_ALIVE:
4117 case MSG_OSD_PGTEMP:
4118 case MSG_OSD_PG_CREATED:
4119 case MSG_REMOVE_SNAPS:
4120 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4121 break;
4122
4123 // MDSs
4124 case MSG_MDS_BEACON:
4125 case MSG_MDS_OFFLOAD_TARGETS:
4126 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4127 break;
4128
4129 // Mgrs
4130 case MSG_MGR_BEACON:
4131 paxos_service[PAXOS_MGR]->dispatch(op);
4132 break;
4133
31f18b77
FG
4134 // MgrStat
4135 case MSG_MON_MGR_REPORT:
7c673cae 4136 case CEPH_MSG_STATFS:
7c673cae 4137 case MSG_GETPOOLSTATS:
31f18b77
FG
4138 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4139 break;
4140
4141 // pg
4142 case MSG_PGSTATS:
7c673cae
FG
4143 paxos_service[PAXOS_PGMAP]->dispatch(op);
4144 break;
4145
4146 // log
4147 case MSG_LOG:
4148 paxos_service[PAXOS_LOG]->dispatch(op);
4149 break;
4150
4151 // handle_command() does its own caps checking
4152 case MSG_MON_COMMAND:
4153 op->set_type_command();
4154 handle_command(op);
4155 break;
4156
4157 default:
4158 dealt_with = false;
4159 break;
4160 }
4161 if (dealt_with)
4162 return;
4163
4164 /* nop, looks like it's not a service message; revert back to monitor */
4165 op->set_type_monitor();
4166
4167 /* messages we, the Monitor class, need to deal with
4168 * but may be sent by clients. */
4169
4170 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4171 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4172 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4173 << dendl;
4174 goto drop;
4175 }
4176
4177 dealt_with = true;
4178 switch (op->get_req()->get_type()) {
4179
4180 // misc
4181 case CEPH_MSG_MON_GET_VERSION:
4182 handle_get_version(op);
4183 break;
4184
4185 case CEPH_MSG_MON_SUBSCRIBE:
4186 /* FIXME: check what's being subscribed, filter accordingly */
4187 handle_subscribe(op);
4188 break;
4189
4190 default:
4191 dealt_with = false;
4192 break;
4193 }
4194 if (dealt_with)
4195 return;
4196
4197 if (!op->is_src_mon()) {
4198 dout(1) << __func__ << " unexpected monitor message from"
4199 << " non-monitor entity " << op->get_req()->get_source_inst()
4200 << " " << *(op->get_req()) << " -- dropping" << dendl;
4201 goto drop;
4202 }
4203
4204 /* messages that should only be sent by another monitor */
4205 dealt_with = true;
4206 switch (op->get_req()->get_type()) {
4207
4208 case MSG_ROUTE:
4209 handle_route(op);
4210 break;
4211
4212 case MSG_MON_PROBE:
4213 handle_probe(op);
4214 break;
4215
4216 // Sync (i.e., the new slurp, but on steroids)
4217 case MSG_MON_SYNC:
4218 handle_sync(op);
4219 break;
4220 case MSG_MON_SCRUB:
4221 handle_scrub(op);
4222 break;
4223
4224 /* log acks are sent from a monitor we sent the MLog to, and are
4225 never sent by clients to us. */
4226 case MSG_LOGACK:
4227 log_client.handle_log_ack((MLogAck*)op->get_req());
4228 break;
4229
4230 // monmap
4231 case MSG_MON_JOIN:
4232 op->set_type_service();
4233 paxos_service[PAXOS_MONMAP]->dispatch(op);
4234 break;
4235
4236 // paxos
4237 case MSG_MON_PAXOS:
4238 {
4239 op->set_type_paxos();
4240 MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
4241 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4242 //can't send these!
4243 break;
4244 }
4245
4246 if (state == STATE_SYNCHRONIZING) {
4247 // we are synchronizing. These messages would do us no
4248 // good, thus just drop them and ignore them.
4249 dout(10) << __func__ << " ignore paxos msg from "
4250 << pm->get_source_inst() << dendl;
4251 break;
4252 }
4253
4254 // sanitize
4255 if (pm->epoch > get_epoch()) {
4256 bootstrap();
4257 break;
4258 }
4259 if (pm->epoch != get_epoch()) {
4260 break;
4261 }
4262
4263 paxos->dispatch(op);
4264 }
4265 break;
4266
4267 // elector messages
4268 case MSG_MON_ELECTION:
4269 op->set_type_election();
4270 //check privileges here for simplicity
4271 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4272 dout(0) << "MMonElection received from entity without enough caps!"
4273 << op->get_session()->caps << dendl;
4274 break;
4275 }
4276 if (!is_probing() && !is_synchronizing()) {
4277 elector.dispatch(op);
4278 }
4279 break;
4280
4281 case MSG_FORWARD:
4282 handle_forward(op);
4283 break;
4284
4285 case MSG_TIMECHECK:
4286 handle_timecheck(op);
4287 break;
4288
4289 case MSG_MON_HEALTH:
4290 health_monitor->dispatch(op);
4291 break;
4292
224ce89b
WB
4293 case MSG_MON_HEALTH_CHECKS:
4294 op->set_type_service();
4295 paxos_service[PAXOS_HEALTH]->dispatch(op);
4296 break;
4297
7c673cae
FG
4298 default:
4299 dealt_with = false;
4300 break;
4301 }
4302 if (!dealt_with) {
4303 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4304 goto drop;
4305 }
4306 return;
4307
4308drop:
4309 return;
4310}
4311
4312void Monitor::handle_ping(MonOpRequestRef op)
4313{
4314 MPing *m = static_cast<MPing*>(op->get_req());
4315 dout(10) << __func__ << " " << *m << dendl;
4316 MPing *reply = new MPing;
4317 entity_inst_t inst = m->get_source_inst();
4318 bufferlist payload;
4319 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4320 f->open_object_section("pong");
4321
4322 list<string> health_str;
4323 get_health(health_str, NULL, f.get());
4324 {
4325 stringstream ss;
4326 get_mon_status(f.get(), ss);
4327 }
4328
4329 f->close_section();
4330 stringstream ss;
4331 f->flush(ss);
4332 ::encode(ss.str(), payload);
4333 reply->set_payload(payload);
4334 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4335 messenger->send_message(reply, inst);
4336}
4337
4338void Monitor::timecheck_start()
4339{
4340 dout(10) << __func__ << dendl;
4341 timecheck_cleanup();
4342 timecheck_start_round();
4343}
4344
4345void Monitor::timecheck_finish()
4346{
4347 dout(10) << __func__ << dendl;
4348 timecheck_cleanup();
4349}
4350
4351void Monitor::timecheck_start_round()
4352{
4353 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4354 assert(is_leader());
4355
4356 if (monmap->size() == 1) {
4357 assert(0 == "We are alone; this shouldn't have been scheduled!");
4358 return;
4359 }
4360
4361 if (timecheck_round % 2) {
4362 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4363 utime_t curr_time = ceph_clock_now();
4364 double max = g_conf->mon_timecheck_interval*3;
4365 if (curr_time - timecheck_round_start < max) {
4366 dout(10) << __func__ << " keep current round going" << dendl;
4367 goto out;
4368 } else {
4369 dout(10) << __func__
4370 << " finish current timecheck and start new" << dendl;
4371 timecheck_cancel_round();
4372 }
4373 }
4374
4375 assert(timecheck_round % 2 == 0);
4376 timecheck_acks = 0;
4377 timecheck_round ++;
4378 timecheck_round_start = ceph_clock_now();
4379 dout(10) << __func__ << " new " << timecheck_round << dendl;
4380
4381 timecheck();
4382out:
4383 dout(10) << __func__ << " setting up next event" << dendl;
4384 timecheck_reset_event();
4385}
4386
4387void Monitor::timecheck_finish_round(bool success)
4388{
4389 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4390 assert(timecheck_round % 2);
4391 timecheck_round ++;
4392 timecheck_round_start = utime_t();
4393
4394 if (success) {
4395 assert(timecheck_waiting.empty());
4396 assert(timecheck_acks == quorum.size());
4397 timecheck_report();
4398 timecheck_check_skews();
4399 return;
4400 }
4401
4402 dout(10) << __func__ << " " << timecheck_waiting.size()
4403 << " peers still waiting:";
4404 for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4405 p != timecheck_waiting.end(); ++p) {
4406 *_dout << " " << p->first.name;
4407 }
4408 *_dout << dendl;
4409 timecheck_waiting.clear();
4410
4411 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4412}
4413
4414void Monitor::timecheck_cancel_round()
4415{
4416 timecheck_finish_round(false);
4417}
4418
4419void Monitor::timecheck_cleanup()
4420{
4421 timecheck_round = 0;
4422 timecheck_acks = 0;
4423 timecheck_round_start = utime_t();
4424
4425 if (timecheck_event) {
4426 timer.cancel_event(timecheck_event);
4427 timecheck_event = NULL;
4428 }
4429 timecheck_waiting.clear();
4430 timecheck_skews.clear();
4431 timecheck_latencies.clear();
4432
4433 timecheck_rounds_since_clean = 0;
4434}
4435
4436void Monitor::timecheck_reset_event()
4437{
4438 if (timecheck_event) {
4439 timer.cancel_event(timecheck_event);
4440 timecheck_event = NULL;
4441 }
4442
4443 double delay =
4444 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4445
4446 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4447 delay = cct->_conf->mon_timecheck_interval;
4448 }
4449
4450 dout(10) << __func__ << " delay " << delay
4451 << " rounds_since_clean " << timecheck_rounds_since_clean
4452 << dendl;
4453
4454 timecheck_event = new C_MonContext(this, [this](int) {
4455 timecheck_start_round();
4456 });
4457 timer.add_event_after(delay, timecheck_event);
4458}
4459
4460void Monitor::timecheck_check_skews()
4461{
4462 dout(10) << __func__ << dendl;
4463 assert(is_leader());
4464 assert((timecheck_round % 2) == 0);
4465 if (monmap->size() == 1) {
4466 assert(0 == "We are alone; we shouldn't have gotten here!");
4467 return;
4468 }
4469 assert(timecheck_latencies.size() == timecheck_skews.size());
4470
4471 bool found_skew = false;
4472 for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4473 p != timecheck_skews.end(); ++p) {
4474
4475 double abs_skew;
4476 if (timecheck_has_skew(p->second, &abs_skew)) {
4477 dout(10) << __func__
4478 << " " << p->first << " skew " << abs_skew << dendl;
4479 found_skew = true;
4480 }
4481 }
4482
4483 if (found_skew) {
4484 ++timecheck_rounds_since_clean;
4485 timecheck_reset_event();
4486 } else if (timecheck_rounds_since_clean > 0) {
4487 dout(1) << __func__
4488 << " no clock skews found after " << timecheck_rounds_since_clean
4489 << " rounds" << dendl;
4490 // make sure the skews are really gone and not just a transient success
4491 // this will run just once if not in the presence of skews again.
4492 timecheck_rounds_since_clean = 1;
4493 timecheck_reset_event();
4494 timecheck_rounds_since_clean = 0;
4495 }
4496
4497}
4498
4499void Monitor::timecheck_report()
4500{
4501 dout(10) << __func__ << dendl;
4502 assert(is_leader());
4503 assert((timecheck_round % 2) == 0);
4504 if (monmap->size() == 1) {
4505 assert(0 == "We are alone; we shouldn't have gotten here!");
4506 return;
4507 }
4508
4509 assert(timecheck_latencies.size() == timecheck_skews.size());
4510 bool do_output = true; // only output report once
4511 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4512 if (monmap->get_name(*q) == name)
4513 continue;
4514
4515 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4516 m->epoch = get_epoch();
4517 m->round = timecheck_round;
4518
4519 for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4520 it != timecheck_skews.end(); ++it) {
4521 double skew = it->second;
4522 double latency = timecheck_latencies[it->first];
4523
4524 m->skews[it->first] = skew;
4525 m->latencies[it->first] = latency;
4526
4527 if (do_output) {
4528 dout(25) << __func__ << " " << it->first
4529 << " latency " << latency
4530 << " skew " << skew << dendl;
4531 }
4532 }
4533 do_output = false;
4534 entity_inst_t inst = monmap->get_inst(*q);
4535 dout(10) << __func__ << " send report to " << inst << dendl;
4536 messenger->send_message(m, inst);
4537 }
4538}
4539
4540void Monitor::timecheck()
4541{
4542 dout(10) << __func__ << dendl;
4543 assert(is_leader());
4544 if (monmap->size() == 1) {
4545 assert(0 == "We are alone; we shouldn't have gotten here!");
4546 return;
4547 }
4548 assert(timecheck_round % 2 != 0);
4549
4550 timecheck_acks = 1; // we ack ourselves
4551
4552 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4553 << " round " << timecheck_round << dendl;
4554
4555 // we are at the eye of the storm; the point of reference
4556 timecheck_skews[messenger->get_myinst()] = 0.0;
4557 timecheck_latencies[messenger->get_myinst()] = 0.0;
4558
4559 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4560 if (monmap->get_name(*it) == name)
4561 continue;
4562
4563 entity_inst_t inst = monmap->get_inst(*it);
4564 utime_t curr_time = ceph_clock_now();
4565 timecheck_waiting[inst] = curr_time;
4566 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4567 m->epoch = get_epoch();
4568 m->round = timecheck_round;
4569 dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4570 messenger->send_message(m, inst);
4571 }
4572}
4573
4574health_status_t Monitor::timecheck_status(ostringstream &ss,
4575 const double skew_bound,
4576 const double latency)
4577{
4578 health_status_t status = HEALTH_OK;
4579 assert(latency >= 0);
4580
4581 double abs_skew;
4582 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4583 status = HEALTH_WARN;
4584 ss << "clock skew " << abs_skew << "s"
4585 << " > max " << g_conf->mon_clock_drift_allowed << "s";
4586 }
4587
4588 return status;
4589}
4590
4591void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4592{
4593 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4594 dout(10) << __func__ << " " << *m << dendl;
4595 /* handles PONG's */
4596 assert(m->op == MTimeCheck::OP_PONG);
4597
4598 entity_inst_t other = m->get_source_inst();
4599 if (m->epoch < get_epoch()) {
4600 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4601 << " from " << other
4602 << " curr " << get_epoch()
4603 << " -- severely lagged? discard" << dendl;
4604 return;
4605 }
4606 assert(m->epoch == get_epoch());
4607
4608 if (m->round < timecheck_round) {
4609 dout(1) << __func__ << " got old round " << m->round
4610 << " from " << other
4611 << " curr " << timecheck_round << " -- discard" << dendl;
4612 return;
4613 }
4614
4615 utime_t curr_time = ceph_clock_now();
4616
4617 assert(timecheck_waiting.count(other) > 0);
4618 utime_t timecheck_sent = timecheck_waiting[other];
4619 timecheck_waiting.erase(other);
4620 if (curr_time < timecheck_sent) {
4621 // our clock was readjusted -- drop everything until it all makes sense.
4622 dout(1) << __func__ << " our clock was readjusted --"
4623 << " bump round and drop current check"
4624 << dendl;
4625 timecheck_cancel_round();
4626 return;
4627 }
4628
4629 /* update peer latencies */
4630 double latency = (double)(curr_time - timecheck_sent);
4631
4632 if (timecheck_latencies.count(other) == 0)
4633 timecheck_latencies[other] = latency;
4634 else {
4635 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4636 timecheck_latencies[other] = avg_latency;
4637 }
4638
4639 /*
4640 * update skews
4641 *
4642 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4643 * and 'a' happens to be lower than 'b'; so we use double instead.
4644 *
4645 * latency is always expected to be >= 0.
4646 *
4647 * delta, the difference between theirs timestamp and ours, may either be
4648 * lower or higher than 0; will hardly ever be 0.
4649 *
4650 * The absolute skew is the absolute delta minus the latency, which is
4651 * taken as a whole instead of an rtt given that there is some queueing
4652 * and dispatch times involved and it's hard to assess how long exactly
4653 * it took for the message to travel to the other side and be handled. So
4654 * we call it a bounded skew, the worst case scenario.
4655 *
4656 * Now, to math!
4657 *
4658 * Given that the latency is always positive, we can establish that the
4659 * bounded skew will be:
4660 *
4661 * 1. positive if the absolute delta is higher than the latency and
4662 * delta is positive
4663 * 2. negative if the absolute delta is higher than the latency and
4664 * delta is negative.
4665 * 3. zero if the absolute delta is lower than the latency.
4666 *
4667 * On 3. we make a judgement call and treat the skew as non-existent.
4668 * This is because that, if the absolute delta is lower than the
4669 * latency, then the apparently existing skew is nothing more than a
4670 * side-effect of the high latency at work.
4671 *
4672 * This may not be entirely true though, as a severely skewed clock
4673 * may be masked by an even higher latency, but with high latencies
4674 * we probably have worse issues to deal with than just skewed clocks.
4675 */
4676 assert(latency >= 0);
4677
4678 double delta = ((double) m->timestamp) - ((double) curr_time);
4679 double abs_delta = (delta > 0 ? delta : -delta);
4680 double skew_bound = abs_delta - latency;
4681 if (skew_bound < 0)
4682 skew_bound = 0;
4683 else if (delta < 0)
4684 skew_bound = -skew_bound;
4685
4686 ostringstream ss;
4687 health_status_t status = timecheck_status(ss, skew_bound, latency);
4688 if (status == HEALTH_ERR)
4689 clog->error() << other << " " << ss.str();
4690 else if (status == HEALTH_WARN)
4691 clog->warn() << other << " " << ss.str();
4692
4693 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4694 << " delta " << delta << " skew_bound " << skew_bound
4695 << " latency " << latency << dendl;
4696
4697 timecheck_skews[other] = skew_bound;
4698
4699 timecheck_acks++;
4700 if (timecheck_acks == quorum.size()) {
4701 dout(10) << __func__ << " got pongs from everybody ("
4702 << timecheck_acks << " total)" << dendl;
4703 assert(timecheck_skews.size() == timecheck_acks);
4704 assert(timecheck_waiting.empty());
4705 // everyone has acked, so bump the round to finish it.
4706 timecheck_finish_round();
4707 }
4708}
4709
4710void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4711{
4712 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4713 dout(10) << __func__ << " " << *m << dendl;
4714
4715 assert(is_peon());
4716 assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4717
4718 if (m->epoch != get_epoch()) {
4719 dout(1) << __func__ << " got wrong epoch "
4720 << "(ours " << get_epoch()
4721 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4722 return;
4723 }
4724
4725 if (m->round < timecheck_round) {
4726 dout(1) << __func__ << " got old round " << m->round
4727 << " current " << timecheck_round
4728 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4729 return;
4730 }
4731
4732 timecheck_round = m->round;
4733
4734 if (m->op == MTimeCheck::OP_REPORT) {
4735 assert((timecheck_round % 2) == 0);
4736 timecheck_latencies.swap(m->latencies);
4737 timecheck_skews.swap(m->skews);
4738 return;
4739 }
4740
4741 assert((timecheck_round % 2) != 0);
4742 MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4743 utime_t curr_time = ceph_clock_now();
4744 reply->timestamp = curr_time;
4745 reply->epoch = m->epoch;
4746 reply->round = m->round;
4747 dout(10) << __func__ << " send " << *m
4748 << " to " << m->get_source_inst() << dendl;
4749 m->get_connection()->send_message(reply);
4750}
4751
4752void Monitor::handle_timecheck(MonOpRequestRef op)
4753{
4754 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4755 dout(10) << __func__ << " " << *m << dendl;
4756
4757 if (is_leader()) {
4758 if (m->op != MTimeCheck::OP_PONG) {
4759 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4760 } else {
4761 handle_timecheck_leader(op);
4762 }
4763 } else if (is_peon()) {
4764 if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4765 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4766 } else {
4767 handle_timecheck_peon(op);
4768 }
4769 } else {
4770 dout(1) << __func__ << " drop unexpected msg" << dendl;
4771 }
4772}
4773
4774void Monitor::handle_subscribe(MonOpRequestRef op)
4775{
4776 MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
4777 dout(10) << "handle_subscribe " << *m << dendl;
4778
4779 bool reply = false;
4780
4781 MonSession *s = op->get_session();
4782 assert(s);
4783
4784 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
4785 p != m->what.end();
4786 ++p) {
4787 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4788 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
4789 reply = true;
4790
4791 // remove conflicting subscribes
4792 if (logmon()->sub_name_to_id(p->first) >= 0) {
4793 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
4794 it != s->sub_map.end(); ) {
4795 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
4796 Mutex::Locker l(session_map_lock);
4797 session_map.remove_sub((it++)->second);
4798 } else {
4799 ++it;
4800 }
4801 }
4802 }
4803
4804 {
4805 Mutex::Locker l(session_map_lock);
4806 session_map.add_update_sub(s, p->first, p->second.start,
4807 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
4808 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
4809 }
4810
4811 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
4812 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
4813 if ((int)s->is_capable("mds", MON_CAP_R)) {
4814 Subscription *sub = s->sub_map[p->first];
4815 assert(sub != nullptr);
4816 mdsmon()->check_sub(sub);
4817 }
4818 } else if (p->first == "osdmap") {
4819 if ((int)s->is_capable("osd", MON_CAP_R)) {
4820 if (s->osd_epoch > p->second.start) {
4821 // client needs earlier osdmaps on purpose, so reset the sent epoch
4822 s->osd_epoch = 0;
4823 }
4824 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
4825 }
4826 } else if (p->first == "osd_pg_creates") {
4827 if ((int)s->is_capable("osd", MON_CAP_W)) {
4828 if (monmap->get_required_features().contains_all(
4829 ceph::features::mon::FEATURE_LUMINOUS)) {
4830 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
4831 } else {
4832 pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
4833 }
4834 }
4835 } else if (p->first == "monmap") {
4836 monmon()->check_sub(s->sub_map[p->first]);
4837 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
4838 logmon()->check_sub(s->sub_map[p->first]);
4839 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
4840 mgrmon()->check_sub(s->sub_map[p->first]);
224ce89b
WB
4841 } else if (p->first == "servicemap") {
4842 mgrstatmon()->check_sub(s->sub_map[p->first]);
7c673cae
FG
4843 }
4844 }
4845
4846 if (reply) {
4847 // we only need to reply if the client is old enough to think it
4848 // has to send renewals.
4849 ConnectionRef con = m->get_connection();
4850 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
4851 m->get_connection()->send_message(new MMonSubscribeAck(
4852 monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
4853 }
4854
4855}
4856
4857void Monitor::handle_get_version(MonOpRequestRef op)
4858{
4859 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
4860 dout(10) << "handle_get_version " << *m << dendl;
4861 PaxosService *svc = NULL;
4862
4863 MonSession *s = op->get_session();
4864 assert(s);
4865
4866 if (!is_leader() && !is_peon()) {
4867 dout(10) << " waiting for quorum" << dendl;
4868 waitfor_quorum.push_back(new C_RetryMessage(this, op));
4869 goto out;
4870 }
4871
4872 if (m->what == "mdsmap") {
4873 svc = mdsmon();
4874 } else if (m->what == "fsmap") {
4875 svc = mdsmon();
4876 } else if (m->what == "osdmap") {
4877 svc = osdmon();
4878 } else if (m->what == "monmap") {
4879 svc = monmon();
4880 } else {
4881 derr << "invalid map type " << m->what << dendl;
4882 }
4883
4884 if (svc) {
4885 if (!svc->is_readable()) {
4886 svc->wait_for_readable(op, new C_RetryMessage(this, op));
4887 goto out;
4888 }
4889
4890 MMonGetVersionReply *reply = new MMonGetVersionReply();
4891 reply->handle = m->handle;
4892 reply->version = svc->get_last_committed();
4893 reply->oldest_version = svc->get_first_committed();
4894 reply->set_tid(m->get_tid());
4895
4896 m->get_connection()->send_message(reply);
4897 }
4898 out:
4899 return;
4900}
4901
4902bool Monitor::ms_handle_reset(Connection *con)
4903{
4904 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
4905
4906 // ignore lossless monitor sessions
4907 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4908 return false;
4909
4910 MonSession *s = static_cast<MonSession *>(con->get_priv());
4911 if (!s)
4912 return false;
4913
4914 // break any con <-> session ref cycle
4915 s->con->set_priv(NULL);
4916
4917 if (is_shutdown())
4918 return false;
4919
4920 Mutex::Locker l(lock);
4921
4922 dout(10) << "reset/close on session " << s->inst << dendl;
4923 if (!s->closed) {
4924 Mutex::Locker l(session_map_lock);
4925 remove_session(s);
4926 }
4927 s->put();
4928 return true;
4929}
4930
4931bool Monitor::ms_handle_refused(Connection *con)
4932{
4933 // just log for now...
4934 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
4935 return false;
4936}
4937
4938// -----
4939
4940void Monitor::send_latest_monmap(Connection *con)
4941{
4942 bufferlist bl;
4943 monmap->encode(bl, con->get_features());
4944 con->send_message(new MMonMap(bl));
4945}
4946
4947void Monitor::handle_mon_get_map(MonOpRequestRef op)
4948{
4949 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
4950 dout(10) << "handle_mon_get_map" << dendl;
4951 send_latest_monmap(m->get_connection().get());
4952}
4953
4954void Monitor::handle_mon_metadata(MonOpRequestRef op)
4955{
4956 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
4957 if (is_leader()) {
4958 dout(10) << __func__ << dendl;
4959 update_mon_metadata(m->get_source().num(), std::move(m->data));
4960 }
4961}
4962
4963void Monitor::update_mon_metadata(int from, Metadata&& m)
4964{
224ce89b
WB
4965 // NOTE: this is now for legacy (kraken or jewel) mons only.
4966 pending_metadata[from] = std::move(m);
7c673cae
FG
4967
4968 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
224ce89b 4969 bufferlist bl;
7c673cae
FG
4970 ::encode(pending_metadata, bl);
4971 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
4972 paxos->trigger_propose();
4973}
4974
224ce89b 4975int Monitor::load_metadata()
7c673cae
FG
4976{
4977 bufferlist bl;
4978 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
4979 if (r)
4980 return r;
4981 bufferlist::iterator it = bl.begin();
224ce89b
WB
4982 ::decode(mon_metadata, it);
4983
4984 pending_metadata = mon_metadata;
7c673cae
FG
4985 return 0;
4986}
4987
4988int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
4989{
4990 assert(f);
224ce89b 4991 if (!mon_metadata.count(mon)) {
7c673cae
FG
4992 err << "mon." << mon << " not found";
4993 return -EINVAL;
4994 }
224ce89b 4995 const Metadata& m = mon_metadata[mon];
7c673cae
FG
4996 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
4997 f->dump_string(p->first.c_str(), p->second);
4998 }
4999 return 0;
5000}
5001
31f18b77
FG
5002void Monitor::count_metadata(const string& field, Formatter *f)
5003{
31f18b77 5004 map<string,int> by_val;
224ce89b 5005 for (auto& p : mon_metadata) {
31f18b77
FG
5006 auto q = p.second.find(field);
5007 if (q == p.second.end()) {
5008 by_val["unknown"]++;
5009 } else {
5010 by_val[q->second]++;
5011 }
5012 }
5013 f->open_object_section(field.c_str());
5014 for (auto& p : by_val) {
5015 f->dump_int(p.first.c_str(), p.second);
5016 }
5017 f->close_section();
5018}
5019
7c673cae
FG
5020int Monitor::print_nodes(Formatter *f, ostream& err)
5021{
7c673cae 5022 map<string, list<int> > mons; // hostname => mon
224ce89b
WB
5023 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5024 it != mon_metadata.end(); ++it) {
7c673cae
FG
5025 const Metadata& m = it->second;
5026 Metadata::const_iterator hostname = m.find("hostname");
5027 if (hostname == m.end()) {
5028 // not likely though
5029 continue;
5030 }
5031 mons[hostname->second].push_back(it->first);
5032 }
5033
5034 dump_services(f, mons, "mon");
5035 return 0;
5036}
5037
5038// ----------------------------------------------
5039// scrub
5040
5041int Monitor::scrub_start()
5042{
5043 dout(10) << __func__ << dendl;
5044 assert(is_leader());
5045
5046 if (!scrub_result.empty()) {
5047 clog->info() << "scrub already in progress";
5048 return -EBUSY;
5049 }
5050
5051 scrub_event_cancel();
5052 scrub_result.clear();
5053 scrub_state.reset(new ScrubState);
5054
5055 scrub();
5056 return 0;
5057}
5058
5059int Monitor::scrub()
5060{
5061 assert(is_leader());
5062 assert(scrub_state);
5063
5064 scrub_cancel_timeout();
5065 wait_for_paxos_write();
5066 scrub_version = paxos->get_version();
5067
5068
5069 // scrub all keys if we're the only monitor in the quorum
5070 int32_t num_keys =
5071 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5072
5073 for (set<int>::iterator p = quorum.begin();
5074 p != quorum.end();
5075 ++p) {
5076 if (*p == rank)
5077 continue;
5078 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5079 num_keys);
5080 r->key = scrub_state->last_key;
5081 messenger->send_message(r, monmap->get_inst(*p));
5082 }
5083
5084 // scrub my keys
5085 bool r = _scrub(&scrub_result[rank],
5086 &scrub_state->last_key,
5087 &num_keys);
5088
5089 scrub_state->finished = !r;
5090
5091 // only after we got our scrub results do we really care whether the
5092 // other monitors are late on their results. Also, this way we avoid
5093 // triggering the timeout if we end up getting stuck in _scrub() for
5094 // longer than the duration of the timeout.
5095 scrub_reset_timeout();
5096
5097 if (quorum.size() == 1) {
5098 assert(scrub_state->finished == true);
5099 scrub_finish();
5100 }
5101 return 0;
5102}
5103
5104void Monitor::handle_scrub(MonOpRequestRef op)
5105{
5106 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5107 dout(10) << __func__ << " " << *m << dendl;
5108 switch (m->op) {
5109 case MMonScrub::OP_SCRUB:
5110 {
5111 if (!is_peon())
5112 break;
5113
5114 wait_for_paxos_write();
5115
5116 if (m->version != paxos->get_version())
5117 break;
5118
5119 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5120 m->version,
5121 m->num_keys);
5122
5123 reply->key = m->key;
5124 _scrub(&reply->result, &reply->key, &reply->num_keys);
5125 m->get_connection()->send_message(reply);
5126 }
5127 break;
5128
5129 case MMonScrub::OP_RESULT:
5130 {
5131 if (!is_leader())
5132 break;
5133 if (m->version != scrub_version)
5134 break;
5135 // reset the timeout each time we get a result
5136 scrub_reset_timeout();
5137
5138 int from = m->get_source().num();
5139 assert(scrub_result.count(from) == 0);
5140 scrub_result[from] = m->result;
5141
5142 if (scrub_result.size() == quorum.size()) {
5143 scrub_check_results();
5144 scrub_result.clear();
5145 if (scrub_state->finished)
5146 scrub_finish();
5147 else
5148 scrub();
5149 }
5150 }
5151 break;
5152 }
5153}
5154
5155bool Monitor::_scrub(ScrubResult *r,
5156 pair<string,string> *start,
5157 int *num_keys)
5158{
5159 assert(r != NULL);
5160 assert(start != NULL);
5161 assert(num_keys != NULL);
5162
5163 set<string> prefixes = get_sync_targets_names();
5164 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5165
5166 dout(10) << __func__ << " start (" << *start << ")"
5167 << " num_keys " << *num_keys << dendl;
5168
5169 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5170
5171 int scrubbed_keys = 0;
5172 pair<string,string> last_key;
5173
5174 while (it->has_next_chunk()) {
5175
5176 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5177 break;
5178
5179 pair<string,string> k = it->get_next_key();
5180 if (prefixes.count(k.first) == 0)
5181 continue;
5182
5183 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5184 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5185 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5186 << dendl;
5187 continue;
5188 }
5189
5190 bufferlist bl;
224ce89b
WB
5191 int err = store->get(k.first, k.second, bl);
5192 assert(err == 0);
5193
7c673cae
FG
5194 uint32_t key_crc = bl.crc32c(0);
5195 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5196 << " crc " << key_crc << dendl;
5197 r->prefix_keys[k.first]++;
224ce89b 5198 if (r->prefix_crc.count(k.first) == 0) {
7c673cae 5199 r->prefix_crc[k.first] = 0;
224ce89b 5200 }
7c673cae
FG
5201 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5202
5203 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5204 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5205 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5206 r->prefix_crc[k.first] += 1;
5207 }
5208
5209 ++scrubbed_keys;
5210 last_key = k;
5211 }
5212
5213 dout(20) << __func__ << " last_key (" << last_key << ")"
5214 << " scrubbed_keys " << scrubbed_keys
5215 << " has_next " << it->has_next_chunk() << dendl;
5216
5217 *start = last_key;
5218 *num_keys = scrubbed_keys;
5219
5220 return it->has_next_chunk();
5221}
5222
5223void Monitor::scrub_check_results()
5224{
5225 dout(10) << __func__ << dendl;
5226
5227 // compare
5228 int errors = 0;
5229 ScrubResult& mine = scrub_result[rank];
5230 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5231 p != scrub_result.end();
5232 ++p) {
5233 if (p->first == rank)
5234 continue;
5235 if (p->second != mine) {
5236 ++errors;
5237 clog->error() << "scrub mismatch";
5238 clog->error() << " mon." << rank << " " << mine;
5239 clog->error() << " mon." << p->first << " " << p->second;
5240 }
5241 }
5242 if (!errors)
5243 clog->info() << "scrub ok on " << quorum << ": " << mine;
5244}
5245
5246inline void Monitor::scrub_timeout()
5247{
5248 dout(1) << __func__ << " restarting scrub" << dendl;
5249 scrub_reset();
5250 scrub_start();
5251}
5252
5253void Monitor::scrub_finish()
5254{
5255 dout(10) << __func__ << dendl;
5256 scrub_reset();
5257 scrub_event_start();
5258}
5259
5260void Monitor::scrub_reset()
5261{
5262 dout(10) << __func__ << dendl;
5263 scrub_cancel_timeout();
5264 scrub_version = 0;
5265 scrub_result.clear();
5266 scrub_state.reset();
5267}
5268
5269inline void Monitor::scrub_update_interval(int secs)
5270{
5271 // we don't care about changes if we are not the leader.
5272 // changes will be visible if we become the leader.
5273 if (!is_leader())
5274 return;
5275
5276 dout(1) << __func__ << " new interval = " << secs << dendl;
5277
5278 // if scrub already in progress, all changes will already be visible during
5279 // the next round. Nothing to do.
5280 if (scrub_state != NULL)
5281 return;
5282
5283 scrub_event_cancel();
5284 scrub_event_start();
5285}
5286
5287void Monitor::scrub_event_start()
5288{
5289 dout(10) << __func__ << dendl;
5290
5291 if (scrub_event)
5292 scrub_event_cancel();
5293
5294 if (cct->_conf->mon_scrub_interval <= 0) {
5295 dout(1) << __func__ << " scrub event is disabled"
5296 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5297 << ")" << dendl;
5298 return;
5299 }
5300
5301 scrub_event = new C_MonContext(this, [this](int) {
5302 scrub_start();
5303 });
5304 timer.add_event_after(cct->_conf->mon_scrub_interval, scrub_event);
5305}
5306
5307void Monitor::scrub_event_cancel()
5308{
5309 dout(10) << __func__ << dendl;
5310 if (scrub_event) {
5311 timer.cancel_event(scrub_event);
5312 scrub_event = NULL;
5313 }
5314}
5315
5316inline void Monitor::scrub_cancel_timeout()
5317{
5318 if (scrub_timeout_event) {
5319 timer.cancel_event(scrub_timeout_event);
5320 scrub_timeout_event = NULL;
5321 }
5322}
5323
5324void Monitor::scrub_reset_timeout()
5325{
5326 dout(15) << __func__ << " reset timeout event" << dendl;
5327 scrub_cancel_timeout();
5328
5329 scrub_timeout_event = new C_MonContext(this, [this](int) {
5330 scrub_timeout();
5331 });
5332 timer.add_event_after(g_conf->mon_scrub_timeout, scrub_timeout_event);
5333}
5334
5335/************ TICK ***************/
5336void Monitor::new_tick()
5337{
5338 timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5339 tick();
5340 }));
5341}
5342
5343void Monitor::tick()
5344{
5345 // ok go.
5346 dout(11) << "tick" << dendl;
5347
5348 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5349 (*p)->tick();
5350 (*p)->maybe_trim();
5351 }
5352
5353 // trim sessions
5354 utime_t now = ceph_clock_now();
5355 {
5356 Mutex::Locker l(session_map_lock);
5357 auto p = session_map.sessions.begin();
5358
5359 bool out_for_too_long = (!exited_quorum.is_zero() &&
5360 now > (exited_quorum + 2*g_conf->mon_lease));
5361
5362 while (!p.end()) {
5363 MonSession *s = *p;
5364 ++p;
5365
5366 // don't trim monitors
5367 if (s->inst.name.is_mon())
5368 continue;
5369
5370 if (s->session_timeout < now && s->con) {
5371 // check keepalive, too
5372 s->session_timeout = s->con->get_last_keepalive();
5373 s->session_timeout += g_conf->mon_session_timeout;
5374 }
5375 if (s->session_timeout < now) {
5376 dout(10) << " trimming session " << s->con << " " << s->inst
5377 << " (timeout " << s->session_timeout
5378 << " < now " << now << ")" << dendl;
5379 } else if (out_for_too_long) {
5380 // boot the client Session because we've taken too long getting back in
5381 dout(10) << " trimming session " << s->con << " " << s->inst
5382 << " because we've been out of quorum too long" << dendl;
5383 } else {
5384 continue;
5385 }
5386
5387 s->con->mark_down();
5388 remove_session(s);
5389 logger->inc(l_mon_session_trim);
5390 }
5391 }
5392 sync_trim_providers();
5393
5394 if (!maybe_wait_for_quorum.empty()) {
5395 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5396 }
5397
5398 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5399 // this is only necessary on upgraded clusters.
5400 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5401 prepare_new_fingerprint(t);
5402 paxos->trigger_propose();
5403 }
5404
5405 new_tick();
5406}
5407
5408void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5409{
5410 uuid_d nf;
5411 nf.generate_random();
5412 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5413
5414 bufferlist bl;
5415 ::encode(nf, bl);
5416 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5417}
5418
5419int Monitor::check_fsid()
5420{
5421 bufferlist ebl;
5422 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5423 if (r == -ENOENT)
5424 return r;
5425 assert(r == 0);
5426
5427 string es(ebl.c_str(), ebl.length());
5428
5429 // only keep the first line
5430 size_t pos = es.find_first_of('\n');
5431 if (pos != string::npos)
5432 es.resize(pos);
5433
5434 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5435 uuid_d ondisk;
5436 if (!ondisk.parse(es.c_str())) {
5437 derr << "error: unable to parse uuid" << dendl;
5438 return -EINVAL;
5439 }
5440
5441 if (monmap->get_fsid() != ondisk) {
5442 derr << "error: cluster_uuid file exists with value " << ondisk
5443 << ", != our uuid " << monmap->get_fsid() << dendl;
5444 return -EEXIST;
5445 }
5446
5447 return 0;
5448}
5449
5450int Monitor::write_fsid()
5451{
5452 auto t(std::make_shared<MonitorDBStore::Transaction>());
5453 write_fsid(t);
5454 int r = store->apply_transaction(t);
5455 return r;
5456}
5457
5458int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5459{
5460 ostringstream ss;
5461 ss << monmap->get_fsid() << "\n";
5462 string us = ss.str();
5463
5464 bufferlist b;
5465 b.append(us);
5466
5467 t->put(MONITOR_NAME, "cluster_uuid", b);
5468 return 0;
5469}
5470
5471/*
5472 * this is the closest thing to a traditional 'mkfs' for ceph.
5473 * initialize the monitor state machines to their initial values.
5474 */
5475int Monitor::mkfs(bufferlist& osdmapbl)
5476{
5477 auto t(std::make_shared<MonitorDBStore::Transaction>());
5478
5479 // verify cluster fsid
5480 int r = check_fsid();
5481 if (r < 0 && r != -ENOENT)
5482 return r;
5483
5484 bufferlist magicbl;
5485 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5486 magicbl.append("\n");
5487 t->put(MONITOR_NAME, "magic", magicbl);
5488
5489
5490 features = get_initial_supported_features();
5491 write_features(t);
5492
5493 // save monmap, osdmap, keyring.
5494 bufferlist monmapbl;
5495 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5496 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5497 t->put("mkfs", "monmap", monmapbl);
5498
5499 if (osdmapbl.length()) {
5500 // make sure it's a valid osdmap
5501 try {
5502 OSDMap om;
5503 om.decode(osdmapbl);
5504 }
5505 catch (buffer::error& e) {
5506 derr << "error decoding provided osdmap: " << e.what() << dendl;
5507 return -EINVAL;
5508 }
5509 t->put("mkfs", "osdmap", osdmapbl);
5510 }
5511
5512 if (is_keyring_required()) {
5513 KeyRing keyring;
5514 string keyring_filename;
5515
5516 r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5517 if (r) {
5518 derr << "unable to find a keyring file on " << g_conf->keyring
5519 << ": " << cpp_strerror(r) << dendl;
5520 if (g_conf->key != "") {
5521 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5522 "\n\tcaps mon = \"allow *\"\n";
5523 bufferlist bl;
5524 bl.append(keyring_plaintext);
5525 try {
5526 bufferlist::iterator i = bl.begin();
5527 keyring.decode_plaintext(i);
5528 }
5529 catch (const buffer::error& e) {
5530 derr << "error decoding keyring " << keyring_plaintext
5531 << ": " << e.what() << dendl;
5532 return -EINVAL;
5533 }
5534 } else {
5535 return -ENOENT;
5536 }
5537 } else {
5538 r = keyring.load(g_ceph_context, keyring_filename);
5539 if (r < 0) {
5540 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5541 return r;
5542 }
5543 }
5544
5545 // put mon. key in external keyring; seed with everything else.
5546 extract_save_mon_key(keyring);
5547
5548 bufferlist keyringbl;
5549 keyring.encode_plaintext(keyringbl);
5550 t->put("mkfs", "keyring", keyringbl);
5551 }
5552 write_fsid(t);
5553 store->apply_transaction(t);
5554
5555 return 0;
5556}
5557
5558int Monitor::write_default_keyring(bufferlist& bl)
5559{
5560 ostringstream os;
5561 os << g_conf->mon_data << "/keyring";
5562
5563 int err = 0;
5564 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600);
5565 if (fd < 0) {
5566 err = -errno;
5567 dout(0) << __func__ << " failed to open " << os.str()
5568 << ": " << cpp_strerror(err) << dendl;
5569 return err;
5570 }
5571
5572 err = bl.write_fd(fd);
5573 if (!err)
5574 ::fsync(fd);
5575 VOID_TEMP_FAILURE_RETRY(::close(fd));
5576
5577 return err;
5578}
5579
5580void Monitor::extract_save_mon_key(KeyRing& keyring)
5581{
5582 EntityName mon_name;
5583 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5584 EntityAuth mon_key;
5585 if (keyring.get_auth(mon_name, mon_key)) {
5586 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5587 KeyRing pkey;
5588 pkey.add(mon_name, mon_key);
5589 bufferlist bl;
5590 pkey.encode_plaintext(bl);
5591 write_default_keyring(bl);
5592 keyring.remove(mon_name);
5593 }
5594}
5595
5596bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5597 bool force_new)
5598{
5599 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5600 << dendl;
5601
5602 if (is_shutdown())
5603 return false;
5604
5605 // we only connect to other monitors and mgr; every else connects to us.
5606 if (service_id != CEPH_ENTITY_TYPE_MON &&
5607 service_id != CEPH_ENTITY_TYPE_MGR)
5608 return false;
5609
5610 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5611 // auth_none
5612 dout(20) << __func__ << " building auth_none authorizer" << dendl;
5613 AuthNoneClientHandler handler(g_ceph_context, nullptr);
5614 handler.set_global_id(0);
5615 *authorizer = handler.build_authorizer(service_id);
5616 return true;
5617 }
5618
5619 CephXServiceTicketInfo auth_ticket_info;
5620 CephXSessionAuthInfo info;
5621 int ret;
5622
5623 EntityName name;
5624 name.set_type(CEPH_ENTITY_TYPE_MON);
5625 auth_ticket_info.ticket.name = name;
5626 auth_ticket_info.ticket.global_id = 0;
5627
5628 if (service_id == CEPH_ENTITY_TYPE_MON) {
5629 // mon to mon authentication uses the private monitor shared key and not the
5630 // rotating key
5631 CryptoKey secret;
5632 if (!keyring.get_secret(name, secret) &&
5633 !key_server.get_secret(name, secret)) {
5634 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5635 << dendl;
5636 stringstream ss, ds;
5637 int err = key_server.list_secrets(ds);
5638 if (err < 0)
5639 ss << "no installed auth entries!";
5640 else
5641 ss << "installed auth entries:";
5642 dout(0) << ss.str() << "\n" << ds.str() << dendl;
5643 return false;
5644 }
5645
5646 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5647 secret, (uint64_t)-1);
5648 if (ret < 0) {
5649 dout(0) << __func__ << " failed to build mon session_auth_info "
5650 << cpp_strerror(ret) << dendl;
5651 return false;
5652 }
5653 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5654 // mgr
5655 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5656 if (ret < 0) {
5657 derr << __func__ << " failed to build mgr service session_auth_info "
5658 << cpp_strerror(ret) << dendl;
5659 return false;
5660 }
5661 } else {
5662 ceph_abort(); // see check at top of fn
5663 }
5664
5665 CephXTicketBlob blob;
5666 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5667 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5668 return false;
5669 }
5670 bufferlist ticket_data;
5671 ::encode(blob, ticket_data);
5672
5673 bufferlist::iterator iter = ticket_data.begin();
5674 CephXTicketHandler handler(g_ceph_context, service_id);
5675 ::decode(handler.ticket, iter);
5676
5677 handler.session_key = info.session_key;
5678
5679 *authorizer = handler.build_authorizer(0);
5680
5681 return true;
5682}
5683
5684bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5685 int protocol, bufferlist& authorizer_data,
5686 bufferlist& authorizer_reply,
5687 bool& isvalid, CryptoKey& session_key)
5688{
5689 dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5690 << " " << ceph_entity_type_name(peer_type)
5691 << " protocol " << protocol << dendl;
5692
5693 if (is_shutdown())
5694 return false;
5695
5696 if (peer_type == CEPH_ENTITY_TYPE_MON &&
5697 auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5698 // monitor, and cephx is enabled
5699 isvalid = false;
5700 if (protocol == CEPH_AUTH_CEPHX) {
5701 bufferlist::iterator iter = authorizer_data.begin();
5702 CephXServiceTicketInfo auth_ticket_info;
5703
5704 if (authorizer_data.length()) {
5705 bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
5706 auth_ticket_info, authorizer_reply);
5707 if (ret) {
5708 session_key = auth_ticket_info.session_key;
5709 isvalid = true;
5710 } else {
5711 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5712 }
5713 }
5714 } else {
5715 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
5716 }
5717 } else {
5718 // who cares.
5719 isvalid = true;
5720 }
5721 return true;
5722}