ceph/src/mgr/DaemonServer.cc (sources at 12.2.7)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "DaemonServer.h"
15 #include "mgr/Mgr.h"
16
17 #include "include/stringify.h"
18 #include "include/str_list.h"
19 #include "auth/RotatingKeyRing.h"
20 #include "json_spirit/json_spirit_writer.h"
21
22 #include "mgr/mgr_commands.h"
23 #include "mgr/OSDHealthMetricCollector.h"
24 #include "mon/MonCommand.h"
25
26 #include "messages/MMgrOpen.h"
27 #include "messages/MMgrConfigure.h"
28 #include "messages/MMonMgrReport.h"
29 #include "messages/MCommand.h"
30 #include "messages/MCommandReply.h"
31 #include "messages/MPGStats.h"
32 #include "messages/MOSDScrub.h"
33 #include "messages/MOSDForceRecovery.h"
34 #include "common/errno.h"
35
36 #define dout_context g_ceph_context
37 #define dout_subsys ceph_subsys_mgr
38 #undef dout_prefix
39 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
40
41
42
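// Construct the server-side dispatcher for daemon and client sessions. Each
// peer type (client, OSD, MDS, mon) gets its own byte/message Throttle pair,
// sized from the corresponding mgr_*_bytes / mgr_*_messages options, limiting
// how much in-flight message data each peer type may consume.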
43 DaemonServer::DaemonServer(MonClient *monc_,
44 Finisher &finisher_,
45 DaemonStateIndex &daemon_state_,
46 ClusterState &cluster_state_,
47 PyModuleRegistry &py_modules_,
48 LogChannelRef clog_,
49 LogChannelRef audit_clog_)
50 : Dispatcher(g_ceph_context),
51 client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
52 g_conf->get_val<uint64_t>("mgr_client_bytes"))),
53 client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
54 g_conf->get_val<uint64_t>("mgr_client_messages"))),
55 osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
56 g_conf->get_val<uint64_t>("mgr_osd_bytes"))),
57 osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messages",
58 g_conf->get_val<uint64_t>("mgr_osd_messages"))),
59 mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
60 g_conf->get_val<uint64_t>("mgr_mds_bytes"))),
61 mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messages",
62 g_conf->get_val<uint64_t>("mgr_mds_messages"))),
63 mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
64 g_conf->get_val<uint64_t>("mgr_mon_bytes"))),
65 mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messages",
66 g_conf->get_val<uint64_t>("mgr_mon_messages"))),
67 msgr(nullptr),
68 monc(monc_),
69 finisher(finisher_),
70 daemon_state(daemon_state_),
71 cluster_state(cluster_state_),
72 py_modules(py_modules_),
73 clog(clog_),
74 audit_clog(audit_clog_),
75 auth_cluster_registry(g_ceph_context,
76 g_conf->auth_supported.empty() ?
77 g_conf->auth_cluster_required :
78 g_conf->auth_supported),
79 auth_service_registry(g_ceph_context,
80 g_conf->auth_supported.empty() ?
81 g_conf->auth_service_required :
82 g_conf->auth_supported),
83 lock("DaemonServer"),
84 pgmap_ready(false)
85 {
86 g_conf->add_observer(this);
87 }
88
89 DaemonServer::~DaemonServer() {
90 delete msgr;
91 g_conf->remove_observer(this);
92 }
93
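// init(): create the public messenger for this mgr instance, attach the
// per-peer-type throttlers configured above, bind to public_addr and start
// dispatching. Returns a negative errno if the bind fails.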
94 int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
95 {
96 // Initialize Messenger
97 std::string public_msgr_type = g_conf->ms_public_type.empty() ?
98 g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
99 msgr = Messenger::create(g_ceph_context, public_msgr_type,
100 entity_name_t::MGR(gid),
101 "mgr",
102 getpid(), 0);
103 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
104
105 // throttle clients
106 msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
107 client_byte_throttler.get(),
108 client_msg_throttler.get());
109
110 // servers
111 msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
112 osd_byte_throttler.get(),
113 osd_msg_throttler.get());
114 msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
115 mds_byte_throttler.get(),
116 mds_msg_throttler.get());
117 msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
118 mon_byte_throttler.get(),
119 mon_msg_throttler.get());
120
121 int r = msgr->bind(g_conf->public_addr);
122 if (r < 0) {
123 derr << "unable to bind mgr to " << g_conf->public_addr << dendl;
124 return r;
125 }
126
127 msgr->set_myname(entity_name_t::MGR(gid));
128 msgr->set_addr_unknowns(client_addr);
129
130 msgr->start();
131 msgr->add_dispatcher_tail(this);
132
133 started_at = ceph_clock_now();
134
135 return 0;
136 }
137
138 entity_addr_t DaemonServer::get_myaddr() const
139 {
140 return msgr->get_myaddr();
141 }
142
143
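// Authorizer verification for incoming connections. Daemons (OSD/mon/MDS/mgr)
// are checked against the cluster auth registry, everything else against the
// service registry. On success a MgrSession is attached to the Connection,
// caps are parsed from the AuthCapsInfo, and OSD connections are additionally
// indexed by osd id in osd_cons so commands can be addressed to them later.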
144 bool DaemonServer::ms_verify_authorizer(
145 Connection *con,
146 int peer_type,
147 int protocol,
148 ceph::bufferlist& authorizer_data,
149 ceph::bufferlist& authorizer_reply,
150 bool& is_valid,
151 CryptoKey& session_key,
152 std::unique_ptr<AuthAuthorizerChallenge> *challenge)
153 {
154 AuthAuthorizeHandler *handler = nullptr;
155 if (peer_type == CEPH_ENTITY_TYPE_OSD ||
156 peer_type == CEPH_ENTITY_TYPE_MON ||
157 peer_type == CEPH_ENTITY_TYPE_MDS ||
158 peer_type == CEPH_ENTITY_TYPE_MGR) {
159 handler = auth_cluster_registry.get_handler(protocol);
160 } else {
161 handler = auth_service_registry.get_handler(protocol);
162 }
163 if (!handler) {
164 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
165 is_valid = false;
166 return true;
167 }
168
169 MgrSessionRef s(new MgrSession(cct));
170 s->inst.addr = con->get_peer_addr();
171 AuthCapsInfo caps_info;
172
173 RotatingKeyRing *keys = monc->rotating_secrets.get();
174 if (keys) {
175 is_valid = handler->verify_authorizer(
176 cct, keys,
177 authorizer_data,
178 authorizer_reply, s->entity_name,
179 s->global_id, caps_info,
180 session_key,
181 nullptr,
182 challenge);
183 } else {
184 dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
185 is_valid = false;
186 }
187
188 if (is_valid) {
189 if (caps_info.allow_all) {
190 dout(10) << " session " << s << " " << s->entity_name
191 << " allow_all" << dendl;
192 s->caps.set_allow_all();
193 }
194 if (caps_info.caps.length() > 0) {
195 bufferlist::iterator p = caps_info.caps.begin();
196 string str;
197 try {
198 ::decode(str, p);
199 }
200 catch (buffer::error& e) {
201 }
202 bool success = s->caps.parse(str);
203 if (success) {
204 dout(10) << " session " << s << " " << s->entity_name
205 << " has caps " << s->caps << " '" << str << "'" << dendl;
206 } else {
207 dout(10) << " session " << s << " " << s->entity_name
208 << " failed to parse caps '" << str << "'" << dendl;
209 is_valid = false;
210 }
211 }
212 con->set_priv(s->get());
213
214 if (peer_type == CEPH_ENTITY_TYPE_OSD) {
215 Mutex::Locker l(lock);
216 s->osd_id = atoi(s->entity_name.get_id().c_str());
217 dout(10) << "registering osd." << s->osd_id << " session "
218 << s << " con " << con << dendl;
219 osd_cons[s->osd_id].insert(con);
220 }
221 }
222
223 return true;
224 }
225
226
227 bool DaemonServer::ms_get_authorizer(int dest_type,
228 AuthAuthorizer **authorizer, bool force_new)
229 {
230 dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
231
232 if (dest_type == CEPH_ENTITY_TYPE_MON) {
233 return true;
234 }
235
236 if (force_new) {
237 if (monc->wait_auth_rotating(10) < 0)
238 return false;
239 }
240
241 *authorizer = monc->build_authorizer(dest_type);
242 dout(20) << "got authorizer " << *authorizer << dendl;
243 return *authorizer != NULL;
244 }
245
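// When an OSD connection resets, forget it: drop it from osd_cons and from
// daemon_connections so we no longer try to send it scrub/recovery messages
// or MMgrConfigure updates.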
246 bool DaemonServer::ms_handle_reset(Connection *con)
247 {
248 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
249 MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
250 if (!session) {
251 return false;
252 }
253 session->put(); // SessionRef takes a ref
254 Mutex::Locker l(lock);
255 dout(10) << "unregistering osd." << session->osd_id
256 << " session " << session << " con " << con << dendl;
257 osd_cons[session->osd_id].erase(con);
258
259 auto iter = daemon_connections.find(con);
260 if (iter != daemon_connections.end()) {
261 daemon_connections.erase(iter);
262 }
263 }
264 return false;
265 }
266
267 bool DaemonServer::ms_handle_refused(Connection *con)
268 {
269 // do nothing for now
270 return false;
271 }
272
273 bool DaemonServer::ms_dispatch(Message *m)
274 {
275 // Note that we do *not* take ::lock here, in order to avoid
276 // serializing all message handling. It's up to each handler
277 // to take whatever locks it needs.
278 switch (m->get_type()) {
279 case MSG_PGSTATS:
280 cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
281 maybe_ready(m->get_source().num());
282 m->put();
283 return true;
284 case MSG_MGR_REPORT:
285 return handle_report(static_cast<MMgrReport*>(m));
286 case MSG_MGR_OPEN:
287 return handle_open(static_cast<MMgrOpen*>(m));
288 case MSG_COMMAND:
289 return handle_command(static_cast<MCommand*>(m));
290 default:
291 dout(1) << "Unhandled message type " << m->get_type() << dendl;
292 return false;
293 };
294 }
295
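// Called for every MPGStats we ingest. Until every OSD that the OSDMap says is
// up has sent at least one report, we hold off on publishing PG state to the
// mon (pgmap_ready stays false); once the last one reports, flip the flag and
// send a report immediately rather than waiting for the next tick.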
296 void DaemonServer::maybe_ready(int32_t osd_id)
297 {
298 if (pgmap_ready.load()) {
299 // Fast path: we don't need to take lock because pgmap_ready
300 // is already set
301 } else {
302 Mutex::Locker l(lock);
303
304 if (reported_osds.find(osd_id) == reported_osds.end()) {
305 dout(4) << "initial report from osd " << osd_id << dendl;
306 reported_osds.insert(osd_id);
307 std::set<int32_t> up_osds;
308
309 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
310 osdmap.get_up_osds(up_osds);
311 });
312
313 std::set<int32_t> unreported_osds;
314 std::set_difference(up_osds.begin(), up_osds.end(),
315 reported_osds.begin(), reported_osds.end(),
316 std::inserter(unreported_osds, unreported_osds.begin()));
317
318 if (unreported_osds.size() == 0) {
319 dout(4) << "all osds have reported, sending PG state to mon" << dendl;
320 pgmap_ready = true;
321 reported_osds.clear();
322 // Avoid waiting for next tick
323 send_report();
324 } else {
325 dout(4) << "still waiting for " << unreported_osds.size() << " osds"
326 " to report in before PGMap is ready" << dendl;
327 }
328 }
329 }
330 }
331
332 void DaemonServer::shutdown()
333 {
334 dout(10) << "begin" << dendl;
335 msgr->shutdown();
336 msgr->wait();
337 dout(10) << "done" << dendl;
338 }
339
340
341
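// Handle MMgrOpen: a daemon (or a client acting as a service daemon)
// announces itself. We key its DaemonState by (service, daemon name), reply
// with an MMgrConfigure describing the stats period/threshold, and, for
// service daemons, register it in pending_service_map so it appears in the
// 'service dump' / 'service status' output handled below.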
342 bool DaemonServer::handle_open(MMgrOpen *m)
343 {
344 Mutex::Locker l(lock);
345
346 DaemonKey key;
347 if (!m->service_name.empty()) {
348 key.first = m->service_name;
349 } else {
350 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
351 }
352 key.second = m->daemon_name;
353
354 dout(4) << "from " << m->get_connection() << " " << key << dendl;
355
356 _send_configure(m->get_connection());
357
358 DaemonStatePtr daemon;
359 if (daemon_state.exists(key)) {
360 daemon = daemon_state.get(key);
361 }
362 if (daemon) {
363 dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
364 Mutex::Locker l(daemon->lock);
365 daemon->perf_counters.clear();
366 }
367
368 if (m->service_daemon) {
369 if (!daemon) {
370 dout(4) << "constructing new DaemonState for " << key << dendl;
371 daemon = std::make_shared<DaemonState>(daemon_state.types);
372 daemon->key = key;
373 if (m->daemon_metadata.count("hostname")) {
374 daemon->hostname = m->daemon_metadata["hostname"];
375 }
376 daemon_state.insert(daemon);
377 }
378 Mutex::Locker l(daemon->lock);
379 daemon->service_daemon = true;
380 daemon->metadata = m->daemon_metadata;
381 daemon->service_status = m->daemon_status;
382
383 utime_t now = ceph_clock_now();
384 auto d = pending_service_map.get_daemon(m->service_name,
385 m->daemon_name);
386 if (d->gid != (uint64_t)m->get_source().num()) {
387 dout(10) << "registering " << key << " in pending_service_map" << dendl;
388 d->gid = m->get_source().num();
389 d->addr = m->get_source_addr();
390 d->start_epoch = pending_service_map.epoch;
391 d->start_stamp = now;
392 d->metadata = m->daemon_metadata;
393 pending_service_map_dirty = pending_service_map.epoch;
394 }
395 }
396
397 if (m->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT &&
398 m->service_name.empty())
399 {
400 // Store in set of the daemon/service connections, i.e. those
401 // connections that require an update in the event of stats
402 // configuration changes.
403 daemon_connections.insert(m->get_connection());
404 }
405
406 m->put();
407 return true;
408 }
409
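// Handle MMgrReport: ingest a daemon's perf counter data. If we have no
// DaemonState (i.e. no metadata) for the sender yet, the report is rejected,
// the session is dropped, and a metadata request such as
//   {"prefix": "osd metadata", "id": <osd id>}
// is issued to the mon in the background so a later report can be accepted.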
410 bool DaemonServer::handle_report(MMgrReport *m)
411 {
412 DaemonKey key;
413 if (!m->service_name.empty()) {
414 key.first = m->service_name;
415 } else {
416 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
417 }
418 key.second = m->daemon_name;
419
420 dout(4) << "from " << m->get_connection() << " " << key << dendl;
421
422 if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
423 m->service_name.empty()) {
424 // Clients should not be sending us stats unless they are declaring
425 // themselves to be a daemon for some service.
426 dout(4) << "rejecting report from non-daemon client " << m->daemon_name
427 << dendl;
428 m->get_connection()->mark_down();
429 m->put();
430 return true;
431 }
432
433 // Look up the DaemonState
434 DaemonStatePtr daemon;
435 if (daemon_state.exists(key)) {
436 dout(20) << "updating existing DaemonState for " << key << dendl;
437 daemon = daemon_state.get(key);
438 } else {
439 // we don't know the hostname at this stage, so reject the MMgrReport here.
440 dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
441 << dendl;
442
443 // issue metadata request in background
444 if (!daemon_state.is_updating(key) &&
445 (key.first == "osd" || key.first == "mds")) {
446
447 std::ostringstream oss;
448 auto c = new MetadataUpdate(daemon_state, key);
449 if (key.first == "osd") {
450 oss << "{\"prefix\": \"osd metadata\", \"id\": "
451 << key.second << "}";
452
453 } else if (key.first == "mds") {
454 c->set_default("addr", stringify(m->get_source_addr()));
455 oss << "{\"prefix\": \"mds metadata\", \"who\": \""
456 << key.second << "\"}";
457
458 } else {
459 ceph_abort();
460 }
461
462 monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
463 }
464
465 {
466 Mutex::Locker l(lock);
467 // kill session
468 MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
469 if (!session) {
470 return false;
471 }
472 m->get_connection()->mark_down();
473 session->put();
474
475 dout(10) << "unregistering osd." << session->osd_id
476 << " session " << session << " con " << m->get_connection() << dendl;
477
478 if (osd_cons.find(session->osd_id) != osd_cons.end()) {
479 osd_cons[session->osd_id].erase(m->get_connection());
480 }
481
482 auto iter = daemon_connections.find(m->get_connection());
483 if (iter != daemon_connections.end()) {
484 daemon_connections.erase(iter);
485 }
486 }
487
488 return false;
489 }
490
491 // Update the DaemonState
492 assert(daemon != nullptr);
493 {
494 Mutex::Locker l(daemon->lock);
495 auto &daemon_counters = daemon->perf_counters;
496 daemon_counters.update(m);
497
498 if (daemon->service_daemon) {
499 utime_t now = ceph_clock_now();
500 if (m->daemon_status) {
501 daemon->service_status = *m->daemon_status;
502 daemon->service_status_stamp = now;
503 }
504 daemon->last_service_beacon = now;
505 } else if (m->daemon_status) {
506 derr << "got status from non-daemon " << key << dendl;
507 }
508 if (m->get_connection()->peer_is_osd()) {
509 // only OSD sends health_checks to me now
510 daemon->osd_health_metrics = std::move(m->osd_health_metrics);
511 }
512 }
513
514 // if there are any schema updates, notify the python modules
515 if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
516 ostringstream oss;
517 oss << key.first << '.' << key.second;
518 py_modules.notify_all("perf_schema_update", oss.str());
519 }
520
521 m->put();
522 return true;
523 }
524
525
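// Flatten the parsed cmdmap into string key/value pairs for capability
// matching: "prefix" is skipped, a "caps" vector of [service, cap, ...] pairs
// is expanded into caps_<service> entries, and everything else is stringified
// as-is.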
526 void DaemonServer::_generate_command_map(
527 map<string,cmd_vartype>& cmdmap,
528 map<string,string> &param_str_map)
529 {
530 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
531 p != cmdmap.end(); ++p) {
532 if (p->first == "prefix")
533 continue;
534 if (p->first == "caps") {
535 vector<string> cv;
536 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
537 cv.size() % 2 == 0) {
538 for (unsigned i = 0; i < cv.size(); i += 2) {
539 string k = string("caps_") + cv[i];
540 param_str_map[k] = cv[i + 1];
541 }
542 continue;
543 }
544 }
545 param_str_map[p->first] = cmd_vartype_stringify(p->second);
546 }
547 }
548
549 const MonCommand *DaemonServer::_get_mgrcommand(
550 const string &cmd_prefix,
551 const std::vector<MonCommand> &cmds)
552 {
553 const MonCommand *this_cmd = nullptr;
554 for (const auto &cmd : cmds) {
555 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
556 this_cmd = &cmd;
557 break;
558 }
559 }
560 return this_cmd;
561 }
562
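// Decide whether the session holder may run a command: mons are implicitly
// trusted, everyone else must hold mgr caps matching the command's r/w/x
// requirements for this module/prefix.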
563 bool DaemonServer::_allowed_command(
564 MgrSession *s,
565 const string &module,
566 const string &prefix,
567 const map<string,cmd_vartype>& cmdmap,
568 const map<string,string>& param_str_map,
569 const MonCommand *this_cmd) {
570
571 if (s->entity_name.is_mon()) {
572 // The mon is all-powerful: even when it is forwarding commands on behalf of
573 // old clients, we expect the mon to validate commands before proxying!
574 return true;
575 }
576
577 bool cmd_r = this_cmd->requires_perm('r');
578 bool cmd_w = this_cmd->requires_perm('w');
579 bool cmd_x = this_cmd->requires_perm('x');
580
581 bool capable = s->caps.is_capable(
582 g_ceph_context,
583 CEPH_ENTITY_TYPE_MGR,
584 s->entity_name,
585 module, prefix, param_str_map,
586 cmd_r, cmd_w, cmd_x);
587
588 dout(10) << " " << s->entity_name << " "
589 << (capable ? "" : "not ") << "capable" << dendl;
590 return capable;
591 }
592
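// Handle an MCommand from a client. The command is authorized against the
// session's caps, then dispatched to one of: the built-in handlers below
// (service map, config set, PG/OSD commands), the generic PGMap command
// processor, or a python module handler (queued on the finisher, since python
// handlers may block). An illustrative (hypothetical) payload for the scrub
// path below would be:
//   {"prefix": "pg scrub", "pgid": "1.2b", "format": "json"}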
593 bool DaemonServer::handle_command(MCommand *m)
594 {
595 Mutex::Locker l(lock);
596 int r = 0;
597 std::stringstream ss;
598 std::string prefix;
599
600 assert(lock.is_locked_by_me());
601
602 /**
603 * The working data for processing an MCommand. This lives in
604 * a class to enable passing it into other threads for processing
605 * outside of the thread/locks that called handle_command.
606 */
607 class CommandContext
608 {
609 public:
610 MCommand *m;
611 bufferlist odata;
612 cmdmap_t cmdmap;
613
614 CommandContext(MCommand *m_)
615 : m(m_)
616 {
617 }
618
619 ~CommandContext()
620 {
621 m->put();
622 }
623
624 void reply(int r, const std::stringstream &ss)
625 {
626 reply(r, ss.str());
627 }
628
629 void reply(int r, const std::string &rs)
630 {
631 // Let the connection drop as soon as we've sent our response
632 ConnectionRef con = m->get_connection();
633 if (con) {
634 con->mark_disposable();
635 }
636
637 dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl;
638 if (con) {
639 MCommandReply *reply = new MCommandReply(r, rs);
640 reply->set_tid(m->get_tid());
641 reply->set_data(odata);
642 con->send_message(reply);
643 }
644 }
645 };
646
647 /**
648 * A context for receiving a bufferlist/error string from a background
649 * function and then calling back to a CommandContext when it's done
650 */
651 class ReplyOnFinish : public Context {
652 std::shared_ptr<CommandContext> cmdctx;
653
654 public:
655 bufferlist from_mon;
656 string outs;
657
658 ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
659 : cmdctx(cmdctx_)
660 {}
661 void finish(int r) override {
662 cmdctx->odata.claim_append(from_mon);
663 cmdctx->reply(r, outs);
664 }
665 };
666
667 std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
668
669 MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
670 if (!session) {
671 return true;
672 }
673 session->put(); // SessionRef takes a ref
674 if (session->inst.name == entity_name_t())
675 session->inst.name = m->get_source();
676
677 std::string format;
678 boost::scoped_ptr<Formatter> f;
679 map<string,string> param_str_map;
680
681 if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
682 cmdctx->reply(-EINVAL, ss);
683 return true;
684 }
685
686 {
687 cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
688 f.reset(Formatter::create(format));
689 }
690
691 cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
692
693 dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
694 dout(4) << "prefix=" << prefix << dendl;
695
696 if (prefix == "get_command_descriptions") {
697 dout(10) << "reading commands from python modules" << dendl;
698 const auto py_commands = py_modules.get_commands();
699
700 int cmdnum = 0;
701 JSONFormatter f;
702 f.open_object_section("command_descriptions");
703
704 auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){
705 ostringstream secname;
706 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
707 dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring,
708 mc.module, mc.req_perms, mc.availability, 0);
709 cmdnum++;
710 };
711
712 for (const auto &pyc : py_commands) {
713 dump_cmd(pyc);
714 }
715
716 for (const auto &mgr_cmd : mgr_commands) {
717 dump_cmd(mgr_cmd);
718 }
719
720 f.close_section(); // command_descriptions
721 f.flush(cmdctx->odata);
722 cmdctx->reply(0, ss);
723 return true;
724 }
725
726 // lookup command
727 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
728 _generate_command_map(cmdctx->cmdmap, param_str_map);
729 if (!mgr_cmd) {
730 MonCommand py_command = {"", "", "py", "rw", "cli"};
731 if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
732 param_str_map, &py_command)) {
733 dout(1) << " access denied" << dendl;
734 ss << "access denied; does your client key have mgr caps?"
735 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
736 cmdctx->reply(-EACCES, ss);
737 return true;
738 }
739 } else {
740 // validate user's permissions for requested command
741 if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
742 param_str_map, mgr_cmd)) {
743 dout(1) << " access denied" << dendl;
744 audit_clog->info() << "from='" << session->inst << "' "
745 << "entity='" << session->entity_name << "' "
746 << "cmd=" << m->cmd << ": access denied";
747 ss << "access denied' does your client key have mgr caps?"
748 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
749 cmdctx->reply(-EACCES, ss);
750 return true;
751 }
752 }
753
754 audit_clog->debug()
755 << "from='" << session->inst << "' "
756 << "entity='" << session->entity_name << "' "
757 << "cmd=" << m->cmd << ": dispatch";
758
759 // ----------------
760 // service map commands
761 if (prefix == "service dump") {
762 if (!f)
763 f.reset(Formatter::create("json-pretty"));
764 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
765 f->dump_object("service_map", service_map);
766 });
767 f->flush(cmdctx->odata);
768 cmdctx->reply(0, ss);
769 return true;
770 }
771 if (prefix == "service status") {
772 if (!f)
773 f.reset(Formatter::create("json-pretty"));
774 // only include state from services that are in the persisted service map
775 f->open_object_section("service_status");
776 ServiceMap s;
777 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
778 s = service_map;
779 });
780 for (auto& p : s.services) {
781 f->open_object_section(p.first.c_str());
782 for (auto& q : p.second.daemons) {
783 f->open_object_section(q.first.c_str());
784 DaemonKey key(p.first, q.first);
785 assert(daemon_state.exists(key));
786 auto daemon = daemon_state.get(key);
787 Mutex::Locker l(daemon->lock);
788 f->dump_stream("status_stamp") << daemon->service_status_stamp;
789 f->dump_stream("last_beacon") << daemon->last_service_beacon;
790 f->open_object_section("status");
791 for (auto& r : daemon->service_status) {
792 f->dump_string(r.first.c_str(), r.second);
793 }
794 f->close_section();
795 f->close_section();
796 }
797 f->close_section();
798 }
799 f->close_section();
800 f->flush(cmdctx->odata);
801 cmdctx->reply(0, ss);
802 return true;
803 }
804
805 if (prefix == "config set") {
806 std::string key;
807 std::string val;
808 cmd_getval(cct, cmdctx->cmdmap, "key", key);
809 cmd_getval(cct, cmdctx->cmdmap, "value", val);
810 r = cct->_conf->set_val(key, val, true, &ss);
811 if (r == 0) {
812 cct->_conf->apply_changes(nullptr);
813 }
814 cmdctx->reply(r, ss);
815 return true;
816 }
817
818 // -----------
819 // PG commands
820
821 if (prefix == "pg scrub" ||
822 prefix == "pg repair" ||
823 prefix == "pg deep-scrub") {
824 string scrubop = prefix.substr(3, string::npos);
825 pg_t pgid;
826 string pgidstr;
827 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
828 if (!pgid.parse(pgidstr.c_str())) {
829 ss << "invalid pgid '" << pgidstr << "'";
830 cmdctx->reply(-EINVAL, ss);
831 return true;
832 }
833 bool pg_exists = false;
834 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
835 pg_exists = osdmap.pg_exists(pgid);
836 });
837 if (!pg_exists) {
838 ss << "pg " << pgid << " dne";
839 cmdctx->reply(-ENOENT, ss);
840 return true;
841 }
842 int acting_primary = -1;
843 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
844 acting_primary = osdmap.get_pg_acting_primary(pgid);
845 });
846 if (acting_primary == -1) {
847 ss << "pg " << pgid << " has no primary osd";
848 cmdctx->reply(-EAGAIN, ss);
849 return true;
850 }
851 auto p = osd_cons.find(acting_primary);
852 if (p == osd_cons.end()) {
853 ss << "pg " << pgid << " primary osd." << acting_primary
854 << " is not currently connected";
855 cmdctx->reply(-EAGAIN, ss);
    return true;  // avoid falling through and dereferencing osd_cons.end() below
856 }
857 vector<pg_t> pgs = { pgid };
858 for (auto& con : p->second) {
859 con->send_message(new MOSDScrub(monc->get_fsid(),
860 pgs,
861 scrubop == "repair",
862 scrubop == "deep-scrub"));
863 }
864 ss << "instructing pg " << pgid << " on osd." << acting_primary
865 << " to " << scrubop;
866 cmdctx->reply(0, ss);
867 return true;
868 } else if (prefix == "osd scrub" ||
869 prefix == "osd deep-scrub" ||
870 prefix == "osd repair") {
871 string whostr;
872 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
873 vector<string> pvec;
874 get_str_vec(prefix, pvec);
875
876 set<int> osds;
877 if (whostr == "*" || whostr == "all" || whostr == "any") {
878 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
879 for (int i = 0; i < osdmap.get_max_osd(); i++)
880 if (osdmap.is_up(i)) {
881 osds.insert(i);
882 }
883 });
884 } else {
885 long osd = parse_osd_id(whostr.c_str(), &ss);
886 if (osd < 0) {
887 ss << "invalid osd '" << whostr << "'";
888 cmdctx->reply(-EINVAL, ss);
889 return true;
890 }
891 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
892 if (osdmap.is_up(osd)) {
893 osds.insert(osd);
894 }
895 });
896 if (osds.empty()) {
897 ss << "osd." << osd << " is not up";
898 cmdctx->reply(-EAGAIN, ss);
899 return true;
900 }
901 }
902 set<int> sent_osds, failed_osds;
903 for (auto osd : osds) {
904 auto p = osd_cons.find(osd);
905 if (p == osd_cons.end()) {
906 failed_osds.insert(osd);
907 } else {
908 sent_osds.insert(osd);
909 for (auto& con : p->second) {
910 con->send_message(new MOSDScrub(monc->get_fsid(),
911 pvec.back() == "repair",
912 pvec.back() == "deep-scrub"));
913 }
914 }
915 }
916 if (failed_osds.size() == osds.size()) {
917 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
918 << " (not connected)";
919 r = -EAGAIN;
920 } else {
921 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
922 if (!failed_osds.empty()) {
923 ss << "; osd(s) " << failed_osds << " were not connected";
924 }
925 r = 0;
926 }
927 cmdctx->reply(r, ss);
928 return true;
929 } else if (prefix == "osd reweight-by-pg" ||
930 prefix == "osd reweight-by-utilization" ||
931 prefix == "osd test-reweight-by-pg" ||
932 prefix == "osd test-reweight-by-utilization") {
933 bool by_pg =
934 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
935 bool dry_run =
936 prefix == "osd test-reweight-by-pg" ||
937 prefix == "osd test-reweight-by-utilization";
938 int64_t oload;
939 cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
940 set<int64_t> pools;
941 vector<string> poolnames;
942 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
943 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
944 for (const auto& poolname : poolnames) {
945 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
946 if (pool < 0) {
947 ss << "pool '" << poolname << "' does not exist";
948 r = -ENOENT;
949 }
950 pools.insert(pool);
951 }
952 });
953 if (r) {
954 cmdctx->reply(r, ss);
955 return true;
956 }
957 double max_change = g_conf->mon_reweight_max_change;
958 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
959 if (max_change <= 0.0) {
960 ss << "max_change " << max_change << " must be positive";
961 cmdctx->reply(-EINVAL, ss);
962 return true;
963 }
964 int64_t max_osds = g_conf->mon_reweight_max_osds;
965 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
966 if (max_osds <= 0) {
967 ss << "max_osds " << max_osds << " must be positive";
968 cmdctx->reply(-EINVAL, ss);
969 return true;
970 }
971 string no_increasing;
972 cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
973 string out_str;
974 mempool::osdmap::map<int32_t, uint32_t> new_weights;
975 r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
976 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
977 return reweight::by_utilization(osdmap, pgmap,
978 oload,
979 max_change,
980 max_osds,
981 by_pg,
982 pools.empty() ? NULL : &pools,
983 no_increasing == "--no-increasing",
984 &new_weights,
985 &ss, &out_str, f.get());
986 });
987 });
988 if (r >= 0) {
989 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
990 }
991 if (f) {
992 f->flush(cmdctx->odata);
993 } else {
994 cmdctx->odata.append(out_str);
995 }
996 if (r < 0) {
997 ss << "FAILED reweight-by-pg";
998 cmdctx->reply(r, ss);
999 return true;
1000 } else if (r == 0 || dry_run) {
1001 ss << "no change";
1002 cmdctx->reply(r, ss);
1003 return true;
1004 } else {
1005 json_spirit::Object json_object;
1006 for (const auto& osd_weight : new_weights) {
1007 json_spirit::Config::add(json_object,
1008 std::to_string(osd_weight.first),
1009 std::to_string(osd_weight.second));
1010 }
1011 string s = json_spirit::write(json_object);
1012 std::replace(begin(s), end(s), '\"', '\'');
1013 const string cmd =
1014 "{"
1015 "\"prefix\": \"osd reweightn\", "
1016 "\"weights\": \"" + s + "\""
1017 "}";
1018 auto on_finish = new ReplyOnFinish(cmdctx);
1019 monc->start_mon_command({cmd}, {},
1020 &on_finish->from_mon, &on_finish->outs, on_finish);
1021 return true;
1022 }
1023 } else if (prefix == "osd df") {
1024 string method;
1025 cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
1026 r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) {
1027 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1028 print_osd_utilization(osdmap, &pgservice, ss,
1029 f.get(), method == "tree");
1030
1031 cmdctx->odata.append(ss);
1032 return 0;
1033 });
1034 });
1035 cmdctx->reply(r, "");
1036 return true;
1037 } else if (prefix == "osd safe-to-destroy") {
1038 vector<string> ids;
1039 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1040 set<int> osds;
1041 int r;
1042 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1043 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1044 });
1045 if (!r && osds.empty()) {
1046 ss << "must specify one or more OSDs";
1047 r = -EINVAL;
1048 }
1049 if (r < 0) {
1050 cmdctx->reply(r, ss);
1051 return true;
1052 }
1053 set<int> active_osds, missing_stats, stored_pgs;
1054 int affected_pgs = 0;
1055 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1056 if (pg_map.num_pg_unknown > 0) {
1057 ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
1058 << " any conclusions";
1059 r = -EAGAIN;
1060 return;
1061 }
1062 int num_active_clean = 0;
1063 for (auto& p : pg_map.num_pg_by_state) {
1064 unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
1065 if ((p.first & want) == want) {
1066 num_active_clean += p.second;
1067 }
1068 }
1069 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1070 for (auto osd : osds) {
1071 if (!osdmap.exists(osd)) {
1072 continue; // clearly safe to destroy
1073 }
1074 auto q = pg_map.num_pg_by_osd.find(osd);
1075 if (q != pg_map.num_pg_by_osd.end()) {
1076 if (q->second.acting > 0 || q->second.up > 0) {
1077 active_osds.insert(osd);
1078 affected_pgs += q->second.acting + q->second.up;
1079 continue;
1080 }
1081 }
1082 if (num_active_clean < pg_map.num_pg) {
1083 // all pgs aren't active+clean; we need to be careful.
1084 auto p = pg_map.osd_stat.find(osd);
1085 if (p == pg_map.osd_stat.end()) {
1086 missing_stats.insert(osd);
1087 }
1088 if (p->second.num_pgs > 0) {
1089 stored_pgs.insert(osd);
1090 }
1091 }
1092 }
1093 });
1094 });
1095 if (!r && !active_osds.empty()) {
1096 ss << "OSD(s) " << active_osds << " have " << affected_pgs
1097 << " pgs currently mapped to them";
1098 r = -EBUSY;
1099 } else if (!missing_stats.empty()) {
1100 ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
1101 << " PGs are active+clean; we cannot draw any conclusions";
1102 r = -EAGAIN;
1103 } else if (!stored_pgs.empty()) {
1104 ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
1105 << " data, and not all PGs are active+clean; we cannot be sure they"
1106 << " aren't still needed.";
1107 r = -EBUSY;
1108 }
1109 if (r) {
1110 cmdctx->reply(r, ss);
1111 return true;
1112 }
1113 ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1114 << " durability.";
1115 cmdctx->reply(0, ss);
1116 return true;
1117 } else if (prefix == "osd ok-to-stop") {
1118 vector<string> ids;
1119 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1120 set<int> osds;
1121 int r;
1122 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1123 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1124 });
1125 if (!r && osds.empty()) {
1126 ss << "must specify one or more OSDs";
1127 r = -EINVAL;
1128 }
1129 if (r < 0) {
1130 cmdctx->reply(r, ss);
1131 return true;
1132 }
1133 map<pg_t,int> pg_delta; // pgid -> net acting set size change
1134 int dangerous_pgs = 0;
1135 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1136 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1137 if (pg_map.num_pg_unknown > 0) {
1138 ss << pg_map.num_pg_unknown << " pgs have unknown state; "
1139 << "cannot draw any conclusions";
1140 r = -EAGAIN;
1141 return;
1142 }
1143 for (auto osd : osds) {
1144 auto p = pg_map.pg_by_osd.find(osd);
1145 if (p != pg_map.pg_by_osd.end()) {
1146 for (auto& pgid : p->second) {
1147 --pg_delta[pgid];
1148 }
1149 }
1150 }
1151 for (auto& p : pg_delta) {
1152 auto q = pg_map.pg_stat.find(p.first);
1153 if (q == pg_map.pg_stat.end()) {
1154 ss << "missing information about " << p.first << "; cannot draw"
1155 << " any conclusions";
1156 r = -EAGAIN;
1157 return;
1158 }
1159 if (!(q->second.state & PG_STATE_ACTIVE) ||
1160 (q->second.state & PG_STATE_DEGRADED)) {
1161 // we don't currently have a good way to tell *how* degraded
1162 // a degraded PG is, so we have to assume we cannot remove
1163 // any more replicas/shards.
1164 ++dangerous_pgs;
1165 continue;
1166 }
1167 const pg_pool_t *pi = osdmap.get_pg_pool(p.first.pool());
1168 if (!pi) {
1169 ++dangerous_pgs; // pool is creating or deleting
1170 } else {
1171 if (q->second.acting.size() + p.second < pi->min_size) {
1172 ++dangerous_pgs;
1173 }
1174 }
1175 }
1176 });
1177 });
1178 if (r) {
1179 cmdctx->reply(r, ss);
1180 return true;
1181 }
1182 if (dangerous_pgs) {
1183 ss << dangerous_pgs << " PGs are already degraded or might become "
1184 << "unavailable";
1185 cmdctx->reply(-EBUSY, ss);
1186 return true;
1187 }
1188 ss << "OSD(s) " << osds << " are ok to stop without reducing"
1189 << " availability, provided there are no other concurrent failures"
1190 << " or interventions. " << pg_delta.size() << " PGs are likely to be"
1191 << " degraded (but remain available) as a result.";
1192 cmdctx->reply(0, ss);
1193 return true;
1194 } else if (prefix == "pg force-recovery" ||
1195 prefix == "pg force-backfill" ||
1196 prefix == "pg cancel-force-recovery" ||
1197 prefix == "pg cancel-force-backfill") {
1198 string forceop = prefix.substr(3, string::npos);
1199 list<pg_t> parsed_pgs;
1200 map<int, list<pg_t> > osdpgs;
1201
1202 // figure out actual op just once
1203 int actual_op = 0;
1204 if (forceop == "force-recovery") {
1205 actual_op = OFR_RECOVERY;
1206 } else if (forceop == "force-backfill") {
1207 actual_op = OFR_BACKFILL;
1208 } else if (forceop == "cancel-force-backfill") {
1209 actual_op = OFR_BACKFILL | OFR_CANCEL;
1210 } else if (forceop == "cancel-force-recovery") {
1211 actual_op = OFR_RECOVERY | OFR_CANCEL;
1212 }
1213
1214 // convert pg names to pgs, discard any invalid ones while at it
1215 {
1216 // we don't want to keep pgidstr and pgidstr_nodup forever
1217 vector<string> pgidstr;
1218 // get pgids to process and prune duplicates
1219 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
1220 set<string> pgidstr_nodup(pgidstr.begin(), pgidstr.end());
1221 if (pgidstr.size() != pgidstr_nodup.size()) {
1222 // move elements only when there were duplicates, as this
1223 // reorders them
1224 pgidstr.resize(pgidstr_nodup.size());
1225 auto it = pgidstr_nodup.begin();
1226 for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) {
1227 pgidstr[i] = std::move(*it++);
1228 }
1229 }
1230
1231 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1232 for (auto& pstr : pgidstr) {
1233 pg_t parsed_pg;
1234 if (!parsed_pg.parse(pstr.c_str())) {
1235 ss << "invalid pgid '" << pstr << "'; ";
1236 r = -EINVAL;
1237 } else {
1238 auto workit = pg_map.pg_stat.find(parsed_pg);
1239 if (workit == pg_map.pg_stat.end()) {
1240 ss << "pg " << pstr << " does not exist; ";
1241 r = -ENOENT;
1242 } else {
1243 pg_stat_t workpg = workit->second;
1244
1245 // discard pgs for which user requests are pointless
1246 switch (actual_op)
1247 {
1248 case OFR_RECOVERY:
1249 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) {
1250 // don't return error, user script may be racing with cluster. not fatal.
1251 ss << "pg " << pstr << " doesn't require recovery; ";
1252 continue;
1253 } else if (workpg.state & PG_STATE_FORCED_RECOVERY) {
1254 ss << "pg " << pstr << " recovery already forced; ";
1255 // return error, as it may be a bug in user script
1256 r = -EINVAL;
1257 continue;
1258 }
1259 break;
1260 case OFR_BACKFILL:
1261 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING)) == 0) {
1262 ss << "pg " << pstr << " doesn't require backfilling; ";
1263 continue;
1264 } else if (workpg.state & PG_STATE_FORCED_BACKFILL) {
1265 ss << "pg " << pstr << " backfill already forced; ";
1266 r = -EINVAL;
1267 continue;
1268 }
1269 break;
1270 case OFR_BACKFILL | OFR_CANCEL:
1271 if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) {
1272 ss << "pg " << pstr << " backfill not forced; ";
1273 continue;
1274 }
1275 break;
1276 case OFR_RECOVERY | OFR_CANCEL:
1277 if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) {
1278 ss << "pg " << pstr << " recovery not forced; ";
1279 continue;
1280 }
1281 break;
1282 default:
1283 assert(0 == "actual_op value is not supported");
1284 }
1285
1286 parsed_pgs.push_back(std::move(parsed_pg));
1287 }
1288 }
1289 }
1290
1291 // group pgs to process by osd
1292 for (auto& pgid : parsed_pgs) {
1293 auto workit = pg_map.pg_stat.find(pgid);
1294 if (workit != pg_map.pg_stat.end()) {
1295 pg_stat_t workpg = workit->second;
1296 set<int32_t> osds(workpg.up.begin(), workpg.up.end());
1297 osds.insert(workpg.acting.begin(), workpg.acting.end());
1298 for (auto i : osds) {
1299 osdpgs[i].push_back(pgid);
1300 }
1301 }
1302 }
1303
1304 });
1305 }
1306
1307 // respond with error only when no pgs are correct
1308 // yes, in case of mixed errors, only the last one will be emitted,
1309 // but the message presented will be fine
1310 if (parsed_pgs.size() != 0) {
1311 // clear error to not confuse users/scripts
1312 r = 0;
1313 }
1314
1315 // optimize the command -> messages conversion, use only one message per distinct OSD
1316 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1317 for (auto& i : osdpgs) {
1318 if (osdmap.is_up(i.first)) {
1319 vector<pg_t> pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end()));
1320 auto p = osd_cons.find(i.first);
1321 if (p == osd_cons.end()) {
1322 ss << "osd." << i.first << " is not currently connected";
1323 r = -EAGAIN;
1324 continue;
1325 }
1326 for (auto& con : p->second) {
1327 con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op));
1328 }
1329 ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; ";
1330 }
1331 }
1332 });
1333 ss << std::endl;
1334 cmdctx->reply(r, ss);
1335 return true;
1336 } else {
1337 r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
1338 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1339 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
1340 f.get(), &ss, &cmdctx->odata);
1341 });
1342 });
1343
1344 if (r != -EOPNOTSUPP) {
1345 cmdctx->reply(r, ss);
1346 return true;
1347 }
1348 }
1349
1350 // None of the special native commands,
1351 ActivePyModule *handler = nullptr;
1352 auto py_commands = py_modules.get_py_commands();
1353 for (const auto &pyc : py_commands) {
1354 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
1355 dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
1356 if (pyc_prefix == prefix) {
1357 handler = pyc.handler;
1358 break;
1359 }
1360 }
1361
1362 if (handler == nullptr) {
1363 ss << "No handler found for '" << prefix << "'";
1364 dout(4) << "No handler found for '" << prefix << "'" << dendl;
1365 cmdctx->reply(-EINVAL, ss);
1366 return true;
1367 } else {
1368 // Okay, now we have a handler to call, but we must not call it
1369 // in this thread, because the python handlers can do anything,
1370 // including blocking, and including calling back into mgr.
1371 dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
1372 finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
1373 std::stringstream ds;
1374 std::stringstream ss;
1375 int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
1376 cmdctx->odata.append(ds);
1377 cmdctx->reply(r, ss);
1378 }));
1379 return true;
1380 }
1381 }
1382
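// Drop service-map entries whose daemons have not sent a beacon within
// mgr_service_beacon_grace seconds. A daemon with no recorded beacon at all is
// assumed to have just survived a mgr restart and is given a fresh stamp
// rather than being pruned immediately.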
1383 void DaemonServer::_prune_pending_service_map()
1384 {
1385 utime_t cutoff = ceph_clock_now();
1386 cutoff -= g_conf->get_val<double>("mgr_service_beacon_grace");
1387 auto p = pending_service_map.services.begin();
1388 while (p != pending_service_map.services.end()) {
1389 auto q = p->second.daemons.begin();
1390 while (q != p->second.daemons.end()) {
1391 DaemonKey key(p->first, q->first);
1392 if (!daemon_state.exists(key)) {
1393 derr << "missing key " << key << dendl;
1394 ++q;
1395 continue;
1396 }
1397 auto daemon = daemon_state.get(key);
1398 Mutex::Locker l(daemon->lock);
1399 if (daemon->last_service_beacon == utime_t()) {
1400 // we must have just restarted; assume they are alive now.
1401 daemon->last_service_beacon = ceph_clock_now();
1402 ++q;
1403 continue;
1404 }
1405 if (daemon->last_service_beacon < cutoff) {
1406 dout(10) << "pruning stale " << p->first << "." << q->first
1407 << " last_beacon " << daemon->last_service_beacon << dendl;
1408 q = p->second.daemons.erase(q);
1409 pending_service_map_dirty = pending_service_map.epoch;
1410 } else {
1411 ++q;
1412 }
1413 }
1414 if (p->second.daemons.empty()) {
1415 p = pending_service_map.services.erase(p);
1416 pending_service_map_dirty = pending_service_map.epoch;
1417 } else {
1418 ++p;
1419 }
1420 }
1421 }
1422
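// Build and send an MMonMgrReport to the mon: health checks gathered from the
// python modules, the PGMap digest, the pending ServiceMap (if dirty), and OSD
// health metrics accumulated per metric type. Until pgmap_ready is set we skip
// reporting, unless more than 4x mgr_stats_period has elapsed since startup,
// in which case we give up waiting on silent OSDs and send what we have.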
1423 void DaemonServer::send_report()
1424 {
1425 if (!pgmap_ready) {
1426 if (ceph_clock_now() - started_at > g_conf->get_val<int64_t>("mgr_stats_period") * 4.0) {
1427 pgmap_ready = true;
1428 reported_osds.clear();
1429 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
1430 << "potentially incomplete PG state to mon" << dendl;
1431 } else {
1432 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
1433 << dendl;
1434 return;
1435 }
1436 }
1437
1438 auto m = new MMonMgrReport();
1439 py_modules.get_health_checks(&m->health_checks);
1440
1441 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1442 cluster_state.update_delta_stats();
1443
1444 if (pending_service_map.epoch) {
1445 _prune_pending_service_map();
1446 if (pending_service_map_dirty >= pending_service_map.epoch) {
1447 pending_service_map.modified = ceph_clock_now();
1448 ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
1449 dout(10) << "sending service_map e" << pending_service_map.epoch
1450 << dendl;
1451 pending_service_map.epoch++;
1452 }
1453 }
1454
1455 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1456 // FIXME: no easy way to get mon features here. this will do for
1457 // now, though, as long as we don't make a backward-incompat change.
1458 pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
1459 dout(10) << pg_map << dendl;
1460
1461 pg_map.get_health_checks(g_ceph_context, osdmap,
1462 &m->health_checks);
1463
1464 dout(10) << m->health_checks.checks.size() << " health checks"
1465 << dendl;
1466 dout(20) << "health checks:\n";
1467 JSONFormatter jf(true);
1468 jf.dump_object("health_checks", m->health_checks);
1469 jf.flush(*_dout);
1470 *_dout << dendl;
1471 });
1472 });
1473
1474 auto osds = daemon_state.get_by_service("osd");
1475 map<osd_metric, unique_ptr<OSDHealthMetricCollector>> accumulated;
1476 for (const auto& osd : osds) {
1477 Mutex::Locker l(osd.second->lock);
1478 for (const auto& metric : osd.second->osd_health_metrics) {
1479 auto acc = accumulated.find(metric.get_type());
1480 if (acc == accumulated.end()) {
1481 auto collector = OSDHealthMetricCollector::create(metric.get_type());
1482 if (!collector) {
1483 derr << __func__ << " " << osd.first << "." << osd.second
1484 << " sent me an unknown health metric: "
1485 << static_cast<uint8_t>(metric.get_type()) << dendl;
1486 continue;
1487 }
1488 tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
1489 std::move(collector));
1490 }
1491 acc->second->update(osd.first, metric);
1492 }
1493 }
1494 for (const auto& acc : accumulated) {
1495 acc.second->summarize(m->health_checks);
1496 }
1497 // TODO? We currently do not notify the PyModules
1498 // TODO: respect needs_send, so we send the report only if we are asked to do
1499 // so, or the state is updated.
1500 monc->send_mon_message(m);
1501 }
1502
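// Called when a new ServiceMap is published by the mon. Seed (or bump the
// epoch of) pending_service_map, create DaemonState entries for any daemons
// the map knows about but we do not, and cull state for daemons that have
// disappeared from the map.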
1503 void DaemonServer::got_service_map()
1504 {
1505 Mutex::Locker l(lock);
1506
1507 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
1508 if (pending_service_map.epoch == 0) {
1509 // we just started up
1510 dout(10) << "got initial map e" << service_map.epoch << dendl;
1511 pending_service_map = service_map;
1512 } else {
1513 // we were already active and therefore must have persisted it,
1514 // which means ours is the same or newer.
1515 dout(10) << "got updated map e" << service_map.epoch << dendl;
1516 }
1517 pending_service_map.epoch = service_map.epoch + 1;
1518 });
1519
1520 // cull missing daemons, populate new ones
1521 for (auto& p : pending_service_map.services) {
1522 std::set<std::string> names;
1523 for (auto& q : p.second.daemons) {
1524 names.insert(q.first);
1525 DaemonKey key(p.first, q.first);
1526 if (!daemon_state.exists(key)) {
1527 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
1528 daemon->key = key;
1529 daemon->metadata = q.second.metadata;
1530 if (q.second.metadata.count("hostname")) {
1531 daemon->hostname = q.second.metadata["hostname"];
1532 }
1533 daemon->service_daemon = true;
1534 daemon_state.insert(daemon);
1535 dout(10) << "added missing " << key << dendl;
1536 }
1537 }
1538 daemon_state.cull(p.first, names);
1539 }
1540 }
1541
1542
1543 const char** DaemonServer::get_tracked_conf_keys() const
1544 {
1545 static const char *KEYS[] = {
1546 "mgr_stats_threshold",
1547 "mgr_stats_period",
1548 nullptr
1549 };
1550
1551 return KEYS;
1552 }
1553
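// Observer callback for the keys above: when the stats period or threshold
// changes, push a fresh MMgrConfigure to every registered daemon connection.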
1554 void DaemonServer::handle_conf_change(const struct md_config_t *conf,
1555 const std::set <std::string> &changed)
1556 {
1557 dout(4) << "ohai" << dendl;
1558 // We may be called within lock (via MCommand `config set`) or outwith the
1559 // lock (via admin socket `config set`), so handle either case.
1560 const bool initially_locked = lock.is_locked_by_me();
1561 if (!initially_locked) {
1562 lock.Lock();
1563 }
1564
1565 if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
1566 dout(4) << "Updating stats threshold/period on "
1567 << daemon_connections.size() << " clients" << dendl;
1568 // Send a fresh MMgrConfigure to all clients, so that they can follow
1569 // the new policy for transmitting stats
1570 for (auto &c : daemon_connections) {
1571 _send_configure(c);
1572 }
1573 }
    if (!initially_locked) {
      lock.Unlock();  // release the lock only if we took it above
    }
1574 }
1575
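// Tell a connected daemon how often (stats_period) and how aggressively
// (stats_threshold) to send us perf counter reports; sent on session open and
// whenever the relevant config options change.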
1576 void DaemonServer::_send_configure(ConnectionRef c)
1577 {
1578 assert(lock.is_locked_by_me());
1579
1580 auto configure = new MMgrConfigure();
1581 configure->stats_period = g_conf->get_val<int64_t>("mgr_stats_period");
1582 configure->stats_threshold = g_conf->get_val<int64_t>("mgr_stats_threshold");
1583 c->send_message(configure);
1584 }
1585