1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
14 #include "DaemonServer.h"
17 #include "include/stringify.h"
18 #include "include/str_list.h"
19 #include "auth/RotatingKeyRing.h"
20 #include "json_spirit/json_spirit_writer.h"
22 #include "mgr/mgr_commands.h"
23 #include "mgr/OSDHealthMetricCollector.h"
24 #include "mon/MonCommand.h"
26 #include "messages/MMgrOpen.h"
27 #include "messages/MMgrConfigure.h"
28 #include "messages/MMonMgrReport.h"
29 #include "messages/MCommand.h"
30 #include "messages/MCommandReply.h"
31 #include "messages/MPGStats.h"
32 #include "messages/MOSDScrub.h"
33 #include "messages/MOSDForceRecovery.h"
34 #include "common/errno.h"
36 #define dout_context g_ceph_context
37 #define dout_subsys ceph_subsys_mgr
39 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
// DaemonServer constructor: builds per-peer-type byte/message Throttles from
// the mgr_* config values, stores references to the daemon/cluster/python
// state owned by the Mgr, selects cluster vs. service auth handler registries
// (falling back to auth_supported when non-empty), and registers this object
// as a config observer.
// NOTE(review): this block was mangled during extraction -- each statement is
// split across physical lines, the original source line numbers are fused
// into the text, and several original lines are missing (gaps in the embedded
// numbering, e.g. 66->70). Code is left byte-identical to the damaged input;
// restore from upstream before compiling.
43 DaemonServer::DaemonServer(MonClient
*monc_
,
45 DaemonStateIndex
&daemon_state_
,
46 ClusterState
&cluster_state_
,
47 PyModuleRegistry
&py_modules_
,
49 LogChannelRef audit_clog_
)
50 : Dispatcher(g_ceph_context
),
51 client_byte_throttler(new Throttle(g_ceph_context
, "mgr_client_bytes",
52 g_conf
->get_val
<uint64_t>("mgr_client_bytes"))),
53 client_msg_throttler(new Throttle(g_ceph_context
, "mgr_client_messages",
54 g_conf
->get_val
<uint64_t>("mgr_client_messages"))),
55 osd_byte_throttler(new Throttle(g_ceph_context
, "mgr_osd_bytes",
56 g_conf
->get_val
<uint64_t>("mgr_osd_bytes"))),
// NOTE(review): throttler label "mgr_osd_messsages" below has a triple-s
// typo; the config key on the following line is spelled correctly. Same
// pattern repeats for the mds and mon message throttlers.
57 osd_msg_throttler(new Throttle(g_ceph_context
, "mgr_osd_messsages",
58 g_conf
->get_val
<uint64_t>("mgr_osd_messages"))),
59 mds_byte_throttler(new Throttle(g_ceph_context
, "mgr_mds_bytes",
60 g_conf
->get_val
<uint64_t>("mgr_mds_bytes"))),
61 mds_msg_throttler(new Throttle(g_ceph_context
, "mgr_mds_messsages",
62 g_conf
->get_val
<uint64_t>("mgr_mds_messages"))),
63 mon_byte_throttler(new Throttle(g_ceph_context
, "mgr_mon_bytes",
64 g_conf
->get_val
<uint64_t>("mgr_mon_bytes"))),
65 mon_msg_throttler(new Throttle(g_ceph_context
, "mgr_mon_messsages",
66 g_conf
->get_val
<uint64_t>("mgr_mon_messages"))),
70 daemon_state(daemon_state_
),
71 cluster_state(cluster_state_
),
72 py_modules(py_modules_
),
74 audit_clog(audit_clog_
),
// Auth registries: prefer the explicit auth_supported list; otherwise use
// the cluster- or service-specific required methods.
75 auth_cluster_registry(g_ceph_context
,
76 g_conf
->auth_supported
.empty() ?
77 g_conf
->auth_cluster_required
:
78 g_conf
->auth_supported
),
79 auth_service_registry(g_ceph_context
,
80 g_conf
->auth_supported
.empty() ?
81 g_conf
->auth_service_required
:
82 g_conf
->auth_supported
),
// Observe config changes (paired with remove_observer in the destructor).
86 g_conf
->add_observer(this);
89 DaemonServer::~DaemonServer() {
91 g_conf
->remove_observer(this);
// init(): create and configure the public Messenger for this mgr instance
// (identified by gid), install the per-peer-type throttlers built in the
// constructor, bind to the configured public address, and register this
// object as a dispatcher. Records started_at for uptime accounting.
// NOTE(review): extraction-damaged block -- statements are split across
// lines and several original lines are missing (e.g. the error-return path
// after the failed bind, msgr->start(), and the final return). Code left
// byte-identical; restore from upstream before compiling.
94 int DaemonServer::init(uint64_t gid
, entity_addr_t client_addr
)
96 // Initialize Messenger
97 std::string public_msgr_type
= g_conf
->ms_public_type
.empty() ?
98 g_conf
->get_val
<std::string
>("ms_type") : g_conf
->ms_public_type
;
99 msgr
= Messenger::create(g_ceph_context
, public_msgr_type
,
100 entity_name_t::MGR(gid
),
// Stateless server policy: no session resumption for inbound connections.
103 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
// Attach the byte/message throttlers per peer type so no single class of
// peers can exhaust mgr memory with queued messages.
106 msgr
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
107 client_byte_throttler
.get(),
108 client_msg_throttler
.get());
111 msgr
->set_policy_throttlers(entity_name_t::TYPE_OSD
,
112 osd_byte_throttler
.get(),
113 osd_msg_throttler
.get());
114 msgr
->set_policy_throttlers(entity_name_t::TYPE_MDS
,
115 mds_byte_throttler
.get(),
116 mds_msg_throttler
.get());
117 msgr
->set_policy_throttlers(entity_name_t::TYPE_MON
,
118 mon_byte_throttler
.get(),
119 mon_msg_throttler
.get());
121 int r
= msgr
->bind(g_conf
->public_addr
);
// On bind failure: log the address we could not bind to. (The error-return
// that presumably follows is among the missing lines.)
123 derr
<< "unable to bind mgr to " << g_conf
->public_addr
<< dendl
;
127 msgr
->set_myname(entity_name_t::MGR(gid
));
128 msgr
->set_addr_unknowns(client_addr
);
131 msgr
->add_dispatcher_tail(this);
133 started_at
= ceph_clock_now();
138 entity_addr_t
DaemonServer::get_myaddr() const
140 return msgr
->get_myaddr();
// ms_verify_authorizer(): Messenger auth hook. Picks the cluster auth
// registry for daemon peers (OSD/MON/MDS/MGR) and the service registry for
// everything else, verifies the authorizer against the mon client's rotating
// keys, builds an MgrSession carrying the peer's entity name and parsed
// caps, attaches it to the Connection as priv data, and registers OSD
// connections in osd_cons under the parsed osd id.
// NOTE(review): extraction-damaged block -- parameter list, conditionals and
// the try/catch around caps decoding are missing lines (gaps in the embedded
// numbering). Code left byte-identical; restore from upstream.
144 bool DaemonServer::ms_verify_authorizer(
148 ceph::bufferlist
& authorizer_data
,
149 ceph::bufferlist
& authorizer_reply
,
151 CryptoKey
& session_key
,
152 std::unique_ptr
<AuthAuthorizerChallenge
> *challenge
)
154 AuthAuthorizeHandler
*handler
= nullptr;
// Daemon peers authenticate against the cluster registry; others (clients)
// against the service registry.
155 if (peer_type
== CEPH_ENTITY_TYPE_OSD
||
156 peer_type
== CEPH_ENTITY_TYPE_MON
||
157 peer_type
== CEPH_ENTITY_TYPE_MDS
||
158 peer_type
== CEPH_ENTITY_TYPE_MGR
) {
159 handler
= auth_cluster_registry
.get_handler(protocol
);
161 handler
= auth_service_registry
.get_handler(protocol
);
164 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol
<< dendl
;
// Build the session object that will travel with this connection.
169 MgrSessionRef
s(new MgrSession(cct
));
170 s
->inst
.addr
= con
->get_peer_addr();
171 AuthCapsInfo caps_info
;
173 RotatingKeyRing
*keys
= monc
->rotating_secrets
.get();
175 is_valid
= handler
->verify_authorizer(
178 authorizer_reply
, s
->entity_name
,
179 s
->global_id
, caps_info
,
// No rotating keys yet (e.g. very early in startup): deny.
184 dout(10) << __func__
<< " no rotating_keys (yet), denied" << dendl
;
// Translate the AuthCapsInfo into session caps: either blanket allow-all
// or a caps string that must parse successfully.
189 if (caps_info
.allow_all
) {
190 dout(10) << " session " << s
<< " " << s
->entity_name
191 << " allow_all" << dendl
;
192 s
->caps
.set_allow_all();
194 if (caps_info
.caps
.length() > 0) {
195 bufferlist::iterator p
= caps_info
.caps
.begin();
200 catch (buffer::error
& e
) {
202 bool success
= s
->caps
.parse(str
);
204 dout(10) << " session " << s
<< " " << s
->entity_name
205 << " has caps " << s
->caps
<< " '" << str
<< "'" << dendl
;
207 dout(10) << " session " << s
<< " " << s
->entity_name
208 << " failed to parse caps '" << str
<< "'" << dendl
;
// Attach the session to the connection; OSD peers are additionally indexed
// in osd_cons (under lock) by the numeric id parsed from their entity name.
212 con
->set_priv(s
->get());
214 if (peer_type
== CEPH_ENTITY_TYPE_OSD
) {
215 Mutex::Locker
l(lock
);
216 s
->osd_id
= atoi(s
->entity_name
.get_id().c_str());
217 dout(10) << "registering osd." << s
->osd_id
<< " session "
218 << s
<< " con " << con
<< dendl
;
219 osd_cons
[s
->osd_id
].insert(con
);
// ms_get_authorizer(): Messenger hook to produce an authorizer for an
// outgoing connection of the given peer type. For MON peers it first waits
// (up to 10s) for rotating auth keys to be available, then builds the
// authorizer via the mon client. Returns whether an authorizer was produced.
// NOTE(review): extraction-damaged block -- braces and some interior lines
// are missing; code left byte-identical.
227 bool DaemonServer::ms_get_authorizer(int dest_type
,
228 AuthAuthorizer
**authorizer
, bool force_new
)
230 dout(10) << "type=" << ceph_entity_type_name(dest_type
) << dendl
;
232 if (dest_type
== CEPH_ENTITY_TYPE_MON
) {
237 if (monc
->wait_auth_rotating(10) < 0)
241 *authorizer
= monc
->build_authorizer(dest_type
);
242 dout(20) << "got authorizer " << *authorizer
<< dendl
;
243 return *authorizer
!= NULL
;
// ms_handle_reset(): connection-reset hook. For OSD peers, recover the
// MgrSession from the connection's priv data and, under lock, drop the
// connection from the osd_cons index and from daemon_connections so stale
// connections are not used for later commands / configure updates.
// NOTE(review): extraction-damaged block -- missing lines include the null
// session check and the function's return; code left byte-identical.
246 bool DaemonServer::ms_handle_reset(Connection
*con
)
248 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
249 MgrSessionRef
session(static_cast<MgrSession
*>(con
->get_priv()));
// get_priv() returned an extra ref; release it since MgrSessionRef holds one.
253 session
->put(); // SessionRef takes a ref
254 Mutex::Locker
l(lock
);
255 dout(10) << "unregistering osd." << session
->osd_id
256 << " session " << session
<< " con " << con
<< dendl
;
257 osd_cons
[session
->osd_id
].erase(con
);
259 auto iter
= daemon_connections
.find(con
);
260 if (iter
!= daemon_connections
.end()) {
261 daemon_connections
.erase(iter
);
267 bool DaemonServer::ms_handle_refused(Connection
*con
)
269 // do nothing for now
// ms_dispatch(): top-level message router. Deliberately lock-free here (see
// original comment); each handler takes the locks it needs. Routes PG stats
// into cluster_state (then checks PGMap readiness), and MMgrReport/MMgrOpen/
// MCommand to their handlers.
// NOTE(review): extraction-damaged block -- the case labels of the switch
// and its braces are among the missing lines; code left byte-identical.
273 bool DaemonServer::ms_dispatch(Message
*m
)
275 // Note that we do *not* take ::lock here, in order to avoid
276 // serializing all message handling. It's up to each handler
277 // to take whatever locks it needs.
278 switch (m
->get_type()) {
280 cluster_state
.ingest_pgstats(static_cast<MPGStats
*>(m
));
281 maybe_ready(m
->get_source().num());
285 return handle_report(static_cast<MMgrReport
*>(m
));
287 return handle_open(static_cast<MMgrOpen
*>(m
));
289 return handle_command(static_cast<MCommand
*>(m
));
291 dout(1) << "Unhandled message type " << m
->get_type() << dendl
;
// maybe_ready(): called on each incoming OSD pgstats message. Once every
// up OSD has reported at least once, the PGMap is considered ready and PG
// state is sent to the mon. Fast-paths out on the atomic pgmap_ready flag;
// otherwise tracks first-time reporters in reported_osds (under lock) and
// diffs against the OSDMap's up set.
// NOTE(review): extraction-damaged block -- several lines (including the
// ready-flag flip and the report send) are missing; code left byte-identical.
296 void DaemonServer::maybe_ready(int32_t osd_id
)
298 if (pgmap_ready
.load()) {
299 // Fast path: we don't need to take lock because pgmap_ready
302 Mutex::Locker
l(lock
);
304 if (reported_osds
.find(osd_id
) == reported_osds
.end()) {
305 dout(4) << "initial report from osd " << osd_id
<< dendl
;
306 reported_osds
.insert(osd_id
);
307 std::set
<int32_t> up_osds
;
309 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
310 osdmap
.get_up_osds(up_osds
);
// unreported = up \ reported; when empty, every up OSD has checked in.
313 std::set
<int32_t> unreported_osds
;
314 std::set_difference(up_osds
.begin(), up_osds
.end(),
315 reported_osds
.begin(), reported_osds
.end(),
316 std::inserter(unreported_osds
, unreported_osds
.begin()));
318 if (unreported_osds
.size() == 0) {
319 dout(4) << "all osds have reported, sending PG state to mon" << dendl
;
321 reported_osds
.clear();
322 // Avoid waiting for next tick
325 dout(4) << "still waiting for " << unreported_osds
.size() << " osds"
326 " to report in before PGMap is ready" << dendl
;
// shutdown(): stop the DaemonServer. Only the begin/done log lines survive
// in this extraction; the actual teardown statements (original lines
// 335-336, presumably messenger shutdown/wait -- TODO confirm against
// upstream) are missing. Code left byte-identical.
332 void DaemonServer::shutdown()
334 dout(10) << "begin" << dendl
;
337 dout(10) << "done" << dendl
;
// handle_open(): process an MMgrOpen from a daemon/client starting a mgr
// session. Builds the DaemonKey (service name if declared, else peer type
// name; plus daemon name), sends back the stats configuration, creates or
// refreshes the DaemonState entry, records self-declared service daemons in
// the pending service map, and remembers non-client connections so they can
// be re-configured when stats settings change.
// NOTE(review): extraction-damaged block -- braces, else-branches and some
// lines (e.g. the DaemonKey declaration) are missing; code left
// byte-identical.
342 bool DaemonServer::handle_open(MMgrOpen
*m
)
344 Mutex::Locker
l(lock
);
// Key construction: explicit service_name wins over the peer-type name.
347 if (!m
->service_name
.empty()) {
348 key
.first
= m
->service_name
;
350 key
.first
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
352 key
.second
= m
->daemon_name
;
354 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
// Tell the peer how to report stats.
356 _send_configure(m
->get_connection());
358 DaemonStatePtr daemon
;
359 if (daemon_state
.exists(key
)) {
360 daemon
= daemon_state
.get(key
);
363 dout(20) << "updating existing DaemonState for " << m
->daemon_name
<< dendl
;
// A (re)open resets the daemon's perf counter schema/state.
364 Mutex::Locker
l(daemon
->lock
);
365 daemon
->perf_counters
.clear();
368 if (m
->service_daemon
) {
370 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
371 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
373 if (m
->daemon_metadata
.count("hostname")) {
374 daemon
->hostname
= m
->daemon_metadata
["hostname"];
376 daemon_state
.insert(daemon
);
// Self-declared service daemon: store its metadata/status and register it
// in the pending service map if its gid changed (new instance).
378 Mutex::Locker
l(daemon
->lock
);
379 daemon
->service_daemon
= true;
380 daemon
->metadata
= m
->daemon_metadata
;
381 daemon
->service_status
= m
->daemon_status
;
383 utime_t now
= ceph_clock_now();
384 auto d
= pending_service_map
.get_daemon(m
->service_name
,
386 if (d
->gid
!= (uint64_t)m
->get_source().num()) {
387 dout(10) << "registering " << key
<< " in pending_service_map" << dendl
;
388 d
->gid
= m
->get_source().num();
389 d
->addr
= m
->get_source_addr();
390 d
->start_epoch
= pending_service_map
.epoch
;
391 d
->start_stamp
= now
;
392 d
->metadata
= m
->daemon_metadata
;
393 pending_service_map_dirty
= pending_service_map
.epoch
;
397 if (m
->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT
&&
398 m
->service_name
.empty())
400 // Store in set of the daemon/service connections, i.e. those
401 // connections that require an update in the event of stats
402 // configuration changes.
403 daemon_connections
.insert(m
->get_connection());
// handle_report(): process an MMgrReport (perf counter report) from a
// daemon. Rejects reports from plain clients that did not declare a service,
// and from daemons we have no metadata for yet (kicking off a background
// "osd metadata"/"mds metadata" mon command to fetch it, then dropping the
// connection). For known daemons: updates perf counters, service status and
// beacon timestamps, captures OSD health metrics, and notifies python
// modules when the perf counter schema changed.
// NOTE(review): extraction-damaged block -- many braces/lines are missing
// (returns, else-branches, the session null-check); code left byte-identical.
410 bool DaemonServer::handle_report(MMgrReport
*m
)
413 if (!m
->service_name
.empty()) {
414 key
.first
= m
->service_name
;
416 key
.first
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
418 key
.second
= m
->daemon_name
;
420 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
422 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
423 m
->service_name
.empty()) {
424 // Clients should not be sending us stats unless they are declaring
425 // themselves to be a daemon for some service.
426 dout(4) << "rejecting report from non-daemon client " << m
->daemon_name
428 m
->get_connection()->mark_down();
433 // Look up the DaemonState
434 DaemonStatePtr daemon
;
435 if (daemon_state
.exists(key
)) {
436 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
437 daemon
= daemon_state
.get(key
);
439 // we don't know the hostname at this stage, reject MMgrReport here.
440 dout(5) << "rejecting report from " << key
<< ", since we do not have its metadata now."
443 // issue metadata request in background
444 if (!daemon_state
.is_updating(key
) &&
445 (key
.first
== "osd" || key
.first
== "mds")) {
447 std::ostringstream oss
;
448 auto c
= new MetadataUpdate(daemon_state
, key
);
449 if (key
.first
== "osd") {
450 oss
<< "{\"prefix\": \"osd metadata\", \"id\": "
453 } else if (key
.first
== "mds") {
454 c
->set_default("addr", stringify(m
->get_source_addr()));
455 oss
<< "{\"prefix\": \"mds metadata\", \"who\": \""
456 << key
.second
<< "\"}";
// MetadataUpdate completion context owns outbl/outs and re-ingests the
// fetched metadata into daemon_state.
462 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
466 Mutex::Locker
l(lock
);
// Drop the reporting connection: also unhook it from osd_cons and
// daemon_connections so it is not reused.
468 MgrSessionRef
session(static_cast<MgrSession
*>(m
->get_connection()->get_priv()));
472 m
->get_connection()->mark_down();
475 dout(10) << "unregistering osd." << session
->osd_id
476 << " session " << session
<< " con " << m
->get_connection() << dendl
;
478 if (osd_cons
.find(session
->osd_id
) != osd_cons
.end()) {
479 osd_cons
[session
->osd_id
].erase(m
->get_connection());
482 auto iter
= daemon_connections
.find(m
->get_connection());
483 if (iter
!= daemon_connections
.end()) {
484 daemon_connections
.erase(iter
);
491 // Update the DaemonState
492 assert(daemon
!= nullptr);
494 Mutex::Locker
l(daemon
->lock
);
495 auto &daemon_counters
= daemon
->perf_counters
;
496 daemon_counters
.update(m
);
498 if (daemon
->service_daemon
) {
499 utime_t now
= ceph_clock_now();
500 if (m
->daemon_status
) {
501 daemon
->service_status
= *m
->daemon_status
;
502 daemon
->service_status_stamp
= now
;
504 daemon
->last_service_beacon
= now
;
505 } else if (m
->daemon_status
) {
506 derr
<< "got status from non-daemon " << key
<< dendl
;
508 if (m
->get_connection()->peer_is_osd()) {
509 // only OSD sends health_checks to me now
510 daemon
->osd_health_metrics
= std::move(m
->osd_health_metrics
);
514 // if there are any schema updates, notify the python modules
515 if (!m
->declare_types
.empty() || !m
->undeclare_types
.empty()) {
517 oss
<< key
.first
<< '.' << key
.second
;
518 py_modules
.notify_all("perf_schema_update", oss
.str());
// _generate_command_map(): flatten a parsed cmdmap into string key/value
// pairs for capability matching. Skips "prefix"; expands a "caps" vector of
// alternating key/value entries into "caps_<key>" -> value pairs; stringifies
// every other cmd_vartype value.
// NOTE(review): extraction-damaged block -- the `continue` after the prefix
// check, the cv declaration, and closing braces are among the missing lines;
// code left byte-identical.
526 void DaemonServer::_generate_command_map(
527 map
<string
,cmd_vartype
>& cmdmap
,
528 map
<string
,string
> &param_str_map
)
530 for (map
<string
,cmd_vartype
>::const_iterator p
= cmdmap
.begin();
531 p
!= cmdmap
.end(); ++p
) {
532 if (p
->first
== "prefix")
534 if (p
->first
== "caps") {
536 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
537 cv
.size() % 2 == 0) {
538 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
539 string k
= string("caps_") + cv
[i
];
540 param_str_map
[k
] = cv
[i
+ 1];
545 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
// _get_mgrcommand(): find the MonCommand whose cmdstring starts with the
// given prefix. Returns nullptr when nothing matches.
// NOTE(review): extraction-damaged block -- the assignment to this_cmd, the
// loop break and the return are among the missing lines; code left
// byte-identical.
549 const MonCommand
*DaemonServer::_get_mgrcommand(
550 const string
&cmd_prefix
,
551 const std::vector
<MonCommand
> &cmds
)
553 const MonCommand
*this_cmd
= nullptr;
554 for (const auto &cmd
: cmds
) {
// Prefix match: compare cmd_prefix against the leading characters of the
// command's cmdstring.
555 if (cmd
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
// _allowed_command(): capability check for a mgr command. Mon peers are
// implicitly trusted (the mon validates before proxying). Otherwise derive
// the command's r/w/x requirements and ask the session's MgrCap whether the
// entity may run it with the given module/prefix/parameters. Logs and
// returns the verdict.
// NOTE(review): extraction-damaged block -- the MgrSession* parameter
// declaration, early returns and the final `return capable;` are among the
// missing lines; code left byte-identical.
563 bool DaemonServer::_allowed_command(
565 const string
&module
,
566 const string
&prefix
,
567 const map
<string
,cmd_vartype
>& cmdmap
,
568 const map
<string
,string
>& param_str_map
,
569 const MonCommand
*this_cmd
) {
571 if (s
->entity_name
.is_mon()) {
572 // mon is all-powerful. even when it is forwarding commands on behalf of
573 // old clients; we expect the mon is validating commands before proxying!
577 bool cmd_r
= this_cmd
->requires_perm('r');
578 bool cmd_w
= this_cmd
->requires_perm('w');
579 bool cmd_x
= this_cmd
->requires_perm('x');
581 bool capable
= s
->caps
.is_capable(
583 CEPH_ENTITY_TYPE_MGR
,
585 module
, prefix
, param_str_map
,
586 cmd_r
, cmd_w
, cmd_x
);
588 dout(10) << " " << s
->entity_name
<< " "
589 << (capable
? "" : "not ") << "capable" << dendl
;
593 bool DaemonServer::handle_command(MCommand
*m
)
595 Mutex::Locker
l(lock
);
597 std::stringstream ss
;
600 assert(lock
.is_locked_by_me());
603 * The working data for processing an MCommand. This lives in
604 * a class to enable passing it into other threads for processing
605 * outside of the thread/locks that called handle_command.
614 CommandContext(MCommand
*m_
)
624 void reply(int r
, const std::stringstream
&ss
)
629 void reply(int r
, const std::string
&rs
)
631 // Let the connection drop as soon as we've sent our response
632 ConnectionRef con
= m
->get_connection();
634 con
->mark_disposable();
637 dout(1) << "handle_command " << cpp_strerror(r
) << " " << rs
<< dendl
;
639 MCommandReply
*reply
= new MCommandReply(r
, rs
);
640 reply
->set_tid(m
->get_tid());
641 reply
->set_data(odata
);
642 con
->send_message(reply
);
648 * A context for receiving a bufferlist/error string from a background
649 * function and then calling back to a CommandContext when it's done
651 class ReplyOnFinish
: public Context
{
652 std::shared_ptr
<CommandContext
> cmdctx
;
658 ReplyOnFinish(std::shared_ptr
<CommandContext
> cmdctx_
)
661 void finish(int r
) override
{
662 cmdctx
->odata
.claim_append(from_mon
);
663 cmdctx
->reply(r
, outs
);
667 std::shared_ptr
<CommandContext
> cmdctx
= std::make_shared
<CommandContext
>(m
);
669 MgrSessionRef
session(static_cast<MgrSession
*>(m
->get_connection()->get_priv()));
673 session
->put(); // SessionRef takes a ref
674 if (session
->inst
.name
== entity_name_t())
675 session
->inst
.name
= m
->get_source();
678 boost::scoped_ptr
<Formatter
> f
;
679 map
<string
,string
> param_str_map
;
681 if (!cmdmap_from_json(m
->cmd
, &(cmdctx
->cmdmap
), ss
)) {
682 cmdctx
->reply(-EINVAL
, ss
);
687 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "format", format
, string("plain"));
688 f
.reset(Formatter::create(format
));
691 cmd_getval(cct
, cmdctx
->cmdmap
, "prefix", prefix
);
693 dout(4) << "decoded " << cmdctx
->cmdmap
.size() << dendl
;
694 dout(4) << "prefix=" << prefix
<< dendl
;
696 if (prefix
== "get_command_descriptions") {
697 dout(10) << "reading commands from python modules" << dendl
;
698 const auto py_commands
= py_modules
.get_commands();
702 f
.open_object_section("command_descriptions");
704 auto dump_cmd
= [&cmdnum
, &f
](const MonCommand
&mc
){
705 ostringstream secname
;
706 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
707 dump_cmddesc_to_json(&f
, secname
.str(), mc
.cmdstring
, mc
.helpstring
,
708 mc
.module
, mc
.req_perms
, mc
.availability
, 0);
712 for (const auto &pyc
: py_commands
) {
716 for (const auto &mgr_cmd
: mgr_commands
) {
720 f
.close_section(); // command_descriptions
721 f
.flush(cmdctx
->odata
);
722 cmdctx
->reply(0, ss
);
727 const MonCommand
*mgr_cmd
= _get_mgrcommand(prefix
, mgr_commands
);
728 _generate_command_map(cmdctx
->cmdmap
, param_str_map
);
730 MonCommand py_command
= {"", "", "py", "rw", "cli"};
731 if (!_allowed_command(session
.get(), py_command
.module
, prefix
, cmdctx
->cmdmap
,
732 param_str_map
, &py_command
)) {
733 dout(1) << " access denied" << dendl
;
734 ss
<< "access denied; does your client key have mgr caps?"
735 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
736 cmdctx
->reply(-EACCES
, ss
);
740 // validate user's permissions for requested command
741 if (!_allowed_command(session
.get(), mgr_cmd
->module
, prefix
, cmdctx
->cmdmap
,
742 param_str_map
, mgr_cmd
)) {
743 dout(1) << " access denied" << dendl
;
744 audit_clog
->info() << "from='" << session
->inst
<< "' "
745 << "entity='" << session
->entity_name
<< "' "
746 << "cmd=" << m
->cmd
<< ": access denied";
747 ss
<< "access denied' does your client key have mgr caps?"
748 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
749 cmdctx
->reply(-EACCES
, ss
);
755 << "from='" << session
->inst
<< "' "
756 << "entity='" << session
->entity_name
<< "' "
757 << "cmd=" << m
->cmd
<< ": dispatch";
760 // service map commands
761 if (prefix
== "service dump") {
763 f
.reset(Formatter::create("json-pretty"));
764 cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
765 f
->dump_object("service_map", service_map
);
767 f
->flush(cmdctx
->odata
);
768 cmdctx
->reply(0, ss
);
771 if (prefix
== "service status") {
773 f
.reset(Formatter::create("json-pretty"));
774 // only include state from services that are in the persisted service map
775 f
->open_object_section("service_status");
777 cluster_state
.with_servicemap([&](const ServiceMap
& service_map
) {
780 for (auto& p
: s
.services
) {
781 f
->open_object_section(p
.first
.c_str());
782 for (auto& q
: p
.second
.daemons
) {
783 f
->open_object_section(q
.first
.c_str());
784 DaemonKey
key(p
.first
, q
.first
);
785 assert(daemon_state
.exists(key
));
786 auto daemon
= daemon_state
.get(key
);
787 Mutex::Locker
l(daemon
->lock
);
788 f
->dump_stream("status_stamp") << daemon
->service_status_stamp
;
789 f
->dump_stream("last_beacon") << daemon
->last_service_beacon
;
790 f
->open_object_section("status");
791 for (auto& r
: daemon
->service_status
) {
792 f
->dump_string(r
.first
.c_str(), r
.second
);
800 f
->flush(cmdctx
->odata
);
801 cmdctx
->reply(0, ss
);
805 if (prefix
== "config set") {
808 cmd_getval(cct
, cmdctx
->cmdmap
, "key", key
);
809 cmd_getval(cct
, cmdctx
->cmdmap
, "value", val
);
810 r
= cct
->_conf
->set_val(key
, val
, true, &ss
);
812 cct
->_conf
->apply_changes(nullptr);
814 cmdctx
->reply(0, ss
);
821 if (prefix
== "pg scrub" ||
822 prefix
== "pg repair" ||
823 prefix
== "pg deep-scrub") {
824 string scrubop
= prefix
.substr(3, string::npos
);
827 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pgid", pgidstr
);
828 if (!pgid
.parse(pgidstr
.c_str())) {
829 ss
<< "invalid pgid '" << pgidstr
<< "'";
830 cmdctx
->reply(-EINVAL
, ss
);
833 bool pg_exists
= false;
834 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
835 pg_exists
= osdmap
.pg_exists(pgid
);
838 ss
<< "pg " << pgid
<< " dne";
839 cmdctx
->reply(-ENOENT
, ss
);
842 int acting_primary
= -1;
843 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
844 acting_primary
= osdmap
.get_pg_acting_primary(pgid
);
846 if (acting_primary
== -1) {
847 ss
<< "pg " << pgid
<< " has no primary osd";
848 cmdctx
->reply(-EAGAIN
, ss
);
851 auto p
= osd_cons
.find(acting_primary
);
852 if (p
== osd_cons
.end()) {
853 ss
<< "pg " << pgid
<< " primary osd." << acting_primary
854 << " is not currently connected";
855 cmdctx
->reply(-EAGAIN
, ss
);
857 vector
<pg_t
> pgs
= { pgid
};
858 for (auto& con
: p
->second
) {
859 con
->send_message(new MOSDScrub(monc
->get_fsid(),
862 scrubop
== "deep-scrub"));
864 ss
<< "instructing pg " << pgid
<< " on osd." << acting_primary
865 << " to " << scrubop
;
866 cmdctx
->reply(0, ss
);
868 } else if (prefix
== "osd scrub" ||
869 prefix
== "osd deep-scrub" ||
870 prefix
== "osd repair") {
872 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", whostr
);
874 get_str_vec(prefix
, pvec
);
877 if (whostr
== "*" || whostr
== "all" || whostr
== "any") {
878 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
879 for (int i
= 0; i
< osdmap
.get_max_osd(); i
++)
880 if (osdmap
.is_up(i
)) {
885 long osd
= parse_osd_id(whostr
.c_str(), &ss
);
887 ss
<< "invalid osd '" << whostr
<< "'";
888 cmdctx
->reply(-EINVAL
, ss
);
891 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
892 if (osdmap
.is_up(osd
)) {
897 ss
<< "osd." << osd
<< " is not up";
898 cmdctx
->reply(-EAGAIN
, ss
);
902 set
<int> sent_osds
, failed_osds
;
903 for (auto osd
: osds
) {
904 auto p
= osd_cons
.find(osd
);
905 if (p
== osd_cons
.end()) {
906 failed_osds
.insert(osd
);
908 sent_osds
.insert(osd
);
909 for (auto& con
: p
->second
) {
910 con
->send_message(new MOSDScrub(monc
->get_fsid(),
911 pvec
.back() == "repair",
912 pvec
.back() == "deep-scrub"));
916 if (failed_osds
.size() == osds
.size()) {
917 ss
<< "failed to instruct osd(s) " << osds
<< " to " << pvec
.back()
918 << " (not connected)";
921 ss
<< "instructed osd(s) " << sent_osds
<< " to " << pvec
.back();
922 if (!failed_osds
.empty()) {
923 ss
<< "; osd(s) " << failed_osds
<< " were not connected";
927 cmdctx
->reply(0, ss
);
929 } else if (prefix
== "osd reweight-by-pg" ||
930 prefix
== "osd reweight-by-utilization" ||
931 prefix
== "osd test-reweight-by-pg" ||
932 prefix
== "osd test-reweight-by-utilization") {
934 prefix
== "osd reweight-by-pg" || prefix
== "osd test-reweight-by-pg";
936 prefix
== "osd test-reweight-by-pg" ||
937 prefix
== "osd test-reweight-by-utilization";
939 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "oload", oload
, int64_t(120));
941 vector
<string
> poolnames
;
942 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pools", poolnames
);
943 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
944 for (const auto& poolname
: poolnames
) {
945 int64_t pool
= osdmap
.lookup_pg_pool_name(poolname
);
947 ss
<< "pool '" << poolname
<< "' does not exist";
954 cmdctx
->reply(r
, ss
);
957 double max_change
= g_conf
->mon_reweight_max_change
;
958 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_change", max_change
);
959 if (max_change
<= 0.0) {
960 ss
<< "max_change " << max_change
<< " must be positive";
961 cmdctx
->reply(-EINVAL
, ss
);
964 int64_t max_osds
= g_conf
->mon_reweight_max_osds
;
965 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_osds", max_osds
);
967 ss
<< "max_osds " << max_osds
<< " must be positive";
968 cmdctx
->reply(-EINVAL
, ss
);
971 string no_increasing
;
972 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "no_increasing", no_increasing
);
974 mempool::osdmap::map
<int32_t, uint32_t> new_weights
;
975 r
= cluster_state
.with_pgmap([&](const PGMap
& pgmap
) {
976 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
977 return reweight::by_utilization(osdmap
, pgmap
,
982 pools
.empty() ? NULL
: &pools
,
983 no_increasing
== "--no-increasing",
985 &ss
, &out_str
, f
.get());
989 dout(10) << "reweight::by_utilization: finished with " << out_str
<< dendl
;
992 f
->flush(cmdctx
->odata
);
994 cmdctx
->odata
.append(out_str
);
997 ss
<< "FAILED reweight-by-pg";
998 cmdctx
->reply(r
, ss
);
1000 } else if (r
== 0 || dry_run
) {
1002 cmdctx
->reply(r
, ss
);
1005 json_spirit::Object json_object
;
1006 for (const auto& osd_weight
: new_weights
) {
1007 json_spirit::Config::add(json_object
,
1008 std::to_string(osd_weight
.first
),
1009 std::to_string(osd_weight
.second
));
1011 string s
= json_spirit::write(json_object
);
1012 std::replace(begin(s
), end(s
), '\"', '\'');
1015 "\"prefix\": \"osd reweightn\", "
1016 "\"weights\": \"" + s
+ "\""
1018 auto on_finish
= new ReplyOnFinish(cmdctx
);
1019 monc
->start_mon_command({cmd
}, {},
1020 &on_finish
->from_mon
, &on_finish
->outs
, on_finish
);
1023 } else if (prefix
== "osd df") {
1025 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "output_method", method
);
1026 r
= cluster_state
.with_pgservice([&](const PGMapStatService
& pgservice
) {
1027 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1028 print_osd_utilization(osdmap
, &pgservice
, ss
,
1029 f
.get(), method
== "tree");
1031 cmdctx
->odata
.append(ss
);
1035 cmdctx
->reply(r
, "");
1037 } else if (prefix
== "osd safe-to-destroy") {
1039 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "ids", ids
);
1042 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1043 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1045 if (!r
&& osds
.empty()) {
1046 ss
<< "must specify one or more OSDs";
1050 cmdctx
->reply(r
, ss
);
1053 set
<int> active_osds
, missing_stats
, stored_pgs
;
1054 int affected_pgs
= 0;
1055 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1056 if (pg_map
.num_pg_unknown
> 0) {
1057 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; cannot draw"
1058 << " any conclusions";
1062 int num_active_clean
= 0;
1063 for (auto& p
: pg_map
.num_pg_by_state
) {
1064 unsigned want
= PG_STATE_ACTIVE
|PG_STATE_CLEAN
;
1065 if ((p
.first
& want
) == want
) {
1066 num_active_clean
+= p
.second
;
1069 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1070 for (auto osd
: osds
) {
1071 if (!osdmap
.exists(osd
)) {
1072 continue; // clearly safe to destroy
1074 auto q
= pg_map
.num_pg_by_osd
.find(osd
);
1075 if (q
!= pg_map
.num_pg_by_osd
.end()) {
1076 if (q
->second
.acting
> 0 || q
->second
.up
> 0) {
1077 active_osds
.insert(osd
);
1078 affected_pgs
+= q
->second
.acting
+ q
->second
.up
;
1082 if (num_active_clean
< pg_map
.num_pg
) {
1083 // all pgs aren't active+clean; we need to be careful.
1084 auto p
= pg_map
.osd_stat
.find(osd
);
1085 if (p
== pg_map
.osd_stat
.end()) {
1086 missing_stats
.insert(osd
);
1088 if (p
->second
.num_pgs
> 0) {
1089 stored_pgs
.insert(osd
);
1095 if (!r
&& !active_osds
.empty()) {
1096 ss
<< "OSD(s) " << active_osds
<< " have " << affected_pgs
1097 << " pgs currently mapped to them";
1099 } else if (!missing_stats
.empty()) {
1100 ss
<< "OSD(s) " << missing_stats
<< " have no reported stats, and not all"
1101 << " PGs are active+clean; we cannot draw any conclusions";
1103 } else if (!stored_pgs
.empty()) {
1104 ss
<< "OSD(s) " << stored_pgs
<< " last reported they still store some PG"
1105 << " data, and not all PGs are active+clean; we cannot be sure they"
1106 << " aren't still needed.";
1110 cmdctx
->reply(r
, ss
);
1113 ss
<< "OSD(s) " << osds
<< " are safe to destroy without reducing data"
1115 cmdctx
->reply(0, ss
);
1117 } else if (prefix
== "osd ok-to-stop") {
1119 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "ids", ids
);
1122 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1123 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1125 if (!r
&& osds
.empty()) {
1126 ss
<< "must specify one or more OSDs";
1130 cmdctx
->reply(r
, ss
);
1133 map
<pg_t
,int> pg_delta
; // pgid -> net acting set size change
1134 int dangerous_pgs
= 0;
1135 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1136 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1137 if (pg_map
.num_pg_unknown
> 0) {
1138 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; "
1139 << "cannot draw any conclusions";
1143 for (auto osd
: osds
) {
1144 auto p
= pg_map
.pg_by_osd
.find(osd
);
1145 if (p
!= pg_map
.pg_by_osd
.end()) {
1146 for (auto& pgid
: p
->second
) {
1151 for (auto& p
: pg_delta
) {
1152 auto q
= pg_map
.pg_stat
.find(p
.first
);
1153 if (q
== pg_map
.pg_stat
.end()) {
1154 ss
<< "missing information about " << p
.first
<< "; cannot draw"
1155 << " any conclusions";
1159 if (!(q
->second
.state
& PG_STATE_ACTIVE
) ||
1160 (q
->second
.state
& PG_STATE_DEGRADED
)) {
1161 // we don't currently have a good way to tell *how* degraded
1162 // a degraded PG is, so we have to assume we cannot remove
1163 // any more replicas/shards.
1167 const pg_pool_t
*pi
= osdmap
.get_pg_pool(p
.first
.pool());
1169 ++dangerous_pgs
; // pool is creating or deleting
1171 if (q
->second
.acting
.size() + p
.second
< pi
->min_size
) {
1179 cmdctx
->reply(r
, ss
);
1182 if (dangerous_pgs
) {
1183 ss
<< dangerous_pgs
<< " PGs are already degraded or might become "
1185 cmdctx
->reply(-EBUSY
, ss
);
1188 ss
<< "OSD(s) " << osds
<< " are ok to stop without reducing"
1189 << " availability, provided there are no other concurrent failures"
1190 << " or interventions. " << pg_delta
.size() << " PGs are likely to be"
1191 << " degraded (but remain available) as a result.";
1192 cmdctx
->reply(0, ss
);
1194 } else if (prefix
== "pg force-recovery" ||
1195 prefix
== "pg force-backfill" ||
1196 prefix
== "pg cancel-force-recovery" ||
1197 prefix
== "pg cancel-force-backfill") {
1198 string forceop
= prefix
.substr(3, string::npos
);
1199 list
<pg_t
> parsed_pgs
;
1200 map
<int, list
<pg_t
> > osdpgs
;
1202 // figure out actual op just once
1204 if (forceop
== "force-recovery") {
1205 actual_op
= OFR_RECOVERY
;
1206 } else if (forceop
== "force-backfill") {
1207 actual_op
= OFR_BACKFILL
;
1208 } else if (forceop
== "cancel-force-backfill") {
1209 actual_op
= OFR_BACKFILL
| OFR_CANCEL
;
1210 } else if (forceop
== "cancel-force-recovery") {
1211 actual_op
= OFR_RECOVERY
| OFR_CANCEL
;
1214 // covnert pg names to pgs, discard any invalid ones while at it
1216 // we don't want to keep pgidstr and pgidstr_nodup forever
1217 vector
<string
> pgidstr
;
1218 // get pgids to process and prune duplicates
1219 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pgid", pgidstr
);
1220 set
<string
> pgidstr_nodup(pgidstr
.begin(), pgidstr
.end());
1221 if (pgidstr
.size() != pgidstr_nodup
.size()) {
1222 // move elements only when there were duplicates, as this
1224 pgidstr
.resize(pgidstr_nodup
.size());
1225 auto it
= pgidstr_nodup
.begin();
1226 for (size_t i
= 0 ; i
< pgidstr_nodup
.size(); i
++) {
1227 pgidstr
[i
] = std::move(*it
++);
1231 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1232 for (auto& pstr
: pgidstr
) {
1234 if (!parsed_pg
.parse(pstr
.c_str())) {
1235 ss
<< "invalid pgid '" << pstr
<< "'; ";
1238 auto workit
= pg_map
.pg_stat
.find(parsed_pg
);
1239 if (workit
== pg_map
.pg_stat
.end()) {
1240 ss
<< "pg " << pstr
<< " does not exist; ";
1243 pg_stat_t workpg
= workit
->second
;
1245 // discard pgs for which user requests are pointless
1249 if ((workpg
.state
& (PG_STATE_DEGRADED
| PG_STATE_RECOVERY_WAIT
| PG_STATE_RECOVERING
)) == 0) {
1250 // don't return error, user script may be racing with cluster. not fatal.
1251 ss
<< "pg " << pstr
<< " doesn't require recovery; ";
1253 } else if (workpg
.state
& PG_STATE_FORCED_RECOVERY
) {
1254 ss
<< "pg " << pstr
<< " recovery already forced; ";
1255 // return error, as it may be a bug in user script
1261 if ((workpg
.state
& (PG_STATE_DEGRADED
| PG_STATE_BACKFILL_WAIT
| PG_STATE_BACKFILLING
)) == 0) {
1262 ss
<< "pg " << pstr
<< " doesn't require backfilling; ";
1264 } else if (workpg
.state
& PG_STATE_FORCED_BACKFILL
) {
1265 ss
<< "pg " << pstr
<< " backfill already forced; ";
1270 case OFR_BACKFILL
| OFR_CANCEL
:
1271 if ((workpg
.state
& PG_STATE_FORCED_BACKFILL
) == 0) {
1272 ss
<< "pg " << pstr
<< " backfill not forced; ";
1276 case OFR_RECOVERY
| OFR_CANCEL
:
1277 if ((workpg
.state
& PG_STATE_FORCED_RECOVERY
) == 0) {
1278 ss
<< "pg " << pstr
<< " recovery not forced; ";
1283 assert(0 == "actual_op value is not supported");
1286 parsed_pgs
.push_back(std::move(parsed_pg
));
1291 // group pgs to process by osd
1292 for (auto& pgid
: parsed_pgs
) {
1293 auto workit
= pg_map
.pg_stat
.find(pgid
);
1294 if (workit
!= pg_map
.pg_stat
.end()) {
1295 pg_stat_t workpg
= workit
->second
;
1296 set
<int32_t> osds(workpg
.up
.begin(), workpg
.up
.end());
1297 osds
.insert(workpg
.acting
.begin(), workpg
.acting
.end());
1298 for (auto i
: osds
) {
1299 osdpgs
[i
].push_back(pgid
);
1307 // respond with error only when no pgs are correct
1308 // yes, in case of mixed errors, only the last one will be emitted,
1309 // but the message presented will be fine
1310 if (parsed_pgs
.size() != 0) {
1311 // clear error to not confuse users/scripts
1315 // optimize the command -> messages conversion, use only one message per distinct OSD
1316 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1317 for (auto& i
: osdpgs
) {
1318 if (osdmap
.is_up(i
.first
)) {
1319 vector
<pg_t
> pgvec(make_move_iterator(i
.second
.begin()), make_move_iterator(i
.second
.end()));
1320 auto p
= osd_cons
.find(i
.first
);
1321 if (p
== osd_cons
.end()) {
1322 ss
<< "osd." << i
.first
<< " is not currently connected";
1326 for (auto& con
: p
->second
) {
1327 con
->send_message(new MOSDForceRecovery(monc
->get_fsid(), pgvec
, actual_op
));
1329 ss
<< "instructing pg(s) " << i
.second
<< " on osd." << i
.first
<< " to " << forceop
<< "; ";
1334 cmdctx
->reply(r
, ss
);
1337 r
= cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1338 return cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1339 return process_pg_map_command(prefix
, cmdctx
->cmdmap
, pg_map
, osdmap
,
1340 f
.get(), &ss
, &cmdctx
->odata
);
1344 if (r
!= -EOPNOTSUPP
) {
1345 cmdctx
->reply(r
, ss
);
1350 // None of the special native commands,
1351 ActivePyModule
*handler
= nullptr;
1352 auto py_commands
= py_modules
.get_py_commands();
1353 for (const auto &pyc
: py_commands
) {
1354 auto pyc_prefix
= cmddesc_get_prefix(pyc
.cmdstring
);
1355 dout(1) << "pyc_prefix: '" << pyc_prefix
<< "'" << dendl
;
1356 if (pyc_prefix
== prefix
) {
1357 handler
= pyc
.handler
;
1362 if (handler
== nullptr) {
1363 ss
<< "No handler found for '" << prefix
<< "'";
1364 dout(4) << "No handler found for '" << prefix
<< "'" << dendl
;
1365 cmdctx
->reply(-EINVAL
, ss
);
1368 // Okay, now we have a handler to call, but we must not call it
1369 // in this thread, because the python handlers can do anything,
1370 // including blocking, and including calling back into mgr.
1371 dout(4) << "passing through " << cmdctx
->cmdmap
.size() << dendl
;
1372 finisher
.queue(new FunctionContext([cmdctx
, handler
](int r_
) {
1373 std::stringstream ds
;
1374 std::stringstream ss
;
1375 int r
= handler
->handle_command(cmdctx
->cmdmap
, &ds
, &ss
);
1376 cmdctx
->odata
.append(ds
);
1377 cmdctx
->reply(r
, ss
);
1383 void DaemonServer::_prune_pending_service_map()
1385 utime_t cutoff
= ceph_clock_now();
1386 cutoff
-= g_conf
->get_val
<double>("mgr_service_beacon_grace");
1387 auto p
= pending_service_map
.services
.begin();
1388 while (p
!= pending_service_map
.services
.end()) {
1389 auto q
= p
->second
.daemons
.begin();
1390 while (q
!= p
->second
.daemons
.end()) {
1391 DaemonKey
key(p
->first
, q
->first
);
1392 if (!daemon_state
.exists(key
)) {
1393 derr
<< "missing key " << key
<< dendl
;
1397 auto daemon
= daemon_state
.get(key
);
1398 Mutex::Locker
l(daemon
->lock
);
1399 if (daemon
->last_service_beacon
== utime_t()) {
1400 // we must have just restarted; assume they are alive now.
1401 daemon
->last_service_beacon
= ceph_clock_now();
1405 if (daemon
->last_service_beacon
< cutoff
) {
1406 dout(10) << "pruning stale " << p
->first
<< "." << q
->first
1407 << " last_beacon " << daemon
->last_service_beacon
<< dendl
;
1408 q
= p
->second
.daemons
.erase(q
);
1409 pending_service_map_dirty
= pending_service_map
.epoch
;
1414 if (p
->second
.daemons
.empty()) {
1415 p
= pending_service_map
.services
.erase(p
);
1416 pending_service_map_dirty
= pending_service_map
.epoch
;
1423 void DaemonServer::send_report()
1426 if (ceph_clock_now() - started_at
> g_conf
->get_val
<int64_t>("mgr_stats_period") * 4.0) {
1428 reported_osds
.clear();
1429 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
1430 << "potentially incomplete PG state to mon" << dendl
;
1432 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
1438 auto m
= new MMonMgrReport();
1439 py_modules
.get_health_checks(&m
->health_checks
);
1441 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1442 cluster_state
.update_delta_stats();
1444 if (pending_service_map
.epoch
) {
1445 _prune_pending_service_map();
1446 if (pending_service_map_dirty
>= pending_service_map
.epoch
) {
1447 pending_service_map
.modified
= ceph_clock_now();
1448 ::encode(pending_service_map
, m
->service_map_bl
, CEPH_FEATURES_ALL
);
1449 dout(10) << "sending service_map e" << pending_service_map
.epoch
1451 pending_service_map
.epoch
++;
1455 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1456 // FIXME: no easy way to get mon features here. this will do for
1457 // now, though, as long as we don't make a backward-incompat change.
1458 pg_map
.encode_digest(osdmap
, m
->get_data(), CEPH_FEATURES_ALL
);
1459 dout(10) << pg_map
<< dendl
;
1461 pg_map
.get_health_checks(g_ceph_context
, osdmap
,
1464 dout(10) << m
->health_checks
.checks
.size() << " health checks"
1466 dout(20) << "health checks:\n";
1467 JSONFormatter
jf(true);
1468 jf
.dump_object("health_checks", m
->health_checks
);
1474 auto osds
= daemon_state
.get_by_service("osd");
1475 map
<osd_metric
, unique_ptr
<OSDHealthMetricCollector
>> accumulated
;
1476 for (const auto& osd
: osds
) {
1477 Mutex::Locker
l(osd
.second
->lock
);
1478 for (const auto& metric
: osd
.second
->osd_health_metrics
) {
1479 auto acc
= accumulated
.find(metric
.get_type());
1480 if (acc
== accumulated
.end()) {
1481 auto collector
= OSDHealthMetricCollector::create(metric
.get_type());
1483 derr
<< __func__
<< " " << osd
.first
<< "." << osd
.second
1484 << " sent me an unknown health metric: "
1485 << static_cast<uint8_t>(metric
.get_type()) << dendl
;
1488 tie(acc
, std::ignore
) = accumulated
.emplace(metric
.get_type(),
1489 std::move(collector
));
1491 acc
->second
->update(osd
.first
, metric
);
1494 for (const auto& acc
: accumulated
) {
1495 acc
.second
->summarize(m
->health_checks
);
1497 // TODO? We currently do not notify the PyModules
1498 // TODO: respect needs_send, so we send the report only if we are asked to do
1499 // so, or the state is updated.
1500 monc
->send_mon_message(m
);
1503 void DaemonServer::got_service_map()
1505 Mutex::Locker
l(lock
);
1507 cluster_state
.with_servicemap([&](const ServiceMap
& service_map
) {
1508 if (pending_service_map
.epoch
== 0) {
1509 // we just started up
1510 dout(10) << "got initial map e" << service_map
.epoch
<< dendl
;
1511 pending_service_map
= service_map
;
1513 // we we already active and therefore must have persisted it,
1514 // which means ours is the same or newer.
1515 dout(10) << "got updated map e" << service_map
.epoch
<< dendl
;
1517 pending_service_map
.epoch
= service_map
.epoch
+ 1;
1520 // cull missing daemons, populate new ones
1521 for (auto& p
: pending_service_map
.services
) {
1522 std::set
<std::string
> names
;
1523 for (auto& q
: p
.second
.daemons
) {
1524 names
.insert(q
.first
);
1525 DaemonKey
key(p
.first
, q
.first
);
1526 if (!daemon_state
.exists(key
)) {
1527 auto daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
1529 daemon
->metadata
= q
.second
.metadata
;
1530 if (q
.second
.metadata
.count("hostname")) {
1531 daemon
->hostname
= q
.second
.metadata
["hostname"];
1533 daemon
->service_daemon
= true;
1534 daemon_state
.insert(daemon
);
1535 dout(10) << "added missing " << key
<< dendl
;
1538 daemon_state
.cull(p
.first
, names
);
1543 const char** DaemonServer::get_tracked_conf_keys() const
1545 static const char *KEYS
[] = {
1546 "mgr_stats_threshold",
1554 void DaemonServer::handle_conf_change(const struct md_config_t
*conf
,
1555 const std::set
<std::string
> &changed
)
1557 dout(4) << "ohai" << dendl
;
1558 // We may be called within lock (via MCommand `config set`) or outwith the
1559 // lock (via admin socket `config set`), so handle either case.
1560 const bool initially_locked
= lock
.is_locked_by_me();
1561 if (!initially_locked
) {
1565 if (changed
.count("mgr_stats_threshold") || changed
.count("mgr_stats_period")) {
1566 dout(4) << "Updating stats threshold/period on "
1567 << daemon_connections
.size() << " clients" << dendl
;
1568 // Send a fresh MMgrConfigure to all clients, so that they can follow
1569 // the new policy for transmitting stats
1570 for (auto &c
: daemon_connections
) {
1576 void DaemonServer::_send_configure(ConnectionRef c
)
1578 assert(lock
.is_locked_by_me());
1580 auto configure
= new MMgrConfigure();
1581 configure
->stats_period
= g_conf
->get_val
<int64_t>("mgr_stats_period");
1582 configure
->stats_threshold
= g_conf
->get_val
<int64_t>("mgr_stats_threshold");
1583 c
->send_message(configure
);