1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
14 #include "DaemonServer.h"
15 #include <boost/algorithm/string.hpp>
18 #include "include/stringify.h"
19 #include "include/str_list.h"
20 #include "auth/RotatingKeyRing.h"
21 #include "json_spirit/json_spirit_writer.h"
23 #include "mgr/mgr_commands.h"
24 #include "mgr/DaemonHealthMetricCollector.h"
25 #include "mgr/OSDPerfMetricCollector.h"
26 #include "mgr/MDSPerfMetricCollector.h"
27 #include "mon/MonCommand.h"
29 #include "messages/MMgrOpen.h"
30 #include "messages/MMgrClose.h"
31 #include "messages/MMgrConfigure.h"
32 #include "messages/MMonMgrReport.h"
33 #include "messages/MCommand.h"
34 #include "messages/MCommandReply.h"
35 #include "messages/MMgrCommand.h"
36 #include "messages/MMgrCommandReply.h"
37 #include "messages/MPGStats.h"
38 #include "messages/MOSDScrub.h"
39 #include "messages/MOSDScrub2.h"
40 #include "messages/MOSDForceRecovery.h"
41 #include "common/errno.h"
42 #include "common/pick_address.h"
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_mgr
47 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
48 using namespace TOPNSPC::common
;
// Compare two map-like containers for exact equality of both keys and
// mapped values. Returns true iff sizes match and every (key, value)
// pair of lhs equals the corresponding pair of rhs in iteration order.
// (Source was missing its closing brace; restored here.)
template <typename Map>
bool map_compare(Map const &lhs, Map const &rhs) {
  return lhs.size() == rhs.size()
    && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                  // compare pairs field-by-field to avoid requiring
                  // operator== on the pair type itself
                  [] (auto a, auto b) { return a.first == b.first && a.second == b.second; });
}
58 DaemonServer::DaemonServer(MonClient
*monc_
,
60 DaemonStateIndex
&daemon_state_
,
61 ClusterState
&cluster_state_
,
62 PyModuleRegistry
&py_modules_
,
64 LogChannelRef audit_clog_
)
65 : Dispatcher(g_ceph_context
),
66 client_byte_throttler(new Throttle(g_ceph_context
, "mgr_client_bytes",
67 g_conf().get_val
<Option::size_t>("mgr_client_bytes"))),
68 client_msg_throttler(new Throttle(g_ceph_context
, "mgr_client_messages",
69 g_conf().get_val
<uint64_t>("mgr_client_messages"))),
70 osd_byte_throttler(new Throttle(g_ceph_context
, "mgr_osd_bytes",
71 g_conf().get_val
<Option::size_t>("mgr_osd_bytes"))),
72 osd_msg_throttler(new Throttle(g_ceph_context
, "mgr_osd_messsages",
73 g_conf().get_val
<uint64_t>("mgr_osd_messages"))),
74 mds_byte_throttler(new Throttle(g_ceph_context
, "mgr_mds_bytes",
75 g_conf().get_val
<Option::size_t>("mgr_mds_bytes"))),
76 mds_msg_throttler(new Throttle(g_ceph_context
, "mgr_mds_messsages",
77 g_conf().get_val
<uint64_t>("mgr_mds_messages"))),
78 mon_byte_throttler(new Throttle(g_ceph_context
, "mgr_mon_bytes",
79 g_conf().get_val
<Option::size_t>("mgr_mon_bytes"))),
80 mon_msg_throttler(new Throttle(g_ceph_context
, "mgr_mon_messsages",
81 g_conf().get_val
<uint64_t>("mgr_mon_messages"))),
85 daemon_state(daemon_state_
),
86 cluster_state(cluster_state_
),
87 py_modules(py_modules_
),
89 audit_clog(audit_clog_
),
91 timer(g_ceph_context
, lock
),
94 osd_perf_metric_collector_listener(this),
95 osd_perf_metric_collector(osd_perf_metric_collector_listener
),
96 mds_perf_metric_collector_listener(this),
97 mds_perf_metric_collector(mds_perf_metric_collector_listener
)
99 g_conf().add_observer(this);
102 DaemonServer::~DaemonServer() {
104 g_conf().remove_observer(this);
107 int DaemonServer::init(uint64_t gid
, entity_addrvec_t client_addrs
)
109 // Initialize Messenger
110 std::string public_msgr_type
= g_conf()->ms_public_type
.empty() ?
111 g_conf().get_val
<std::string
>("ms_type") : g_conf()->ms_public_type
;
112 msgr
= Messenger::create(g_ceph_context
, public_msgr_type
,
113 entity_name_t::MGR(gid
),
115 Messenger::get_pid_nonce());
116 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
118 msgr
->set_auth_client(monc
);
121 msgr
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
122 client_byte_throttler
.get(),
123 client_msg_throttler
.get());
126 msgr
->set_policy_throttlers(entity_name_t::TYPE_OSD
,
127 osd_byte_throttler
.get(),
128 osd_msg_throttler
.get());
129 msgr
->set_policy_throttlers(entity_name_t::TYPE_MDS
,
130 mds_byte_throttler
.get(),
131 mds_msg_throttler
.get());
132 msgr
->set_policy_throttlers(entity_name_t::TYPE_MON
,
133 mon_byte_throttler
.get(),
134 mon_msg_throttler
.get());
136 entity_addrvec_t addrs
;
137 int r
= pick_addresses(cct
, CEPH_PICK_ADDRESS_PUBLIC
, &addrs
);
141 dout(20) << __func__
<< " will bind to " << addrs
<< dendl
;
142 r
= msgr
->bindv(addrs
);
144 derr
<< "unable to bind mgr to " << addrs
<< dendl
;
148 msgr
->set_myname(entity_name_t::MGR(gid
));
149 msgr
->set_addr_unknowns(client_addrs
);
152 msgr
->add_dispatcher_tail(this);
154 msgr
->set_auth_server(monc
);
155 monc
->set_handle_authentication_dispatcher(this);
157 started_at
= ceph_clock_now();
159 std::lock_guard
l(lock
);
162 schedule_tick_locked(
163 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
168 entity_addrvec_t
DaemonServer::get_myaddrs() const
170 return msgr
->get_myaddrs();
173 int DaemonServer::ms_handle_authentication(Connection
*con
)
175 auto s
= ceph::make_ref
<MgrSession
>(cct
);
177 s
->inst
.addr
= con
->get_peer_addr();
178 s
->entity_name
= con
->peer_name
;
179 dout(10) << __func__
<< " new session " << s
<< " con " << con
180 << " entity " << con
->peer_name
181 << " addr " << con
->get_peer_addrs()
184 AuthCapsInfo
&caps_info
= con
->get_peer_caps_info();
185 if (caps_info
.allow_all
) {
186 dout(10) << " session " << s
<< " " << s
->entity_name
187 << " allow_all" << dendl
;
188 s
->caps
.set_allow_all();
189 } else if (caps_info
.caps
.length() > 0) {
190 auto p
= caps_info
.caps
.cbegin();
195 catch (buffer::error
& e
) {
196 dout(10) << " session " << s
<< " " << s
->entity_name
197 << " failed to decode caps" << dendl
;
200 if (!s
->caps
.parse(str
)) {
201 dout(10) << " session " << s
<< " " << s
->entity_name
202 << " failed to parse caps '" << str
<< "'" << dendl
;
205 dout(10) << " session " << s
<< " " << s
->entity_name
206 << " has caps " << s
->caps
<< " '" << str
<< "'" << dendl
;
209 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
210 std::lock_guard
l(lock
);
211 s
->osd_id
= atoi(s
->entity_name
.get_id().c_str());
212 dout(10) << "registering osd." << s
->osd_id
<< " session "
213 << s
<< " con " << con
<< dendl
;
214 osd_cons
[s
->osd_id
].insert(con
);
220 bool DaemonServer::ms_handle_reset(Connection
*con
)
222 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
223 auto priv
= con
->get_priv();
224 auto session
= static_cast<MgrSession
*>(priv
.get());
228 std::lock_guard
l(lock
);
229 dout(10) << "unregistering osd." << session
->osd_id
230 << " session " << session
<< " con " << con
<< dendl
;
231 osd_cons
[session
->osd_id
].erase(con
);
233 auto iter
= daemon_connections
.find(con
);
234 if (iter
!= daemon_connections
.end()) {
235 daemon_connections
.erase(iter
);
241 bool DaemonServer::ms_handle_refused(Connection
*con
)
243 // do nothing for now
247 bool DaemonServer::ms_dispatch2(const ref_t
<Message
>& m
)
249 // Note that we do *not* take ::lock here, in order to avoid
250 // serializing all message handling. It's up to each handler
251 // to take whatever locks it needs.
252 switch (m
->get_type()) {
254 cluster_state
.ingest_pgstats(ref_cast
<MPGStats
>(m
));
255 maybe_ready(m
->get_source().num());
258 return handle_report(ref_cast
<MMgrReport
>(m
));
260 return handle_open(ref_cast
<MMgrOpen
>(m
));
262 return handle_close(ref_cast
<MMgrClose
>(m
));
264 return handle_command(ref_cast
<MCommand
>(m
));
265 case MSG_MGR_COMMAND
:
266 return handle_command(ref_cast
<MMgrCommand
>(m
));
268 dout(1) << "Unhandled message type " << m
->get_type() << dendl
;
273 void DaemonServer::dump_pg_ready(ceph::Formatter
*f
)
275 f
->dump_bool("pg_ready", pgmap_ready
.load());
278 void DaemonServer::maybe_ready(int32_t osd_id
)
280 if (pgmap_ready
.load()) {
281 // Fast path: we don't need to take lock because pgmap_ready
284 std::lock_guard
l(lock
);
286 if (reported_osds
.find(osd_id
) == reported_osds
.end()) {
287 dout(4) << "initial report from osd " << osd_id
<< dendl
;
288 reported_osds
.insert(osd_id
);
289 std::set
<int32_t> up_osds
;
291 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
292 osdmap
.get_up_osds(up_osds
);
295 std::set
<int32_t> unreported_osds
;
296 std::set_difference(up_osds
.begin(), up_osds
.end(),
297 reported_osds
.begin(), reported_osds
.end(),
298 std::inserter(unreported_osds
, unreported_osds
.begin()));
300 if (unreported_osds
.size() == 0) {
301 dout(4) << "all osds have reported, sending PG state to mon" << dendl
;
303 reported_osds
.clear();
304 // Avoid waiting for next tick
307 dout(4) << "still waiting for " << unreported_osds
.size() << " osds"
308 " to report in before PGMap is ready" << dendl
;
314 void DaemonServer::tick()
320 schedule_tick_locked(
321 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
324 // Currently modules do not set health checks in response to events delivered to
325 // all modules (e.g. notify) so we do not risk a thundering hurd situation here.
326 // if this pattern emerges in the future, this scheduler could be modified to
327 // fire after all modules have had a chance to set their health checks.
328 void DaemonServer::schedule_tick_locked(double delay_sec
)
330 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
333 timer
.cancel_event(tick_event
);
334 tick_event
= nullptr;
337 // on shutdown start rejecting explicit requests to send reports that may
338 // originate from python land which may still be running.
342 tick_event
= timer
.add_event_after(delay_sec
,
343 new LambdaContext([this](int r
) {
348 void DaemonServer::schedule_tick(double delay_sec
)
350 std::lock_guard
l(lock
);
351 schedule_tick_locked(delay_sec
);
354 void DaemonServer::handle_osd_perf_metric_query_updated()
358 // Send a fresh MMgrConfigure to all clients, so that they can follow
359 // the new policy for transmitting stats
360 finisher
.queue(new LambdaContext([this](int r
) {
361 std::lock_guard
l(lock
);
362 for (auto &c
: daemon_connections
) {
363 if (c
->peer_is_osd()) {
370 void DaemonServer::handle_mds_perf_metric_query_updated()
374 // Send a fresh MMgrConfigure to all clients, so that they can follow
375 // the new policy for transmitting stats
376 finisher
.queue(new LambdaContext([this](int r
) {
377 std::lock_guard
l(lock
);
378 for (auto &c
: daemon_connections
) {
379 if (c
->peer_is_mds()) {
386 void DaemonServer::shutdown()
388 dout(10) << "begin" << dendl
;
391 cluster_state
.shutdown();
392 dout(10) << "done" << dendl
;
394 std::lock_guard
l(lock
);
395 shutting_down
= true;
399 static DaemonKey
key_from_service(
400 const std::string
& service_name
,
402 const std::string
& daemon_name
)
404 if (!service_name
.empty()) {
405 return DaemonKey
{service_name
, daemon_name
};
407 return DaemonKey
{ceph_entity_type_name(peer_type
), daemon_name
};
411 void DaemonServer::fetch_missing_metadata(const DaemonKey
& key
,
412 const entity_addr_t
& addr
)
414 if (!daemon_state
.is_updating(key
) &&
415 (key
.type
== "osd" || key
.type
== "mds" || key
.type
== "mon")) {
416 std::ostringstream oss
;
417 auto c
= new MetadataUpdate(daemon_state
, key
);
418 if (key
.type
== "osd") {
419 oss
<< "{\"prefix\": \"osd metadata\", \"id\": "
421 } else if (key
.type
== "mds") {
422 c
->set_default("addr", stringify(addr
));
423 oss
<< "{\"prefix\": \"mds metadata\", \"who\": \""
424 << key
.name
<< "\"}";
425 } else if (key
.type
== "mon") {
426 oss
<< "{\"prefix\": \"mon metadata\", \"id\": \""
427 << key
.name
<< "\"}";
431 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
435 bool DaemonServer::handle_open(const ref_t
<MMgrOpen
>& m
)
437 std::unique_lock
l(lock
);
439 DaemonKey key
= key_from_service(m
->service_name
,
440 m
->get_connection()->get_peer_type(),
443 auto con
= m
->get_connection();
444 dout(10) << "from " << key
<< " " << con
->get_peer_addr() << dendl
;
446 _send_configure(con
);
448 DaemonStatePtr daemon
;
449 if (daemon_state
.exists(key
)) {
450 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
451 daemon
= daemon_state
.get(key
);
454 if (m
->service_daemon
) {
455 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
456 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
458 daemon
->service_daemon
= true;
459 daemon_state
.insert(daemon
);
461 /* A normal Ceph daemon has connected but we are or should be waiting on
462 * metadata for it. Close the session so that it tries to reconnect.
464 dout(2) << "ignoring open from " << key
<< " " << con
->get_peer_addr()
465 << "; not ready for session (expect reconnect)" << dendl
;
468 fetch_missing_metadata(key
, m
->get_source_addr());
473 if (m
->service_daemon
) {
474 // update the metadata through the daemon state index to
475 // ensure it's kept up-to-date
476 daemon_state
.update_metadata(daemon
, m
->daemon_metadata
);
479 std::lock_guard
l(daemon
->lock
);
480 daemon
->perf_counters
.clear();
482 daemon
->service_daemon
= m
->service_daemon
;
483 if (m
->service_daemon
) {
484 daemon
->service_status
= m
->daemon_status
;
486 utime_t now
= ceph_clock_now();
487 auto [d
, added
] = pending_service_map
.get_daemon(m
->service_name
,
489 if (added
|| d
->gid
!= (uint64_t)m
->get_source().num()) {
490 dout(10) << "registering " << key
<< " in pending_service_map" << dendl
;
491 d
->gid
= m
->get_source().num();
492 d
->addr
= m
->get_source_addr();
493 d
->start_epoch
= pending_service_map
.epoch
;
494 d
->start_stamp
= now
;
495 d
->metadata
= m
->daemon_metadata
;
496 pending_service_map_dirty
= pending_service_map
.epoch
;
500 auto p
= m
->config_bl
.cbegin();
501 if (p
!= m
->config_bl
.end()) {
502 decode(daemon
->config
, p
);
503 decode(daemon
->ignored_mon_config
, p
);
504 dout(20) << " got config " << daemon
->config
505 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
507 daemon
->config_defaults_bl
= m
->config_defaults_bl
;
508 daemon
->config_defaults
.clear();
509 dout(20) << " got config_defaults_bl " << daemon
->config_defaults_bl
.length()
510 << " bytes" << dendl
;
513 if (con
->get_peer_type() != entity_name_t::TYPE_CLIENT
&&
514 m
->service_name
.empty())
516 // Store in set of the daemon/service connections, i.e. those
517 // connections that require an update in the event of stats
518 // configuration changes.
519 daemon_connections
.insert(con
);
525 bool DaemonServer::handle_close(const ref_t
<MMgrClose
>& m
)
527 std::lock_guard
l(lock
);
529 DaemonKey key
= key_from_service(m
->service_name
,
530 m
->get_connection()->get_peer_type(),
532 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
534 if (daemon_state
.exists(key
)) {
535 DaemonStatePtr daemon
= daemon_state
.get(key
);
536 daemon_state
.rm(key
);
538 std::lock_guard
l(daemon
->lock
);
539 if (daemon
->service_daemon
) {
540 pending_service_map
.rm_daemon(m
->service_name
, m
->daemon_name
);
541 pending_service_map_dirty
= pending_service_map
.epoch
;
546 // send same message back as a reply
547 m
->get_connection()->send_message2(m
);
551 void DaemonServer::update_task_status(
553 const std::map
<std::string
,std::string
>& task_status
)
555 dout(10) << "got task status from " << key
<< dendl
;
557 [[maybe_unused
]] auto [daemon
, added
] =
558 pending_service_map
.get_daemon(key
.type
, key
.name
);
559 if (daemon
->task_status
!= task_status
) {
560 daemon
->task_status
= task_status
;
561 pending_service_map_dirty
= pending_service_map
.epoch
;
565 bool DaemonServer::handle_report(const ref_t
<MMgrReport
>& m
)
568 if (!m
->service_name
.empty()) {
569 key
.type
= m
->service_name
;
571 key
.type
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
573 key
.name
= m
->daemon_name
;
575 dout(10) << "from " << m
->get_connection() << " " << key
<< dendl
;
577 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
578 m
->service_name
.empty()) {
579 // Clients should not be sending us stats unless they are declaring
580 // themselves to be a daemon for some service.
581 dout(10) << "rejecting report from non-daemon client " << m
->daemon_name
583 clog
->warn() << "rejecting report from non-daemon client " << m
->daemon_name
584 << " at " << m
->get_connection()->get_peer_addrs();
585 m
->get_connection()->mark_down();
591 std::unique_lock
locker(lock
);
593 DaemonStatePtr daemon
;
594 // Look up the DaemonState
595 if (daemon_state
.exists(key
)) {
596 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
597 daemon
= daemon_state
.get(key
);
601 // we don't know the hostname at this stage, reject MMgrReport here.
602 dout(5) << "rejecting report from " << key
<< ", since we do not have its metadata now."
604 // issue metadata request in background
605 fetch_missing_metadata(key
, m
->get_source_addr());
610 auto priv
= m
->get_connection()->get_priv();
611 auto session
= static_cast<MgrSession
*>(priv
.get());
615 m
->get_connection()->mark_down();
617 dout(10) << "unregistering osd." << session
->osd_id
618 << " session " << session
<< " con " << m
->get_connection() << dendl
;
620 if (osd_cons
.find(session
->osd_id
) != osd_cons
.end()) {
621 osd_cons
[session
->osd_id
].erase(m
->get_connection());
624 auto iter
= daemon_connections
.find(m
->get_connection());
625 if (iter
!= daemon_connections
.end()) {
626 daemon_connections
.erase(iter
);
632 // Update the DaemonState
633 ceph_assert(daemon
!= nullptr);
635 std::lock_guard
l(daemon
->lock
);
636 auto &daemon_counters
= daemon
->perf_counters
;
637 daemon_counters
.update(*m
.get());
639 auto p
= m
->config_bl
.cbegin();
640 if (p
!= m
->config_bl
.end()) {
641 decode(daemon
->config
, p
);
642 decode(daemon
->ignored_mon_config
, p
);
643 dout(20) << " got config " << daemon
->config
644 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
647 utime_t now
= ceph_clock_now();
648 if (daemon
->service_daemon
) {
649 if (m
->daemon_status
) {
650 daemon
->service_status_stamp
= now
;
651 daemon
->service_status
= *m
->daemon_status
;
653 daemon
->last_service_beacon
= now
;
654 } else if (m
->daemon_status
) {
655 derr
<< "got status from non-daemon " << key
<< dendl
;
657 // update task status
658 if (m
->task_status
) {
659 update_task_status(key
, *m
->task_status
);
660 daemon
->last_service_beacon
= now
;
662 if (m
->get_connection()->peer_is_osd() || m
->get_connection()->peer_is_mon()) {
663 // only OSD and MON send health_checks to me now
664 daemon
->daemon_health_metrics
= std::move(m
->daemon_health_metrics
);
665 dout(10) << "daemon_health_metrics " << daemon
->daemon_health_metrics
671 // if there are any schema updates, notify the python modules
672 if (!m
->declare_types
.empty() || !m
->undeclare_types
.empty()) {
673 py_modules
.notify_all("perf_schema_update", ceph::to_string(key
));
676 if (m
->get_connection()->peer_is_osd()) {
677 osd_perf_metric_collector
.process_reports(m
->osd_perf_metric_reports
);
680 if (m
->metric_report_message
) {
681 const MetricReportMessage
&message
= *m
->metric_report_message
;
682 boost::apply_visitor(HandlePayloadVisitor(this), message
.payload
);
689 void DaemonServer::_generate_command_map(
691 map
<string
,string
> ¶m_str_map
)
693 for (auto p
= cmdmap
.begin();
694 p
!= cmdmap
.end(); ++p
) {
695 if (p
->first
== "prefix")
697 if (p
->first
== "caps") {
699 if (cmd_getval(cmdmap
, "caps", cv
) &&
700 cv
.size() % 2 == 0) {
701 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
702 string k
= string("caps_") + cv
[i
];
703 param_str_map
[k
] = cv
[i
+ 1];
708 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
712 const MonCommand
*DaemonServer::_get_mgrcommand(
713 const string
&cmd_prefix
,
714 const std::vector
<MonCommand
> &cmds
)
716 const MonCommand
*this_cmd
= nullptr;
717 for (const auto &cmd
: cmds
) {
718 if (cmd
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
726 bool DaemonServer::_allowed_command(
728 const string
&service
,
729 const string
&module
,
730 const string
&prefix
,
731 const cmdmap_t
& cmdmap
,
732 const map
<string
,string
>& param_str_map
,
733 const MonCommand
*this_cmd
) {
735 if (s
->entity_name
.is_mon()) {
736 // mon is all-powerful. even when it is forwarding commands on behalf of
737 // old clients; we expect the mon is validating commands before proxying!
741 bool cmd_r
= this_cmd
->requires_perm('r');
742 bool cmd_w
= this_cmd
->requires_perm('w');
743 bool cmd_x
= this_cmd
->requires_perm('x');
745 bool capable
= s
->caps
.is_capable(
748 service
, module
, prefix
, param_str_map
,
752 dout(10) << " " << s
->entity_name
<< " "
753 << (capable
? "" : "not ") << "capable" << dendl
;
758 * The working data for processing an MCommand. This lives in
759 * a class to enable passing it into other threads for processing
760 * outside of the thread/locks that called handle_command.
762 class CommandContext
{
764 ceph::ref_t
<MCommand
> m_tell
;
765 ceph::ref_t
<MMgrCommand
> m_mgr
;
766 const std::vector
<std::string
>& cmd
; ///< ref into m_tell or m_mgr
767 const bufferlist
& data
; ///< ref into m_tell or m_mgr
771 explicit CommandContext(ceph::ref_t
<MCommand
> m
)
772 : m_tell
{std::move(m
)},
774 data(m_tell
->get_data()) {
776 explicit CommandContext(ceph::ref_t
<MMgrCommand
> m
)
777 : m_mgr
{std::move(m
)},
779 data(m_mgr
->get_data()) {
782 void reply(int r
, const std::stringstream
&ss
) {
786 void reply(int r
, const std::string
&rs
) {
787 // Let the connection drop as soon as we've sent our response
788 ConnectionRef con
= m_tell
? m_tell
->get_connection()
789 : m_mgr
->get_connection();
791 con
->mark_disposable();
795 dout(20) << "success" << dendl
;
797 derr
<< __func__
<< " " << cpp_strerror(r
) << " " << rs
<< dendl
;
801 MCommandReply
*reply
= new MCommandReply(r
, rs
);
802 reply
->set_tid(m_tell
->get_tid());
803 reply
->set_data(odata
);
804 con
->send_message(reply
);
806 MMgrCommandReply
*reply
= new MMgrCommandReply(r
, rs
);
807 reply
->set_tid(m_mgr
->get_tid());
808 reply
->set_data(odata
);
809 con
->send_message(reply
);
816 * A context for receiving a bufferlist/error string from a background
817 * function and then calling back to a CommandContext when it's done
819 class ReplyOnFinish
: public Context
{
820 std::shared_ptr
<CommandContext
> cmdctx
;
826 explicit ReplyOnFinish(const std::shared_ptr
<CommandContext
> &cmdctx_
)
829 void finish(int r
) override
{
830 cmdctx
->odata
.claim_append(from_mon
);
831 cmdctx
->reply(r
, outs
);
835 bool DaemonServer::handle_command(const ref_t
<MCommand
>& m
)
837 std::lock_guard
l(lock
);
838 auto cmdctx
= std::make_shared
<CommandContext
>(m
);
840 return _handle_command(cmdctx
);
841 } catch (const bad_cmd_get
& e
) {
842 cmdctx
->reply(-EINVAL
, e
.what());
847 bool DaemonServer::handle_command(const ref_t
<MMgrCommand
>& m
)
849 std::lock_guard
l(lock
);
850 auto cmdctx
= std::make_shared
<CommandContext
>(m
);
852 return _handle_command(cmdctx
);
853 } catch (const bad_cmd_get
& e
) {
854 cmdctx
->reply(-EINVAL
, e
.what());
859 void DaemonServer::log_access_denied(
860 std::shared_ptr
<CommandContext
>& cmdctx
,
861 MgrSession
* session
, std::stringstream
& ss
) {
862 dout(1) << " access denied" << dendl
;
863 audit_clog
->info() << "from='" << session
->inst
<< "' "
864 << "entity='" << session
->entity_name
<< "' "
865 << "cmd=" << cmdctx
->cmd
<< ": access denied";
866 ss
<< "access denied: does your client key have mgr caps? "
867 "See http://docs.ceph.com/en/latest/mgr/administrator/"
868 "#client-authentication";
871 void DaemonServer::_check_offlines_pgs(
872 const set
<int>& osds
,
873 const OSDMap
& osdmap
,
875 offline_pg_report
*report
)
878 *report
= offline_pg_report();
881 for (const auto& q
: pgmap
.pg_stat
) {
882 set
<int32_t> pg_acting
; // net acting sets (with no missing if degraded)
884 if (q
.second
.state
== 0) {
885 report
->unknown
.insert(q
.first
);
888 if (q
.second
.state
& PG_STATE_DEGRADED
) {
889 for (auto& anm
: q
.second
.avail_no_missing
) {
890 if (osds
.count(anm
.osd
)) {
894 if (anm
.osd
!= CRUSH_ITEM_NONE
) {
895 pg_acting
.insert(anm
.osd
);
899 for (auto& a
: q
.second
.acting
) {
904 if (a
!= CRUSH_ITEM_NONE
) {
912 const pg_pool_t
*pi
= osdmap
.get_pg_pool(q
.first
.pool());
913 bool dangerous
= false;
915 report
->bad_no_pool
.insert(q
.first
); // pool is creating or deleting
918 if (!(q
.second
.state
& PG_STATE_ACTIVE
)) {
919 report
->bad_already_inactive
.insert(q
.first
);
922 if (pg_acting
.size() < pi
->min_size
) {
923 report
->bad_become_inactive
.insert(q
.first
);
927 report
->not_ok
.insert(q
.first
);
929 report
->ok
.insert(q
.first
);
930 if (q
.second
.state
& PG_STATE_DEGRADED
) {
931 report
->ok_become_more_degraded
.insert(q
.first
);
933 report
->ok_become_degraded
.insert(q
.first
);
937 dout(20) << osds
<< " -> " << report
->ok
.size() << " ok, "
938 << report
->not_ok
.size() << " not ok, "
939 << report
->unknown
.size() << " unknown"
943 void DaemonServer::_maximize_ok_to_stop_set(
944 const set
<int>& orig_osds
,
946 const OSDMap
& osdmap
,
948 offline_pg_report
*out_report
)
950 dout(20) << "orig_osds " << orig_osds
<< " max " << max
<< dendl
;
951 _check_offlines_pgs(orig_osds
, osdmap
, pgmap
, out_report
);
952 if (!out_report
->ok_to_stop()) {
955 if (orig_osds
.size() >= max
) {
960 // semi-arbitrarily start with the first osd in the set
961 offline_pg_report report
;
962 set
<int> osds
= orig_osds
;
963 int parent
= *osds
.begin();
967 // identify the next parent
968 int r
= osdmap
.crush
->get_immediate_parent_id(parent
, &parent
);
970 return; // just go with what we have so far!
973 // get candidate additions that are beneath this point in the tree
975 r
= osdmap
.crush
->get_all_children(parent
, &children
);
977 return; // just go with what we have so far!
979 dout(20) << " parent " << parent
<< " children " << children
<< dendl
;
981 // try adding in more osds
982 int failed
= 0; // how many children we failed to add to our set
983 for (auto o
: children
) {
984 if (o
>= 0 && osdmap
.is_up(o
) && osds
.count(o
) == 0) {
986 _check_offlines_pgs(osds
, osdmap
, pgmap
, &report
);
987 if (!report
.ok_to_stop()) {
992 *out_report
= report
;
993 if (osds
.size() == max
) {
994 dout(20) << " hit max" << dendl
;
995 return; // yay, we hit the max
1001 // we hit some failures; go with what we have
1002 dout(20) << " hit some peer failures" << dendl
;
1008 bool DaemonServer::_handle_command(
1009 std::shared_ptr
<CommandContext
>& cmdctx
)
1012 bool admin_socket_cmd
= false;
1013 if (cmdctx
->m_tell
) {
1015 // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
1017 admin_socket_cmd
= (cmdctx
->m_tell
->fsid
!= uuid_d());
1021 auto priv
= m
->get_connection()->get_priv();
1022 auto session
= static_cast<MgrSession
*>(priv
.get());
1026 if (session
->inst
.name
== entity_name_t()) {
1027 session
->inst
.name
= m
->get_source();
1030 map
<string
,string
> param_str_map
;
1031 std::stringstream ss
;
1034 if (!cmdmap_from_json(cmdctx
->cmd
, &(cmdctx
->cmdmap
), ss
)) {
1035 cmdctx
->reply(-EINVAL
, ss
);
1040 cmd_getval(cmdctx
->cmdmap
, "prefix", prefix
);
1041 dout(10) << "decoded-size=" << cmdctx
->cmdmap
.size() << " prefix=" << prefix
<< dendl
;
1043 boost::scoped_ptr
<Formatter
> f
;
1046 if (boost::algorithm::ends_with(prefix
, "_json")) {
1049 cmd_getval(cmdctx
->cmdmap
, "format", format
, string("plain"));
1051 f
.reset(Formatter::create(format
));
1054 // this is just for mgr commands - admin socket commands will fall
1055 // through and use the admin socket version of
1056 // get_command_descriptions
1057 if (prefix
== "get_command_descriptions" && !admin_socket_cmd
) {
1058 dout(10) << "reading commands from python modules" << dendl
;
1059 const auto py_commands
= py_modules
.get_commands();
1063 f
.open_object_section("command_descriptions");
1065 auto dump_cmd
= [&cmdnum
, &f
, m
](const MonCommand
&mc
){
1066 ostringstream secname
;
1067 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
1068 dump_cmddesc_to_json(&f
, m
->get_connection()->get_features(),
1069 secname
.str(), mc
.cmdstring
, mc
.helpstring
,
1070 mc
.module
, mc
.req_perms
, 0);
1074 for (const auto &pyc
: py_commands
) {
1078 for (const auto &mgr_cmd
: mgr_commands
) {
1082 f
.close_section(); // command_descriptions
1083 f
.flush(cmdctx
->odata
);
1084 cmdctx
->reply(0, ss
);
1089 const MonCommand
*mgr_cmd
= _get_mgrcommand(prefix
, mgr_commands
);
1090 _generate_command_map(cmdctx
->cmdmap
, param_str_map
);
1092 bool is_allowed
= false;
1093 ModuleCommand py_command
;
1094 if (admin_socket_cmd
) {
1095 // admin socket commands require all capabilities
1096 is_allowed
= session
->caps
.is_allow_all();
1097 } else if (!mgr_cmd
) {
1098 // Resolve the command to the name of the module that will
1099 // handle it (if the command exists)
1100 auto py_commands
= py_modules
.get_py_commands();
1101 for (const auto &pyc
: py_commands
) {
1102 auto pyc_prefix
= cmddesc_get_prefix(pyc
.cmdstring
);
1103 if (pyc_prefix
== prefix
) {
1109 MonCommand pyc
= {"", "", "py", py_command
.perm
};
1110 is_allowed
= _allowed_command(session
, "py", py_command
.module_name
,
1111 prefix
, cmdctx
->cmdmap
, param_str_map
,
1114 // validate user's permissions for requested command
1115 is_allowed
= _allowed_command(session
, mgr_cmd
->module
, "",
1116 prefix
, cmdctx
->cmdmap
, param_str_map
, mgr_cmd
);
1120 log_access_denied(cmdctx
, session
, ss
);
1121 cmdctx
->reply(-EACCES
, ss
);
1126 << "from='" << session
->inst
<< "' "
1127 << "entity='" << session
->entity_name
<< "' "
1128 << "cmd=" << cmdctx
->cmd
<< ": dispatch";
1130 if (admin_socket_cmd
) {
1131 cct
->get_admin_socket()->queue_tell_command(cmdctx
->m_tell
);
1136 // service map commands
1137 if (prefix
== "service dump") {
1139 f
.reset(Formatter::create("json-pretty"));
1140 cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
1141 f
->dump_object("service_map", service_map
);
1143 f
->flush(cmdctx
->odata
);
1144 cmdctx
->reply(0, ss
);
1147 if (prefix
== "service status") {
1149 f
.reset(Formatter::create("json-pretty"));
1150 // only include state from services that are in the persisted service map
1151 f
->open_object_section("service_status");
1152 for (auto& [type
, service
] : pending_service_map
.services
) {
1153 if (ServiceMap::is_normal_ceph_entity(type
)) {
1157 f
->open_object_section(type
.c_str());
1158 for (auto& q
: service
.daemons
) {
1159 f
->open_object_section(q
.first
.c_str());
1160 DaemonKey key
{type
, q
.first
};
1161 ceph_assert(daemon_state
.exists(key
));
1162 auto daemon
= daemon_state
.get(key
);
1163 std::lock_guard
l(daemon
->lock
);
1164 f
->dump_stream("status_stamp") << daemon
->service_status_stamp
;
1165 f
->dump_stream("last_beacon") << daemon
->last_service_beacon
;
1166 f
->open_object_section("status");
1167 for (auto& r
: daemon
->service_status
) {
1168 f
->dump_string(r
.first
.c_str(), r
.second
);
1176 f
->flush(cmdctx
->odata
);
1177 cmdctx
->reply(0, ss
);
1181 if (prefix
== "config set") {
1184 cmd_getval(cmdctx
->cmdmap
, "key", key
);
1185 cmd_getval(cmdctx
->cmdmap
, "value", val
);
1186 r
= cct
->_conf
.set_val(key
, val
, &ss
);
1188 cct
->_conf
.apply_changes(nullptr);
1190 cmdctx
->reply(0, ss
);
1197 if (prefix
== "pg scrub" ||
1198 prefix
== "pg repair" ||
1199 prefix
== "pg deep-scrub") {
1200 string scrubop
= prefix
.substr(3, string::npos
);
1204 cmd_getval(cmdctx
->cmdmap
, "pgid", pgidstr
);
1205 if (!pgid
.parse(pgidstr
.c_str())) {
1206 ss
<< "invalid pgid '" << pgidstr
<< "'";
1207 cmdctx
->reply(-EINVAL
, ss
);
1210 bool pg_exists
= false;
1211 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1212 pg_exists
= osdmap
.pg_exists(pgid
);
1215 ss
<< "pg " << pgid
<< " does not exist";
1216 cmdctx
->reply(-ENOENT
, ss
);
1219 int acting_primary
= -1;
1221 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1222 epoch
= osdmap
.get_epoch();
1223 osdmap
.get_primary_shard(pgid
, &acting_primary
, &spgid
);
1225 if (acting_primary
== -1) {
1226 ss
<< "pg " << pgid
<< " has no primary osd";
1227 cmdctx
->reply(-EAGAIN
, ss
);
1230 auto p
= osd_cons
.find(acting_primary
);
1231 if (p
== osd_cons
.end()) {
1232 ss
<< "pg " << pgid
<< " primary osd." << acting_primary
1233 << " is not currently connected";
1234 cmdctx
->reply(-EAGAIN
, ss
);
1237 for (auto& con
: p
->second
) {
1238 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
1239 vector
<spg_t
> pgs
= { spgid
};
1240 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1243 scrubop
== "repair",
1244 scrubop
== "deep-scrub"));
1246 vector
<pg_t
> pgs
= { pgid
};
1247 con
->send_message(new MOSDScrub(monc
->get_fsid(),
1249 scrubop
== "repair",
1250 scrubop
== "deep-scrub"));
1253 ss
<< "instructing pg " << spgid
<< " on osd." << acting_primary
1254 << " to " << scrubop
;
1255 cmdctx
->reply(0, ss
);
1257 } else if (prefix
== "osd scrub" ||
1258 prefix
== "osd deep-scrub" ||
1259 prefix
== "osd repair") {
1261 cmd_getval(cmdctx
->cmdmap
, "who", whostr
);
1262 vector
<string
> pvec
;
1263 get_str_vec(prefix
, pvec
);
1266 if (whostr
== "*" || whostr
== "all" || whostr
== "any") {
1267 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1268 for (int i
= 0; i
< osdmap
.get_max_osd(); i
++)
1269 if (osdmap
.is_up(i
)) {
1274 long osd
= parse_osd_id(whostr
.c_str(), &ss
);
1276 ss
<< "invalid osd '" << whostr
<< "'";
1277 cmdctx
->reply(-EINVAL
, ss
);
1280 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1281 if (osdmap
.is_up(osd
)) {
1286 ss
<< "osd." << osd
<< " is not up";
1287 cmdctx
->reply(-EAGAIN
, ss
);
1291 set
<int> sent_osds
, failed_osds
;
1292 for (auto osd
: osds
) {
1295 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1296 epoch
= osdmap
.get_epoch();
1297 auto p
= pgmap
.pg_by_osd
.find(osd
);
1298 if (p
!= pgmap
.pg_by_osd
.end()) {
1299 for (auto pgid
: p
->second
) {
1302 osdmap
.get_primary_shard(pgid
, &primary
, &spg
);
1303 if (primary
== osd
) {
1304 spgs
.push_back(spg
);
1309 auto p
= osd_cons
.find(osd
);
1310 if (p
== osd_cons
.end()) {
1311 failed_osds
.insert(osd
);
1313 sent_osds
.insert(osd
);
1314 for (auto& con
: p
->second
) {
1315 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
1316 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1319 pvec
.back() == "repair",
1320 pvec
.back() == "deep-scrub"));
1322 con
->send_message(new MOSDScrub(monc
->get_fsid(),
1323 pvec
.back() == "repair",
1324 pvec
.back() == "deep-scrub"));
1329 if (failed_osds
.size() == osds
.size()) {
1330 ss
<< "failed to instruct osd(s) " << osds
<< " to " << pvec
.back()
1331 << " (not connected)";
1334 ss
<< "instructed osd(s) " << sent_osds
<< " to " << pvec
.back();
1335 if (!failed_osds
.empty()) {
1336 ss
<< "; osd(s) " << failed_osds
<< " were not connected";
1340 cmdctx
->reply(0, ss
);
1342 } else if (prefix
== "osd pool scrub" ||
1343 prefix
== "osd pool deep-scrub" ||
1344 prefix
== "osd pool repair") {
1345 vector
<string
> pool_names
;
1346 cmd_getval(cmdctx
->cmdmap
, "who", pool_names
);
1347 if (pool_names
.empty()) {
1348 ss
<< "must specify one or more pool names";
1349 cmdctx
->reply(-EINVAL
, ss
);
1353 map
<int32_t, vector
<pg_t
>> pgs_by_primary
; // legacy
1354 map
<int32_t, vector
<spg_t
>> spgs_by_primary
;
1355 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1356 epoch
= osdmap
.get_epoch();
1357 for (auto& pool_name
: pool_names
) {
1358 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1360 ss
<< "unrecognized pool '" << pool_name
<< "'";
1364 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1365 for (int i
= 0; i
< pool_pg_num
; i
++) {
1366 pg_t
pg(i
, pool_id
);
1369 auto got
= osdmap
.get_primary_shard(pg
, &primary
, &spg
);
1372 pgs_by_primary
[primary
].push_back(pg
);
1373 spgs_by_primary
[primary
].push_back(spg
);
1378 cmdctx
->reply(r
, ss
);
1381 for (auto& it
: spgs_by_primary
) {
1382 auto primary
= it
.first
;
1383 auto p
= osd_cons
.find(primary
);
1384 if (p
== osd_cons
.end()) {
1385 ss
<< "osd." << primary
<< " is not currently connected";
1386 cmdctx
->reply(-EAGAIN
, ss
);
1389 for (auto& con
: p
->second
) {
1390 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
1391 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1394 prefix
== "osd pool repair",
1395 prefix
== "osd pool deep-scrub"));
1398 auto q
= pgs_by_primary
.find(primary
);
1399 ceph_assert(q
!= pgs_by_primary
.end());
1400 con
->send_message(new MOSDScrub(monc
->get_fsid(),
1402 prefix
== "osd pool repair",
1403 prefix
== "osd pool deep-scrub"));
1407 cmdctx
->reply(0, "");
1409 } else if (prefix
== "osd reweight-by-pg" ||
1410 prefix
== "osd reweight-by-utilization" ||
1411 prefix
== "osd test-reweight-by-pg" ||
1412 prefix
== "osd test-reweight-by-utilization") {
1414 prefix
== "osd reweight-by-pg" || prefix
== "osd test-reweight-by-pg";
1416 prefix
== "osd test-reweight-by-pg" ||
1417 prefix
== "osd test-reweight-by-utilization";
1419 cmd_getval(cmdctx
->cmdmap
, "oload", oload
, int64_t(120));
1421 vector
<string
> poolnames
;
1422 cmd_getval(cmdctx
->cmdmap
, "pools", poolnames
);
1423 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1424 for (const auto& poolname
: poolnames
) {
1425 int64_t pool
= osdmap
.lookup_pg_pool_name(poolname
);
1427 ss
<< "pool '" << poolname
<< "' does not exist";
1434 cmdctx
->reply(r
, ss
);
1438 double max_change
= g_conf().get_val
<double>("mon_reweight_max_change");
1439 cmd_getval(cmdctx
->cmdmap
, "max_change", max_change
);
1440 if (max_change
<= 0.0) {
1441 ss
<< "max_change " << max_change
<< " must be positive";
1442 cmdctx
->reply(-EINVAL
, ss
);
1445 int64_t max_osds
= g_conf().get_val
<int64_t>("mon_reweight_max_osds");
1446 cmd_getval(cmdctx
->cmdmap
, "max_osds", max_osds
);
1447 if (max_osds
<= 0) {
1448 ss
<< "max_osds " << max_osds
<< " must be positive";
1449 cmdctx
->reply(-EINVAL
, ss
);
1452 bool no_increasing
= false;
1453 cmd_getval(cmdctx
->cmdmap
, "no_increasing", no_increasing
);
1455 mempool::osdmap::map
<int32_t, uint32_t> new_weights
;
1456 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
&osdmap
, const PGMap
& pgmap
) {
1457 return reweight::by_utilization(osdmap
, pgmap
,
1462 pools
.empty() ? NULL
: &pools
,
1465 &ss
, &out_str
, f
.get());
1468 dout(10) << "reweight::by_utilization: finished with " << out_str
<< dendl
;
1471 f
->flush(cmdctx
->odata
);
1473 cmdctx
->odata
.append(out_str
);
1476 ss
<< "FAILED reweight-by-pg";
1477 cmdctx
->reply(r
, ss
);
1479 } else if (r
== 0 || dry_run
) {
1481 cmdctx
->reply(r
, ss
);
1484 json_spirit::Object json_object
;
1485 for (const auto& osd_weight
: new_weights
) {
1486 json_spirit::Config::add(json_object
,
1487 std::to_string(osd_weight
.first
),
1488 std::to_string(osd_weight
.second
));
1490 string s
= json_spirit::write(json_object
);
1491 std::replace(begin(s
), end(s
), '\"', '\'');
1494 "\"prefix\": \"osd reweightn\", "
1495 "\"weights\": \"" + s
+ "\""
1497 auto on_finish
= new ReplyOnFinish(cmdctx
);
1498 monc
->start_mon_command({cmd
}, {},
1499 &on_finish
->from_mon
, &on_finish
->outs
, on_finish
);
1502 } else if (prefix
== "osd df") {
1503 string method
, filter
;
1504 cmd_getval(cmdctx
->cmdmap
, "output_method", method
);
1505 cmd_getval(cmdctx
->cmdmap
, "filter", filter
);
1507 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1508 // sanity check filter(s)
1509 if (!filter
.empty() &&
1510 osdmap
.lookup_pg_pool_name(filter
) < 0 &&
1511 !osdmap
.crush
->class_exists(filter
) &&
1512 !osdmap
.crush
->name_exists(filter
)) {
1513 rs
<< "'" << filter
<< "' not a pool, crush node or device class name";
1516 print_osd_utilization(osdmap
, pgmap
, ss
,
1517 f
.get(), method
== "tree", filter
);
1518 cmdctx
->odata
.append(ss
);
1521 cmdctx
->reply(r
, rs
);
1523 } else if (prefix
== "osd pool stats") {
1525 cmd_getval(cmdctx
->cmdmap
, "pool_name", pool_name
);
1526 int64_t poolid
= -ENOENT
;
1527 bool one_pool
= false;
1528 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1529 if (!pool_name
.empty()) {
1530 poolid
= osdmap
.lookup_pg_pool_name(pool_name
);
1532 ceph_assert(poolid
== -ENOENT
);
1533 ss
<< "unrecognized pool '" << pool_name
<< "'";
1540 f
->open_array_section("pool_stats");
1542 if (osdmap
.get_pools().empty()) {
1543 ss
<< "there are no pools!";
1547 for (auto &p
: osdmap
.get_pools()) {
1551 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, f
.get(), &rs
);
1559 f
->flush(cmdctx
->odata
);
1561 cmdctx
->odata
.append(rs
.str());
1565 if (r
!= -EOPNOTSUPP
) {
1566 cmdctx
->reply(r
, ss
);
1569 } else if (prefix
== "osd safe-to-destroy" ||
1570 prefix
== "osd destroy" ||
1571 prefix
== "osd purge") {
1574 if (prefix
== "osd safe-to-destroy") {
1576 cmd_getval(cmdctx
->cmdmap
, "ids", ids
);
1577 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1578 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1580 if (!r
&& osds
.empty()) {
1581 ss
<< "must specify one or more OSDs";
1586 if (!cmd_getval(cmdctx
->cmdmap
, "id", id
)) {
1588 ss
<< "must specify OSD id";
1594 cmdctx
->reply(r
, ss
);
1597 set
<int> active_osds
, missing_stats
, stored_pgs
, safe_to_destroy
;
1598 int affected_pgs
= 0;
1599 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1600 if (pg_map
.num_pg_unknown
> 0) {
1601 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; cannot draw"
1602 << " any conclusions";
1606 int num_active_clean
= 0;
1607 for (auto& p
: pg_map
.num_pg_by_state
) {
1608 unsigned want
= PG_STATE_ACTIVE
|PG_STATE_CLEAN
;
1609 if ((p
.first
& want
) == want
) {
1610 num_active_clean
+= p
.second
;
1613 for (auto osd
: osds
) {
1614 if (!osdmap
.exists(osd
)) {
1615 safe_to_destroy
.insert(osd
);
1616 continue; // clearly safe to destroy
1618 auto q
= pg_map
.num_pg_by_osd
.find(osd
);
1619 if (q
!= pg_map
.num_pg_by_osd
.end()) {
1620 if (q
->second
.acting
> 0 || q
->second
.up_not_acting
> 0) {
1621 active_osds
.insert(osd
);
1622 // XXX: For overlapping PGs, this counts them again
1623 affected_pgs
+= q
->second
.acting
+ q
->second
.up_not_acting
;
1627 if (num_active_clean
< pg_map
.num_pg
) {
1628 // all pgs aren't active+clean; we need to be careful.
1629 auto p
= pg_map
.osd_stat
.find(osd
);
1630 if (p
== pg_map
.osd_stat
.end() || !osdmap
.is_up(osd
)) {
1631 missing_stats
.insert(osd
);
1633 } else if (p
->second
.num_pgs
> 0) {
1634 stored_pgs
.insert(osd
);
1638 safe_to_destroy
.insert(osd
);
1641 if (r
&& prefix
== "osd safe-to-destroy") {
1642 cmdctx
->reply(r
, ss
); // regardless of formatter
1645 if (!r
&& (!active_osds
.empty() ||
1646 !missing_stats
.empty() || !stored_pgs
.empty())) {
1647 if (!safe_to_destroy
.empty()) {
1648 ss
<< "OSD(s) " << safe_to_destroy
1649 << " are safe to destroy without reducing data durability. ";
1651 if (!active_osds
.empty()) {
1652 ss
<< "OSD(s) " << active_osds
<< " have " << affected_pgs
1653 << " pgs currently mapped to them. ";
1655 if (!missing_stats
.empty()) {
1656 ss
<< "OSD(s) " << missing_stats
<< " have no reported stats, and not all"
1657 << " PGs are active+clean; we cannot draw any conclusions. ";
1659 if (!stored_pgs
.empty()) {
1660 ss
<< "OSD(s) " << stored_pgs
<< " last reported they still store some PG"
1661 << " data, and not all PGs are active+clean; we cannot be sure they"
1662 << " aren't still needed.";
1664 if (!active_osds
.empty() || !stored_pgs
.empty()) {
1671 if (prefix
== "osd safe-to-destroy") {
1673 ss
<< "OSD(s) " << osds
<< " are safe to destroy without reducing data"
1677 f
->open_object_section("osd_status");
1678 f
->open_array_section("safe_to_destroy");
1679 for (auto i
: safe_to_destroy
)
1680 f
->dump_int("osd", i
);
1682 f
->open_array_section("active");
1683 for (auto i
: active_osds
)
1684 f
->dump_int("osd", i
);
1686 f
->open_array_section("missing_stats");
1687 for (auto i
: missing_stats
)
1688 f
->dump_int("osd", i
);
1690 f
->open_array_section("stored_pgs");
1691 for (auto i
: stored_pgs
)
1692 f
->dump_int("osd", i
);
1694 f
->close_section(); // osd_status
1695 f
->flush(cmdctx
->odata
);
1697 std::stringstream().swap(ss
);
1699 cmdctx
->reply(r
, ss
);
1705 cmd_getval(cmdctx
->cmdmap
, "force", force
);
1708 cmd_getval(cmdctx
->cmdmap
, "yes_i_really_mean_it", force
);
1711 ss
<< "\nYou can proceed by passing --force, but be warned that"
1712 " this will likely mean real, permanent data loss.";
1718 cmdctx
->reply(r
, ss
);
1723 "\"prefix\": \"" + prefix
+ "-actual\", "
1724 "\"id\": " + stringify(osds
) + ", "
1725 "\"yes_i_really_mean_it\": true"
1727 auto on_finish
= new ReplyOnFinish(cmdctx
);
1728 monc
->start_mon_command({cmd
}, {}, nullptr, &on_finish
->outs
, on_finish
);
1730 } else if (prefix
== "osd ok-to-stop") {
1732 cmd_getval(cmdctx
->cmdmap
, "ids", ids
);
1735 cmd_getval(cmdctx
->cmdmap
, "max", max
);
1737 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1738 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1740 if (!r
&& osds
.empty()) {
1741 ss
<< "must specify one or more OSDs";
1744 if (max
< (int)osds
.size()) {
1748 cmdctx
->reply(r
, ss
);
1751 offline_pg_report out_report
;
1752 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1753 _maximize_ok_to_stop_set(
1754 osds
, max
, osdmap
, pg_map
,
1758 f
.reset(Formatter::create("json"));
1760 f
->dump_object("ok_to_stop", out_report
);
1761 f
->flush(cmdctx
->odata
);
1762 cmdctx
->odata
.append("\n");
1763 if (!out_report
.unknown
.empty()) {
1764 ss
<< out_report
.unknown
.size() << " pgs have unknown state; "
1765 << "cannot draw any conclusions";
1766 cmdctx
->reply(-EAGAIN
, ss
);
1768 if (!out_report
.ok_to_stop()) {
1769 ss
<< "unsafe to stop osd(s) at this time (" << out_report
.not_ok
.size() << " PGs are or would become offline)";
1770 cmdctx
->reply(-EBUSY
, ss
);
1772 cmdctx
->reply(0, ss
);
1775 } else if (prefix
== "pg force-recovery" ||
1776 prefix
== "pg force-backfill" ||
1777 prefix
== "pg cancel-force-recovery" ||
1778 prefix
== "pg cancel-force-backfill" ||
1779 prefix
== "osd pool force-recovery" ||
1780 prefix
== "osd pool force-backfill" ||
1781 prefix
== "osd pool cancel-force-recovery" ||
1782 prefix
== "osd pool cancel-force-backfill") {
1784 get_str_vec(prefix
, vs
);
1785 auto& granularity
= vs
.front();
1786 auto& forceop
= vs
.back();
1789 // figure out actual op just once
1791 if (forceop
== "force-recovery") {
1792 actual_op
= OFR_RECOVERY
;
1793 } else if (forceop
== "force-backfill") {
1794 actual_op
= OFR_BACKFILL
;
1795 } else if (forceop
== "cancel-force-backfill") {
1796 actual_op
= OFR_BACKFILL
| OFR_CANCEL
;
1797 } else if (forceop
== "cancel-force-recovery") {
1798 actual_op
= OFR_RECOVERY
| OFR_CANCEL
;
1801 set
<pg_t
> candidates
; // deduped
1802 if (granularity
== "pg") {
1803 // covnert pg names to pgs, discard any invalid ones while at it
1804 vector
<string
> pgids
;
1805 cmd_getval(cmdctx
->cmdmap
, "pgid", pgids
);
1806 for (auto& i
: pgids
) {
1808 if (!pgid
.parse(i
.c_str())) {
1809 ss
<< "invlaid pgid '" << i
<< "'; ";
1813 candidates
.insert(pgid
);
1817 vector
<string
> pool_names
;
1818 cmd_getval(cmdctx
->cmdmap
, "who", pool_names
);
1819 if (pool_names
.empty()) {
1820 ss
<< "must specify one or more pool names";
1821 cmdctx
->reply(-EINVAL
, ss
);
1824 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1825 for (auto& pool_name
: pool_names
) {
1826 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1828 ss
<< "unrecognized pool '" << pool_name
<< "'";
1832 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1833 for (int i
= 0; i
< pool_pg_num
; i
++)
1834 candidates
.insert({(unsigned int)i
, (uint64_t)pool_id
});
1838 cmdctx
->reply(r
, ss
);
1843 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1844 for (auto& i
: candidates
) {
1845 auto it
= pg_map
.pg_stat
.find(i
);
1846 if (it
== pg_map
.pg_stat
.end()) {
1847 ss
<< "pg " << i
<< " does not exist; ";
1851 auto state
= it
->second
.state
;
1852 // discard pgs for which user requests are pointless
1853 switch (actual_op
) {
1855 if ((state
& (PG_STATE_DEGRADED
|
1856 PG_STATE_RECOVERY_WAIT
|
1857 PG_STATE_RECOVERING
)) == 0) {
1858 // don't return error, user script may be racing with cluster.
1860 ss
<< "pg " << i
<< " doesn't require recovery; ";
1862 } else if (state
& PG_STATE_FORCED_RECOVERY
) {
1863 ss
<< "pg " << i
<< " recovery already forced; ";
1864 // return error, as it may be a bug in user script
1870 if ((state
& (PG_STATE_DEGRADED
|
1871 PG_STATE_BACKFILL_WAIT
|
1872 PG_STATE_BACKFILLING
)) == 0) {
1873 ss
<< "pg " << i
<< " doesn't require backfilling; ";
1875 } else if (state
& PG_STATE_FORCED_BACKFILL
) {
1876 ss
<< "pg " << i
<< " backfill already forced; ";
1881 case OFR_BACKFILL
| OFR_CANCEL
:
1882 if ((state
& PG_STATE_FORCED_BACKFILL
) == 0) {
1883 ss
<< "pg " << i
<< " backfill not forced; ";
1887 case OFR_RECOVERY
| OFR_CANCEL
:
1888 if ((state
& PG_STATE_FORCED_RECOVERY
) == 0) {
1889 ss
<< "pg " << i
<< " recovery not forced; ";
1894 ceph_abort_msg("actual_op value is not supported");
1900 // respond with error only when no pgs are correct
1901 // yes, in case of mixed errors, only the last one will be emitted,
1902 // but the message presented will be fine
1903 if (pgs
.size() != 0) {
1904 // clear error to not confuse users/scripts
1908 // optimize the command -> messages conversion, use only one
1909 // message per distinct OSD
1910 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1911 // group pgs to process by osd
1912 map
<int, vector
<spg_t
>> osdpgs
;
1913 for (auto& pgid
: pgs
) {
1916 if (osdmap
.get_primary_shard(pgid
, &primary
, &spg
)) {
1917 osdpgs
[primary
].push_back(spg
);
1920 for (auto& i
: osdpgs
) {
1921 if (osdmap
.is_up(i
.first
)) {
1922 auto p
= osd_cons
.find(i
.first
);
1923 if (p
== osd_cons
.end()) {
1924 ss
<< "osd." << i
.first
<< " is not currently connected";
1928 for (auto& con
: p
->second
) {
1930 new MOSDForceRecovery(monc
->get_fsid(), i
.second
, actual_op
));
1932 ss
<< "instructing pg(s) " << i
.second
<< " on osd." << i
.first
1933 << " to " << forceop
<< "; ";
1938 cmdctx
->reply(r
, ss
);
1940 } else if (prefix
== "config show" ||
1941 prefix
== "config show-with-defaults") {
1943 cmd_getval(cmdctx
->cmdmap
, "who", who
);
1944 auto [key
, valid
] = DaemonKey::parse(who
);
1946 ss
<< "invalid daemon name: use <type>.<id>";
1947 cmdctx
->reply(-EINVAL
, ss
);
1950 DaemonStatePtr daemon
= daemon_state
.get(key
);
1952 ss
<< "no config state for daemon " << who
;
1953 cmdctx
->reply(-ENOENT
, ss
);
1957 std::lock_guard
l(daemon
->lock
);
1961 if (cmd_getval(cmdctx
->cmdmap
, "key", name
)) {
1962 // handle special options
1963 if (name
== "fsid") {
1964 cmdctx
->odata
.append(stringify(monc
->get_fsid()) + "\n");
1965 cmdctx
->reply(r
, ss
);
1968 auto p
= daemon
->config
.find(name
);
1969 if (p
!= daemon
->config
.end() &&
1970 !p
->second
.empty()) {
1971 cmdctx
->odata
.append(p
->second
.rbegin()->second
+ "\n");
1973 auto& defaults
= daemon
->_get_config_defaults();
1974 auto q
= defaults
.find(name
);
1975 if (q
!= defaults
.end()) {
1976 cmdctx
->odata
.append(q
->second
+ "\n");
1981 } else if (daemon
->config_defaults_bl
.length() > 0) {
1984 f
->open_array_section("config");
1986 tbl
.define_column("NAME", TextTable::LEFT
, TextTable::LEFT
);
1987 tbl
.define_column("VALUE", TextTable::LEFT
, TextTable::LEFT
);
1988 tbl
.define_column("SOURCE", TextTable::LEFT
, TextTable::LEFT
);
1989 tbl
.define_column("OVERRIDES", TextTable::LEFT
, TextTable::LEFT
);
1990 tbl
.define_column("IGNORES", TextTable::LEFT
, TextTable::LEFT
);
1992 if (prefix
== "config show") {
1994 for (auto& i
: daemon
->config
) {
1995 dout(20) << " " << i
.first
<< " -> " << i
.second
<< dendl
;
1996 if (i
.second
.empty()) {
2000 f
->open_object_section("value");
2001 f
->dump_string("name", i
.first
);
2002 f
->dump_string("value", i
.second
.rbegin()->second
);
2003 f
->dump_string("source", ceph_conf_level_name(
2004 i
.second
.rbegin()->first
));
2005 if (i
.second
.size() > 1) {
2006 f
->open_array_section("overrides");
2007 auto j
= i
.second
.rend();
2008 for (--j
; j
!= i
.second
.rbegin(); --j
) {
2009 f
->open_object_section("value");
2010 f
->dump_string("source", ceph_conf_level_name(j
->first
));
2011 f
->dump_string("value", j
->second
);
2016 if (daemon
->ignored_mon_config
.count(i
.first
)) {
2017 f
->dump_string("ignores", "mon");
2022 tbl
<< i
.second
.rbegin()->second
;
2023 tbl
<< ceph_conf_level_name(i
.second
.rbegin()->first
);
2024 if (i
.second
.size() > 1) {
2026 auto j
= i
.second
.rend();
2027 for (--j
; j
!= i
.second
.rbegin(); --j
) {
2028 if (j
->second
== i
.second
.rbegin()->second
) {
2029 ov
.push_front(string("(") + ceph_conf_level_name(j
->first
) +
2030 string("[") + j
->second
+ string("]") +
2033 ov
.push_front(ceph_conf_level_name(j
->first
) +
2034 string("[") + j
->second
+ string("]"));
2042 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
2043 tbl
<< TextTable::endrow
;
2047 // show-with-defaults
2048 auto& defaults
= daemon
->_get_config_defaults();
2049 for (auto& i
: defaults
) {
2051 f
->open_object_section("value");
2052 f
->dump_string("name", i
.first
);
2056 auto j
= daemon
->config
.find(i
.first
);
2057 if (j
!= daemon
->config
.end() && !j
->second
.empty()) {
2060 f
->dump_string("value", j
->second
.rbegin()->second
);
2061 f
->dump_string("source", ceph_conf_level_name(
2062 j
->second
.rbegin()->first
));
2063 if (j
->second
.size() > 1) {
2064 f
->open_array_section("overrides");
2065 auto k
= j
->second
.rend();
2066 for (--k
; k
!= j
->second
.rbegin(); --k
) {
2067 f
->open_object_section("value");
2068 f
->dump_string("source", ceph_conf_level_name(k
->first
));
2069 f
->dump_string("value", k
->second
);
2074 if (daemon
->ignored_mon_config
.count(i
.first
)) {
2075 f
->dump_string("ignores", "mon");
2079 tbl
<< j
->second
.rbegin()->second
;
2080 tbl
<< ceph_conf_level_name(j
->second
.rbegin()->first
);
2081 if (j
->second
.size() > 1) {
2083 auto k
= j
->second
.rend();
2084 for (--k
; k
!= j
->second
.rbegin(); --k
) {
2085 if (k
->second
== j
->second
.rbegin()->second
) {
2086 ov
.push_front(string("(") + ceph_conf_level_name(k
->first
) +
2087 string("[") + k
->second
+ string("]") +
2090 ov
.push_front(ceph_conf_level_name(k
->first
) +
2091 string("[") + k
->second
+ string("]"));
2098 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
2099 tbl
<< TextTable::endrow
;
2102 // only have default
2104 f
->dump_string("value", i
.second
);
2105 f
->dump_string("source", ceph_conf_level_name(CONF_DEFAULT
));
2109 tbl
<< ceph_conf_level_name(CONF_DEFAULT
);
2112 tbl
<< TextTable::endrow
;
2119 f
->flush(cmdctx
->odata
);
2121 cmdctx
->odata
.append(stringify(tbl
));
2124 cmdctx
->reply(r
, ss
);
2126 } else if (prefix
== "device ls") {
2130 f
->open_array_section("devices");
2131 daemon_state
.with_devices([&f
](const DeviceState
& dev
) {
2132 f
->dump_object("device", dev
);
2135 f
->flush(cmdctx
->odata
);
2137 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2138 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
2139 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
2140 tbl
.define_column("WEAR", TextTable::RIGHT
, TextTable::RIGHT
);
2141 tbl
.define_column("LIFE EXPECTANCY", TextTable::LEFT
, TextTable::LEFT
);
2142 auto now
= ceph_clock_now();
2143 daemon_state
.with_devices([&tbl
, now
](const DeviceState
& dev
) {
2145 for (auto& i
: dev
.attachments
) {
2149 h
+= std::get
<0>(i
) + ":" + std::get
<1>(i
);
2152 for (auto& i
: dev
.daemons
) {
2158 char wear_level_str
[16] = {0};
2159 if (dev
.wear_level
>= 0) {
2160 snprintf(wear_level_str
, sizeof(wear_level_str
)-1, "%d%%",
2161 (int)(100.1 * dev
.wear_level
));
2167 << dev
.get_life_expectancy_str(now
)
2168 << TextTable::endrow
;
2170 cmdctx
->odata
.append(stringify(tbl
));
2172 cmdctx
->reply(0, ss
);
2174 } else if (prefix
== "device ls-by-daemon") {
2176 cmd_getval(cmdctx
->cmdmap
, "who", who
);
2177 if (auto [k
, valid
] = DaemonKey::parse(who
); !valid
) {
2178 ss
<< who
<< " is not a valid daemon name";
2181 auto dm
= daemon_state
.get(k
);
2184 f
->open_array_section("devices");
2185 for (auto& i
: dm
->devices
) {
2186 daemon_state
.with_device(i
.first
, [&f
] (const DeviceState
& dev
) {
2187 f
->dump_object("device", dev
);
2191 f
->flush(cmdctx
->odata
);
2194 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2195 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
2196 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
,
2198 auto now
= ceph_clock_now();
2199 for (auto& i
: dm
->devices
) {
2200 daemon_state
.with_device(
2201 i
.first
, [&tbl
, now
] (const DeviceState
& dev
) {
2203 for (auto& i
: dev
.attachments
) {
2207 h
+= std::get
<0>(i
) + ":" + std::get
<1>(i
);
2211 << dev
.get_life_expectancy_str(now
)
2212 << TextTable::endrow
;
2215 cmdctx
->odata
.append(stringify(tbl
));
2219 ss
<< "daemon " << who
<< " not found";
2221 cmdctx
->reply(r
, ss
);
2223 } else if (prefix
== "device ls-by-host") {
2225 cmd_getval(cmdctx
->cmdmap
, "host", host
);
2227 daemon_state
.list_devids_by_server(host
, &devids
);
2229 f
->open_array_section("devices");
2230 for (auto& devid
: devids
) {
2231 daemon_state
.with_device(
2232 devid
, [&f
] (const DeviceState
& dev
) {
2233 f
->dump_object("device", dev
);
2237 f
->flush(cmdctx
->odata
);
2240 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2241 tbl
.define_column("DEV", TextTable::LEFT
, TextTable::LEFT
);
2242 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
2243 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
, TextTable::LEFT
);
2244 auto now
= ceph_clock_now();
2245 for (auto& devid
: devids
) {
2246 daemon_state
.with_device(
2247 devid
, [&tbl
, &host
, now
] (const DeviceState
& dev
) {
2249 for (auto& j
: dev
.attachments
) {
2250 if (std::get
<0>(j
) == host
) {
2254 n
+= std::get
<1>(j
);
2258 for (auto& i
: dev
.daemons
) {
2267 << dev
.get_life_expectancy_str(now
)
2268 << TextTable::endrow
;
2271 cmdctx
->odata
.append(stringify(tbl
));
2273 cmdctx
->reply(0, ss
);
2275 } else if (prefix
== "device info") {
2277 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2280 if (!daemon_state
.with_device(devid
,
2281 [&f
, &rs
] (const DeviceState
& dev
) {
2283 f
->dump_object("device", dev
);
2288 ss
<< "device " << devid
<< " not found";
2292 f
->flush(cmdctx
->odata
);
2294 cmdctx
->odata
.append(rs
.str());
2297 cmdctx
->reply(r
, ss
);
2299 } else if (prefix
== "device set-life-expectancy") {
2301 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2302 string from_str
, to_str
;
2303 cmd_getval(cmdctx
->cmdmap
, "from", from_str
);
2304 cmd_getval(cmdctx
->cmdmap
, "to", to_str
);
2306 if (!from
.parse(from_str
)) {
2307 ss
<< "unable to parse datetime '" << from_str
<< "'";
2309 cmdctx
->reply(r
, ss
);
2310 } else if (to_str
.size() && !to
.parse(to_str
)) {
2311 ss
<< "unable to parse datetime '" << to_str
<< "'";
2313 cmdctx
->reply(r
, ss
);
2315 map
<string
,string
> meta
;
2316 daemon_state
.with_device_create(
2318 [from
, to
, &meta
] (DeviceState
& dev
) {
2319 dev
.set_life_expectancy(from
, to
, ceph_clock_now());
2320 meta
= dev
.metadata
;
2322 json_spirit::Object json_object
;
2323 for (auto& i
: meta
) {
2324 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2327 json
.append(json_spirit::write(json_object
));
2330 "\"prefix\": \"config-key set\", "
2331 "\"key\": \"device/" + devid
+ "\""
2333 auto on_finish
= new ReplyOnFinish(cmdctx
);
2334 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2337 } else if (prefix
== "device rm-life-expectancy") {
2339 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2340 map
<string
,string
> meta
;
2341 if (daemon_state
.with_device_write(devid
, [&meta
] (DeviceState
& dev
) {
2342 dev
.rm_life_expectancy();
2343 meta
= dev
.metadata
;
2350 "\"prefix\": \"config-key rm\", "
2351 "\"key\": \"device/" + devid
+ "\""
2354 json_spirit::Object json_object
;
2355 for (auto& i
: meta
) {
2356 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2358 json
.append(json_spirit::write(json_object
));
2361 "\"prefix\": \"config-key set\", "
2362 "\"key\": \"device/" + devid
+ "\""
2365 auto on_finish
= new ReplyOnFinish(cmdctx
);
2366 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2368 cmdctx
->reply(0, ss
);
2373 ss
<< "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2376 f
->open_object_section("pg_info");
2377 f
->dump_bool("pg_ready", pgmap_ready
);
2380 // fall back to feeding command to PGMap
2381 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
2382 return process_pg_map_command(prefix
, cmdctx
->cmdmap
, pg_map
, osdmap
,
2383 f
.get(), &ss
, &cmdctx
->odata
);
2389 if (r
!= -EOPNOTSUPP
) {
2391 f
->flush(cmdctx
->odata
);
2393 cmdctx
->reply(r
, ss
);
2398 // Was the command unfound?
2399 if (py_command
.cmdstring
.empty()) {
2400 ss
<< "No handler found for '" << prefix
<< "'";
2401 dout(4) << "No handler found for '" << prefix
<< "'" << dendl
;
2402 cmdctx
->reply(-EINVAL
, ss
);
2406 dout(10) << "passing through command '" << prefix
<< "' size " << cmdctx
->cmdmap
.size() << dendl
;
2407 finisher
.queue(new LambdaContext([this, cmdctx
, session
, py_command
, prefix
]
2409 std::stringstream ss
;
2411 dout(10) << "dispatching command '" << prefix
<< "' size " << cmdctx
->cmdmap
.size() << dendl
;
2413 // Validate that the module is enabled
2414 auto& py_handler_name
= py_command
.module_name
;
2415 PyModuleRef module
= py_modules
.get_module(py_handler_name
);
2416 ceph_assert(module
);
2417 if (!module
->is_enabled()) {
2418 ss
<< "Module '" << py_handler_name
<< "' is not enabled (required by "
2419 "command '" << prefix
<< "'): use `ceph mgr module enable "
2420 << py_handler_name
<< "` to enable it";
2421 dout(4) << ss
.str() << dendl
;
2422 cmdctx
->reply(-EOPNOTSUPP
, ss
);
2426 // Hack: allow the self-test method to run on unhealthy modules.
2427 // Fix this in future by creating a special path for self test rather
2428 // than having the hook be a normal module command.
2429 std::string self_test_prefix
= py_handler_name
+ " " + "self-test";
2431 // Validate that the module is healthy
2432 bool accept_command
;
2433 if (module
->is_loaded()) {
2434 if (module
->get_can_run() && !module
->is_failed()) {
2436 accept_command
= true;
2437 } else if (self_test_prefix
== prefix
) {
2438 // Unhealthy, but allow because it's a self test command
2439 accept_command
= true;
2441 accept_command
= false;
2442 ss
<< "Module '" << py_handler_name
<< "' has experienced an error and "
2443 "cannot handle commands: " << module
->get_error_string();
2446 // Module not loaded
2447 accept_command
= false;
2448 ss
<< "Module '" << py_handler_name
<< "' failed to load and "
2449 "cannot handle commands: " << module
->get_error_string();
2452 if (!accept_command
) {
2453 dout(4) << ss
.str() << dendl
;
2454 cmdctx
->reply(-EIO
, ss
);
2458 std::stringstream ds
;
2459 bufferlist inbl
= cmdctx
->data
;
2460 int r
= py_modules
.handle_command(py_command
, *session
, cmdctx
->cmdmap
,
2463 log_access_denied(cmdctx
, session
, ss
);
2466 cmdctx
->odata
.append(ds
);
2467 cmdctx
->reply(r
, ss
);
2468 dout(10) << " command returned " << r
<< dendl
;
// _prune_pending_service_map: drop stale or vanished daemons from the
// pending service map, and remove services that end up with no daemons.
// Any removal marks the pending map dirty (pending_service_map_dirty is
// set to the pending map's epoch) so it gets published.
//
// NOTE(review): this span is a damaged extraction -- statements are split
// across lines with original source line numbers fused into the text, and
// several original lines (e.g. 2488, 2490-2495, 2509-2512, 2516-2521) are
// missing entirely. The comments below describe only the logic that is
// visible here; confirm against the upstream file before relying on them.
2473 void DaemonServer::_prune_pending_service_map()
// A daemon is stale if its last service beacon is older than
// now - mgr_service_beacon_grace.
2475 utime_t cutoff
= ceph_clock_now();
2476 cutoff
-= g_conf().get_val
<double>("mgr_service_beacon_grace");
// Walk every (service type -> service) entry in the pending map.
2477 auto p
= pending_service_map
.services
.begin();
2478 while (p
!= pending_service_map
.services
.end()) {
// Walk every daemon registered under this service.
2479 auto q
= p
->second
.daemons
.begin();
2480 while (q
!= p
->second
.daemons
.end()) {
// Identity of this daemon: {service type, daemon name}.
2481 DaemonKey key
{p
->first
, q
->first
};
// Daemon is absent from the daemon state index.
2482 if (!daemon_state
.exists(key
)) {
// Normal ceph entities with no daemon state are force-pruned
// from the service map.
2483 if (ServiceMap::is_normal_ceph_entity(p
->first
)) {
2484 dout(10) << "daemon " << key
<< " in service map but not in daemon state "
2485 << "index -- force pruning" << dendl
;
2486 q
= p
->second
.daemons
.erase(q
);
2487 pending_service_map_dirty
= pending_service_map
.epoch
;
// Non-ceph entities are only logged as missing (visible here),
// not erased on this path.
2489 derr
<< "missing key " << key
<< dendl
;
// Daemon state exists: inspect its beacon under the daemon's lock.
2496 auto daemon
= daemon_state
.get(key
);
2497 std::lock_guard
l(daemon
->lock
);
// A zero-valued beacon means the mgr just restarted and has not
// heard from this daemon yet; assume it is alive and reset the
// beacon to now rather than pruning it immediately.
2498 if (daemon
->last_service_beacon
== utime_t()) {
2499 // we must have just restarted; assume they are alive now.
2500 daemon
->last_service_beacon
= ceph_clock_now();
// Beacon older than the grace cutoff: prune the daemon and mark
// the pending map dirty.
2504 if (daemon
->last_service_beacon
< cutoff
) {
2505 dout(10) << "pruning stale " << p
->first
<< "." << q
->first
2506 << " last_beacon " << daemon
->last_service_beacon
<< dendl
;
2507 q
= p
->second
.daemons
.erase(q
);
2508 pending_service_map_dirty
= pending_service_map
.epoch
;
// A service left with no daemons is removed from the map entirely.
2513 if (p
->second
.daemons
.empty()) {
2514 p
= pending_service_map
.services
.erase(p
);
2515 pending_service_map_dirty
= pending_service_map
.epoch
;
// Assemble an MMonMgrReport and send it to the monitor.  The report
// carries: health checks and progress events from the python modules,
// an encoded copy of pending_service_map when it is dirty, a digest of
// the PGMap, and health checks summarized from the per-daemon health
// metrics reported by osd/mon daemons.
// NOTE(review): extraction artifact -- statements are split across
// physical lines and some upstream lines are elided; code kept verbatim.
2522 void DaemonServer::send_report()
// Startup grace: after ~4 stats periods give up waiting on OSDs that
// have not reported and send a possibly-incomplete PG state.
2525 if (ceph_clock_now() - started_at
> g_conf().get_val
<int64_t>("mgr_stats_period") * 4.0) {
2527 reported_osds
.clear();
2528 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
2529 << "potentially incomplete PG state to mon" << dendl
;
2531 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
// Build the report message and fill in python-module health/progress.
2537 auto m
= ceph::make_message
<MMonMgrReport
>();
2538 m
->gid
= monc
->get_global_id();
2539 py_modules
.get_health_checks(&m
->health_checks
);
2540 py_modules
.get_progress_events(&m
->progress_events
);
2542 cluster_state
.with_mutable_pgmap([&](PGMap
& pg_map
) {
2543 cluster_state
.update_delta_stats();
// Encode the pending service map only when it has been modified at or
// after the current pending epoch, then advance the epoch.
2545 if (pending_service_map
.epoch
) {
2546 _prune_pending_service_map();
2547 if (pending_service_map_dirty
>= pending_service_map
.epoch
) {
2548 pending_service_map
.modified
= ceph_clock_now();
2549 encode(pending_service_map
, m
->service_map_bl
, CEPH_FEATURES_ALL
);
2550 dout(10) << "sending service_map e" << pending_service_map
.epoch
2552 pending_service_map
.epoch
++;
2556 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
2557 // FIXME: no easy way to get mon features here. this will do for
2558 // now, though, as long as we don't make a backward-incompat change.
2559 pg_map
.encode_digest(osdmap
, m
->get_data(), CEPH_FEATURES_ALL
);
2560 dout(10) << pg_map
<< dendl
;
2562 pg_map
.get_health_checks(g_ceph_context
, osdmap
,
2565 dout(10) << m
->health_checks
.checks
.size() << " health checks"
2567 dout(20) << "health checks:\n";
2568 JSONFormatter
jf(true);
2569 jf
.dump_object("health_checks", m
->health_checks
);
// Only chatter the pgmap to the cluster log on luminous+ clusters.
2572 if (osdmap
.require_osd_release
>= ceph_release_t::luminous
) {
2573 clog
->debug() << "pgmap v" << pg_map
.version
<< ": " << pg_map
;
// Accumulate daemon-reported health metrics (osd and mon services)
// into per-metric-type collectors, then summarize them into the
// report's health checks.
2578 map
<daemon_metric
, unique_ptr
<DaemonHealthMetricCollector
>> accumulated
;
2579 for (auto service
: {"osd", "mon"} ) {
2580 auto daemons
= daemon_state
.get_by_service(service
);
2581 for (const auto& [key
,state
] : daemons
) {
2582 std::lock_guard l
{state
->lock
};
2583 for (const auto& metric
: state
->daemon_health_metrics
) {
2584 auto acc
= accumulated
.find(metric
.get_type());
2585 if (acc
== accumulated
.end()) {
2586 auto collector
= DaemonHealthMetricCollector::create(metric
.get_type());
// Unknown metric type: create() failed (branch structure elided here).
2588 derr
<< __func__
<< " " << key
2589 << " sent me an unknown health metric: "
2590 << std::hex
<< static_cast<uint8_t>(metric
.get_type())
2591 << std::dec
<< dendl
;
2594 dout(20) << " + " << state
->key
<< " "
2596 tie(acc
, std::ignore
) = accumulated
.emplace(metric
.get_type(),
2597 std::move(collector
));
2599 acc
->second
->update(key
, metric
);
2603 for (const auto& acc
: accumulated
) {
2604 acc
.second
->summarize(m
->health_checks
);
2606 // TODO? We currently do not notify the PyModules
2607 // TODO: respect needs_send, so we send the report only if we are asked to do
2608 // so, or the state is updated.
2609 monc
->send_mon_message(std::move(m
));
// Walk every pool and nudge pg_num/pgp_num toward their targets:
//  - throttle by mon_osd_max_creating_pgs (creating/unknown PGs count
//    against the budget),
//  - pg_num decrease (merge) proceeds only when both merge participants
//    have PG stats, are active+clean, have matching acting sets, and
//    carry no upmaps (upmaps are queued for removal),
//  - pg_num increase (split) proceeds only when the pool's PGs are all
//    active/peered,
//  - pgp_num advances gradually, gated by inactive/degraded/unknown PG
//    ratios and target_max_misplaced_ratio (unless the
//    mgr_debug_aggressive_pg_num_changes override is set).
// Accumulated changes are applied at the end via "osd pool set" and
// "osd rm-pg-upmap"/"osd rm-pg-upmap-items" mon commands.
// NOTE(review): extraction artifact -- statements are split across
// physical lines and a number of upstream lines are elided (e.g. the
// declaration of 'ok' used in the merge checks); code kept verbatim.
2612 void DaemonServer::adjust_pgs()
2615 unsigned max
= std::max
<int64_t>(1, g_conf()->mon_osd_max_creating_pgs
);
2616 double max_misplaced
= g_conf().get_val
<double>("target_max_misplaced_ratio");
2617 bool aggro
= g_conf().get_val
<bool>("mgr_debug_aggressive_pg_num_changes");
// Commands to issue, keyed by pool name; upmaps to clear by pgid.
2619 map
<string
,unsigned> pg_num_to_set
;
2620 map
<string
,unsigned> pgp_num_to_set
;
2621 set
<pg_t
> upmaps_to_clear
;
2622 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
// Budget: how many new PGs we may still create this pass.
2623 unsigned creating_or_unknown
= 0;
2624 for (auto& i
: pg_map
.num_pg_by_state
) {
2625 if ((i
.first
& (PG_STATE_CREATING
)) ||
2627 creating_or_unknown
+= i
.second
;
2630 unsigned left
= max
;
2631 if (creating_or_unknown
>= max
) {
2634 left
-= creating_or_unknown
;
2635 dout(10) << "creating_or_unknown " << creating_or_unknown
2636 << " max_creating " << max
2640 // FIXME: These checks are fundamentally racy given that adjust_pgs()
2641 // can run more frequently than we get updated pg stats from OSDs. We
2642 // may make multiple adjustments with stale information.
2643 double misplaced_ratio
, degraded_ratio
;
2644 double inactive_pgs_ratio
, unknown_pgs_ratio
;
// NOTE(review): "°raded_ratio" below looks like a mojibake of
// "&degraded_ratio" (an "&deg" HTML-entity substitution) -- confirm
// against the upstream file before building.
2645 pg_map
.get_recovery_stats(&misplaced_ratio
, °raded_ratio
,
2646 &inactive_pgs_ratio
, &unknown_pgs_ratio
);
2647 dout(20) << "misplaced_ratio " << misplaced_ratio
2648 << " degraded_ratio " << degraded_ratio
2649 << " inactive_pgs_ratio " << inactive_pgs_ratio
2650 << " unknown_pgs_ratio " << unknown_pgs_ratio
2651 << "; target_max_misplaced_ratio " << max_misplaced
// Per-pool adjustment loop.
2654 for (auto& i
: osdmap
.get_pools()) {
2655 const pg_pool_t
& p
= i
.second
;
2658 if (p
.get_pg_num_target() != p
.get_pg_num()) {
2659 dout(20) << "pool " << i
.first
2660 << " pg_num " << p
.get_pg_num()
2661 << " target " << p
.get_pg_num_target()
// Skip pools still creating their initial PGs.
2663 if (p
.has_flag(pg_pool_t::FLAG_CREATING
)) {
2664 dout(10) << "pool " << i
.first
2665 << " pg_num_target " << p
.get_pg_num_target()
2666 << " pg_num " << p
.get_pg_num()
2667 << " - still creating initial pgs"
2669 } else if (p
.get_pg_num_target() < p
.get_pg_num()) {
2670 // pg_num decrease (merge)
// The merge source is the last PG; its parent is the merge target.
2671 pg_t
merge_source(p
.get_pg_num() - 1, i
.first
);
2672 pg_t merge_target
= merge_source
.get_parent();
// Wait out any in-flight pg_num change, and require pgp_num to have
// been lowered first.
2675 if (p
.get_pg_num() != p
.get_pg_num_pending()) {
2676 dout(10) << "pool " << i
.first
2677 << " pg_num_target " << p
.get_pg_num_target()
2678 << " pg_num " << p
.get_pg_num()
2679 << " - decrease and pg_num_pending != pg_num, waiting"
2682 } else if (p
.get_pg_num() == p
.get_pgp_num()) {
2683 dout(10) << "pool " << i
.first
2684 << " pg_num_target " << p
.get_pg_num_target()
2685 << " pg_num " << p
.get_pg_num()
2686 << " - decrease blocked by pgp_num "
// Vet both merge participants before committing to a merge.
2691 vector
<int32_t> source_acting
;
2692 for (auto &merge_participant
: {merge_source
, merge_target
}) {
2693 bool is_merge_source
= merge_participant
== merge_source
;
// A participant with an upmap blocks the merge; queue its removal.
2694 if (osdmap
.have_pg_upmaps(merge_participant
)) {
2695 dout(10) << "pool " << i
.first
2696 << " pg_num_target " << p
.get_pg_num_target()
2697 << " pg_num " << p
.get_pg_num()
2698 << (is_merge_source
? " - merge source " : " - merge target ")
2699 << merge_participant
2700 << " has upmap" << dendl
;
2701 upmaps_to_clear
.insert(merge_participant
);
// Both participants must have stats and be active+clean.
2704 auto q
= pg_map
.pg_stat
.find(merge_participant
);
2705 if (q
== pg_map
.pg_stat
.end()) {
2706 dout(10) << "pool " << i
.first
2707 << " pg_num_target " << p
.get_pg_num_target()
2708 << " pg_num " << p
.get_pg_num()
2709 << " - no state for " << merge_participant
2710 << (is_merge_source
? " (merge source)" : " (merge target)")
2713 } else if ((q
->second
.state
& (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) !=
2714 (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) {
2715 dout(10) << "pool " << i
.first
2716 << " pg_num_target " << p
.get_pg_num_target()
2717 << " pg_num " << p
.get_pg_num()
2718 << (is_merge_source
? " - merge source " : " - merge target ")
2719 << merge_participant
2720 << " not clean (" << pg_state_string(q
->second
.state
)
// Source and target must share an acting set.
// NOTE(review): 'ok' is declared in lines elided from this chunk.
2724 if (is_merge_source
) {
2725 source_acting
= q
->second
.acting
;
2726 } else if (ok
&& q
->second
.acting
!= source_acting
) {
2727 dout(10) << "pool " << i
.first
2728 << " pg_num_target " << p
.get_pg_num_target()
2729 << " pg_num " << p
.get_pg_num()
2730 << (is_merge_source
? " - merge source " : " - merge target ")
2731 << merge_participant
2732 << " acting does not match (source " << source_acting
2733 << " != target " << q
->second
.acting
// All checks passed: schedule a decrease of one PG for this pool.
2740 unsigned target
= p
.get_pg_num() - 1;
2741 dout(10) << "pool " << i
.first
2742 << " pg_num_target " << p
.get_pg_num_target()
2743 << " pg_num " << p
.get_pg_num()
2745 << " (merging " << merge_source
2746 << " and " << merge_target
2748 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
2751 } else if (p
.get_pg_num_target() > p
.get_pg_num()) {
2752 // pg_num increase (split)
// Require every PG of the pool to be active (or peered).
2754 auto q
= pg_map
.num_pg_by_pool_state
.find(i
.first
);
2755 if (q
!= pg_map
.num_pg_by_pool_state
.end()) {
2756 for (auto& j
: q
->second
) {
2757 if ((j
.first
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
)) == 0) {
2758 dout(20) << "pool " << i
.first
<< " has " << j
.second
2759 << " pgs in " << pg_state_string(j
.first
)
2769 dout(10) << "pool " << i
.first
2770 << " pg_num_target " << p
.get_pg_num_target()
2771 << " pg_num " << p
.get_pg_num()
2772 << " - not all pgs active"
// Split: grow pg_num by at most the remaining creation budget.
2775 unsigned add
= std::min(
2777 p
.get_pg_num_target() - p
.get_pg_num());
2778 unsigned target
= p
.get_pg_num() + add
;
2780 dout(10) << "pool " << i
.first
2781 << " pg_num_target " << p
.get_pg_num_target()
2782 << " pg_num " << p
.get_pg_num()
2783 << " -> " << target
<< dendl
;
2784 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
// pgp_num handling: track min(pg_num_pending, pgp_num_target).
2790 unsigned target
= std::min(p
.get_pg_num_pending(),
2791 p
.get_pgp_num_target());
2792 if (target
!= p
.get_pgp_num()) {
2793 dout(20) << "pool " << i
.first
2794 << " pgp_num_target " << p
.get_pgp_num_target()
2795 << " pgp_num " << p
.get_pgp_num()
2796 << " -> " << target
<< dendl
;
// Gate the adjustment on cluster health unless 'aggro' is set.
2797 if (target
> p
.get_pgp_num() &&
2798 p
.get_pgp_num() == p
.get_pg_num()) {
2799 dout(10) << "pool " << i
.first
2800 << " pgp_num_target " << p
.get_pgp_num_target()
2801 << " pgp_num " << p
.get_pgp_num()
2802 << " - increase blocked by pg_num " << p
.get_pg_num()
2804 } else if (!aggro
&& (inactive_pgs_ratio
> 0 ||
2805 degraded_ratio
> 0 ||
2806 unknown_pgs_ratio
> 0)) {
2807 dout(10) << "pool " << i
.first
2808 << " pgp_num_target " << p
.get_pgp_num_target()
2809 << " pgp_num " << p
.get_pgp_num()
2810 << " - inactive|degraded|unknown pgs, deferring pgp_num"
2811 << " update" << dendl
;
2812 } else if (!aggro
&& (misplaced_ratio
> max_misplaced
)) {
2813 dout(10) << "pool " << i
.first
2814 << " pgp_num_target " << p
.get_pgp_num_target()
2815 << " pgp_num " << p
.get_pgp_num()
2816 << " - misplaced_ratio " << misplaced_ratio
2817 << " > max " << max_misplaced
2818 << ", deferring pgp_num update" << dendl
;
2820 // NOTE: this calculation assumes objects are
2821 // basically uniformly distributed across all PGs
2822 // (regardless of pool), which is probably not
2823 // perfectly correct, but it's a start. make no
2824 // single adjustment that's more than half of the
2825 // max_misplaced, to somewhat limit the magnitude of
2826 // our potential error here.
2828 static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP
= 1;
2829 pool_stat_t s
= pg_map
.get_pg_pool_sum_stat(i
.first
);
2831 // pool is (virtually) empty; just jump to final pgp_num?
2832 (p
.get_pgp_num_target() > p
.get_pgp_num() &&
2833 s
.stats
.sum
.num_objects
<= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP
*
2834 p
.get_pgp_num_target()))) {
// 'room' bounds a single step by the remaining misplaced headroom,
// capped at half of max_misplaced (declaration partially elided).
2838 std::min
<double>(max_misplaced
- misplaced_ratio
,
2839 max_misplaced
/ 2.0);
2840 unsigned estmax
= std::max
<unsigned>(
2841 (double)p
.get_pg_num() * room
, 1u);
2842 unsigned next_min
= 0;
2843 if (p
.get_pgp_num() > estmax
) {
2844 next_min
= p
.get_pgp_num() - estmax
;
2846 next
= std::clamp(target
,
2848 p
.get_pgp_num() + estmax
);
2849 dout(20) << " room " << room
<< " estmax " << estmax
2850 << " delta " << (target
-p
.get_pgp_num())
2851 << " next " << next
<< dendl
;
2852 if (p
.get_pgp_num_target() == p
.get_pg_num_target() &&
2853 p
.get_pgp_num_target() < p
.get_pg_num()) {
2854 // since pgp_num is tracking pg_num, ceph is handling
2855 // pgp_num. so, be responsible: don't let pgp_num get
2856 // too far out ahead of merges (if we are merging).
2857 // this avoids moving lots of unmerged pgs onto a
2858 // small number of OSDs where we might blow out the
2860 unsigned max_outpace_merges
=
2861 std::max
<unsigned>(8, p
.get_pg_num() * max_misplaced
);
2862 if (next
+ max_outpace_merges
< p
.get_pg_num()) {
2863 next
= p
.get_pg_num() - max_outpace_merges
;
2864 dout(10) << " using next " << next
2865 << " to avoid outpacing merges (max_outpace_merges "
2866 << max_outpace_merges
<< ")" << dendl
;
2870 dout(10) << "pool " << i
.first
2871 << " pgp_num_target " << p
.get_pgp_num_target()
2872 << " pgp_num " << p
.get_pgp_num()
2873 << " -> " << next
<< dendl
;
2874 pgp_num_to_set
[osdmap
.get_pool_name(i
.first
)] = next
;
// Apply the accumulated changes via mon commands (JSON command bodies
// built by string concatenation; surrounding lines partially elided).
2882 for (auto i
: pg_num_to_set
) {
2885 "\"prefix\": \"osd pool set\", "
2886 "\"pool\": \"" + i
.first
+ "\", "
2887 "\"var\": \"pg_num_actual\", "
2888 "\"val\": \"" + stringify(i
.second
) + "\""
2890 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2892 for (auto i
: pgp_num_to_set
) {
2895 "\"prefix\": \"osd pool set\", "
2896 "\"pool\": \"" + i
.first
+ "\", "
2897 "\"var\": \"pgp_num_actual\", "
2898 "\"val\": \"" + stringify(i
.second
) + "\""
2900 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
// Clear both plain upmaps and upmap-items for queued pgids.
2902 for (auto pg
: upmaps_to_clear
) {
2905 "\"prefix\": \"osd rm-pg-upmap\", "
2906 "\"pgid\": \"" + stringify(pg
) + "\""
2908 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2911 "\"prefix\": \"osd rm-pg-upmap-items\", "
2912 "\"pgid\": \"" + stringify(pg
) + "\"" +
2914 monc
->start_mon_command({cmd2
}, {}, nullptr, nullptr, nullptr);
// Called when a new ServiceMap arrives from the mon.  On first sight
// (pending epoch 0) seed pending_service_map from the received map and
// set our epoch to mon's + 1; otherwise assert our pending epoch is
// strictly newer.  Then reconcile the daemon state index with the map:
// create DaemonState entries for service daemons we don't know about,
// and cull daemons/services that have disappeared.
// NOTE(review): extraction artifact -- statements are split across
// physical lines and some upstream lines are elided; code kept verbatim.
2918 void DaemonServer::got_service_map()
2920 std::lock_guard
l(lock
);
2922 cluster_state
.with_servicemap([&](const ServiceMap
& service_map
) {
2923 if (pending_service_map
.epoch
== 0) {
2924 // we just started up
2925 dout(10) << "got initial map e" << service_map
.epoch
<< dendl
;
2926 pending_service_map
= service_map
;
2927 pending_service_map
.epoch
= service_map
.epoch
+ 1;
2929 // we were already active and therefore must have persisted it,
2930 // which means ours is the same or newer.
2931 dout(10) << "got updated map e" << service_map
.epoch
<< dendl
;
2932 ceph_assert(pending_service_map
.epoch
> service_map
.epoch
);
2936 // cull missing daemons, populate new ones
2937 std::set
<std::string
> types
;
2938 for (auto& [type
, service
] : pending_service_map
.services
) {
// Normal ceph entities are handled elsewhere (branch body elided).
2939 if (ServiceMap::is_normal_ceph_entity(type
)) {
// Track the names seen for this service so cull() can drop the rest.
2945 std::set
<std::string
> names
;
2946 for (auto& q
: service
.daemons
) {
2947 names
.insert(q
.first
);
2948 DaemonKey key
{type
, q
.first
};
// Unknown daemon: create a state entry from the service-map metadata.
2949 if (!daemon_state
.exists(key
)) {
2950 auto daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
2952 daemon
->set_metadata(q
.second
.metadata
);
2953 daemon
->service_daemon
= true;
2954 daemon_state
.insert(daemon
);
2955 dout(10) << "added missing " << key
<< dendl
;
2958 daemon_state
.cull(type
, names
);
2960 daemon_state
.cull_services(types
);
// Called when a new MgrMap arrives.  For the active mgr and every
// standby, trigger an asynchronous "mgr metadata" mon command (its
// MetadataUpdate completion populates daemon state) unless the daemon
// is already known or an update is already in flight; then cull mgr
// daemon-state entries not present in the map.
// NOTE(review): extraction artifact -- statements are split across
// physical lines and some upstream lines are elided; code kept verbatim.
2963 void DaemonServer::got_mgr_map()
2965 std::lock_guard
l(lock
);
// Names of mgrs present in this map; everything else gets culled.
2966 set
<std::string
> have
;
2967 cluster_state
.with_mgrmap([&](const MgrMap
& mgrmap
) {
// Helper: kick off an async metadata fetch for one mgr daemon.  The
// MetadataUpdate context owns outbl/outs and completes the update.
2968 auto md_update
= [&] (DaemonKey key
) {
2969 std::ostringstream oss
;
2970 auto c
= new MetadataUpdate(daemon_state
, key
);
2971 // FIXME remove post-nautilus: include 'id' for luminous mons
2972 oss
<< "{\"prefix\": \"mgr metadata\", \"who\": \""
2973 << key
.name
<< "\", \"id\": \"" << key
.name
<< "\"}";
2974 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
// Active mgr.
2976 if (mgrmap
.active_name
.size()) {
2977 DaemonKey key
{"mgr", mgrmap
.active_name
};
2978 have
.insert(mgrmap
.active_name
);
2979 if (!daemon_state
.exists(key
) && !daemon_state
.is_updating(key
)) {
2981 dout(10) << "triggered addition of " << key
<< " via metadata update" << dendl
;
// Standby mgrs.
2984 for (auto& i
: mgrmap
.standbys
) {
2985 DaemonKey key
{"mgr", i
.second
.name
};
2986 have
.insert(i
.second
.name
);
2987 if (!daemon_state
.exists(key
) && !daemon_state
.is_updating(key
)) {
2989 dout(10) << "triggered addition of " << key
<< " via metadata update" << dendl
;
2993 daemon_state
.cull("mgr", have
);
// md_config_obs_t hook: the config keys this server wants
// handle_conf_change() notifications for.
// NOTE(review): the KEYS array is truncated in this chunk -- only
// "mgr_stats_threshold" is visible; the remaining entries and the
// return statement are elided.
2996 const char** DaemonServer::get_tracked_conf_keys() const
2998 static const char *KEYS
[] = {
2999 "mgr_stats_threshold",
// md_config_obs_t hook: react to runtime config changes.  When the
// stats threshold/period keys change, queue a finisher job that (under
// 'lock') walks daemon_connections so each client receives a fresh
// MMgrConfigure reflecting the new stats policy.
// NOTE(review): extraction artifact -- the loop body and function tail
// are elided from this chunk; code kept verbatim.
3007 void DaemonServer::handle_conf_change(const ConfigProxy
& conf
,
3008 const std::set
<std::string
> &changed
)
3011 if (changed
.count("mgr_stats_threshold") || changed
.count("mgr_stats_period")) {
3012 dout(4) << "Updating stats threshold/period on "
3013 << daemon_connections
.size() << " clients" << dendl
;
3014 // Send a fresh MMgrConfigure to all clients, so that they can follow
3015 // the new policy for transmitting stats
3016 finisher
.queue(new LambdaContext([this](int r
) {
3017 std::lock_guard
l(lock
);
3018 for (auto &c
: daemon_connections
) {
// Send an MMgrConfigure message to one daemon connection.  The caller
// must hold 'lock' (asserted).  The message carries the configured
// stats period and threshold; OSD peers additionally receive the
// current OSD perf-metric queries, MDS peers the MDS metric config
// payload.
// NOTE(review): extraction artifact -- statements are split across
// physical lines; code kept verbatim.
3025 void DaemonServer::_send_configure(ConnectionRef c
)
3027 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
3029 auto configure
= make_message
<MMgrConfigure
>();
3030 configure
->stats_period
= g_conf().get_val
<int64_t>("mgr_stats_period");
3031 configure
->stats_threshold
= g_conf().get_val
<int64_t>("mgr_stats_threshold");
// Peer-type specific payloads.
3033 if (c
->peer_is_osd()) {
3034 configure
->osd_perf_metric_queries
=
3035 osd_perf_metric_collector
.get_queries();
3036 } else if (c
->peer_is_mds()) {
3037 configure
->metric_config_message
=
3038 MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector
.get_queries()));
3041 c
->send_message2(configure
);
3044 MetricQueryID
DaemonServer::add_osd_perf_query(
3045 const OSDPerfMetricQuery
&query
,
3046 const std::optional
<OSDPerfMetricLimit
> &limit
)
3048 return osd_perf_metric_collector
.add_query(query
, limit
);
3051 int DaemonServer::remove_osd_perf_query(MetricQueryID query_id
)
3053 return osd_perf_metric_collector
.remove_query(query_id
);
3056 int DaemonServer::get_osd_perf_counters(OSDPerfCollector
*collector
)
3058 return osd_perf_metric_collector
.get_counters(collector
);
3061 MetricQueryID
DaemonServer::add_mds_perf_query(
3062 const MDSPerfMetricQuery
&query
,
3063 const std::optional
<MDSPerfMetricLimit
> &limit
)
3065 return mds_perf_metric_collector
.add_query(query
, limit
);
3068 int DaemonServer::remove_mds_perf_query(MetricQueryID query_id
)
3070 return mds_perf_metric_collector
.remove_query(query_id
);
3073 int DaemonServer::get_mds_perf_counters(MDSPerfCollector
*collector
)
3075 return mds_perf_metric_collector
.get_counters(collector
);