]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Monitor.cc
update sources to v12.1.3
[ceph.git] / ceph / src / mon / Monitor.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include <sstream>
17#include <stdlib.h>
18#include <signal.h>
19#include <limits.h>
20#include <cstring>
21#include <boost/scope_exit.hpp>
22#include <boost/algorithm/string/predicate.hpp>
23
24#include "Monitor.h"
25#include "common/version.h"
26
27#include "osd/OSDMap.h"
28
29#include "MonitorDBStore.h"
30
31#include "messages/PaxosServiceMessage.h"
32#include "messages/MMonMap.h"
33#include "messages/MMonGetMap.h"
34#include "messages/MMonGetVersion.h"
35#include "messages/MMonGetVersionReply.h"
36#include "messages/MGenericMessage.h"
37#include "messages/MMonCommand.h"
38#include "messages/MMonCommandAck.h"
31f18b77 39#include "messages/MMonHealth.h"
7c673cae
FG
40#include "messages/MMonMetadata.h"
41#include "messages/MMonSync.h"
42#include "messages/MMonScrub.h"
43#include "messages/MMonProbe.h"
44#include "messages/MMonJoin.h"
45#include "messages/MMonPaxos.h"
46#include "messages/MRoute.h"
47#include "messages/MForward.h"
48
49#include "messages/MMonSubscribe.h"
50#include "messages/MMonSubscribeAck.h"
51
52#include "messages/MAuthReply.h"
53
54#include "messages/MTimeCheck.h"
7c673cae
FG
55#include "messages/MPing.h"
56
57#include "common/strtol.h"
58#include "common/ceph_argparse.h"
59#include "common/Timer.h"
60#include "common/Clock.h"
61#include "common/errno.h"
62#include "common/perf_counters.h"
63#include "common/admin_socket.h"
64#include "global/signal_handler.h"
65#include "common/Formatter.h"
66#include "include/stringify.h"
67#include "include/color.h"
68#include "include/ceph_fs.h"
69#include "include/str_list.h"
70
71#include "OSDMonitor.h"
72#include "MDSMonitor.h"
73#include "MonmapMonitor.h"
74#include "PGMonitor.h"
75#include "LogMonitor.h"
76#include "AuthMonitor.h"
77#include "MgrMonitor.h"
31f18b77 78#include "MgrStatMonitor.h"
7c673cae 79#include "mon/QuorumService.h"
224ce89b 80#include "mon/OldHealthMonitor.h"
7c673cae
FG
81#include "mon/HealthMonitor.h"
82#include "mon/ConfigKeyService.h"
83#include "common/config.h"
84#include "common/cmdparse.h"
85#include "include/assert.h"
86#include "include/compat.h"
87#include "perfglue/heap_profiler.h"
88
89#include "auth/none/AuthNoneClientHandler.h"
90
91#define dout_subsys ceph_subsys_mon
92#undef dout_prefix
93#define dout_prefix _prefix(_dout, this)
94static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
95 return *_dout << "mon." << mon->name << "@" << mon->rank
96 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
97}
98
99const string Monitor::MONITOR_NAME = "monitor";
100const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
101
102
// Static command tables.  The two included headers are expanded with the
// macros below in effect, so each COMMAND()/COMMAND_WITH_FLAG() use they
// contain becomes one brace-initialized MonCommand entry.

