1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "DaemonServer.h"
16 #include "include/str_list.h"
17 #include "auth/RotatingKeyRing.h"
18 #include "json_spirit/json_spirit_writer.h"
20 #include "messages/MMgrOpen.h"
21 #include "messages/MMgrConfigure.h"
22 #include "messages/MMonMgrReport.h"
23 #include "messages/MCommand.h"
24 #include "messages/MCommandReply.h"
25 #include "messages/MPGStats.h"
26 #include "messages/MOSDScrub.h"
27 #include "common/errno.h"
29 #define dout_context g_ceph_context
30 #define dout_subsys ceph_subsys_mgr
32 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
34 DaemonServer::DaemonServer(MonClient
*monc_
,
36 DaemonStateIndex
&daemon_state_
,
37 ClusterState
&cluster_state_
,
38 PyModules
&py_modules_
,
40 LogChannelRef audit_clog_
)
41 : Dispatcher(g_ceph_context
),
42 client_byte_throttler(new Throttle(g_ceph_context
, "mgr_client_bytes",
43 g_conf
->mgr_client_bytes
)),
44 client_msg_throttler(new Throttle(g_ceph_context
, "mgr_client_messages",
45 g_conf
->mgr_client_messages
)),
46 osd_byte_throttler(new Throttle(g_ceph_context
, "mgr_osd_bytes",
47 g_conf
->mgr_osd_bytes
)),
48 osd_msg_throttler(new Throttle(g_ceph_context
, "mgr_osd_messsages",
49 g_conf
->mgr_osd_messages
)),
50 mds_byte_throttler(new Throttle(g_ceph_context
, "mgr_mds_bytes",
51 g_conf
->mgr_mds_bytes
)),
52 mds_msg_throttler(new Throttle(g_ceph_context
, "mgr_mds_messsages",
53 g_conf
->mgr_mds_messages
)),
54 mon_byte_throttler(new Throttle(g_ceph_context
, "mgr_mon_bytes",
55 g_conf
->mgr_mon_bytes
)),
56 mon_msg_throttler(new Throttle(g_ceph_context
, "mgr_mon_messsages",
57 g_conf
->mgr_mon_messages
)),
61 daemon_state(daemon_state_
),
62 cluster_state(cluster_state_
),
63 py_modules(py_modules_
),
65 audit_clog(audit_clog_
),
66 auth_registry(g_ceph_context
,
67 g_conf
->auth_supported
.empty() ?
68 g_conf
->auth_cluster_required
:
69 g_conf
->auth_supported
),
73 DaemonServer::~DaemonServer() {
77 int DaemonServer::init(uint64_t gid
, entity_addr_t client_addr
)
79 // Initialize Messenger
80 std::string public_msgr_type
= g_conf
->ms_public_type
.empty() ?
81 g_conf
->get_val
<std::string
>("ms_type") : g_conf
->ms_public_type
;
82 msgr
= Messenger::create(g_ceph_context
, public_msgr_type
,
83 entity_name_t::MGR(gid
),
86 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
89 msgr
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
90 client_byte_throttler
.get(),
91 client_msg_throttler
.get());
94 msgr
->set_policy_throttlers(entity_name_t::TYPE_OSD
,
95 osd_byte_throttler
.get(),
96 osd_msg_throttler
.get());
97 msgr
->set_policy_throttlers(entity_name_t::TYPE_MDS
,
98 mds_byte_throttler
.get(),
99 mds_msg_throttler
.get());
100 msgr
->set_policy_throttlers(entity_name_t::TYPE_MON
,
101 mon_byte_throttler
.get(),
102 mon_msg_throttler
.get());
104 int r
= msgr
->bind(g_conf
->public_addr
);
106 derr
<< "unable to bind mgr to " << g_conf
->public_addr
<< dendl
;
110 msgr
->set_myname(entity_name_t::MGR(gid
));
111 msgr
->set_addr_unknowns(client_addr
);
114 msgr
->add_dispatcher_tail(this);
116 started_at
= ceph_clock_now();
121 entity_addr_t
DaemonServer::get_myaddr() const
123 return msgr
->get_myaddr();
127 bool DaemonServer::ms_verify_authorizer(Connection
*con
,
130 ceph::bufferlist
& authorizer_data
,
131 ceph::bufferlist
& authorizer_reply
,
133 CryptoKey
& session_key
)
135 auto handler
= auth_registry
.get_handler(protocol
);
137 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol
<< dendl
;
142 MgrSessionRef
s(new MgrSession(cct
));
143 s
->inst
.addr
= con
->get_peer_addr();
144 AuthCapsInfo caps_info
;
146 is_valid
= handler
->verify_authorizer(
147 cct
, monc
->rotating_secrets
.get(),
149 authorizer_reply
, s
->entity_name
,
150 s
->global_id
, caps_info
,
154 if (caps_info
.allow_all
) {
155 dout(10) << " session " << s
<< " " << s
->entity_name
156 << " allow_all" << dendl
;
157 s
->caps
.set_allow_all();
159 if (caps_info
.caps
.length() > 0) {
160 bufferlist::iterator p
= caps_info
.caps
.begin();
165 catch (buffer::error
& e
) {
167 bool success
= s
->caps
.parse(str
);
169 dout(10) << " session " << s
<< " " << s
->entity_name
170 << " has caps " << s
->caps
<< " '" << str
<< "'" << dendl
;
172 dout(10) << " session " << s
<< " " << s
->entity_name
173 << " failed to parse caps '" << str
<< "'" << dendl
;
177 con
->set_priv(s
->get());
179 if (peer_type
== CEPH_ENTITY_TYPE_OSD
) {
180 Mutex::Locker
l(lock
);
181 s
->osd_id
= atoi(s
->entity_name
.get_id().c_str());
182 dout(10) << "registering osd." << s
->osd_id
<< " session "
183 << s
<< " con " << con
<< dendl
;
184 osd_cons
[s
->osd_id
].insert(con
);
192 bool DaemonServer::ms_get_authorizer(int dest_type
,
193 AuthAuthorizer
**authorizer
, bool force_new
)
195 dout(10) << "type=" << ceph_entity_type_name(dest_type
) << dendl
;
197 if (dest_type
== CEPH_ENTITY_TYPE_MON
) {
202 if (monc
->wait_auth_rotating(10) < 0)
206 *authorizer
= monc
->build_authorizer(dest_type
);
207 dout(20) << "got authorizer " << *authorizer
<< dendl
;
208 return *authorizer
!= NULL
;
211 bool DaemonServer::ms_handle_reset(Connection
*con
)
213 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
214 MgrSessionRef
session(static_cast<MgrSession
*>(con
->get_priv()));
218 session
->put(); // SessionRef takes a ref
219 Mutex::Locker
l(lock
);
220 dout(10) << "unregistering osd." << session
->osd_id
221 << " session " << session
<< " con " << con
<< dendl
;
222 osd_cons
[session
->osd_id
].erase(con
);
227 bool DaemonServer::ms_handle_refused(Connection
*con
)
229 // do nothing for now
233 bool DaemonServer::ms_dispatch(Message
*m
)
235 Mutex::Locker
l(lock
);
237 switch (m
->get_type()) {
239 cluster_state
.ingest_pgstats(static_cast<MPGStats
*>(m
));
240 maybe_ready(m
->get_source().num());
244 return handle_report(static_cast<MMgrReport
*>(m
));
246 return handle_open(static_cast<MMgrOpen
*>(m
));
248 return handle_command(static_cast<MCommand
*>(m
));
250 dout(1) << "Unhandled message type " << m
->get_type() << dendl
;
255 void DaemonServer::maybe_ready(int32_t osd_id
)
257 if (!pgmap_ready
&& reported_osds
.find(osd_id
) == reported_osds
.end()) {
258 dout(4) << "initial report from osd " << osd_id
<< dendl
;
259 reported_osds
.insert(osd_id
);
260 std::set
<int32_t> up_osds
;
262 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
263 osdmap
.get_up_osds(up_osds
);
266 std::set
<int32_t> unreported_osds
;
267 std::set_difference(up_osds
.begin(), up_osds
.end(),
268 reported_osds
.begin(), reported_osds
.end(),
269 std::inserter(unreported_osds
, unreported_osds
.begin()));
271 if (unreported_osds
.size() == 0) {
272 dout(4) << "all osds have reported, sending PG state to mon" << dendl
;
274 reported_osds
.clear();
275 // Avoid waiting for next tick
278 dout(4) << "still waiting for " << unreported_osds
.size() << " osds"
279 " to report in before PGMap is ready" << dendl
;
284 void DaemonServer::shutdown()
286 dout(10) << "begin" << dendl
;
289 dout(10) << "done" << dendl
;
294 bool DaemonServer::handle_open(MMgrOpen
*m
)
297 if (!m
->service_name
.empty()) {
298 key
.first
= m
->service_name
;
300 key
.first
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
302 key
.second
= m
->daemon_name
;
304 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
306 auto configure
= new MMgrConfigure();
307 configure
->stats_period
= g_conf
->mgr_stats_period
;
308 m
->get_connection()->send_message(configure
);
310 if (daemon_state
.exists(key
)) {
311 dout(20) << "updating existing DaemonState for " << m
->daemon_name
<< dendl
;
312 daemon_state
.get(key
)->perf_counters
.clear();
315 if (m
->service_daemon
) {
316 DaemonStatePtr daemon
;
317 if (daemon_state
.exists(key
)) {
318 daemon
= daemon_state
.get(key
);
320 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
321 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
323 if (m
->daemon_metadata
.count("hostname")) {
324 daemon
->hostname
= m
->daemon_metadata
["hostname"];
326 daemon_state
.insert(daemon
);
328 daemon
->service_daemon
= true;
329 daemon
->metadata
= m
->daemon_metadata
;
330 daemon
->service_status
= m
->daemon_status
;
332 utime_t now
= ceph_clock_now();
333 auto d
= pending_service_map
.get_daemon(m
->service_name
,
335 if (d
->gid
!= (uint64_t)m
->get_source().num()) {
336 dout(10) << "registering " << key
<< " in pending_service_map" << dendl
;
337 d
->gid
= m
->get_source().num();
338 d
->addr
= m
->get_source_addr();
339 d
->start_epoch
= pending_service_map
.epoch
;
340 d
->start_stamp
= now
;
341 d
->metadata
= m
->daemon_metadata
;
342 pending_service_map_dirty
= pending_service_map
.epoch
;
350 bool DaemonServer::handle_report(MMgrReport
*m
)
353 if (!m
->service_name
.empty()) {
354 key
.first
= m
->service_name
;
356 key
.first
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
358 key
.second
= m
->daemon_name
;
360 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
362 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
363 m
->service_name
.empty()) {
364 // Clients should not be sending us stats unless they are declaring
365 // themselves to be a daemon for some service.
366 dout(4) << "rejecting report from non-daemon client " << m
->daemon_name
372 DaemonStatePtr daemon
;
373 if (daemon_state
.exists(key
)) {
374 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
375 daemon
= daemon_state
.get(key
);
377 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
378 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
379 // FIXME: crap, we don't know the hostname at this stage.
381 daemon_state
.insert(daemon
);
382 // FIXME: we should avoid this case by rejecting MMgrReport from
383 // daemons without sessions, and ensuring that session open
384 // always contains metadata.
386 assert(daemon
!= nullptr);
387 auto &daemon_counters
= daemon
->perf_counters
;
388 daemon_counters
.update(m
);
390 if (daemon
->service_daemon
) {
391 utime_t now
= ceph_clock_now();
392 if (m
->daemon_status
) {
393 daemon
->service_status
= *m
->daemon_status
;
394 daemon
->service_status_stamp
= now
;
396 daemon
->last_service_beacon
= now
;
397 } else if (m
->daemon_status
) {
398 derr
<< "got status from non-daemon " << key
<< dendl
;
412 bool requires_perm(char p
) const {
413 return (perm
.find(p
) != string::npos
);
418 #define COMMAND(parsesig, helptext, module, perm, availability) \
419 {parsesig, helptext, module, perm, availability},
420 #include "MgrCommands.h"
424 void DaemonServer::_generate_command_map(
425 map
<string
,cmd_vartype
>& cmdmap
,
426 map
<string
,string
> ¶m_str_map
)
428 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
429 p
!= cmdmap
.end(); ++p
) {
430 if (p
->first
== "prefix")
432 if (p
->first
== "caps") {
434 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
435 cv
.size() % 2 == 0) {
436 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
437 string k
= string("caps_") + cv
[i
];
438 param_str_map
[k
] = cv
[i
+ 1];
443 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
447 const MgrCommand
*DaemonServer::_get_mgrcommand(
448 const string
&cmd_prefix
,
452 MgrCommand
*this_cmd
= NULL
;
453 for (MgrCommand
*cp
= cmds
;
454 cp
< &cmds
[cmds_size
]; cp
++) {
455 if (cp
->cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
463 bool DaemonServer::_allowed_command(
465 const string
&module
,
466 const string
&prefix
,
467 const map
<string
,cmd_vartype
>& cmdmap
,
468 const map
<string
,string
>& param_str_map
,
469 const MgrCommand
*this_cmd
) {
471 if (s
->entity_name
.is_mon()) {
472 // mon is all-powerful. even when it is forwarding commands on behalf of
473 // old clients; we expect the mon is validating commands before proxying!
477 bool cmd_r
= this_cmd
->requires_perm('r');
478 bool cmd_w
= this_cmd
->requires_perm('w');
479 bool cmd_x
= this_cmd
->requires_perm('x');
481 bool capable
= s
->caps
.is_capable(
483 CEPH_ENTITY_TYPE_MGR
,
485 module
, prefix
, param_str_map
,
486 cmd_r
, cmd_w
, cmd_x
);
488 dout(10) << " " << s
->entity_name
<< " "
489 << (capable
? "" : "not ") << "capable" << dendl
;
493 bool DaemonServer::handle_command(MCommand
*m
)
496 std::stringstream ss
;
499 assert(lock
.is_locked_by_me());
502 * The working data for processing an MCommand. This lives in
503 * a class to enable passing it into other threads for processing
504 * outside of the thread/locks that called handle_command.
513 CommandContext(MCommand
*m_
)
523 void reply(int r
, const std::stringstream
&ss
)
528 void reply(int r
, const std::string
&rs
)
530 // Let the connection drop as soon as we've sent our response
531 ConnectionRef con
= m
->get_connection();
533 con
->mark_disposable();
536 dout(1) << "handle_command " << cpp_strerror(r
) << " " << rs
<< dendl
;
538 MCommandReply
*reply
= new MCommandReply(r
, rs
);
539 reply
->set_tid(m
->get_tid());
540 reply
->set_data(odata
);
541 con
->send_message(reply
);
547 * A context for receiving a bufferlist/error string from a background
548 * function and then calling back to a CommandContext when it's done
550 class ReplyOnFinish
: public Context
{
551 std::shared_ptr
<CommandContext
> cmdctx
;
557 ReplyOnFinish(std::shared_ptr
<CommandContext
> cmdctx_
)
560 void finish(int r
) override
{
561 cmdctx
->odata
.claim_append(from_mon
);
562 cmdctx
->reply(r
, outs
);
566 std::shared_ptr
<CommandContext
> cmdctx
= std::make_shared
<CommandContext
>(m
);
568 MgrSessionRef
session(static_cast<MgrSession
*>(m
->get_connection()->get_priv()));
572 session
->put(); // SessionRef takes a ref
573 if (session
->inst
.name
== entity_name_t())
574 session
->inst
.name
= m
->get_source();
577 boost::scoped_ptr
<Formatter
> f
;
578 map
<string
,string
> param_str_map
;
580 if (!cmdmap_from_json(m
->cmd
, &(cmdctx
->cmdmap
), ss
)) {
581 cmdctx
->reply(-EINVAL
, ss
);
586 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "format", format
, string("plain"));
587 f
.reset(Formatter::create(format
));
590 cmd_getval(cct
, cmdctx
->cmdmap
, "prefix", prefix
);
592 dout(4) << "decoded " << cmdctx
->cmdmap
.size() << dendl
;
593 dout(4) << "prefix=" << prefix
<< dendl
;
595 if (prefix
== "get_command_descriptions") {
598 dout(10) << "reading commands from python modules" << dendl
;
599 auto py_commands
= py_modules
.get_commands();
602 f
.open_object_section("command_descriptions");
603 for (const auto &pyc
: py_commands
) {
604 ostringstream secname
;
605 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
606 dout(20) << "Dumping " << pyc
.cmdstring
<< " (" << pyc
.helpstring
608 dump_cmddesc_to_json(&f
, secname
.str(), pyc
.cmdstring
, pyc
.helpstring
,
609 "mgr", pyc
.perm
, "cli", 0);
613 for (const auto &cp
: mgr_commands
) {
614 ostringstream secname
;
615 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
616 dump_cmddesc_to_json(&f
, secname
.str(), cp
.cmdstring
, cp
.helpstring
,
617 cp
.module
, cp
.perm
, cp
.availability
, 0);
620 f
.close_section(); // command_descriptions
621 f
.flush(cmdctx
->odata
);
622 cmdctx
->reply(0, ss
);
627 const MgrCommand
*mgr_cmd
= _get_mgrcommand(prefix
, mgr_commands
,
628 ARRAY_SIZE(mgr_commands
));
629 _generate_command_map(cmdctx
->cmdmap
, param_str_map
);
631 MgrCommand py_command
= {"", "", "py", "rw", "cli"};
632 if (!_allowed_command(session
.get(), py_command
.module
, prefix
, cmdctx
->cmdmap
,
633 param_str_map
, &py_command
)) {
634 dout(1) << " access denied" << dendl
;
635 ss
<< "access denied; does your client key have mgr caps?"
636 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
637 cmdctx
->reply(-EACCES
, ss
);
641 // validate user's permissions for requested command
642 if (!_allowed_command(session
.get(), mgr_cmd
->module
, prefix
, cmdctx
->cmdmap
,
643 param_str_map
, mgr_cmd
)) {
644 dout(1) << " access denied" << dendl
;
645 audit_clog
->info() << "from='" << session
->inst
<< "' "
646 << "entity='" << session
->entity_name
<< "' "
647 << "cmd=" << m
->cmd
<< ": access denied";
648 ss
<< "access denied' does your client key have mgr caps?"
649 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
650 cmdctx
->reply(-EACCES
, ss
);
656 << "from='" << session
->inst
<< "' "
657 << "entity='" << session
->entity_name
<< "' "
658 << "cmd=" << m
->cmd
<< ": dispatch";
661 // service map commands
662 if (prefix
== "service dump") {
664 f
.reset(Formatter::create("json-pretty"));
665 cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
666 f
->dump_object("service_map", service_map
);
668 f
->flush(cmdctx
->odata
);
669 cmdctx
->reply(0, ss
);
672 if (prefix
== "service status") {
674 f
.reset(Formatter::create("json-pretty"));
675 // only include state from services that are in the persisted service map
676 f
->open_object_section("service_status");
678 cluster_state
.with_servicemap([&](const ServiceMap
& service_map
) {
681 for (auto& p
: s
.services
) {
682 f
->open_object_section(p
.first
.c_str());
683 for (auto& q
: p
.second
.daemons
) {
684 f
->open_object_section(q
.first
.c_str());
685 DaemonKey
key(p
.first
, q
.first
);
686 assert(daemon_state
.exists(key
));
687 auto daemon
= daemon_state
.get(key
);
688 f
->dump_stream("status_stamp") << daemon
->service_status_stamp
;
689 f
->dump_stream("last_beacon") << daemon
->last_service_beacon
;
690 f
->open_object_section("status");
691 for (auto& r
: daemon
->service_status
) {
692 f
->dump_string(r
.first
.c_str(), r
.second
);
700 f
->flush(cmdctx
->odata
);
701 cmdctx
->reply(0, ss
);
708 if (prefix
== "pg scrub" ||
709 prefix
== "pg repair" ||
710 prefix
== "pg deep-scrub") {
711 string scrubop
= prefix
.substr(3, string::npos
);
714 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pgid", pgidstr
);
715 if (!pgid
.parse(pgidstr
.c_str())) {
716 ss
<< "invalid pgid '" << pgidstr
<< "'";
717 cmdctx
->reply(-EINVAL
, ss
);
720 bool pg_exists
= false;
721 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
722 pg_exists
= osdmap
.pg_exists(pgid
);
725 ss
<< "pg " << pgid
<< " dne";
726 cmdctx
->reply(-ENOENT
, ss
);
729 int acting_primary
= -1;
730 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
731 acting_primary
= osdmap
.get_pg_acting_primary(pgid
);
733 if (acting_primary
== -1) {
734 ss
<< "pg " << pgid
<< " has no primary osd";
735 cmdctx
->reply(-EAGAIN
, ss
);
738 auto p
= osd_cons
.find(acting_primary
);
739 if (p
== osd_cons
.end()) {
740 ss
<< "pg " << pgid
<< " primary osd." << acting_primary
741 << " is not currently connected";
742 cmdctx
->reply(-EAGAIN
, ss
);
744 vector
<pg_t
> pgs
= { pgid
};
745 for (auto& con
: p
->second
) {
746 con
->send_message(new MOSDScrub(monc
->get_fsid(),
749 scrubop
== "deep-scrub"));
751 ss
<< "instructing pg " << pgid
<< " on osd." << acting_primary
752 << " to " << scrubop
;
753 cmdctx
->reply(0, ss
);
755 } else if (prefix
== "osd scrub" ||
756 prefix
== "osd deep-scrub" ||
757 prefix
== "osd repair") {
759 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", whostr
);
761 get_str_vec(prefix
, pvec
);
764 if (whostr
== "*" || whostr
== "all" || whostr
== "any") {
765 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
766 for (int i
= 0; i
< osdmap
.get_max_osd(); i
++)
767 if (osdmap
.is_up(i
)) {
772 long osd
= parse_osd_id(whostr
.c_str(), &ss
);
774 ss
<< "invalid osd '" << whostr
<< "'";
775 cmdctx
->reply(-EINVAL
, ss
);
778 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
779 if (osdmap
.is_up(osd
)) {
784 ss
<< "osd." << osd
<< " is not up";
785 cmdctx
->reply(-EAGAIN
, ss
);
789 set
<int> sent_osds
, failed_osds
;
790 for (auto osd
: osds
) {
791 auto p
= osd_cons
.find(osd
);
792 if (p
== osd_cons
.end()) {
793 failed_osds
.insert(osd
);
795 sent_osds
.insert(osd
);
796 for (auto& con
: p
->second
) {
797 con
->send_message(new MOSDScrub(monc
->get_fsid(),
798 pvec
.back() == "repair",
799 pvec
.back() == "deep-scrub"));
803 if (failed_osds
.size() == osds
.size()) {
804 ss
<< "failed to instruct osd(s) " << osds
<< " to " << pvec
.back()
805 << " (not connected)";
808 ss
<< "instructed osd(s) " << sent_osds
<< " to " << pvec
.back();
809 if (!failed_osds
.empty()) {
810 ss
<< "; osd(s) " << failed_osds
<< " were not connected";
814 cmdctx
->reply(0, ss
);
816 } else if (prefix
== "osd reweight-by-pg" ||
817 prefix
== "osd reweight-by-utilization" ||
818 prefix
== "osd test-reweight-by-pg" ||
819 prefix
== "osd test-reweight-by-utilization") {
821 prefix
== "osd reweight-by-pg" || prefix
== "osd test-reweight-by-pg";
823 prefix
== "osd test-reweight-by-pg" ||
824 prefix
== "osd test-reweight-by-utilization";
826 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "oload", oload
, int64_t(120));
828 vector
<string
> poolnames
;
829 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pools", poolnames
);
830 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
831 for (const auto& poolname
: poolnames
) {
832 int64_t pool
= osdmap
.lookup_pg_pool_name(poolname
);
834 ss
<< "pool '" << poolname
<< "' does not exist";
841 cmdctx
->reply(r
, ss
);
844 double max_change
= g_conf
->mon_reweight_max_change
;
845 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_change", max_change
);
846 if (max_change
<= 0.0) {
847 ss
<< "max_change " << max_change
<< " must be positive";
848 cmdctx
->reply(-EINVAL
, ss
);
851 int64_t max_osds
= g_conf
->mon_reweight_max_osds
;
852 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_osds", max_osds
);
854 ss
<< "max_osds " << max_osds
<< " must be positive";
855 cmdctx
->reply(-EINVAL
, ss
);
858 string no_increasing
;
859 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "no_increasing", no_increasing
);
861 mempool::osdmap::map
<int32_t, uint32_t> new_weights
;
862 r
= cluster_state
.with_pgmap([&](const PGMap
& pgmap
) {
863 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
864 return reweight::by_utilization(osdmap
, pgmap
,
869 pools
.empty() ? NULL
: &pools
,
870 no_increasing
== "--no-increasing",
872 &ss
, &out_str
, f
.get());
876 dout(10) << "reweight::by_utilization: finished with " << out_str
<< dendl
;
879 f
->flush(cmdctx
->odata
);
881 cmdctx
->odata
.append(out_str
);
884 ss
<< "FAILED reweight-by-pg";
885 cmdctx
->reply(r
, ss
);
887 } else if (r
== 0 || dry_run
) {
889 cmdctx
->reply(r
, ss
);
892 json_spirit::Object json_object
;
893 for (const auto& osd_weight
: new_weights
) {
894 json_spirit::Config::add(json_object
,
895 std::to_string(osd_weight
.first
),
896 std::to_string(osd_weight
.second
));
898 string s
= json_spirit::write(json_object
);
899 std::replace(begin(s
), end(s
), '\"', '\'');
902 "\"prefix\": \"osd reweightn\", "
903 "\"weights\": \"" + s
+ "\""
905 auto on_finish
= new ReplyOnFinish(cmdctx
);
906 monc
->start_mon_command({cmd
}, {},
907 &on_finish
->from_mon
, &on_finish
->outs
, on_finish
);
910 } else if (prefix
== "osd df") {
912 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "output_method", method
);
913 r
= cluster_state
.with_pgservice([&](const PGMapStatService
& pgservice
) {
914 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
915 print_osd_utilization(osdmap
, &pgservice
, ss
,
916 f
.get(), method
== "tree");
918 cmdctx
->odata
.append(ss
);
922 cmdctx
->reply(r
, "");
925 r
= cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
926 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
927 return process_pg_map_command(prefix
, cmdctx
->cmdmap
, pg_map
, osdmap
,
928 f
.get(), &ss
, &cmdctx
->odata
);
932 if (r
!= -EOPNOTSUPP
) {
933 cmdctx
->reply(r
, ss
);
938 // None of the special native commands,
939 MgrPyModule
*handler
= nullptr;
940 auto py_commands
= py_modules
.get_commands();
941 for (const auto &pyc
: py_commands
) {
942 auto pyc_prefix
= cmddesc_get_prefix(pyc
.cmdstring
);
943 dout(1) << "pyc_prefix: '" << pyc_prefix
<< "'" << dendl
;
944 if (pyc_prefix
== prefix
) {
945 handler
= pyc
.handler
;
950 if (handler
== nullptr) {
951 ss
<< "No handler found for '" << prefix
<< "'";
952 dout(4) << "No handler found for '" << prefix
<< "'" << dendl
;
953 cmdctx
->reply(-EINVAL
, ss
);
956 // Okay, now we have a handler to call, but we must not call it
957 // in this thread, because the python handlers can do anything,
958 // including blocking, and including calling back into mgr.
959 dout(4) << "passing through " << cmdctx
->cmdmap
.size() << dendl
;
960 finisher
.queue(new FunctionContext([cmdctx
, handler
](int r_
) {
961 std::stringstream ds
;
962 std::stringstream ss
;
963 int r
= handler
->handle_command(cmdctx
->cmdmap
, &ds
, &ss
);
964 cmdctx
->odata
.append(ds
);
965 cmdctx
->reply(r
, ss
);
971 void DaemonServer::_prune_pending_service_map()
973 utime_t cutoff
= ceph_clock_now();
974 cutoff
-= g_conf
->mgr_service_beacon_grace
;
975 auto p
= pending_service_map
.services
.begin();
976 while (p
!= pending_service_map
.services
.end()) {
977 auto q
= p
->second
.daemons
.begin();
978 while (q
!= p
->second
.daemons
.end()) {
979 DaemonKey
key(p
->first
, q
->first
);
980 if (!daemon_state
.exists(key
)) {
981 derr
<< "missing key " << key
<< dendl
;
985 auto daemon
= daemon_state
.get(key
);
986 if (daemon
->last_service_beacon
== utime_t()) {
987 // we must have just restarted; assume they are alive now.
988 daemon
->last_service_beacon
= ceph_clock_now();
992 if (daemon
->last_service_beacon
< cutoff
) {
993 dout(10) << "pruning stale " << p
->first
<< "." << q
->first
994 << " last_beacon " << daemon
->last_service_beacon
<< dendl
;
995 q
= p
->second
.daemons
.erase(q
);
996 pending_service_map_dirty
= pending_service_map
.epoch
;
1001 if (p
->second
.daemons
.empty()) {
1002 p
= pending_service_map
.services
.erase(p
);
1003 pending_service_map_dirty
= pending_service_map
.epoch
;
1010 void DaemonServer::send_report()
1013 if (ceph_clock_now() - started_at
> g_conf
->mgr_stats_period
* 4.0) {
1015 reported_osds
.clear();
1016 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
1017 << "potentially incomplete PG state to mon" << dendl
;
1019 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
1025 auto m
= new MMonMgrReport();
1026 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1027 cluster_state
.update_delta_stats();
1029 if (pending_service_map
.epoch
) {
1030 _prune_pending_service_map();
1031 if (pending_service_map_dirty
>= pending_service_map
.epoch
) {
1032 pending_service_map
.modified
= ceph_clock_now();
1033 ::encode(pending_service_map
, m
->service_map_bl
, CEPH_FEATURES_ALL
);
1034 dout(10) << "sending service_map e" << pending_service_map
.epoch
1036 pending_service_map
.epoch
++;
1040 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1041 // FIXME: no easy way to get mon features here. this will do for
1042 // now, though, as long as we don't make a backward-incompat change.
1043 pg_map
.encode_digest(osdmap
, m
->get_data(), CEPH_FEATURES_ALL
);
1044 dout(10) << pg_map
<< dendl
;
1046 pg_map
.get_health_checks(g_ceph_context
, osdmap
,
1048 dout(10) << m
->health_checks
.checks
.size() << " health checks"
1050 dout(20) << "health checks:\n";
1051 JSONFormatter
jf(true);
1052 jf
.dump_object("health_checks", m
->health_checks
);
1057 // TODO? We currently do not notify the PyModules
1058 // TODO: respect needs_send, so we send the report only if we are asked to do
1059 // so, or the state is updated.
1060 monc
->send_mon_message(m
);
1063 void DaemonServer::got_service_map()
1065 Mutex::Locker
l(lock
);
1067 cluster_state
.with_servicemap([&](const ServiceMap
& service_map
) {
1068 if (pending_service_map
.epoch
== 0) {
1069 // we just started up
1070 dout(10) << "got initial map e" << service_map
.epoch
<< dendl
;
1071 pending_service_map
= service_map
;
1073 // we we already active and therefore must have persisted it,
1074 // which means ours is the same or newer.
1075 dout(10) << "got updated map e" << service_map
.epoch
<< dendl
;
1077 pending_service_map
.epoch
= service_map
.epoch
+ 1;
1080 // cull missing daemons, populate new ones
1081 for (auto& p
: pending_service_map
.services
) {
1082 std::set
<std::string
> names
;
1083 for (auto& q
: p
.second
.daemons
) {
1084 names
.insert(q
.first
);
1085 DaemonKey
key(p
.first
, q
.first
);
1086 if (!daemon_state
.exists(key
)) {
1087 auto daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
1089 daemon
->metadata
= q
.second
.metadata
;
1090 if (q
.second
.metadata
.count("hostname")) {
1091 daemon
->hostname
= q
.second
.metadata
["hostname"];
1093 daemon
->service_daemon
= true;
1094 daemon_state
.insert(daemon
);
1095 dout(10) << "added missing " << key
<< dendl
;
1098 daemon_state
.cull(p
.first
, names
);