1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "DaemonServer.h"
16 #include "include/str_list.h"
17 #include "auth/RotatingKeyRing.h"
18 #include "json_spirit/json_spirit_writer.h"
20 #include "messages/MMgrOpen.h"
21 #include "messages/MMgrConfigure.h"
22 #include "messages/MMonMgrReport.h"
23 #include "messages/MCommand.h"
24 #include "messages/MCommandReply.h"
25 #include "messages/MPGStats.h"
26 #include "messages/MOSDScrub.h"
28 #define dout_context g_ceph_context
29 #define dout_subsys ceph_subsys_mgr
31 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
33 DaemonServer::DaemonServer(MonClient
*monc_
,
35 DaemonStateIndex
&daemon_state_
,
36 ClusterState
&cluster_state_
,
37 PyModules
&py_modules_
,
39 LogChannelRef audit_clog_
)
40 : Dispatcher(g_ceph_context
),
41 client_byte_throttler(new Throttle(g_ceph_context
, "mgr_client_bytes",
42 g_conf
->mgr_client_bytes
)),
43 client_msg_throttler(new Throttle(g_ceph_context
, "mgr_client_messages",
44 g_conf
->mgr_client_messages
)),
45 osd_byte_throttler(new Throttle(g_ceph_context
, "mgr_osd_bytes",
46 g_conf
->mgr_osd_bytes
)),
47 osd_msg_throttler(new Throttle(g_ceph_context
, "mgr_osd_messsages",
48 g_conf
->mgr_osd_messages
)),
49 mds_byte_throttler(new Throttle(g_ceph_context
, "mgr_mds_bytes",
50 g_conf
->mgr_mds_bytes
)),
51 mds_msg_throttler(new Throttle(g_ceph_context
, "mgr_mds_messsages",
52 g_conf
->mgr_mds_messages
)),
53 mon_byte_throttler(new Throttle(g_ceph_context
, "mgr_mon_bytes",
54 g_conf
->mgr_mon_bytes
)),
55 mon_msg_throttler(new Throttle(g_ceph_context
, "mgr_mon_messsages",
56 g_conf
->mgr_mon_messages
)),
60 daemon_state(daemon_state_
),
61 cluster_state(cluster_state_
),
62 py_modules(py_modules_
),
64 audit_clog(audit_clog_
),
65 auth_registry(g_ceph_context
,
66 g_conf
->auth_supported
.empty() ?
67 g_conf
->auth_cluster_required
:
68 g_conf
->auth_supported
),
72 DaemonServer::~DaemonServer() {
76 int DaemonServer::init(uint64_t gid
, entity_addr_t client_addr
)
78 // Initialize Messenger
79 std::string public_msgr_type
= g_conf
->ms_public_type
.empty() ?
80 g_conf
->get_val
<std::string
>("ms_type") : g_conf
->ms_public_type
;
81 msgr
= Messenger::create(g_ceph_context
, public_msgr_type
,
82 entity_name_t::MGR(gid
),
85 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
88 msgr
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
89 client_byte_throttler
.get(),
90 client_msg_throttler
.get());
93 msgr
->set_policy_throttlers(entity_name_t::TYPE_OSD
,
94 osd_byte_throttler
.get(),
95 osd_msg_throttler
.get());
96 msgr
->set_policy_throttlers(entity_name_t::TYPE_MDS
,
97 mds_byte_throttler
.get(),
98 mds_msg_throttler
.get());
99 msgr
->set_policy_throttlers(entity_name_t::TYPE_MON
,
100 mon_byte_throttler
.get(),
101 mon_msg_throttler
.get());
103 int r
= msgr
->bind(g_conf
->public_addr
);
105 derr
<< "unable to bind mgr to " << g_conf
->public_addr
<< dendl
;
109 msgr
->set_myname(entity_name_t::MGR(gid
));
110 msgr
->set_addr_unknowns(client_addr
);
113 msgr
->add_dispatcher_tail(this);
118 entity_addr_t
DaemonServer::get_myaddr() const
120 return msgr
->get_myaddr();
124 bool DaemonServer::ms_verify_authorizer(Connection
*con
,
127 ceph::bufferlist
& authorizer_data
,
128 ceph::bufferlist
& authorizer_reply
,
130 CryptoKey
& session_key
)
132 auto handler
= auth_registry
.get_handler(protocol
);
134 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol
<< dendl
;
139 MgrSessionRef
s(new MgrSession(cct
));
140 s
->inst
.addr
= con
->get_peer_addr();
141 AuthCapsInfo caps_info
;
143 is_valid
= handler
->verify_authorizer(
144 cct
, monc
->rotating_secrets
.get(),
146 authorizer_reply
, s
->entity_name
,
147 s
->global_id
, caps_info
,
151 if (caps_info
.allow_all
) {
152 dout(10) << " session " << s
<< " " << s
->entity_name
153 << " allow_all" << dendl
;
154 s
->caps
.set_allow_all();
156 if (caps_info
.caps
.length() > 0) {
157 bufferlist::iterator p
= caps_info
.caps
.begin();
162 catch (buffer::error
& e
) {
164 bool success
= s
->caps
.parse(str
);
166 dout(10) << " session " << s
<< " " << s
->entity_name
167 << " has caps " << s
->caps
<< " '" << str
<< "'" << dendl
;
169 dout(10) << " session " << s
<< " " << s
->entity_name
170 << " failed to parse caps '" << str
<< "'" << dendl
;
174 con
->set_priv(s
->get());
176 if (peer_type
== CEPH_ENTITY_TYPE_OSD
) {
177 Mutex::Locker
l(lock
);
178 s
->osd_id
= atoi(s
->entity_name
.get_id().c_str());
179 dout(10) << __func__
<< " registering osd." << s
->osd_id
<< " session "
180 << s
<< " con " << con
<< dendl
;
181 osd_cons
[s
->osd_id
].insert(con
);
189 bool DaemonServer::ms_get_authorizer(int dest_type
,
190 AuthAuthorizer
**authorizer
, bool force_new
)
192 dout(10) << "type=" << ceph_entity_type_name(dest_type
) << dendl
;
194 if (dest_type
== CEPH_ENTITY_TYPE_MON
) {
199 if (monc
->wait_auth_rotating(10) < 0)
203 *authorizer
= monc
->build_authorizer(dest_type
);
204 dout(20) << "got authorizer " << *authorizer
<< dendl
;
205 return *authorizer
!= NULL
;
208 bool DaemonServer::ms_handle_reset(Connection
*con
)
210 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
211 MgrSessionRef
session(static_cast<MgrSession
*>(con
->get_priv()));
215 session
->put(); // SessionRef takes a ref
216 Mutex::Locker
l(lock
);
217 dout(10) << __func__
<< " unregistering osd." << session
->osd_id
218 << " session " << session
<< " con " << con
<< dendl
;
219 osd_cons
[session
->osd_id
].erase(con
);
224 bool DaemonServer::ms_handle_refused(Connection
*con
)
226 // do nothing for now
230 bool DaemonServer::ms_dispatch(Message
*m
)
232 Mutex::Locker
l(lock
);
234 switch (m
->get_type()) {
236 cluster_state
.ingest_pgstats(static_cast<MPGStats
*>(m
));
240 return handle_report(static_cast<MMgrReport
*>(m
));
242 return handle_open(static_cast<MMgrOpen
*>(m
));
244 return handle_command(static_cast<MCommand
*>(m
));
246 dout(1) << "Unhandled message type " << m
->get_type() << dendl
;
251 void DaemonServer::shutdown()
253 dout(10) << __func__
<< dendl
;
256 dout(10) << __func__
<< " done" << dendl
;
261 bool DaemonServer::handle_open(MMgrOpen
*m
)
263 uint32_t type
= m
->get_connection()->get_peer_type();
264 DaemonKey
key(type
, m
->daemon_name
);
266 dout(4) << "from " << m
->get_connection() << " name "
267 << ceph_entity_type_name(type
) << "." << m
->daemon_name
<< dendl
;
269 auto configure
= new MMgrConfigure();
270 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
) {
271 // We don't want clients to send us stats
272 configure
->stats_period
= 0;
274 configure
->stats_period
= g_conf
->mgr_stats_period
;
276 m
->get_connection()->send_message(configure
);
278 if (daemon_state
.exists(key
)) {
279 dout(20) << "updating existing DaemonState for " << m
->daemon_name
<< dendl
;
280 daemon_state
.get(key
)->perf_counters
.clear();
287 bool DaemonServer::handle_report(MMgrReport
*m
)
289 uint32_t type
= m
->get_connection()->get_peer_type();
290 DaemonKey
key(type
, m
->daemon_name
);
292 dout(4) << "from " << m
->get_connection() << " name "
293 << ceph_entity_type_name(type
) << "." << m
->daemon_name
<< dendl
;
295 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
) {
296 // Clients should not be sending us stats
297 dout(4) << "rejecting report from client " << m
->daemon_name
<< dendl
;
302 DaemonStatePtr daemon
;
303 if (daemon_state
.exists(key
)) {
304 dout(20) << "updating existing DaemonState for " << m
->daemon_name
<< dendl
;
305 daemon
= daemon_state
.get(key
);
307 dout(4) << "constructing new DaemonState for " << m
->daemon_name
<< dendl
;
308 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
309 // FIXME: crap, we don't know the hostname at this stage.
311 daemon_state
.insert(daemon
);
312 // FIXME: we should request metadata at this stage
315 assert(daemon
!= nullptr);
316 auto &daemon_counters
= daemon
->perf_counters
;
317 daemon_counters
.update(m
);
330 bool requires_perm(char p
) const {
331 return (perm
.find(p
) != string::npos
);
336 #define COMMAND(parsesig, helptext, module, perm, availability) \
337 {parsesig, helptext, module, perm, availability},
338 #include "MgrCommands.h"
342 void DaemonServer::_generate_command_map(
343 map
<string
,cmd_vartype
>& cmdmap
,
344 map
<string
,string
> ¶m_str_map
)
346 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
347 p
!= cmdmap
.end(); ++p
) {
348 if (p
->first
== "prefix")
350 if (p
->first
== "caps") {
352 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
353 cv
.size() % 2 == 0) {
354 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
355 string k
= string("caps_") + cv
[i
];
356 param_str_map
[k
] = cv
[i
+ 1];
361 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
365 const MgrCommand
*DaemonServer::_get_mgrcommand(
366 const string
&cmd_prefix
,
370 MgrCommand
*this_cmd
= NULL
;
371 for (MgrCommand
*cp
= cmds
;
372 cp
< &cmds
[cmds_size
]; cp
++) {
373 if (cp
->cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
381 bool DaemonServer::_allowed_command(
383 const string
&module
,
384 const string
&prefix
,
385 const map
<string
,cmd_vartype
>& cmdmap
,
386 const map
<string
,string
>& param_str_map
,
387 const MgrCommand
*this_cmd
) {
389 if (s
->entity_name
.is_mon()) {
390 // mon is all-powerful. even when it is forwarding commands on behalf of
391 // old clients; we expect the mon is validating commands before proxying!
395 bool cmd_r
= this_cmd
->requires_perm('r');
396 bool cmd_w
= this_cmd
->requires_perm('w');
397 bool cmd_x
= this_cmd
->requires_perm('x');
399 bool capable
= s
->caps
.is_capable(
401 CEPH_ENTITY_TYPE_MGR
,
403 module
, prefix
, param_str_map
,
404 cmd_r
, cmd_w
, cmd_x
);
406 dout(10) << " " << s
->entity_name
<< " "
407 << (capable
? "" : "not ") << "capable" << dendl
;
411 bool DaemonServer::handle_command(MCommand
*m
)
414 std::stringstream ss
;
417 assert(lock
.is_locked_by_me());
420 * The working data for processing an MCommand. This lives in
421 * a class to enable passing it into other threads for processing
422 * outside of the thread/locks that called handle_command.
431 CommandContext(MCommand
*m_
)
441 void reply(int r
, const std::stringstream
&ss
)
446 void reply(int r
, const std::string
&rs
)
448 // Let the connection drop as soon as we've sent our response
449 ConnectionRef con
= m
->get_connection();
451 con
->mark_disposable();
454 dout(1) << "do_command r=" << r
<< " " << rs
<< dendl
;
456 MCommandReply
*reply
= new MCommandReply(r
, rs
);
457 reply
->set_tid(m
->get_tid());
458 reply
->set_data(odata
);
459 con
->send_message(reply
);
465 * A context for receiving a bufferlist/error string from a background
466 * function and then calling back to a CommandContext when it's done
468 class ReplyOnFinish
: public Context
{
469 std::shared_ptr
<CommandContext
> cmdctx
;
475 ReplyOnFinish(std::shared_ptr
<CommandContext
> cmdctx_
)
478 void finish(int r
) override
{
479 cmdctx
->odata
.claim_append(from_mon
);
480 cmdctx
->reply(r
, outs
);
484 std::shared_ptr
<CommandContext
> cmdctx
= std::make_shared
<CommandContext
>(m
);
486 MgrSessionRef
session(static_cast<MgrSession
*>(m
->get_connection()->get_priv()));
490 session
->put(); // SessionRef takes a ref
491 if (session
->inst
.name
== entity_name_t())
492 session
->inst
.name
= m
->get_source();
495 boost::scoped_ptr
<Formatter
> f
;
496 map
<string
,string
> param_str_map
;
498 if (!cmdmap_from_json(m
->cmd
, &(cmdctx
->cmdmap
), ss
)) {
499 cmdctx
->reply(-EINVAL
, ss
);
504 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "format", format
, string("plain"));
505 f
.reset(Formatter::create(format
));
508 cmd_getval(cct
, cmdctx
->cmdmap
, "prefix", prefix
);
510 dout(4) << "decoded " << cmdctx
->cmdmap
.size() << dendl
;
511 dout(4) << "prefix=" << prefix
<< dendl
;
513 if (prefix
== "get_command_descriptions") {
516 dout(10) << "reading commands from python modules" << dendl
;
517 auto py_commands
= py_modules
.get_commands();
520 f
.open_object_section("command_descriptions");
521 for (const auto &pyc
: py_commands
) {
522 ostringstream secname
;
523 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
524 dout(20) << "Dumping " << pyc
.cmdstring
<< " (" << pyc
.helpstring
526 dump_cmddesc_to_json(&f
, secname
.str(), pyc
.cmdstring
, pyc
.helpstring
,
527 "mgr", pyc
.perm
, "cli", 0);
531 for (const auto &cp
: mgr_commands
) {
532 ostringstream secname
;
533 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
534 dump_cmddesc_to_json(&f
, secname
.str(), cp
.cmdstring
, cp
.helpstring
,
535 cp
.module
, cp
.perm
, cp
.availability
, 0);
538 f
.close_section(); // command_descriptions
539 f
.flush(cmdctx
->odata
);
540 cmdctx
->reply(0, ss
);
545 const MgrCommand
*mgr_cmd
= _get_mgrcommand(prefix
, mgr_commands
,
546 ARRAY_SIZE(mgr_commands
));
547 _generate_command_map(cmdctx
->cmdmap
, param_str_map
);
549 MgrCommand py_command
= {"", "", "py", "rw", "cli"};
550 if (!_allowed_command(session
.get(), py_command
.module
, prefix
, cmdctx
->cmdmap
,
551 param_str_map
, &py_command
)) {
552 dout(1) << " access denied" << dendl
;
553 ss
<< "access denied; does your client key have mgr caps?"
554 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
555 cmdctx
->reply(-EACCES
, ss
);
559 // validate user's permissions for requested command
560 if (!_allowed_command(session
.get(), mgr_cmd
->module
, prefix
, cmdctx
->cmdmap
,
561 param_str_map
, mgr_cmd
)) {
562 dout(1) << " access denied" << dendl
;
563 audit_clog
->info() << "from='" << session
->inst
<< "' "
564 << "entity='" << session
->entity_name
<< "' "
565 << "cmd=" << m
->cmd
<< ": access denied";
566 ss
<< "access denied' does your client key have mgr caps?"
567 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
568 cmdctx
->reply(-EACCES
, ss
);
574 << "from='" << session
->inst
<< "' "
575 << "entity='" << session
->entity_name
<< "' "
576 << "cmd=" << m
->cmd
<< ": dispatch";
581 if (prefix
== "pg scrub" ||
582 prefix
== "pg repair" ||
583 prefix
== "pg deep-scrub") {
584 string scrubop
= prefix
.substr(3, string::npos
);
587 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pgid", pgidstr
);
588 if (!pgid
.parse(pgidstr
.c_str())) {
589 ss
<< "invalid pgid '" << pgidstr
<< "'";
590 cmdctx
->reply(-EINVAL
, ss
);
593 bool pg_exists
= false;
594 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
595 pg_exists
= osdmap
.pg_exists(pgid
);
598 ss
<< "pg " << pgid
<< " dne";
599 cmdctx
->reply(-ENOENT
, ss
);
602 int acting_primary
= -1;
603 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
604 acting_primary
= osdmap
.get_pg_acting_primary(pgid
);
606 if (acting_primary
== -1) {
607 ss
<< "pg " << pgid
<< " has no primary osd";
608 cmdctx
->reply(-EAGAIN
, ss
);
611 auto p
= osd_cons
.find(acting_primary
);
612 if (p
== osd_cons
.end()) {
613 ss
<< "pg " << pgid
<< " primary osd." << acting_primary
614 << " is not currently connected";
615 cmdctx
->reply(-EAGAIN
, ss
);
617 vector
<pg_t
> pgs
= { pgid
};
618 for (auto& con
: p
->second
) {
619 con
->send_message(new MOSDScrub(monc
->get_fsid(),
622 scrubop
== "deep-scrub"));
624 ss
<< "instructing pg " << pgid
<< " on osd." << acting_primary
625 << " to " << scrubop
;
626 cmdctx
->reply(0, ss
);
628 } else if (prefix
== "osd scrub" ||
629 prefix
== "osd deep-scrub" ||
630 prefix
== "osd repair") {
632 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", whostr
);
634 get_str_vec(prefix
, pvec
);
638 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
639 for (int i
= 0; i
< osdmap
.get_max_osd(); i
++)
640 if (osdmap
.is_up(i
)) {
645 long osd
= parse_osd_id(whostr
.c_str(), &ss
);
647 ss
<< "invalid osd '" << whostr
<< "'";
648 cmdctx
->reply(-EINVAL
, ss
);
651 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
652 if (osdmap
.is_up(osd
)) {
657 ss
<< "osd." << osd
<< " is not up";
658 cmdctx
->reply(-EAGAIN
, ss
);
662 set
<int> sent_osds
, failed_osds
;
663 for (auto osd
: osds
) {
664 auto p
= osd_cons
.find(osd
);
665 if (p
== osd_cons
.end()) {
666 failed_osds
.insert(osd
);
668 sent_osds
.insert(osd
);
669 for (auto& con
: p
->second
) {
670 con
->send_message(new MOSDScrub(monc
->get_fsid(),
671 pvec
.back() == "repair",
672 pvec
.back() == "deep-scrub"));
676 if (failed_osds
.size() == osds
.size()) {
677 ss
<< "failed to instruct osd(s) " << osds
<< " to " << pvec
.back()
678 << " (not connected)";
681 ss
<< "instructed osd(s) " << sent_osds
<< " to " << pvec
.back();
682 if (!failed_osds
.empty()) {
683 ss
<< "; osd(s) " << failed_osds
<< " were not connected";
687 cmdctx
->reply(0, ss
);
689 } else if (prefix
== "osd reweight-by-pg" ||
690 prefix
== "osd reweight-by-utilization" ||
691 prefix
== "osd test-reweight-by-pg" ||
692 prefix
== "osd test-reweight-by-utilization") {
694 prefix
== "osd reweight-by-pg" || prefix
== "osd test-reweight-by-pg";
696 prefix
== "osd test-reweight-by-pg" ||
697 prefix
== "osd test-reweight-by-utilization";
699 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "oload", oload
, int64_t(120));
701 vector
<string
> poolnames
;
702 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pools", poolnames
);
703 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
704 for (const auto& poolname
: poolnames
) {
705 int64_t pool
= osdmap
.lookup_pg_pool_name(poolname
);
707 ss
<< "pool '" << poolname
<< "' does not exist";
714 cmdctx
->reply(r
, ss
);
717 double max_change
= g_conf
->mon_reweight_max_change
;
718 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_change", max_change
);
719 if (max_change
<= 0.0) {
720 ss
<< "max_change " << max_change
<< " must be positive";
721 cmdctx
->reply(-EINVAL
, ss
);
724 int64_t max_osds
= g_conf
->mon_reweight_max_osds
;
725 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_osds", max_osds
);
727 ss
<< "max_osds " << max_osds
<< " must be positive";
728 cmdctx
->reply(-EINVAL
, ss
);
731 string no_increasing
;
732 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "no_increasing", no_increasing
);
734 mempool::osdmap::map
<int32_t, uint32_t> new_weights
;
735 r
= cluster_state
.with_pgmap([&](const PGMap
& pgmap
) {
736 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
737 return reweight::by_utilization(osdmap
, pgmap
,
742 pools
.empty() ? NULL
: &pools
,
743 no_increasing
== "--no-increasing",
745 &ss
, &out_str
, f
.get());
749 dout(10) << "reweight::by_utilization: finished with " << out_str
<< dendl
;
752 f
->flush(cmdctx
->odata
);
754 cmdctx
->odata
.append(out_str
);
757 ss
<< "FAILED reweight-by-pg";
758 cmdctx
->reply(r
, ss
);
760 } else if (r
== 0 || dry_run
) {
762 cmdctx
->reply(r
, ss
);
765 json_spirit::Object json_object
;
766 for (const auto& osd_weight
: new_weights
) {
767 json_spirit::Config::add(json_object
,
768 std::to_string(osd_weight
.first
),
769 std::to_string(osd_weight
.second
));
771 string s
= json_spirit::write(json_object
);
772 std::replace(begin(s
), end(s
), '\"', '\'');
775 "\"prefix\": \"osd reweightn\", "
776 "\"weights\": \"" + s
+ "\""
778 auto on_finish
= new ReplyOnFinish(cmdctx
);
779 monc
->start_mon_command({cmd
}, {},
780 &on_finish
->from_mon
, &on_finish
->outs
, on_finish
);
783 } else if (prefix
== "osd df") {
785 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "output_method", method
);
786 r
= cluster_state
.with_pgservice([&](const PGMapStatService
& pgservice
) {
787 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
788 print_osd_utilization(osdmap
, &pgservice
, ss
,
789 f
.get(), method
== "tree");
791 cmdctx
->odata
.append(ss
);
795 cmdctx
->reply(r
, "");
798 r
= cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
799 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
800 return process_pg_map_command(prefix
, cmdctx
->cmdmap
, pg_map
, osdmap
,
801 f
.get(), &ss
, &cmdctx
->odata
);
805 if (r
!= -EOPNOTSUPP
) {
806 cmdctx
->reply(r
, ss
);
811 // None of the special native commands,
812 MgrPyModule
*handler
= nullptr;
813 auto py_commands
= py_modules
.get_commands();
814 for (const auto &pyc
: py_commands
) {
815 auto pyc_prefix
= cmddesc_get_prefix(pyc
.cmdstring
);
816 dout(1) << "pyc_prefix: '" << pyc_prefix
<< "'" << dendl
;
817 if (pyc_prefix
== prefix
) {
818 handler
= pyc
.handler
;
823 if (handler
== nullptr) {
824 ss
<< "No handler found for '" << prefix
<< "'";
825 dout(4) << "No handler found for '" << prefix
<< "'" << dendl
;
826 cmdctx
->reply(-EINVAL
, ss
);
829 // Okay, now we have a handler to call, but we must not call it
830 // in this thread, because the python handlers can do anything,
831 // including blocking, and including calling back into mgr.
832 dout(4) << "passing through " << cmdctx
->cmdmap
.size() << dendl
;
833 finisher
.queue(new FunctionContext([cmdctx
, handler
](int r_
) {
834 std::stringstream ds
;
835 std::stringstream ss
;
836 int r
= handler
->handle_command(cmdctx
->cmdmap
, &ds
, &ss
);
837 cmdctx
->odata
.append(ds
);
838 cmdctx
->reply(r
, ss
);
844 void DaemonServer::send_report()
846 auto m
= new MMonMgrReport();
847 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
848 cluster_state
.update_delta_stats();
850 // FIXME: reporting health detail here might be a bad idea?
851 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
852 // FIXME: no easy way to get mon features here. this will do for
853 // now, though, as long as we don't make a backward-incompat change.
854 pg_map
.encode_digest(osdmap
, m
->get_data(), CEPH_FEATURES_ALL
);
855 dout(10) << pg_map
<< dendl
;
856 pg_map
.get_health(g_ceph_context
, osdmap
,
861 // TODO? We currently do not notify the PyModules
862 // TODO: respect needs_send, so we send the report only if we are asked to do
863 // so, or the state is updated.
864 monc
->send_mon_message(m
);