// Drop any earlier definitions before installing ours.
#undef FLAG
#undef COMMAND
#undef COMMAND_WITH_FLAG
// FLAG(f) spells the MonCommand::FLAG_<f> constant.
#define FLAG(f) (MonCommand::FLAG_##f)
// COMMAND(...) yields an entry whose flags field is FLAG(NONE); the
// trailing comma lets entries be listed back to back.
#define COMMAND(parsesig, helptext, modulename, req_perms, avail)	\
  {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
// COMMAND_WITH_FLAG(...) is the same with caller-supplied flags.
#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
  {parsesig, helptext, modulename, req_perms, avail, flags},
// Copied into local_mon_commands by the Monitor constructor.
MonCommand mon_commands[] = {
#include <mon/MonCommands.h>
};
// Appended to the local commands to form local_upgrading_mon_commands.
MonCommand pgmonitor_commands[] = {
#include <mon/PGMonitorCommands.h>
};
// The entry macros are only needed while the tables are being built.
#undef COMMAND
#undef COMMAND_WITH_FLAG
119
120
7c673cae
FG
121void C_MonContext::finish(int r) {
122 if (mon->is_shutdown())
123 return;
124 FunctionContext::finish(r);
125}
126
127Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
128 Messenger *m, Messenger *mgr_m, MonMap *map) :
129 Dispatcher(cct_),
130 name(nm),
131 rank(-1),
132 messenger(m),
133 con_self(m ? m->get_loopback_connection() : NULL),
134 lock("Monitor::lock"),
135 timer(cct_, lock),
136 finisher(cct_, "mon_finisher", "fin"),
137 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
138 has_ever_joined(false),
139 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
140 monmap(map),
141 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
142 key_server(cct, &keyring),
143 auth_cluster_required(cct,
144 cct->_conf->auth_supported.empty() ?
145 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
146 auth_service_required(cct,
147 cct->_conf->auth_supported.empty() ?
148 cct->_conf->auth_service_required : cct->_conf->auth_supported ),
7c673cae
FG
149 mgr_messenger(mgr_m),
150 mgr_client(cct_, mgr_m),
31f18b77 151 pgservice(nullptr),
7c673cae
FG
152 store(s),
153
154 state(STATE_PROBING),
155
156 elector(this),
157 required_features(0),
158 leader(0),
159 quorum_con_features(0),
160 // scrub
161 scrub_version(0),
162 scrub_event(NULL),
163 scrub_timeout_event(NULL),
164
165 // sync state
166 sync_provider_count(0),
167 sync_cookie(0),
168 sync_full(false),
169 sync_start_version(0),
170 sync_timeout_event(NULL),
171 sync_last_committed_floor(0),
172
173 timecheck_round(0),
174 timecheck_acks(0),
175 timecheck_rounds_since_clean(0),
176 timecheck_event(NULL),
177
178 paxos_service(PAXOS_NUM),
179 admin_hook(NULL),
180 routed_request_tid(0),
181 op_tracker(cct, true, 1)
182{
183 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
184 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
185
186 update_log_clients();
187
188 paxos = new Paxos(this, "paxos");
189
190 paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
191 paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
192 paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
193 paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
194 paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
195 paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
196 paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
31f18b77 197 paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
224ce89b 198 paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
7c673cae 199
224ce89b 200 health_monitor = new OldHealthMonitor(this);
7c673cae
FG
201 config_key_service = new ConfigKeyService(this, paxos);
202
203 mon_caps = new MonCap();
204 bool r = mon_caps->parse("allow *", NULL);
205 assert(r);
206
207 exited_quorum = ceph_clock_now();
208
d2e6a577
FG
209 // prepare local commands
210 local_mon_commands.resize(ARRAY_SIZE(mon_commands));
211 for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
212 local_mon_commands[i] = mon_commands[i];
213 }
214 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
215
216 local_upgrading_mon_commands = local_mon_commands;
217 for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
218 local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
219 }
220 MonCommand::encode_vector(local_upgrading_mon_commands,
221 local_upgrading_mon_commands_bl);
222
7c673cae
FG
223 // assume our commands until we have an election. this only means
224 // we won't reply with EINVAL before the election; any command that
225 // actually matters will wait until we have quorum etc and then
226 // retry (and revalidate).
d2e6a577 227 leader_mon_commands = local_mon_commands;
31f18b77
FG
228
229 // note: OSDMonitor may update this based on the luminous flag.
230 pgservice = mgrstatmon()->get_pg_stat_service();
7c673cae
FG
231}
232
7c673cae
FG
233Monitor::~Monitor()
234{
235 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
236 delete *p;
237 delete health_monitor;
238 delete config_key_service;
239 delete paxos;
240 assert(session_map.sessions.empty());
241 delete mon_caps;
7c673cae
FG
242}
243
244
245class AdminHook : public AdminSocketHook {
246 Monitor *mon;
247public:
248 explicit AdminHook(Monitor *m) : mon(m) {}
249 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
250 bufferlist& out) override {
251 stringstream ss;
252 mon->do_admin_command(command, cmdmap, format, ss);
253 out.append(ss);
254 return true;
255 }
256};
257
258void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
259 ostream& ss)
260{
261 Mutex::Locker l(lock);
262
263 boost::scoped_ptr<Formatter> f(Formatter::create(format));
264
265 string args;
266 for (cmdmap_t::iterator p = cmdmap.begin();
267 p != cmdmap.end(); ++p) {
268 if (p->first == "prefix")
269 continue;
270 if (!args.empty())
271 args += ", ";
272 args += cmd_vartype_stringify(p->second);
273 }
274 args = "[" + args + "]";
275
276 bool read_only = (command == "mon_status" ||
277 command == "mon metadata" ||
278 command == "quorum_status" ||
224ce89b
WB
279 command == "ops" ||
280 command == "sessions");
7c673cae
FG
281
282 (read_only ? audit_clog->debug() : audit_clog->info())
283 << "from='admin socket' entity='admin socket' "
284 << "cmd='" << command << "' args=" << args << ": dispatch";
285
286 if (command == "mon_status") {
287 get_mon_status(f.get(), ss);
288 if (f)
289 f->flush(ss);
290 } else if (command == "quorum_status") {
291 _quorum_status(f.get(), ss);
292 } else if (command == "sync_force") {
293 string validate;
294 if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
295 (validate != "--yes-i-really-mean-it")) {
296 ss << "are you SURE? this will mean the monitor store will be erased "
297 "the next time the monitor is restarted. pass "
298 "'--yes-i-really-mean-it' if you really do.";
299 goto abort;
300 }
301 sync_force(f.get(), ss);
302 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
303 if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
304 goto abort;
305 } else if (command == "quorum enter") {
306 elector.start_participating();
307 start_election();
308 ss << "started responding to quorum, initiated new election";
309 } else if (command == "quorum exit") {
310 start_election();
311 elector.stop_participating();
312 ss << "stopped responding to quorum, initiated new election";
313 } else if (command == "ops") {
314 (void)op_tracker.dump_ops_in_flight(f.get());
315 if (f) {
316 f->flush(ss);
317 }
224ce89b
WB
318 } else if (command == "sessions") {
319
320 if (f) {
321 f->open_array_section("sessions");
322 for (auto p : session_map.sessions) {
323 f->dump_stream("session") << *p;
324 }
325 f->close_section();
326 f->flush(ss);
327 }
328
7c673cae
FG
329 } else {
330 assert(0 == "bad AdminSocket command binding");
331 }
332 (read_only ? audit_clog->debug() : audit_clog->info())
333 << "from='admin socket' "
334 << "entity='admin socket' "
335 << "cmd=" << command << " "
336 << "args=" << args << ": finished";
337 return;
338
339abort:
340 (read_only ? audit_clog->debug() : audit_clog->info())
341 << "from='admin socket' "
342 << "entity='admin socket' "
343 << "cmd=" << command << " "
344 << "args=" << args << ": aborted";
345}
346
// Signal entry point: only SIGINT and SIGTERM are expected here.  The
// signal is reported on the error log and the monitor is shut down.
void Monitor::handle_signal(int signum)
{
  assert(signum == SIGINT || signum == SIGTERM);
  derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
  shutdown();
}
353
354CompatSet Monitor::get_initial_supported_features()
355{
356 CompatSet::FeatureSet ceph_mon_feature_compat;
357 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
358 CompatSet::FeatureSet ceph_mon_feature_incompat;
359 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
360 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
361 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
362 ceph_mon_feature_incompat);
363}
364
365CompatSet Monitor::get_supported_features()
366{
367 CompatSet compat = get_initial_supported_features();
368 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
369 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
370 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
371 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
372 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
373 return compat;
374}
375
376CompatSet Monitor::get_legacy_features()
377{
378 CompatSet::FeatureSet ceph_mon_feature_compat;
379 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
380 CompatSet::FeatureSet ceph_mon_feature_incompat;
381 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
382 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
383 ceph_mon_feature_incompat);
384}
385
386int Monitor::check_features(MonitorDBStore *store)
387{
388 CompatSet required = get_supported_features();
389 CompatSet ondisk;
390
391 read_features_off_disk(store, &ondisk);
392
393 if (!required.writeable(ondisk)) {
394 CompatSet diff = required.unsupported(ondisk);
395 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
396 return -EPERM;
397 }
398
399 return 0;
400}
401
402void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
403{
404 bufferlist featuresbl;
405 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
406 if (featuresbl.length() == 0) {
407 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
408 << "Assuming it is old-style and introducing one." << dendl;
409 //we only want the baseline ~v.18 features assumed to be on disk.
410 //If new features are introduced this code needs to disappear or
411 //be made smarter.
412 *features = get_legacy_features();
413
414 features->encode(featuresbl);
415 auto t(std::make_shared<MonitorDBStore::Transaction>());
416 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
417 store->apply_transaction(t);
418 } else {
419 bufferlist::iterator it = featuresbl.begin();
420 features->decode(it);
421 }
422}
423
// Loads the on-disk feature set into `features` and then recomputes the
// quorum requirements; both results are logged at debug level 10.
void Monitor::read_features()
{
  read_features_off_disk(store, &features);
  dout(10) << "features " << features << dendl;

  calc_quorum_requirements();
  dout(10) << "required_features " << required_features << dendl;
}
432
433void Monitor::write_features(MonitorDBStore::TransactionRef t)
434{
435 bufferlist bl;
436 features.encode(bl);
437 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
438}
439
440const char** Monitor::get_tracked_conf_keys() const
441{
442 static const char* KEYS[] = {
443 "crushtool", // helpful for testing
444 "mon_election_timeout",
445 "mon_lease",
446 "mon_lease_renew_interval_factor",
447 "mon_lease_ack_timeout_factor",
448 "mon_accept_timeout_factor",
449 // clog & admin clog
450 "clog_to_monitors",
451 "clog_to_syslog",
452 "clog_to_syslog_facility",
453 "clog_to_syslog_level",
454 "clog_to_graylog",
455 "clog_to_graylog_host",
456 "clog_to_graylog_port",
457 "host",
458 "fsid",
459 // periodic health to clog
460 "mon_health_to_clog",
461 "mon_health_to_clog_interval",
462 "mon_health_to_clog_tick_interval",
463 // scrub interval
464 "mon_scrub_interval",
465 NULL
466 };
467 return KEYS;
468}
469
470void Monitor::handle_conf_change(const struct md_config_t *conf,
471 const std::set<std::string> &changed)
472{
473 sanitize_options();
474
475 dout(10) << __func__ << " " << changed << dendl;
476
477 if (changed.count("clog_to_monitors") ||
478 changed.count("clog_to_syslog") ||
479 changed.count("clog_to_syslog_level") ||
480 changed.count("clog_to_syslog_facility") ||
481 changed.count("clog_to_graylog") ||
482 changed.count("clog_to_graylog_host") ||
483 changed.count("clog_to_graylog_port") ||
484 changed.count("host") ||
485 changed.count("fsid")) {
486 update_log_clients();
487 }
488
489 if (changed.count("mon_health_to_clog") ||
490 changed.count("mon_health_to_clog_interval") ||
491 changed.count("mon_health_to_clog_tick_interval")) {
492 health_to_clog_update_conf(changed);
493 }
494
495 if (changed.count("mon_scrub_interval")) {
496 scrub_update_interval(conf->mon_scrub_interval);
497 }
498}
499
500void Monitor::update_log_clients()
501{
502 map<string,string> log_to_monitors;
503 map<string,string> log_to_syslog;
504 map<string,string> log_channel;
505 map<string,string> log_prio;
506 map<string,string> log_to_graylog;
507 map<string,string> log_to_graylog_host;
508 map<string,string> log_to_graylog_port;
509 uuid_d fsid;
510 string host;
511
512 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
513 log_channel, log_prio, log_to_graylog,
514 log_to_graylog_host, log_to_graylog_port,
515 fsid, host))
516 return;
517
518 clog->update_config(log_to_monitors, log_to_syslog,
519 log_channel, log_prio, log_to_graylog,
520 log_to_graylog_host, log_to_graylog_port,
521 fsid, host);
522
523 audit_clog->update_config(log_to_monitors, log_to_syslog,
524 log_channel, log_prio, log_to_graylog,
525 log_to_graylog_host, log_to_graylog_port,
526 fsid, host);
527}
528
529int Monitor::sanitize_options()
530{
531 int r = 0;
532
533 // mon_lease must be greater than mon_lease_renewal; otherwise we
534 // may incur in leases expiring before they are renewed.
535 if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
536 clog->error() << "mon_lease_renew_interval_factor ("
537 << g_conf->mon_lease_renew_interval_factor
538 << ") must be less than 1.0";
539 r = -EINVAL;
540 }
541
542 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
543 // got time to renew the lease and get an ack for it. Having both options
544 // with the same value, for a given small vale, could mean timing out if
545 // the monitors happened to be overloaded -- or even under normal load for
546 // a small enough value.
547 if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
548 clog->error() << "mon_lease_ack_timeout_factor ("
549 << g_conf->mon_lease_ack_timeout_factor
550 << ") must be greater than 1.0";
551 r = -EINVAL;
552 }
553
554 return r;
555}
556
// First-stage initialisation, performed before init().
//
// Takes the monitor lock for most of its work: validates options,
// creates the perf counters, verifies (or writes) the fsid, reads the
// on-disk features, decides whether the store must be cleared for a
// sync, initialises paxos and the health monitor, and loads the keyring
// when one is required.  The lock is released while the admin-socket
// commands are registered, then re-taken to register as a config
// observer.
//
// Returns 0 on success or a negative error code: the result of
// sanitize_options(), check_fsid()/write_fsid() or keyring.load(), or
// -ENOENT when this monitor was in a quorum before but is no longer in
// the monmap.  Every return path leaves the lock released.
int Monitor::preinit()
{
  lock.Lock();

  dout(1) << "preinit fsid " << monmap->fsid << dendl;

  int r = sanitize_options();
  if (r < 0) {
    derr << "option sanitization failed!" << dendl;
    lock.Unlock();
    return r;
  }

  // per-monitor counters; registered with the collection right away
  assert(!logger);
  {
    PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
    pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess");
    pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", "sadd");
    pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", "srm");
    pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions");
    pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in");
    pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started");
    pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won");
    pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost");
    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  // cluster-wide counters; created here but only added to the
  // collection by register_cluster_logger()
  assert(!cluster_logger);
  {
    PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
    pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
    pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
    pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
    pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
    pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
    pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
    pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster");
    pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space");
    pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space");
    pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
    pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
    pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
    pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
    pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
    pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
    pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
    pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
    pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
    pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
    pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
    pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
    pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
    pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
    cluster_logger = pcb.create_perf_counters();
  }

  paxos->init_logger();

  // verify cluster_uuid
  {
    // this `r` shadows the outer one; a missing fsid (-ENOENT) is not an
    // error, it is written instead
    int r = check_fsid();
    if (r == -ENOENT)
      r = write_fsid();
    if (r < 0) {
      lock.Unlock();
      return r;
    }
  }

  // open compatset
  read_features();

  // have we ever joined a quorum?
  has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
  dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;

  if (!has_ever_joined) {
    // impose initial quorum restrictions?
    list<string> initial_members;
    get_str_list(g_conf->mon_initial_members, initial_members);

    if (!initial_members.empty()) {
      dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;

      monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
                                  &extra_probe_peers);

      dout(10) << " monmap is " << *monmap << dendl;
      dout(10) << " extra probe peers " << extra_probe_peers << dendl;
    }
  } else if (!monmap->contains(name)) {
    derr << "not in monmap and have been in a quorum before; "
         << "must have been removed" << dendl;
    if (g_conf->mon_force_quorum_join) {
      dout(0) << "we should have died but "
              << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
    } else {
      derr << "commit suicide!" << dendl;
      lock.Unlock();
      return -ENOENT;
    }
  }

  {
    // We have a potentially inconsistent store state in hands. Get rid of it
    // and start fresh.
    bool clear_store = false;
    if (store->exists("mon_sync", "in_sync")) {
      dout(1) << __func__ << " clean up potentially inconsistent store state"
              << dendl;
      clear_store = true;
    }

    if (store->get("mon_sync", "force_sync") > 0) {
      dout(1) << __func__ << " force sync by clearing store state" << dendl;
      clear_store = true;
    }

    if (clear_store) {
      set<string> sync_prefixes = get_sync_targets_names();
      store->clear(sync_prefixes);
    }
  }

  sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
  dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;

  init_paxos();
  health_monitor->init();

  if (is_keyring_required()) {
    // we need to bootstrap authentication keys so we can form an
    // initial quorum.
    if (authmon()->get_last_committed() == 0) {
      dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
      bufferlist bl;
      int err = store->get("mkfs", "keyring", bl);
      if (err == 0 && bl.length() > 0) {
        // Attempt to decode and extract keyring only if it is found.
        KeyRing keyring;
        bufferlist::iterator p = bl.begin();
        ::decode(keyring, p);
        extract_save_mon_key(keyring);
      }
    }

    string keyring_loc = g_conf->mon_data + "/keyring";

    r = keyring.load(cct, keyring_loc);
    if (r < 0) {
      // no external keyring: fall back to the mon. key held by the key
      // server, and write it out as the default keyring
      EntityName mon_name;
      mon_name.set_type(CEPH_ENTITY_TYPE_MON);
      EntityAuth mon_key;
      if (key_server.get_auth(mon_name, mon_key)) {
        dout(1) << "copying mon. key from old db to external keyring" << dendl;
        keyring.add(mon_name, mon_key);
        bufferlist bl;
        keyring.encode_plaintext(bl);
        write_default_keyring(bl);
      } else {
        derr << "unable to load initial keyring " << g_conf->keyring << dendl;
        lock.Unlock();
        return r;
      }
    }
  }

  admin_hook = new AdminHook(this);
  AdminSocket* admin_socket = cct->get_admin_socket();

  // unlock while registering to avoid mon_lock -> admin socket lock dependency.
  lock.Unlock();
  // the same command names are unregistered in shutdown() and dispatched
  // in do_admin_command()
  r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
                                     "show current monitor status");
  assert(r == 0);
  r = admin_socket->register_command("quorum_status", "quorum_status",
                                     admin_hook, "show current quorum status");
  assert(r == 0);
  r = admin_socket->register_command("sync_force",
                                     "sync_force name=validate,"
                                     "type=CephChoices,"
                                     "strings=--yes-i-really-mean-it",
                                     admin_hook,
                                     "force sync of and clear monitor store");
  assert(r == 0);
  r = admin_socket->register_command("add_bootstrap_peer_hint",
                                     "add_bootstrap_peer_hint name=addr,"
                                     "type=CephIPAddr",
                                     admin_hook,
                                     "add peer address as potential bootstrap"
                                     " peer for cluster bringup");
  assert(r == 0);
  r = admin_socket->register_command("quorum enter", "quorum enter",
                                     admin_hook,
                                     "force monitor back into quorum");
  assert(r == 0);
  r = admin_socket->register_command("quorum exit", "quorum exit",
                                     admin_hook,
                                     "force monitor out of the quorum");
  assert(r == 0);
  r = admin_socket->register_command("ops",
                                     "ops",
                                     admin_hook,
                                     "show the ops currently in flight");
  assert(r == 0);
  r = admin_socket->register_command("sessions",
                                     "sessions",
                                     admin_hook,
                                     "list existing sessions");
  assert(r == 0);

  lock.Lock();

  // add ourselves as a conf observer
  g_conf->add_observer(this);

  lock.Unlock();
  return 0;
}
777
// Second-stage initialisation: under the monitor lock, starts the
// finisher, the timer (and first tick) and the CPU thread pool, attaches
// the dispatchers to both messengers, initialises the manager client,
// and finally kicks off bootstrap().  Always returns 0.
int Monitor::init()
{
  dout(2) << "init" << dendl;
  Mutex::Locker l(lock);

  finisher.start();

  // start ticker
  timer.init();
  new_tick();

  cpu_tp.start();

  // i'm ready!
  messenger->add_dispatcher_tail(this);

  mgr_client.init();
  mgr_messenger->add_dispatcher_tail(&mgr_client);
  mgr_messenger->add_dispatcher_tail(this);  // for auth ms_* calls

  bootstrap();
  return 0;
}
801
802void Monitor::init_paxos()
803{
804 dout(10) << __func__ << dendl;
805 paxos->init();
806
807 // init services
808 for (int i = 0; i < PAXOS_NUM; ++i) {
809 paxos_service[i]->init();
810 }
811
812 refresh_from_paxos(NULL);
813}
814
815void Monitor::refresh_from_paxos(bool *need_bootstrap)
816{
817 dout(10) << __func__ << dendl;
818
819 bufferlist bl;
820 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
821 if (r >= 0) {
822 try {
823 bufferlist::iterator p = bl.begin();
824 ::decode(fingerprint, p);
825 }
826 catch (buffer::error& e) {
827 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
828 }
829 } else {
830 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
831 }
832
833 for (int i = 0; i < PAXOS_NUM; ++i) {
834 paxos_service[i]->refresh(need_bootstrap);
835 }
836 for (int i = 0; i < PAXOS_NUM; ++i) {
837 paxos_service[i]->post_refresh();
838 }
224ce89b 839 load_metadata();
7c673cae
FG
840}
841
842void Monitor::register_cluster_logger()
843{
844 if (!cluster_logger_registered) {
845 dout(10) << "register_cluster_logger" << dendl;
846 cluster_logger_registered = true;
847 cct->get_perfcounters_collection()->add(cluster_logger);
848 } else {
849 dout(10) << "register_cluster_logger - already registered" << dendl;
850 }
851}
852
853void Monitor::unregister_cluster_logger()
854{
855 if (cluster_logger_registered) {
856 dout(10) << "unregister_cluster_logger" << dendl;
857 cluster_logger_registered = false;
858 cct->get_perfcounters_collection()->remove(cluster_logger);
859 } else {
860 dout(10) << "unregister_cluster_logger - not registered" << dendl;
861 }
862}
863
// Publishes the current monmap size and quorum size to the cluster
// perf counters.
void Monitor::update_logger()
{
  cluster_logger->set(l_cluster_num_mon, monmap->size());
  cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
}
869
// Orderly teardown of the monitor.
//
// Under the monitor lock: waits for any pending paxos write, marks the
// state as SHUTDOWN, stops observing the config, unregisters the admin
// socket commands, and shuts down the elector and the manager client.
// The lock is dropped while the finisher drains and stops, then re-taken
// to shut down paxos, the services, timers, thread pool, sessions, perf
// counters and the log client.  The messengers are shut down last, with
// the lock released.
void Monitor::shutdown()
{
  dout(1) << "shutdown" << dendl;

  lock.Lock();

  wait_for_paxos_write();

  state = STATE_SHUTDOWN;

  g_conf->remove_observer(this);

  // mirror of the register_command() calls in preinit()
  if (admin_hook) {
    AdminSocket* admin_socket = cct->get_admin_socket();
    admin_socket->unregister_command("mon_status");
    admin_socket->unregister_command("quorum_status");
    admin_socket->unregister_command("sync_force");
    admin_socket->unregister_command("add_bootstrap_peer_hint");
    admin_socket->unregister_command("quorum enter");
    admin_socket->unregister_command("quorum exit");
    admin_socket->unregister_command("ops");
    admin_socket->unregister_command("sessions");
    delete admin_hook;
    admin_hook = NULL;
  }

  elector.shutdown();

  mgr_client.shutdown();

  // the finisher is drained and stopped without holding the lock
  lock.Unlock();
  finisher.wait_for_empty();
  finisher.stop();
  lock.Lock();

  // clean up
  paxos->shutdown();
  for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
    (*p)->shutdown();
  health_monitor->shutdown();

  // anyone still waiting for a quorum is told the wait was cancelled
  finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
  finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);

  timer.shutdown();

  cpu_tp.stop();

  remove_all_sessions();

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }
  if (cluster_logger) {
    // only remove it from the collection if it was ever added
    if (cluster_logger_registered)
      cct->get_perfcounters_collection()->remove(cluster_logger);
    delete cluster_logger;
    cluster_logger = NULL;
  }

  log_client.shutdown();

  // unlock before msgr shutdown...
  lock.Unlock();

  messenger->shutdown();  // last thing!  ceph_mon.cc will delete mon.
  mgr_messenger->shutdown();
}
940
941void Monitor::wait_for_paxos_write()
942{
943 if (paxos->is_writing() || paxos->is_writing_previous()) {
944 dout(10) << __func__ << " flushing pending write" << dendl;
945 lock.Unlock();
946 store->flush();
947 lock.Lock();
948 dout(10) << __func__ << " flushed pending write" << dendl;
949 }
950}
951
952void Monitor::bootstrap()
953{
954 dout(10) << "bootstrap" << dendl;
955 wait_for_paxos_write();
956
957 sync_reset_requester();
958 unregister_cluster_logger();
959 cancel_probe_timeout();
960
961 // note my rank
962 int newrank = monmap->get_rank(messenger->get_myaddr());
963 if (newrank < 0 && rank >= 0) {
964 // was i ever part of the quorum?
965 if (has_ever_joined) {
966 dout(0) << " removed from monmap, suicide." << dendl;
967 exit(0);
968 }
969 }
970 if (newrank != rank) {
971 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
972 messenger->set_myname(entity_name_t::MON(newrank));
973 rank = newrank;
974
975 // reset all connections, or else our peers will think we are someone else.
976 messenger->mark_down_all();
977 }
978
979 // reset
980 state = STATE_PROBING;
981
982 _reset();
983
984 // sync store
985 if (g_conf->mon_compact_on_bootstrap) {
986 dout(10) << "bootstrap -- triggering compaction" << dendl;
987 store->compact();
988 dout(10) << "bootstrap -- finished compaction" << dendl;
989 }
990
991 // singleton monitor?
992 if (monmap->size() == 1 && rank == 0) {
993 win_standalone_election();
994 return;
995 }
996
997 reset_probe_timeout();
998
999 // i'm outside the quorum
1000 if (monmap->contains(name))
1001 outside_quorum.insert(name);
1002
1003 // probe monitors
1004 dout(10) << "probing other monitors" << dendl;
1005 for (unsigned i = 0; i < monmap->size(); i++) {
1006 if ((int)i != rank)
1007 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
1008 monmap->get_inst(i));
1009 }
1010 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
1011 p != extra_probe_peers.end();
1012 ++p) {
1013 if (*p != messenger->get_myaddr()) {
1014 entity_inst_t i;
1015 i.name = entity_name_t::MON(-1);
1016 i.addr = *p;
1017 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
1018 }
1019 }
1020}
1021
1022bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1023{
1024 string addrstr;
1025 if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1026 ss << "unable to parse address string value '"
1027 << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1028 return false;
1029 }
1030 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1031 << addrstr << "'" << dendl;
1032
1033 entity_addr_t addr;
1034 const char *end = 0;
1035 if (!addr.parse(addrstr.c_str(), &end)) {
1036 ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1037 return false;
1038 }
1039
1040 if (is_leader() || is_peon()) {
1041 ss << "mon already active; ignoring bootstrap hint";
1042 return true;
1043 }
1044
1045 if (addr.get_port() == 0)
1046 addr.set_port(CEPH_MON_PORT);
1047
1048 extra_probe_peers.insert(addr);
1049 ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1050 return true;
1051}
1052
1053// called by bootstrap(), or on leader|peon -> electing
1054void Monitor::_reset()
1055{
1056 dout(10) << __func__ << dendl;
1057
1058 cancel_probe_timeout();
1059 timecheck_finish();
1060 health_events_cleanup();
1061 scrub_event_cancel();
1062
1063 leader_since = utime_t();
1064 if (!quorum.empty()) {
1065 exited_quorum = ceph_clock_now();
1066 }
1067 quorum.clear();
1068 outside_quorum.clear();
31f18b77 1069 quorum_feature_map.clear();
7c673cae
FG
1070
1071 scrub_reset();
1072
1073 paxos->restart();
1074
1075 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1076 (*p)->restart();
1077 health_monitor->finish();
1078}
1079
1080
1081// -----------------------------------------------------------
1082// sync
1083
1084set<string> Monitor::get_sync_targets_names()
1085{
1086 set<string> targets;
1087 targets.insert(paxos->get_name());
1088 for (int i = 0; i < PAXOS_NUM; ++i)
1089 paxos_service[i]->get_store_prefixes(targets);
1090 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1091 assert(config_key_service_ptr);
1092 config_key_service_ptr->get_store_prefixes(targets);
1093 return targets;
1094}
1095
1096
1097void Monitor::sync_timeout()
1098{
1099 dout(10) << __func__ << dendl;
1100 assert(state == STATE_SYNCHRONIZING);
1101 bootstrap();
1102}
1103
1104void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1105{
1106 dout(1) << __func__ << dendl;
1107
1108 MonMap latest_monmap;
1109
1110 // Grab latest monmap from MonmapMonitor
1111 bufferlist monmon_bl;
1112 int err = monmon()->get_monmap(monmon_bl);
1113 if (err < 0) {
1114 if (err != -ENOENT) {
1115 derr << __func__
1116 << " something wrong happened while reading the store: "
1117 << cpp_strerror(err) << dendl;
1118 assert(0 == "error reading the store");
1119 }
1120 } else {
1121 latest_monmap.decode(monmon_bl);
1122 }
1123
1124 // Grab last backed up monmap (if any) and compare epochs
1125 if (store->exists("mon_sync", "latest_monmap")) {
1126 bufferlist backup_bl;
1127 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1128 if (err < 0) {
1129 derr << __func__
1130 << " something wrong happened while reading the store: "
1131 << cpp_strerror(err) << dendl;
1132 assert(0 == "error reading the store");
1133 }
1134 assert(backup_bl.length() > 0);
1135
1136 MonMap backup_monmap;
1137 backup_monmap.decode(backup_bl);
1138
1139 if (backup_monmap.epoch > latest_monmap.epoch)
1140 latest_monmap = backup_monmap;
1141 }
1142
1143 // Check if our current monmap's epoch is greater than the one we've
1144 // got so far.
1145 if (monmap->epoch > latest_monmap.epoch)
1146 latest_monmap = *monmap;
1147
1148 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1149
1150 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1151}
1152
1153void Monitor::sync_reset_requester()
1154{
1155 dout(10) << __func__ << dendl;
1156
1157 if (sync_timeout_event) {
1158 timer.cancel_event(sync_timeout_event);
1159 sync_timeout_event = NULL;
1160 }
1161
1162 sync_provider = entity_inst_t();
1163 sync_cookie = 0;
1164 sync_full = false;
1165 sync_start_version = 0;
1166}
1167
1168void Monitor::sync_reset_provider()
1169{
1170 dout(10) << __func__ << dendl;
1171 sync_providers.clear();
1172}
1173
1174void Monitor::sync_start(entity_inst_t &other, bool full)
1175{
1176 dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
1177
1178 assert(state == STATE_PROBING ||
1179 state == STATE_SYNCHRONIZING);
1180 state = STATE_SYNCHRONIZING;
1181
1182 // make sure are not a provider for anyone!
1183 sync_reset_provider();
1184
1185 sync_full = full;
1186
1187 if (sync_full) {
1188 // stash key state, and mark that we are syncing
1189 auto t(std::make_shared<MonitorDBStore::Transaction>());
1190 sync_stash_critical_state(t);
1191 t->put("mon_sync", "in_sync", 1);
1192
1193 sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
1194 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1195 << sync_last_committed_floor << dendl;
1196 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1197
1198 store->apply_transaction(t);
1199
1200 assert(g_conf->mon_sync_requester_kill_at != 1);
1201
1202 // clear the underlying store
1203 set<string> targets = get_sync_targets_names();
1204 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1205 store->clear(targets);
1206
1207 // make sure paxos knows it has been reset. this prevents a
1208 // bootstrap and then different probe reply order from possibly
1209 // deciding a partial or no sync is needed.
1210 paxos->init();
1211
1212 assert(g_conf->mon_sync_requester_kill_at != 2);
1213 }
1214
1215 // assume 'other' as the leader. We will update the leader once we receive
1216 // a reply to the sync start.
1217 sync_provider = other;
1218
1219 sync_reset_timeout();
1220
1221 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1222 if (!sync_full)
1223 m->last_committed = paxos->get_version();
1224 messenger->send_message(m, sync_provider);
1225}
1226
1227void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1228{
1229 dout(10) << __func__ << dendl;
1230 bufferlist backup_monmap;
1231 sync_obtain_latest_monmap(backup_monmap);
1232 assert(backup_monmap.length() > 0);
1233 t->put("mon_sync", "latest_monmap", backup_monmap);
1234}
1235
1236void Monitor::sync_reset_timeout()
1237{
1238 dout(10) << __func__ << dendl;
1239 if (sync_timeout_event)
1240 timer.cancel_event(sync_timeout_event);
1241 sync_timeout_event = new C_MonContext(this, [this](int) {
1242 sync_timeout();
1243 });
1244 timer.add_event_after(g_conf->mon_sync_timeout, sync_timeout_event);
1245}
1246
1247void Monitor::sync_finish(version_t last_committed)
1248{
1249 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1250
1251 assert(g_conf->mon_sync_requester_kill_at != 7);
1252
1253 if (sync_full) {
1254 // finalize the paxos commits
1255 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1256 paxos->read_and_prepare_transactions(tx, sync_start_version,
1257 last_committed);
1258 tx->put(paxos->get_name(), "last_committed", last_committed);
1259
1260 dout(30) << __func__ << " final tx dump:\n";
1261 JSONFormatter f(true);
1262 tx->dump(&f);
1263 f.flush(*_dout);
1264 *_dout << dendl;
1265
1266 store->apply_transaction(tx);
1267 }
1268
1269 assert(g_conf->mon_sync_requester_kill_at != 8);
1270
1271 auto t(std::make_shared<MonitorDBStore::Transaction>());
1272 t->erase("mon_sync", "in_sync");
1273 t->erase("mon_sync", "force_sync");
1274 t->erase("mon_sync", "last_committed_floor");
1275 store->apply_transaction(t);
1276
1277 assert(g_conf->mon_sync_requester_kill_at != 9);
1278
1279 init_paxos();
1280
1281 assert(g_conf->mon_sync_requester_kill_at != 10);
1282
1283 bootstrap();
1284}
1285
1286void Monitor::handle_sync(MonOpRequestRef op)
1287{
1288 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1289 dout(10) << __func__ << " " << *m << dendl;
1290 switch (m->op) {
1291
1292 // provider ---------
1293
1294 case MMonSync::OP_GET_COOKIE_FULL:
1295 case MMonSync::OP_GET_COOKIE_RECENT:
1296 handle_sync_get_cookie(op);
1297 break;
1298 case MMonSync::OP_GET_CHUNK:
1299 handle_sync_get_chunk(op);
1300 break;
1301
1302 // client -----------
1303
1304 case MMonSync::OP_COOKIE:
1305 handle_sync_cookie(op);
1306 break;
1307
1308 case MMonSync::OP_CHUNK:
1309 case MMonSync::OP_LAST_CHUNK:
1310 handle_sync_chunk(op);
1311 break;
1312 case MMonSync::OP_NO_COOKIE:
1313 handle_sync_no_cookie(op);
1314 break;
1315
1316 default:
1317 dout(0) << __func__ << " unknown op " << m->op << dendl;
1318 assert(0 == "unknown op");
1319 }
1320}
1321
1322// leader
1323
1324void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1325{
1326 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1327 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1328 m->get_connection()->send_message(reply);
1329}
1330
1331void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1332{
1333 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1334 if (is_synchronizing()) {
1335 _sync_reply_no_cookie(op);
1336 return;
1337 }
1338
1339 assert(g_conf->mon_sync_provider_kill_at != 1);
1340
1341 // make sure they can understand us.
1342 if ((required_features ^ m->get_connection()->get_features()) &
1343 required_features) {
1344 dout(5) << " ignoring peer mon." << m->get_source().num()
1345 << " has features " << std::hex
1346 << m->get_connection()->get_features()
1347 << " but we require " << required_features << std::dec << dendl;
1348 return;
1349 }
1350
1351 // make up a unique cookie. include election epoch (which persists
1352 // across restarts for the whole cluster) and a counter for this
1353 // process instance. there is no need to be unique *across*
1354 // monitors, though.
1355 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1356 assert(sync_providers.count(cookie) == 0);
1357
1358 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1359
1360 SyncProvider& sp = sync_providers[cookie];
1361 sp.cookie = cookie;
1362 sp.entity = m->get_source_inst();
1363 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1364
1365 set<string> sync_targets;
1366 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1367 // full scan
1368 sync_targets = get_sync_targets_names();
1369 sp.last_committed = paxos->get_version();
1370 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1371 sp.full = true;
1372 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1373 } else {
1374 // just catch up paxos
1375 sp.last_committed = m->last_committed;
1376 }
1377 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1378
1379 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1380 reply->last_committed = sp.last_committed;
1381 m->get_connection()->send_message(reply);
1382}
1383
1384void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1385{
1386 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1387 dout(10) << __func__ << " " << *m << dendl;
1388
1389 if (sync_providers.count(m->cookie) == 0) {
1390 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1391 _sync_reply_no_cookie(op);
1392 return;
1393 }
1394
1395 assert(g_conf->mon_sync_provider_kill_at != 2);
1396
1397 SyncProvider& sp = sync_providers[m->cookie];
1398 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1399
1400 if (sp.last_committed < paxos->get_first_committed() &&
1401 paxos->get_first_committed() > 1) {
1402 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1403 << " < our fc " << paxos->get_first_committed() << dendl;
1404 sync_providers.erase(m->cookie);
1405 _sync_reply_no_cookie(op);
1406 return;
1407 }
1408
1409 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1410 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1411
1412 int left = g_conf->mon_sync_max_payload_size;
1413 while (sp.last_committed < paxos->get_version() && left > 0) {
1414 bufferlist bl;
1415 sp.last_committed++;
224ce89b
WB
1416
1417 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1418 assert(err == 0);
1419
7c673cae
FG
1420 tx->put(paxos->get_name(), sp.last_committed, bl);
1421 left -= bl.length();
1422 dout(20) << __func__ << " including paxos state " << sp.last_committed
1423 << dendl;
1424 }
1425 reply->last_committed = sp.last_committed;
1426
1427 if (sp.full && left > 0) {
1428 sp.synchronizer->get_chunk_tx(tx, left);
1429 sp.last_key = sp.synchronizer->get_last_key();
1430 reply->last_key = sp.last_key;
1431 }
1432
1433 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1434 sp.last_committed < paxos->get_version()) {
1435 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1436 << " key " << sp.last_key << dendl;
1437 } else {
1438 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1439 << " key " << sp.last_key << dendl;
1440 reply->op = MMonSync::OP_LAST_CHUNK;
1441
1442 assert(g_conf->mon_sync_provider_kill_at != 3);
1443
1444 // clean up our local state
1445 sync_providers.erase(sp.cookie);
1446 }
1447
1448 ::encode(*tx, reply->chunk_bl);
1449
1450 m->get_connection()->send_message(reply);
1451}
1452
1453// requester
1454
1455void Monitor::handle_sync_cookie(MonOpRequestRef op)
1456{
1457 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1458 dout(10) << __func__ << " " << *m << dendl;
1459 if (sync_cookie) {
1460 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1461 return;
1462 }
1463 if (m->get_source_inst() != sync_provider) {
1464 dout(10) << __func__ << " source does not match, discarding" << dendl;
1465 return;
1466 }
1467 sync_cookie = m->cookie;
1468 sync_start_version = m->last_committed;
1469
1470 sync_reset_timeout();
1471 sync_get_next_chunk();
1472
1473 assert(g_conf->mon_sync_requester_kill_at != 3);
1474}
1475
1476void Monitor::sync_get_next_chunk()
1477{
1478 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1479 if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1480 dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1481 usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1482 }
1483 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1484 messenger->send_message(r, sync_provider);
1485
1486 assert(g_conf->mon_sync_requester_kill_at != 4);
1487}
1488
1489void Monitor::handle_sync_chunk(MonOpRequestRef op)
1490{
1491 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1492 dout(10) << __func__ << " " << *m << dendl;
1493
1494 if (m->cookie != sync_cookie) {
1495 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1496 return;
1497 }
1498 if (m->get_source_inst() != sync_provider) {
1499 dout(10) << __func__ << " source does not match, discarding" << dendl;
1500 return;
1501 }
1502
1503 assert(state == STATE_SYNCHRONIZING);
1504 assert(g_conf->mon_sync_requester_kill_at != 5);
1505
1506 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1507 tx->append_from_encoded(m->chunk_bl);
1508
1509 dout(30) << __func__ << " tx dump:\n";
1510 JSONFormatter f(true);
1511 tx->dump(&f);
1512 f.flush(*_dout);
1513 *_dout << dendl;
1514
1515 store->apply_transaction(tx);
1516
1517 assert(g_conf->mon_sync_requester_kill_at != 6);
1518
1519 if (!sync_full) {
1520 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1521 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1522 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1523 m->last_committed);
1524 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1525
1526 dout(30) << __func__ << " tx dump:\n";
1527 JSONFormatter f(true);
1528 tx->dump(&f);
1529 f.flush(*_dout);
1530 *_dout << dendl;
1531
1532 store->apply_transaction(tx);
1533 paxos->init(); // to refresh what we just wrote
1534 }
1535
1536 if (m->op == MMonSync::OP_CHUNK) {
1537 sync_reset_timeout();
1538 sync_get_next_chunk();
1539 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1540 sync_finish(m->last_committed);
1541 }
1542}
1543
1544void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1545{
1546 dout(10) << __func__ << dendl;
1547 bootstrap();
1548}
1549
1550void Monitor::sync_trim_providers()
1551{
1552 dout(20) << __func__ << dendl;
1553
1554 utime_t now = ceph_clock_now();
1555 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1556 while (p != sync_providers.end()) {
1557 if (now > p->second.timeout) {
1558 dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
1559 sync_providers.erase(p++);
1560 } else {
1561 ++p;
1562 }
1563 }
1564}
1565
1566// ---------------------------------------------------
1567// probe
1568
1569void Monitor::cancel_probe_timeout()
1570{
1571 if (probe_timeout_event) {
1572 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1573 timer.cancel_event(probe_timeout_event);
1574 probe_timeout_event = NULL;
1575 } else {
1576 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1577 }
1578}
1579
1580void Monitor::reset_probe_timeout()
1581{
1582 cancel_probe_timeout();
1583 probe_timeout_event = new C_MonContext(this, [this](int r) {
1584 probe_timeout(r);
1585 });
1586 double t = g_conf->mon_probe_timeout;
1587 timer.add_event_after(t, probe_timeout_event);
1588 dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
1589}
1590
1591void Monitor::probe_timeout(int r)
1592{
1593 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1594 assert(is_probing() || is_synchronizing());
1595 assert(probe_timeout_event);
1596 probe_timeout_event = NULL;
1597 bootstrap();
1598}
1599
1600void Monitor::handle_probe(MonOpRequestRef op)
1601{
1602 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1603 dout(10) << "handle_probe " << *m << dendl;
1604
1605 if (m->fsid != monmap->fsid) {
1606 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1607 return;
1608 }
1609
1610 switch (m->op) {
1611 case MMonProbe::OP_PROBE:
1612 handle_probe_probe(op);
1613 break;
1614
1615 case MMonProbe::OP_REPLY:
1616 handle_probe_reply(op);
1617 break;
1618
1619 case MMonProbe::OP_MISSING_FEATURES:
1620 derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1621 << ", required " << m->required_features
1622 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
1623 << dendl;
1624 break;
1625 }
1626}
1627
1628void Monitor::handle_probe_probe(MonOpRequestRef op)
1629{
1630 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1631
1632 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1633 << " features " << m->get_connection()->get_features() << dendl;
1634 uint64_t missing = required_features & ~m->get_connection()->get_features();
1635 if (missing) {
1636 dout(1) << " peer " << m->get_source_addr() << " missing features "
1637 << missing << dendl;
1638 if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1639 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1640 name, has_ever_joined);
1641 m->required_features = required_features;
1642 m->get_connection()->send_message(r);
1643 }
1644 goto out;
1645 }
1646
1647 if (!is_probing() && !is_synchronizing()) {
1648 // If the probing mon is way ahead of us, we need to re-bootstrap.
1649 // Normally we capture this case when we initially bootstrap, but
1650 // it is possible we pass those checks (we overlap with
1651 // quorum-to-be) but fail to join a quorum before it moves past
1652 // us. We need to be kicked back to bootstrap so we can
1653 // synchonize, not keep calling elections.
1654 if (paxos->get_version() + 1 < m->paxos_first_version) {
1655 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1656 << "ahead of us, re-bootstrapping" << dendl;
1657 bootstrap();
1658 goto out;
1659
1660 }
1661 }
1662
1663 MMonProbe *r;
1664 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1665 r->name = name;
1666 r->quorum = quorum;
1667 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1668 r->paxos_first_version = paxos->get_first_committed();
1669 r->paxos_last_version = paxos->get_version();
1670 m->get_connection()->send_message(r);
1671
1672 // did we discover a peer here?
1673 if (!monmap->contains(m->get_source_addr())) {
1674 dout(1) << " adding peer " << m->get_source_addr()
1675 << " to list of hints" << dendl;
1676 extra_probe_peers.insert(m->get_source_addr());
1677 }
1678
1679 out:
1680 return;
1681}
1682
1683void Monitor::handle_probe_reply(MonOpRequestRef op)
1684{
1685 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1686 dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1687 dout(10) << " monmap is " << *monmap << dendl;
1688
1689 // discover name and addrs during probing or electing states.
1690 if (!is_probing() && !is_electing()) {
1691 return;
1692 }
1693
1694 // newer map, or they've joined a quorum and we haven't?
1695 bufferlist mybl;
1696 monmap->encode(mybl, m->get_connection()->get_features());
1697 // make sure it's actually different; the checks below err toward
1698 // taking the other guy's map, which could cause us to loop.
1699 if (!mybl.contents_equal(m->monmap_bl)) {
1700 MonMap *newmap = new MonMap;
1701 newmap->decode(m->monmap_bl);
1702 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1703 !has_ever_joined)) {
1704 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1705 << ", mine was " << monmap->get_epoch() << dendl;
1706 delete newmap;
1707 monmap->decode(m->monmap_bl);
1708
1709 bootstrap();
1710 return;
1711 }
1712 delete newmap;
1713 }
1714
1715 // rename peer?
1716 string peer_name = monmap->get_name(m->get_source_addr());
1717 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1718 dout(10) << " renaming peer " << m->get_source_addr() << " "
1719 << peer_name << " -> " << m->name << " in my monmap"
1720 << dendl;
1721 monmap->rename(peer_name, m->name);
1722
1723 if (is_electing()) {
1724 bootstrap();
1725 return;
1726 }
1727 } else {
1728 dout(10) << " peer name is " << peer_name << dendl;
1729 }
1730
1731 // new initial peer?
1732 if (monmap->get_epoch() == 0 &&
1733 monmap->contains(m->name) &&
1734 monmap->get_addr(m->name).is_blank_ip()) {
1735 dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1736 monmap->set_addr(m->name, m->get_source_addr());
1737
1738 bootstrap();
1739 return;
1740 }
1741
1742 // end discover phase
1743 if (!is_probing()) {
1744 return;
1745 }
1746
1747 assert(paxos != NULL);
1748
1749 if (is_synchronizing()) {
1750 dout(10) << " currently syncing" << dendl;
1751 return;
1752 }
1753
1754 entity_inst_t other = m->get_source_inst();
1755
1756 if (m->paxos_last_version < sync_last_committed_floor) {
1757 dout(10) << " peer paxos versions [" << m->paxos_first_version
1758 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1759 << sync_last_committed_floor << ", ignoring"
1760 << dendl;
1761 } else {
1762 if (paxos->get_version() < m->paxos_first_version &&
1763 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1764 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1765 << "," << m->paxos_last_version << "]"
1766 << " vs my version " << paxos->get_version()
1767 << " (too far ahead)"
1768 << dendl;
1769 cancel_probe_timeout();
1770 sync_start(other, true);
1771 return;
1772 }
1773 if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1774 dout(10) << " peer paxos last version " << m->paxos_last_version
1775 << " vs my version " << paxos->get_version()
1776 << " (too far ahead)"
1777 << dendl;
1778 cancel_probe_timeout();
1779 sync_start(other, false);
1780 return;
1781 }
1782 }
1783
1784 // is there an existing quorum?
1785 if (m->quorum.size()) {
1786 dout(10) << " existing quorum " << m->quorum << dendl;
1787
1788 dout(10) << " peer paxos version " << m->paxos_last_version
1789 << " vs my version " << paxos->get_version()
1790 << " (ok)"
1791 << dendl;
1792
1793 if (monmap->contains(name) &&
1794 !monmap->get_addr(name).is_blank_ip()) {
1795 // i'm part of the cluster; just initiate a new election
1796 start_election();
1797 } else {
1798 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1799 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1800 monmap->get_inst(*m->quorum.begin()));
1801 }
1802 } else {
1803 if (monmap->contains(m->name)) {
1804 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1805 outside_quorum.insert(m->name);
1806 } else {
1807 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
1808 return;
1809 }
1810
1811 unsigned need = monmap->size() / 2 + 1;
1812 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1813 if (outside_quorum.size() >= need) {
1814 if (outside_quorum.count(name)) {
1815 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1816 start_election();
1817 } else {
1818 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1819 }
1820 } else {
1821 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
1822 }
1823 }
1824}
1825
1826void Monitor::join_election()
1827{
1828 dout(10) << __func__ << dendl;
1829 wait_for_paxos_write();
1830 _reset();
1831 state = STATE_ELECTING;
1832
1833 logger->inc(l_mon_num_elections);
1834}
1835
1836void Monitor::start_election()
1837{
1838 dout(10) << "start_election" << dendl;
1839 wait_for_paxos_write();
1840 _reset();
1841 state = STATE_ELECTING;
1842
1843 logger->inc(l_mon_num_elections);
1844 logger->inc(l_mon_election_call);
1845
1846 clog->info() << "mon." << name << " calling new monitor election";
1847 elector.call_election();
1848}
1849
1850void Monitor::win_standalone_election()
1851{
1852 dout(1) << "win_standalone_election" << dendl;
1853
1854 // bump election epoch, in case the previous epoch included other
1855 // monitors; we need to be able to make the distinction.
1856 elector.init();
1857 elector.advance_epoch();
1858
1859 rank = monmap->get_rank(name);
1860 assert(rank == 0);
1861 set<int> q;
1862 q.insert(rank);
1863
224ce89b
WB
1864 map<int,Metadata> metadata;
1865 collect_metadata(&metadata[0]);
1866
7c673cae
FG
1867 win_election(elector.get_epoch(), q,
1868 CEPH_FEATURES_ALL,
1869 ceph::features::mon::get_supported(),
d2e6a577 1870 metadata);
7c673cae
FG
1871}
1872
1873const utime_t& Monitor::get_leader_since() const
1874{
1875 assert(state == STATE_LEADER);
1876 return leader_since;
1877}
1878
1879epoch_t Monitor::get_epoch()
1880{
1881 return elector.get_epoch();
1882}
1883
1884void Monitor::_finish_svc_election()
1885{
1886 assert(state == STATE_LEADER || state == STATE_PEON);
1887
1888 for (auto p : paxos_service) {
1889 // we already called election_finished() on monmon(); avoid callig twice
1890 if (state == STATE_LEADER && p == monmon())
1891 continue;
1892 p->election_finished();
1893 }
1894}
1895
1896void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
1897 const mon_feature_t& mon_features,
d2e6a577 1898 const map<int,Metadata>& metadata)
7c673cae
FG
1899{
1900 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
1901 << " features " << features
1902 << " mon_features " << mon_features
1903 << dendl;
1904 assert(is_electing());
1905 state = STATE_LEADER;
1906 leader_since = ceph_clock_now();
1907 leader = rank;
1908 quorum = active;
1909 quorum_con_features = features;
1910 quorum_mon_features = mon_features;
224ce89b 1911 pending_metadata = metadata;
7c673cae
FG
1912 outside_quorum.clear();
1913
1914 clog->info() << "mon." << name << "@" << rank
1915 << " won leader election with quorum " << quorum;
1916
d2e6a577 1917 set_leader_commands(get_local_commands(mon_features));
7c673cae
FG
1918
1919 paxos->leader_init();
1920 // NOTE: tell monmap monitor first. This is important for the
1921 // bootstrap case to ensure that the very first paxos proposal
1922 // codifies the monmap. Otherwise any manner of chaos can ensue
1923 // when monitors are call elections or participating in a paxos
1924 // round without agreeing on who the participants are.
1925 monmon()->election_finished();
1926 _finish_svc_election();
1927 health_monitor->start(epoch);
1928
1929 logger->inc(l_mon_election_win);
1930
224ce89b
WB
1931 // inject new metadata in first transaction.
1932 {
1933 // include previous metadata for missing mons (that aren't part of
1934 // the current quorum).
1935 map<int,Metadata> m = metadata;
1936 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
1937 if (m.count(rank) == 0 &&
1938 mon_metadata.count(rank)) {
1939 m[rank] = mon_metadata[rank];
1940 }
1941 }
1942
1943 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1944 // a new transaction immediately after the election finishes. We should
1945 // do that anyway for other reasons, though.
1946 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1947 bufferlist bl;
1948 ::encode(m, bl);
1949 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
1950 }
1951
7c673cae
FG
1952 finish_election();
1953 if (monmap->size() > 1 &&
1954 monmap->get_epoch() > 0) {
1955 timecheck_start();
1956 health_tick_start();
1957 do_health_to_clog_interval();
1958 scrub_event_start();
1959 }
7c673cae
FG
1960}
1961
1962void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
1963 uint64_t features,
1964 const mon_feature_t& mon_features)
1965{
1966 state = STATE_PEON;
1967 leader_since = utime_t();
1968 leader = l;
1969 quorum = q;
1970 outside_quorum.clear();
1971 quorum_con_features = features;
1972 quorum_mon_features = mon_features;
1973 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
1974 << " quorum is " << quorum << " features are " << quorum_con_features
1975 << " mon_features are " << quorum_mon_features
1976 << dendl;
1977
1978 paxos->peon_init();
1979 _finish_svc_election();
1980 health_monitor->start(epoch);
1981
1982 logger->inc(l_mon_election_lose);
1983
1984 finish_election();
1985
224ce89b
WB
1986 if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
1987 !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
1988 // for pre-luminous mons only
7c673cae 1989 Metadata sys_info;
224ce89b 1990 collect_metadata(&sys_info);
7c673cae
FG
1991 messenger->send_message(new MMonMetadata(sys_info),
1992 monmap->get_inst(get_leader()));
1993 }
1994}
1995
224ce89b
WB
1996void Monitor::collect_metadata(Metadata *m)
1997{
1998 collect_sys_info(m, g_ceph_context);
1999 (*m)["addr"] = stringify(messenger->get_myaddr());
2000}
2001
7c673cae
FG
2002void Monitor::finish_election()
2003{
2004 apply_quorum_to_compatset_features();
2005 apply_monmap_to_compatset_features();
2006 timecheck_finish();
2007 exited_quorum = utime_t();
2008 finish_contexts(g_ceph_context, waitfor_quorum);
2009 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2010 resend_routed_requests();
2011 update_logger();
2012 register_cluster_logger();
2013
2014 // am i named properly?
2015 string cur_name = monmap->get_name(messenger->get_myaddr());
2016 if (cur_name != name) {
2017 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2018 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
2019 monmap->get_inst(*quorum.begin()));
2020 }
2021}
2022
2023void Monitor::_apply_compatset_features(CompatSet &new_features)
2024{
2025 if (new_features.compare(features) != 0) {
2026 CompatSet diff = features.unsupported(new_features);
2027 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2028 features = new_features;
2029
2030 auto t = std::make_shared<MonitorDBStore::Transaction>();
2031 write_features(t);
2032 store->apply_transaction(t);
2033
2034 calc_quorum_requirements();
2035 }
2036}
2037
2038void Monitor::apply_quorum_to_compatset_features()
2039{
2040 CompatSet new_features(features);
2041 if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2042 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2043 }
2044 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2045 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2046 }
2047 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2048 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2049 }
2050 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2051 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2052 }
2053 dout(5) << __func__ << dendl;
2054 _apply_compatset_features(new_features);
2055}
2056
2057void Monitor::apply_monmap_to_compatset_features()
2058{
2059 CompatSet new_features(features);
2060 mon_feature_t monmap_features = monmap->get_required_features();
2061
2062 /* persistent monmap features may go into the compatset.
2063 * optional monmap features may not - why?
2064 * because optional monmap features may be set/unset by the admin,
2065 * and possibly by other means that haven't yet been thought out,
2066 * so we can't make the monitor enforce them on start - because they
2067 * may go away.
2068 * this, of course, does not invalidate setting a compatset feature
2069 * for an optional feature - as long as you make sure to clean it up
2070 * once you unset it.
2071 */
2072 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2073 assert(ceph::features::mon::get_persistent().contains_all(
2074 ceph::features::mon::FEATURE_KRAKEN));
2075 // this feature should only ever be set if the quorum supports it.
2076 assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2077 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2078 }
2079
2080 dout(5) << __func__ << dendl;
2081 _apply_compatset_features(new_features);
2082}
2083
2084void Monitor::calc_quorum_requirements()
2085{
2086 required_features = 0;
2087
2088 // compatset
2089 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2090 required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2091 }
2092 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2093 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2094 }
2095 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2096 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2097 }
2098 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2099 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2100 }
2101 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2102 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2103 }
2104
2105 // monmap
2106 if (monmap->get_required_features().contains_all(
2107 ceph::features::mon::FEATURE_KRAKEN)) {
2108 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2109 }
2110 if (monmap->get_required_features().contains_all(
2111 ceph::features::mon::FEATURE_LUMINOUS)) {
2112 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2113 }
2114 dout(10) << __func__ << " required_features " << required_features << dendl;
2115}
2116
31f18b77
FG
2117void Monitor::get_combined_feature_map(FeatureMap *fm)
2118{
2119 *fm += session_map.feature_map;
2120 for (auto id : quorum) {
2121 if (id != rank) {
2122 *fm += quorum_feature_map[id];
2123 }
2124 }
2125}
2126
7c673cae
FG
2127void Monitor::sync_force(Formatter *f, ostream& ss)
2128{
2129 bool free_formatter = false;
2130
2131 if (!f) {
2132 // louzy/lazy hack: default to json if no formatter has been defined
2133 f = new JSONFormatter();
2134 free_formatter = true;
2135 }
2136
2137 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2138 sync_stash_critical_state(tx);
2139 tx->put("mon_sync", "force_sync", 1);
2140 store->apply_transaction(tx);
2141
2142 f->open_object_section("sync_force");
2143 f->dump_int("ret", 0);
2144 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2145 f->close_section(); // sync_force
2146 f->flush(ss);
2147 if (free_formatter)
2148 delete f;
2149}
2150
2151void Monitor::_quorum_status(Formatter *f, ostream& ss)
2152{
2153 bool free_formatter = false;
2154
2155 if (!f) {
2156 // louzy/lazy hack: default to json if no formatter has been defined
2157 f = new JSONFormatter();
2158 free_formatter = true;
2159 }
2160 f->open_object_section("quorum_status");
2161 f->dump_int("election_epoch", get_epoch());
2162
2163 f->open_array_section("quorum");
2164 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2165 f->dump_int("mon", *p);
2166 f->close_section(); // quorum
2167
2168 list<string> quorum_names = get_quorum_names();
2169 f->open_array_section("quorum_names");
2170 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2171 f->dump_string("mon", *p);
2172 f->close_section(); // quorum_names
2173
2174 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2175
2176 f->open_object_section("monmap");
2177 monmap->dump(f);
2178 f->close_section(); // monmap
2179
2180 f->close_section(); // quorum_status
2181 f->flush(ss);
2182 if (free_formatter)
2183 delete f;
2184}
2185
2186void Monitor::get_mon_status(Formatter *f, ostream& ss)
2187{
2188 bool free_formatter = false;
2189
2190 if (!f) {
2191 // louzy/lazy hack: default to json if no formatter has been defined
2192 f = new JSONFormatter();
2193 free_formatter = true;
2194 }
2195
2196 f->open_object_section("mon_status");
2197 f->dump_string("name", name);
2198 f->dump_int("rank", rank);
2199 f->dump_string("state", get_state_name());
2200 f->dump_int("election_epoch", get_epoch());
2201
2202 f->open_array_section("quorum");
2203 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2204 f->dump_int("mon", *p);
2205 }
2206
2207 f->close_section(); // quorum
2208
2209 f->open_object_section("features");
2210 f->dump_stream("required_con") << required_features;
2211 mon_feature_t req_mon_features = get_required_mon_features();
2212 req_mon_features.dump(f, "required_mon");
2213 f->dump_stream("quorum_con") << quorum_con_features;
2214 quorum_mon_features.dump(f, "quorum_mon");
2215 f->close_section(); // features
2216
2217 f->open_array_section("outside_quorum");
2218 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2219 f->dump_string("mon", *p);
2220 f->close_section(); // outside_quorum
2221
2222 f->open_array_section("extra_probe_peers");
2223 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2224 p != extra_probe_peers.end();
2225 ++p)
2226 f->dump_stream("peer") << *p;
2227 f->close_section(); // extra_probe_peers
2228
2229 f->open_array_section("sync_provider");
2230 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2231 p != sync_providers.end();
2232 ++p) {
2233 f->dump_unsigned("cookie", p->second.cookie);
2234 f->dump_stream("entity") << p->second.entity;
2235 f->dump_stream("timeout") << p->second.timeout;
2236 f->dump_unsigned("last_committed", p->second.last_committed);
2237 f->dump_stream("last_key") << p->second.last_key;
2238 }
2239 f->close_section();
2240
2241 if (is_synchronizing()) {
2242 f->open_object_section("sync");
2243 f->dump_stream("sync_provider") << sync_provider;
2244 f->dump_unsigned("sync_cookie", sync_cookie);
2245 f->dump_unsigned("sync_start_version", sync_start_version);
2246 f->close_section();
2247 }
2248
2249 if (g_conf->mon_sync_provider_kill_at > 0)
2250 f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2251 if (g_conf->mon_sync_requester_kill_at > 0)
2252 f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2253
2254 f->open_object_section("monmap");
2255 monmap->dump(f);
2256 f->close_section();
2257
31f18b77 2258 f->dump_object("feature_map", session_map.feature_map);
7c673cae
FG
2259 f->close_section(); // mon_status
2260
2261 if (free_formatter) {
2262 // flush formatter to ss and delete it iff we created the formatter
2263 f->flush(ss);
2264 delete f;
2265 }
2266}
2267
2268
2269// health status to clog
2270
2271void Monitor::health_tick_start()
2272{
2273 if (!cct->_conf->mon_health_to_clog ||
2274 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2275 return;
2276
2277 dout(15) << __func__ << dendl;
2278
2279 health_tick_stop();
2280 health_tick_event = new C_MonContext(this, [this](int r) {
2281 if (r < 0)
2282 return;
2283 do_health_to_clog();
2284 health_tick_start();
2285 });
2286 timer.add_event_after(cct->_conf->mon_health_to_clog_tick_interval,
2287 health_tick_event);
2288}
2289
2290void Monitor::health_tick_stop()
2291{
2292 dout(15) << __func__ << dendl;
2293
2294 if (health_tick_event) {
2295 timer.cancel_event(health_tick_event);
2296 health_tick_event = NULL;
2297 }
2298}
2299
2300utime_t Monitor::health_interval_calc_next_update()
2301{
2302 utime_t now = ceph_clock_now();
2303
2304 time_t secs = now.sec();
2305 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2306 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2307 utime_t next = utime_t(secs + adjustment, 0);
2308
2309 dout(20) << __func__
2310 << " now: " << now << ","
2311 << " next: " << next << ","
2312 << " interval: " << cct->_conf->mon_health_to_clog_interval
2313 << dendl;
2314
2315 return next;
2316}
2317
2318void Monitor::health_interval_start()
2319{
2320 dout(15) << __func__ << dendl;
2321
2322 if (!cct->_conf->mon_health_to_clog ||
2323 cct->_conf->mon_health_to_clog_interval <= 0) {
2324 return;
2325 }
2326
2327 health_interval_stop();
2328 utime_t next = health_interval_calc_next_update();
2329 health_interval_event = new C_MonContext(this, [this](int r) {
2330 if (r < 0)
2331 return;
2332 do_health_to_clog_interval();
2333 });
2334 timer.add_event_at(next, health_interval_event);
2335}
2336
2337void Monitor::health_interval_stop()
2338{
2339 dout(15) << __func__ << dendl;
2340 if (health_interval_event) {
2341 timer.cancel_event(health_interval_event);
2342 }
2343 health_interval_event = NULL;
2344}
2345
2346void Monitor::health_events_cleanup()
2347{
2348 health_tick_stop();
2349 health_interval_stop();
2350 health_status_cache.reset();
2351}
2352
2353void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2354{
2355 dout(20) << __func__ << dendl;
2356
2357 if (changed.count("mon_health_to_clog")) {
2358 if (!cct->_conf->mon_health_to_clog) {
2359 health_events_cleanup();
2360 } else {
2361 if (!health_tick_event) {
2362 health_tick_start();
2363 }
2364 if (!health_interval_event) {
2365 health_interval_start();
2366 }
2367 }
2368 }
2369
2370 if (changed.count("mon_health_to_clog_interval")) {
2371 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2372 health_interval_stop();
2373 } else {
2374 health_interval_start();
2375 }
2376 }
2377
2378 if (changed.count("mon_health_to_clog_tick_interval")) {
2379 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2380 health_tick_stop();
2381 } else {
2382 health_tick_start();
2383 }
2384 }
2385}
2386
2387void Monitor::do_health_to_clog_interval()
2388{
2389 // outputting to clog may have been disabled in the conf
2390 // since we were scheduled.
2391 if (!cct->_conf->mon_health_to_clog ||
2392 cct->_conf->mon_health_to_clog_interval <= 0)
2393 return;
2394
2395 dout(10) << __func__ << dendl;
2396
2397 // do we have a cached value for next_clog_update? if not,
2398 // do we know when the last update was?
2399
2400 do_health_to_clog(true);
2401 health_interval_start();
2402}
2403
2404void Monitor::do_health_to_clog(bool force)
2405{
2406 // outputting to clog may have been disabled in the conf
2407 // since we were scheduled.
2408 if (!cct->_conf->mon_health_to_clog ||
2409 cct->_conf->mon_health_to_clog_interval <= 0)
2410 return;
2411
2412 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2413
224ce89b
WB
2414 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2415 string summary;
2416 health_status_t level = get_health_status(false, nullptr, &summary);
2417 if (!force &&
2418 summary == health_status_cache.summary &&
2419 level == health_status_cache.overall)
2420 return;
2421 if (level == HEALTH_OK)
2422 clog->info() << "overall " << summary;
2423 else if (level == HEALTH_WARN)
2424 clog->warn() << "overall " << summary;
2425 else if (level == HEALTH_ERR)
2426 clog->error() << "overall " << summary;
2427 else
2428 ceph_abort();
2429 health_status_cache.summary = summary;
2430 health_status_cache.overall = level;
2431 } else {
2432 // for jewel only
2433 list<string> status;
2434 health_status_t overall = get_health(status, NULL, NULL);
2435 dout(25) << __func__
2436 << (force ? " (force)" : "")
2437 << dendl;
2438
2439 string summary = joinify(status.begin(), status.end(), string("; "));
7c673cae 2440
224ce89b
WB
2441 if (!force &&
2442 overall == health_status_cache.overall &&
2443 !health_status_cache.summary.empty() &&
2444 health_status_cache.summary == summary) {
2445 // we got a dup!
2446 return;
2447 }
2448
2449 clog->info() << summary;
2450
2451 health_status_cache.overall = overall;
2452 health_status_cache.summary = summary;
2453 }
2454}
2455
2456health_status_t Monitor::get_health_status(
2457 bool want_detail,
2458 Formatter *f,
2459 std::string *plain,
2460 const char *sep1,
2461 const char *sep2)
2462{
2463 health_status_t r = HEALTH_OK;
2464 bool compat = g_conf->mon_health_preluminous_compat;
d2e6a577 2465 bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
224ce89b
WB
2466 if (f) {
2467 f->open_object_section("health");
2468 f->open_object_section("checks");
2469 }
7c673cae 2470
224ce89b
WB
2471 string summary;
2472 string *psummary = f ? nullptr : &summary;
2473 for (auto& svc : paxos_service) {
2474 r = std::min(r, svc->get_health_checks().dump_summary(
2475 f, psummary, sep2, want_detail));
2476 }
7c673cae 2477
224ce89b
WB
2478 if (f) {
2479 f->close_section();
2480 f->dump_stream("status") << r;
2481 } else {
2482 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2483 *plain = stringify(r);
2484 if (summary.size()) {
2485 *plain += sep1;
2486 *plain += summary;
2487 }
2488 *plain += "\n";
2489 }
2490
d2e6a577
FG
2491 if (f && (compat || compat_warn)) {
2492 health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
2493 if (compat) {
2494 f->open_array_section("summary");
2495 if (compat_warn) {
2496 f->open_object_section("item");
2497 f->dump_stream("severity") << HEALTH_WARN;
2498 f->dump_string("summary", "'ceph health' JSON format has changed in luminous; update your health monitoring scripts");
2499 f->close_section();
2500 }
2501 for (auto& svc : paxos_service) {
2502 svc->get_health_checks().dump_summary_compat(f);
2503 }
2504 f->close_section();
224ce89b 2505 }
d2e6a577 2506 f->dump_stream("overall_status") << cr;
224ce89b
WB
2507 }
2508
2509 if (want_detail) {
d2e6a577 2510 if (f && (compat || compat_warn)) {
224ce89b 2511 f->open_array_section("detail");
d2e6a577
FG
2512 if (compat_warn) {
2513 f->dump_string("item", "'ceph health' JSON format has changed in luminous. If you see this your monitoring system is scraping the wrong fields. Disable this with 'mon health preluminous compat warning = false'");
2514 }
224ce89b
WB
2515 }
2516
2517 for (auto& svc : paxos_service) {
2518 svc->get_health_checks().dump_detail(f, plain, compat);
2519 }
2520
d2e6a577 2521 if (f && (compat || compat_warn)) {
224ce89b
WB
2522 f->close_section();
2523 }
2524 }
2525 if (f) {
2526 f->close_section();
2527 }
2528 return r;
2529}
2530
2531void Monitor::log_health(
2532 const health_check_map_t& updated,
2533 const health_check_map_t& previous,
2534 MonitorDBStore::TransactionRef t)
2535{
2536 if (!g_conf->mon_health_to_clog) {
7c673cae
FG
2537 return;
2538 }
224ce89b
WB
2539 // FIXME: log atomically as part of @t instead of using clog.
2540 dout(10) << __func__ << " updated " << updated.checks.size()
2541 << " previous " << previous.checks.size()
2542 << dendl;
2543 for (auto& p : updated.checks) {
2544 auto q = previous.checks.find(p.first);
2545 if (q == previous.checks.end()) {
2546 // new
2547 ostringstream ss;
2548 ss << "Health check failed: " << p.second.summary << " ("
2549 << p.first << ")";
2550 if (p.second.severity == HEALTH_WARN)
2551 clog->warn() << ss.str();
2552 else
2553 clog->error() << ss.str();
2554 } else {
2555 if (p.second.summary != q->second.summary ||
2556 p.second.severity != q->second.severity) {
2557 // summary or severity changed (ignore detail changes at this level)
2558 ostringstream ss;
2559 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2560 if (p.second.severity == HEALTH_WARN)
2561 clog->warn() << ss.str();
2562 else
2563 clog->error() << ss.str();
2564 }
2565 }
2566 }
2567 for (auto& p : previous.checks) {
2568 if (!updated.checks.count(p.first)) {
2569 // cleared
2570 ostringstream ss;
2571 if (p.first == "DEGRADED_OBJECTS") {
2572 clog->info() << "All degraded objects recovered";
2573 } else if (p.first == "OSD_FLAGS") {
2574 clog->info() << "OSD flags cleared";
2575 } else {
2576 clog->info() << "Health check cleared: " << p.first << " (was: "
2577 << p.second.summary << ")";
2578 }
2579 }
2580 }
7c673cae 2581
224ce89b
WB
2582 if (previous.checks.size() && updated.checks.size() == 0) {
2583 // We might be going into a fully healthy state, check
2584 // other subsystems
2585 bool any_checks = false;
2586 for (auto& svc : paxos_service) {
2587 if (&(svc->get_health_checks()) == &(previous)) {
2588 // Ignore the ones we're clearing right now
2589 continue;
2590 }
7c673cae 2591
224ce89b
WB
2592 if (svc->get_health_checks().checks.size() > 0) {
2593 any_checks = true;
2594 break;
2595 }
2596 }
2597 if (!any_checks) {
2598 clog->info() << "Cluster is now healthy";
2599 }
2600 }
7c673cae
FG
2601}
2602
2603health_status_t Monitor::get_health(list<string>& status,
2604 bufferlist *detailbl,
2605 Formatter *f)
2606{
2607 list<pair<health_status_t,string> > summary;
2608 list<pair<health_status_t,string> > detail;
2609
2610 if (f)
2611 f->open_object_section("health");
2612
2613 for (vector<PaxosService*>::iterator p = paxos_service.begin();
2614 p != paxos_service.end();
2615 ++p) {
2616 PaxosService *s = *p;
2617 s->get_health(summary, detailbl ? &detail : NULL, cct);
2618 }
2619
224ce89b 2620 health_monitor->get_health(summary, (detailbl ? &detail : NULL));
7c673cae
FG
2621
2622 health_status_t overall = HEALTH_OK;
2623 if (!timecheck_skews.empty()) {
2624 list<string> warns;
7c673cae
FG
2625 for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2626 i != timecheck_skews.end(); ++i) {
2627 entity_inst_t inst = i->first;
2628 double skew = i->second;
2629 double latency = timecheck_latencies[inst];
2630 string name = monmap->get_name(inst.addr);
7c673cae
FG
2631 ostringstream tcss;
2632 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2633 if (tcstatus != HEALTH_OK) {
2634 if (overall > tcstatus)
2635 overall = tcstatus;
2636 warns.push_back(name);
7c673cae
FG
2637 ostringstream tmp_ss;
2638 tmp_ss << "mon." << name
2639 << " addr " << inst.addr << " " << tcss.str()
2640 << " (latency " << latency << "s)";
2641 detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2642 }
7c673cae
FG
2643 }
2644 if (!warns.empty()) {
2645 ostringstream ss;
2646 ss << "clock skew detected on";
2647 while (!warns.empty()) {
2648 ss << " mon." << warns.front();
2649 warns.pop_front();
2650 if (!warns.empty())
2651 ss << ",";
2652 }
2653 status.push_back(ss.str());
2654 summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2655 }
7c673cae 2656 }
7c673cae
FG
2657
2658 if (f)
2659 f->open_array_section("summary");
2660 if (!summary.empty()) {
2661 while (!summary.empty()) {
2662 if (overall > summary.front().first)
2663 overall = summary.front().first;
2664 status.push_back(summary.front().second);
2665 if (f) {
2666 f->open_object_section("item");
2667 f->dump_stream("severity") << summary.front().first;
2668 f->dump_string("summary", summary.front().second);
2669 f->close_section();
2670 }
2671 summary.pop_front();
2672 }
2673 }
2674 if (f)
2675 f->close_section();
2676
2677 stringstream fss;
2678 fss << overall;
2679 status.push_front(fss.str());
2680 if (f)
2681 f->dump_stream("overall_status") << overall;
2682
2683 if (f)
2684 f->open_array_section("detail");
2685 while (!detail.empty()) {
2686 if (f)
2687 f->dump_string("item", detail.front().second);
2688 else if (detailbl != NULL) {
2689 detailbl->append(detail.front().second);
2690 detailbl->append('\n');
2691 }
2692 detail.pop_front();
2693 }
2694 if (f)
2695 f->close_section();
2696
2697 if (f)
2698 f->close_section();
2699
2700 return overall;
2701}
2702
2703void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2704{
2705 if (f)
2706 f->open_object_section("status");
2707
7c673cae
FG
2708 if (f) {
2709 f->dump_stream("fsid") << monmap->get_fsid();
224ce89b 2710 get_health_status(false, f, nullptr);
7c673cae
FG
2711 f->dump_unsigned("election_epoch", get_epoch());
2712 {
2713 f->open_array_section("quorum");
2714 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2715 f->dump_int("rank", *p);
2716 f->close_section();
2717 f->open_array_section("quorum_names");
2718 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2719 f->dump_string("id", monmap->get_name(*p));
2720 f->close_section();
2721 }
2722 f->open_object_section("monmap");
2723 monmap->dump(f);
2724 f->close_section();
2725 f->open_object_section("osdmap");
224ce89b 2726 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
7c673cae
FG
2727 f->close_section();
2728 f->open_object_section("pgmap");
31f18b77 2729 pgservice->print_summary(f, NULL);
7c673cae
FG
2730 f->close_section();
2731 f->open_object_section("fsmap");
2732 mdsmon()->get_fsmap().print_summary(f, NULL);
2733 f->close_section();
7c673cae
FG
2734 f->open_object_section("mgrmap");
2735 mgrmon()->get_map().print_summary(f, nullptr);
2736 f->close_section();
224ce89b
WB
2737
2738 f->dump_object("servicemap", mgrstatmon()->get_service_map());
7c673cae
FG
2739 f->close_section();
2740 } else {
31f18b77
FG
2741 ss << " cluster:\n";
2742 ss << " id: " << monmap->get_fsid() << "\n";
224ce89b
WB
2743
2744 string health;
2745 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2746 get_health_status(false, nullptr, &health,
2747 "\n ", "\n ");
2748 } else {
2749 list<string> ls;
2750 get_health(ls, NULL, f);
2751 health = joinify(ls.begin(), ls.end(),
2752 string("\n "));
2753 }
2754 ss << " health: " << health << "\n";
2755
31f18b77 2756 ss << "\n \n services:\n";
224ce89b
WB
2757 {
2758 size_t maxlen = 3;
2759 auto& service_map = mgrstatmon()->get_service_map();
2760 for (auto& p : service_map.services) {
2761 maxlen = std::max(maxlen, p.first.size());
2762 }
2763 string spacing(maxlen - 3, ' ');
2764 const auto quorum_names = get_quorum_names();
2765 const auto mon_count = monmap->mon_info.size();
2766 ss << " mon: " << spacing << mon_count << " daemons, quorum "
2767 << quorum_names;
2768 if (quorum_names.size() != mon_count) {
2769 std::list<std::string> out_of_q;
2770 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2771 if (quorum.count(i) == 0) {
2772 out_of_q.push_back(monmap->ranks[i]);
2773 }
2774 }
2775 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2776 out_of_q.end(), std::string(", "));
31f18b77 2777 }
7c673cae 2778 ss << "\n";
224ce89b
WB
2779 if (mgrmon()->in_use()) {
2780 ss << " mgr: " << spacing;
2781 mgrmon()->get_map().print_summary(nullptr, &ss);
2782 ss << "\n";
2783 }
2784 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2785 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2786 }
2787 ss << " osd: " << spacing;
2788 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2789 ss << "\n";
2790 for (auto& p : service_map.services) {
2791 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2792 << p.second.get_summary() << "\n";
2793 }
7c673cae 2794 }
31f18b77
FG
2795
2796 ss << "\n \n data:\n";
2797 pgservice->print_summary(NULL, &ss);
2798 ss << "\n ";
7c673cae
FG
2799 }
2800}
2801
2802void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2803 map<string,string> &param_str_map)
2804{
2805 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2806 p != cmdmap.end(); ++p) {
2807 if (p->first == "prefix")
2808 continue;
2809 if (p->first == "caps") {
2810 vector<string> cv;
2811 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2812 cv.size() % 2 == 0) {
2813 for (unsigned i = 0; i < cv.size(); i += 2) {
2814 string k = string("caps_") + cv[i];
2815 param_str_map[k] = cv[i + 1];
2816 }
2817 continue;
2818 }
2819 }
2820 param_str_map[p->first] = cmd_vartype_stringify(p->second);
2821 }
2822}
2823
c07f9fc5
FG
2824const MonCommand *Monitor::_get_moncommand(
2825 const string &cmd_prefix,
d2e6a577
FG
2826 const vector<MonCommand>& cmds)
2827{
2828 for (auto& c : cmds) {
2829 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2830 return &c;
7c673cae
FG
2831 }
2832 }
d2e6a577 2833 return nullptr;
7c673cae
FG
2834}
2835
2836bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2837 const map<string,cmd_vartype>& cmdmap,
2838 const map<string,string>& param_str_map,
2839 const MonCommand *this_cmd) {
2840
2841 bool cmd_r = this_cmd->requires_perm('r');
2842 bool cmd_w = this_cmd->requires_perm('w');
2843 bool cmd_x = this_cmd->requires_perm('x');
2844
2845 bool capable = s->caps.is_capable(
2846 g_ceph_context,
2847 CEPH_ENTITY_TYPE_MON,
2848 s->entity_name,
2849 module, prefix, param_str_map,
2850 cmd_r, cmd_w, cmd_x);
2851
2852 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2853 return capable;
2854}
2855
c07f9fc5 2856void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
7c673cae
FG
2857 Formatter *f,
2858 bufferlist *rdata,
2859 bool hide_mgr_flag)
2860{
2861 int cmdnum = 0;
2862 f->open_object_section("command_descriptions");
c07f9fc5
FG
2863 for (const auto &cmd : commands) {
2864 unsigned flags = cmd.flags;
7c673cae
FG
2865 if (hide_mgr_flag) {
2866 flags &= ~MonCommand::FLAG_MGR;
2867 }
2868 ostringstream secname;
2869 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2870 dump_cmddesc_to_json(f, secname.str(),
c07f9fc5
FG
2871 cmd.cmdstring, cmd.helpstring, cmd.module,
2872 cmd.req_perms, cmd.availability, flags);
7c673cae
FG
2873 cmdnum++;
2874 }
2875 f->close_section(); // command_descriptions
2876
2877 f->flush(*rdata);
2878}
2879
7c673cae
FG
2880bool Monitor::is_keyring_required()
2881{
2882 string auth_cluster_required = g_conf->auth_supported.empty() ?
2883 g_conf->auth_cluster_required : g_conf->auth_supported;
2884 string auth_service_required = g_conf->auth_supported.empty() ?
2885 g_conf->auth_service_required : g_conf->auth_supported;
2886
2887 return auth_service_required == "cephx" ||
2888 auth_cluster_required == "cephx";
2889}
2890
2891struct C_MgrProxyCommand : public Context {
2892 Monitor *mon;
2893 MonOpRequestRef op;
2894 uint64_t size;
2895 bufferlist outbl;
2896 string outs;
2897 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2898 : mon(mon), op(op), size(s) { }
2899 void finish(int r) {
2900 Mutex::Locker l(mon->lock);
2901 mon->mgr_proxy_bytes -= size;
2902 mon->reply_command(op, r, outs, outbl, 0);
2903 }
2904};
2905
2906void Monitor::handle_command(MonOpRequestRef op)
2907{
2908 assert(op->is_type_command());
2909 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
2910 if (m->fsid != monmap->fsid) {
2911 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2912 reply_command(op, -EPERM, "wrong fsid", 0);
2913 return;
2914 }
2915
2916 MonSession *session = static_cast<MonSession *>(
2917 m->get_connection()->get_priv());
2918 if (!session) {
2919 dout(5) << __func__ << " dropping stray message " << *m << dendl;
2920 return;
2921 }
2922 BOOST_SCOPE_EXIT_ALL(=) {
2923 session->put();
2924 };
2925
2926 if (m->cmd.empty()) {
2927 string rs = "No command supplied";
2928 reply_command(op, -EINVAL, rs, 0);
2929 return;
2930 }
2931
2932 string prefix;
2933 vector<string> fullcmd;
2934 map<string, cmd_vartype> cmdmap;
2935 stringstream ss, ds;
2936 bufferlist rdata;
2937 string rs;
2938 int r = -EINVAL;
2939 rs = "unrecognized command";
2940
2941 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
2942 // ss has reason for failure
2943 r = -EINVAL;
2944 rs = ss.str();
2945 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
2946 reply_command(op, r, rs, 0);
2947 return;
2948 }
2949
2950 // check return value. If no prefix parameter provided,
2951 // return value will be false, then return error info.
2952 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
2953 reply_command(op, -EINVAL, "command prefix not found", 0);
2954 return;
2955 }
2956
2957 // check prefix is empty
2958 if (prefix.empty()) {
2959 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
2960 return;
2961 }
2962
2963 if (prefix == "get_command_descriptions") {
2964 bufferlist rdata;
2965 Formatter *f = Formatter::create("json");
2966 // hide mgr commands until luminous upgrade is complete
2967 bool hide_mgr_flag =
31f18b77 2968 osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
c07f9fc5
FG
2969
2970 std::vector<MonCommand> commands;
d2e6a577
FG
2971
2972 // only include mgr commands once all mons are upgrade (and we've dropped
2973 // the hard-coded PGMonitor commands)
2974 if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2975 commands = static_cast<MgrMonitor*>(
c07f9fc5 2976 paxos_service[PAXOS_MGR])->get_command_descs();
d2e6a577 2977 }
c07f9fc5 2978
d2e6a577
FG
2979 for (auto& c : leader_mon_commands) {
2980 commands.push_back(c);
c07f9fc5
FG
2981 }
2982
2983 format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
7c673cae
FG
2984 delete f;
2985 reply_command(op, 0, "", rdata, 0);
2986 return;
2987 }
2988
2989 string module;
2990 string err;
2991
2992 dout(0) << "handle_command " << *m << dendl;
2993
2994 string format;
2995 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
2996 boost::scoped_ptr<Formatter> f(Formatter::create(format));
2997
2998 get_str_vec(prefix, fullcmd);
2999
3000 // make sure fullcmd is not empty.
3001 // invalid prefix will cause empty vector fullcmd.
3002 // such as, prefix=";,,;"
3003 if (fullcmd.empty()) {
3004 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3005 return;
3006 }
3007
3008 module = fullcmd[0];
3009
3010 // validate command is in leader map
3011
3012 const MonCommand *leader_cmd;
c07f9fc5
FG
3013 const auto& mgr_cmds = mgrmon()->get_command_descs();
3014 const MonCommand *mgr_cmd = nullptr;
3015 if (!mgr_cmds.empty()) {
d2e6a577 3016 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
c07f9fc5 3017 }
d2e6a577 3018 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
7c673cae 3019 if (!leader_cmd) {
c07f9fc5
FG
3020 leader_cmd = mgr_cmd;
3021 if (!leader_cmd) {
3022 reply_command(op, -EINVAL, "command not known", 0);
3023 return;
3024 }
7c673cae
FG
3025 }
3026 // validate command is in our map & matches, or forward if it is allowed
d2e6a577
FG
3027 const MonCommand *mon_cmd = _get_moncommand(
3028 prefix,
3029 get_local_commands(quorum_mon_features));
c07f9fc5
FG
3030 if (!mon_cmd) {
3031 mon_cmd = mgr_cmd;
3032 }
7c673cae
FG
3033 if (!is_leader()) {
3034 if (!mon_cmd) {
3035 if (leader_cmd->is_noforward()) {
3036 reply_command(op, -EINVAL,
3037 "command not locally supported and not allowed to forward",
3038 0);
3039 return;
3040 }
3041 dout(10) << "Command not locally supported, forwarding request "
3042 << m << dendl;
3043 forward_request_leader(op);
3044 return;
3045 } else if (!mon_cmd->is_compat(leader_cmd)) {
3046 if (mon_cmd->is_noforward()) {
3047 reply_command(op, -EINVAL,
3048 "command not compatible with leader and not allowed to forward",
3049 0);
3050 return;
3051 }
3052 dout(10) << "Command not compatible with leader, forwarding request "
3053 << m << dendl;
3054 forward_request_leader(op);
3055 return;
3056 }
3057 }
3058
3059 if (mon_cmd->is_obsolete() ||
3060 (cct->_conf->mon_debug_deprecated_as_obsolete
3061 && mon_cmd->is_deprecated())) {
3062 reply_command(op, -ENOTSUP,
3063 "command is obsolete; please check usage and/or man page",
3064 0);
3065 return;
3066 }
3067
3068 if (session->proxy_con && mon_cmd->is_noforward()) {
3069 dout(10) << "Got forward for noforward command " << m << dendl;
3070 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3071 return;
3072 }
3073
3074 /* what we perceive as being the service the command falls under */
3075 string service(mon_cmd->module);
3076
3077 dout(25) << __func__ << " prefix='" << prefix
3078 << "' module='" << module
3079 << "' service='" << service << "'" << dendl;
3080
3081 bool cmd_is_rw =
3082 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3083
3084 // validate user's permissions for requested command
3085 map<string,string> param_str_map;
3086 _generate_command_map(cmdmap, param_str_map);
3087 if (!_allowed_command(session, service, prefix, cmdmap,
3088 param_str_map, mon_cmd)) {
3089 dout(1) << __func__ << " access denied" << dendl;
3090 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3091 << "from='" << session->inst << "' "
3092 << "entity='" << session->entity_name << "' "
3093 << "cmd=" << m->cmd << ": access denied";
3094 reply_command(op, -EACCES, "access denied", 0);
3095 return;
3096 }
3097
3098 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3099 << "from='" << session->inst << "' "
3100 << "entity='" << session->entity_name << "' "
3101 << "cmd=" << m->cmd << ": dispatch";
3102
3103 if (mon_cmd->is_mgr() &&
31f18b77 3104 osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
7c673cae
FG
3105 const auto& hdr = m->get_header();
3106 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3107 uint64_t max =
3108 g_conf->mon_client_bytes * g_conf->mon_mgr_proxy_client_bytes_ratio;
3109 if (mgr_proxy_bytes + size > max) {
3110 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3111 << " + " << size << " > max " << max << dendl;
3112 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3113 return;
3114 }
3115 mgr_proxy_bytes += size;
3116 dout(10) << __func__ << " proxying mgr command (+" << size
3117 << " -> " << mgr_proxy_bytes << ")" << dendl;
3118 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3119 mgr_client.start_command(m->cmd,
3120 m->get_data(),
3121 &fin->outbl,
3122 &fin->outs,
3123 new C_OnFinisher(fin, &finisher));
3124 return;
3125 }
3126
d2e6a577
FG
3127 if ((module == "mds" || module == "fs") &&
3128 prefix != "fs authorize") {
7c673cae
FG
3129 mdsmon()->dispatch(op);
3130 return;
3131 }
31f18b77
FG
3132 if ((module == "osd" || prefix == "pg map") &&
3133 prefix != "osd last-stat-seq") {
7c673cae
FG
3134 osdmon()->dispatch(op);
3135 return;
3136 }
3137
3138 if (module == "pg") {
3139 pgmon()->dispatch(op);
3140 return;
3141 }
3142 if (module == "mon" &&
3143 /* Let the Monitor class handle the following commands:
3144 * 'mon compact'
3145 * 'mon scrub'
3146 * 'mon sync force'
3147 */
3148 prefix != "mon compact" &&
3149 prefix != "mon scrub" &&
3150 prefix != "mon sync force" &&
31f18b77
FG
3151 prefix != "mon metadata" &&
3152 prefix != "mon versions" &&
3153 prefix != "mon count-metadata") {
7c673cae
FG
3154 monmon()->dispatch(op);
3155 return;
3156 }
d2e6a577 3157 if (module == "auth" || prefix == "fs authorize") {
7c673cae
FG
3158 authmon()->dispatch(op);
3159 return;
3160 }
3161 if (module == "log") {
3162 logmon()->dispatch(op);
3163 return;
3164 }
3165
3166 if (module == "config-key") {
3167 config_key_service->dispatch(op);
3168 return;
3169 }
3170
3171 if (module == "mgr") {
3172 mgrmon()->dispatch(op);
3173 return;
3174 }
3175
3176 if (prefix == "fsid") {
3177 if (f) {
3178 f->open_object_section("fsid");
3179 f->dump_stream("fsid") << monmap->fsid;
3180 f->close_section();
3181 f->flush(rdata);
3182 } else {
3183 ds << monmap->fsid;
3184 rdata.append(ds);
3185 }
3186 reply_command(op, 0, "", rdata, 0);
3187 return;
3188 }
3189
3190 if (prefix == "scrub" || prefix == "mon scrub") {
3191 wait_for_paxos_write();
3192 if (is_leader()) {
3193 int r = scrub_start();
3194 reply_command(op, r, "", rdata, 0);
3195 } else if (is_peon()) {
3196 forward_request_leader(op);
3197 } else {
3198 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3199 }
3200 return;
3201 }
3202
3203 if (prefix == "compact" || prefix == "mon compact") {
3204 dout(1) << "triggering manual compaction" << dendl;
3205 utime_t start = ceph_clock_now();
3206 store->compact();
3207 utime_t end = ceph_clock_now();
3208 end -= start;
3209 dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3210 ostringstream oss;
224ce89b 3211 oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
7c673cae
FG
3212 rs = oss.str();
3213 r = 0;
3214 }
3215 else if (prefix == "injectargs") {
3216 vector<string> injected_args;
3217 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3218 if (!injected_args.empty()) {
3219 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3220 ostringstream oss;
3221 r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3222 ss << "injectargs:" << oss.str();
3223 rs = ss.str();
3224 goto out;
3225 } else {
3226 rs = "must supply options to be parsed in a single string";
3227 r = -EINVAL;
3228 }
224ce89b
WB
3229 } else if (prefix == "time-sync-status") {
3230 if (!f)
3231 f.reset(Formatter::create("json-pretty"));
3232 f->open_object_section("time_sync");
3233 if (!timecheck_skews.empty()) {
3234 f->open_object_section("time_skew_status");
3235 for (auto& i : timecheck_skews) {
3236 entity_inst_t inst = i.first;
3237 double skew = i.second;
3238 double latency = timecheck_latencies[inst];
3239 string name = monmap->get_name(inst.addr);
3240 ostringstream tcss;
3241 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3242 f->open_object_section(name.c_str());
3243 f->dump_float("skew", skew);
3244 f->dump_float("latency", latency);
3245 f->dump_stream("health") << tcstatus;
3246 if (tcstatus != HEALTH_OK) {
3247 f->dump_stream("details") << tcss.str();
3248 }
3249 f->close_section();
3250 }
3251 f->close_section();
3252 }
3253 f->open_object_section("timechecks");
3254 f->dump_unsigned("epoch", get_epoch());
3255 f->dump_int("round", timecheck_round);
3256 f->dump_stream("round_status") << ((timecheck_round%2) ?
3257 "on-going" : "finished");
3258 f->close_section();
3259 f->close_section();
3260 f->flush(rdata);
3261 r = 0;
3262 rs = "";
c07f9fc5
FG
3263 } else if (prefix == "config set") {
3264 std::string key;
3265 cmd_getval(cct, cmdmap, "key", key);
3266 std::string val;
3267 cmd_getval(cct, cmdmap, "value", val);
3268 r = g_conf->set_val(key, val, true, &ss);
d2e6a577
FG
3269 if (r == 0) {
3270 g_conf->apply_changes(nullptr);
3271 }
c07f9fc5
FG
3272 rs = ss.str();
3273 goto out;
7c673cae
FG
3274 } else if (prefix == "status" ||
3275 prefix == "health" ||
3276 prefix == "df") {
3277 string detail;
3278 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3279
3280 if (prefix == "status") {
3281 // get_cluster_status handles f == NULL
3282 get_cluster_status(ds, f.get());
3283
3284 if (f) {
3285 f->flush(ds);
3286 ds << '\n';
3287 }
3288 rdata.append(ds);
3289 } else if (prefix == "health") {
224ce89b
WB
3290 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3291 string plain;
3292 get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3293 if (f) {
3294 f->flush(rdata);
3295 } else {
3296 rdata.append(plain);
3297 }
7c673cae 3298 } else {
224ce89b
WB
3299 list<string> health_str;
3300 get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
3301 if (f) {
3302 f->flush(ds);
3303 ds << '\n';
3304 } else {
3305 assert(!health_str.empty());
3306 ds << health_str.front();
3307 health_str.pop_front();
3308 if (!health_str.empty()) {
3309 ds << ' ';
3310 ds << joinify(health_str.begin(), health_str.end(), string("; "));
3311 }
7c673cae 3312 }
224ce89b
WB
3313 bufferlist comb;
3314 comb.append(ds);
3315 if (detail == "detail")
3316 comb.append(rdata);
3317 rdata = comb;
7c673cae 3318 }
7c673cae
FG
3319 } else if (prefix == "df") {
3320 bool verbose = (detail == "detail");
3321 if (f)
3322 f->open_object_section("stats");
3323
31f18b77 3324 pgservice->dump_fs_stats(&ds, f.get(), verbose);
7c673cae
FG
3325 if (!f)
3326 ds << '\n';
31f18b77 3327 pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
7c673cae
FG
3328
3329 if (f) {
3330 f->close_section();
3331 f->flush(ds);
3332 ds << '\n';
3333 }
3334 } else {
3335 assert(0 == "We should never get here!");
3336 return;
3337 }
3338 rdata.append(ds);
3339 rs = "";
3340 r = 0;
3341 } else if (prefix == "report") {
3342
3343 // this must be formatted, in its current form
3344 if (!f)
3345 f.reset(Formatter::create("json-pretty"));
3346 f->open_object_section("report");
3347 f->dump_stream("cluster_fingerprint") << fingerprint;
3348 f->dump_string("version", ceph_version_to_str());
3349 f->dump_string("commit", git_version_to_str());
3350 f->dump_stream("timestamp") << ceph_clock_now();
3351
3352 vector<string> tagsvec;
3353 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3354 string tagstr = str_join(tagsvec, " ");
3355 if (!tagstr.empty())
3356 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3357 f->dump_string("tag", tagstr);
3358
3359 list<string> hs;
3360 get_health(hs, NULL, f.get());
3361
3362 monmon()->dump_info(f.get());
3363 osdmon()->dump_info(f.get());
3364 mdsmon()->dump_info(f.get());
7c673cae 3365 authmon()->dump_info(f.get());
31f18b77 3366 pgservice->dump_info(f.get());
7c673cae
FG
3367
3368 paxos->dump_info(f.get());
3369
3370 f->close_section();
3371 f->flush(rdata);
3372
3373 ostringstream ss2;
3374 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
3375 rs = ss2.str();
3376 r = 0;
31f18b77
FG
3377 } else if (prefix == "osd last-stat-seq") {
3378 int64_t osd;
3379 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3380 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3381 if (f) {
3382 f->dump_unsigned("seq", seq);
3383 f->flush(ds);
3384 } else {
3385 ds << seq;
3386 rdata.append(ds);
3387 }
3388 rs = "";
3389 r = 0;
7c673cae
FG
3390 } else if (prefix == "node ls") {
3391 string node_type("all");
3392 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3393 if (!f)
3394 f.reset(Formatter::create("json-pretty"));
3395 if (node_type == "all") {
3396 f->open_object_section("nodes");
3397 print_nodes(f.get(), ds);
3398 osdmon()->print_nodes(f.get());
3399 mdsmon()->print_nodes(f.get());
3400 f->close_section();
3401 } else if (node_type == "mon") {
3402 print_nodes(f.get(), ds);
3403 } else if (node_type == "osd") {
3404 osdmon()->print_nodes(f.get());
3405 } else if (node_type == "mds") {
3406 mdsmon()->print_nodes(f.get());
3407 }
3408 f->flush(ds);
3409 rdata.append(ds);
3410 rs = "";
3411 r = 0;
31f18b77
FG
3412 } else if (prefix == "features") {
3413 if (!is_leader() && !is_peon()) {
3414 dout(10) << " waiting for quorum" << dendl;
3415 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3416 return;
3417 }
3418 if (!is_leader()) {
3419 forward_request_leader(op);
3420 return;
3421 }
3422 if (!f)
3423 f.reset(Formatter::create("json-pretty"));
3424 FeatureMap fm;
3425 get_combined_feature_map(&fm);
3426 f->dump_object("features", fm);
3427 f->flush(rdata);
3428 rs = "";
3429 r = 0;
7c673cae
FG
3430 } else if (prefix == "mon metadata") {
3431 if (!f)
3432 f.reset(Formatter::create("json-pretty"));
3433
3434 string name;
3435 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3436 if (!all) {
3437 // Dump a single mon's metadata
3438 int mon = monmap->get_rank(name);
3439 if (mon < 0) {
3440 rs = "requested mon not found";
3441 r = -ENOENT;
3442 goto out;
3443 }
3444 f->open_object_section("mon_metadata");
3445 r = get_mon_metadata(mon, f.get(), ds);
3446 f->close_section();
3447 } else {
3448 // Dump all mons' metadata
3449 r = 0;
3450 f->open_array_section("mon_metadata");
3451 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3452 std::ostringstream get_err;
3453 f->open_object_section("mon");
3454 f->dump_string("name", monmap->get_name(rank));
3455 r = get_mon_metadata(rank, f.get(), get_err);
3456 f->close_section();
3457 if (r == -ENOENT || r == -EINVAL) {
3458 dout(1) << get_err.str() << dendl;
3459 // Drop error, list what metadata we do have
3460 r = 0;
3461 } else if (r != 0) {
3462 derr << "Unexpected error from get_mon_metadata: "
3463 << cpp_strerror(r) << dendl;
3464 ds << get_err.str();
3465 break;
3466 }
3467 }
3468 f->close_section();
3469 }
3470
3471 f->flush(ds);
3472 rdata.append(ds);
3473 rs = "";
31f18b77
FG
3474 } else if (prefix == "mon versions") {
3475 if (!f)
3476 f.reset(Formatter::create("json-pretty"));
3477 count_metadata("ceph_version", f.get());
3478 f->flush(ds);
3479 rdata.append(ds);
3480 rs = "";
3481 r = 0;
3482 } else if (prefix == "mon count-metadata") {
3483 if (!f)
3484 f.reset(Formatter::create("json-pretty"));
3485 string field;
3486 cmd_getval(g_ceph_context, cmdmap, "property", field);
3487 count_metadata(field, f.get());
3488 f->flush(ds);
3489 rdata.append(ds);
3490 rs = "";
3491 r = 0;
7c673cae
FG
3492 } else if (prefix == "quorum_status") {
3493 // make sure our map is readable and up to date
3494 if (!is_leader() && !is_peon()) {
3495 dout(10) << " waiting for quorum" << dendl;
3496 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3497 return;
3498 }
3499 _quorum_status(f.get(), ds);
3500 rdata.append(ds);
3501 rs = "";
3502 r = 0;
3503 } else if (prefix == "mon_status") {
3504 get_mon_status(f.get(), ds);
3505 if (f)
3506 f->flush(ds);
3507 rdata.append(ds);
3508 rs = "";
3509 r = 0;
3510 } else if (prefix == "sync force" ||
3511 prefix == "mon sync force") {
3512 string validate1, validate2;
3513 cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3514 cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3515 if (validate1 != "--yes-i-really-mean-it" ||
3516 validate2 != "--i-know-what-i-am-doing") {
3517 r = -EINVAL;
3518 rs = "are you SURE? this will mean the monitor store will be "
3519 "erased. pass '--yes-i-really-mean-it "
3520 "--i-know-what-i-am-doing' if you really do.";
3521 goto out;
3522 }
3523 sync_force(f.get(), ds);
3524 rs = ds.str();
3525 r = 0;
3526 } else if (prefix == "heap") {
3527 if (!ceph_using_tcmalloc())
3528 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3529 else {
3530 string heapcmd;
3531 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3532 // XXX 1-element vector, change at callee or make vector here?
3533 vector<string> heapcmd_vec;
3534 get_str_vec(heapcmd, heapcmd_vec);
3535 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3536 rdata.append(ds);
3537 rs = "";
3538 r = 0;
3539 }
3540 } else if (prefix == "quorum") {
3541 string quorumcmd;
3542 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3543 if (quorumcmd == "exit") {
3544 start_election();
3545 elector.stop_participating();
3546 rs = "stopped responding to quorum, initiated new election";
3547 r = 0;
3548 } else if (quorumcmd == "enter") {
3549 elector.start_participating();
3550 start_election();
3551 rs = "started responding to quorum, initiated new election";
3552 r = 0;
3553 } else {
3554 rs = "needs a valid 'quorum' command";
3555 r = -EINVAL;
3556 }
3557 } else if (prefix == "version") {
3558 if (f) {
3559 f->open_object_section("version");
3560 f->dump_string("version", pretty_version_to_str());
3561 f->close_section();
3562 f->flush(ds);
3563 } else {
3564 ds << pretty_version_to_str();
3565 }
3566 rdata.append(ds);
3567 rs = "";
3568 r = 0;
c07f9fc5
FG
3569 } else if (prefix == "versions") {
3570 if (!f)
3571 f.reset(Formatter::create("json-pretty"));
3572 map<string,int> overall;
3573 f->open_object_section("version");
3574 map<string,int> mon, mgr, osd, mds;
3575
3576 count_metadata("ceph_version", &mon);
3577 f->open_object_section("mon");
3578 for (auto& p : mon) {
3579 f->dump_int(p.first.c_str(), p.second);
3580 overall[p.first] += p.second;
3581 }
3582 f->close_section();
3583
3584 mgrmon()->count_metadata("ceph_version", &mgr);
3585 f->open_object_section("mgr");
3586 for (auto& p : mgr) {
3587 f->dump_int(p.first.c_str(), p.second);
3588 overall[p.first] += p.second;
3589 }
3590 f->close_section();
3591
3592 osdmon()->count_metadata("ceph_version", &osd);
3593 f->open_object_section("osd");
3594 for (auto& p : osd) {
3595 f->dump_int(p.first.c_str(), p.second);
3596 overall[p.first] += p.second;
3597 }
3598 f->close_section();
3599
3600 mdsmon()->count_metadata("ceph_version", &mds);
3601 f->open_object_section("mds");
3602 for (auto& p : mon) {
3603 f->dump_int(p.first.c_str(), p.second);
3604 overall[p.first] += p.second;
3605 }
3606 f->close_section();
3607
3608 for (auto& p : mgrstatmon()->get_service_map().services) {
3609 f->open_object_section(p.first.c_str());
3610 map<string,int> m;
3611 p.second.count_metadata("ceph_version", &m);
3612 for (auto& q : m) {
3613 f->dump_int(q.first.c_str(), q.second);
3614 overall[q.first] += q.second;
3615 }
3616 f->close_section();
3617 }
3618
3619 f->open_object_section("overall");
3620 for (auto& p : overall) {
3621 f->dump_int(p.first.c_str(), p.second);
3622 }
3623 f->close_section();
3624 f->close_section();
3625 f->flush(rdata);
3626 rs = "";
3627 r = 0;
7c673cae
FG
3628 }
3629
3630 out:
3631 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3632 reply_command(op, r, rs, rdata, 0);
3633}
3634
3635void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3636{
3637 bufferlist rdata;
3638 reply_command(op, rc, rs, rdata, version);
3639}
3640
3641void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3642 bufferlist& rdata, version_t version)
3643{
3644 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3645 assert(m->get_type() == MSG_MON_COMMAND);
3646 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3647 reply->set_tid(m->get_tid());
3648 reply->set_data(rdata);
3649 send_reply(op, reply);
3650}
3651
3652
3653// ------------------------
3654// request/reply routing
3655//
3656// a client/mds/osd will connect to a random monitor. we need to forward any
3657// messages requiring state updates to the leader, and then route any replies
3658// back via the correct monitor and back to them. (the monitor will not
3659// initiate any connections.)
3660
3661void Monitor::forward_request_leader(MonOpRequestRef op)
3662{
3663 op->mark_event(__func__);
3664
3665 int mon = get_leader();
3666 MonSession *session = op->get_session();
3667 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3668
3669 if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3670 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3671 } else if (session->proxy_con) {
3672 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3673 } else if (!session->closed) {
3674 RoutedRequest *rr = new RoutedRequest;
3675 rr->tid = ++routed_request_tid;
3676 rr->client_inst = req->get_source_inst();
3677 rr->con = req->get_connection();
3678 rr->con_features = rr->con->get_features();
3679 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3680 rr->session = static_cast<MonSession *>(session->get());
3681 rr->op = op;
3682 routed_requests[rr->tid] = rr;
3683 session->routed_request_tids.insert(rr->tid);
3684
3685 dout(10) << "forward_request " << rr->tid << " request " << *req
3686 << " features " << rr->con_features << dendl;
3687
3688 MForward *forward = new MForward(rr->tid,
3689 req,
3690 rr->con_features,
3691 rr->session->caps);
3692 forward->set_priority(req->get_priority());
3693 if (session->auth_handler) {
3694 forward->entity_name = session->entity_name;
3695 } else if (req->get_source().is_mon()) {
3696 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3697 }
3698 messenger->send_message(forward, monmap->get_inst(mon));
3699 op->mark_forwarded();
3700 assert(op->get_req()->get_type() != 0);
3701 } else {
3702 dout(10) << "forward_request no session for request " << *req << dendl;
3703 }
3704}
3705
3706// fake connection attached to forwarded messages
3707struct AnonConnection : public Connection {
3708 explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
3709
3710 int send_message(Message *m) override {
3711 assert(!"send_message on anonymous connection");
3712 }
3713 void send_keepalive() override {
3714 assert(!"send_keepalive on anonymous connection");
3715 }
3716 void mark_down() override {
3717 // silently ignore
3718 }
3719 void mark_disposable() override {
3720 // silengtly ignore
3721 }
3722 bool is_connected() override { return false; }
3723};
3724
3725//extract the original message and put it into the regular dispatch function
3726void Monitor::handle_forward(MonOpRequestRef op)
3727{
3728 MForward *m = static_cast<MForward*>(op->get_req());
3729 dout(10) << "received forwarded message from " << m->client
3730 << " via " << m->get_source_inst() << dendl;
3731 MonSession *session = op->get_session();
3732 assert(session);
3733
3734 if (!session->is_capable("mon", MON_CAP_X)) {
3735 dout(0) << "forward from entity with insufficient caps! "
3736 << session->caps << dendl;
3737 } else {
3738 // see PaxosService::dispatch(); we rely on this being anon
3739 // (c->msgr == NULL)
3740 PaxosServiceMessage *req = m->claim_message();
3741 assert(req != NULL);
3742
3743 ConnectionRef c(new AnonConnection(cct));
3744 MonSession *s = new MonSession(req->get_source_inst(),
3745 static_cast<Connection*>(c.get()));
3746 c->set_priv(s->get());
3747 c->set_peer_addr(m->client.addr);
3748 c->set_peer_type(m->client.name.type());
3749 c->set_features(m->con_features);
3750
3751 s->caps = m->client_caps;
3752 dout(10) << " caps are " << s->caps << dendl;
3753 s->entity_name = m->entity_name;
3754 dout(10) << " entity name '" << s->entity_name << "' type "
3755 << s->entity_name.get_type() << dendl;
3756 s->proxy_con = m->get_connection();
3757 s->proxy_tid = m->tid;
3758
3759 req->set_connection(c);
3760
3761 // not super accurate, but better than nothing.
3762 req->set_recv_stamp(m->get_recv_stamp());
3763
3764 /*
3765 * note which election epoch this is; we will drop the message if
3766 * there is a future election since our peers will resend routed
3767 * requests in that case.
3768 */
3769 req->rx_election_epoch = get_epoch();
3770
3771 /* Because this is a special fake connection, we need to break
3772 the ref loop between Connection and MonSession differently
3773 than we normally do. Here, the Message refers to the Connection
3774 which refers to the Session, and nobody else refers to the Connection
3775 or the Session. And due to the special nature of this message,
3776 nobody refers to the Connection via the Session. So, clear out that
3777 half of the ref loop.*/
3778 s->con.reset(NULL);
3779
3780 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
3781
3782 _ms_dispatch(req);
3783 s->put();
3784 }
3785}
3786
3787void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3788{
3789 dout(10) << "try_send_message " << *m << " to " << to << dendl;
3790
3791 bufferlist bl;
3792 encode_message(m, quorum_con_features, bl);
3793
3794 messenger->send_message(m, to);
3795
3796 for (int i=0; i<(int)monmap->size(); i++) {
3797 if (i != rank)
3798 messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
3799 }
3800}
3801
3802void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3803{
3804 op->mark_event(__func__);
3805
3806 MonSession *session = op->get_session();
3807 assert(session);
3808 Message *req = op->get_req();
3809 ConnectionRef con = op->get_connection();
3810
3811 reply->set_cct(g_ceph_context);
3812 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3813
3814 if (!con) {
3815 dout(2) << "send_reply no connection, dropping reply " << *reply
3816 << " to " << req << " " << *req << dendl;
3817 reply->put();
3818 op->mark_event("reply: no connection");
3819 return;
3820 }
3821
3822 if (!session->con && !session->proxy_con) {
3823 dout(2) << "send_reply no connection, dropping reply " << *reply
3824 << " to " << req << " " << *req << dendl;
3825 reply->put();
3826 op->mark_event("reply: no connection");
3827 return;
3828 }
3829
3830 if (session->proxy_con) {
3831 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3832 << " via " << session->proxy_con->get_peer_addr()
3833 << " for request " << *req << dendl;
3834 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3835 op->mark_event("reply: send routed request");
3836 } else {
3837 session->con->send_message(reply);
3838 op->mark_event("reply: send");
3839 }
3840}
3841
3842void Monitor::no_reply(MonOpRequestRef op)
3843{
3844 MonSession *session = op->get_session();
3845 Message *req = op->get_req();
3846
3847 if (session->proxy_con) {
3848 dout(10) << "no_reply to " << req->get_source_inst()
3849 << " via " << session->proxy_con->get_peer_addr()
3850 << " for request " << *req << dendl;
3851 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3852 op->mark_event("no_reply: send routed request");
3853 } else {
3854 dout(10) << "no_reply to " << req->get_source_inst()
3855 << " " << *req << dendl;
3856 op->mark_event("no_reply");
3857 }
3858}
3859
3860void Monitor::handle_route(MonOpRequestRef op)
3861{
3862 MRoute *m = static_cast<MRoute*>(op->get_req());
3863 MonSession *session = op->get_session();
3864 //check privileges
3865 if (!session->is_capable("mon", MON_CAP_X)) {
3866 dout(0) << "MRoute received from entity without appropriate perms! "
3867 << dendl;
3868 return;
3869 }
3870 if (m->msg)
3871 dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3872 else
3873 dout(10) << "handle_route null to " << m->dest << dendl;
3874
3875 // look it up
3876 if (m->session_mon_tid) {
3877 if (routed_requests.count(m->session_mon_tid)) {
3878 RoutedRequest *rr = routed_requests[m->session_mon_tid];
3879
3880 // reset payload, in case encoding is dependent on target features
3881 if (m->msg) {
3882 m->msg->clear_payload();
3883 rr->con->send_message(m->msg);
3884 m->msg = NULL;
3885 }
3886 if (m->send_osdmap_first) {
3887 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3888 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3889 true, MonOpRequestRef());
3890 }
3891 assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3892 routed_requests.erase(m->session_mon_tid);
3893 rr->session->routed_request_tids.erase(m->session_mon_tid);
3894 delete rr;
3895 } else {
3896 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3897 }
3898 } else {
3899 dout(10) << " not a routed request, trying to send anyway" << dendl;
3900 if (m->msg) {
3901 messenger->send_message(m->msg, m->dest);
3902 m->msg = NULL;
3903 }
3904 }
3905}
3906
3907void Monitor::resend_routed_requests()
3908{
3909 dout(10) << "resend_routed_requests" << dendl;
3910 int mon = get_leader();
3911 list<Context*> retry;
3912 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
3913 p != routed_requests.end();
3914 ++p) {
3915 RoutedRequest *rr = p->second;
3916
3917 if (mon == rank) {
3918 dout(10) << " requeue for self tid " << rr->tid << dendl;
3919 rr->op->mark_event("retry routed request");
3920 retry.push_back(new C_RetryMessage(this, rr->op));
3921 if (rr->session) {
3922 assert(rr->session->routed_request_tids.count(p->first));
3923 rr->session->routed_request_tids.erase(p->first);
3924 }
3925 delete rr;
3926 } else {
3927 bufferlist::iterator q = rr->request_bl.begin();
3928 PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
3929 rr->op->mark_event("resend forwarded message to leader");
3930 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
3931 MForward *forward = new MForward(rr->tid, req, rr->con_features,
3932 rr->session->caps);
3933 req->put(); // forward takes its own ref; drop ours.
3934 forward->client = rr->client_inst;
3935 forward->set_priority(req->get_priority());
3936 messenger->send_message(forward, monmap->get_inst(mon));
3937 }
3938 }
3939 if (mon == rank) {
3940 routed_requests.clear();
3941 finish_contexts(g_ceph_context, retry);
3942 }
3943}
3944
3945void Monitor::remove_session(MonSession *s)
3946{
224ce89b
WB
3947 dout(10) << "remove_session " << s << " " << s->inst
3948 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
7c673cae
FG
3949 assert(s->con);
3950 assert(!s->closed);
3951 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
3952 p != s->routed_request_tids.end();
3953 ++p) {
3954 assert(routed_requests.count(*p));
3955 RoutedRequest *rr = routed_requests[*p];
3956 dout(10) << " dropping routed request " << rr->tid << dendl;
3957 delete rr;
3958 routed_requests.erase(*p);
3959 }
3960 s->routed_request_tids.clear();
3961 s->con->set_priv(NULL);
3962 session_map.remove_session(s);
3963 logger->set(l_mon_num_sessions, session_map.get_size());
3964 logger->inc(l_mon_session_rm);
3965}
3966
3967void Monitor::remove_all_sessions()
3968{
3969 Mutex::Locker l(session_map_lock);
3970 while (!session_map.sessions.empty()) {
3971 MonSession *s = session_map.sessions.front();
3972 remove_session(s);
3973 if (logger)
3974 logger->inc(l_mon_session_rm);
3975 }
3976 if (logger)
3977 logger->set(l_mon_num_sessions, session_map.get_size());
3978}
3979
3980void Monitor::send_command(const entity_inst_t& inst,
3981 const vector<string>& com)
3982{
3983 dout(10) << "send_command " << inst << "" << com << dendl;
3984 MMonCommand *c = new MMonCommand(monmap->fsid);
3985 c->cmd = com;
3986 try_send_message(c, inst);
3987}
3988
3989void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
3990{
3991 /**
3992 * Wait list the new session until we're in the quorum, assuming it's
3993 * sufficiently new.
3994 * tick() will periodically send them back through so we can send
3995 * the client elsewhere if we don't think we're getting back in.
3996 *
3997 * But we whitelist a few sorts of messages:
3998 * 1) Monitors can talk to us at any time, of course.
3999 * 2) auth messages. It's unlikely to go through much faster, but
4000 * it's possible we've just lost our quorum status and we want to take...
4001 * 3) command messages. We want to accept these under all possible
4002 * circumstances.
4003 */
4004 Message *m = op->get_req();
4005 MonSession *s = op->get_session();
4006 ConnectionRef con = op->get_connection();
4007 utime_t too_old = ceph_clock_now();
4008 too_old -= g_ceph_context->_conf->mon_lease;
4009 if (m->get_recv_stamp() > too_old &&
4010 con->is_connected()) {
4011 dout(5) << "waitlisting message " << *m << dendl;
4012 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4013 op->mark_wait_for_quorum();
4014 } else {
4015 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4016 con->mark_down();
4017 // proxied sessions aren't registered and don't have a con; don't remove
4018 // those.
4019 if (!s->proxy_con) {
4020 Mutex::Locker l(session_map_lock);
4021 remove_session(s);
4022 }
4023 op->mark_zap();
4024 }
4025}
4026
4027void Monitor::_ms_dispatch(Message *m)
4028{
4029 if (is_shutdown()) {
4030 m->put();
4031 return;
4032 }
4033
4034 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4035 bool src_is_mon = op->is_src_mon();
4036 op->mark_event("mon:_ms_dispatch");
4037 MonSession *s = op->get_session();
4038 if (s && s->closed) {
4039 return;
4040 }
224ce89b
WB
4041
4042 if (src_is_mon && s) {
4043 ConnectionRef con = m->get_connection();
4044 if (con->get_messenger() && con->get_features() != s->con_features) {
4045 // only update features if this is a non-anonymous connection
4046 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4047 << " (was " << s->con_features
4048 << ", now " << con->get_features() << ")" << dendl;
4049 // connection features changed - recreate session.
4050 if (s->con && s->con != con) {
4051 dout(10) << __func__ << " connection for " << m->get_source_inst()
4052 << " changed from session; mark down and replace" << dendl;
4053 s->con->mark_down();
4054 }
4055 if (s->item.is_on_list()) {
4056 // forwarded messages' sessions are not in the sessions map and
4057 // exist only while the op is being handled.
4058 remove_session(s);
4059 }
4060 s->put();
4061 s = nullptr;
4062 }
4063 }
4064
7c673cae
FG
4065 if (!s) {
4066 // if the sender is not a monitor, make sure their first message for a
4067 // session is an MAuth. If it is not, assume it's a stray message,
4068 // and considering that we are creating a new session it is safe to
4069 // assume that the sender hasn't authenticated yet, so we have no way
4070 // of assessing whether we should handle it or not.
4071 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4072 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4073 m->get_type() != CEPH_MSG_PING)) {
4074 dout(1) << __func__ << " dropping stray message " << *m
4075 << " from " << m->get_source_inst() << dendl;
4076 return;
4077 }
4078
4079 ConnectionRef con = m->get_connection();
4080 {
4081 Mutex::Locker l(session_map_lock);
4082 s = session_map.new_session(m->get_source_inst(), con.get());
4083 }
4084 assert(s);
4085 con->set_priv(s->get());
224ce89b
WB
4086 dout(10) << __func__ << " new session " << s << " " << *s
4087 << " features 0x" << std::hex
4088 << s->con_features << std::dec << dendl;
7c673cae
FG
4089 op->set_session(s);
4090
4091 logger->set(l_mon_num_sessions, session_map.get_size());
4092 logger->inc(l_mon_session_add);
4093
4094 if (src_is_mon) {
4095 // give it monitor caps; the peer type has been authenticated
4096 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4097 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4098 s->caps = *mon_caps;
4099 }
4100 s->put();
4101 } else {
4102 dout(20) << __func__ << " existing session " << s << " for " << s->inst
4103 << dendl;
4104 }
4105
4106 assert(s);
4107
4108 s->session_timeout = ceph_clock_now();
4109 s->session_timeout += g_conf->mon_session_timeout;
4110
4111 if (s->auth_handler) {
4112 s->entity_name = s->auth_handler->get_entity_name();
4113 }
4114 dout(20) << " caps " << s->caps.get_str() << dendl;
4115
4116 if ((is_synchronizing() ||
4117 (s->global_id == 0 && !exited_quorum.is_zero())) &&
4118 !src_is_mon &&
4119 m->get_type() != CEPH_MSG_PING) {
4120 waitlist_or_zap_client(op);
4121 } else {
4122 dispatch_op(op);
4123 }
4124 return;
4125}
4126
4127void Monitor::dispatch_op(MonOpRequestRef op)
4128{
4129 op->mark_event("mon:dispatch_op");
4130 MonSession *s = op->get_session();
4131 assert(s);
4132 if (s->closed) {
4133 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4134 return;
4135 }
4136
4137 /* we will consider the default type as being 'monitor' until proven wrong */
4138 op->set_type_monitor();
4139 /* deal with all messages that do not necessarily need caps */
4140 bool dealt_with = true;
4141 switch (op->get_req()->get_type()) {
4142 // auth
4143 case MSG_MON_GLOBAL_ID:
4144 case CEPH_MSG_AUTH:
4145 op->set_type_service();
4146 /* no need to check caps here */
4147 paxos_service[PAXOS_AUTH]->dispatch(op);
4148 break;
4149
4150 case CEPH_MSG_PING:
4151 handle_ping(op);
4152 break;
4153
4154 /* MMonGetMap may be used by clients to obtain a monmap *before*
4155 * authenticating with the monitor. We need to handle these without
4156 * checking caps because, even on a cluster without cephx, we only set
4157 * session caps *after* the auth handshake. A good example of this
4158 * is when a client calls MonClient::get_monmap_privately(), which does
4159 * not authenticate when obtaining a monmap.
4160 */
4161 case CEPH_MSG_MON_GET_MAP:
4162 handle_mon_get_map(op);
4163 break;
4164
4165 case CEPH_MSG_MON_METADATA:
4166 return handle_mon_metadata(op);
4167
4168 default:
4169 dealt_with = false;
4170 break;
4171 }
4172 if (dealt_with)
4173 return;
4174
4175 /* well, maybe the op belongs to a service... */
4176 op->set_type_service();
4177 /* deal with all messages which caps should be checked somewhere else */
4178 dealt_with = true;
4179 switch (op->get_req()->get_type()) {
4180
4181 // OSDs
4182 case CEPH_MSG_MON_GET_OSDMAP:
4183 case CEPH_MSG_POOLOP:
4184 case MSG_OSD_BEACON:
4185 case MSG_OSD_MARK_ME_DOWN:
4186 case MSG_OSD_FULL:
4187 case MSG_OSD_FAILURE:
4188 case MSG_OSD_BOOT:
4189 case MSG_OSD_ALIVE:
4190 case MSG_OSD_PGTEMP:
4191 case MSG_OSD_PG_CREATED:
4192 case MSG_REMOVE_SNAPS:
4193 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4194 break;
4195
4196 // MDSs
4197 case MSG_MDS_BEACON:
4198 case MSG_MDS_OFFLOAD_TARGETS:
4199 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4200 break;
4201
4202 // Mgrs
4203 case MSG_MGR_BEACON:
4204 paxos_service[PAXOS_MGR]->dispatch(op);
4205 break;
4206
31f18b77
FG
4207 // MgrStat
4208 case MSG_MON_MGR_REPORT:
7c673cae 4209 case CEPH_MSG_STATFS:
7c673cae 4210 case MSG_GETPOOLSTATS:
31f18b77
FG
4211 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4212 break;
4213
4214 // pg
4215 case MSG_PGSTATS:
7c673cae
FG
4216 paxos_service[PAXOS_PGMAP]->dispatch(op);
4217 break;
4218
4219 // log
4220 case MSG_LOG:
4221 paxos_service[PAXOS_LOG]->dispatch(op);
4222 break;
4223
4224 // handle_command() does its own caps checking
4225 case MSG_MON_COMMAND:
4226 op->set_type_command();
4227 handle_command(op);
4228 break;
4229
4230 default:
4231 dealt_with = false;
4232 break;
4233 }
4234 if (dealt_with)
4235 return;
4236
4237 /* nop, looks like it's not a service message; revert back to monitor */
4238 op->set_type_monitor();
4239
4240 /* messages we, the Monitor class, need to deal with
4241 * but may be sent by clients. */
4242
4243 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4244 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4245 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4246 << dendl;
4247 goto drop;
4248 }
4249
4250 dealt_with = true;
4251 switch (op->get_req()->get_type()) {
4252
4253 // misc
4254 case CEPH_MSG_MON_GET_VERSION:
4255 handle_get_version(op);
4256 break;
4257
4258 case CEPH_MSG_MON_SUBSCRIBE:
4259 /* FIXME: check what's being subscribed, filter accordingly */
4260 handle_subscribe(op);
4261 break;
4262
4263 default:
4264 dealt_with = false;
4265 break;
4266 }
4267 if (dealt_with)
4268 return;
4269
4270 if (!op->is_src_mon()) {
4271 dout(1) << __func__ << " unexpected monitor message from"
4272 << " non-monitor entity " << op->get_req()->get_source_inst()
4273 << " " << *(op->get_req()) << " -- dropping" << dendl;
4274 goto drop;
4275 }
4276
4277 /* messages that should only be sent by another monitor */
4278 dealt_with = true;
4279 switch (op->get_req()->get_type()) {
4280
4281 case MSG_ROUTE:
4282 handle_route(op);
4283 break;
4284
4285 case MSG_MON_PROBE:
4286 handle_probe(op);
4287 break;
4288
4289 // Sync (i.e., the new slurp, but on steroids)
4290 case MSG_MON_SYNC:
4291 handle_sync(op);
4292 break;
4293 case MSG_MON_SCRUB:
4294 handle_scrub(op);
4295 break;
4296
4297 /* log acks are sent from a monitor we sent the MLog to, and are
4298 never sent by clients to us. */
4299 case MSG_LOGACK:
4300 log_client.handle_log_ack((MLogAck*)op->get_req());
4301 break;
4302
4303 // monmap
4304 case MSG_MON_JOIN:
4305 op->set_type_service();
4306 paxos_service[PAXOS_MONMAP]->dispatch(op);
4307 break;
4308
4309 // paxos
4310 case MSG_MON_PAXOS:
4311 {
4312 op->set_type_paxos();
4313 MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
4314 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4315 //can't send these!
4316 break;
4317 }
4318
4319 if (state == STATE_SYNCHRONIZING) {
4320 // we are synchronizing. These messages would do us no
4321 // good, thus just drop them and ignore them.
4322 dout(10) << __func__ << " ignore paxos msg from "
4323 << pm->get_source_inst() << dendl;
4324 break;
4325 }
4326
4327 // sanitize
4328 if (pm->epoch > get_epoch()) {
4329 bootstrap();
4330 break;
4331 }
4332 if (pm->epoch != get_epoch()) {
4333 break;
4334 }
4335
4336 paxos->dispatch(op);
4337 }
4338 break;
4339
4340 // elector messages
4341 case MSG_MON_ELECTION:
4342 op->set_type_election();
4343 //check privileges here for simplicity
4344 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4345 dout(0) << "MMonElection received from entity without enough caps!"
4346 << op->get_session()->caps << dendl;
4347 break;
4348 }
4349 if (!is_probing() && !is_synchronizing()) {
4350 elector.dispatch(op);
4351 }
4352 break;
4353
4354 case MSG_FORWARD:
4355 handle_forward(op);
4356 break;
4357
4358 case MSG_TIMECHECK:
4359 handle_timecheck(op);
4360 break;
4361
4362 case MSG_MON_HEALTH:
4363 health_monitor->dispatch(op);
4364 break;
4365
224ce89b
WB
4366 case MSG_MON_HEALTH_CHECKS:
4367 op->set_type_service();
4368 paxos_service[PAXOS_HEALTH]->dispatch(op);
4369 break;
4370
7c673cae
FG
4371 default:
4372 dealt_with = false;
4373 break;
4374 }
4375 if (!dealt_with) {
4376 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4377 goto drop;
4378 }
4379 return;
4380
4381drop:
4382 return;
4383}
4384
4385void Monitor::handle_ping(MonOpRequestRef op)
4386{
4387 MPing *m = static_cast<MPing*>(op->get_req());
4388 dout(10) << __func__ << " " << *m << dendl;
4389 MPing *reply = new MPing;
4390 entity_inst_t inst = m->get_source_inst();
4391 bufferlist payload;
4392 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4393 f->open_object_section("pong");
4394
4395 list<string> health_str;
4396 get_health(health_str, NULL, f.get());
4397 {
4398 stringstream ss;
4399 get_mon_status(f.get(), ss);
4400 }
4401
4402 f->close_section();
4403 stringstream ss;
4404 f->flush(ss);
4405 ::encode(ss.str(), payload);
4406 reply->set_payload(payload);
4407 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4408 messenger->send_message(reply, inst);
4409}
4410
4411void Monitor::timecheck_start()
4412{
4413 dout(10) << __func__ << dendl;
4414 timecheck_cleanup();
4415 timecheck_start_round();
4416}
4417
4418void Monitor::timecheck_finish()
4419{
4420 dout(10) << __func__ << dendl;
4421 timecheck_cleanup();
4422}
4423
4424void Monitor::timecheck_start_round()
4425{
4426 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4427 assert(is_leader());
4428
4429 if (monmap->size() == 1) {
4430 assert(0 == "We are alone; this shouldn't have been scheduled!");
4431 return;
4432 }
4433
4434 if (timecheck_round % 2) {
4435 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4436 utime_t curr_time = ceph_clock_now();
4437 double max = g_conf->mon_timecheck_interval*3;
4438 if (curr_time - timecheck_round_start < max) {
4439 dout(10) << __func__ << " keep current round going" << dendl;
4440 goto out;
4441 } else {
4442 dout(10) << __func__
4443 << " finish current timecheck and start new" << dendl;
4444 timecheck_cancel_round();
4445 }
4446 }
4447
4448 assert(timecheck_round % 2 == 0);
4449 timecheck_acks = 0;
4450 timecheck_round ++;
4451 timecheck_round_start = ceph_clock_now();
4452 dout(10) << __func__ << " new " << timecheck_round << dendl;
4453
4454 timecheck();
4455out:
4456 dout(10) << __func__ << " setting up next event" << dendl;
4457 timecheck_reset_event();
4458}
4459
4460void Monitor::timecheck_finish_round(bool success)
4461{
4462 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4463 assert(timecheck_round % 2);
4464 timecheck_round ++;
4465 timecheck_round_start = utime_t();
4466
4467 if (success) {
4468 assert(timecheck_waiting.empty());
4469 assert(timecheck_acks == quorum.size());
4470 timecheck_report();
4471 timecheck_check_skews();
4472 return;
4473 }
4474
4475 dout(10) << __func__ << " " << timecheck_waiting.size()
4476 << " peers still waiting:";
4477 for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4478 p != timecheck_waiting.end(); ++p) {
4479 *_dout << " " << p->first.name;
4480 }
4481 *_dout << dendl;
4482 timecheck_waiting.clear();
4483
4484 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4485}
4486
4487void Monitor::timecheck_cancel_round()
4488{
4489 timecheck_finish_round(false);
4490}
4491
4492void Monitor::timecheck_cleanup()
4493{
4494 timecheck_round = 0;
4495 timecheck_acks = 0;
4496 timecheck_round_start = utime_t();
4497
4498 if (timecheck_event) {
4499 timer.cancel_event(timecheck_event);
4500 timecheck_event = NULL;
4501 }
4502 timecheck_waiting.clear();
4503 timecheck_skews.clear();
4504 timecheck_latencies.clear();
4505
4506 timecheck_rounds_since_clean = 0;
4507}
4508
4509void Monitor::timecheck_reset_event()
4510{
4511 if (timecheck_event) {
4512 timer.cancel_event(timecheck_event);
4513 timecheck_event = NULL;
4514 }
4515
4516 double delay =
4517 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4518
4519 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4520 delay = cct->_conf->mon_timecheck_interval;
4521 }
4522
4523 dout(10) << __func__ << " delay " << delay
4524 << " rounds_since_clean " << timecheck_rounds_since_clean
4525 << dendl;
4526
4527 timecheck_event = new C_MonContext(this, [this](int) {
4528 timecheck_start_round();
4529 });
4530 timer.add_event_after(delay, timecheck_event);
4531}
4532
4533void Monitor::timecheck_check_skews()
4534{
4535 dout(10) << __func__ << dendl;
4536 assert(is_leader());
4537 assert((timecheck_round % 2) == 0);
4538 if (monmap->size() == 1) {
4539 assert(0 == "We are alone; we shouldn't have gotten here!");
4540 return;
4541 }
4542 assert(timecheck_latencies.size() == timecheck_skews.size());
4543
4544 bool found_skew = false;
4545 for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4546 p != timecheck_skews.end(); ++p) {
4547
4548 double abs_skew;
4549 if (timecheck_has_skew(p->second, &abs_skew)) {
4550 dout(10) << __func__
4551 << " " << p->first << " skew " << abs_skew << dendl;
4552 found_skew = true;
4553 }
4554 }
4555
4556 if (found_skew) {
4557 ++timecheck_rounds_since_clean;
4558 timecheck_reset_event();
4559 } else if (timecheck_rounds_since_clean > 0) {
4560 dout(1) << __func__
4561 << " no clock skews found after " << timecheck_rounds_since_clean
4562 << " rounds" << dendl;
4563 // make sure the skews are really gone and not just a transient success
4564 // this will run just once if not in the presence of skews again.
4565 timecheck_rounds_since_clean = 1;
4566 timecheck_reset_event();
4567 timecheck_rounds_since_clean = 0;
4568 }
4569
4570}
4571
4572void Monitor::timecheck_report()
4573{
4574 dout(10) << __func__ << dendl;
4575 assert(is_leader());
4576 assert((timecheck_round % 2) == 0);
4577 if (monmap->size() == 1) {
4578 assert(0 == "We are alone; we shouldn't have gotten here!");
4579 return;
4580 }
4581
4582 assert(timecheck_latencies.size() == timecheck_skews.size());
4583 bool do_output = true; // only output report once
4584 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4585 if (monmap->get_name(*q) == name)
4586 continue;
4587
4588 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4589 m->epoch = get_epoch();
4590 m->round = timecheck_round;
4591
4592 for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4593 it != timecheck_skews.end(); ++it) {
4594 double skew = it->second;
4595 double latency = timecheck_latencies[it->first];
4596
4597 m->skews[it->first] = skew;
4598 m->latencies[it->first] = latency;
4599
4600 if (do_output) {
4601 dout(25) << __func__ << " " << it->first
4602 << " latency " << latency
4603 << " skew " << skew << dendl;
4604 }
4605 }
4606 do_output = false;
4607 entity_inst_t inst = monmap->get_inst(*q);
4608 dout(10) << __func__ << " send report to " << inst << dendl;
4609 messenger->send_message(m, inst);
4610 }
4611}
4612
4613void Monitor::timecheck()
4614{
4615 dout(10) << __func__ << dendl;
4616 assert(is_leader());
4617 if (monmap->size() == 1) {
4618 assert(0 == "We are alone; we shouldn't have gotten here!");
4619 return;
4620 }
4621 assert(timecheck_round % 2 != 0);
4622
4623 timecheck_acks = 1; // we ack ourselves
4624
4625 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4626 << " round " << timecheck_round << dendl;
4627
4628 // we are at the eye of the storm; the point of reference
4629 timecheck_skews[messenger->get_myinst()] = 0.0;
4630 timecheck_latencies[messenger->get_myinst()] = 0.0;
4631
4632 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4633 if (monmap->get_name(*it) == name)
4634 continue;
4635
4636 entity_inst_t inst = monmap->get_inst(*it);
4637 utime_t curr_time = ceph_clock_now();
4638 timecheck_waiting[inst] = curr_time;
4639 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4640 m->epoch = get_epoch();
4641 m->round = timecheck_round;
4642 dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4643 messenger->send_message(m, inst);
4644 }
4645}
4646
4647health_status_t Monitor::timecheck_status(ostringstream &ss,
4648 const double skew_bound,
4649 const double latency)
4650{
4651 health_status_t status = HEALTH_OK;
4652 assert(latency >= 0);
4653
4654 double abs_skew;
4655 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4656 status = HEALTH_WARN;
4657 ss << "clock skew " << abs_skew << "s"
4658 << " > max " << g_conf->mon_clock_drift_allowed << "s";
4659 }
4660
4661 return status;
4662}
4663
4664void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4665{
4666 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4667 dout(10) << __func__ << " " << *m << dendl;
4668 /* handles PONG's */
4669 assert(m->op == MTimeCheck::OP_PONG);
4670
4671 entity_inst_t other = m->get_source_inst();
4672 if (m->epoch < get_epoch()) {
4673 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4674 << " from " << other
4675 << " curr " << get_epoch()
4676 << " -- severely lagged? discard" << dendl;
4677 return;
4678 }
4679 assert(m->epoch == get_epoch());
4680
4681 if (m->round < timecheck_round) {
4682 dout(1) << __func__ << " got old round " << m->round
4683 << " from " << other
4684 << " curr " << timecheck_round << " -- discard" << dendl;
4685 return;
4686 }
4687
4688 utime_t curr_time = ceph_clock_now();
4689
4690 assert(timecheck_waiting.count(other) > 0);
4691 utime_t timecheck_sent = timecheck_waiting[other];
4692 timecheck_waiting.erase(other);
4693 if (curr_time < timecheck_sent) {
4694 // our clock was readjusted -- drop everything until it all makes sense.
4695 dout(1) << __func__ << " our clock was readjusted --"
4696 << " bump round and drop current check"
4697 << dendl;
4698 timecheck_cancel_round();
4699 return;
4700 }
4701
4702 /* update peer latencies */
4703 double latency = (double)(curr_time - timecheck_sent);
4704
4705 if (timecheck_latencies.count(other) == 0)
4706 timecheck_latencies[other] = latency;
4707 else {
4708 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4709 timecheck_latencies[other] = avg_latency;
4710 }
4711
4712 /*
4713 * update skews
4714 *
4715 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4716 * and 'a' happens to be lower than 'b'; so we use double instead.
4717 *
4718 * latency is always expected to be >= 0.
4719 *
4720 * delta, the difference between theirs timestamp and ours, may either be
4721 * lower or higher than 0; will hardly ever be 0.
4722 *
4723 * The absolute skew is the absolute delta minus the latency, which is
4724 * taken as a whole instead of an rtt given that there is some queueing
4725 * and dispatch times involved and it's hard to assess how long exactly
4726 * it took for the message to travel to the other side and be handled. So
4727 * we call it a bounded skew, the worst case scenario.
4728 *
4729 * Now, to math!
4730 *
4731 * Given that the latency is always positive, we can establish that the
4732 * bounded skew will be:
4733 *
4734 * 1. positive if the absolute delta is higher than the latency and
4735 * delta is positive
4736 * 2. negative if the absolute delta is higher than the latency and
4737 * delta is negative.
4738 * 3. zero if the absolute delta is lower than the latency.
4739 *
4740 * On 3. we make a judgement call and treat the skew as non-existent.
4741 * This is because that, if the absolute delta is lower than the
4742 * latency, then the apparently existing skew is nothing more than a
4743 * side-effect of the high latency at work.
4744 *
4745 * This may not be entirely true though, as a severely skewed clock
4746 * may be masked by an even higher latency, but with high latencies
4747 * we probably have worse issues to deal with than just skewed clocks.
4748 */
4749 assert(latency >= 0);
4750
4751 double delta = ((double) m->timestamp) - ((double) curr_time);
4752 double abs_delta = (delta > 0 ? delta : -delta);
4753 double skew_bound = abs_delta - latency;
4754 if (skew_bound < 0)
4755 skew_bound = 0;
4756 else if (delta < 0)
4757 skew_bound = -skew_bound;
4758
4759 ostringstream ss;
4760 health_status_t status = timecheck_status(ss, skew_bound, latency);
4761 if (status == HEALTH_ERR)
4762 clog->error() << other << " " << ss.str();
4763 else if (status == HEALTH_WARN)
4764 clog->warn() << other << " " << ss.str();
4765
4766 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4767 << " delta " << delta << " skew_bound " << skew_bound
4768 << " latency " << latency << dendl;
4769
4770 timecheck_skews[other] = skew_bound;
4771
4772 timecheck_acks++;
4773 if (timecheck_acks == quorum.size()) {
4774 dout(10) << __func__ << " got pongs from everybody ("
4775 << timecheck_acks << " total)" << dendl;
4776 assert(timecheck_skews.size() == timecheck_acks);
4777 assert(timecheck_waiting.empty());
4778 // everyone has acked, so bump the round to finish it.
4779 timecheck_finish_round();
4780 }
4781}
4782
4783void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4784{
4785 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4786 dout(10) << __func__ << " " << *m << dendl;
4787
4788 assert(is_peon());
4789 assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4790
4791 if (m->epoch != get_epoch()) {
4792 dout(1) << __func__ << " got wrong epoch "
4793 << "(ours " << get_epoch()
4794 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4795 return;
4796 }
4797
4798 if (m->round < timecheck_round) {
4799 dout(1) << __func__ << " got old round " << m->round
4800 << " current " << timecheck_round
4801 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4802 return;
4803 }
4804
4805 timecheck_round = m->round;
4806
4807 if (m->op == MTimeCheck::OP_REPORT) {
4808 assert((timecheck_round % 2) == 0);
4809 timecheck_latencies.swap(m->latencies);
4810 timecheck_skews.swap(m->skews);
4811 return;
4812 }
4813
4814 assert((timecheck_round % 2) != 0);
4815 MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4816 utime_t curr_time = ceph_clock_now();
4817 reply->timestamp = curr_time;
4818 reply->epoch = m->epoch;
4819 reply->round = m->round;
4820 dout(10) << __func__ << " send " << *m
4821 << " to " << m->get_source_inst() << dendl;
4822 m->get_connection()->send_message(reply);
4823}
4824
4825void Monitor::handle_timecheck(MonOpRequestRef op)
4826{
4827 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4828 dout(10) << __func__ << " " << *m << dendl;
4829
4830 if (is_leader()) {
4831 if (m->op != MTimeCheck::OP_PONG) {
4832 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4833 } else {
4834 handle_timecheck_leader(op);
4835 }
4836 } else if (is_peon()) {
4837 if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4838 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4839 } else {
4840 handle_timecheck_peon(op);
4841 }
4842 } else {
4843 dout(1) << __func__ << " drop unexpected msg" << dendl;
4844 }
4845}
4846
4847void Monitor::handle_subscribe(MonOpRequestRef op)
4848{
4849 MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
4850 dout(10) << "handle_subscribe " << *m << dendl;
4851
4852 bool reply = false;
4853
4854 MonSession *s = op->get_session();
4855 assert(s);
4856
4857 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
4858 p != m->what.end();
4859 ++p) {
4860 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4861 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
4862 reply = true;
4863
4864 // remove conflicting subscribes
4865 if (logmon()->sub_name_to_id(p->first) >= 0) {
4866 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
4867 it != s->sub_map.end(); ) {
4868 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
4869 Mutex::Locker l(session_map_lock);
4870 session_map.remove_sub((it++)->second);
4871 } else {
4872 ++it;
4873 }
4874 }
4875 }
4876
4877 {
4878 Mutex::Locker l(session_map_lock);
4879 session_map.add_update_sub(s, p->first, p->second.start,
4880 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
4881 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
4882 }
4883
4884 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
4885 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
4886 if ((int)s->is_capable("mds", MON_CAP_R)) {
4887 Subscription *sub = s->sub_map[p->first];
4888 assert(sub != nullptr);
4889 mdsmon()->check_sub(sub);
4890 }
4891 } else if (p->first == "osdmap") {
4892 if ((int)s->is_capable("osd", MON_CAP_R)) {
4893 if (s->osd_epoch > p->second.start) {
4894 // client needs earlier osdmaps on purpose, so reset the sent epoch
4895 s->osd_epoch = 0;
4896 }
4897 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
4898 }
4899 } else if (p->first == "osd_pg_creates") {
4900 if ((int)s->is_capable("osd", MON_CAP_W)) {
4901 if (monmap->get_required_features().contains_all(
4902 ceph::features::mon::FEATURE_LUMINOUS)) {
4903 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
4904 } else {
4905 pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
4906 }
4907 }
4908 } else if (p->first == "monmap") {
4909 monmon()->check_sub(s->sub_map[p->first]);
4910 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
4911 logmon()->check_sub(s->sub_map[p->first]);
4912 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
4913 mgrmon()->check_sub(s->sub_map[p->first]);
224ce89b
WB
4914 } else if (p->first == "servicemap") {
4915 mgrstatmon()->check_sub(s->sub_map[p->first]);
7c673cae
FG
4916 }
4917 }
4918
4919 if (reply) {
4920 // we only need to reply if the client is old enough to think it
4921 // has to send renewals.
4922 ConnectionRef con = m->get_connection();
4923 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
4924 m->get_connection()->send_message(new MMonSubscribeAck(
4925 monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
4926 }
4927
4928}
4929
4930void Monitor::handle_get_version(MonOpRequestRef op)
4931{
4932 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
4933 dout(10) << "handle_get_version " << *m << dendl;
4934 PaxosService *svc = NULL;
4935
4936 MonSession *s = op->get_session();
4937 assert(s);
4938
4939 if (!is_leader() && !is_peon()) {
4940 dout(10) << " waiting for quorum" << dendl;
4941 waitfor_quorum.push_back(new C_RetryMessage(this, op));
4942 goto out;
4943 }
4944
4945 if (m->what == "mdsmap") {
4946 svc = mdsmon();
4947 } else if (m->what == "fsmap") {
4948 svc = mdsmon();
4949 } else if (m->what == "osdmap") {
4950 svc = osdmon();
4951 } else if (m->what == "monmap") {
4952 svc = monmon();
4953 } else {
4954 derr << "invalid map type " << m->what << dendl;
4955 }
4956
4957 if (svc) {
4958 if (!svc->is_readable()) {
4959 svc->wait_for_readable(op, new C_RetryMessage(this, op));
4960 goto out;
4961 }
4962
4963 MMonGetVersionReply *reply = new MMonGetVersionReply();
4964 reply->handle = m->handle;
4965 reply->version = svc->get_last_committed();
4966 reply->oldest_version = svc->get_first_committed();
4967 reply->set_tid(m->get_tid());
4968
4969 m->get_connection()->send_message(reply);
4970 }
4971 out:
4972 return;
4973}
4974
4975bool Monitor::ms_handle_reset(Connection *con)
4976{
4977 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
4978
4979 // ignore lossless monitor sessions
4980 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4981 return false;
4982
4983 MonSession *s = static_cast<MonSession *>(con->get_priv());
4984 if (!s)
4985 return false;
4986
4987 // break any con <-> session ref cycle
4988 s->con->set_priv(NULL);
4989
4990 if (is_shutdown())
4991 return false;
4992
4993 Mutex::Locker l(lock);
4994
4995 dout(10) << "reset/close on session " << s->inst << dendl;
4996 if (!s->closed) {
4997 Mutex::Locker l(session_map_lock);
4998 remove_session(s);
4999 }
5000 s->put();
5001 return true;
5002}
5003
5004bool Monitor::ms_handle_refused(Connection *con)
5005{
5006 // just log for now...
5007 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5008 return false;
5009}
5010
5011// -----
5012
5013void Monitor::send_latest_monmap(Connection *con)
5014{
5015 bufferlist bl;
5016 monmap->encode(bl, con->get_features());
5017 con->send_message(new MMonMap(bl));
5018}
5019
5020void Monitor::handle_mon_get_map(MonOpRequestRef op)
5021{
5022 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
5023 dout(10) << "handle_mon_get_map" << dendl;
5024 send_latest_monmap(m->get_connection().get());
5025}
5026
5027void Monitor::handle_mon_metadata(MonOpRequestRef op)
5028{
5029 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
5030 if (is_leader()) {
5031 dout(10) << __func__ << dendl;
5032 update_mon_metadata(m->get_source().num(), std::move(m->data));
5033 }
5034}
5035
5036void Monitor::update_mon_metadata(int from, Metadata&& m)
5037{
224ce89b
WB
5038 // NOTE: this is now for legacy (kraken or jewel) mons only.
5039 pending_metadata[from] = std::move(m);
7c673cae
FG
5040
5041 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
224ce89b 5042 bufferlist bl;
7c673cae
FG
5043 ::encode(pending_metadata, bl);
5044 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5045 paxos->trigger_propose();
5046}
5047
224ce89b 5048int Monitor::load_metadata()
7c673cae
FG
5049{
5050 bufferlist bl;
5051 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5052 if (r)
5053 return r;
5054 bufferlist::iterator it = bl.begin();
224ce89b
WB
5055 ::decode(mon_metadata, it);
5056
5057 pending_metadata = mon_metadata;
7c673cae
FG
5058 return 0;
5059}
5060
5061int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5062{
5063 assert(f);
224ce89b 5064 if (!mon_metadata.count(mon)) {
7c673cae
FG
5065 err << "mon." << mon << " not found";
5066 return -EINVAL;
5067 }
224ce89b 5068 const Metadata& m = mon_metadata[mon];
7c673cae
FG
5069 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5070 f->dump_string(p->first.c_str(), p->second);
5071 }
5072 return 0;
5073}
5074
c07f9fc5 5075void Monitor::count_metadata(const string& field, map<string,int> *out)
31f18b77 5076{
224ce89b 5077 for (auto& p : mon_metadata) {
31f18b77
FG
5078 auto q = p.second.find(field);
5079 if (q == p.second.end()) {
c07f9fc5 5080 (*out)["unknown"]++;
31f18b77 5081 } else {
c07f9fc5 5082 (*out)[q->second]++;
31f18b77
FG
5083 }
5084 }
c07f9fc5
FG
5085}
5086
5087void Monitor::count_metadata(const string& field, Formatter *f)
5088{
5089 map<string,int> by_val;
5090 count_metadata(field, &by_val);
31f18b77
FG
5091 f->open_object_section(field.c_str());
5092 for (auto& p : by_val) {
5093 f->dump_int(p.first.c_str(), p.second);
5094 }
5095 f->close_section();
5096}
5097
7c673cae
FG
5098int Monitor::print_nodes(Formatter *f, ostream& err)
5099{
7c673cae 5100 map<string, list<int> > mons; // hostname => mon
224ce89b
WB
5101 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5102 it != mon_metadata.end(); ++it) {
7c673cae
FG
5103 const Metadata& m = it->second;
5104 Metadata::const_iterator hostname = m.find("hostname");
5105 if (hostname == m.end()) {
5106 // not likely though
5107 continue;
5108 }
5109 mons[hostname->second].push_back(it->first);
5110 }
5111
5112 dump_services(f, mons, "mon");
5113 return 0;
5114}
5115
5116// ----------------------------------------------
5117// scrub
5118
5119int Monitor::scrub_start()
5120{
5121 dout(10) << __func__ << dendl;
5122 assert(is_leader());
5123
5124 if (!scrub_result.empty()) {
5125 clog->info() << "scrub already in progress";
5126 return -EBUSY;
5127 }
5128
5129 scrub_event_cancel();
5130 scrub_result.clear();
5131 scrub_state.reset(new ScrubState);
5132
5133 scrub();
5134 return 0;
5135}
5136
5137int Monitor::scrub()
5138{
5139 assert(is_leader());
5140 assert(scrub_state);
5141
5142 scrub_cancel_timeout();
5143 wait_for_paxos_write();
5144 scrub_version = paxos->get_version();
5145
5146
5147 // scrub all keys if we're the only monitor in the quorum
5148 int32_t num_keys =
5149 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5150
5151 for (set<int>::iterator p = quorum.begin();
5152 p != quorum.end();
5153 ++p) {
5154 if (*p == rank)
5155 continue;
5156 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5157 num_keys);
5158 r->key = scrub_state->last_key;
5159 messenger->send_message(r, monmap->get_inst(*p));
5160 }
5161
5162 // scrub my keys
5163 bool r = _scrub(&scrub_result[rank],
5164 &scrub_state->last_key,
5165 &num_keys);
5166
5167 scrub_state->finished = !r;
5168
5169 // only after we got our scrub results do we really care whether the
5170 // other monitors are late on their results. Also, this way we avoid
5171 // triggering the timeout if we end up getting stuck in _scrub() for
5172 // longer than the duration of the timeout.
5173 scrub_reset_timeout();
5174
5175 if (quorum.size() == 1) {
5176 assert(scrub_state->finished == true);
5177 scrub_finish();
5178 }
5179 return 0;
5180}
5181
5182void Monitor::handle_scrub(MonOpRequestRef op)
5183{
5184 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5185 dout(10) << __func__ << " " << *m << dendl;
5186 switch (m->op) {
5187 case MMonScrub::OP_SCRUB:
5188 {
5189 if (!is_peon())
5190 break;
5191
5192 wait_for_paxos_write();
5193
5194 if (m->version != paxos->get_version())
5195 break;
5196
5197 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5198 m->version,
5199 m->num_keys);
5200
5201 reply->key = m->key;
5202 _scrub(&reply->result, &reply->key, &reply->num_keys);
5203 m->get_connection()->send_message(reply);
5204 }
5205 break;
5206
5207 case MMonScrub::OP_RESULT:
5208 {
5209 if (!is_leader())
5210 break;
5211 if (m->version != scrub_version)
5212 break;
5213 // reset the timeout each time we get a result
5214 scrub_reset_timeout();
5215
5216 int from = m->get_source().num();
5217 assert(scrub_result.count(from) == 0);
5218 scrub_result[from] = m->result;
5219
5220 if (scrub_result.size() == quorum.size()) {
5221 scrub_check_results();
5222 scrub_result.clear();
5223 if (scrub_state->finished)
5224 scrub_finish();
5225 else
5226 scrub();
5227 }
5228 }
5229 break;
5230 }
5231}
5232
5233bool Monitor::_scrub(ScrubResult *r,
5234 pair<string,string> *start,
5235 int *num_keys)
5236{
5237 assert(r != NULL);
5238 assert(start != NULL);
5239 assert(num_keys != NULL);
5240
5241 set<string> prefixes = get_sync_targets_names();
5242 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5243
5244 dout(10) << __func__ << " start (" << *start << ")"
5245 << " num_keys " << *num_keys << dendl;
5246
5247 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5248
5249 int scrubbed_keys = 0;
5250 pair<string,string> last_key;
5251
5252 while (it->has_next_chunk()) {
5253
5254 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5255 break;
5256
5257 pair<string,string> k = it->get_next_key();
5258 if (prefixes.count(k.first) == 0)
5259 continue;
5260
5261 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5262 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5263 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5264 << dendl;
5265 continue;
5266 }
5267
5268 bufferlist bl;
224ce89b
WB
5269 int err = store->get(k.first, k.second, bl);
5270 assert(err == 0);
5271
7c673cae
FG
5272 uint32_t key_crc = bl.crc32c(0);
5273 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5274 << " crc " << key_crc << dendl;
5275 r->prefix_keys[k.first]++;
224ce89b 5276 if (r->prefix_crc.count(k.first) == 0) {
7c673cae 5277 r->prefix_crc[k.first] = 0;
224ce89b 5278 }
7c673cae
FG
5279 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5280
5281 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5282 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5283 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5284 r->prefix_crc[k.first] += 1;
5285 }
5286
5287 ++scrubbed_keys;
5288 last_key = k;
5289 }
5290
5291 dout(20) << __func__ << " last_key (" << last_key << ")"
5292 << " scrubbed_keys " << scrubbed_keys
5293 << " has_next " << it->has_next_chunk() << dendl;
5294
5295 *start = last_key;
5296 *num_keys = scrubbed_keys;
5297
5298 return it->has_next_chunk();
5299}
5300
5301void Monitor::scrub_check_results()
5302{
5303 dout(10) << __func__ << dendl;
5304
5305 // compare
5306 int errors = 0;
5307 ScrubResult& mine = scrub_result[rank];
5308 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5309 p != scrub_result.end();
5310 ++p) {
5311 if (p->first == rank)
5312 continue;
5313 if (p->second != mine) {
5314 ++errors;
5315 clog->error() << "scrub mismatch";
5316 clog->error() << " mon." << rank << " " << mine;
5317 clog->error() << " mon." << p->first << " " << p->second;
5318 }
5319 }
5320 if (!errors)
d2e6a577 5321 clog->debug() << "scrub ok on " << quorum << ": " << mine;
7c673cae
FG
5322}
5323
5324inline void Monitor::scrub_timeout()
5325{
5326 dout(1) << __func__ << " restarting scrub" << dendl;
5327 scrub_reset();
5328 scrub_start();
5329}
5330
5331void Monitor::scrub_finish()
5332{
5333 dout(10) << __func__ << dendl;
5334 scrub_reset();
5335 scrub_event_start();
5336}
5337
5338void Monitor::scrub_reset()
5339{
5340 dout(10) << __func__ << dendl;
5341 scrub_cancel_timeout();
5342 scrub_version = 0;
5343 scrub_result.clear();
5344 scrub_state.reset();
5345}
5346
5347inline void Monitor::scrub_update_interval(int secs)
5348{
5349 // we don't care about changes if we are not the leader.
5350 // changes will be visible if we become the leader.
5351 if (!is_leader())
5352 return;
5353
5354 dout(1) << __func__ << " new interval = " << secs << dendl;
5355
5356 // if scrub already in progress, all changes will already be visible during
5357 // the next round. Nothing to do.
5358 if (scrub_state != NULL)
5359 return;
5360
5361 scrub_event_cancel();
5362 scrub_event_start();
5363}
5364
5365void Monitor::scrub_event_start()
5366{
5367 dout(10) << __func__ << dendl;
5368
5369 if (scrub_event)
5370 scrub_event_cancel();
5371
5372 if (cct->_conf->mon_scrub_interval <= 0) {
5373 dout(1) << __func__ << " scrub event is disabled"
5374 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5375 << ")" << dendl;
5376 return;
5377 }
5378
5379 scrub_event = new C_MonContext(this, [this](int) {
5380 scrub_start();
5381 });
5382 timer.add_event_after(cct->_conf->mon_scrub_interval, scrub_event);
5383}
5384
5385void Monitor::scrub_event_cancel()
5386{
5387 dout(10) << __func__ << dendl;
5388 if (scrub_event) {
5389 timer.cancel_event(scrub_event);
5390 scrub_event = NULL;
5391 }
5392}
5393
5394inline void Monitor::scrub_cancel_timeout()
5395{
5396 if (scrub_timeout_event) {
5397 timer.cancel_event(scrub_timeout_event);
5398 scrub_timeout_event = NULL;
5399 }
5400}
5401
5402void Monitor::scrub_reset_timeout()
5403{
5404 dout(15) << __func__ << " reset timeout event" << dendl;
5405 scrub_cancel_timeout();
5406
5407 scrub_timeout_event = new C_MonContext(this, [this](int) {
5408 scrub_timeout();
5409 });
5410 timer.add_event_after(g_conf->mon_scrub_timeout, scrub_timeout_event);
5411}
5412
5413/************ TICK ***************/
5414void Monitor::new_tick()
5415{
5416 timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5417 tick();
5418 }));
5419}
5420
5421void Monitor::tick()
5422{
5423 // ok go.
5424 dout(11) << "tick" << dendl;
5425
5426 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5427 (*p)->tick();
5428 (*p)->maybe_trim();
5429 }
5430
5431 // trim sessions
5432 utime_t now = ceph_clock_now();
5433 {
5434 Mutex::Locker l(session_map_lock);
5435 auto p = session_map.sessions.begin();
5436
5437 bool out_for_too_long = (!exited_quorum.is_zero() &&
5438 now > (exited_quorum + 2*g_conf->mon_lease));
5439
5440 while (!p.end()) {
5441 MonSession *s = *p;
5442 ++p;
5443
5444 // don't trim monitors
5445 if (s->inst.name.is_mon())
5446 continue;
5447
5448 if (s->session_timeout < now && s->con) {
5449 // check keepalive, too
5450 s->session_timeout = s->con->get_last_keepalive();
5451 s->session_timeout += g_conf->mon_session_timeout;
5452 }
5453 if (s->session_timeout < now) {
5454 dout(10) << " trimming session " << s->con << " " << s->inst
5455 << " (timeout " << s->session_timeout
5456 << " < now " << now << ")" << dendl;
5457 } else if (out_for_too_long) {
5458 // boot the client Session because we've taken too long getting back in
5459 dout(10) << " trimming session " << s->con << " " << s->inst
5460 << " because we've been out of quorum too long" << dendl;
5461 } else {
5462 continue;
5463 }
5464
5465 s->con->mark_down();
5466 remove_session(s);
5467 logger->inc(l_mon_session_trim);
5468 }
5469 }
5470 sync_trim_providers();
5471
5472 if (!maybe_wait_for_quorum.empty()) {
5473 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5474 }
5475
5476 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5477 // this is only necessary on upgraded clusters.
5478 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5479 prepare_new_fingerprint(t);
5480 paxos->trigger_propose();
5481 }
5482
5483 new_tick();
5484}
5485
5486void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5487{
5488 uuid_d nf;
5489 nf.generate_random();
5490 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5491
5492 bufferlist bl;
5493 ::encode(nf, bl);
5494 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5495}
5496
5497int Monitor::check_fsid()
5498{
5499 bufferlist ebl;
5500 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5501 if (r == -ENOENT)
5502 return r;
5503 assert(r == 0);
5504
5505 string es(ebl.c_str(), ebl.length());
5506
5507 // only keep the first line
5508 size_t pos = es.find_first_of('\n');
5509 if (pos != string::npos)
5510 es.resize(pos);
5511
5512 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5513 uuid_d ondisk;
5514 if (!ondisk.parse(es.c_str())) {
5515 derr << "error: unable to parse uuid" << dendl;
5516 return -EINVAL;
5517 }
5518
5519 if (monmap->get_fsid() != ondisk) {
5520 derr << "error: cluster_uuid file exists with value " << ondisk
5521 << ", != our uuid " << monmap->get_fsid() << dendl;
5522 return -EEXIST;
5523 }
5524
5525 return 0;
5526}
5527
5528int Monitor::write_fsid()
5529{
5530 auto t(std::make_shared<MonitorDBStore::Transaction>());
5531 write_fsid(t);
5532 int r = store->apply_transaction(t);
5533 return r;
5534}
5535
5536int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5537{
5538 ostringstream ss;
5539 ss << monmap->get_fsid() << "\n";
5540 string us = ss.str();
5541
5542 bufferlist b;
5543 b.append(us);
5544
5545 t->put(MONITOR_NAME, "cluster_uuid", b);
5546 return 0;
5547}
5548
5549/*
5550 * this is the closest thing to a traditional 'mkfs' for ceph.
5551 * initialize the monitor state machines to their initial values.
5552 */
5553int Monitor::mkfs(bufferlist& osdmapbl)
5554{
5555 auto t(std::make_shared<MonitorDBStore::Transaction>());
5556
5557 // verify cluster fsid
5558 int r = check_fsid();
5559 if (r < 0 && r != -ENOENT)
5560 return r;
5561
5562 bufferlist magicbl;
5563 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5564 magicbl.append("\n");
5565 t->put(MONITOR_NAME, "magic", magicbl);
5566
5567
5568 features = get_initial_supported_features();
5569 write_features(t);
5570
5571 // save monmap, osdmap, keyring.
5572 bufferlist monmapbl;
5573 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5574 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5575 t->put("mkfs", "monmap", monmapbl);
5576
5577 if (osdmapbl.length()) {
5578 // make sure it's a valid osdmap
5579 try {
5580 OSDMap om;
5581 om.decode(osdmapbl);
5582 }
5583 catch (buffer::error& e) {
5584 derr << "error decoding provided osdmap: " << e.what() << dendl;
5585 return -EINVAL;
5586 }
5587 t->put("mkfs", "osdmap", osdmapbl);
5588 }
5589
5590 if (is_keyring_required()) {
5591 KeyRing keyring;
5592 string keyring_filename;
5593
5594 r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5595 if (r) {
5596 derr << "unable to find a keyring file on " << g_conf->keyring
5597 << ": " << cpp_strerror(r) << dendl;
5598 if (g_conf->key != "") {
5599 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5600 "\n\tcaps mon = \"allow *\"\n";
5601 bufferlist bl;
5602 bl.append(keyring_plaintext);
5603 try {
5604 bufferlist::iterator i = bl.begin();
5605 keyring.decode_plaintext(i);
5606 }
5607 catch (const buffer::error& e) {
5608 derr << "error decoding keyring " << keyring_plaintext
5609 << ": " << e.what() << dendl;
5610 return -EINVAL;
5611 }
5612 } else {
5613 return -ENOENT;
5614 }
5615 } else {
5616 r = keyring.load(g_ceph_context, keyring_filename);
5617 if (r < 0) {
5618 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5619 return r;
5620 }
5621 }
5622
5623 // put mon. key in external keyring; seed with everything else.
5624 extract_save_mon_key(keyring);
5625
5626 bufferlist keyringbl;
5627 keyring.encode_plaintext(keyringbl);
5628 t->put("mkfs", "keyring", keyringbl);
5629 }
5630 write_fsid(t);
5631 store->apply_transaction(t);
5632
5633 return 0;
5634}
5635
5636int Monitor::write_default_keyring(bufferlist& bl)
5637{
5638 ostringstream os;
5639 os << g_conf->mon_data << "/keyring";
5640
5641 int err = 0;
5642 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600);
5643 if (fd < 0) {
5644 err = -errno;
5645 dout(0) << __func__ << " failed to open " << os.str()
5646 << ": " << cpp_strerror(err) << dendl;
5647 return err;
5648 }
5649
5650 err = bl.write_fd(fd);
5651 if (!err)
5652 ::fsync(fd);
5653 VOID_TEMP_FAILURE_RETRY(::close(fd));
5654
5655 return err;
5656}
5657
5658void Monitor::extract_save_mon_key(KeyRing& keyring)
5659{
5660 EntityName mon_name;
5661 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5662 EntityAuth mon_key;
5663 if (keyring.get_auth(mon_name, mon_key)) {
5664 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5665 KeyRing pkey;
5666 pkey.add(mon_name, mon_key);
5667 bufferlist bl;
5668 pkey.encode_plaintext(bl);
5669 write_default_keyring(bl);
5670 keyring.remove(mon_name);
5671 }
5672}
5673
5674bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5675 bool force_new)
5676{
5677 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5678 << dendl;
5679
5680 if (is_shutdown())
5681 return false;
5682
5683 // we only connect to other monitors and mgr; every else connects to us.
5684 if (service_id != CEPH_ENTITY_TYPE_MON &&
5685 service_id != CEPH_ENTITY_TYPE_MGR)
5686 return false;
5687
5688 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5689 // auth_none
5690 dout(20) << __func__ << " building auth_none authorizer" << dendl;
5691 AuthNoneClientHandler handler(g_ceph_context, nullptr);
5692 handler.set_global_id(0);
5693 *authorizer = handler.build_authorizer(service_id);
5694 return true;
5695 }
5696
5697 CephXServiceTicketInfo auth_ticket_info;
5698 CephXSessionAuthInfo info;
5699 int ret;
5700
5701 EntityName name;
5702 name.set_type(CEPH_ENTITY_TYPE_MON);
5703 auth_ticket_info.ticket.name = name;
5704 auth_ticket_info.ticket.global_id = 0;
5705
5706 if (service_id == CEPH_ENTITY_TYPE_MON) {
5707 // mon to mon authentication uses the private monitor shared key and not the
5708 // rotating key
5709 CryptoKey secret;
5710 if (!keyring.get_secret(name, secret) &&
5711 !key_server.get_secret(name, secret)) {
5712 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5713 << dendl;
5714 stringstream ss, ds;
5715 int err = key_server.list_secrets(ds);
5716 if (err < 0)
5717 ss << "no installed auth entries!";
5718 else
5719 ss << "installed auth entries:";
5720 dout(0) << ss.str() << "\n" << ds.str() << dendl;
5721 return false;
5722 }
5723
5724 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5725 secret, (uint64_t)-1);
5726 if (ret < 0) {
5727 dout(0) << __func__ << " failed to build mon session_auth_info "
5728 << cpp_strerror(ret) << dendl;
5729 return false;
5730 }
5731 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5732 // mgr
5733 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5734 if (ret < 0) {
5735 derr << __func__ << " failed to build mgr service session_auth_info "
5736 << cpp_strerror(ret) << dendl;
5737 return false;
5738 }
5739 } else {
5740 ceph_abort(); // see check at top of fn
5741 }
5742
5743 CephXTicketBlob blob;
5744 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5745 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5746 return false;
5747 }
5748 bufferlist ticket_data;
5749 ::encode(blob, ticket_data);
5750
5751 bufferlist::iterator iter = ticket_data.begin();
5752 CephXTicketHandler handler(g_ceph_context, service_id);
5753 ::decode(handler.ticket, iter);
5754
5755 handler.session_key = info.session_key;
5756
5757 *authorizer = handler.build_authorizer(0);
5758
5759 return true;
5760}
5761
5762bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5763 int protocol, bufferlist& authorizer_data,
5764 bufferlist& authorizer_reply,
5765 bool& isvalid, CryptoKey& session_key)
5766{
5767 dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5768 << " " << ceph_entity_type_name(peer_type)
5769 << " protocol " << protocol << dendl;
5770
5771 if (is_shutdown())
5772 return false;
5773
5774 if (peer_type == CEPH_ENTITY_TYPE_MON &&
5775 auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5776 // monitor, and cephx is enabled
5777 isvalid = false;
5778 if (protocol == CEPH_AUTH_CEPHX) {
5779 bufferlist::iterator iter = authorizer_data.begin();
5780 CephXServiceTicketInfo auth_ticket_info;
5781
5782 if (authorizer_data.length()) {
5783 bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
5784 auth_ticket_info, authorizer_reply);
5785 if (ret) {
5786 session_key = auth_ticket_info.session_key;
5787 isvalid = true;
5788 } else {
5789 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5790 }
5791 }
5792 } else {
5793 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
5794 }
5795 } else {
5796 // who cares.
5797 isvalid = true;
5798 }
5799 return true;
5800}