// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
14 #include "DaemonServer.h"
16 #include "include/str_list.h"
17 #include "auth/RotatingKeyRing.h"
18 #include "json_spirit/json_spirit_writer.h"
20 #include "mgr/mgr_commands.h"
21 #include "mon/MonCommand.h"
23 #include "messages/MMgrOpen.h"
24 #include "messages/MMgrConfigure.h"
25 #include "messages/MMonMgrReport.h"
26 #include "messages/MCommand.h"
27 #include "messages/MCommandReply.h"
28 #include "messages/MPGStats.h"
29 #include "messages/MOSDScrub.h"
30 #include "messages/MOSDForceRecovery.h"
31 #include "common/errno.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mgr
36 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
40 DaemonServer::DaemonServer(MonClient
*monc_
,
42 DaemonStateIndex
&daemon_state_
,
43 ClusterState
&cluster_state_
,
44 PyModules
&py_modules_
,
46 LogChannelRef audit_clog_
)
47 : Dispatcher(g_ceph_context
),
48 client_byte_throttler(new Throttle(g_ceph_context
, "mgr_client_bytes",
49 g_conf
->mgr_client_bytes
)),
50 client_msg_throttler(new Throttle(g_ceph_context
, "mgr_client_messages",
51 g_conf
->mgr_client_messages
)),
52 osd_byte_throttler(new Throttle(g_ceph_context
, "mgr_osd_bytes",
53 g_conf
->mgr_osd_bytes
)),
54 osd_msg_throttler(new Throttle(g_ceph_context
, "mgr_osd_messsages",
55 g_conf
->mgr_osd_messages
)),
56 mds_byte_throttler(new Throttle(g_ceph_context
, "mgr_mds_bytes",
57 g_conf
->mgr_mds_bytes
)),
58 mds_msg_throttler(new Throttle(g_ceph_context
, "mgr_mds_messsages",
59 g_conf
->mgr_mds_messages
)),
60 mon_byte_throttler(new Throttle(g_ceph_context
, "mgr_mon_bytes",
61 g_conf
->mgr_mon_bytes
)),
62 mon_msg_throttler(new Throttle(g_ceph_context
, "mgr_mon_messsages",
63 g_conf
->mgr_mon_messages
)),
67 daemon_state(daemon_state_
),
68 cluster_state(cluster_state_
),
69 py_modules(py_modules_
),
71 audit_clog(audit_clog_
),
72 auth_registry(g_ceph_context
,
73 g_conf
->auth_supported
.empty() ?
74 g_conf
->auth_cluster_required
:
75 g_conf
->auth_supported
),
79 DaemonServer::~DaemonServer() {
83 int DaemonServer::init(uint64_t gid
, entity_addr_t client_addr
)
85 // Initialize Messenger
86 std::string public_msgr_type
= g_conf
->ms_public_type
.empty() ?
87 g_conf
->get_val
<std::string
>("ms_type") : g_conf
->ms_public_type
;
88 msgr
= Messenger::create(g_ceph_context
, public_msgr_type
,
89 entity_name_t::MGR(gid
),
92 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
95 msgr
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
96 client_byte_throttler
.get(),
97 client_msg_throttler
.get());
100 msgr
->set_policy_throttlers(entity_name_t::TYPE_OSD
,
101 osd_byte_throttler
.get(),
102 osd_msg_throttler
.get());
103 msgr
->set_policy_throttlers(entity_name_t::TYPE_MDS
,
104 mds_byte_throttler
.get(),
105 mds_msg_throttler
.get());
106 msgr
->set_policy_throttlers(entity_name_t::TYPE_MON
,
107 mon_byte_throttler
.get(),
108 mon_msg_throttler
.get());
110 int r
= msgr
->bind(g_conf
->public_addr
);
112 derr
<< "unable to bind mgr to " << g_conf
->public_addr
<< dendl
;
116 msgr
->set_myname(entity_name_t::MGR(gid
));
117 msgr
->set_addr_unknowns(client_addr
);
120 msgr
->add_dispatcher_tail(this);
122 started_at
= ceph_clock_now();
127 entity_addr_t
DaemonServer::get_myaddr() const
129 return msgr
->get_myaddr();
133 bool DaemonServer::ms_verify_authorizer(Connection
*con
,
136 ceph::bufferlist
& authorizer_data
,
137 ceph::bufferlist
& authorizer_reply
,
139 CryptoKey
& session_key
)
141 auto handler
= auth_registry
.get_handler(protocol
);
143 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol
<< dendl
;
148 MgrSessionRef
s(new MgrSession(cct
));
149 s
->inst
.addr
= con
->get_peer_addr();
150 AuthCapsInfo caps_info
;
152 RotatingKeyRing
*keys
= monc
->rotating_secrets
.get();
154 is_valid
= handler
->verify_authorizer(
157 authorizer_reply
, s
->entity_name
,
158 s
->global_id
, caps_info
,
161 dout(10) << __func__
<< " no rotating_keys (yet), denied" << dendl
;
166 if (caps_info
.allow_all
) {
167 dout(10) << " session " << s
<< " " << s
->entity_name
168 << " allow_all" << dendl
;
169 s
->caps
.set_allow_all();
171 if (caps_info
.caps
.length() > 0) {
172 bufferlist::iterator p
= caps_info
.caps
.begin();
177 catch (buffer::error
& e
) {
179 bool success
= s
->caps
.parse(str
);
181 dout(10) << " session " << s
<< " " << s
->entity_name
182 << " has caps " << s
->caps
<< " '" << str
<< "'" << dendl
;
184 dout(10) << " session " << s
<< " " << s
->entity_name
185 << " failed to parse caps '" << str
<< "'" << dendl
;
189 con
->set_priv(s
->get());
191 if (peer_type
== CEPH_ENTITY_TYPE_OSD
) {
192 Mutex::Locker
l(lock
);
193 s
->osd_id
= atoi(s
->entity_name
.get_id().c_str());
194 dout(10) << "registering osd." << s
->osd_id
<< " session "
195 << s
<< " con " << con
<< dendl
;
196 osd_cons
[s
->osd_id
].insert(con
);
204 bool DaemonServer::ms_get_authorizer(int dest_type
,
205 AuthAuthorizer
**authorizer
, bool force_new
)
207 dout(10) << "type=" << ceph_entity_type_name(dest_type
) << dendl
;
209 if (dest_type
== CEPH_ENTITY_TYPE_MON
) {
214 if (monc
->wait_auth_rotating(10) < 0)
218 *authorizer
= monc
->build_authorizer(dest_type
);
219 dout(20) << "got authorizer " << *authorizer
<< dendl
;
220 return *authorizer
!= NULL
;
223 bool DaemonServer::ms_handle_reset(Connection
*con
)
225 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
226 MgrSessionRef
session(static_cast<MgrSession
*>(con
->get_priv()));
230 session
->put(); // SessionRef takes a ref
231 Mutex::Locker
l(lock
);
232 dout(10) << "unregistering osd." << session
->osd_id
233 << " session " << session
<< " con " << con
<< dendl
;
234 osd_cons
[session
->osd_id
].erase(con
);
239 bool DaemonServer::ms_handle_refused(Connection
*con
)
241 // do nothing for now
245 bool DaemonServer::ms_dispatch(Message
*m
)
247 Mutex::Locker
l(lock
);
249 switch (m
->get_type()) {
251 cluster_state
.ingest_pgstats(static_cast<MPGStats
*>(m
));
252 maybe_ready(m
->get_source().num());
256 return handle_report(static_cast<MMgrReport
*>(m
));
258 return handle_open(static_cast<MMgrOpen
*>(m
));
260 return handle_command(static_cast<MCommand
*>(m
));
262 dout(1) << "Unhandled message type " << m
->get_type() << dendl
;
267 void DaemonServer::maybe_ready(int32_t osd_id
)
269 if (!pgmap_ready
&& reported_osds
.find(osd_id
) == reported_osds
.end()) {
270 dout(4) << "initial report from osd " << osd_id
<< dendl
;
271 reported_osds
.insert(osd_id
);
272 std::set
<int32_t> up_osds
;
274 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
275 osdmap
.get_up_osds(up_osds
);
278 std::set
<int32_t> unreported_osds
;
279 std::set_difference(up_osds
.begin(), up_osds
.end(),
280 reported_osds
.begin(), reported_osds
.end(),
281 std::inserter(unreported_osds
, unreported_osds
.begin()));
283 if (unreported_osds
.size() == 0) {
284 dout(4) << "all osds have reported, sending PG state to mon" << dendl
;
286 reported_osds
.clear();
287 // Avoid waiting for next tick
290 dout(4) << "still waiting for " << unreported_osds
.size() << " osds"
291 " to report in before PGMap is ready" << dendl
;
296 void DaemonServer::shutdown()
298 dout(10) << "begin" << dendl
;
301 dout(10) << "done" << dendl
;
306 bool DaemonServer::handle_open(MMgrOpen
*m
)
309 if (!m
->service_name
.empty()) {
310 key
.first
= m
->service_name
;
312 key
.first
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
314 key
.second
= m
->daemon_name
;
316 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
318 auto configure
= new MMgrConfigure();
319 configure
->stats_period
= g_conf
->mgr_stats_period
;
320 m
->get_connection()->send_message(configure
);
322 DaemonStatePtr daemon
;
323 if (daemon_state
.exists(key
)) {
324 daemon
= daemon_state
.get(key
);
327 dout(20) << "updating existing DaemonState for " << m
->daemon_name
<< dendl
;
328 Mutex::Locker
l(daemon
->lock
);
329 daemon_state
.get(key
)->perf_counters
.clear();
332 if (m
->service_daemon
) {
334 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
335 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
337 if (m
->daemon_metadata
.count("hostname")) {
338 daemon
->hostname
= m
->daemon_metadata
["hostname"];
340 daemon_state
.insert(daemon
);
342 Mutex::Locker
l(daemon
->lock
);
343 daemon
->service_daemon
= true;
344 daemon
->metadata
= m
->daemon_metadata
;
345 daemon
->service_status
= m
->daemon_status
;
347 utime_t now
= ceph_clock_now();
348 auto d
= pending_service_map
.get_daemon(m
->service_name
,
350 if (d
->gid
!= (uint64_t)m
->get_source().num()) {
351 dout(10) << "registering " << key
<< " in pending_service_map" << dendl
;
352 d
->gid
= m
->get_source().num();
353 d
->addr
= m
->get_source_addr();
354 d
->start_epoch
= pending_service_map
.epoch
;
355 d
->start_stamp
= now
;
356 d
->metadata
= m
->daemon_metadata
;
357 pending_service_map_dirty
= pending_service_map
.epoch
;
365 bool DaemonServer::handle_report(MMgrReport
*m
)
368 if (!m
->service_name
.empty()) {
369 key
.first
= m
->service_name
;
371 key
.first
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
373 key
.second
= m
->daemon_name
;
375 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
377 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
378 m
->service_name
.empty()) {
379 // Clients should not be sending us stats unless they are declaring
380 // themselves to be a daemon for some service.
381 dout(4) << "rejecting report from non-daemon client " << m
->daemon_name
387 DaemonStatePtr daemon
;
388 if (daemon_state
.exists(key
)) {
389 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
390 daemon
= daemon_state
.get(key
);
392 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
393 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
394 // FIXME: crap, we don't know the hostname at this stage.
396 daemon_state
.insert(daemon
);
397 // FIXME: we should avoid this case by rejecting MMgrReport from
398 // daemons without sessions, and ensuring that session open
399 // always contains metadata.
401 assert(daemon
!= nullptr);
402 auto &daemon_counters
= daemon
->perf_counters
;
404 Mutex::Locker
l(daemon
->lock
);
405 daemon_counters
.update(m
);
407 // if there are any schema updates, notify the python modules
408 if (!m
->declare_types
.empty() || !m
->undeclare_types
.empty()) {
410 oss
<< key
.first
<< '.' << key
.second
;
411 py_modules
.notify_all("perf_schema_update", oss
.str());
414 if (daemon
->service_daemon
) {
415 utime_t now
= ceph_clock_now();
416 if (m
->daemon_status
) {
417 daemon
->service_status
= *m
->daemon_status
;
418 daemon
->service_status_stamp
= now
;
420 daemon
->last_service_beacon
= now
;
421 } else if (m
->daemon_status
) {
422 derr
<< "got status from non-daemon " << key
<< dendl
;
430 void DaemonServer::_generate_command_map(
431 map
<string
,cmd_vartype
>& cmdmap
,
432 map
<string
,string
> ¶m_str_map
)
434 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
435 p
!= cmdmap
.end(); ++p
) {
436 if (p
->first
== "prefix")
438 if (p
->first
== "caps") {
440 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
441 cv
.size() % 2 == 0) {
442 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
443 string k
= string("caps_") + cv
[i
];
444 param_str_map
[k
] = cv
[i
+ 1];
449 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
453 const MonCommand
*DaemonServer::_get_mgrcommand(
454 const string
&cmd_prefix
,
455 const std::vector
<MonCommand
> &cmds
)
457 const MonCommand
*this_cmd
= nullptr;
458 for (const auto &cmd
: cmds
) {
459 if (cmd
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
467 bool DaemonServer::_allowed_command(
469 const string
&module
,
470 const string
&prefix
,
471 const map
<string
,cmd_vartype
>& cmdmap
,
472 const map
<string
,string
>& param_str_map
,
473 const MonCommand
*this_cmd
) {
475 if (s
->entity_name
.is_mon()) {
476 // mon is all-powerful. even when it is forwarding commands on behalf of
477 // old clients; we expect the mon is validating commands before proxying!
481 bool cmd_r
= this_cmd
->requires_perm('r');
482 bool cmd_w
= this_cmd
->requires_perm('w');
483 bool cmd_x
= this_cmd
->requires_perm('x');
485 bool capable
= s
->caps
.is_capable(
487 CEPH_ENTITY_TYPE_MGR
,
489 module
, prefix
, param_str_map
,
490 cmd_r
, cmd_w
, cmd_x
);
492 dout(10) << " " << s
->entity_name
<< " "
493 << (capable
? "" : "not ") << "capable" << dendl
;
497 bool DaemonServer::handle_command(MCommand
*m
)
500 std::stringstream ss
;
503 assert(lock
.is_locked_by_me());
506 * The working data for processing an MCommand. This lives in
507 * a class to enable passing it into other threads for processing
508 * outside of the thread/locks that called handle_command.
517 CommandContext(MCommand
*m_
)
527 void reply(int r
, const std::stringstream
&ss
)
532 void reply(int r
, const std::string
&rs
)
534 // Let the connection drop as soon as we've sent our response
535 ConnectionRef con
= m
->get_connection();
537 con
->mark_disposable();
540 dout(1) << "handle_command " << cpp_strerror(r
) << " " << rs
<< dendl
;
542 MCommandReply
*reply
= new MCommandReply(r
, rs
);
543 reply
->set_tid(m
->get_tid());
544 reply
->set_data(odata
);
545 con
->send_message(reply
);
551 * A context for receiving a bufferlist/error string from a background
552 * function and then calling back to a CommandContext when it's done
554 class ReplyOnFinish
: public Context
{
555 std::shared_ptr
<CommandContext
> cmdctx
;
561 ReplyOnFinish(std::shared_ptr
<CommandContext
> cmdctx_
)
564 void finish(int r
) override
{
565 cmdctx
->odata
.claim_append(from_mon
);
566 cmdctx
->reply(r
, outs
);
570 std::shared_ptr
<CommandContext
> cmdctx
= std::make_shared
<CommandContext
>(m
);
572 MgrSessionRef
session(static_cast<MgrSession
*>(m
->get_connection()->get_priv()));
576 session
->put(); // SessionRef takes a ref
577 if (session
->inst
.name
== entity_name_t())
578 session
->inst
.name
= m
->get_source();
581 boost::scoped_ptr
<Formatter
> f
;
582 map
<string
,string
> param_str_map
;
584 if (!cmdmap_from_json(m
->cmd
, &(cmdctx
->cmdmap
), ss
)) {
585 cmdctx
->reply(-EINVAL
, ss
);
590 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "format", format
, string("plain"));
591 f
.reset(Formatter::create(format
));
594 cmd_getval(cct
, cmdctx
->cmdmap
, "prefix", prefix
);
596 dout(4) << "decoded " << cmdctx
->cmdmap
.size() << dendl
;
597 dout(4) << "prefix=" << prefix
<< dendl
;
599 if (prefix
== "get_command_descriptions") {
600 dout(10) << "reading commands from python modules" << dendl
;
601 const auto py_commands
= py_modules
.get_commands();
605 f
.open_object_section("command_descriptions");
607 auto dump_cmd
= [&cmdnum
, &f
](const MonCommand
&mc
){
608 ostringstream secname
;
609 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
610 dump_cmddesc_to_json(&f
, secname
.str(), mc
.cmdstring
, mc
.helpstring
,
611 mc
.module
, mc
.req_perms
, mc
.availability
, 0);
615 for (const auto &pyc
: py_commands
) {
619 for (const auto &mgr_cmd
: mgr_commands
) {
623 f
.close_section(); // command_descriptions
624 f
.flush(cmdctx
->odata
);
625 cmdctx
->reply(0, ss
);
630 const MonCommand
*mgr_cmd
= _get_mgrcommand(prefix
, mgr_commands
);
631 _generate_command_map(cmdctx
->cmdmap
, param_str_map
);
633 MonCommand py_command
= {"", "", "py", "rw", "cli"};
634 if (!_allowed_command(session
.get(), py_command
.module
, prefix
, cmdctx
->cmdmap
,
635 param_str_map
, &py_command
)) {
636 dout(1) << " access denied" << dendl
;
637 ss
<< "access denied; does your client key have mgr caps?"
638 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
639 cmdctx
->reply(-EACCES
, ss
);
643 // validate user's permissions for requested command
644 if (!_allowed_command(session
.get(), mgr_cmd
->module
, prefix
, cmdctx
->cmdmap
,
645 param_str_map
, mgr_cmd
)) {
646 dout(1) << " access denied" << dendl
;
647 audit_clog
->info() << "from='" << session
->inst
<< "' "
648 << "entity='" << session
->entity_name
<< "' "
649 << "cmd=" << m
->cmd
<< ": access denied";
650 ss
<< "access denied' does your client key have mgr caps?"
651 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
652 cmdctx
->reply(-EACCES
, ss
);
658 << "from='" << session
->inst
<< "' "
659 << "entity='" << session
->entity_name
<< "' "
660 << "cmd=" << m
->cmd
<< ": dispatch";
663 // service map commands
664 if (prefix
== "service dump") {
666 f
.reset(Formatter::create("json-pretty"));
667 cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
668 f
->dump_object("service_map", service_map
);
670 f
->flush(cmdctx
->odata
);
671 cmdctx
->reply(0, ss
);
674 if (prefix
== "service status") {
676 f
.reset(Formatter::create("json-pretty"));
677 // only include state from services that are in the persisted service map
678 f
->open_object_section("service_status");
680 cluster_state
.with_servicemap([&](const ServiceMap
& service_map
) {
683 for (auto& p
: s
.services
) {
684 f
->open_object_section(p
.first
.c_str());
685 for (auto& q
: p
.second
.daemons
) {
686 f
->open_object_section(q
.first
.c_str());
687 DaemonKey
key(p
.first
, q
.first
);
688 assert(daemon_state
.exists(key
));
689 auto daemon
= daemon_state
.get(key
);
690 Mutex::Locker
l(daemon
->lock
);
691 f
->dump_stream("status_stamp") << daemon
->service_status_stamp
;
692 f
->dump_stream("last_beacon") << daemon
->last_service_beacon
;
693 f
->open_object_section("status");
694 for (auto& r
: daemon
->service_status
) {
695 f
->dump_string(r
.first
.c_str(), r
.second
);
703 f
->flush(cmdctx
->odata
);
704 cmdctx
->reply(0, ss
);
711 if (prefix
== "pg scrub" ||
712 prefix
== "pg repair" ||
713 prefix
== "pg deep-scrub") {
714 string scrubop
= prefix
.substr(3, string::npos
);
717 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pgid", pgidstr
);
718 if (!pgid
.parse(pgidstr
.c_str())) {
719 ss
<< "invalid pgid '" << pgidstr
<< "'";
720 cmdctx
->reply(-EINVAL
, ss
);
723 bool pg_exists
= false;
724 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
725 pg_exists
= osdmap
.pg_exists(pgid
);
728 ss
<< "pg " << pgid
<< " dne";
729 cmdctx
->reply(-ENOENT
, ss
);
732 int acting_primary
= -1;
733 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
734 acting_primary
= osdmap
.get_pg_acting_primary(pgid
);
736 if (acting_primary
== -1) {
737 ss
<< "pg " << pgid
<< " has no primary osd";
738 cmdctx
->reply(-EAGAIN
, ss
);
741 auto p
= osd_cons
.find(acting_primary
);
742 if (p
== osd_cons
.end()) {
743 ss
<< "pg " << pgid
<< " primary osd." << acting_primary
744 << " is not currently connected";
745 cmdctx
->reply(-EAGAIN
, ss
);
747 vector
<pg_t
> pgs
= { pgid
};
748 for (auto& con
: p
->second
) {
749 con
->send_message(new MOSDScrub(monc
->get_fsid(),
752 scrubop
== "deep-scrub"));
754 ss
<< "instructing pg " << pgid
<< " on osd." << acting_primary
755 << " to " << scrubop
;
756 cmdctx
->reply(0, ss
);
758 } else if (prefix
== "osd scrub" ||
759 prefix
== "osd deep-scrub" ||
760 prefix
== "osd repair") {
762 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", whostr
);
764 get_str_vec(prefix
, pvec
);
767 if (whostr
== "*" || whostr
== "all" || whostr
== "any") {
768 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
769 for (int i
= 0; i
< osdmap
.get_max_osd(); i
++)
770 if (osdmap
.is_up(i
)) {
775 long osd
= parse_osd_id(whostr
.c_str(), &ss
);
777 ss
<< "invalid osd '" << whostr
<< "'";
778 cmdctx
->reply(-EINVAL
, ss
);
781 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
782 if (osdmap
.is_up(osd
)) {
787 ss
<< "osd." << osd
<< " is not up";
788 cmdctx
->reply(-EAGAIN
, ss
);
792 set
<int> sent_osds
, failed_osds
;
793 for (auto osd
: osds
) {
794 auto p
= osd_cons
.find(osd
);
795 if (p
== osd_cons
.end()) {
796 failed_osds
.insert(osd
);
798 sent_osds
.insert(osd
);
799 for (auto& con
: p
->second
) {
800 con
->send_message(new MOSDScrub(monc
->get_fsid(),
801 pvec
.back() == "repair",
802 pvec
.back() == "deep-scrub"));
806 if (failed_osds
.size() == osds
.size()) {
807 ss
<< "failed to instruct osd(s) " << osds
<< " to " << pvec
.back()
808 << " (not connected)";
811 ss
<< "instructed osd(s) " << sent_osds
<< " to " << pvec
.back();
812 if (!failed_osds
.empty()) {
813 ss
<< "; osd(s) " << failed_osds
<< " were not connected";
817 cmdctx
->reply(0, ss
);
819 } else if (prefix
== "osd reweight-by-pg" ||
820 prefix
== "osd reweight-by-utilization" ||
821 prefix
== "osd test-reweight-by-pg" ||
822 prefix
== "osd test-reweight-by-utilization") {
824 prefix
== "osd reweight-by-pg" || prefix
== "osd test-reweight-by-pg";
826 prefix
== "osd test-reweight-by-pg" ||
827 prefix
== "osd test-reweight-by-utilization";
829 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "oload", oload
, int64_t(120));
831 vector
<string
> poolnames
;
832 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pools", poolnames
);
833 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
834 for (const auto& poolname
: poolnames
) {
835 int64_t pool
= osdmap
.lookup_pg_pool_name(poolname
);
837 ss
<< "pool '" << poolname
<< "' does not exist";
844 cmdctx
->reply(r
, ss
);
847 double max_change
= g_conf
->mon_reweight_max_change
;
848 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_change", max_change
);
849 if (max_change
<= 0.0) {
850 ss
<< "max_change " << max_change
<< " must be positive";
851 cmdctx
->reply(-EINVAL
, ss
);
854 int64_t max_osds
= g_conf
->mon_reweight_max_osds
;
855 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_osds", max_osds
);
857 ss
<< "max_osds " << max_osds
<< " must be positive";
858 cmdctx
->reply(-EINVAL
, ss
);
861 string no_increasing
;
862 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "no_increasing", no_increasing
);
864 mempool::osdmap::map
<int32_t, uint32_t> new_weights
;
865 r
= cluster_state
.with_pgmap([&](const PGMap
& pgmap
) {
866 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
867 return reweight::by_utilization(osdmap
, pgmap
,
872 pools
.empty() ? NULL
: &pools
,
873 no_increasing
== "--no-increasing",
875 &ss
, &out_str
, f
.get());
879 dout(10) << "reweight::by_utilization: finished with " << out_str
<< dendl
;
882 f
->flush(cmdctx
->odata
);
884 cmdctx
->odata
.append(out_str
);
887 ss
<< "FAILED reweight-by-pg";
888 cmdctx
->reply(r
, ss
);
890 } else if (r
== 0 || dry_run
) {
892 cmdctx
->reply(r
, ss
);
895 json_spirit::Object json_object
;
896 for (const auto& osd_weight
: new_weights
) {
897 json_spirit::Config::add(json_object
,
898 std::to_string(osd_weight
.first
),
899 std::to_string(osd_weight
.second
));
901 string s
= json_spirit::write(json_object
);
902 std::replace(begin(s
), end(s
), '\"', '\'');
905 "\"prefix\": \"osd reweightn\", "
906 "\"weights\": \"" + s
+ "\""
908 auto on_finish
= new ReplyOnFinish(cmdctx
);
909 monc
->start_mon_command({cmd
}, {},
910 &on_finish
->from_mon
, &on_finish
->outs
, on_finish
);
913 } else if (prefix
== "osd df") {
915 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "output_method", method
);
916 r
= cluster_state
.with_pgservice([&](const PGMapStatService
& pgservice
) {
917 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
918 print_osd_utilization(osdmap
, &pgservice
, ss
,
919 f
.get(), method
== "tree");
921 cmdctx
->odata
.append(ss
);
925 cmdctx
->reply(r
, "");
927 } else if (prefix
== "osd safe-to-destroy") {
929 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "ids", ids
);
932 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
933 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
935 if (!r
&& osds
.empty()) {
936 ss
<< "must specify one or more OSDs";
940 cmdctx
->reply(r
, ss
);
943 set
<int> active_osds
, missing_stats
, stored_pgs
;
944 int affected_pgs
= 0;
945 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
946 if (pg_map
.num_pg_unknown
> 0) {
947 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; cannot draw"
948 << " any conclusions";
952 int num_active_clean
= 0;
953 for (auto& p
: pg_map
.num_pg_by_state
) {
954 unsigned want
= PG_STATE_ACTIVE
|PG_STATE_CLEAN
;
955 if ((p
.first
& want
) == want
) {
956 num_active_clean
+= p
.second
;
959 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
960 for (auto osd
: osds
) {
961 if (!osdmap
.exists(osd
)) {
962 continue; // clearly safe to destroy
964 auto q
= pg_map
.num_pg_by_osd
.find(osd
);
965 if (q
!= pg_map
.num_pg_by_osd
.end()) {
966 if (q
->second
.acting
> 0 || q
->second
.up
> 0) {
967 active_osds
.insert(osd
);
968 affected_pgs
+= q
->second
.acting
+ q
->second
.up
;
972 if (num_active_clean
< pg_map
.num_pg
) {
973 // all pgs aren't active+clean; we need to be careful.
974 auto p
= pg_map
.osd_stat
.find(osd
);
975 if (p
== pg_map
.osd_stat
.end()) {
976 missing_stats
.insert(osd
);
978 if (p
->second
.num_pgs
> 0) {
979 stored_pgs
.insert(osd
);
985 if (!r
&& !active_osds
.empty()) {
986 ss
<< "OSD(s) " << active_osds
<< " have " << affected_pgs
987 << " pgs currently mapped to them";
989 } else if (!missing_stats
.empty()) {
990 ss
<< "OSD(s) " << missing_stats
<< " have no reported stats, and not all"
991 << " PGs are active+clean; we cannot draw any conclusions";
993 } else if (!stored_pgs
.empty()) {
994 ss
<< "OSD(s) " << stored_pgs
<< " last reported they still store some PG"
995 << " data, and not all PGs are active+clean; we cannot be sure they"
996 << " aren't still needed.";
1000 cmdctx
->reply(r
, ss
);
1003 ss
<< "OSD(s) " << osds
<< " are safe to destroy without reducing data"
1005 cmdctx
->reply(0, ss
);
1007 } else if (prefix
== "osd ok-to-stop") {
1009 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "ids", ids
);
1012 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1013 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1015 if (!r
&& osds
.empty()) {
1016 ss
<< "must specify one or more OSDs";
1020 cmdctx
->reply(r
, ss
);
1023 map
<pg_t
,int> pg_delta
; // pgid -> net acting set size change
1024 int dangerous_pgs
= 0;
1025 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1026 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1027 if (pg_map
.num_pg_unknown
> 0) {
1028 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; "
1029 << "cannot draw any conclusions";
1033 for (auto osd
: osds
) {
1034 auto p
= pg_map
.pg_by_osd
.find(osd
);
1035 if (p
!= pg_map
.pg_by_osd
.end()) {
1036 for (auto& pgid
: p
->second
) {
1041 for (auto& p
: pg_delta
) {
1042 auto q
= pg_map
.pg_stat
.find(p
.first
);
1043 if (q
== pg_map
.pg_stat
.end()) {
1044 ss
<< "missing information about " << p
.first
<< "; cannot draw"
1045 << " any conclusions";
1049 if (!(q
->second
.state
& PG_STATE_ACTIVE
) ||
1050 (q
->second
.state
& PG_STATE_DEGRADED
)) {
1051 // we don't currently have a good way to tell *how* degraded
1052 // a degraded PG is, so we have to assume we cannot remove
1053 // any more replicas/shards.
1057 const pg_pool_t
*pi
= osdmap
.get_pg_pool(p
.first
.pool());
1059 ++dangerous_pgs
; // pool is creating or deleting
1061 if (q
->second
.acting
.size() + p
.second
< pi
->min_size
) {
1069 cmdctx
->reply(r
, ss
);
1072 if (dangerous_pgs
) {
1073 ss
<< dangerous_pgs
<< " PGs are already degraded or might become "
1075 cmdctx
->reply(-EBUSY
, ss
);
1078 ss
<< "OSD(s) " << osds
<< " are ok to stop without reducing"
1079 << " availability, provided there are no other concurrent failures"
1080 << " or interventions. " << pg_delta
.size() << " PGs are likely to be"
1081 << " degraded (but remain available) as a result.";
1082 cmdctx
->reply(0, ss
);
1084 } else if (prefix
== "pg force-recovery" ||
1085 prefix
== "pg force-backfill" ||
1086 prefix
== "pg cancel-force-recovery" ||
1087 prefix
== "pg cancel-force-backfill") {
1088 string forceop
= prefix
.substr(3, string::npos
);
1089 list
<pg_t
> parsed_pgs
;
1090 map
<int, list
<pg_t
> > osdpgs
;
1092 // figure out actual op just once
1094 if (forceop
== "force-recovery") {
1095 actual_op
= OFR_RECOVERY
;
1096 } else if (forceop
== "force-backfill") {
1097 actual_op
= OFR_BACKFILL
;
1098 } else if (forceop
== "cancel-force-backfill") {
1099 actual_op
= OFR_BACKFILL
| OFR_CANCEL
;
1100 } else if (forceop
== "cancel-force-recovery") {
1101 actual_op
= OFR_RECOVERY
| OFR_CANCEL
;
1104 // covnert pg names to pgs, discard any invalid ones while at it
1106 // we don't want to keep pgidstr and pgidstr_nodup forever
1107 vector
<string
> pgidstr
;
1108 // get pgids to process and prune duplicates
1109 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pgid", pgidstr
);
1110 set
<string
> pgidstr_nodup(pgidstr
.begin(), pgidstr
.end());
1111 if (pgidstr
.size() != pgidstr_nodup
.size()) {
1112 // move elements only when there were duplicates, as this
1114 pgidstr
.resize(pgidstr_nodup
.size());
1115 auto it
= pgidstr_nodup
.begin();
1116 for (size_t i
= 0 ; i
< pgidstr_nodup
.size(); i
++) {
1117 pgidstr
[i
] = std::move(*it
++);
1121 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1122 for (auto& pstr
: pgidstr
) {
1124 if (!parsed_pg
.parse(pstr
.c_str())) {
1125 ss
<< "invalid pgid '" << pstr
<< "'; ";
1128 auto workit
= pg_map
.pg_stat
.find(parsed_pg
);
1129 if (workit
== pg_map
.pg_stat
.end()) {
1130 ss
<< "pg " << pstr
<< " not exists; ";
1133 pg_stat_t workpg
= workit
->second
;
1135 // discard pgs for which user requests are pointless
1139 if ((workpg
.state
& (PG_STATE_DEGRADED
| PG_STATE_RECOVERY_WAIT
| PG_STATE_RECOVERING
)) == 0) {
1140 // don't return error, user script may be racing with cluster. not fatal.
1141 ss
<< "pg " << pstr
<< " doesn't require recovery; ";
1143 } else if (workpg
.state
& PG_STATE_FORCED_RECOVERY
) {
1144 ss
<< "pg " << pstr
<< " recovery already forced; ";
1145 // return error, as it may be a bug in user script
1151 if ((workpg
.state
& (PG_STATE_DEGRADED
| PG_STATE_BACKFILL_WAIT
| PG_STATE_BACKFILL
)) == 0) {
1152 ss
<< "pg " << pstr
<< " doesn't require backfilling; ";
1154 } else if (workpg
.state
& PG_STATE_FORCED_BACKFILL
) {
1155 ss
<< "pg " << pstr
<< " backfill already forced; ";
1160 case OFR_BACKFILL
| OFR_CANCEL
:
1161 if ((workpg
.state
& PG_STATE_FORCED_BACKFILL
) == 0) {
1162 ss
<< "pg " << pstr
<< " backfill not forced; ";
1166 case OFR_RECOVERY
| OFR_CANCEL
:
1167 if ((workpg
.state
& PG_STATE_FORCED_RECOVERY
) == 0) {
1168 ss
<< "pg " << pstr
<< " recovery not forced; ";
1173 assert(0 == "actual_op value is not supported");
1176 parsed_pgs
.push_back(std::move(parsed_pg
));
1181 // group pgs to process by osd
1182 for (auto& pgid
: parsed_pgs
) {
1183 auto workit
= pg_map
.pg_stat
.find(pgid
);
1184 if (workit
!= pg_map
.pg_stat
.end()) {
1185 pg_stat_t workpg
= workit
->second
;
1186 set
<int32_t> osds(workpg
.up
.begin(), workpg
.up
.end());
1187 osds
.insert(workpg
.acting
.begin(), workpg
.acting
.end());
1188 for (auto i
: osds
) {
1189 osdpgs
[i
].push_back(pgid
);
1197 // respond with error only when no pgs are correct
1198 // yes, in case of mixed errors, only the last one will be emitted,
1199 // but the message presented will be fine
1200 if (parsed_pgs
.size() != 0) {
1201 // clear error to not confuse users/scripts
1205 // optimize the command -> messages conversion, use only one message per distinct OSD
1206 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1207 for (auto& i
: osdpgs
) {
1208 if (osdmap
.is_up(i
.first
)) {
1209 vector
<pg_t
> pgvec(make_move_iterator(i
.second
.begin()), make_move_iterator(i
.second
.end()));
1210 auto p
= osd_cons
.find(i
.first
);
1211 if (p
== osd_cons
.end()) {
1212 ss
<< "osd." << i
.first
<< " is not currently connected";
1216 for (auto& con
: p
->second
) {
1217 con
->send_message(new MOSDForceRecovery(monc
->get_fsid(), pgvec
, actual_op
));
1219 ss
<< "instructing pg(s) " << i
.second
<< " on osd." << i
.first
<< " to " << forceop
<< "; ";
1224 cmdctx
->reply(r
, ss
);
1227 r
= cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1228 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1229 return process_pg_map_command(prefix
, cmdctx
->cmdmap
, pg_map
, osdmap
,
1230 f
.get(), &ss
, &cmdctx
->odata
);
1234 if (r
!= -EOPNOTSUPP
) {
1235 cmdctx
->reply(r
, ss
);
1240 // None of the special native commands,
1241 MgrPyModule
*handler
= nullptr;
1242 auto py_commands
= py_modules
.get_py_commands();
1243 for (const auto &pyc
: py_commands
) {
1244 auto pyc_prefix
= cmddesc_get_prefix(pyc
.cmdstring
);
1245 dout(1) << "pyc_prefix: '" << pyc_prefix
<< "'" << dendl
;
1246 if (pyc_prefix
== prefix
) {
1247 handler
= pyc
.handler
;
1252 if (handler
== nullptr) {
1253 ss
<< "No handler found for '" << prefix
<< "'";
1254 dout(4) << "No handler found for '" << prefix
<< "'" << dendl
;
1255 cmdctx
->reply(-EINVAL
, ss
);
1258 // Okay, now we have a handler to call, but we must not call it
1259 // in this thread, because the python handlers can do anything,
1260 // including blocking, and including calling back into mgr.
1261 dout(4) << "passing through " << cmdctx
->cmdmap
.size() << dendl
;
1262 finisher
.queue(new FunctionContext([cmdctx
, handler
](int r_
) {
1263 std::stringstream ds
;
1264 std::stringstream ss
;
1265 int r
= handler
->handle_command(cmdctx
->cmdmap
, &ds
, &ss
);
1266 cmdctx
->odata
.append(ds
);
1267 cmdctx
->reply(r
, ss
);
// Drop service-map entries whose daemons have stopped sending beacons.
// A daemon is stale once its last_service_beacon is older than
// now - mgr_service_beacon_grace; stale daemons (and services left with
// no daemons) are erased from pending_service_map, and
// pending_service_map_dirty is set to the pending epoch so the next
// send_report() publishes the change.
// NOTE(review): this chunk is a garbled extraction — gaps in the embedded
// original line numbers (1283->1287, 1291->1295, 1299->1304, 1306->end)
// indicate dropped lines (loop advances / continue statements / closing
// braces); verify the exact control flow against the upstream file.
1273 void DaemonServer::_prune_pending_service_map()
// Staleness threshold: anything not heard from since 'cutoff' is pruned.
1275 utime_t cutoff
= ceph_clock_now();
1276 cutoff
-= g_conf
->mgr_service_beacon_grace
;
// Walk every (service, daemon) pair recorded in the pending service map.
1277 auto p
= pending_service_map
.services
.begin();
1278 while (p
!= pending_service_map
.services
.end()) {
1279 auto q
= p
->second
.daemons
.begin();
1280 while (q
!= p
->second
.daemons
.end()) {
1281 DaemonKey
key(p
->first
, q
->first
);
// Daemon is listed in the service map but unknown to daemon_state —
// log it; the (dropped) lines presumably skip to the next daemon.
1282 if (!daemon_state
.exists(key
)) {
1283 derr
<< "missing key " << key
<< dendl
;
// Examine the daemon's beacon timestamp under its own lock.
1287 auto daemon
= daemon_state
.get(key
);
1288 Mutex::Locker
l(daemon
->lock
);
// A zero timestamp means no beacon has been seen since this mgr came
// up; treat the daemon as alive and start its clock now rather than
// pruning everything immediately after a restart.
1289 if (daemon
->last_service_beacon
== utime_t()) {
1290 // we must have just restarted; assume they are alive now.
1291 daemon
->last_service_beacon
= ceph_clock_now();
// Beacon older than the grace window: prune this daemon and mark the
// pending map dirty at the current epoch.
1295 if (daemon
->last_service_beacon
< cutoff
) {
1296 dout(10) << "pruning stale " << p
->first
<< "." << q
->first
1297 << " last_beacon " << daemon
->last_service_beacon
<< dendl
;
1298 q
= p
->second
.daemons
.erase(q
);
1299 pending_service_map_dirty
= pending_service_map
.epoch
;
// A service left with no daemons is removed entirely, which also
// dirties the pending map.
1304 if (p
->second
.daemons
.empty()) {
1305 p
= pending_service_map
.services
.erase(p
);
1306 pending_service_map_dirty
= pending_service_map
.epoch
;
// Build an MMonMgrReport and send it to the monitor: a PGMap digest,
// health checks (from the python modules and from the PGMap), and — when
// the pending ServiceMap has unpublished changes — an encoded ServiceMap.
// NOTE(review): garbled extraction — gaps in the embedded original line
// numbers (1320->1322, 1322->1328, 1339->1341, 1351->1354, ...) indicate
// dropped lines (conditions, closing braces, stream continuations);
// verify the exact control flow against the upstream file.
1313 void DaemonServer::send_report()
// If more than 4 stats periods have elapsed since startup, stop waiting
// for laggard OSDs and report with whatever PG state we have.
1316 if (ceph_clock_now() - started_at
> g_conf
->mgr_stats_period
* 4.0) {
1318 reported_osds
.clear();
1319 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
1320 << "potentially incomplete PG state to mon" << dendl
;
1322 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
// Report message; populated below and handed to send_mon_message().
1328 auto m
= new MMonMgrReport();
// Health checks raised by the python mgr modules.
1329 py_modules
.get_health_checks(&m
->health_checks
);
// Fill the rest of the report from the current PGMap.
1331 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1332 cluster_state
.update_delta_stats();
// epoch == 0 means we have not yet received an initial ServiceMap
// (see got_service_map), so there is nothing to publish yet.
1334 if (pending_service_map
.epoch
) {
1335 _prune_pending_service_map();
// Dirty marker at/after the pending epoch: encode the ServiceMap into
// the report and advance to the next pending epoch.
1336 if (pending_service_map_dirty
>= pending_service_map
.epoch
) {
1337 pending_service_map
.modified
= ceph_clock_now();
1338 ::encode(pending_service_map
, m
->service_map_bl
, CEPH_FEATURES_ALL
);
1339 dout(10) << "sending service_map e" << pending_service_map
.epoch
1341 pending_service_map
.epoch
++;
1345 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1346 // FIXME: no easy way to get mon features here. this will do for
1347 // now, though, as long as we don't make a backward-incompat change.
1348 pg_map
.encode_digest(osdmap
, m
->get_data(), CEPH_FEATURES_ALL
);
1349 dout(10) << pg_map
<< dendl
;
// Add PGMap-derived health checks (args beyond osdmap were dropped by
// the extraction here).
1351 pg_map
.get_health_checks(g_ceph_context
, osdmap
,
1354 dout(10) << m
->health_checks
.checks
.size() << " health checks"
// At debug level 20, dump the full health-check set as pretty JSON.
1356 dout(20) << "health checks:\n";
1357 JSONFormatter
jf(true);
1358 jf
.dump_object("health_checks", m
->health_checks
);
1363 // TODO? We currently do not notify the PyModules
1364 // TODO: respect needs_send, so we send the report only if we are asked to do
1365 // so, or the state is updated.
1366 monc
->send_mon_message(m
);
1369 void DaemonServer::got_service_map()
1371 Mutex::Locker
l(lock
);
1373 cluster_state
.with_servicemap([&](const ServiceMap
& service_map
) {
1374 if (pending_service_map
.epoch
== 0) {
1375 // we just started up
1376 dout(10) << "got initial map e" << service_map
.epoch
<< dendl
;
1377 pending_service_map
= service_map
;
1379 // we we already active and therefore must have persisted it,
1380 // which means ours is the same or newer.
1381 dout(10) << "got updated map e" << service_map
.epoch
<< dendl
;
1383 pending_service_map
.epoch
= service_map
.epoch
+ 1;
1386 // cull missing daemons, populate new ones
1387 for (auto& p
: pending_service_map
.services
) {
1388 std::set
<std::string
> names
;
1389 for (auto& q
: p
.second
.daemons
) {
1390 names
.insert(q
.first
);
1391 DaemonKey
key(p
.first
, q
.first
);
1392 if (!daemon_state
.exists(key
)) {
1393 auto daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
1395 daemon
->metadata
= q
.second
.metadata
;
1396 if (q
.second
.metadata
.count("hostname")) {
1397 daemon
->hostname
= q
.second
.metadata
["hostname"];
1399 daemon
->service_daemon
= true;
1400 daemon_state
.insert(daemon
);
1401 dout(10) << "added missing " << key
<< dendl
;
1404 daemon_state
.cull(p
.first
, names
);