]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Monitor.cc
update sources to v12.2.0
[ceph.git] / ceph / src / mon / Monitor.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include <sstream>
17#include <stdlib.h>
18#include <signal.h>
19#include <limits.h>
20#include <cstring>
21#include <boost/scope_exit.hpp>
22#include <boost/algorithm/string/predicate.hpp>
23
24#include "Monitor.h"
25#include "common/version.h"
26
27#include "osd/OSDMap.h"
28
29#include "MonitorDBStore.h"
30
31#include "messages/PaxosServiceMessage.h"
32#include "messages/MMonMap.h"
33#include "messages/MMonGetMap.h"
34#include "messages/MMonGetVersion.h"
35#include "messages/MMonGetVersionReply.h"
36#include "messages/MGenericMessage.h"
37#include "messages/MMonCommand.h"
38#include "messages/MMonCommandAck.h"
31f18b77 39#include "messages/MMonHealth.h"
7c673cae
FG
40#include "messages/MMonMetadata.h"
41#include "messages/MMonSync.h"
42#include "messages/MMonScrub.h"
43#include "messages/MMonProbe.h"
44#include "messages/MMonJoin.h"
45#include "messages/MMonPaxos.h"
46#include "messages/MRoute.h"
47#include "messages/MForward.h"
48
49#include "messages/MMonSubscribe.h"
50#include "messages/MMonSubscribeAck.h"
51
52#include "messages/MAuthReply.h"
53
54#include "messages/MTimeCheck.h"
7c673cae
FG
55#include "messages/MPing.h"
56
57#include "common/strtol.h"
58#include "common/ceph_argparse.h"
59#include "common/Timer.h"
60#include "common/Clock.h"
61#include "common/errno.h"
62#include "common/perf_counters.h"
63#include "common/admin_socket.h"
64#include "global/signal_handler.h"
65#include "common/Formatter.h"
66#include "include/stringify.h"
67#include "include/color.h"
68#include "include/ceph_fs.h"
69#include "include/str_list.h"
70
71#include "OSDMonitor.h"
72#include "MDSMonitor.h"
73#include "MonmapMonitor.h"
74#include "PGMonitor.h"
75#include "LogMonitor.h"
76#include "AuthMonitor.h"
77#include "MgrMonitor.h"
31f18b77 78#include "MgrStatMonitor.h"
7c673cae 79#include "mon/QuorumService.h"
224ce89b 80#include "mon/OldHealthMonitor.h"
7c673cae
FG
81#include "mon/HealthMonitor.h"
82#include "mon/ConfigKeyService.h"
83#include "common/config.h"
84#include "common/cmdparse.h"
85#include "include/assert.h"
86#include "include/compat.h"
87#include "perfglue/heap_profiler.h"
88
89#include "auth/none/AuthNoneClientHandler.h"
90
91#define dout_subsys ceph_subsys_mon
92#undef dout_prefix
93#define dout_prefix _prefix(_dout, this)
94static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
95 return *_dout << "mon." << mon->name << "@" << mon->rank
96 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
97}
98
99const string Monitor::MONITOR_NAME = "monitor";
100const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
101
102
103#undef FLAG
104#undef COMMAND
105#undef COMMAND_WITH_FLAG
7c673cae
FG
106#define FLAG(f) (MonCommand::FLAG_##f)
107#define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
108 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
109#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
110 {parsesig, helptext, modulename, req_perms, avail, flags},
d2e6a577 111MonCommand mon_commands[] = {
7c673cae 112#include <mon/MonCommands.h>
d2e6a577
FG
113};
114MonCommand pgmonitor_commands[] = {
115#include <mon/PGMonitorCommands.h>
116};
7c673cae
FG
117#undef COMMAND
118#undef COMMAND_WITH_FLAG
7c673cae
FG
119
120
7c673cae
FG
121void C_MonContext::finish(int r) {
122 if (mon->is_shutdown())
123 return;
124 FunctionContext::finish(r);
125}
126
127Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
128 Messenger *m, Messenger *mgr_m, MonMap *map) :
129 Dispatcher(cct_),
130 name(nm),
131 rank(-1),
132 messenger(m),
133 con_self(m ? m->get_loopback_connection() : NULL),
134 lock("Monitor::lock"),
135 timer(cct_, lock),
136 finisher(cct_, "mon_finisher", "fin"),
137 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
138 has_ever_joined(false),
139 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
140 monmap(map),
141 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
142 key_server(cct, &keyring),
143 auth_cluster_required(cct,
144 cct->_conf->auth_supported.empty() ?
145 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
146 auth_service_required(cct,
147 cct->_conf->auth_supported.empty() ?
148 cct->_conf->auth_service_required : cct->_conf->auth_supported ),
7c673cae
FG
149 mgr_messenger(mgr_m),
150 mgr_client(cct_, mgr_m),
31f18b77 151 pgservice(nullptr),
7c673cae
FG
152 store(s),
153
154 state(STATE_PROBING),
155
156 elector(this),
157 required_features(0),
158 leader(0),
159 quorum_con_features(0),
160 // scrub
161 scrub_version(0),
162 scrub_event(NULL),
163 scrub_timeout_event(NULL),
164
165 // sync state
166 sync_provider_count(0),
167 sync_cookie(0),
168 sync_full(false),
169 sync_start_version(0),
170 sync_timeout_event(NULL),
171 sync_last_committed_floor(0),
172
173 timecheck_round(0),
174 timecheck_acks(0),
175 timecheck_rounds_since_clean(0),
176 timecheck_event(NULL),
177
178 paxos_service(PAXOS_NUM),
179 admin_hook(NULL),
180 routed_request_tid(0),
181 op_tracker(cct, true, 1)
182{
183 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
184 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
185
186 update_log_clients();
187
188 paxos = new Paxos(this, "paxos");
189
190 paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
191 paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
192 paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
193 paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
194 paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
195 paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
196 paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
31f18b77 197 paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
224ce89b 198 paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
7c673cae 199
224ce89b 200 health_monitor = new OldHealthMonitor(this);
7c673cae
FG
201 config_key_service = new ConfigKeyService(this, paxos);
202
203 mon_caps = new MonCap();
204 bool r = mon_caps->parse("allow *", NULL);
205 assert(r);
206
207 exited_quorum = ceph_clock_now();
208
d2e6a577
FG
209 // prepare local commands
210 local_mon_commands.resize(ARRAY_SIZE(mon_commands));
211 for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
212 local_mon_commands[i] = mon_commands[i];
213 }
214 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
215
216 local_upgrading_mon_commands = local_mon_commands;
217 for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
218 local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
219 }
220 MonCommand::encode_vector(local_upgrading_mon_commands,
221 local_upgrading_mon_commands_bl);
222
7c673cae
FG
223 // assume our commands until we have an election. this only means
224 // we won't reply with EINVAL before the election; any command that
225 // actually matters will wait until we have quorum etc and then
226 // retry (and revalidate).
d2e6a577 227 leader_mon_commands = local_mon_commands;
31f18b77
FG
228
229 // note: OSDMonitor may update this based on the luminous flag.
230 pgservice = mgrstatmon()->get_pg_stat_service();
7c673cae
FG
231}
232
7c673cae
FG
233Monitor::~Monitor()
234{
235 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
236 delete *p;
237 delete health_monitor;
238 delete config_key_service;
239 delete paxos;
240 assert(session_map.sessions.empty());
241 delete mon_caps;
7c673cae
FG
242}
243
244
245class AdminHook : public AdminSocketHook {
246 Monitor *mon;
247public:
248 explicit AdminHook(Monitor *m) : mon(m) {}
249 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
250 bufferlist& out) override {
251 stringstream ss;
252 mon->do_admin_command(command, cmdmap, format, ss);
253 out.append(ss);
254 return true;
255 }
256};
257
258void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
259 ostream& ss)
260{
261 Mutex::Locker l(lock);
262
263 boost::scoped_ptr<Formatter> f(Formatter::create(format));
264
265 string args;
266 for (cmdmap_t::iterator p = cmdmap.begin();
267 p != cmdmap.end(); ++p) {
268 if (p->first == "prefix")
269 continue;
270 if (!args.empty())
271 args += ", ";
272 args += cmd_vartype_stringify(p->second);
273 }
274 args = "[" + args + "]";
275
276 bool read_only = (command == "mon_status" ||
277 command == "mon metadata" ||
278 command == "quorum_status" ||
224ce89b
WB
279 command == "ops" ||
280 command == "sessions");
7c673cae
FG
281
282 (read_only ? audit_clog->debug() : audit_clog->info())
283 << "from='admin socket' entity='admin socket' "
284 << "cmd='" << command << "' args=" << args << ": dispatch";
285
286 if (command == "mon_status") {
287 get_mon_status(f.get(), ss);
288 if (f)
289 f->flush(ss);
290 } else if (command == "quorum_status") {
291 _quorum_status(f.get(), ss);
292 } else if (command == "sync_force") {
293 string validate;
294 if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
295 (validate != "--yes-i-really-mean-it")) {
296 ss << "are you SURE? this will mean the monitor store will be erased "
297 "the next time the monitor is restarted. pass "
298 "'--yes-i-really-mean-it' if you really do.";
299 goto abort;
300 }
301 sync_force(f.get(), ss);
302 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
303 if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
304 goto abort;
305 } else if (command == "quorum enter") {
306 elector.start_participating();
307 start_election();
308 ss << "started responding to quorum, initiated new election";
309 } else if (command == "quorum exit") {
310 start_election();
311 elector.stop_participating();
312 ss << "stopped responding to quorum, initiated new election";
313 } else if (command == "ops") {
314 (void)op_tracker.dump_ops_in_flight(f.get());
315 if (f) {
316 f->flush(ss);
317 }
224ce89b
WB
318 } else if (command == "sessions") {
319
320 if (f) {
321 f->open_array_section("sessions");
322 for (auto p : session_map.sessions) {
323 f->dump_stream("session") << *p;
324 }
325 f->close_section();
326 f->flush(ss);
327 }
328
7c673cae
FG
329 } else {
330 assert(0 == "bad AdminSocket command binding");
331 }
332 (read_only ? audit_clog->debug() : audit_clog->info())
333 << "from='admin socket' "
334 << "entity='admin socket' "
335 << "cmd=" << command << " "
336 << "args=" << args << ": finished";
337 return;
338
339abort:
340 (read_only ? audit_clog->debug() : audit_clog->info())
341 << "from='admin socket' "
342 << "entity='admin socket' "
343 << "cmd=" << command << " "
344 << "args=" << args << ": aborted";
345}
346
347void Monitor::handle_signal(int signum)
348{
349 assert(signum == SIGINT || signum == SIGTERM);
350 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
351 shutdown();
352}
353
354CompatSet Monitor::get_initial_supported_features()
355{
356 CompatSet::FeatureSet ceph_mon_feature_compat;
357 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
358 CompatSet::FeatureSet ceph_mon_feature_incompat;
359 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
360 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
361 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
362 ceph_mon_feature_incompat);
363}
364
365CompatSet Monitor::get_supported_features()
366{
367 CompatSet compat = get_initial_supported_features();
368 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
369 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
370 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
371 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
372 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
373 return compat;
374}
375
376CompatSet Monitor::get_legacy_features()
377{
378 CompatSet::FeatureSet ceph_mon_feature_compat;
379 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
380 CompatSet::FeatureSet ceph_mon_feature_incompat;
381 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
382 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
383 ceph_mon_feature_incompat);
384}
385
386int Monitor::check_features(MonitorDBStore *store)
387{
388 CompatSet required = get_supported_features();
389 CompatSet ondisk;
390
391 read_features_off_disk(store, &ondisk);
392
393 if (!required.writeable(ondisk)) {
394 CompatSet diff = required.unsupported(ondisk);
395 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
396 return -EPERM;
397 }
398
399 return 0;
400}
401
402void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
403{
404 bufferlist featuresbl;
405 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
406 if (featuresbl.length() == 0) {
407 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
408 << "Assuming it is old-style and introducing one." << dendl;
409 //we only want the baseline ~v.18 features assumed to be on disk.
410 //If new features are introduced this code needs to disappear or
411 //be made smarter.
412 *features = get_legacy_features();
413
414 features->encode(featuresbl);
415 auto t(std::make_shared<MonitorDBStore::Transaction>());
416 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
417 store->apply_transaction(t);
418 } else {
419 bufferlist::iterator it = featuresbl.begin();
420 features->decode(it);
421 }
422}
423
424void Monitor::read_features()
425{
426 read_features_off_disk(store, &features);
427 dout(10) << "features " << features << dendl;
428
429 calc_quorum_requirements();
430 dout(10) << "required_features " << required_features << dendl;
431}
432
433void Monitor::write_features(MonitorDBStore::TransactionRef t)
434{
435 bufferlist bl;
436 features.encode(bl);
437 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
438}
439
440const char** Monitor::get_tracked_conf_keys() const
441{
442 static const char* KEYS[] = {
443 "crushtool", // helpful for testing
444 "mon_election_timeout",
445 "mon_lease",
446 "mon_lease_renew_interval_factor",
447 "mon_lease_ack_timeout_factor",
448 "mon_accept_timeout_factor",
449 // clog & admin clog
450 "clog_to_monitors",
451 "clog_to_syslog",
452 "clog_to_syslog_facility",
453 "clog_to_syslog_level",
454 "clog_to_graylog",
455 "clog_to_graylog_host",
456 "clog_to_graylog_port",
457 "host",
458 "fsid",
459 // periodic health to clog
460 "mon_health_to_clog",
461 "mon_health_to_clog_interval",
462 "mon_health_to_clog_tick_interval",
463 // scrub interval
464 "mon_scrub_interval",
465 NULL
466 };
467 return KEYS;
468}
469
470void Monitor::handle_conf_change(const struct md_config_t *conf,
471 const std::set<std::string> &changed)
472{
473 sanitize_options();
474
475 dout(10) << __func__ << " " << changed << dendl;
476
477 if (changed.count("clog_to_monitors") ||
478 changed.count("clog_to_syslog") ||
479 changed.count("clog_to_syslog_level") ||
480 changed.count("clog_to_syslog_facility") ||
481 changed.count("clog_to_graylog") ||
482 changed.count("clog_to_graylog_host") ||
483 changed.count("clog_to_graylog_port") ||
484 changed.count("host") ||
485 changed.count("fsid")) {
486 update_log_clients();
487 }
488
489 if (changed.count("mon_health_to_clog") ||
490 changed.count("mon_health_to_clog_interval") ||
491 changed.count("mon_health_to_clog_tick_interval")) {
492 health_to_clog_update_conf(changed);
493 }
494
495 if (changed.count("mon_scrub_interval")) {
496 scrub_update_interval(conf->mon_scrub_interval);
497 }
498}
499
500void Monitor::update_log_clients()
501{
502 map<string,string> log_to_monitors;
503 map<string,string> log_to_syslog;
504 map<string,string> log_channel;
505 map<string,string> log_prio;
506 map<string,string> log_to_graylog;
507 map<string,string> log_to_graylog_host;
508 map<string,string> log_to_graylog_port;
509 uuid_d fsid;
510 string host;
511
512 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
513 log_channel, log_prio, log_to_graylog,
514 log_to_graylog_host, log_to_graylog_port,
515 fsid, host))
516 return;
517
518 clog->update_config(log_to_monitors, log_to_syslog,
519 log_channel, log_prio, log_to_graylog,
520 log_to_graylog_host, log_to_graylog_port,
521 fsid, host);
522
523 audit_clog->update_config(log_to_monitors, log_to_syslog,
524 log_channel, log_prio, log_to_graylog,
525 log_to_graylog_host, log_to_graylog_port,
526 fsid, host);
527}
528
529int Monitor::sanitize_options()
530{
531 int r = 0;
532
533 // mon_lease must be greater than mon_lease_renewal; otherwise we
534 // may incur in leases expiring before they are renewed.
535 if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
536 clog->error() << "mon_lease_renew_interval_factor ("
537 << g_conf->mon_lease_renew_interval_factor
538 << ") must be less than 1.0";
539 r = -EINVAL;
540 }
541
542 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
543 // got time to renew the lease and get an ack for it. Having both options
544 // with the same value, for a given small vale, could mean timing out if
545 // the monitors happened to be overloaded -- or even under normal load for
546 // a small enough value.
547 if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
548 clog->error() << "mon_lease_ack_timeout_factor ("
549 << g_conf->mon_lease_ack_timeout_factor
550 << ") must be greater than 1.0";
551 r = -EINVAL;
552 }
553
554 return r;
555}
556
557int Monitor::preinit()
558{
559 lock.Lock();
560
561 dout(1) << "preinit fsid " << monmap->fsid << dendl;
562
563 int r = sanitize_options();
564 if (r < 0) {
565 derr << "option sanitization failed!" << dendl;
566 lock.Unlock();
567 return r;
568 }
569
570 assert(!logger);
571 {
572 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
573 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess");
574 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", "sadd");
575 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", "srm");
576 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions");
577 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in");
578 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started");
579 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won");
580 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost");
581 logger = pcb.create_perf_counters();
582 cct->get_perfcounters_collection()->add(logger);
583 }
584
585 assert(!cluster_logger);
586 {
587 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
588 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
589 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
590 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
591 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
592 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
593 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
594 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster");
595 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space");
596 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space");
597 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
598 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
599 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
600 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
601 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
602 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
603 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
604 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
605 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
606 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
607 pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
608 pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
609 pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
610 pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
611 cluster_logger = pcb.create_perf_counters();
612 }
613
614 paxos->init_logger();
615
616 // verify cluster_uuid
617 {
618 int r = check_fsid();
619 if (r == -ENOENT)
620 r = write_fsid();
621 if (r < 0) {
622 lock.Unlock();
623 return r;
624 }
625 }
626
627 // open compatset
628 read_features();
629
630 // have we ever joined a quorum?
631 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
632 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
633
634 if (!has_ever_joined) {
635 // impose initial quorum restrictions?
636 list<string> initial_members;
637 get_str_list(g_conf->mon_initial_members, initial_members);
638
639 if (!initial_members.empty()) {
640 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
641
642 monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
643 &extra_probe_peers);
644
645 dout(10) << " monmap is " << *monmap << dendl;
646 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
647 }
648 } else if (!monmap->contains(name)) {
649 derr << "not in monmap and have been in a quorum before; "
650 << "must have been removed" << dendl;
651 if (g_conf->mon_force_quorum_join) {
652 dout(0) << "we should have died but "
653 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
654 } else {
655 derr << "commit suicide!" << dendl;
656 lock.Unlock();
657 return -ENOENT;
658 }
659 }
660
661 {
662 // We have a potentially inconsistent store state in hands. Get rid of it
663 // and start fresh.
664 bool clear_store = false;
665 if (store->exists("mon_sync", "in_sync")) {
666 dout(1) << __func__ << " clean up potentially inconsistent store state"
667 << dendl;
668 clear_store = true;
669 }
670
671 if (store->get("mon_sync", "force_sync") > 0) {
672 dout(1) << __func__ << " force sync by clearing store state" << dendl;
673 clear_store = true;
674 }
675
676 if (clear_store) {
677 set<string> sync_prefixes = get_sync_targets_names();
678 store->clear(sync_prefixes);
679 }
680 }
681
682 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
683 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
684
685 init_paxos();
686 health_monitor->init();
687
688 if (is_keyring_required()) {
689 // we need to bootstrap authentication keys so we can form an
690 // initial quorum.
691 if (authmon()->get_last_committed() == 0) {
692 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
693 bufferlist bl;
694 int err = store->get("mkfs", "keyring", bl);
695 if (err == 0 && bl.length() > 0) {
696 // Attempt to decode and extract keyring only if it is found.
697 KeyRing keyring;
698 bufferlist::iterator p = bl.begin();
699 ::decode(keyring, p);
700 extract_save_mon_key(keyring);
701 }
702 }
703
704 string keyring_loc = g_conf->mon_data + "/keyring";
705
706 r = keyring.load(cct, keyring_loc);
707 if (r < 0) {
708 EntityName mon_name;
709 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
710 EntityAuth mon_key;
711 if (key_server.get_auth(mon_name, mon_key)) {
712 dout(1) << "copying mon. key from old db to external keyring" << dendl;
713 keyring.add(mon_name, mon_key);
714 bufferlist bl;
715 keyring.encode_plaintext(bl);
716 write_default_keyring(bl);
717 } else {
718 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
719 lock.Unlock();
720 return r;
721 }
722 }
723 }
724
725 admin_hook = new AdminHook(this);
726 AdminSocket* admin_socket = cct->get_admin_socket();
727
728 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
729 lock.Unlock();
730 r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
731 "show current monitor status");
732 assert(r == 0);
733 r = admin_socket->register_command("quorum_status", "quorum_status",
734 admin_hook, "show current quorum status");
735 assert(r == 0);
736 r = admin_socket->register_command("sync_force",
737 "sync_force name=validate,"
738 "type=CephChoices,"
739 "strings=--yes-i-really-mean-it",
740 admin_hook,
741 "force sync of and clear monitor store");
742 assert(r == 0);
743 r = admin_socket->register_command("add_bootstrap_peer_hint",
744 "add_bootstrap_peer_hint name=addr,"
745 "type=CephIPAddr",
746 admin_hook,
747 "add peer address as potential bootstrap"
748 " peer for cluster bringup");
749 assert(r == 0);
750 r = admin_socket->register_command("quorum enter", "quorum enter",
751 admin_hook,
752 "force monitor back into quorum");
753 assert(r == 0);
754 r = admin_socket->register_command("quorum exit", "quorum exit",
755 admin_hook,
756 "force monitor out of the quorum");
757 assert(r == 0);
758 r = admin_socket->register_command("ops",
759 "ops",
760 admin_hook,
761 "show the ops currently in flight");
762 assert(r == 0);
224ce89b
WB
763 r = admin_socket->register_command("sessions",
764 "sessions",
765 admin_hook,
766 "list existing sessions");
767 assert(r == 0);
7c673cae
FG
768
769 lock.Lock();
770
771 // add ourselves as a conf observer
772 g_conf->add_observer(this);
773
774 lock.Unlock();
775 return 0;
776}
777
778int Monitor::init()
779{
780 dout(2) << "init" << dendl;
781 Mutex::Locker l(lock);
782
783 finisher.start();
784
785 // start ticker
786 timer.init();
787 new_tick();
788
789 cpu_tp.start();
790
791 // i'm ready!
792 messenger->add_dispatcher_tail(this);
793
794 mgr_client.init();
795 mgr_messenger->add_dispatcher_tail(&mgr_client);
796 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
797
798 bootstrap();
b5b8bbf5
FG
799 // add features of myself into feature_map
800 session_map.feature_map.add_mon(con_self->get_features());
7c673cae
FG
801 return 0;
802}
803
804void Monitor::init_paxos()
805{
806 dout(10) << __func__ << dendl;
807 paxos->init();
808
809 // init services
810 for (int i = 0; i < PAXOS_NUM; ++i) {
811 paxos_service[i]->init();
812 }
813
814 refresh_from_paxos(NULL);
815}
816
817void Monitor::refresh_from_paxos(bool *need_bootstrap)
818{
819 dout(10) << __func__ << dendl;
820
821 bufferlist bl;
822 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
823 if (r >= 0) {
824 try {
825 bufferlist::iterator p = bl.begin();
826 ::decode(fingerprint, p);
827 }
828 catch (buffer::error& e) {
829 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
830 }
831 } else {
832 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
833 }
834
835 for (int i = 0; i < PAXOS_NUM; ++i) {
836 paxos_service[i]->refresh(need_bootstrap);
837 }
838 for (int i = 0; i < PAXOS_NUM; ++i) {
839 paxos_service[i]->post_refresh();
840 }
224ce89b 841 load_metadata();
7c673cae
FG
842}
843
844void Monitor::register_cluster_logger()
845{
846 if (!cluster_logger_registered) {
847 dout(10) << "register_cluster_logger" << dendl;
848 cluster_logger_registered = true;
849 cct->get_perfcounters_collection()->add(cluster_logger);
850 } else {
851 dout(10) << "register_cluster_logger - already registered" << dendl;
852 }
853}
854
855void Monitor::unregister_cluster_logger()
856{
857 if (cluster_logger_registered) {
858 dout(10) << "unregister_cluster_logger" << dendl;
859 cluster_logger_registered = false;
860 cct->get_perfcounters_collection()->remove(cluster_logger);
861 } else {
862 dout(10) << "unregister_cluster_logger - not registered" << dendl;
863 }
864}
865
866void Monitor::update_logger()
867{
868 cluster_logger->set(l_cluster_num_mon, monmap->size());
869 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
870}
871
872void Monitor::shutdown()
873{
874 dout(1) << "shutdown" << dendl;
875
876 lock.Lock();
877
878 wait_for_paxos_write();
879
880 state = STATE_SHUTDOWN;
881
882 g_conf->remove_observer(this);
883
884 if (admin_hook) {
885 AdminSocket* admin_socket = cct->get_admin_socket();
886 admin_socket->unregister_command("mon_status");
887 admin_socket->unregister_command("quorum_status");
888 admin_socket->unregister_command("sync_force");
889 admin_socket->unregister_command("add_bootstrap_peer_hint");
890 admin_socket->unregister_command("quorum enter");
891 admin_socket->unregister_command("quorum exit");
892 admin_socket->unregister_command("ops");
224ce89b 893 admin_socket->unregister_command("sessions");
7c673cae
FG
894 delete admin_hook;
895 admin_hook = NULL;
896 }
897
898 elector.shutdown();
899
900 mgr_client.shutdown();
901
902 lock.Unlock();
903 finisher.wait_for_empty();
904 finisher.stop();
905 lock.Lock();
906
907 // clean up
908 paxos->shutdown();
909 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
910 (*p)->shutdown();
911 health_monitor->shutdown();
912
913 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
914 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
915
916 timer.shutdown();
917
918 cpu_tp.stop();
919
920 remove_all_sessions();
921
922 if (logger) {
923 cct->get_perfcounters_collection()->remove(logger);
924 delete logger;
925 logger = NULL;
926 }
927 if (cluster_logger) {
928 if (cluster_logger_registered)
929 cct->get_perfcounters_collection()->remove(cluster_logger);
930 delete cluster_logger;
931 cluster_logger = NULL;
932 }
933
934 log_client.shutdown();
935
936 // unlock before msgr shutdown...
937 lock.Unlock();
938
939 messenger->shutdown(); // last thing! ceph_mon.cc will delete mon.
940 mgr_messenger->shutdown();
941}
942
943void Monitor::wait_for_paxos_write()
944{
945 if (paxos->is_writing() || paxos->is_writing_previous()) {
946 dout(10) << __func__ << " flushing pending write" << dendl;
947 lock.Unlock();
948 store->flush();
949 lock.Lock();
950 dout(10) << __func__ << " flushed pending write" << dendl;
951 }
952}
953
954void Monitor::bootstrap()
955{
956 dout(10) << "bootstrap" << dendl;
957 wait_for_paxos_write();
958
959 sync_reset_requester();
960 unregister_cluster_logger();
961 cancel_probe_timeout();
962
963 // note my rank
964 int newrank = monmap->get_rank(messenger->get_myaddr());
965 if (newrank < 0 && rank >= 0) {
966 // was i ever part of the quorum?
967 if (has_ever_joined) {
968 dout(0) << " removed from monmap, suicide." << dendl;
969 exit(0);
970 }
971 }
972 if (newrank != rank) {
973 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
974 messenger->set_myname(entity_name_t::MON(newrank));
975 rank = newrank;
976
977 // reset all connections, or else our peers will think we are someone else.
978 messenger->mark_down_all();
979 }
980
981 // reset
982 state = STATE_PROBING;
983
984 _reset();
985
986 // sync store
987 if (g_conf->mon_compact_on_bootstrap) {
988 dout(10) << "bootstrap -- triggering compaction" << dendl;
989 store->compact();
990 dout(10) << "bootstrap -- finished compaction" << dendl;
991 }
992
993 // singleton monitor?
994 if (monmap->size() == 1 && rank == 0) {
995 win_standalone_election();
996 return;
997 }
998
999 reset_probe_timeout();
1000
1001 // i'm outside the quorum
1002 if (monmap->contains(name))
1003 outside_quorum.insert(name);
1004
1005 // probe monitors
1006 dout(10) << "probing other monitors" << dendl;
1007 for (unsigned i = 0; i < monmap->size(); i++) {
1008 if ((int)i != rank)
1009 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
1010 monmap->get_inst(i));
1011 }
1012 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
1013 p != extra_probe_peers.end();
1014 ++p) {
1015 if (*p != messenger->get_myaddr()) {
1016 entity_inst_t i;
1017 i.name = entity_name_t::MON(-1);
1018 i.addr = *p;
1019 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
1020 }
1021 }
1022}
1023
1024bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1025{
1026 string addrstr;
1027 if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1028 ss << "unable to parse address string value '"
1029 << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1030 return false;
1031 }
1032 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1033 << addrstr << "'" << dendl;
1034
1035 entity_addr_t addr;
1036 const char *end = 0;
1037 if (!addr.parse(addrstr.c_str(), &end)) {
1038 ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1039 return false;
1040 }
1041
1042 if (is_leader() || is_peon()) {
1043 ss << "mon already active; ignoring bootstrap hint";
1044 return true;
1045 }
1046
1047 if (addr.get_port() == 0)
1048 addr.set_port(CEPH_MON_PORT);
1049
1050 extra_probe_peers.insert(addr);
1051 ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1052 return true;
1053}
1054
1055// called by bootstrap(), or on leader|peon -> electing
1056void Monitor::_reset()
1057{
1058 dout(10) << __func__ << dendl;
1059
1060 cancel_probe_timeout();
1061 timecheck_finish();
1062 health_events_cleanup();
1063 scrub_event_cancel();
1064
1065 leader_since = utime_t();
1066 if (!quorum.empty()) {
1067 exited_quorum = ceph_clock_now();
1068 }
1069 quorum.clear();
1070 outside_quorum.clear();
31f18b77 1071 quorum_feature_map.clear();
7c673cae
FG
1072
1073 scrub_reset();
1074
1075 paxos->restart();
1076
1077 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1078 (*p)->restart();
1079 health_monitor->finish();
1080}
1081
1082
1083// -----------------------------------------------------------
1084// sync
1085
1086set<string> Monitor::get_sync_targets_names()
1087{
1088 set<string> targets;
1089 targets.insert(paxos->get_name());
1090 for (int i = 0; i < PAXOS_NUM; ++i)
1091 paxos_service[i]->get_store_prefixes(targets);
1092 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1093 assert(config_key_service_ptr);
1094 config_key_service_ptr->get_store_prefixes(targets);
1095 return targets;
1096}
1097
1098
1099void Monitor::sync_timeout()
1100{
1101 dout(10) << __func__ << dendl;
1102 assert(state == STATE_SYNCHRONIZING);
1103 bootstrap();
1104}
1105
1106void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1107{
1108 dout(1) << __func__ << dendl;
1109
1110 MonMap latest_monmap;
1111
1112 // Grab latest monmap from MonmapMonitor
1113 bufferlist monmon_bl;
1114 int err = monmon()->get_monmap(monmon_bl);
1115 if (err < 0) {
1116 if (err != -ENOENT) {
1117 derr << __func__
1118 << " something wrong happened while reading the store: "
1119 << cpp_strerror(err) << dendl;
1120 assert(0 == "error reading the store");
1121 }
1122 } else {
1123 latest_monmap.decode(monmon_bl);
1124 }
1125
1126 // Grab last backed up monmap (if any) and compare epochs
1127 if (store->exists("mon_sync", "latest_monmap")) {
1128 bufferlist backup_bl;
1129 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1130 if (err < 0) {
1131 derr << __func__
1132 << " something wrong happened while reading the store: "
1133 << cpp_strerror(err) << dendl;
1134 assert(0 == "error reading the store");
1135 }
1136 assert(backup_bl.length() > 0);
1137
1138 MonMap backup_monmap;
1139 backup_monmap.decode(backup_bl);
1140
1141 if (backup_monmap.epoch > latest_monmap.epoch)
1142 latest_monmap = backup_monmap;
1143 }
1144
1145 // Check if our current monmap's epoch is greater than the one we've
1146 // got so far.
1147 if (monmap->epoch > latest_monmap.epoch)
1148 latest_monmap = *monmap;
1149
1150 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1151
1152 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1153}
1154
1155void Monitor::sync_reset_requester()
1156{
1157 dout(10) << __func__ << dendl;
1158
1159 if (sync_timeout_event) {
1160 timer.cancel_event(sync_timeout_event);
1161 sync_timeout_event = NULL;
1162 }
1163
1164 sync_provider = entity_inst_t();
1165 sync_cookie = 0;
1166 sync_full = false;
1167 sync_start_version = 0;
1168}
1169
1170void Monitor::sync_reset_provider()
1171{
1172 dout(10) << __func__ << dendl;
1173 sync_providers.clear();
1174}
1175
1176void Monitor::sync_start(entity_inst_t &other, bool full)
1177{
1178 dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
1179
1180 assert(state == STATE_PROBING ||
1181 state == STATE_SYNCHRONIZING);
1182 state = STATE_SYNCHRONIZING;
1183
1184 // make sure are not a provider for anyone!
1185 sync_reset_provider();
1186
1187 sync_full = full;
1188
1189 if (sync_full) {
1190 // stash key state, and mark that we are syncing
1191 auto t(std::make_shared<MonitorDBStore::Transaction>());
1192 sync_stash_critical_state(t);
1193 t->put("mon_sync", "in_sync", 1);
1194
1195 sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
1196 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1197 << sync_last_committed_floor << dendl;
1198 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1199
1200 store->apply_transaction(t);
1201
1202 assert(g_conf->mon_sync_requester_kill_at != 1);
1203
1204 // clear the underlying store
1205 set<string> targets = get_sync_targets_names();
1206 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1207 store->clear(targets);
1208
1209 // make sure paxos knows it has been reset. this prevents a
1210 // bootstrap and then different probe reply order from possibly
1211 // deciding a partial or no sync is needed.
1212 paxos->init();
1213
1214 assert(g_conf->mon_sync_requester_kill_at != 2);
1215 }
1216
1217 // assume 'other' as the leader. We will update the leader once we receive
1218 // a reply to the sync start.
1219 sync_provider = other;
1220
1221 sync_reset_timeout();
1222
1223 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1224 if (!sync_full)
1225 m->last_committed = paxos->get_version();
1226 messenger->send_message(m, sync_provider);
1227}
1228
1229void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1230{
1231 dout(10) << __func__ << dendl;
1232 bufferlist backup_monmap;
1233 sync_obtain_latest_monmap(backup_monmap);
1234 assert(backup_monmap.length() > 0);
1235 t->put("mon_sync", "latest_monmap", backup_monmap);
1236}
1237
1238void Monitor::sync_reset_timeout()
1239{
1240 dout(10) << __func__ << dendl;
1241 if (sync_timeout_event)
1242 timer.cancel_event(sync_timeout_event);
1243 sync_timeout_event = new C_MonContext(this, [this](int) {
1244 sync_timeout();
1245 });
1246 timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event);
1247}
1248
1249void Monitor::sync_finish(version_t last_committed)
1250{
1251 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1252
1253 assert(g_conf->mon_sync_requester_kill_at != 7);
1254
1255 if (sync_full) {
1256 // finalize the paxos commits
1257 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1258 paxos->read_and_prepare_transactions(tx, sync_start_version,
1259 last_committed);
1260 tx->put(paxos->get_name(), "last_committed", last_committed);
1261
1262 dout(30) << __func__ << " final tx dump:\n";
1263 JSONFormatter f(true);
1264 tx->dump(&f);
1265 f.flush(*_dout);
1266 *_dout << dendl;
1267
1268 store->apply_transaction(tx);
1269 }
1270
1271 assert(g_conf->mon_sync_requester_kill_at != 8);
1272
1273 auto t(std::make_shared<MonitorDBStore::Transaction>());
1274 t->erase("mon_sync", "in_sync");
1275 t->erase("mon_sync", "force_sync");
1276 t->erase("mon_sync", "last_committed_floor");
1277 store->apply_transaction(t);
1278
1279 assert(g_conf->mon_sync_requester_kill_at != 9);
1280
1281 init_paxos();
1282
1283 assert(g_conf->mon_sync_requester_kill_at != 10);
1284
1285 bootstrap();
1286}
1287
1288void Monitor::handle_sync(MonOpRequestRef op)
1289{
1290 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1291 dout(10) << __func__ << " " << *m << dendl;
1292 switch (m->op) {
1293
1294 // provider ---------
1295
1296 case MMonSync::OP_GET_COOKIE_FULL:
1297 case MMonSync::OP_GET_COOKIE_RECENT:
1298 handle_sync_get_cookie(op);
1299 break;
1300 case MMonSync::OP_GET_CHUNK:
1301 handle_sync_get_chunk(op);
1302 break;
1303
1304 // client -----------
1305
1306 case MMonSync::OP_COOKIE:
1307 handle_sync_cookie(op);
1308 break;
1309
1310 case MMonSync::OP_CHUNK:
1311 case MMonSync::OP_LAST_CHUNK:
1312 handle_sync_chunk(op);
1313 break;
1314 case MMonSync::OP_NO_COOKIE:
1315 handle_sync_no_cookie(op);
1316 break;
1317
1318 default:
1319 dout(0) << __func__ << " unknown op " << m->op << dendl;
1320 assert(0 == "unknown op");
1321 }
1322}
1323
1324// leader
1325
1326void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1327{
1328 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1329 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1330 m->get_connection()->send_message(reply);
1331}
1332
1333void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1334{
1335 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1336 if (is_synchronizing()) {
1337 _sync_reply_no_cookie(op);
1338 return;
1339 }
1340
1341 assert(g_conf->mon_sync_provider_kill_at != 1);
1342
1343 // make sure they can understand us.
1344 if ((required_features ^ m->get_connection()->get_features()) &
1345 required_features) {
1346 dout(5) << " ignoring peer mon." << m->get_source().num()
1347 << " has features " << std::hex
1348 << m->get_connection()->get_features()
1349 << " but we require " << required_features << std::dec << dendl;
1350 return;
1351 }
1352
1353 // make up a unique cookie. include election epoch (which persists
1354 // across restarts for the whole cluster) and a counter for this
1355 // process instance. there is no need to be unique *across*
1356 // monitors, though.
1357 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1358 assert(sync_providers.count(cookie) == 0);
1359
1360 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1361
1362 SyncProvider& sp = sync_providers[cookie];
1363 sp.cookie = cookie;
1364 sp.entity = m->get_source_inst();
1365 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1366
1367 set<string> sync_targets;
1368 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1369 // full scan
1370 sync_targets = get_sync_targets_names();
1371 sp.last_committed = paxos->get_version();
1372 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1373 sp.full = true;
1374 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1375 } else {
1376 // just catch up paxos
1377 sp.last_committed = m->last_committed;
1378 }
1379 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1380
1381 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1382 reply->last_committed = sp.last_committed;
1383 m->get_connection()->send_message(reply);
1384}
1385
1386void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1387{
1388 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1389 dout(10) << __func__ << " " << *m << dendl;
1390
1391 if (sync_providers.count(m->cookie) == 0) {
1392 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1393 _sync_reply_no_cookie(op);
1394 return;
1395 }
1396
1397 assert(g_conf->mon_sync_provider_kill_at != 2);
1398
1399 SyncProvider& sp = sync_providers[m->cookie];
1400 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1401
1402 if (sp.last_committed < paxos->get_first_committed() &&
1403 paxos->get_first_committed() > 1) {
1404 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1405 << " < our fc " << paxos->get_first_committed() << dendl;
1406 sync_providers.erase(m->cookie);
1407 _sync_reply_no_cookie(op);
1408 return;
1409 }
1410
1411 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1412 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1413
1414 int left = g_conf->mon_sync_max_payload_size;
1415 while (sp.last_committed < paxos->get_version() && left > 0) {
1416 bufferlist bl;
1417 sp.last_committed++;
224ce89b
WB
1418
1419 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1420 assert(err == 0);
1421
7c673cae
FG
1422 tx->put(paxos->get_name(), sp.last_committed, bl);
1423 left -= bl.length();
1424 dout(20) << __func__ << " including paxos state " << sp.last_committed
1425 << dendl;
1426 }
1427 reply->last_committed = sp.last_committed;
1428
1429 if (sp.full && left > 0) {
1430 sp.synchronizer->get_chunk_tx(tx, left);
1431 sp.last_key = sp.synchronizer->get_last_key();
1432 reply->last_key = sp.last_key;
1433 }
1434
1435 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1436 sp.last_committed < paxos->get_version()) {
1437 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1438 << " key " << sp.last_key << dendl;
1439 } else {
1440 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1441 << " key " << sp.last_key << dendl;
1442 reply->op = MMonSync::OP_LAST_CHUNK;
1443
1444 assert(g_conf->mon_sync_provider_kill_at != 3);
1445
1446 // clean up our local state
1447 sync_providers.erase(sp.cookie);
1448 }
1449
1450 ::encode(*tx, reply->chunk_bl);
1451
1452 m->get_connection()->send_message(reply);
1453}
1454
1455// requester
1456
1457void Monitor::handle_sync_cookie(MonOpRequestRef op)
1458{
1459 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1460 dout(10) << __func__ << " " << *m << dendl;
1461 if (sync_cookie) {
1462 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1463 return;
1464 }
1465 if (m->get_source_inst() != sync_provider) {
1466 dout(10) << __func__ << " source does not match, discarding" << dendl;
1467 return;
1468 }
1469 sync_cookie = m->cookie;
1470 sync_start_version = m->last_committed;
1471
1472 sync_reset_timeout();
1473 sync_get_next_chunk();
1474
1475 assert(g_conf->mon_sync_requester_kill_at != 3);
1476}
1477
1478void Monitor::sync_get_next_chunk()
1479{
1480 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1481 if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1482 dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1483 usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1484 }
1485 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1486 messenger->send_message(r, sync_provider);
1487
1488 assert(g_conf->mon_sync_requester_kill_at != 4);
1489}
1490
1491void Monitor::handle_sync_chunk(MonOpRequestRef op)
1492{
1493 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1494 dout(10) << __func__ << " " << *m << dendl;
1495
1496 if (m->cookie != sync_cookie) {
1497 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1498 return;
1499 }
1500 if (m->get_source_inst() != sync_provider) {
1501 dout(10) << __func__ << " source does not match, discarding" << dendl;
1502 return;
1503 }
1504
1505 assert(state == STATE_SYNCHRONIZING);
1506 assert(g_conf->mon_sync_requester_kill_at != 5);
1507
1508 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1509 tx->append_from_encoded(m->chunk_bl);
1510
1511 dout(30) << __func__ << " tx dump:\n";
1512 JSONFormatter f(true);
1513 tx->dump(&f);
1514 f.flush(*_dout);
1515 *_dout << dendl;
1516
1517 store->apply_transaction(tx);
1518
1519 assert(g_conf->mon_sync_requester_kill_at != 6);
1520
1521 if (!sync_full) {
1522 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1523 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1524 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1525 m->last_committed);
1526 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1527
1528 dout(30) << __func__ << " tx dump:\n";
1529 JSONFormatter f(true);
1530 tx->dump(&f);
1531 f.flush(*_dout);
1532 *_dout << dendl;
1533
1534 store->apply_transaction(tx);
1535 paxos->init(); // to refresh what we just wrote
1536 }
1537
1538 if (m->op == MMonSync::OP_CHUNK) {
1539 sync_reset_timeout();
1540 sync_get_next_chunk();
1541 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1542 sync_finish(m->last_committed);
1543 }
1544}
1545
1546void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1547{
1548 dout(10) << __func__ << dendl;
1549 bootstrap();
1550}
1551
1552void Monitor::sync_trim_providers()
1553{
1554 dout(20) << __func__ << dendl;
1555
1556 utime_t now = ceph_clock_now();
1557 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1558 while (p != sync_providers.end()) {
1559 if (now > p->second.timeout) {
1560 dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
1561 sync_providers.erase(p++);
1562 } else {
1563 ++p;
1564 }
1565 }
1566}
1567
1568// ---------------------------------------------------
1569// probe
1570
1571void Monitor::cancel_probe_timeout()
1572{
1573 if (probe_timeout_event) {
1574 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1575 timer.cancel_event(probe_timeout_event);
1576 probe_timeout_event = NULL;
1577 } else {
1578 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1579 }
1580}
1581
1582void Monitor::reset_probe_timeout()
1583{
1584 cancel_probe_timeout();
1585 probe_timeout_event = new C_MonContext(this, [this](int r) {
1586 probe_timeout(r);
1587 });
1588 double t = g_conf->mon_probe_timeout;
1589 timer.add_event_after(t, probe_timeout_event);
1590 dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
1591}
1592
1593void Monitor::probe_timeout(int r)
1594{
1595 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1596 assert(is_probing() || is_synchronizing());
1597 assert(probe_timeout_event);
1598 probe_timeout_event = NULL;
1599 bootstrap();
1600}
1601
1602void Monitor::handle_probe(MonOpRequestRef op)
1603{
1604 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1605 dout(10) << "handle_probe " << *m << dendl;
1606
1607 if (m->fsid != monmap->fsid) {
1608 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1609 return;
1610 }
1611
1612 switch (m->op) {
1613 case MMonProbe::OP_PROBE:
1614 handle_probe_probe(op);
1615 break;
1616
1617 case MMonProbe::OP_REPLY:
1618 handle_probe_reply(op);
1619 break;
1620
1621 case MMonProbe::OP_MISSING_FEATURES:
1622 derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1623 << ", required " << m->required_features
1624 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
1625 << dendl;
1626 break;
1627 }
1628}
1629
1630void Monitor::handle_probe_probe(MonOpRequestRef op)
1631{
1632 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1633
1634 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1635 << " features " << m->get_connection()->get_features() << dendl;
1636 uint64_t missing = required_features & ~m->get_connection()->get_features();
1637 if (missing) {
1638 dout(1) << " peer " << m->get_source_addr() << " missing features "
1639 << missing << dendl;
1640 if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1641 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1642 name, has_ever_joined);
1643 m->required_features = required_features;
1644 m->get_connection()->send_message(r);
1645 }
1646 goto out;
1647 }
1648
1649 if (!is_probing() && !is_synchronizing()) {
1650 // If the probing mon is way ahead of us, we need to re-bootstrap.
1651 // Normally we capture this case when we initially bootstrap, but
1652 // it is possible we pass those checks (we overlap with
1653 // quorum-to-be) but fail to join a quorum before it moves past
1654 // us. We need to be kicked back to bootstrap so we can
1655 // synchonize, not keep calling elections.
1656 if (paxos->get_version() + 1 < m->paxos_first_version) {
1657 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1658 << "ahead of us, re-bootstrapping" << dendl;
1659 bootstrap();
1660 goto out;
1661
1662 }
1663 }
1664
1665 MMonProbe *r;
1666 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1667 r->name = name;
1668 r->quorum = quorum;
1669 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1670 r->paxos_first_version = paxos->get_first_committed();
1671 r->paxos_last_version = paxos->get_version();
1672 m->get_connection()->send_message(r);
1673
1674 // did we discover a peer here?
1675 if (!monmap->contains(m->get_source_addr())) {
1676 dout(1) << " adding peer " << m->get_source_addr()
1677 << " to list of hints" << dendl;
1678 extra_probe_peers.insert(m->get_source_addr());
1679 }
1680
1681 out:
1682 return;
1683}
1684
1685void Monitor::handle_probe_reply(MonOpRequestRef op)
1686{
1687 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1688 dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1689 dout(10) << " monmap is " << *monmap << dendl;
1690
1691 // discover name and addrs during probing or electing states.
1692 if (!is_probing() && !is_electing()) {
1693 return;
1694 }
1695
1696 // newer map, or they've joined a quorum and we haven't?
1697 bufferlist mybl;
1698 monmap->encode(mybl, m->get_connection()->get_features());
1699 // make sure it's actually different; the checks below err toward
1700 // taking the other guy's map, which could cause us to loop.
1701 if (!mybl.contents_equal(m->monmap_bl)) {
1702 MonMap *newmap = new MonMap;
1703 newmap->decode(m->monmap_bl);
1704 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1705 !has_ever_joined)) {
1706 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1707 << ", mine was " << monmap->get_epoch() << dendl;
1708 delete newmap;
1709 monmap->decode(m->monmap_bl);
1710
1711 bootstrap();
1712 return;
1713 }
1714 delete newmap;
1715 }
1716
1717 // rename peer?
1718 string peer_name = monmap->get_name(m->get_source_addr());
1719 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1720 dout(10) << " renaming peer " << m->get_source_addr() << " "
1721 << peer_name << " -> " << m->name << " in my monmap"
1722 << dendl;
1723 monmap->rename(peer_name, m->name);
1724
1725 if (is_electing()) {
1726 bootstrap();
1727 return;
1728 }
1729 } else {
1730 dout(10) << " peer name is " << peer_name << dendl;
1731 }
1732
1733 // new initial peer?
1734 if (monmap->get_epoch() == 0 &&
1735 monmap->contains(m->name) &&
1736 monmap->get_addr(m->name).is_blank_ip()) {
1737 dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1738 monmap->set_addr(m->name, m->get_source_addr());
1739
1740 bootstrap();
1741 return;
1742 }
1743
1744 // end discover phase
1745 if (!is_probing()) {
1746 return;
1747 }
1748
1749 assert(paxos != NULL);
1750
1751 if (is_synchronizing()) {
1752 dout(10) << " currently syncing" << dendl;
1753 return;
1754 }
1755
1756 entity_inst_t other = m->get_source_inst();
1757
1758 if (m->paxos_last_version < sync_last_committed_floor) {
1759 dout(10) << " peer paxos versions [" << m->paxos_first_version
1760 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1761 << sync_last_committed_floor << ", ignoring"
1762 << dendl;
1763 } else {
1764 if (paxos->get_version() < m->paxos_first_version &&
1765 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1766 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1767 << "," << m->paxos_last_version << "]"
1768 << " vs my version " << paxos->get_version()
1769 << " (too far ahead)"
1770 << dendl;
1771 cancel_probe_timeout();
1772 sync_start(other, true);
1773 return;
1774 }
1775 if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1776 dout(10) << " peer paxos last version " << m->paxos_last_version
1777 << " vs my version " << paxos->get_version()
1778 << " (too far ahead)"
1779 << dendl;
1780 cancel_probe_timeout();
1781 sync_start(other, false);
1782 return;
1783 }
1784 }
1785
1786 // is there an existing quorum?
1787 if (m->quorum.size()) {
1788 dout(10) << " existing quorum " << m->quorum << dendl;
1789
1790 dout(10) << " peer paxos version " << m->paxos_last_version
1791 << " vs my version " << paxos->get_version()
1792 << " (ok)"
1793 << dendl;
1794
1795 if (monmap->contains(name) &&
1796 !monmap->get_addr(name).is_blank_ip()) {
1797 // i'm part of the cluster; just initiate a new election
1798 start_election();
1799 } else {
1800 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1801 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1802 monmap->get_inst(*m->quorum.begin()));
1803 }
1804 } else {
1805 if (monmap->contains(m->name)) {
1806 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1807 outside_quorum.insert(m->name);
1808 } else {
1809 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
1810 return;
1811 }
1812
1813 unsigned need = monmap->size() / 2 + 1;
1814 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1815 if (outside_quorum.size() >= need) {
1816 if (outside_quorum.count(name)) {
1817 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1818 start_election();
1819 } else {
1820 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1821 }
1822 } else {
1823 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
1824 }
1825 }
1826}
1827
1828void Monitor::join_election()
1829{
1830 dout(10) << __func__ << dendl;
1831 wait_for_paxos_write();
1832 _reset();
1833 state = STATE_ELECTING;
1834
1835 logger->inc(l_mon_num_elections);
1836}
1837
1838void Monitor::start_election()
1839{
1840 dout(10) << "start_election" << dendl;
1841 wait_for_paxos_write();
1842 _reset();
1843 state = STATE_ELECTING;
1844
1845 logger->inc(l_mon_num_elections);
1846 logger->inc(l_mon_election_call);
1847
1848 clog->info() << "mon." << name << " calling new monitor election";
1849 elector.call_election();
1850}
1851
1852void Monitor::win_standalone_election()
1853{
1854 dout(1) << "win_standalone_election" << dendl;
1855
1856 // bump election epoch, in case the previous epoch included other
1857 // monitors; we need to be able to make the distinction.
1858 elector.init();
1859 elector.advance_epoch();
1860
1861 rank = monmap->get_rank(name);
1862 assert(rank == 0);
1863 set<int> q;
1864 q.insert(rank);
1865
224ce89b
WB
1866 map<int,Metadata> metadata;
1867 collect_metadata(&metadata[0]);
1868
7c673cae
FG
1869 win_election(elector.get_epoch(), q,
1870 CEPH_FEATURES_ALL,
1871 ceph::features::mon::get_supported(),
d2e6a577 1872 metadata);
7c673cae
FG
1873}
1874
1875const utime_t& Monitor::get_leader_since() const
1876{
1877 assert(state == STATE_LEADER);
1878 return leader_since;
1879}
1880
1881epoch_t Monitor::get_epoch()
1882{
1883 return elector.get_epoch();
1884}
1885
1886void Monitor::_finish_svc_election()
1887{
1888 assert(state == STATE_LEADER || state == STATE_PEON);
1889
1890 for (auto p : paxos_service) {
1891 // we already called election_finished() on monmon(); avoid callig twice
1892 if (state == STATE_LEADER && p == monmon())
1893 continue;
1894 p->election_finished();
1895 }
1896}
1897
1898void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
1899 const mon_feature_t& mon_features,
d2e6a577 1900 const map<int,Metadata>& metadata)
7c673cae
FG
1901{
1902 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
1903 << " features " << features
1904 << " mon_features " << mon_features
1905 << dendl;
1906 assert(is_electing());
1907 state = STATE_LEADER;
1908 leader_since = ceph_clock_now();
1909 leader = rank;
1910 quorum = active;
1911 quorum_con_features = features;
1912 quorum_mon_features = mon_features;
224ce89b 1913 pending_metadata = metadata;
7c673cae
FG
1914 outside_quorum.clear();
1915
1916 clog->info() << "mon." << name << "@" << rank
1917 << " won leader election with quorum " << quorum;
1918
d2e6a577 1919 set_leader_commands(get_local_commands(mon_features));
7c673cae
FG
1920
1921 paxos->leader_init();
1922 // NOTE: tell monmap monitor first. This is important for the
1923 // bootstrap case to ensure that the very first paxos proposal
1924 // codifies the monmap. Otherwise any manner of chaos can ensue
1925 // when monitors are call elections or participating in a paxos
1926 // round without agreeing on who the participants are.
1927 monmon()->election_finished();
1928 _finish_svc_election();
1929 health_monitor->start(epoch);
1930
1931 logger->inc(l_mon_election_win);
1932
224ce89b
WB
1933 // inject new metadata in first transaction.
1934 {
1935 // include previous metadata for missing mons (that aren't part of
1936 // the current quorum).
1937 map<int,Metadata> m = metadata;
1938 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
1939 if (m.count(rank) == 0 &&
1940 mon_metadata.count(rank)) {
1941 m[rank] = mon_metadata[rank];
1942 }
1943 }
1944
1945 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1946 // a new transaction immediately after the election finishes. We should
1947 // do that anyway for other reasons, though.
1948 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1949 bufferlist bl;
1950 ::encode(m, bl);
1951 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
1952 }
1953
7c673cae
FG
1954 finish_election();
1955 if (monmap->size() > 1 &&
1956 monmap->get_epoch() > 0) {
1957 timecheck_start();
1958 health_tick_start();
1959 do_health_to_clog_interval();
1960 scrub_event_start();
1961 }
7c673cae
FG
1962}
1963
1964void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
1965 uint64_t features,
1966 const mon_feature_t& mon_features)
1967{
1968 state = STATE_PEON;
1969 leader_since = utime_t();
1970 leader = l;
1971 quorum = q;
1972 outside_quorum.clear();
1973 quorum_con_features = features;
1974 quorum_mon_features = mon_features;
1975 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
1976 << " quorum is " << quorum << " features are " << quorum_con_features
1977 << " mon_features are " << quorum_mon_features
1978 << dendl;
1979
1980 paxos->peon_init();
1981 _finish_svc_election();
1982 health_monitor->start(epoch);
1983
1984 logger->inc(l_mon_election_lose);
1985
1986 finish_election();
1987
224ce89b
WB
1988 if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
1989 !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
1990 // for pre-luminous mons only
7c673cae 1991 Metadata sys_info;
224ce89b 1992 collect_metadata(&sys_info);
7c673cae
FG
1993 messenger->send_message(new MMonMetadata(sys_info),
1994 monmap->get_inst(get_leader()));
1995 }
1996}
1997
224ce89b
WB
1998void Monitor::collect_metadata(Metadata *m)
1999{
2000 collect_sys_info(m, g_ceph_context);
2001 (*m)["addr"] = stringify(messenger->get_myaddr());
2002}
2003
7c673cae
FG
2004void Monitor::finish_election()
2005{
2006 apply_quorum_to_compatset_features();
2007 apply_monmap_to_compatset_features();
2008 timecheck_finish();
2009 exited_quorum = utime_t();
2010 finish_contexts(g_ceph_context, waitfor_quorum);
2011 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2012 resend_routed_requests();
2013 update_logger();
2014 register_cluster_logger();
2015
2016 // am i named properly?
2017 string cur_name = monmap->get_name(messenger->get_myaddr());
2018 if (cur_name != name) {
2019 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2020 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
2021 monmap->get_inst(*quorum.begin()));
2022 }
2023}
2024
2025void Monitor::_apply_compatset_features(CompatSet &new_features)
2026{
2027 if (new_features.compare(features) != 0) {
2028 CompatSet diff = features.unsupported(new_features);
2029 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2030 features = new_features;
2031
2032 auto t = std::make_shared<MonitorDBStore::Transaction>();
2033 write_features(t);
2034 store->apply_transaction(t);
2035
2036 calc_quorum_requirements();
2037 }
2038}
2039
2040void Monitor::apply_quorum_to_compatset_features()
2041{
2042 CompatSet new_features(features);
2043 if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2044 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2045 }
2046 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2047 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2048 }
2049 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2050 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2051 }
2052 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2053 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2054 }
2055 dout(5) << __func__ << dendl;
2056 _apply_compatset_features(new_features);
2057}
2058
2059void Monitor::apply_monmap_to_compatset_features()
2060{
2061 CompatSet new_features(features);
2062 mon_feature_t monmap_features = monmap->get_required_features();
2063
2064 /* persistent monmap features may go into the compatset.
2065 * optional monmap features may not - why?
2066 * because optional monmap features may be set/unset by the admin,
2067 * and possibly by other means that haven't yet been thought out,
2068 * so we can't make the monitor enforce them on start - because they
2069 * may go away.
2070 * this, of course, does not invalidate setting a compatset feature
2071 * for an optional feature - as long as you make sure to clean it up
2072 * once you unset it.
2073 */
2074 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2075 assert(ceph::features::mon::get_persistent().contains_all(
2076 ceph::features::mon::FEATURE_KRAKEN));
2077 // this feature should only ever be set if the quorum supports it.
2078 assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2079 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2080 }
2081
2082 dout(5) << __func__ << dendl;
2083 _apply_compatset_features(new_features);
2084}
2085
2086void Monitor::calc_quorum_requirements()
2087{
2088 required_features = 0;
2089
2090 // compatset
2091 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2092 required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2093 }
2094 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2095 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2096 }
2097 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2098 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2099 }
2100 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2101 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2102 }
2103 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2104 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2105 }
2106
2107 // monmap
2108 if (monmap->get_required_features().contains_all(
2109 ceph::features::mon::FEATURE_KRAKEN)) {
2110 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2111 }
2112 if (monmap->get_required_features().contains_all(
2113 ceph::features::mon::FEATURE_LUMINOUS)) {
2114 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2115 }
2116 dout(10) << __func__ << " required_features " << required_features << dendl;
2117}
2118
31f18b77
FG
2119void Monitor::get_combined_feature_map(FeatureMap *fm)
2120{
2121 *fm += session_map.feature_map;
2122 for (auto id : quorum) {
2123 if (id != rank) {
2124 *fm += quorum_feature_map[id];
2125 }
2126 }
2127}
2128
7c673cae
FG
2129void Monitor::sync_force(Formatter *f, ostream& ss)
2130{
2131 bool free_formatter = false;
2132
2133 if (!f) {
2134 // louzy/lazy hack: default to json if no formatter has been defined
2135 f = new JSONFormatter();
2136 free_formatter = true;
2137 }
2138
2139 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2140 sync_stash_critical_state(tx);
2141 tx->put("mon_sync", "force_sync", 1);
2142 store->apply_transaction(tx);
2143
2144 f->open_object_section("sync_force");
2145 f->dump_int("ret", 0);
2146 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2147 f->close_section(); // sync_force
2148 f->flush(ss);
2149 if (free_formatter)
2150 delete f;
2151}
2152
2153void Monitor::_quorum_status(Formatter *f, ostream& ss)
2154{
2155 bool free_formatter = false;
2156
2157 if (!f) {
2158 // louzy/lazy hack: default to json if no formatter has been defined
2159 f = new JSONFormatter();
2160 free_formatter = true;
2161 }
2162 f->open_object_section("quorum_status");
2163 f->dump_int("election_epoch", get_epoch());
2164
2165 f->open_array_section("quorum");
2166 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2167 f->dump_int("mon", *p);
2168 f->close_section(); // quorum
2169
2170 list<string> quorum_names = get_quorum_names();
2171 f->open_array_section("quorum_names");
2172 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2173 f->dump_string("mon", *p);
2174 f->close_section(); // quorum_names
2175
2176 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2177
2178 f->open_object_section("monmap");
2179 monmap->dump(f);
2180 f->close_section(); // monmap
2181
2182 f->close_section(); // quorum_status
2183 f->flush(ss);
2184 if (free_formatter)
2185 delete f;
2186}
2187
2188void Monitor::get_mon_status(Formatter *f, ostream& ss)
2189{
2190 bool free_formatter = false;
2191
2192 if (!f) {
2193 // louzy/lazy hack: default to json if no formatter has been defined
2194 f = new JSONFormatter();
2195 free_formatter = true;
2196 }
2197
2198 f->open_object_section("mon_status");
2199 f->dump_string("name", name);
2200 f->dump_int("rank", rank);
2201 f->dump_string("state", get_state_name());
2202 f->dump_int("election_epoch", get_epoch());
2203
2204 f->open_array_section("quorum");
2205 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2206 f->dump_int("mon", *p);
2207 }
2208
2209 f->close_section(); // quorum
2210
2211 f->open_object_section("features");
2212 f->dump_stream("required_con") << required_features;
2213 mon_feature_t req_mon_features = get_required_mon_features();
2214 req_mon_features.dump(f, "required_mon");
2215 f->dump_stream("quorum_con") << quorum_con_features;
2216 quorum_mon_features.dump(f, "quorum_mon");
2217 f->close_section(); // features
2218
2219 f->open_array_section("outside_quorum");
2220 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2221 f->dump_string("mon", *p);
2222 f->close_section(); // outside_quorum
2223
2224 f->open_array_section("extra_probe_peers");
2225 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2226 p != extra_probe_peers.end();
2227 ++p)
2228 f->dump_stream("peer") << *p;
2229 f->close_section(); // extra_probe_peers
2230
2231 f->open_array_section("sync_provider");
2232 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2233 p != sync_providers.end();
2234 ++p) {
2235 f->dump_unsigned("cookie", p->second.cookie);
2236 f->dump_stream("entity") << p->second.entity;
2237 f->dump_stream("timeout") << p->second.timeout;
2238 f->dump_unsigned("last_committed", p->second.last_committed);
2239 f->dump_stream("last_key") << p->second.last_key;
2240 }
2241 f->close_section();
2242
2243 if (is_synchronizing()) {
2244 f->open_object_section("sync");
2245 f->dump_stream("sync_provider") << sync_provider;
2246 f->dump_unsigned("sync_cookie", sync_cookie);
2247 f->dump_unsigned("sync_start_version", sync_start_version);
2248 f->close_section();
2249 }
2250
2251 if (g_conf->mon_sync_provider_kill_at > 0)
2252 f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2253 if (g_conf->mon_sync_requester_kill_at > 0)
2254 f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2255
2256 f->open_object_section("monmap");
2257 monmap->dump(f);
2258 f->close_section();
2259
31f18b77 2260 f->dump_object("feature_map", session_map.feature_map);
7c673cae
FG
2261 f->close_section(); // mon_status
2262
2263 if (free_formatter) {
2264 // flush formatter to ss and delete it iff we created the formatter
2265 f->flush(ss);
2266 delete f;
2267 }
2268}
2269
2270
2271// health status to clog
2272
2273void Monitor::health_tick_start()
2274{
2275 if (!cct->_conf->mon_health_to_clog ||
2276 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2277 return;
2278
2279 dout(15) << __func__ << dendl;
2280
2281 health_tick_stop();
2282 health_tick_event = new C_MonContext(this, [this](int r) {
2283 if (r < 0)
2284 return;
2285 do_health_to_clog();
2286 health_tick_start();
2287 });
2288 timer.add_event_after(cct->_conf->mon_health_to_clog_tick_interval,
2289 health_tick_event);
2290}
2291
2292void Monitor::health_tick_stop()
2293{
2294 dout(15) << __func__ << dendl;
2295
2296 if (health_tick_event) {
2297 timer.cancel_event(health_tick_event);
2298 health_tick_event = NULL;
2299 }
2300}
2301
2302utime_t Monitor::health_interval_calc_next_update()
2303{
2304 utime_t now = ceph_clock_now();
2305
2306 time_t secs = now.sec();
2307 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2308 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2309 utime_t next = utime_t(secs + adjustment, 0);
2310
2311 dout(20) << __func__
2312 << " now: " << now << ","
2313 << " next: " << next << ","
2314 << " interval: " << cct->_conf->mon_health_to_clog_interval
2315 << dendl;
2316
2317 return next;
2318}
2319
2320void Monitor::health_interval_start()
2321{
2322 dout(15) << __func__ << dendl;
2323
2324 if (!cct->_conf->mon_health_to_clog ||
2325 cct->_conf->mon_health_to_clog_interval <= 0) {
2326 return;
2327 }
2328
2329 health_interval_stop();
2330 utime_t next = health_interval_calc_next_update();
2331 health_interval_event = new C_MonContext(this, [this](int r) {
2332 if (r < 0)
2333 return;
2334 do_health_to_clog_interval();
2335 });
2336 timer.add_event_at(next, health_interval_event);
2337}
2338
2339void Monitor::health_interval_stop()
2340{
2341 dout(15) << __func__ << dendl;
2342 if (health_interval_event) {
2343 timer.cancel_event(health_interval_event);
2344 }
2345 health_interval_event = NULL;
2346}
2347
2348void Monitor::health_events_cleanup()
2349{
2350 health_tick_stop();
2351 health_interval_stop();
2352 health_status_cache.reset();
2353}
2354
2355void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2356{
2357 dout(20) << __func__ << dendl;
2358
2359 if (changed.count("mon_health_to_clog")) {
2360 if (!cct->_conf->mon_health_to_clog) {
2361 health_events_cleanup();
2362 } else {
2363 if (!health_tick_event) {
2364 health_tick_start();
2365 }
2366 if (!health_interval_event) {
2367 health_interval_start();
2368 }
2369 }
2370 }
2371
2372 if (changed.count("mon_health_to_clog_interval")) {
2373 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2374 health_interval_stop();
2375 } else {
2376 health_interval_start();
2377 }
2378 }
2379
2380 if (changed.count("mon_health_to_clog_tick_interval")) {
2381 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2382 health_tick_stop();
2383 } else {
2384 health_tick_start();
2385 }
2386 }
2387}
2388
2389void Monitor::do_health_to_clog_interval()
2390{
2391 // outputting to clog may have been disabled in the conf
2392 // since we were scheduled.
2393 if (!cct->_conf->mon_health_to_clog ||
2394 cct->_conf->mon_health_to_clog_interval <= 0)
2395 return;
2396
2397 dout(10) << __func__ << dendl;
2398
2399 // do we have a cached value for next_clog_update? if not,
2400 // do we know when the last update was?
2401
2402 do_health_to_clog(true);
2403 health_interval_start();
2404}
2405
2406void Monitor::do_health_to_clog(bool force)
2407{
2408 // outputting to clog may have been disabled in the conf
2409 // since we were scheduled.
2410 if (!cct->_conf->mon_health_to_clog ||
2411 cct->_conf->mon_health_to_clog_interval <= 0)
2412 return;
2413
2414 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2415
224ce89b
WB
2416 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2417 string summary;
2418 health_status_t level = get_health_status(false, nullptr, &summary);
2419 if (!force &&
2420 summary == health_status_cache.summary &&
2421 level == health_status_cache.overall)
2422 return;
2423 if (level == HEALTH_OK)
2424 clog->info() << "overall " << summary;
2425 else if (level == HEALTH_WARN)
2426 clog->warn() << "overall " << summary;
2427 else if (level == HEALTH_ERR)
2428 clog->error() << "overall " << summary;
2429 else
2430 ceph_abort();
2431 health_status_cache.summary = summary;
2432 health_status_cache.overall = level;
2433 } else {
2434 // for jewel only
2435 list<string> status;
2436 health_status_t overall = get_health(status, NULL, NULL);
2437 dout(25) << __func__
2438 << (force ? " (force)" : "")
2439 << dendl;
2440
2441 string summary = joinify(status.begin(), status.end(), string("; "));
7c673cae 2442
224ce89b
WB
2443 if (!force &&
2444 overall == health_status_cache.overall &&
2445 !health_status_cache.summary.empty() &&
2446 health_status_cache.summary == summary) {
2447 // we got a dup!
2448 return;
2449 }
2450
2451 clog->info() << summary;
2452
2453 health_status_cache.overall = overall;
2454 health_status_cache.summary = summary;
2455 }
2456}
2457
2458health_status_t Monitor::get_health_status(
2459 bool want_detail,
2460 Formatter *f,
2461 std::string *plain,
2462 const char *sep1,
2463 const char *sep2)
2464{
2465 health_status_t r = HEALTH_OK;
2466 bool compat = g_conf->mon_health_preluminous_compat;
d2e6a577 2467 bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
224ce89b
WB
2468 if (f) {
2469 f->open_object_section("health");
2470 f->open_object_section("checks");
2471 }
7c673cae 2472
224ce89b
WB
2473 string summary;
2474 string *psummary = f ? nullptr : &summary;
2475 for (auto& svc : paxos_service) {
2476 r = std::min(r, svc->get_health_checks().dump_summary(
2477 f, psummary, sep2, want_detail));
2478 }
7c673cae 2479
224ce89b
WB
2480 if (f) {
2481 f->close_section();
2482 f->dump_stream("status") << r;
2483 } else {
2484 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2485 *plain = stringify(r);
2486 if (summary.size()) {
2487 *plain += sep1;
2488 *plain += summary;
2489 }
2490 *plain += "\n";
2491 }
2492
d2e6a577
FG
2493 if (f && (compat || compat_warn)) {
2494 health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
2495 if (compat) {
2496 f->open_array_section("summary");
2497 if (compat_warn) {
2498 f->open_object_section("item");
2499 f->dump_stream("severity") << HEALTH_WARN;
2500 f->dump_string("summary", "'ceph health' JSON format has changed in luminous; update your health monitoring scripts");
2501 f->close_section();
2502 }
2503 for (auto& svc : paxos_service) {
2504 svc->get_health_checks().dump_summary_compat(f);
2505 }
2506 f->close_section();
224ce89b 2507 }
d2e6a577 2508 f->dump_stream("overall_status") << cr;
224ce89b
WB
2509 }
2510
2511 if (want_detail) {
d2e6a577 2512 if (f && (compat || compat_warn)) {
224ce89b 2513 f->open_array_section("detail");
d2e6a577
FG
2514 if (compat_warn) {
2515 f->dump_string("item", "'ceph health' JSON format has changed in luminous. If you see this your monitoring system is scraping the wrong fields. Disable this with 'mon health preluminous compat warning = false'");
2516 }
224ce89b
WB
2517 }
2518
2519 for (auto& svc : paxos_service) {
2520 svc->get_health_checks().dump_detail(f, plain, compat);
2521 }
2522
d2e6a577 2523 if (f && (compat || compat_warn)) {
224ce89b
WB
2524 f->close_section();
2525 }
2526 }
2527 if (f) {
2528 f->close_section();
2529 }
2530 return r;
2531}
2532
2533void Monitor::log_health(
2534 const health_check_map_t& updated,
2535 const health_check_map_t& previous,
2536 MonitorDBStore::TransactionRef t)
2537{
2538 if (!g_conf->mon_health_to_clog) {
7c673cae
FG
2539 return;
2540 }
224ce89b
WB
2541 // FIXME: log atomically as part of @t instead of using clog.
2542 dout(10) << __func__ << " updated " << updated.checks.size()
2543 << " previous " << previous.checks.size()
2544 << dendl;
2545 for (auto& p : updated.checks) {
2546 auto q = previous.checks.find(p.first);
2547 if (q == previous.checks.end()) {
2548 // new
2549 ostringstream ss;
2550 ss << "Health check failed: " << p.second.summary << " ("
2551 << p.first << ")";
2552 if (p.second.severity == HEALTH_WARN)
2553 clog->warn() << ss.str();
2554 else
2555 clog->error() << ss.str();
2556 } else {
2557 if (p.second.summary != q->second.summary ||
2558 p.second.severity != q->second.severity) {
2559 // summary or severity changed (ignore detail changes at this level)
2560 ostringstream ss;
2561 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2562 if (p.second.severity == HEALTH_WARN)
2563 clog->warn() << ss.str();
2564 else
2565 clog->error() << ss.str();
2566 }
2567 }
2568 }
2569 for (auto& p : previous.checks) {
2570 if (!updated.checks.count(p.first)) {
2571 // cleared
2572 ostringstream ss;
2573 if (p.first == "DEGRADED_OBJECTS") {
2574 clog->info() << "All degraded objects recovered";
2575 } else if (p.first == "OSD_FLAGS") {
2576 clog->info() << "OSD flags cleared";
2577 } else {
2578 clog->info() << "Health check cleared: " << p.first << " (was: "
2579 << p.second.summary << ")";
2580 }
2581 }
2582 }
7c673cae 2583
224ce89b
WB
2584 if (previous.checks.size() && updated.checks.size() == 0) {
2585 // We might be going into a fully healthy state, check
2586 // other subsystems
2587 bool any_checks = false;
2588 for (auto& svc : paxos_service) {
2589 if (&(svc->get_health_checks()) == &(previous)) {
2590 // Ignore the ones we're clearing right now
2591 continue;
2592 }
7c673cae 2593
224ce89b
WB
2594 if (svc->get_health_checks().checks.size() > 0) {
2595 any_checks = true;
2596 break;
2597 }
2598 }
2599 if (!any_checks) {
2600 clog->info() << "Cluster is now healthy";
2601 }
2602 }
7c673cae
FG
2603}
2604
2605health_status_t Monitor::get_health(list<string>& status,
2606 bufferlist *detailbl,
2607 Formatter *f)
2608{
2609 list<pair<health_status_t,string> > summary;
2610 list<pair<health_status_t,string> > detail;
2611
2612 if (f)
2613 f->open_object_section("health");
2614
2615 for (vector<PaxosService*>::iterator p = paxos_service.begin();
2616 p != paxos_service.end();
2617 ++p) {
2618 PaxosService *s = *p;
2619 s->get_health(summary, detailbl ? &detail : NULL, cct);
2620 }
2621
224ce89b 2622 health_monitor->get_health(summary, (detailbl ? &detail : NULL));
7c673cae
FG
2623
2624 health_status_t overall = HEALTH_OK;
2625 if (!timecheck_skews.empty()) {
2626 list<string> warns;
7c673cae
FG
2627 for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2628 i != timecheck_skews.end(); ++i) {
2629 entity_inst_t inst = i->first;
2630 double skew = i->second;
2631 double latency = timecheck_latencies[inst];
2632 string name = monmap->get_name(inst.addr);
7c673cae
FG
2633 ostringstream tcss;
2634 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2635 if (tcstatus != HEALTH_OK) {
2636 if (overall > tcstatus)
2637 overall = tcstatus;
2638 warns.push_back(name);
7c673cae
FG
2639 ostringstream tmp_ss;
2640 tmp_ss << "mon." << name
2641 << " addr " << inst.addr << " " << tcss.str()
2642 << " (latency " << latency << "s)";
2643 detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2644 }
7c673cae
FG
2645 }
2646 if (!warns.empty()) {
2647 ostringstream ss;
2648 ss << "clock skew detected on";
2649 while (!warns.empty()) {
2650 ss << " mon." << warns.front();
2651 warns.pop_front();
2652 if (!warns.empty())
2653 ss << ",";
2654 }
2655 status.push_back(ss.str());
2656 summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2657 }
7c673cae 2658 }
7c673cae
FG
2659
2660 if (f)
2661 f->open_array_section("summary");
2662 if (!summary.empty()) {
2663 while (!summary.empty()) {
2664 if (overall > summary.front().first)
2665 overall = summary.front().first;
2666 status.push_back(summary.front().second);
2667 if (f) {
2668 f->open_object_section("item");
2669 f->dump_stream("severity") << summary.front().first;
2670 f->dump_string("summary", summary.front().second);
2671 f->close_section();
2672 }
2673 summary.pop_front();
2674 }
2675 }
2676 if (f)
2677 f->close_section();
2678
2679 stringstream fss;
2680 fss << overall;
2681 status.push_front(fss.str());
2682 if (f)
2683 f->dump_stream("overall_status") << overall;
2684
2685 if (f)
2686 f->open_array_section("detail");
2687 while (!detail.empty()) {
2688 if (f)
2689 f->dump_string("item", detail.front().second);
2690 else if (detailbl != NULL) {
2691 detailbl->append(detail.front().second);
2692 detailbl->append('\n');
2693 }
2694 detail.pop_front();
2695 }
2696 if (f)
2697 f->close_section();
2698
2699 if (f)
2700 f->close_section();
2701
2702 return overall;
2703}
2704
2705void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2706{
2707 if (f)
2708 f->open_object_section("status");
2709
7c673cae
FG
2710 if (f) {
2711 f->dump_stream("fsid") << monmap->get_fsid();
b5b8bbf5
FG
2712 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2713 get_health_status(false, f, nullptr);
2714 } else {
2715 list<string> health_str;
2716 get_health(health_str, nullptr, f);
2717 }
7c673cae
FG
2718 f->dump_unsigned("election_epoch", get_epoch());
2719 {
2720 f->open_array_section("quorum");
2721 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2722 f->dump_int("rank", *p);
2723 f->close_section();
2724 f->open_array_section("quorum_names");
2725 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2726 f->dump_string("id", monmap->get_name(*p));
2727 f->close_section();
2728 }
2729 f->open_object_section("monmap");
2730 monmap->dump(f);
2731 f->close_section();
2732 f->open_object_section("osdmap");
224ce89b 2733 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
7c673cae
FG
2734 f->close_section();
2735 f->open_object_section("pgmap");
31f18b77 2736 pgservice->print_summary(f, NULL);
7c673cae
FG
2737 f->close_section();
2738 f->open_object_section("fsmap");
2739 mdsmon()->get_fsmap().print_summary(f, NULL);
2740 f->close_section();
7c673cae
FG
2741 f->open_object_section("mgrmap");
2742 mgrmon()->get_map().print_summary(f, nullptr);
2743 f->close_section();
224ce89b
WB
2744
2745 f->dump_object("servicemap", mgrstatmon()->get_service_map());
7c673cae
FG
2746 f->close_section();
2747 } else {
31f18b77
FG
2748 ss << " cluster:\n";
2749 ss << " id: " << monmap->get_fsid() << "\n";
224ce89b
WB
2750
2751 string health;
2752 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2753 get_health_status(false, nullptr, &health,
2754 "\n ", "\n ");
2755 } else {
2756 list<string> ls;
2757 get_health(ls, NULL, f);
2758 health = joinify(ls.begin(), ls.end(),
2759 string("\n "));
2760 }
2761 ss << " health: " << health << "\n";
2762
31f18b77 2763 ss << "\n \n services:\n";
224ce89b
WB
2764 {
2765 size_t maxlen = 3;
2766 auto& service_map = mgrstatmon()->get_service_map();
2767 for (auto& p : service_map.services) {
2768 maxlen = std::max(maxlen, p.first.size());
2769 }
2770 string spacing(maxlen - 3, ' ');
2771 const auto quorum_names = get_quorum_names();
2772 const auto mon_count = monmap->mon_info.size();
2773 ss << " mon: " << spacing << mon_count << " daemons, quorum "
2774 << quorum_names;
2775 if (quorum_names.size() != mon_count) {
2776 std::list<std::string> out_of_q;
2777 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2778 if (quorum.count(i) == 0) {
2779 out_of_q.push_back(monmap->ranks[i]);
2780 }
2781 }
2782 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2783 out_of_q.end(), std::string(", "));
31f18b77 2784 }
7c673cae 2785 ss << "\n";
224ce89b
WB
2786 if (mgrmon()->in_use()) {
2787 ss << " mgr: " << spacing;
2788 mgrmon()->get_map().print_summary(nullptr, &ss);
2789 ss << "\n";
2790 }
2791 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2792 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2793 }
2794 ss << " osd: " << spacing;
2795 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2796 ss << "\n";
2797 for (auto& p : service_map.services) {
2798 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2799 << p.second.get_summary() << "\n";
2800 }
7c673cae 2801 }
31f18b77
FG
2802
2803 ss << "\n \n data:\n";
2804 pgservice->print_summary(NULL, &ss);
2805 ss << "\n ";
7c673cae
FG
2806 }
2807}
2808
2809void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2810 map<string,string> &param_str_map)
2811{
2812 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2813 p != cmdmap.end(); ++p) {
2814 if (p->first == "prefix")
2815 continue;
2816 if (p->first == "caps") {
2817 vector<string> cv;
2818 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2819 cv.size() % 2 == 0) {
2820 for (unsigned i = 0; i < cv.size(); i += 2) {
2821 string k = string("caps_") + cv[i];
2822 param_str_map[k] = cv[i + 1];
2823 }
2824 continue;
2825 }
2826 }
2827 param_str_map[p->first] = cmd_vartype_stringify(p->second);
2828 }
2829}
2830
c07f9fc5
FG
2831const MonCommand *Monitor::_get_moncommand(
2832 const string &cmd_prefix,
d2e6a577
FG
2833 const vector<MonCommand>& cmds)
2834{
2835 for (auto& c : cmds) {
2836 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2837 return &c;
7c673cae
FG
2838 }
2839 }
d2e6a577 2840 return nullptr;
7c673cae
FG
2841}
2842
2843bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2844 const map<string,cmd_vartype>& cmdmap,
2845 const map<string,string>& param_str_map,
2846 const MonCommand *this_cmd) {
2847
2848 bool cmd_r = this_cmd->requires_perm('r');
2849 bool cmd_w = this_cmd->requires_perm('w');
2850 bool cmd_x = this_cmd->requires_perm('x');
2851
2852 bool capable = s->caps.is_capable(
2853 g_ceph_context,
2854 CEPH_ENTITY_TYPE_MON,
2855 s->entity_name,
2856 module, prefix, param_str_map,
2857 cmd_r, cmd_w, cmd_x);
2858
2859 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2860 return capable;
2861}
2862
c07f9fc5 2863void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
7c673cae
FG
2864 Formatter *f,
2865 bufferlist *rdata,
2866 bool hide_mgr_flag)
2867{
2868 int cmdnum = 0;
2869 f->open_object_section("command_descriptions");
c07f9fc5
FG
2870 for (const auto &cmd : commands) {
2871 unsigned flags = cmd.flags;
7c673cae
FG
2872 if (hide_mgr_flag) {
2873 flags &= ~MonCommand::FLAG_MGR;
2874 }
2875 ostringstream secname;
2876 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2877 dump_cmddesc_to_json(f, secname.str(),
c07f9fc5
FG
2878 cmd.cmdstring, cmd.helpstring, cmd.module,
2879 cmd.req_perms, cmd.availability, flags);
7c673cae
FG
2880 cmdnum++;
2881 }
2882 f->close_section(); // command_descriptions
2883
2884 f->flush(*rdata);
2885}
2886
7c673cae
FG
2887bool Monitor::is_keyring_required()
2888{
2889 string auth_cluster_required = g_conf->auth_supported.empty() ?
2890 g_conf->auth_cluster_required : g_conf->auth_supported;
2891 string auth_service_required = g_conf->auth_supported.empty() ?
2892 g_conf->auth_service_required : g_conf->auth_supported;
2893
2894 return auth_service_required == "cephx" ||
2895 auth_cluster_required == "cephx";
2896}
2897
2898struct C_MgrProxyCommand : public Context {
2899 Monitor *mon;
2900 MonOpRequestRef op;
2901 uint64_t size;
2902 bufferlist outbl;
2903 string outs;
2904 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2905 : mon(mon), op(op), size(s) { }
2906 void finish(int r) {
2907 Mutex::Locker l(mon->lock);
2908 mon->mgr_proxy_bytes -= size;
2909 mon->reply_command(op, r, outs, outbl, 0);
2910 }
2911};
2912
2913void Monitor::handle_command(MonOpRequestRef op)
2914{
2915 assert(op->is_type_command());
2916 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
2917 if (m->fsid != monmap->fsid) {
2918 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2919 reply_command(op, -EPERM, "wrong fsid", 0);
2920 return;
2921 }
2922
2923 MonSession *session = static_cast<MonSession *>(
2924 m->get_connection()->get_priv());
2925 if (!session) {
2926 dout(5) << __func__ << " dropping stray message " << *m << dendl;
2927 return;
2928 }
2929 BOOST_SCOPE_EXIT_ALL(=) {
2930 session->put();
2931 };
2932
2933 if (m->cmd.empty()) {
2934 string rs = "No command supplied";
2935 reply_command(op, -EINVAL, rs, 0);
2936 return;
2937 }
2938
2939 string prefix;
2940 vector<string> fullcmd;
2941 map<string, cmd_vartype> cmdmap;
2942 stringstream ss, ds;
2943 bufferlist rdata;
2944 string rs;
2945 int r = -EINVAL;
2946 rs = "unrecognized command";
2947
2948 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
2949 // ss has reason for failure
2950 r = -EINVAL;
2951 rs = ss.str();
2952 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
2953 reply_command(op, r, rs, 0);
2954 return;
2955 }
2956
2957 // check return value. If no prefix parameter provided,
2958 // return value will be false, then return error info.
2959 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
2960 reply_command(op, -EINVAL, "command prefix not found", 0);
2961 return;
2962 }
2963
2964 // check prefix is empty
2965 if (prefix.empty()) {
2966 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
2967 return;
2968 }
2969
2970 if (prefix == "get_command_descriptions") {
2971 bufferlist rdata;
2972 Formatter *f = Formatter::create("json");
2973 // hide mgr commands until luminous upgrade is complete
2974 bool hide_mgr_flag =
31f18b77 2975 osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
c07f9fc5
FG
2976
2977 std::vector<MonCommand> commands;
d2e6a577
FG
2978
2979 // only include mgr commands once all mons are upgrade (and we've dropped
2980 // the hard-coded PGMonitor commands)
2981 if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2982 commands = static_cast<MgrMonitor*>(
c07f9fc5 2983 paxos_service[PAXOS_MGR])->get_command_descs();
d2e6a577 2984 }
c07f9fc5 2985
d2e6a577
FG
2986 for (auto& c : leader_mon_commands) {
2987 commands.push_back(c);
c07f9fc5
FG
2988 }
2989
2990 format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
7c673cae
FG
2991 delete f;
2992 reply_command(op, 0, "", rdata, 0);
2993 return;
2994 }
2995
2996 string module;
2997 string err;
2998
2999 dout(0) << "handle_command " << *m << dendl;
3000
3001 string format;
3002 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
3003 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3004
3005 get_str_vec(prefix, fullcmd);
3006
3007 // make sure fullcmd is not empty.
3008 // invalid prefix will cause empty vector fullcmd.
3009 // such as, prefix=";,,;"
3010 if (fullcmd.empty()) {
3011 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3012 return;
3013 }
3014
3015 module = fullcmd[0];
3016
3017 // validate command is in leader map
3018
3019 const MonCommand *leader_cmd;
c07f9fc5
FG
3020 const auto& mgr_cmds = mgrmon()->get_command_descs();
3021 const MonCommand *mgr_cmd = nullptr;
3022 if (!mgr_cmds.empty()) {
d2e6a577 3023 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
c07f9fc5 3024 }
d2e6a577 3025 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
7c673cae 3026 if (!leader_cmd) {
c07f9fc5
FG
3027 leader_cmd = mgr_cmd;
3028 if (!leader_cmd) {
3029 reply_command(op, -EINVAL, "command not known", 0);
3030 return;
3031 }
7c673cae
FG
3032 }
3033 // validate command is in our map & matches, or forward if it is allowed
d2e6a577
FG
3034 const MonCommand *mon_cmd = _get_moncommand(
3035 prefix,
3036 get_local_commands(quorum_mon_features));
c07f9fc5
FG
3037 if (!mon_cmd) {
3038 mon_cmd = mgr_cmd;
3039 }
7c673cae
FG
3040 if (!is_leader()) {
3041 if (!mon_cmd) {
3042 if (leader_cmd->is_noforward()) {
3043 reply_command(op, -EINVAL,
3044 "command not locally supported and not allowed to forward",
3045 0);
3046 return;
3047 }
3048 dout(10) << "Command not locally supported, forwarding request "
3049 << m << dendl;
3050 forward_request_leader(op);
3051 return;
3052 } else if (!mon_cmd->is_compat(leader_cmd)) {
3053 if (mon_cmd->is_noforward()) {
3054 reply_command(op, -EINVAL,
3055 "command not compatible with leader and not allowed to forward",
3056 0);
3057 return;
3058 }
3059 dout(10) << "Command not compatible with leader, forwarding request "
3060 << m << dendl;
3061 forward_request_leader(op);
3062 return;
3063 }
3064 }
3065
3066 if (mon_cmd->is_obsolete() ||
3067 (cct->_conf->mon_debug_deprecated_as_obsolete
3068 && mon_cmd->is_deprecated())) {
3069 reply_command(op, -ENOTSUP,
3070 "command is obsolete; please check usage and/or man page",
3071 0);
3072 return;
3073 }
3074
3075 if (session->proxy_con && mon_cmd->is_noforward()) {
3076 dout(10) << "Got forward for noforward command " << m << dendl;
3077 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3078 return;
3079 }
3080
3081 /* what we perceive as being the service the command falls under */
3082 string service(mon_cmd->module);
3083
3084 dout(25) << __func__ << " prefix='" << prefix
3085 << "' module='" << module
3086 << "' service='" << service << "'" << dendl;
3087
3088 bool cmd_is_rw =
3089 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3090
3091 // validate user's permissions for requested command
3092 map<string,string> param_str_map;
3093 _generate_command_map(cmdmap, param_str_map);
3094 if (!_allowed_command(session, service, prefix, cmdmap,
3095 param_str_map, mon_cmd)) {
3096 dout(1) << __func__ << " access denied" << dendl;
3097 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3098 << "from='" << session->inst << "' "
3099 << "entity='" << session->entity_name << "' "
3100 << "cmd=" << m->cmd << ": access denied";
3101 reply_command(op, -EACCES, "access denied", 0);
3102 return;
3103 }
3104
3105 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3106 << "from='" << session->inst << "' "
3107 << "entity='" << session->entity_name << "' "
3108 << "cmd=" << m->cmd << ": dispatch";
3109
3110 if (mon_cmd->is_mgr() &&
31f18b77 3111 osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
7c673cae
FG
3112 const auto& hdr = m->get_header();
3113 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3114 uint64_t max =
3115 g_conf->mon_client_bytes * g_conf->mon_mgr_proxy_client_bytes_ratio;
3116 if (mgr_proxy_bytes + size > max) {
3117 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3118 << " + " << size << " > max " << max << dendl;
3119 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3120 return;
3121 }
3122 mgr_proxy_bytes += size;
3123 dout(10) << __func__ << " proxying mgr command (+" << size
3124 << " -> " << mgr_proxy_bytes << ")" << dendl;
3125 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3126 mgr_client.start_command(m->cmd,
3127 m->get_data(),
3128 &fin->outbl,
3129 &fin->outs,
3130 new C_OnFinisher(fin, &finisher));
3131 return;
3132 }
3133
d2e6a577
FG
3134 if ((module == "mds" || module == "fs") &&
3135 prefix != "fs authorize") {
7c673cae
FG
3136 mdsmon()->dispatch(op);
3137 return;
3138 }
31f18b77
FG
3139 if ((module == "osd" || prefix == "pg map") &&
3140 prefix != "osd last-stat-seq") {
7c673cae
FG
3141 osdmon()->dispatch(op);
3142 return;
3143 }
3144
3145 if (module == "pg") {
3146 pgmon()->dispatch(op);
3147 return;
3148 }
3149 if (module == "mon" &&
3150 /* Let the Monitor class handle the following commands:
3151 * 'mon compact'
3152 * 'mon scrub'
3153 * 'mon sync force'
3154 */
3155 prefix != "mon compact" &&
3156 prefix != "mon scrub" &&
3157 prefix != "mon sync force" &&
31f18b77
FG
3158 prefix != "mon metadata" &&
3159 prefix != "mon versions" &&
3160 prefix != "mon count-metadata") {
7c673cae
FG
3161 monmon()->dispatch(op);
3162 return;
3163 }
d2e6a577 3164 if (module == "auth" || prefix == "fs authorize") {
7c673cae
FG
3165 authmon()->dispatch(op);
3166 return;
3167 }
3168 if (module == "log") {
3169 logmon()->dispatch(op);
3170 return;
3171 }
3172
3173 if (module == "config-key") {
3174 config_key_service->dispatch(op);
3175 return;
3176 }
3177
3178 if (module == "mgr") {
3179 mgrmon()->dispatch(op);
3180 return;
3181 }
3182
3183 if (prefix == "fsid") {
3184 if (f) {
3185 f->open_object_section("fsid");
3186 f->dump_stream("fsid") << monmap->fsid;
3187 f->close_section();
3188 f->flush(rdata);
3189 } else {
3190 ds << monmap->fsid;
3191 rdata.append(ds);
3192 }
3193 reply_command(op, 0, "", rdata, 0);
3194 return;
3195 }
3196
3197 if (prefix == "scrub" || prefix == "mon scrub") {
3198 wait_for_paxos_write();
3199 if (is_leader()) {
3200 int r = scrub_start();
3201 reply_command(op, r, "", rdata, 0);
3202 } else if (is_peon()) {
3203 forward_request_leader(op);
3204 } else {
3205 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3206 }
3207 return;
3208 }
3209
3210 if (prefix == "compact" || prefix == "mon compact") {
3211 dout(1) << "triggering manual compaction" << dendl;
3212 utime_t start = ceph_clock_now();
3213 store->compact();
3214 utime_t end = ceph_clock_now();
3215 end -= start;
3216 dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3217 ostringstream oss;
224ce89b 3218 oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
7c673cae
FG
3219 rs = oss.str();
3220 r = 0;
3221 }
3222 else if (prefix == "injectargs") {
3223 vector<string> injected_args;
3224 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3225 if (!injected_args.empty()) {
3226 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3227 ostringstream oss;
3228 r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3229 ss << "injectargs:" << oss.str();
3230 rs = ss.str();
3231 goto out;
3232 } else {
3233 rs = "must supply options to be parsed in a single string";
3234 r = -EINVAL;
3235 }
224ce89b
WB
3236 } else if (prefix == "time-sync-status") {
3237 if (!f)
3238 f.reset(Formatter::create("json-pretty"));
3239 f->open_object_section("time_sync");
3240 if (!timecheck_skews.empty()) {
3241 f->open_object_section("time_skew_status");
3242 for (auto& i : timecheck_skews) {
3243 entity_inst_t inst = i.first;
3244 double skew = i.second;
3245 double latency = timecheck_latencies[inst];
3246 string name = monmap->get_name(inst.addr);
3247 ostringstream tcss;
3248 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3249 f->open_object_section(name.c_str());
3250 f->dump_float("skew", skew);
3251 f->dump_float("latency", latency);
3252 f->dump_stream("health") << tcstatus;
3253 if (tcstatus != HEALTH_OK) {
3254 f->dump_stream("details") << tcss.str();
3255 }
3256 f->close_section();
3257 }
3258 f->close_section();
3259 }
3260 f->open_object_section("timechecks");
3261 f->dump_unsigned("epoch", get_epoch());
3262 f->dump_int("round", timecheck_round);
3263 f->dump_stream("round_status") << ((timecheck_round%2) ?
3264 "on-going" : "finished");
3265 f->close_section();
3266 f->close_section();
3267 f->flush(rdata);
3268 r = 0;
3269 rs = "";
c07f9fc5
FG
3270 } else if (prefix == "config set") {
3271 std::string key;
3272 cmd_getval(cct, cmdmap, "key", key);
3273 std::string val;
3274 cmd_getval(cct, cmdmap, "value", val);
3275 r = g_conf->set_val(key, val, true, &ss);
d2e6a577
FG
3276 if (r == 0) {
3277 g_conf->apply_changes(nullptr);
3278 }
c07f9fc5
FG
3279 rs = ss.str();
3280 goto out;
7c673cae
FG
3281 } else if (prefix == "status" ||
3282 prefix == "health" ||
3283 prefix == "df") {
3284 string detail;
3285 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3286
3287 if (prefix == "status") {
3288 // get_cluster_status handles f == NULL
3289 get_cluster_status(ds, f.get());
3290
3291 if (f) {
3292 f->flush(ds);
3293 ds << '\n';
3294 }
3295 rdata.append(ds);
3296 } else if (prefix == "health") {
224ce89b
WB
3297 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3298 string plain;
3299 get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3300 if (f) {
3301 f->flush(rdata);
3302 } else {
3303 rdata.append(plain);
3304 }
7c673cae 3305 } else {
224ce89b
WB
3306 list<string> health_str;
3307 get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
3308 if (f) {
3309 f->flush(ds);
3310 ds << '\n';
3311 } else {
3312 assert(!health_str.empty());
3313 ds << health_str.front();
3314 health_str.pop_front();
3315 if (!health_str.empty()) {
3316 ds << ' ';
3317 ds << joinify(health_str.begin(), health_str.end(), string("; "));
3318 }
7c673cae 3319 }
224ce89b
WB
3320 bufferlist comb;
3321 comb.append(ds);
3322 if (detail == "detail")
3323 comb.append(rdata);
3324 rdata = comb;
7c673cae 3325 }
7c673cae
FG
3326 } else if (prefix == "df") {
3327 bool verbose = (detail == "detail");
3328 if (f)
3329 f->open_object_section("stats");
3330
31f18b77 3331 pgservice->dump_fs_stats(&ds, f.get(), verbose);
7c673cae
FG
3332 if (!f)
3333 ds << '\n';
31f18b77 3334 pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
7c673cae
FG
3335
3336 if (f) {
3337 f->close_section();
3338 f->flush(ds);
3339 ds << '\n';
3340 }
3341 } else {
3342 assert(0 == "We should never get here!");
3343 return;
3344 }
3345 rdata.append(ds);
3346 rs = "";
3347 r = 0;
3348 } else if (prefix == "report") {
3349
3350 // this must be formatted, in its current form
3351 if (!f)
3352 f.reset(Formatter::create("json-pretty"));
3353 f->open_object_section("report");
3354 f->dump_stream("cluster_fingerprint") << fingerprint;
3355 f->dump_string("version", ceph_version_to_str());
3356 f->dump_string("commit", git_version_to_str());
3357 f->dump_stream("timestamp") << ceph_clock_now();
3358
3359 vector<string> tagsvec;
3360 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3361 string tagstr = str_join(tagsvec, " ");
3362 if (!tagstr.empty())
3363 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3364 f->dump_string("tag", tagstr);
3365
3366 list<string> hs;
3367 get_health(hs, NULL, f.get());
3368
3369 monmon()->dump_info(f.get());
3370 osdmon()->dump_info(f.get());
3371 mdsmon()->dump_info(f.get());
7c673cae 3372 authmon()->dump_info(f.get());
31f18b77 3373 pgservice->dump_info(f.get());
7c673cae
FG
3374
3375 paxos->dump_info(f.get());
3376
3377 f->close_section();
3378 f->flush(rdata);
3379
3380 ostringstream ss2;
3381 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
3382 rs = ss2.str();
3383 r = 0;
31f18b77
FG
3384 } else if (prefix == "osd last-stat-seq") {
3385 int64_t osd;
3386 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3387 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3388 if (f) {
3389 f->dump_unsigned("seq", seq);
3390 f->flush(ds);
3391 } else {
3392 ds << seq;
3393 rdata.append(ds);
3394 }
3395 rs = "";
3396 r = 0;
7c673cae
FG
3397 } else if (prefix == "node ls") {
3398 string node_type("all");
3399 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3400 if (!f)
3401 f.reset(Formatter::create("json-pretty"));
3402 if (node_type == "all") {
3403 f->open_object_section("nodes");
3404 print_nodes(f.get(), ds);
3405 osdmon()->print_nodes(f.get());
3406 mdsmon()->print_nodes(f.get());
3407 f->close_section();
3408 } else if (node_type == "mon") {
3409 print_nodes(f.get(), ds);
3410 } else if (node_type == "osd") {
3411 osdmon()->print_nodes(f.get());
3412 } else if (node_type == "mds") {
3413 mdsmon()->print_nodes(f.get());
3414 }
3415 f->flush(ds);
3416 rdata.append(ds);
3417 rs = "";
3418 r = 0;
31f18b77
FG
3419 } else if (prefix == "features") {
3420 if (!is_leader() && !is_peon()) {
3421 dout(10) << " waiting for quorum" << dendl;
3422 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3423 return;
3424 }
3425 if (!is_leader()) {
3426 forward_request_leader(op);
3427 return;
3428 }
3429 if (!f)
3430 f.reset(Formatter::create("json-pretty"));
3431 FeatureMap fm;
3432 get_combined_feature_map(&fm);
3433 f->dump_object("features", fm);
3434 f->flush(rdata);
3435 rs = "";
3436 r = 0;
7c673cae
FG
3437 } else if (prefix == "mon metadata") {
3438 if (!f)
3439 f.reset(Formatter::create("json-pretty"));
3440
3441 string name;
3442 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3443 if (!all) {
3444 // Dump a single mon's metadata
3445 int mon = monmap->get_rank(name);
3446 if (mon < 0) {
3447 rs = "requested mon not found";
3448 r = -ENOENT;
3449 goto out;
3450 }
3451 f->open_object_section("mon_metadata");
3452 r = get_mon_metadata(mon, f.get(), ds);
3453 f->close_section();
3454 } else {
3455 // Dump all mons' metadata
3456 r = 0;
3457 f->open_array_section("mon_metadata");
3458 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3459 std::ostringstream get_err;
3460 f->open_object_section("mon");
3461 f->dump_string("name", monmap->get_name(rank));
3462 r = get_mon_metadata(rank, f.get(), get_err);
3463 f->close_section();
3464 if (r == -ENOENT || r == -EINVAL) {
3465 dout(1) << get_err.str() << dendl;
3466 // Drop error, list what metadata we do have
3467 r = 0;
3468 } else if (r != 0) {
3469 derr << "Unexpected error from get_mon_metadata: "
3470 << cpp_strerror(r) << dendl;
3471 ds << get_err.str();
3472 break;
3473 }
3474 }
3475 f->close_section();
3476 }
3477
3478 f->flush(ds);
3479 rdata.append(ds);
3480 rs = "";
31f18b77
FG
3481 } else if (prefix == "mon versions") {
3482 if (!f)
3483 f.reset(Formatter::create("json-pretty"));
3484 count_metadata("ceph_version", f.get());
3485 f->flush(ds);
3486 rdata.append(ds);
3487 rs = "";
3488 r = 0;
3489 } else if (prefix == "mon count-metadata") {
3490 if (!f)
3491 f.reset(Formatter::create("json-pretty"));
3492 string field;
3493 cmd_getval(g_ceph_context, cmdmap, "property", field);
3494 count_metadata(field, f.get());
3495 f->flush(ds);
3496 rdata.append(ds);
3497 rs = "";
3498 r = 0;
7c673cae
FG
3499 } else if (prefix == "quorum_status") {
3500 // make sure our map is readable and up to date
3501 if (!is_leader() && !is_peon()) {
3502 dout(10) << " waiting for quorum" << dendl;
3503 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3504 return;
3505 }
3506 _quorum_status(f.get(), ds);
3507 rdata.append(ds);
3508 rs = "";
3509 r = 0;
3510 } else if (prefix == "mon_status") {
3511 get_mon_status(f.get(), ds);
3512 if (f)
3513 f->flush(ds);
3514 rdata.append(ds);
3515 rs = "";
3516 r = 0;
3517 } else if (prefix == "sync force" ||
3518 prefix == "mon sync force") {
3519 string validate1, validate2;
3520 cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3521 cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3522 if (validate1 != "--yes-i-really-mean-it" ||
3523 validate2 != "--i-know-what-i-am-doing") {
3524 r = -EINVAL;
3525 rs = "are you SURE? this will mean the monitor store will be "
3526 "erased. pass '--yes-i-really-mean-it "
3527 "--i-know-what-i-am-doing' if you really do.";
3528 goto out;
3529 }
3530 sync_force(f.get(), ds);
3531 rs = ds.str();
3532 r = 0;
3533 } else if (prefix == "heap") {
3534 if (!ceph_using_tcmalloc())
3535 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3536 else {
3537 string heapcmd;
3538 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3539 // XXX 1-element vector, change at callee or make vector here?
3540 vector<string> heapcmd_vec;
3541 get_str_vec(heapcmd, heapcmd_vec);
3542 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3543 rdata.append(ds);
3544 rs = "";
3545 r = 0;
3546 }
3547 } else if (prefix == "quorum") {
3548 string quorumcmd;
3549 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3550 if (quorumcmd == "exit") {
3551 start_election();
3552 elector.stop_participating();
3553 rs = "stopped responding to quorum, initiated new election";
3554 r = 0;
3555 } else if (quorumcmd == "enter") {
3556 elector.start_participating();
3557 start_election();
3558 rs = "started responding to quorum, initiated new election";
3559 r = 0;
3560 } else {
3561 rs = "needs a valid 'quorum' command";
3562 r = -EINVAL;
3563 }
3564 } else if (prefix == "version") {
3565 if (f) {
3566 f->open_object_section("version");
3567 f->dump_string("version", pretty_version_to_str());
3568 f->close_section();
3569 f->flush(ds);
3570 } else {
3571 ds << pretty_version_to_str();
3572 }
3573 rdata.append(ds);
3574 rs = "";
3575 r = 0;
c07f9fc5
FG
3576 } else if (prefix == "versions") {
3577 if (!f)
3578 f.reset(Formatter::create("json-pretty"));
3579 map<string,int> overall;
3580 f->open_object_section("version");
3581 map<string,int> mon, mgr, osd, mds;
3582
3583 count_metadata("ceph_version", &mon);
3584 f->open_object_section("mon");
3585 for (auto& p : mon) {
3586 f->dump_int(p.first.c_str(), p.second);
3587 overall[p.first] += p.second;
3588 }
3589 f->close_section();
3590
3591 mgrmon()->count_metadata("ceph_version", &mgr);
3592 f->open_object_section("mgr");
3593 for (auto& p : mgr) {
3594 f->dump_int(p.first.c_str(), p.second);
3595 overall[p.first] += p.second;
3596 }
3597 f->close_section();
3598
3599 osdmon()->count_metadata("ceph_version", &osd);
3600 f->open_object_section("osd");
3601 for (auto& p : osd) {
3602 f->dump_int(p.first.c_str(), p.second);
3603 overall[p.first] += p.second;
3604 }
3605 f->close_section();
3606
3607 mdsmon()->count_metadata("ceph_version", &mds);
3608 f->open_object_section("mds");
35e4c445 3609 for (auto& p : mds) {
c07f9fc5
FG
3610 f->dump_int(p.first.c_str(), p.second);
3611 overall[p.first] += p.second;
3612 }
3613 f->close_section();
3614
3615 for (auto& p : mgrstatmon()->get_service_map().services) {
3616 f->open_object_section(p.first.c_str());
3617 map<string,int> m;
3618 p.second.count_metadata("ceph_version", &m);
3619 for (auto& q : m) {
3620 f->dump_int(q.first.c_str(), q.second);
3621 overall[q.first] += q.second;
3622 }
3623 f->close_section();
3624 }
3625
3626 f->open_object_section("overall");
3627 for (auto& p : overall) {
3628 f->dump_int(p.first.c_str(), p.second);
3629 }
3630 f->close_section();
3631 f->close_section();
3632 f->flush(rdata);
3633 rs = "";
3634 r = 0;
7c673cae
FG
3635 }
3636
3637 out:
3638 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3639 reply_command(op, r, rs, rdata, 0);
3640}
3641
3642void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3643{
3644 bufferlist rdata;
3645 reply_command(op, rc, rs, rdata, version);
3646}
3647
3648void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3649 bufferlist& rdata, version_t version)
3650{
3651 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3652 assert(m->get_type() == MSG_MON_COMMAND);
3653 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3654 reply->set_tid(m->get_tid());
3655 reply->set_data(rdata);
3656 send_reply(op, reply);
3657}
3658
3659
3660// ------------------------
3661// request/reply routing
3662//
3663// a client/mds/osd will connect to a random monitor. we need to forward any
3664// messages requiring state updates to the leader, and then route any replies
3665// back via the correct monitor and back to them. (the monitor will not
3666// initiate any connections.)
3667
3668void Monitor::forward_request_leader(MonOpRequestRef op)
3669{
3670 op->mark_event(__func__);
3671
3672 int mon = get_leader();
3673 MonSession *session = op->get_session();
3674 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3675
3676 if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3677 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3678 } else if (session->proxy_con) {
3679 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3680 } else if (!session->closed) {
3681 RoutedRequest *rr = new RoutedRequest;
3682 rr->tid = ++routed_request_tid;
3683 rr->client_inst = req->get_source_inst();
3684 rr->con = req->get_connection();
3685 rr->con_features = rr->con->get_features();
3686 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3687 rr->session = static_cast<MonSession *>(session->get());
3688 rr->op = op;
3689 routed_requests[rr->tid] = rr;
3690 session->routed_request_tids.insert(rr->tid);
3691
3692 dout(10) << "forward_request " << rr->tid << " request " << *req
3693 << " features " << rr->con_features << dendl;
3694
3695 MForward *forward = new MForward(rr->tid,
3696 req,
3697 rr->con_features,
3698 rr->session->caps);
3699 forward->set_priority(req->get_priority());
3700 if (session->auth_handler) {
3701 forward->entity_name = session->entity_name;
3702 } else if (req->get_source().is_mon()) {
3703 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3704 }
3705 messenger->send_message(forward, monmap->get_inst(mon));
3706 op->mark_forwarded();
3707 assert(op->get_req()->get_type() != 0);
3708 } else {
3709 dout(10) << "forward_request no session for request " << *req << dendl;
3710 }
3711}
3712
3713// fake connection attached to forwarded messages
3714struct AnonConnection : public Connection {
3715 explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
3716
3717 int send_message(Message *m) override {
3718 assert(!"send_message on anonymous connection");
3719 }
3720 void send_keepalive() override {
3721 assert(!"send_keepalive on anonymous connection");
3722 }
3723 void mark_down() override {
3724 // silently ignore
3725 }
3726 void mark_disposable() override {
3727 // silengtly ignore
3728 }
3729 bool is_connected() override { return false; }
3730};
3731
3732//extract the original message and put it into the regular dispatch function
3733void Monitor::handle_forward(MonOpRequestRef op)
3734{
3735 MForward *m = static_cast<MForward*>(op->get_req());
3736 dout(10) << "received forwarded message from " << m->client
3737 << " via " << m->get_source_inst() << dendl;
3738 MonSession *session = op->get_session();
3739 assert(session);
3740
3741 if (!session->is_capable("mon", MON_CAP_X)) {
3742 dout(0) << "forward from entity with insufficient caps! "
3743 << session->caps << dendl;
3744 } else {
3745 // see PaxosService::dispatch(); we rely on this being anon
3746 // (c->msgr == NULL)
3747 PaxosServiceMessage *req = m->claim_message();
3748 assert(req != NULL);
3749
3750 ConnectionRef c(new AnonConnection(cct));
3751 MonSession *s = new MonSession(req->get_source_inst(),
3752 static_cast<Connection*>(c.get()));
3753 c->set_priv(s->get());
3754 c->set_peer_addr(m->client.addr);
3755 c->set_peer_type(m->client.name.type());
3756 c->set_features(m->con_features);
3757
3758 s->caps = m->client_caps;
3759 dout(10) << " caps are " << s->caps << dendl;
3760 s->entity_name = m->entity_name;
3761 dout(10) << " entity name '" << s->entity_name << "' type "
3762 << s->entity_name.get_type() << dendl;
3763 s->proxy_con = m->get_connection();
3764 s->proxy_tid = m->tid;
3765
3766 req->set_connection(c);
3767
3768 // not super accurate, but better than nothing.
3769 req->set_recv_stamp(m->get_recv_stamp());
3770
3771 /*
3772 * note which election epoch this is; we will drop the message if
3773 * there is a future election since our peers will resend routed
3774 * requests in that case.
3775 */
3776 req->rx_election_epoch = get_epoch();
3777
3778 /* Because this is a special fake connection, we need to break
3779 the ref loop between Connection and MonSession differently
3780 than we normally do. Here, the Message refers to the Connection
3781 which refers to the Session, and nobody else refers to the Connection
3782 or the Session. And due to the special nature of this message,
3783 nobody refers to the Connection via the Session. So, clear out that
3784 half of the ref loop.*/
3785 s->con.reset(NULL);
3786
3787 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
3788
3789 _ms_dispatch(req);
3790 s->put();
3791 }
3792}
3793
3794void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3795{
3796 dout(10) << "try_send_message " << *m << " to " << to << dendl;
3797
3798 bufferlist bl;
3799 encode_message(m, quorum_con_features, bl);
3800
3801 messenger->send_message(m, to);
3802
3803 for (int i=0; i<(int)monmap->size(); i++) {
3804 if (i != rank)
3805 messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
3806 }
3807}
3808
3809void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3810{
3811 op->mark_event(__func__);
3812
3813 MonSession *session = op->get_session();
3814 assert(session);
3815 Message *req = op->get_req();
3816 ConnectionRef con = op->get_connection();
3817
3818 reply->set_cct(g_ceph_context);
3819 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3820
3821 if (!con) {
3822 dout(2) << "send_reply no connection, dropping reply " << *reply
3823 << " to " << req << " " << *req << dendl;
3824 reply->put();
3825 op->mark_event("reply: no connection");
3826 return;
3827 }
3828
3829 if (!session->con && !session->proxy_con) {
3830 dout(2) << "send_reply no connection, dropping reply " << *reply
3831 << " to " << req << " " << *req << dendl;
3832 reply->put();
3833 op->mark_event("reply: no connection");
3834 return;
3835 }
3836
3837 if (session->proxy_con) {
3838 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3839 << " via " << session->proxy_con->get_peer_addr()
3840 << " for request " << *req << dendl;
3841 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3842 op->mark_event("reply: send routed request");
3843 } else {
3844 session->con->send_message(reply);
3845 op->mark_event("reply: send");
3846 }
3847}
3848
3849void Monitor::no_reply(MonOpRequestRef op)
3850{
3851 MonSession *session = op->get_session();
3852 Message *req = op->get_req();
3853
3854 if (session->proxy_con) {
3855 dout(10) << "no_reply to " << req->get_source_inst()
3856 << " via " << session->proxy_con->get_peer_addr()
3857 << " for request " << *req << dendl;
3858 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3859 op->mark_event("no_reply: send routed request");
3860 } else {
3861 dout(10) << "no_reply to " << req->get_source_inst()
3862 << " " << *req << dendl;
3863 op->mark_event("no_reply");
3864 }
3865}
3866
3867void Monitor::handle_route(MonOpRequestRef op)
3868{
3869 MRoute *m = static_cast<MRoute*>(op->get_req());
3870 MonSession *session = op->get_session();
3871 //check privileges
3872 if (!session->is_capable("mon", MON_CAP_X)) {
3873 dout(0) << "MRoute received from entity without appropriate perms! "
3874 << dendl;
3875 return;
3876 }
3877 if (m->msg)
3878 dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3879 else
3880 dout(10) << "handle_route null to " << m->dest << dendl;
3881
3882 // look it up
3883 if (m->session_mon_tid) {
3884 if (routed_requests.count(m->session_mon_tid)) {
3885 RoutedRequest *rr = routed_requests[m->session_mon_tid];
3886
3887 // reset payload, in case encoding is dependent on target features
3888 if (m->msg) {
3889 m->msg->clear_payload();
3890 rr->con->send_message(m->msg);
3891 m->msg = NULL;
3892 }
3893 if (m->send_osdmap_first) {
3894 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3895 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3896 true, MonOpRequestRef());
3897 }
3898 assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3899 routed_requests.erase(m->session_mon_tid);
3900 rr->session->routed_request_tids.erase(m->session_mon_tid);
3901 delete rr;
3902 } else {
3903 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3904 }
3905 } else {
3906 dout(10) << " not a routed request, trying to send anyway" << dendl;
3907 if (m->msg) {
3908 messenger->send_message(m->msg, m->dest);
3909 m->msg = NULL;
3910 }
3911 }
3912}
3913
3914void Monitor::resend_routed_requests()
3915{
3916 dout(10) << "resend_routed_requests" << dendl;
3917 int mon = get_leader();
3918 list<Context*> retry;
3919 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
3920 p != routed_requests.end();
3921 ++p) {
3922 RoutedRequest *rr = p->second;
3923
3924 if (mon == rank) {
3925 dout(10) << " requeue for self tid " << rr->tid << dendl;
3926 rr->op->mark_event("retry routed request");
3927 retry.push_back(new C_RetryMessage(this, rr->op));
3928 if (rr->session) {
3929 assert(rr->session->routed_request_tids.count(p->first));
3930 rr->session->routed_request_tids.erase(p->first);
3931 }
3932 delete rr;
3933 } else {
3934 bufferlist::iterator q = rr->request_bl.begin();
3935 PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
3936 rr->op->mark_event("resend forwarded message to leader");
3937 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
3938 MForward *forward = new MForward(rr->tid, req, rr->con_features,
3939 rr->session->caps);
3940 req->put(); // forward takes its own ref; drop ours.
3941 forward->client = rr->client_inst;
3942 forward->set_priority(req->get_priority());
3943 messenger->send_message(forward, monmap->get_inst(mon));
3944 }
3945 }
3946 if (mon == rank) {
3947 routed_requests.clear();
3948 finish_contexts(g_ceph_context, retry);
3949 }
3950}
3951
3952void Monitor::remove_session(MonSession *s)
3953{
224ce89b
WB
3954 dout(10) << "remove_session " << s << " " << s->inst
3955 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
7c673cae
FG
3956 assert(s->con);
3957 assert(!s->closed);
3958 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
3959 p != s->routed_request_tids.end();
3960 ++p) {
3961 assert(routed_requests.count(*p));
3962 RoutedRequest *rr = routed_requests[*p];
3963 dout(10) << " dropping routed request " << rr->tid << dendl;
3964 delete rr;
3965 routed_requests.erase(*p);
3966 }
3967 s->routed_request_tids.clear();
3968 s->con->set_priv(NULL);
3969 session_map.remove_session(s);
3970 logger->set(l_mon_num_sessions, session_map.get_size());
3971 logger->inc(l_mon_session_rm);
3972}
3973
3974void Monitor::remove_all_sessions()
3975{
3976 Mutex::Locker l(session_map_lock);
3977 while (!session_map.sessions.empty()) {
3978 MonSession *s = session_map.sessions.front();
3979 remove_session(s);
3980 if (logger)
3981 logger->inc(l_mon_session_rm);
3982 }
3983 if (logger)
3984 logger->set(l_mon_num_sessions, session_map.get_size());
3985}
3986
3987void Monitor::send_command(const entity_inst_t& inst,
3988 const vector<string>& com)
3989{
3990 dout(10) << "send_command " << inst << "" << com << dendl;
3991 MMonCommand *c = new MMonCommand(monmap->fsid);
3992 c->cmd = com;
3993 try_send_message(c, inst);
3994}
3995
3996void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
3997{
3998 /**
3999 * Wait list the new session until we're in the quorum, assuming it's
4000 * sufficiently new.
4001 * tick() will periodically send them back through so we can send
4002 * the client elsewhere if we don't think we're getting back in.
4003 *
4004 * But we whitelist a few sorts of messages:
4005 * 1) Monitors can talk to us at any time, of course.
4006 * 2) auth messages. It's unlikely to go through much faster, but
4007 * it's possible we've just lost our quorum status and we want to take...
4008 * 3) command messages. We want to accept these under all possible
4009 * circumstances.
4010 */
4011 Message *m = op->get_req();
4012 MonSession *s = op->get_session();
4013 ConnectionRef con = op->get_connection();
4014 utime_t too_old = ceph_clock_now();
4015 too_old -= g_ceph_context->_conf->mon_lease;
4016 if (m->get_recv_stamp() > too_old &&
4017 con->is_connected()) {
4018 dout(5) << "waitlisting message " << *m << dendl;
4019 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4020 op->mark_wait_for_quorum();
4021 } else {
4022 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4023 con->mark_down();
4024 // proxied sessions aren't registered and don't have a con; don't remove
4025 // those.
4026 if (!s->proxy_con) {
4027 Mutex::Locker l(session_map_lock);
4028 remove_session(s);
4029 }
4030 op->mark_zap();
4031 }
4032}
4033
4034void Monitor::_ms_dispatch(Message *m)
4035{
4036 if (is_shutdown()) {
4037 m->put();
4038 return;
4039 }
4040
4041 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4042 bool src_is_mon = op->is_src_mon();
4043 op->mark_event("mon:_ms_dispatch");
4044 MonSession *s = op->get_session();
4045 if (s && s->closed) {
4046 return;
4047 }
224ce89b
WB
4048
4049 if (src_is_mon && s) {
4050 ConnectionRef con = m->get_connection();
4051 if (con->get_messenger() && con->get_features() != s->con_features) {
4052 // only update features if this is a non-anonymous connection
4053 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4054 << " (was " << s->con_features
4055 << ", now " << con->get_features() << ")" << dendl;
4056 // connection features changed - recreate session.
4057 if (s->con && s->con != con) {
4058 dout(10) << __func__ << " connection for " << m->get_source_inst()
4059 << " changed from session; mark down and replace" << dendl;
4060 s->con->mark_down();
4061 }
4062 if (s->item.is_on_list()) {
4063 // forwarded messages' sessions are not in the sessions map and
4064 // exist only while the op is being handled.
4065 remove_session(s);
4066 }
4067 s->put();
4068 s = nullptr;
4069 }
4070 }
4071
7c673cae
FG
4072 if (!s) {
4073 // if the sender is not a monitor, make sure their first message for a
4074 // session is an MAuth. If it is not, assume it's a stray message,
4075 // and considering that we are creating a new session it is safe to
4076 // assume that the sender hasn't authenticated yet, so we have no way
4077 // of assessing whether we should handle it or not.
4078 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4079 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4080 m->get_type() != CEPH_MSG_PING)) {
4081 dout(1) << __func__ << " dropping stray message " << *m
4082 << " from " << m->get_source_inst() << dendl;
4083 return;
4084 }
4085
4086 ConnectionRef con = m->get_connection();
4087 {
4088 Mutex::Locker l(session_map_lock);
4089 s = session_map.new_session(m->get_source_inst(), con.get());
4090 }
4091 assert(s);
4092 con->set_priv(s->get());
224ce89b
WB
4093 dout(10) << __func__ << " new session " << s << " " << *s
4094 << " features 0x" << std::hex
4095 << s->con_features << std::dec << dendl;
7c673cae
FG
4096 op->set_session(s);
4097
4098 logger->set(l_mon_num_sessions, session_map.get_size());
4099 logger->inc(l_mon_session_add);
4100
4101 if (src_is_mon) {
4102 // give it monitor caps; the peer type has been authenticated
4103 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4104 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4105 s->caps = *mon_caps;
4106 }
4107 s->put();
4108 } else {
4109 dout(20) << __func__ << " existing session " << s << " for " << s->inst
4110 << dendl;
4111 }
4112
4113 assert(s);
4114
4115 s->session_timeout = ceph_clock_now();
4116 s->session_timeout += g_conf->mon_session_timeout;
4117
4118 if (s->auth_handler) {
4119 s->entity_name = s->auth_handler->get_entity_name();
4120 }
4121 dout(20) << " caps " << s->caps.get_str() << dendl;
4122
4123 if ((is_synchronizing() ||
4124 (s->global_id == 0 && !exited_quorum.is_zero())) &&
4125 !src_is_mon &&
4126 m->get_type() != CEPH_MSG_PING) {
4127 waitlist_or_zap_client(op);
4128 } else {
4129 dispatch_op(op);
4130 }
4131 return;
4132}
4133
4134void Monitor::dispatch_op(MonOpRequestRef op)
4135{
4136 op->mark_event("mon:dispatch_op");
4137 MonSession *s = op->get_session();
4138 assert(s);
4139 if (s->closed) {
4140 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4141 return;
4142 }
4143
4144 /* we will consider the default type as being 'monitor' until proven wrong */
4145 op->set_type_monitor();
4146 /* deal with all messages that do not necessarily need caps */
4147 bool dealt_with = true;
4148 switch (op->get_req()->get_type()) {
4149 // auth
4150 case MSG_MON_GLOBAL_ID:
4151 case CEPH_MSG_AUTH:
4152 op->set_type_service();
4153 /* no need to check caps here */
4154 paxos_service[PAXOS_AUTH]->dispatch(op);
4155 break;
4156
4157 case CEPH_MSG_PING:
4158 handle_ping(op);
4159 break;
4160
4161 /* MMonGetMap may be used by clients to obtain a monmap *before*
4162 * authenticating with the monitor. We need to handle these without
4163 * checking caps because, even on a cluster without cephx, we only set
4164 * session caps *after* the auth handshake. A good example of this
4165 * is when a client calls MonClient::get_monmap_privately(), which does
4166 * not authenticate when obtaining a monmap.
4167 */
4168 case CEPH_MSG_MON_GET_MAP:
4169 handle_mon_get_map(op);
4170 break;
4171
4172 case CEPH_MSG_MON_METADATA:
4173 return handle_mon_metadata(op);
4174
4175 default:
4176 dealt_with = false;
4177 break;
4178 }
4179 if (dealt_with)
4180 return;
4181
4182 /* well, maybe the op belongs to a service... */
4183 op->set_type_service();
4184 /* deal with all messages which caps should be checked somewhere else */
4185 dealt_with = true;
4186 switch (op->get_req()->get_type()) {
4187
4188 // OSDs
4189 case CEPH_MSG_MON_GET_OSDMAP:
4190 case CEPH_MSG_POOLOP:
4191 case MSG_OSD_BEACON:
4192 case MSG_OSD_MARK_ME_DOWN:
4193 case MSG_OSD_FULL:
4194 case MSG_OSD_FAILURE:
4195 case MSG_OSD_BOOT:
4196 case MSG_OSD_ALIVE:
4197 case MSG_OSD_PGTEMP:
4198 case MSG_OSD_PG_CREATED:
4199 case MSG_REMOVE_SNAPS:
4200 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4201 break;
4202
4203 // MDSs
4204 case MSG_MDS_BEACON:
4205 case MSG_MDS_OFFLOAD_TARGETS:
4206 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4207 break;
4208
4209 // Mgrs
4210 case MSG_MGR_BEACON:
4211 paxos_service[PAXOS_MGR]->dispatch(op);
4212 break;
4213
31f18b77
FG
4214 // MgrStat
4215 case MSG_MON_MGR_REPORT:
7c673cae 4216 case CEPH_MSG_STATFS:
7c673cae 4217 case MSG_GETPOOLSTATS:
31f18b77
FG
4218 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4219 break;
4220
4221 // pg
4222 case MSG_PGSTATS:
7c673cae
FG
4223 paxos_service[PAXOS_PGMAP]->dispatch(op);
4224 break;
4225
4226 // log
4227 case MSG_LOG:
4228 paxos_service[PAXOS_LOG]->dispatch(op);
4229 break;
4230
4231 // handle_command() does its own caps checking
4232 case MSG_MON_COMMAND:
4233 op->set_type_command();
4234 handle_command(op);
4235 break;
4236
4237 default:
4238 dealt_with = false;
4239 break;
4240 }
4241 if (dealt_with)
4242 return;
4243
4244 /* nop, looks like it's not a service message; revert back to monitor */
4245 op->set_type_monitor();
4246
4247 /* messages we, the Monitor class, need to deal with
4248 * but may be sent by clients. */
4249
4250 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4251 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4252 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4253 << dendl;
4254 goto drop;
4255 }
4256
4257 dealt_with = true;
4258 switch (op->get_req()->get_type()) {
4259
4260 // misc
4261 case CEPH_MSG_MON_GET_VERSION:
4262 handle_get_version(op);
4263 break;
4264
4265 case CEPH_MSG_MON_SUBSCRIBE:
4266 /* FIXME: check what's being subscribed, filter accordingly */
4267 handle_subscribe(op);
4268 break;
4269
4270 default:
4271 dealt_with = false;
4272 break;
4273 }
4274 if (dealt_with)
4275 return;
4276
4277 if (!op->is_src_mon()) {
4278 dout(1) << __func__ << " unexpected monitor message from"
4279 << " non-monitor entity " << op->get_req()->get_source_inst()
4280 << " " << *(op->get_req()) << " -- dropping" << dendl;
4281 goto drop;
4282 }
4283
4284 /* messages that should only be sent by another monitor */
4285 dealt_with = true;
4286 switch (op->get_req()->get_type()) {
4287
4288 case MSG_ROUTE:
4289 handle_route(op);
4290 break;
4291
4292 case MSG_MON_PROBE:
4293 handle_probe(op);
4294 break;
4295
4296 // Sync (i.e., the new slurp, but on steroids)
4297 case MSG_MON_SYNC:
4298 handle_sync(op);
4299 break;
4300 case MSG_MON_SCRUB:
4301 handle_scrub(op);
4302 break;
4303
4304 /* log acks are sent from a monitor we sent the MLog to, and are
4305 never sent by clients to us. */
4306 case MSG_LOGACK:
4307 log_client.handle_log_ack((MLogAck*)op->get_req());
4308 break;
4309
4310 // monmap
4311 case MSG_MON_JOIN:
4312 op->set_type_service();
4313 paxos_service[PAXOS_MONMAP]->dispatch(op);
4314 break;
4315
4316 // paxos
4317 case MSG_MON_PAXOS:
4318 {
4319 op->set_type_paxos();
4320 MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
4321 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4322 //can't send these!
4323 break;
4324 }
4325
4326 if (state == STATE_SYNCHRONIZING) {
4327 // we are synchronizing. These messages would do us no
4328 // good, thus just drop them and ignore them.
4329 dout(10) << __func__ << " ignore paxos msg from "
4330 << pm->get_source_inst() << dendl;
4331 break;
4332 }
4333
4334 // sanitize
4335 if (pm->epoch > get_epoch()) {
4336 bootstrap();
4337 break;
4338 }
4339 if (pm->epoch != get_epoch()) {
4340 break;
4341 }
4342
4343 paxos->dispatch(op);
4344 }
4345 break;
4346
4347 // elector messages
4348 case MSG_MON_ELECTION:
4349 op->set_type_election();
4350 //check privileges here for simplicity
4351 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4352 dout(0) << "MMonElection received from entity without enough caps!"
4353 << op->get_session()->caps << dendl;
4354 break;
4355 }
4356 if (!is_probing() && !is_synchronizing()) {
4357 elector.dispatch(op);
4358 }
4359 break;
4360
4361 case MSG_FORWARD:
4362 handle_forward(op);
4363 break;
4364
4365 case MSG_TIMECHECK:
4366 handle_timecheck(op);
4367 break;
4368
4369 case MSG_MON_HEALTH:
4370 health_monitor->dispatch(op);
4371 break;
4372
224ce89b
WB
4373 case MSG_MON_HEALTH_CHECKS:
4374 op->set_type_service();
4375 paxos_service[PAXOS_HEALTH]->dispatch(op);
4376 break;
4377
7c673cae
FG
4378 default:
4379 dealt_with = false;
4380 break;
4381 }
4382 if (!dealt_with) {
4383 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4384 goto drop;
4385 }
4386 return;
4387
4388drop:
4389 return;
4390}
4391
4392void Monitor::handle_ping(MonOpRequestRef op)
4393{
4394 MPing *m = static_cast<MPing*>(op->get_req());
4395 dout(10) << __func__ << " " << *m << dendl;
4396 MPing *reply = new MPing;
4397 entity_inst_t inst = m->get_source_inst();
4398 bufferlist payload;
4399 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4400 f->open_object_section("pong");
4401
4402 list<string> health_str;
4403 get_health(health_str, NULL, f.get());
4404 {
4405 stringstream ss;
4406 get_mon_status(f.get(), ss);
4407 }
4408
4409 f->close_section();
4410 stringstream ss;
4411 f->flush(ss);
4412 ::encode(ss.str(), payload);
4413 reply->set_payload(payload);
4414 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4415 messenger->send_message(reply, inst);
4416}
4417
4418void Monitor::timecheck_start()
4419{
4420 dout(10) << __func__ << dendl;
4421 timecheck_cleanup();
4422 timecheck_start_round();
4423}
4424
4425void Monitor::timecheck_finish()
4426{
4427 dout(10) << __func__ << dendl;
4428 timecheck_cleanup();
4429}
4430
4431void Monitor::timecheck_start_round()
4432{
4433 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4434 assert(is_leader());
4435
4436 if (monmap->size() == 1) {
4437 assert(0 == "We are alone; this shouldn't have been scheduled!");
4438 return;
4439 }
4440
4441 if (timecheck_round % 2) {
4442 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4443 utime_t curr_time = ceph_clock_now();
4444 double max = g_conf->mon_timecheck_interval*3;
4445 if (curr_time - timecheck_round_start < max) {
4446 dout(10) << __func__ << " keep current round going" << dendl;
4447 goto out;
4448 } else {
4449 dout(10) << __func__
4450 << " finish current timecheck and start new" << dendl;
4451 timecheck_cancel_round();
4452 }
4453 }
4454
4455 assert(timecheck_round % 2 == 0);
4456 timecheck_acks = 0;
4457 timecheck_round ++;
4458 timecheck_round_start = ceph_clock_now();
4459 dout(10) << __func__ << " new " << timecheck_round << dendl;
4460
4461 timecheck();
4462out:
4463 dout(10) << __func__ << " setting up next event" << dendl;
4464 timecheck_reset_event();
4465}
4466
4467void Monitor::timecheck_finish_round(bool success)
4468{
4469 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4470 assert(timecheck_round % 2);
4471 timecheck_round ++;
4472 timecheck_round_start = utime_t();
4473
4474 if (success) {
4475 assert(timecheck_waiting.empty());
4476 assert(timecheck_acks == quorum.size());
4477 timecheck_report();
4478 timecheck_check_skews();
4479 return;
4480 }
4481
4482 dout(10) << __func__ << " " << timecheck_waiting.size()
4483 << " peers still waiting:";
4484 for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4485 p != timecheck_waiting.end(); ++p) {
4486 *_dout << " " << p->first.name;
4487 }
4488 *_dout << dendl;
4489 timecheck_waiting.clear();
4490
4491 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4492}
4493
4494void Monitor::timecheck_cancel_round()
4495{
4496 timecheck_finish_round(false);
4497}
4498
4499void Monitor::timecheck_cleanup()
4500{
4501 timecheck_round = 0;
4502 timecheck_acks = 0;
4503 timecheck_round_start = utime_t();
4504
4505 if (timecheck_event) {
4506 timer.cancel_event(timecheck_event);
4507 timecheck_event = NULL;
4508 }
4509 timecheck_waiting.clear();
4510 timecheck_skews.clear();
4511 timecheck_latencies.clear();
4512
4513 timecheck_rounds_since_clean = 0;
4514}
4515
4516void Monitor::timecheck_reset_event()
4517{
4518 if (timecheck_event) {
4519 timer.cancel_event(timecheck_event);
4520 timecheck_event = NULL;
4521 }
4522
4523 double delay =
4524 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4525
4526 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4527 delay = cct->_conf->mon_timecheck_interval;
4528 }
4529
4530 dout(10) << __func__ << " delay " << delay
4531 << " rounds_since_clean " << timecheck_rounds_since_clean
4532 << dendl;
4533
4534 timecheck_event = new C_MonContext(this, [this](int) {
4535 timecheck_start_round();
4536 });
4537 timer.add_event_after(delay, timecheck_event);
4538}
4539
4540void Monitor::timecheck_check_skews()
4541{
4542 dout(10) << __func__ << dendl;
4543 assert(is_leader());
4544 assert((timecheck_round % 2) == 0);
4545 if (monmap->size() == 1) {
4546 assert(0 == "We are alone; we shouldn't have gotten here!");
4547 return;
4548 }
4549 assert(timecheck_latencies.size() == timecheck_skews.size());
4550
4551 bool found_skew = false;
4552 for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4553 p != timecheck_skews.end(); ++p) {
4554
4555 double abs_skew;
4556 if (timecheck_has_skew(p->second, &abs_skew)) {
4557 dout(10) << __func__
4558 << " " << p->first << " skew " << abs_skew << dendl;
4559 found_skew = true;
4560 }
4561 }
4562
4563 if (found_skew) {
4564 ++timecheck_rounds_since_clean;
4565 timecheck_reset_event();
4566 } else if (timecheck_rounds_since_clean > 0) {
4567 dout(1) << __func__
4568 << " no clock skews found after " << timecheck_rounds_since_clean
4569 << " rounds" << dendl;
4570 // make sure the skews are really gone and not just a transient success
4571 // this will run just once if not in the presence of skews again.
4572 timecheck_rounds_since_clean = 1;
4573 timecheck_reset_event();
4574 timecheck_rounds_since_clean = 0;
4575 }
4576
4577}
4578
4579void Monitor::timecheck_report()
4580{
4581 dout(10) << __func__ << dendl;
4582 assert(is_leader());
4583 assert((timecheck_round % 2) == 0);
4584 if (monmap->size() == 1) {
4585 assert(0 == "We are alone; we shouldn't have gotten here!");
4586 return;
4587 }
4588
4589 assert(timecheck_latencies.size() == timecheck_skews.size());
4590 bool do_output = true; // only output report once
4591 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4592 if (monmap->get_name(*q) == name)
4593 continue;
4594
4595 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4596 m->epoch = get_epoch();
4597 m->round = timecheck_round;
4598
4599 for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4600 it != timecheck_skews.end(); ++it) {
4601 double skew = it->second;
4602 double latency = timecheck_latencies[it->first];
4603
4604 m->skews[it->first] = skew;
4605 m->latencies[it->first] = latency;
4606
4607 if (do_output) {
4608 dout(25) << __func__ << " " << it->first
4609 << " latency " << latency
4610 << " skew " << skew << dendl;
4611 }
4612 }
4613 do_output = false;
4614 entity_inst_t inst = monmap->get_inst(*q);
4615 dout(10) << __func__ << " send report to " << inst << dendl;
4616 messenger->send_message(m, inst);
4617 }
4618}
4619
4620void Monitor::timecheck()
4621{
4622 dout(10) << __func__ << dendl;
4623 assert(is_leader());
4624 if (monmap->size() == 1) {
4625 assert(0 == "We are alone; we shouldn't have gotten here!");
4626 return;
4627 }
4628 assert(timecheck_round % 2 != 0);
4629
4630 timecheck_acks = 1; // we ack ourselves
4631
4632 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4633 << " round " << timecheck_round << dendl;
4634
4635 // we are at the eye of the storm; the point of reference
4636 timecheck_skews[messenger->get_myinst()] = 0.0;
4637 timecheck_latencies[messenger->get_myinst()] = 0.0;
4638
4639 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4640 if (monmap->get_name(*it) == name)
4641 continue;
4642
4643 entity_inst_t inst = monmap->get_inst(*it);
4644 utime_t curr_time = ceph_clock_now();
4645 timecheck_waiting[inst] = curr_time;
4646 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4647 m->epoch = get_epoch();
4648 m->round = timecheck_round;
4649 dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4650 messenger->send_message(m, inst);
4651 }
4652}
4653
4654health_status_t Monitor::timecheck_status(ostringstream &ss,
4655 const double skew_bound,
4656 const double latency)
4657{
4658 health_status_t status = HEALTH_OK;
4659 assert(latency >= 0);
4660
4661 double abs_skew;
4662 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4663 status = HEALTH_WARN;
4664 ss << "clock skew " << abs_skew << "s"
4665 << " > max " << g_conf->mon_clock_drift_allowed << "s";
4666 }
4667
4668 return status;
4669}
4670
4671void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4672{
4673 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4674 dout(10) << __func__ << " " << *m << dendl;
4675 /* handles PONG's */
4676 assert(m->op == MTimeCheck::OP_PONG);
4677
4678 entity_inst_t other = m->get_source_inst();
4679 if (m->epoch < get_epoch()) {
4680 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4681 << " from " << other
4682 << " curr " << get_epoch()
4683 << " -- severely lagged? discard" << dendl;
4684 return;
4685 }
4686 assert(m->epoch == get_epoch());
4687
4688 if (m->round < timecheck_round) {
4689 dout(1) << __func__ << " got old round " << m->round
4690 << " from " << other
4691 << " curr " << timecheck_round << " -- discard" << dendl;
4692 return;
4693 }
4694
4695 utime_t curr_time = ceph_clock_now();
4696
4697 assert(timecheck_waiting.count(other) > 0);
4698 utime_t timecheck_sent = timecheck_waiting[other];
4699 timecheck_waiting.erase(other);
4700 if (curr_time < timecheck_sent) {
4701 // our clock was readjusted -- drop everything until it all makes sense.
4702 dout(1) << __func__ << " our clock was readjusted --"
4703 << " bump round and drop current check"
4704 << dendl;
4705 timecheck_cancel_round();
4706 return;
4707 }
4708
4709 /* update peer latencies */
4710 double latency = (double)(curr_time - timecheck_sent);
4711
4712 if (timecheck_latencies.count(other) == 0)
4713 timecheck_latencies[other] = latency;
4714 else {
4715 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4716 timecheck_latencies[other] = avg_latency;
4717 }
4718
4719 /*
4720 * update skews
4721 *
4722 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4723 * and 'a' happens to be lower than 'b'; so we use double instead.
4724 *
4725 * latency is always expected to be >= 0.
4726 *
4727 * delta, the difference between theirs timestamp and ours, may either be
4728 * lower or higher than 0; will hardly ever be 0.
4729 *
4730 * The absolute skew is the absolute delta minus the latency, which is
4731 * taken as a whole instead of an rtt given that there is some queueing
4732 * and dispatch times involved and it's hard to assess how long exactly
4733 * it took for the message to travel to the other side and be handled. So
4734 * we call it a bounded skew, the worst case scenario.
4735 *
4736 * Now, to math!
4737 *
4738 * Given that the latency is always positive, we can establish that the
4739 * bounded skew will be:
4740 *
4741 * 1. positive if the absolute delta is higher than the latency and
4742 * delta is positive
4743 * 2. negative if the absolute delta is higher than the latency and
4744 * delta is negative.
4745 * 3. zero if the absolute delta is lower than the latency.
4746 *
4747 * On 3. we make a judgement call and treat the skew as non-existent.
4748 * This is because that, if the absolute delta is lower than the
4749 * latency, then the apparently existing skew is nothing more than a
4750 * side-effect of the high latency at work.
4751 *
4752 * This may not be entirely true though, as a severely skewed clock
4753 * may be masked by an even higher latency, but with high latencies
4754 * we probably have worse issues to deal with than just skewed clocks.
4755 */
4756 assert(latency >= 0);
4757
4758 double delta = ((double) m->timestamp) - ((double) curr_time);
4759 double abs_delta = (delta > 0 ? delta : -delta);
4760 double skew_bound = abs_delta - latency;
4761 if (skew_bound < 0)
4762 skew_bound = 0;
4763 else if (delta < 0)
4764 skew_bound = -skew_bound;
4765
4766 ostringstream ss;
4767 health_status_t status = timecheck_status(ss, skew_bound, latency);
4768 if (status == HEALTH_ERR)
4769 clog->error() << other << " " << ss.str();
4770 else if (status == HEALTH_WARN)
4771 clog->warn() << other << " " << ss.str();
4772
4773 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4774 << " delta " << delta << " skew_bound " << skew_bound
4775 << " latency " << latency << dendl;
4776
4777 timecheck_skews[other] = skew_bound;
4778
4779 timecheck_acks++;
4780 if (timecheck_acks == quorum.size()) {
4781 dout(10) << __func__ << " got pongs from everybody ("
4782 << timecheck_acks << " total)" << dendl;
4783 assert(timecheck_skews.size() == timecheck_acks);
4784 assert(timecheck_waiting.empty());
4785 // everyone has acked, so bump the round to finish it.
4786 timecheck_finish_round();
4787 }
4788}
4789
4790void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4791{
4792 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4793 dout(10) << __func__ << " " << *m << dendl;
4794
4795 assert(is_peon());
4796 assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4797
4798 if (m->epoch != get_epoch()) {
4799 dout(1) << __func__ << " got wrong epoch "
4800 << "(ours " << get_epoch()
4801 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4802 return;
4803 }
4804
4805 if (m->round < timecheck_round) {
4806 dout(1) << __func__ << " got old round " << m->round
4807 << " current " << timecheck_round
4808 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4809 return;
4810 }
4811
4812 timecheck_round = m->round;
4813
4814 if (m->op == MTimeCheck::OP_REPORT) {
4815 assert((timecheck_round % 2) == 0);
4816 timecheck_latencies.swap(m->latencies);
4817 timecheck_skews.swap(m->skews);
4818 return;
4819 }
4820
4821 assert((timecheck_round % 2) != 0);
4822 MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4823 utime_t curr_time = ceph_clock_now();
4824 reply->timestamp = curr_time;
4825 reply->epoch = m->epoch;
4826 reply->round = m->round;
4827 dout(10) << __func__ << " send " << *m
4828 << " to " << m->get_source_inst() << dendl;
4829 m->get_connection()->send_message(reply);
4830}
4831
4832void Monitor::handle_timecheck(MonOpRequestRef op)
4833{
4834 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4835 dout(10) << __func__ << " " << *m << dendl;
4836
4837 if (is_leader()) {
4838 if (m->op != MTimeCheck::OP_PONG) {
4839 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4840 } else {
4841 handle_timecheck_leader(op);
4842 }
4843 } else if (is_peon()) {
4844 if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4845 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4846 } else {
4847 handle_timecheck_peon(op);
4848 }
4849 } else {
4850 dout(1) << __func__ << " drop unexpected msg" << dendl;
4851 }
4852}
4853
4854void Monitor::handle_subscribe(MonOpRequestRef op)
4855{
4856 MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
4857 dout(10) << "handle_subscribe " << *m << dendl;
4858
4859 bool reply = false;
4860
4861 MonSession *s = op->get_session();
4862 assert(s);
4863
4864 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
4865 p != m->what.end();
4866 ++p) {
4867 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4868 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
4869 reply = true;
4870
4871 // remove conflicting subscribes
4872 if (logmon()->sub_name_to_id(p->first) >= 0) {
4873 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
4874 it != s->sub_map.end(); ) {
4875 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
4876 Mutex::Locker l(session_map_lock);
4877 session_map.remove_sub((it++)->second);
4878 } else {
4879 ++it;
4880 }
4881 }
4882 }
4883
4884 {
4885 Mutex::Locker l(session_map_lock);
4886 session_map.add_update_sub(s, p->first, p->second.start,
4887 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
4888 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
4889 }
4890
4891 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
4892 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
4893 if ((int)s->is_capable("mds", MON_CAP_R)) {
4894 Subscription *sub = s->sub_map[p->first];
4895 assert(sub != nullptr);
4896 mdsmon()->check_sub(sub);
4897 }
4898 } else if (p->first == "osdmap") {
4899 if ((int)s->is_capable("osd", MON_CAP_R)) {
4900 if (s->osd_epoch > p->second.start) {
4901 // client needs earlier osdmaps on purpose, so reset the sent epoch
4902 s->osd_epoch = 0;
4903 }
4904 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
4905 }
4906 } else if (p->first == "osd_pg_creates") {
4907 if ((int)s->is_capable("osd", MON_CAP_W)) {
4908 if (monmap->get_required_features().contains_all(
4909 ceph::features::mon::FEATURE_LUMINOUS)) {
4910 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
4911 } else {
4912 pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
4913 }
4914 }
4915 } else if (p->first == "monmap") {
4916 monmon()->check_sub(s->sub_map[p->first]);
4917 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
4918 logmon()->check_sub(s->sub_map[p->first]);
4919 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
4920 mgrmon()->check_sub(s->sub_map[p->first]);
224ce89b
WB
4921 } else if (p->first == "servicemap") {
4922 mgrstatmon()->check_sub(s->sub_map[p->first]);
7c673cae
FG
4923 }
4924 }
4925
4926 if (reply) {
4927 // we only need to reply if the client is old enough to think it
4928 // has to send renewals.
4929 ConnectionRef con = m->get_connection();
4930 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
4931 m->get_connection()->send_message(new MMonSubscribeAck(
4932 monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
4933 }
4934
4935}
4936
4937void Monitor::handle_get_version(MonOpRequestRef op)
4938{
4939 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
4940 dout(10) << "handle_get_version " << *m << dendl;
4941 PaxosService *svc = NULL;
4942
4943 MonSession *s = op->get_session();
4944 assert(s);
4945
4946 if (!is_leader() && !is_peon()) {
4947 dout(10) << " waiting for quorum" << dendl;
4948 waitfor_quorum.push_back(new C_RetryMessage(this, op));
4949 goto out;
4950 }
4951
4952 if (m->what == "mdsmap") {
4953 svc = mdsmon();
4954 } else if (m->what == "fsmap") {
4955 svc = mdsmon();
4956 } else if (m->what == "osdmap") {
4957 svc = osdmon();
4958 } else if (m->what == "monmap") {
4959 svc = monmon();
4960 } else {
4961 derr << "invalid map type " << m->what << dendl;
4962 }
4963
4964 if (svc) {
4965 if (!svc->is_readable()) {
4966 svc->wait_for_readable(op, new C_RetryMessage(this, op));
4967 goto out;
4968 }
4969
4970 MMonGetVersionReply *reply = new MMonGetVersionReply();
4971 reply->handle = m->handle;
4972 reply->version = svc->get_last_committed();
4973 reply->oldest_version = svc->get_first_committed();
4974 reply->set_tid(m->get_tid());
4975
4976 m->get_connection()->send_message(reply);
4977 }
4978 out:
4979 return;
4980}
4981
4982bool Monitor::ms_handle_reset(Connection *con)
4983{
4984 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
4985
4986 // ignore lossless monitor sessions
4987 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4988 return false;
4989
4990 MonSession *s = static_cast<MonSession *>(con->get_priv());
4991 if (!s)
4992 return false;
4993
4994 // break any con <-> session ref cycle
4995 s->con->set_priv(NULL);
4996
4997 if (is_shutdown())
4998 return false;
4999
5000 Mutex::Locker l(lock);
5001
5002 dout(10) << "reset/close on session " << s->inst << dendl;
5003 if (!s->closed) {
5004 Mutex::Locker l(session_map_lock);
5005 remove_session(s);
5006 }
5007 s->put();
5008 return true;
5009}
5010
5011bool Monitor::ms_handle_refused(Connection *con)
5012{
5013 // just log for now...
5014 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5015 return false;
5016}
5017
5018// -----
5019
5020void Monitor::send_latest_monmap(Connection *con)
5021{
5022 bufferlist bl;
5023 monmap->encode(bl, con->get_features());
5024 con->send_message(new MMonMap(bl));
5025}
5026
5027void Monitor::handle_mon_get_map(MonOpRequestRef op)
5028{
5029 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
5030 dout(10) << "handle_mon_get_map" << dendl;
5031 send_latest_monmap(m->get_connection().get());
5032}
5033
5034void Monitor::handle_mon_metadata(MonOpRequestRef op)
5035{
5036 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
5037 if (is_leader()) {
5038 dout(10) << __func__ << dendl;
5039 update_mon_metadata(m->get_source().num(), std::move(m->data));
5040 }
5041}
5042
5043void Monitor::update_mon_metadata(int from, Metadata&& m)
5044{
224ce89b
WB
5045 // NOTE: this is now for legacy (kraken or jewel) mons only.
5046 pending_metadata[from] = std::move(m);
7c673cae
FG
5047
5048 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
224ce89b 5049 bufferlist bl;
7c673cae
FG
5050 ::encode(pending_metadata, bl);
5051 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5052 paxos->trigger_propose();
5053}
5054
224ce89b 5055int Monitor::load_metadata()
7c673cae
FG
5056{
5057 bufferlist bl;
5058 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5059 if (r)
5060 return r;
5061 bufferlist::iterator it = bl.begin();
224ce89b
WB
5062 ::decode(mon_metadata, it);
5063
5064 pending_metadata = mon_metadata;
7c673cae
FG
5065 return 0;
5066}
5067
5068int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5069{
5070 assert(f);
224ce89b 5071 if (!mon_metadata.count(mon)) {
7c673cae
FG
5072 err << "mon." << mon << " not found";
5073 return -EINVAL;
5074 }
224ce89b 5075 const Metadata& m = mon_metadata[mon];
7c673cae
FG
5076 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5077 f->dump_string(p->first.c_str(), p->second);
5078 }
5079 return 0;
5080}
5081
c07f9fc5 5082void Monitor::count_metadata(const string& field, map<string,int> *out)
31f18b77 5083{
224ce89b 5084 for (auto& p : mon_metadata) {
31f18b77
FG
5085 auto q = p.second.find(field);
5086 if (q == p.second.end()) {
c07f9fc5 5087 (*out)["unknown"]++;
31f18b77 5088 } else {
c07f9fc5 5089 (*out)[q->second]++;
31f18b77
FG
5090 }
5091 }
c07f9fc5
FG
5092}
5093
5094void Monitor::count_metadata(const string& field, Formatter *f)
5095{
5096 map<string,int> by_val;
5097 count_metadata(field, &by_val);
31f18b77
FG
5098 f->open_object_section(field.c_str());
5099 for (auto& p : by_val) {
5100 f->dump_int(p.first.c_str(), p.second);
5101 }
5102 f->close_section();
5103}
5104
7c673cae
FG
5105int Monitor::print_nodes(Formatter *f, ostream& err)
5106{
7c673cae 5107 map<string, list<int> > mons; // hostname => mon
224ce89b
WB
5108 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5109 it != mon_metadata.end(); ++it) {
7c673cae
FG
5110 const Metadata& m = it->second;
5111 Metadata::const_iterator hostname = m.find("hostname");
5112 if (hostname == m.end()) {
5113 // not likely though
5114 continue;
5115 }
5116 mons[hostname->second].push_back(it->first);
5117 }
5118
5119 dump_services(f, mons, "mon");
5120 return 0;
5121}
5122
5123// ----------------------------------------------
5124// scrub
5125
5126int Monitor::scrub_start()
5127{
5128 dout(10) << __func__ << dendl;
5129 assert(is_leader());
5130
5131 if (!scrub_result.empty()) {
5132 clog->info() << "scrub already in progress";
5133 return -EBUSY;
5134 }
5135
5136 scrub_event_cancel();
5137 scrub_result.clear();
5138 scrub_state.reset(new ScrubState);
5139
5140 scrub();
5141 return 0;
5142}
5143
5144int Monitor::scrub()
5145{
5146 assert(is_leader());
5147 assert(scrub_state);
5148
5149 scrub_cancel_timeout();
5150 wait_for_paxos_write();
5151 scrub_version = paxos->get_version();
5152
5153
5154 // scrub all keys if we're the only monitor in the quorum
5155 int32_t num_keys =
5156 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5157
5158 for (set<int>::iterator p = quorum.begin();
5159 p != quorum.end();
5160 ++p) {
5161 if (*p == rank)
5162 continue;
5163 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5164 num_keys);
5165 r->key = scrub_state->last_key;
5166 messenger->send_message(r, monmap->get_inst(*p));
5167 }
5168
5169 // scrub my keys
5170 bool r = _scrub(&scrub_result[rank],
5171 &scrub_state->last_key,
5172 &num_keys);
5173
5174 scrub_state->finished = !r;
5175
5176 // only after we got our scrub results do we really care whether the
5177 // other monitors are late on their results. Also, this way we avoid
5178 // triggering the timeout if we end up getting stuck in _scrub() for
5179 // longer than the duration of the timeout.
5180 scrub_reset_timeout();
5181
5182 if (quorum.size() == 1) {
5183 assert(scrub_state->finished == true);
5184 scrub_finish();
5185 }
5186 return 0;
5187}
5188
5189void Monitor::handle_scrub(MonOpRequestRef op)
5190{
5191 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5192 dout(10) << __func__ << " " << *m << dendl;
5193 switch (m->op) {
5194 case MMonScrub::OP_SCRUB:
5195 {
5196 if (!is_peon())
5197 break;
5198
5199 wait_for_paxos_write();
5200
5201 if (m->version != paxos->get_version())
5202 break;
5203
5204 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5205 m->version,
5206 m->num_keys);
5207
5208 reply->key = m->key;
5209 _scrub(&reply->result, &reply->key, &reply->num_keys);
5210 m->get_connection()->send_message(reply);
5211 }
5212 break;
5213
5214 case MMonScrub::OP_RESULT:
5215 {
5216 if (!is_leader())
5217 break;
5218 if (m->version != scrub_version)
5219 break;
5220 // reset the timeout each time we get a result
5221 scrub_reset_timeout();
5222
5223 int from = m->get_source().num();
5224 assert(scrub_result.count(from) == 0);
5225 scrub_result[from] = m->result;
5226
5227 if (scrub_result.size() == quorum.size()) {
5228 scrub_check_results();
5229 scrub_result.clear();
5230 if (scrub_state->finished)
5231 scrub_finish();
5232 else
5233 scrub();
5234 }
5235 }
5236 break;
5237 }
5238}
5239
5240bool Monitor::_scrub(ScrubResult *r,
5241 pair<string,string> *start,
5242 int *num_keys)
5243{
5244 assert(r != NULL);
5245 assert(start != NULL);
5246 assert(num_keys != NULL);
5247
5248 set<string> prefixes = get_sync_targets_names();
5249 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5250
5251 dout(10) << __func__ << " start (" << *start << ")"
5252 << " num_keys " << *num_keys << dendl;
5253
5254 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5255
5256 int scrubbed_keys = 0;
5257 pair<string,string> last_key;
5258
5259 while (it->has_next_chunk()) {
5260
5261 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5262 break;
5263
5264 pair<string,string> k = it->get_next_key();
5265 if (prefixes.count(k.first) == 0)
5266 continue;
5267
5268 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5269 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5270 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5271 << dendl;
5272 continue;
5273 }
5274
5275 bufferlist bl;
224ce89b
WB
5276 int err = store->get(k.first, k.second, bl);
5277 assert(err == 0);
5278
7c673cae
FG
5279 uint32_t key_crc = bl.crc32c(0);
5280 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5281 << " crc " << key_crc << dendl;
5282 r->prefix_keys[k.first]++;
224ce89b 5283 if (r->prefix_crc.count(k.first) == 0) {
7c673cae 5284 r->prefix_crc[k.first] = 0;
224ce89b 5285 }
7c673cae
FG
5286 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5287
5288 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5289 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5290 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5291 r->prefix_crc[k.first] += 1;
5292 }
5293
5294 ++scrubbed_keys;
5295 last_key = k;
5296 }
5297
5298 dout(20) << __func__ << " last_key (" << last_key << ")"
5299 << " scrubbed_keys " << scrubbed_keys
5300 << " has_next " << it->has_next_chunk() << dendl;
5301
5302 *start = last_key;
5303 *num_keys = scrubbed_keys;
5304
5305 return it->has_next_chunk();
5306}
5307
5308void Monitor::scrub_check_results()
5309{
5310 dout(10) << __func__ << dendl;
5311
5312 // compare
5313 int errors = 0;
5314 ScrubResult& mine = scrub_result[rank];
5315 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5316 p != scrub_result.end();
5317 ++p) {
5318 if (p->first == rank)
5319 continue;
5320 if (p->second != mine) {
5321 ++errors;
5322 clog->error() << "scrub mismatch";
5323 clog->error() << " mon." << rank << " " << mine;
5324 clog->error() << " mon." << p->first << " " << p->second;
5325 }
5326 }
5327 if (!errors)
d2e6a577 5328 clog->debug() << "scrub ok on " << quorum << ": " << mine;
7c673cae
FG
5329}
5330
5331inline void Monitor::scrub_timeout()
5332{
5333 dout(1) << __func__ << " restarting scrub" << dendl;
5334 scrub_reset();
5335 scrub_start();
5336}
5337
5338void Monitor::scrub_finish()
5339{
5340 dout(10) << __func__ << dendl;
5341 scrub_reset();
5342 scrub_event_start();
5343}
5344
5345void Monitor::scrub_reset()
5346{
5347 dout(10) << __func__ << dendl;
5348 scrub_cancel_timeout();
5349 scrub_version = 0;
5350 scrub_result.clear();
5351 scrub_state.reset();
5352}
5353
5354inline void Monitor::scrub_update_interval(int secs)
5355{
5356 // we don't care about changes if we are not the leader.
5357 // changes will be visible if we become the leader.
5358 if (!is_leader())
5359 return;
5360
5361 dout(1) << __func__ << " new interval = " << secs << dendl;
5362
5363 // if scrub already in progress, all changes will already be visible during
5364 // the next round. Nothing to do.
5365 if (scrub_state != NULL)
5366 return;
5367
5368 scrub_event_cancel();
5369 scrub_event_start();
5370}
5371
5372void Monitor::scrub_event_start()
5373{
5374 dout(10) << __func__ << dendl;
5375
5376 if (scrub_event)
5377 scrub_event_cancel();
5378
5379 if (cct->_conf->mon_scrub_interval <= 0) {
5380 dout(1) << __func__ << " scrub event is disabled"
5381 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5382 << ")" << dendl;
5383 return;
5384 }
5385
5386 scrub_event = new C_MonContext(this, [this](int) {
5387 scrub_start();
5388 });
5389 timer.add_event_after(cct->_conf->mon_scrub_interval, scrub_event);
5390}
5391
5392void Monitor::scrub_event_cancel()
5393{
5394 dout(10) << __func__ << dendl;
5395 if (scrub_event) {
5396 timer.cancel_event(scrub_event);
5397 scrub_event = NULL;
5398 }
5399}
5400
5401inline void Monitor::scrub_cancel_timeout()
5402{
5403 if (scrub_timeout_event) {
5404 timer.cancel_event(scrub_timeout_event);
5405 scrub_timeout_event = NULL;
5406 }
5407}
5408
5409void Monitor::scrub_reset_timeout()
5410{
5411 dout(15) << __func__ << " reset timeout event" << dendl;
5412 scrub_cancel_timeout();
5413
5414 scrub_timeout_event = new C_MonContext(this, [this](int) {
5415 scrub_timeout();
5416 });
5417 timer.add_event_after(g_conf->mon_scrub_timeout, scrub_timeout_event);
5418}
5419
5420/************ TICK ***************/
5421void Monitor::new_tick()
5422{
5423 timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5424 tick();
5425 }));
5426}
5427
5428void Monitor::tick()
5429{
5430 // ok go.
5431 dout(11) << "tick" << dendl;
5432
5433 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5434 (*p)->tick();
5435 (*p)->maybe_trim();
5436 }
5437
5438 // trim sessions
5439 utime_t now = ceph_clock_now();
5440 {
5441 Mutex::Locker l(session_map_lock);
5442 auto p = session_map.sessions.begin();
5443
5444 bool out_for_too_long = (!exited_quorum.is_zero() &&
5445 now > (exited_quorum + 2*g_conf->mon_lease));
5446
5447 while (!p.end()) {
5448 MonSession *s = *p;
5449 ++p;
5450
5451 // don't trim monitors
5452 if (s->inst.name.is_mon())
5453 continue;
5454
5455 if (s->session_timeout < now && s->con) {
5456 // check keepalive, too
5457 s->session_timeout = s->con->get_last_keepalive();
5458 s->session_timeout += g_conf->mon_session_timeout;
5459 }
5460 if (s->session_timeout < now) {
5461 dout(10) << " trimming session " << s->con << " " << s->inst
5462 << " (timeout " << s->session_timeout
5463 << " < now " << now << ")" << dendl;
5464 } else if (out_for_too_long) {
5465 // boot the client Session because we've taken too long getting back in
5466 dout(10) << " trimming session " << s->con << " " << s->inst
5467 << " because we've been out of quorum too long" << dendl;
5468 } else {
5469 continue;
5470 }
5471
5472 s->con->mark_down();
5473 remove_session(s);
5474 logger->inc(l_mon_session_trim);
5475 }
5476 }
5477 sync_trim_providers();
5478
5479 if (!maybe_wait_for_quorum.empty()) {
5480 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5481 }
5482
5483 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5484 // this is only necessary on upgraded clusters.
5485 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5486 prepare_new_fingerprint(t);
5487 paxos->trigger_propose();
5488 }
5489
5490 new_tick();
5491}
5492
5493void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5494{
5495 uuid_d nf;
5496 nf.generate_random();
5497 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5498
5499 bufferlist bl;
5500 ::encode(nf, bl);
5501 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5502}
5503
5504int Monitor::check_fsid()
5505{
5506 bufferlist ebl;
5507 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5508 if (r == -ENOENT)
5509 return r;
5510 assert(r == 0);
5511
5512 string es(ebl.c_str(), ebl.length());
5513
5514 // only keep the first line
5515 size_t pos = es.find_first_of('\n');
5516 if (pos != string::npos)
5517 es.resize(pos);
5518
5519 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5520 uuid_d ondisk;
5521 if (!ondisk.parse(es.c_str())) {
5522 derr << "error: unable to parse uuid" << dendl;
5523 return -EINVAL;
5524 }
5525
5526 if (monmap->get_fsid() != ondisk) {
5527 derr << "error: cluster_uuid file exists with value " << ondisk
5528 << ", != our uuid " << monmap->get_fsid() << dendl;
5529 return -EEXIST;
5530 }
5531
5532 return 0;
5533}
5534
5535int Monitor::write_fsid()
5536{
5537 auto t(std::make_shared<MonitorDBStore::Transaction>());
5538 write_fsid(t);
5539 int r = store->apply_transaction(t);
5540 return r;
5541}
5542
5543int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5544{
5545 ostringstream ss;
5546 ss << monmap->get_fsid() << "\n";
5547 string us = ss.str();
5548
5549 bufferlist b;
5550 b.append(us);
5551
5552 t->put(MONITOR_NAME, "cluster_uuid", b);
5553 return 0;
5554}
5555
5556/*
5557 * this is the closest thing to a traditional 'mkfs' for ceph.
5558 * initialize the monitor state machines to their initial values.
5559 */
5560int Monitor::mkfs(bufferlist& osdmapbl)
5561{
5562 auto t(std::make_shared<MonitorDBStore::Transaction>());
5563
5564 // verify cluster fsid
5565 int r = check_fsid();
5566 if (r < 0 && r != -ENOENT)
5567 return r;
5568
5569 bufferlist magicbl;
5570 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5571 magicbl.append("\n");
5572 t->put(MONITOR_NAME, "magic", magicbl);
5573
5574
5575 features = get_initial_supported_features();
5576 write_features(t);
5577
5578 // save monmap, osdmap, keyring.
5579 bufferlist monmapbl;
5580 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5581 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5582 t->put("mkfs", "monmap", monmapbl);
5583
5584 if (osdmapbl.length()) {
5585 // make sure it's a valid osdmap
5586 try {
5587 OSDMap om;
5588 om.decode(osdmapbl);
5589 }
5590 catch (buffer::error& e) {
5591 derr << "error decoding provided osdmap: " << e.what() << dendl;
5592 return -EINVAL;
5593 }
5594 t->put("mkfs", "osdmap", osdmapbl);
5595 }
5596
5597 if (is_keyring_required()) {
5598 KeyRing keyring;
5599 string keyring_filename;
5600
5601 r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5602 if (r) {
5603 derr << "unable to find a keyring file on " << g_conf->keyring
5604 << ": " << cpp_strerror(r) << dendl;
5605 if (g_conf->key != "") {
5606 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5607 "\n\tcaps mon = \"allow *\"\n";
5608 bufferlist bl;
5609 bl.append(keyring_plaintext);
5610 try {
5611 bufferlist::iterator i = bl.begin();
5612 keyring.decode_plaintext(i);
5613 }
5614 catch (const buffer::error& e) {
5615 derr << "error decoding keyring " << keyring_plaintext
5616 << ": " << e.what() << dendl;
5617 return -EINVAL;
5618 }
5619 } else {
5620 return -ENOENT;
5621 }
5622 } else {
5623 r = keyring.load(g_ceph_context, keyring_filename);
5624 if (r < 0) {
5625 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5626 return r;
5627 }
5628 }
5629
5630 // put mon. key in external keyring; seed with everything else.
5631 extract_save_mon_key(keyring);
5632
5633 bufferlist keyringbl;
5634 keyring.encode_plaintext(keyringbl);
5635 t->put("mkfs", "keyring", keyringbl);
5636 }
5637 write_fsid(t);
5638 store->apply_transaction(t);
5639
5640 return 0;
5641}
5642
5643int Monitor::write_default_keyring(bufferlist& bl)
5644{
5645 ostringstream os;
5646 os << g_conf->mon_data << "/keyring";
5647
5648 int err = 0;
5649 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600);
5650 if (fd < 0) {
5651 err = -errno;
5652 dout(0) << __func__ << " failed to open " << os.str()
5653 << ": " << cpp_strerror(err) << dendl;
5654 return err;
5655 }
5656
5657 err = bl.write_fd(fd);
5658 if (!err)
5659 ::fsync(fd);
5660 VOID_TEMP_FAILURE_RETRY(::close(fd));
5661
5662 return err;
5663}
5664
5665void Monitor::extract_save_mon_key(KeyRing& keyring)
5666{
5667 EntityName mon_name;
5668 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5669 EntityAuth mon_key;
5670 if (keyring.get_auth(mon_name, mon_key)) {
5671 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5672 KeyRing pkey;
5673 pkey.add(mon_name, mon_key);
5674 bufferlist bl;
5675 pkey.encode_plaintext(bl);
5676 write_default_keyring(bl);
5677 keyring.remove(mon_name);
5678 }
5679}
5680
5681bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5682 bool force_new)
5683{
5684 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5685 << dendl;
5686
5687 if (is_shutdown())
5688 return false;
5689
5690 // we only connect to other monitors and mgr; every else connects to us.
5691 if (service_id != CEPH_ENTITY_TYPE_MON &&
5692 service_id != CEPH_ENTITY_TYPE_MGR)
5693 return false;
5694
5695 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5696 // auth_none
5697 dout(20) << __func__ << " building auth_none authorizer" << dendl;
5698 AuthNoneClientHandler handler(g_ceph_context, nullptr);
5699 handler.set_global_id(0);
5700 *authorizer = handler.build_authorizer(service_id);
5701 return true;
5702 }
5703
5704 CephXServiceTicketInfo auth_ticket_info;
5705 CephXSessionAuthInfo info;
5706 int ret;
5707
5708 EntityName name;
5709 name.set_type(CEPH_ENTITY_TYPE_MON);
5710 auth_ticket_info.ticket.name = name;
5711 auth_ticket_info.ticket.global_id = 0;
5712
5713 if (service_id == CEPH_ENTITY_TYPE_MON) {
5714 // mon to mon authentication uses the private monitor shared key and not the
5715 // rotating key
5716 CryptoKey secret;
5717 if (!keyring.get_secret(name, secret) &&
5718 !key_server.get_secret(name, secret)) {
5719 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5720 << dendl;
5721 stringstream ss, ds;
5722 int err = key_server.list_secrets(ds);
5723 if (err < 0)
5724 ss << "no installed auth entries!";
5725 else
5726 ss << "installed auth entries:";
5727 dout(0) << ss.str() << "\n" << ds.str() << dendl;
5728 return false;
5729 }
5730
5731 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5732 secret, (uint64_t)-1);
5733 if (ret < 0) {
5734 dout(0) << __func__ << " failed to build mon session_auth_info "
5735 << cpp_strerror(ret) << dendl;
5736 return false;
5737 }
5738 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5739 // mgr
5740 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5741 if (ret < 0) {
5742 derr << __func__ << " failed to build mgr service session_auth_info "
5743 << cpp_strerror(ret) << dendl;
5744 return false;
5745 }
5746 } else {
5747 ceph_abort(); // see check at top of fn
5748 }
5749
5750 CephXTicketBlob blob;
5751 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5752 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5753 return false;
5754 }
5755 bufferlist ticket_data;
5756 ::encode(blob, ticket_data);
5757
5758 bufferlist::iterator iter = ticket_data.begin();
5759 CephXTicketHandler handler(g_ceph_context, service_id);
5760 ::decode(handler.ticket, iter);
5761
5762 handler.session_key = info.session_key;
5763
5764 *authorizer = handler.build_authorizer(0);
5765
5766 return true;
5767}
5768
5769bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5770 int protocol, bufferlist& authorizer_data,
5771 bufferlist& authorizer_reply,
5772 bool& isvalid, CryptoKey& session_key)
5773{
5774 dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5775 << " " << ceph_entity_type_name(peer_type)
5776 << " protocol " << protocol << dendl;
5777
5778 if (is_shutdown())
5779 return false;
5780
5781 if (peer_type == CEPH_ENTITY_TYPE_MON &&
5782 auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5783 // monitor, and cephx is enabled
5784 isvalid = false;
5785 if (protocol == CEPH_AUTH_CEPHX) {
5786 bufferlist::iterator iter = authorizer_data.begin();
5787 CephXServiceTicketInfo auth_ticket_info;
5788
5789 if (authorizer_data.length()) {
5790 bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
5791 auth_ticket_info, authorizer_reply);
5792 if (ret) {
5793 session_key = auth_ticket_info.session_key;
5794 isvalid = true;
5795 } else {
5796 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5797 }
5798 }
5799 } else {
5800 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
5801 }
5802 } else {
5803 // who cares.
5804 isvalid = true;
5805 }
5806 return true;
5807}