1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
14 #include "DaemonServer.h"
15 #include <boost/algorithm/string.hpp>
18 #include "include/stringify.h"
19 #include "include/str_list.h"
20 #include "auth/RotatingKeyRing.h"
21 #include "json_spirit/json_spirit_writer.h"
23 #include "mgr/mgr_commands.h"
24 #include "mgr/DaemonHealthMetricCollector.h"
25 #include "mgr/OSDPerfMetricCollector.h"
26 #include "mgr/MDSPerfMetricCollector.h"
27 #include "mon/MonCommand.h"
29 #include "messages/MMgrOpen.h"
30 #include "messages/MMgrUpdate.h"
31 #include "messages/MMgrClose.h"
32 #include "messages/MMgrConfigure.h"
33 #include "messages/MMonMgrReport.h"
34 #include "messages/MCommand.h"
35 #include "messages/MCommandReply.h"
36 #include "messages/MMgrCommand.h"
37 #include "messages/MMgrCommandReply.h"
38 #include "messages/MPGStats.h"
39 #include "messages/MOSDScrub2.h"
40 #include "messages/MOSDForceRecovery.h"
41 #include "common/errno.h"
42 #include "common/pick_address.h"
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_mgr
47 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
49 using namespace TOPNSPC::common
;
52 using std::ostringstream
;
54 using std::stringstream
;
56 using std::unique_ptr
;
// Compare two map-like containers for exact equality of keys and
// mapped values.
//
// @param lhs  first map
// @param rhs  second map
// @return true iff both maps have the same size and corresponding
//         elements compare equal pairwise.
//
// Note: relies on equal key sets iterating in the same order, which
// holds for ordered containers such as std::map.
template <typename Map>
bool map_compare(Map const &lhs, Map const &rhs) {
  return lhs.size() == rhs.size()
    && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                  // take elements by const reference: the previous
                  // by-value lambda copied every pair during comparison
                  [] (const auto& a, const auto& b) {
                    return a.first == b.first && a.second == b.second;
                  });
}
67 DaemonServer::DaemonServer(MonClient
*monc_
,
69 DaemonStateIndex
&daemon_state_
,
70 ClusterState
&cluster_state_
,
71 PyModuleRegistry
&py_modules_
,
73 LogChannelRef audit_clog_
)
74 : Dispatcher(g_ceph_context
),
75 client_byte_throttler(new Throttle(g_ceph_context
, "mgr_client_bytes",
76 g_conf().get_val
<Option::size_t>("mgr_client_bytes"))),
77 client_msg_throttler(new Throttle(g_ceph_context
, "mgr_client_messages",
78 g_conf().get_val
<uint64_t>("mgr_client_messages"))),
79 osd_byte_throttler(new Throttle(g_ceph_context
, "mgr_osd_bytes",
80 g_conf().get_val
<Option::size_t>("mgr_osd_bytes"))),
81 osd_msg_throttler(new Throttle(g_ceph_context
, "mgr_osd_messsages",
82 g_conf().get_val
<uint64_t>("mgr_osd_messages"))),
83 mds_byte_throttler(new Throttle(g_ceph_context
, "mgr_mds_bytes",
84 g_conf().get_val
<Option::size_t>("mgr_mds_bytes"))),
85 mds_msg_throttler(new Throttle(g_ceph_context
, "mgr_mds_messsages",
86 g_conf().get_val
<uint64_t>("mgr_mds_messages"))),
87 mon_byte_throttler(new Throttle(g_ceph_context
, "mgr_mon_bytes",
88 g_conf().get_val
<Option::size_t>("mgr_mon_bytes"))),
89 mon_msg_throttler(new Throttle(g_ceph_context
, "mgr_mon_messsages",
90 g_conf().get_val
<uint64_t>("mgr_mon_messages"))),
94 daemon_state(daemon_state_
),
95 cluster_state(cluster_state_
),
96 py_modules(py_modules_
),
98 audit_clog(audit_clog_
),
100 timer(g_ceph_context
, lock
),
101 shutting_down(false),
103 osd_perf_metric_collector_listener(this),
104 osd_perf_metric_collector(osd_perf_metric_collector_listener
),
105 mds_perf_metric_collector_listener(this),
106 mds_perf_metric_collector(mds_perf_metric_collector_listener
)
108 g_conf().add_observer(this);
111 DaemonServer::~DaemonServer() {
113 g_conf().remove_observer(this);
116 int DaemonServer::init(uint64_t gid
, entity_addrvec_t client_addrs
)
118 // Initialize Messenger
119 std::string public_msgr_type
= g_conf()->ms_public_type
.empty() ?
120 g_conf().get_val
<std::string
>("ms_type") : g_conf()->ms_public_type
;
121 msgr
= Messenger::create(g_ceph_context
, public_msgr_type
,
122 entity_name_t::MGR(gid
),
124 Messenger::get_pid_nonce());
125 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
127 msgr
->set_auth_client(monc
);
130 msgr
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
131 client_byte_throttler
.get(),
132 client_msg_throttler
.get());
135 msgr
->set_policy_throttlers(entity_name_t::TYPE_OSD
,
136 osd_byte_throttler
.get(),
137 osd_msg_throttler
.get());
138 msgr
->set_policy_throttlers(entity_name_t::TYPE_MDS
,
139 mds_byte_throttler
.get(),
140 mds_msg_throttler
.get());
141 msgr
->set_policy_throttlers(entity_name_t::TYPE_MON
,
142 mon_byte_throttler
.get(),
143 mon_msg_throttler
.get());
145 entity_addrvec_t addrs
;
146 int r
= pick_addresses(cct
, CEPH_PICK_ADDRESS_PUBLIC
, &addrs
);
150 dout(20) << __func__
<< " will bind to " << addrs
<< dendl
;
151 r
= msgr
->bindv(addrs
);
153 derr
<< "unable to bind mgr to " << addrs
<< dendl
;
157 msgr
->set_myname(entity_name_t::MGR(gid
));
158 msgr
->set_addr_unknowns(client_addrs
);
161 msgr
->add_dispatcher_tail(this);
163 msgr
->set_auth_server(monc
);
164 monc
->set_handle_authentication_dispatcher(this);
166 started_at
= ceph_clock_now();
168 std::lock_guard
l(lock
);
171 schedule_tick_locked(
172 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
177 entity_addrvec_t
DaemonServer::get_myaddrs() const
179 return msgr
->get_myaddrs();
182 int DaemonServer::ms_handle_fast_authentication(Connection
*con
)
184 auto s
= ceph::make_ref
<MgrSession
>(cct
);
186 s
->inst
.addr
= con
->get_peer_addr();
187 s
->entity_name
= con
->peer_name
;
188 dout(10) << __func__
<< " new session " << s
<< " con " << con
189 << " entity " << con
->peer_name
190 << " addr " << con
->get_peer_addrs()
193 AuthCapsInfo
&caps_info
= con
->get_peer_caps_info();
194 if (caps_info
.allow_all
) {
195 dout(10) << " session " << s
<< " " << s
->entity_name
196 << " allow_all" << dendl
;
197 s
->caps
.set_allow_all();
198 } else if (caps_info
.caps
.length() > 0) {
199 auto p
= caps_info
.caps
.cbegin();
204 catch (buffer::error
& e
) {
205 dout(10) << " session " << s
<< " " << s
->entity_name
206 << " failed to decode caps" << dendl
;
209 if (!s
->caps
.parse(str
)) {
210 dout(10) << " session " << s
<< " " << s
->entity_name
211 << " failed to parse caps '" << str
<< "'" << dendl
;
214 dout(10) << " session " << s
<< " " << s
->entity_name
215 << " has caps " << s
->caps
<< " '" << str
<< "'" << dendl
;
220 void DaemonServer::ms_handle_accept(Connection
* con
)
222 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
223 auto s
= ceph::ref_cast
<MgrSession
>(con
->get_priv());
224 std::lock_guard
l(lock
);
225 s
->osd_id
= atoi(s
->entity_name
.get_id().c_str());
226 dout(10) << "registering osd." << s
->osd_id
<< " session "
227 << s
<< " con " << con
<< dendl
;
228 osd_cons
[s
->osd_id
].insert(con
);
232 bool DaemonServer::ms_handle_reset(Connection
*con
)
234 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
235 auto priv
= con
->get_priv();
236 auto session
= static_cast<MgrSession
*>(priv
.get());
240 std::lock_guard
l(lock
);
241 dout(10) << "unregistering osd." << session
->osd_id
242 << " session " << session
<< " con " << con
<< dendl
;
243 osd_cons
[session
->osd_id
].erase(con
);
245 auto iter
= daemon_connections
.find(con
);
246 if (iter
!= daemon_connections
.end()) {
247 daemon_connections
.erase(iter
);
253 bool DaemonServer::ms_handle_refused(Connection
*con
)
255 // do nothing for now
259 bool DaemonServer::ms_dispatch2(const ref_t
<Message
>& m
)
261 // Note that we do *not* take ::lock here, in order to avoid
262 // serializing all message handling. It's up to each handler
263 // to take whatever locks it needs.
264 switch (m
->get_type()) {
266 cluster_state
.ingest_pgstats(ref_cast
<MPGStats
>(m
));
267 maybe_ready(m
->get_source().num());
270 return handle_report(ref_cast
<MMgrReport
>(m
));
272 return handle_open(ref_cast
<MMgrOpen
>(m
));
274 return handle_update(ref_cast
<MMgrUpdate
>(m
));
276 return handle_close(ref_cast
<MMgrClose
>(m
));
278 return handle_command(ref_cast
<MCommand
>(m
));
279 case MSG_MGR_COMMAND
:
280 return handle_command(ref_cast
<MMgrCommand
>(m
));
282 dout(1) << "Unhandled message type " << m
->get_type() << dendl
;
287 void DaemonServer::dump_pg_ready(ceph::Formatter
*f
)
289 f
->dump_bool("pg_ready", pgmap_ready
.load());
292 void DaemonServer::maybe_ready(int32_t osd_id
)
294 if (pgmap_ready
.load()) {
295 // Fast path: we don't need to take lock because pgmap_ready
298 std::lock_guard
l(lock
);
300 if (reported_osds
.find(osd_id
) == reported_osds
.end()) {
301 dout(4) << "initial report from osd " << osd_id
<< dendl
;
302 reported_osds
.insert(osd_id
);
303 std::set
<int32_t> up_osds
;
305 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
306 osdmap
.get_up_osds(up_osds
);
309 std::set
<int32_t> unreported_osds
;
310 std::set_difference(up_osds
.begin(), up_osds
.end(),
311 reported_osds
.begin(), reported_osds
.end(),
312 std::inserter(unreported_osds
, unreported_osds
.begin()));
314 if (unreported_osds
.size() == 0) {
315 dout(4) << "all osds have reported, sending PG state to mon" << dendl
;
317 reported_osds
.clear();
318 // Avoid waiting for next tick
321 dout(4) << "still waiting for " << unreported_osds
.size() << " osds"
322 " to report in before PGMap is ready" << dendl
;
328 void DaemonServer::tick()
334 schedule_tick_locked(
335 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
338 // Currently modules do not set health checks in response to events delivered to
339 // all modules (e.g. notify) so we do not risk a thundering hurd situation here.
340 // if this pattern emerges in the future, this scheduler could be modified to
341 // fire after all modules have had a chance to set their health checks.
342 void DaemonServer::schedule_tick_locked(double delay_sec
)
344 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
347 timer
.cancel_event(tick_event
);
348 tick_event
= nullptr;
351 // on shutdown start rejecting explicit requests to send reports that may
352 // originate from python land which may still be running.
356 tick_event
= timer
.add_event_after(delay_sec
,
357 new LambdaContext([this](int r
) {
362 void DaemonServer::schedule_tick(double delay_sec
)
364 std::lock_guard
l(lock
);
365 schedule_tick_locked(delay_sec
);
368 void DaemonServer::handle_osd_perf_metric_query_updated()
372 // Send a fresh MMgrConfigure to all clients, so that they can follow
373 // the new policy for transmitting stats
374 finisher
.queue(new LambdaContext([this](int r
) {
375 std::lock_guard
l(lock
);
376 for (auto &c
: daemon_connections
) {
377 if (c
->peer_is_osd()) {
384 void DaemonServer::handle_mds_perf_metric_query_updated()
388 // Send a fresh MMgrConfigure to all clients, so that they can follow
389 // the new policy for transmitting stats
390 finisher
.queue(new LambdaContext([this](int r
) {
391 std::lock_guard
l(lock
);
392 for (auto &c
: daemon_connections
) {
393 if (c
->peer_is_mds()) {
400 void DaemonServer::shutdown()
402 dout(10) << "begin" << dendl
;
405 cluster_state
.shutdown();
406 dout(10) << "done" << dendl
;
408 std::lock_guard
l(lock
);
409 shutting_down
= true;
413 static DaemonKey
key_from_service(
414 const std::string
& service_name
,
416 const std::string
& daemon_name
)
418 if (!service_name
.empty()) {
419 return DaemonKey
{service_name
, daemon_name
};
421 return DaemonKey
{ceph_entity_type_name(peer_type
), daemon_name
};
425 void DaemonServer::fetch_missing_metadata(const DaemonKey
& key
,
426 const entity_addr_t
& addr
)
428 if (!daemon_state
.is_updating(key
) &&
429 (key
.type
== "osd" || key
.type
== "mds" || key
.type
== "mon")) {
430 std::ostringstream oss
;
431 auto c
= new MetadataUpdate(daemon_state
, key
);
432 if (key
.type
== "osd") {
433 oss
<< "{\"prefix\": \"osd metadata\", \"id\": "
435 } else if (key
.type
== "mds") {
436 c
->set_default("addr", stringify(addr
));
437 oss
<< "{\"prefix\": \"mds metadata\", \"who\": \""
438 << key
.name
<< "\"}";
439 } else if (key
.type
== "mon") {
440 oss
<< "{\"prefix\": \"mon metadata\", \"id\": \""
441 << key
.name
<< "\"}";
445 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
449 bool DaemonServer::handle_open(const ref_t
<MMgrOpen
>& m
)
451 std::unique_lock
l(lock
);
453 DaemonKey key
= key_from_service(m
->service_name
,
454 m
->get_connection()->get_peer_type(),
457 auto con
= m
->get_connection();
458 dout(10) << "from " << key
<< " " << con
->get_peer_addr() << dendl
;
460 _send_configure(con
);
462 DaemonStatePtr daemon
;
463 if (daemon_state
.exists(key
)) {
464 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
465 daemon
= daemon_state
.get(key
);
468 if (m
->service_daemon
) {
469 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
470 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
472 daemon
->service_daemon
= true;
473 daemon_state
.insert(daemon
);
475 /* A normal Ceph daemon has connected but we are or should be waiting on
476 * metadata for it. Close the session so that it tries to reconnect.
478 dout(2) << "ignoring open from " << key
<< " " << con
->get_peer_addr()
479 << "; not ready for session (expect reconnect)" << dendl
;
482 fetch_missing_metadata(key
, m
->get_source_addr());
487 if (m
->service_daemon
) {
488 // update the metadata through the daemon state index to
489 // ensure it's kept up-to-date
490 daemon_state
.update_metadata(daemon
, m
->daemon_metadata
);
493 std::lock_guard
l(daemon
->lock
);
494 daemon
->perf_counters
.clear();
496 daemon
->service_daemon
= m
->service_daemon
;
497 if (m
->service_daemon
) {
498 daemon
->service_status
= m
->daemon_status
;
500 utime_t now
= ceph_clock_now();
501 auto [d
, added
] = pending_service_map
.get_daemon(m
->service_name
,
503 if (added
|| d
->gid
!= (uint64_t)m
->get_source().num()) {
504 dout(10) << "registering " << key
<< " in pending_service_map" << dendl
;
505 d
->gid
= m
->get_source().num();
506 d
->addr
= m
->get_source_addr();
507 d
->start_epoch
= pending_service_map
.epoch
;
508 d
->start_stamp
= now
;
509 d
->metadata
= m
->daemon_metadata
;
510 pending_service_map_dirty
= pending_service_map
.epoch
;
514 auto p
= m
->config_bl
.cbegin();
515 if (p
!= m
->config_bl
.end()) {
516 decode(daemon
->config
, p
);
517 decode(daemon
->ignored_mon_config
, p
);
518 dout(20) << " got config " << daemon
->config
519 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
521 daemon
->config_defaults_bl
= m
->config_defaults_bl
;
522 daemon
->config_defaults
.clear();
523 dout(20) << " got config_defaults_bl " << daemon
->config_defaults_bl
.length()
524 << " bytes" << dendl
;
527 if (con
->get_peer_type() != entity_name_t::TYPE_CLIENT
&&
528 m
->service_name
.empty())
530 // Store in set of the daemon/service connections, i.e. those
531 // connections that require an update in the event of stats
532 // configuration changes.
533 daemon_connections
.insert(con
);
539 bool DaemonServer::handle_update(const ref_t
<MMgrUpdate
>& m
)
542 if (!m
->service_name
.empty()) {
543 key
.type
= m
->service_name
;
545 key
.type
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
547 key
.name
= m
->daemon_name
;
549 dout(10) << "from " << m
->get_connection() << " " << key
<< dendl
;
551 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
552 m
->service_name
.empty()) {
553 // Clients should not be sending us update request
554 dout(10) << "rejecting update request from non-daemon client " << m
->daemon_name
556 clog
->warn() << "rejecting report from non-daemon client " << m
->daemon_name
557 << " at " << m
->get_connection()->get_peer_addrs();
558 m
->get_connection()->mark_down();
564 std::unique_lock
locker(lock
);
566 DaemonStatePtr daemon
;
567 // Look up the DaemonState
568 if (daemon_state
.exists(key
)) {
569 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
571 daemon
= daemon_state
.get(key
);
572 if (m
->need_metadata_update
&&
573 !m
->daemon_metadata
.empty()) {
574 daemon_state
.update_metadata(daemon
, m
->daemon_metadata
);
582 bool DaemonServer::handle_close(const ref_t
<MMgrClose
>& m
)
584 std::lock_guard
l(lock
);
586 DaemonKey key
= key_from_service(m
->service_name
,
587 m
->get_connection()->get_peer_type(),
589 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
591 if (daemon_state
.exists(key
)) {
592 DaemonStatePtr daemon
= daemon_state
.get(key
);
593 daemon_state
.rm(key
);
595 std::lock_guard
l(daemon
->lock
);
596 if (daemon
->service_daemon
) {
597 pending_service_map
.rm_daemon(m
->service_name
, m
->daemon_name
);
598 pending_service_map_dirty
= pending_service_map
.epoch
;
603 // send same message back as a reply
604 m
->get_connection()->send_message2(m
);
608 void DaemonServer::update_task_status(
610 const std::map
<std::string
,std::string
>& task_status
)
612 dout(10) << "got task status from " << key
<< dendl
;
614 [[maybe_unused
]] auto [daemon
, added
] =
615 pending_service_map
.get_daemon(key
.type
, key
.name
);
616 if (daemon
->task_status
!= task_status
) {
617 daemon
->task_status
= task_status
;
618 pending_service_map_dirty
= pending_service_map
.epoch
;
622 bool DaemonServer::handle_report(const ref_t
<MMgrReport
>& m
)
625 if (!m
->service_name
.empty()) {
626 key
.type
= m
->service_name
;
628 key
.type
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
630 key
.name
= m
->daemon_name
;
632 dout(10) << "from " << m
->get_connection() << " " << key
<< dendl
;
634 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
635 m
->service_name
.empty()) {
636 // Clients should not be sending us stats unless they are declaring
637 // themselves to be a daemon for some service.
638 dout(10) << "rejecting report from non-daemon client " << m
->daemon_name
640 clog
->warn() << "rejecting report from non-daemon client " << m
->daemon_name
641 << " at " << m
->get_connection()->get_peer_addrs();
642 m
->get_connection()->mark_down();
648 std::unique_lock
locker(lock
);
650 DaemonStatePtr daemon
;
651 // Look up the DaemonState
652 if (daemon
= daemon_state
.get(key
); daemon
!= nullptr) {
653 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
657 // we don't know the hostname at this stage, reject MMgrReport here.
658 dout(5) << "rejecting report from " << key
<< ", since we do not have its metadata now."
660 // issue metadata request in background
661 fetch_missing_metadata(key
, m
->get_source_addr());
666 auto priv
= m
->get_connection()->get_priv();
667 auto session
= static_cast<MgrSession
*>(priv
.get());
671 m
->get_connection()->mark_down();
673 dout(10) << "unregistering osd." << session
->osd_id
674 << " session " << session
<< " con " << m
->get_connection() << dendl
;
676 if (osd_cons
.find(session
->osd_id
) != osd_cons
.end()) {
677 osd_cons
[session
->osd_id
].erase(m
->get_connection());
680 auto iter
= daemon_connections
.find(m
->get_connection());
681 if (iter
!= daemon_connections
.end()) {
682 daemon_connections
.erase(iter
);
688 // Update the DaemonState
689 ceph_assert(daemon
!= nullptr);
691 std::lock_guard
l(daemon
->lock
);
692 auto &daemon_counters
= daemon
->perf_counters
;
693 daemon_counters
.update(*m
.get());
695 auto p
= m
->config_bl
.cbegin();
696 if (p
!= m
->config_bl
.end()) {
697 decode(daemon
->config
, p
);
698 decode(daemon
->ignored_mon_config
, p
);
699 dout(20) << " got config " << daemon
->config
700 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
703 utime_t now
= ceph_clock_now();
704 if (daemon
->service_daemon
) {
705 if (m
->daemon_status
) {
706 daemon
->service_status_stamp
= now
;
707 daemon
->service_status
= *m
->daemon_status
;
709 daemon
->last_service_beacon
= now
;
710 } else if (m
->daemon_status
) {
711 derr
<< "got status from non-daemon " << key
<< dendl
;
713 // update task status
714 if (m
->task_status
) {
715 update_task_status(key
, *m
->task_status
);
716 daemon
->last_service_beacon
= now
;
718 if (m
->get_connection()->peer_is_osd() || m
->get_connection()->peer_is_mon()) {
719 // only OSD and MON send health_checks to me now
720 daemon
->daemon_health_metrics
= std::move(m
->daemon_health_metrics
);
721 dout(10) << "daemon_health_metrics " << daemon
->daemon_health_metrics
727 // if there are any schema updates, notify the python modules
728 /* no users currently
729 if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
730 py_modules.notify_all("perf_schema_update", ceph::to_string(key));
734 if (m
->get_connection()->peer_is_osd()) {
735 osd_perf_metric_collector
.process_reports(m
->osd_perf_metric_reports
);
738 if (m
->metric_report_message
) {
739 const MetricReportMessage
&message
= *m
->metric_report_message
;
740 boost::apply_visitor(HandlePayloadVisitor(this), message
.payload
);
747 void DaemonServer::_generate_command_map(
749 map
<string
,string
> ¶m_str_map
)
751 for (auto p
= cmdmap
.begin();
752 p
!= cmdmap
.end(); ++p
) {
753 if (p
->first
== "prefix")
755 if (p
->first
== "caps") {
757 if (cmd_getval(cmdmap
, "caps", cv
) &&
758 cv
.size() % 2 == 0) {
759 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
760 string k
= string("caps_") + cv
[i
];
761 param_str_map
[k
] = cv
[i
+ 1];
766 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
770 const MonCommand
*DaemonServer::_get_mgrcommand(
771 const string
&cmd_prefix
,
772 const std::vector
<MonCommand
> &cmds
)
774 const MonCommand
*this_cmd
= nullptr;
775 for (const auto &cmd
: cmds
) {
776 if (cmd
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
784 bool DaemonServer::_allowed_command(
786 const string
&service
,
787 const string
&module
,
788 const string
&prefix
,
789 const cmdmap_t
& cmdmap
,
790 const map
<string
,string
>& param_str_map
,
791 const MonCommand
*this_cmd
) {
793 if (s
->entity_name
.is_mon()) {
794 // mon is all-powerful. even when it is forwarding commands on behalf of
795 // old clients; we expect the mon is validating commands before proxying!
799 bool cmd_r
= this_cmd
->requires_perm('r');
800 bool cmd_w
= this_cmd
->requires_perm('w');
801 bool cmd_x
= this_cmd
->requires_perm('x');
803 bool capable
= s
->caps
.is_capable(
806 service
, module
, prefix
, param_str_map
,
810 dout(10) << " " << s
->entity_name
<< " "
811 << (capable
? "" : "not ") << "capable" << dendl
;
816 * The working data for processing an MCommand. This lives in
817 * a class to enable passing it into other threads for processing
818 * outside of the thread/locks that called handle_command.
820 class CommandContext
{
822 ceph::ref_t
<MCommand
> m_tell
;
823 ceph::ref_t
<MMgrCommand
> m_mgr
;
824 const std::vector
<std::string
>& cmd
; ///< ref into m_tell or m_mgr
825 const bufferlist
& data
; ///< ref into m_tell or m_mgr
829 explicit CommandContext(ceph::ref_t
<MCommand
> m
)
830 : m_tell
{std::move(m
)},
832 data(m_tell
->get_data()) {
834 explicit CommandContext(ceph::ref_t
<MMgrCommand
> m
)
835 : m_mgr
{std::move(m
)},
837 data(m_mgr
->get_data()) {
840 void reply(int r
, const std::stringstream
&ss
) {
844 void reply(int r
, const std::string
&rs
) {
845 // Let the connection drop as soon as we've sent our response
846 ConnectionRef con
= m_tell
? m_tell
->get_connection()
847 : m_mgr
->get_connection();
849 con
->mark_disposable();
853 dout(20) << "success" << dendl
;
855 derr
<< __func__
<< " " << cpp_strerror(r
) << " " << rs
<< dendl
;
859 MCommandReply
*reply
= new MCommandReply(r
, rs
);
860 reply
->set_tid(m_tell
->get_tid());
861 reply
->set_data(odata
);
862 con
->send_message(reply
);
864 MMgrCommandReply
*reply
= new MMgrCommandReply(r
, rs
);
865 reply
->set_tid(m_mgr
->get_tid());
866 reply
->set_data(odata
);
867 con
->send_message(reply
);
874 * A context for receiving a bufferlist/error string from a background
875 * function and then calling back to a CommandContext when it's done
877 class ReplyOnFinish
: public Context
{
878 std::shared_ptr
<CommandContext
> cmdctx
;
884 explicit ReplyOnFinish(const std::shared_ptr
<CommandContext
> &cmdctx_
)
887 void finish(int r
) override
{
888 cmdctx
->odata
.claim_append(from_mon
);
889 cmdctx
->reply(r
, outs
);
893 bool DaemonServer::handle_command(const ref_t
<MCommand
>& m
)
895 std::lock_guard
l(lock
);
896 auto cmdctx
= std::make_shared
<CommandContext
>(m
);
898 return _handle_command(cmdctx
);
899 } catch (const bad_cmd_get
& e
) {
900 cmdctx
->reply(-EINVAL
, e
.what());
905 bool DaemonServer::handle_command(const ref_t
<MMgrCommand
>& m
)
907 std::lock_guard
l(lock
);
908 auto cmdctx
= std::make_shared
<CommandContext
>(m
);
910 return _handle_command(cmdctx
);
911 } catch (const bad_cmd_get
& e
) {
912 cmdctx
->reply(-EINVAL
, e
.what());
917 void DaemonServer::log_access_denied(
918 std::shared_ptr
<CommandContext
>& cmdctx
,
919 MgrSession
* session
, std::stringstream
& ss
) {
920 dout(1) << " access denied" << dendl
;
921 audit_clog
->info() << "from='" << session
->inst
<< "' "
922 << "entity='" << session
->entity_name
<< "' "
923 << "cmd=" << cmdctx
->cmd
<< ": access denied";
924 ss
<< "access denied: does your client key have mgr caps? "
925 "See http://docs.ceph.com/en/latest/mgr/administrator/"
926 "#client-authentication";
929 void DaemonServer::_check_offlines_pgs(
930 const set
<int>& osds
,
931 const OSDMap
& osdmap
,
933 offline_pg_report
*report
)
936 *report
= offline_pg_report();
939 for (const auto& q
: pgmap
.pg_stat
) {
940 set
<int32_t> pg_acting
; // net acting sets (with no missing if degraded)
942 if (q
.second
.state
== 0) {
943 report
->unknown
.insert(q
.first
);
946 if (q
.second
.state
& PG_STATE_DEGRADED
) {
947 for (auto& anm
: q
.second
.avail_no_missing
) {
948 if (osds
.count(anm
.osd
)) {
952 if (anm
.osd
!= CRUSH_ITEM_NONE
) {
953 pg_acting
.insert(anm
.osd
);
957 for (auto& a
: q
.second
.acting
) {
962 if (a
!= CRUSH_ITEM_NONE
) {
970 const pg_pool_t
*pi
= osdmap
.get_pg_pool(q
.first
.pool());
971 bool dangerous
= false;
973 report
->bad_no_pool
.insert(q
.first
); // pool is creating or deleting
976 if (!(q
.second
.state
& PG_STATE_ACTIVE
)) {
977 report
->bad_already_inactive
.insert(q
.first
);
980 if (pg_acting
.size() < pi
->min_size
) {
981 report
->bad_become_inactive
.insert(q
.first
);
985 report
->not_ok
.insert(q
.first
);
987 report
->ok
.insert(q
.first
);
988 if (q
.second
.state
& PG_STATE_DEGRADED
) {
989 report
->ok_become_more_degraded
.insert(q
.first
);
991 report
->ok_become_degraded
.insert(q
.first
);
995 dout(20) << osds
<< " -> " << report
->ok
.size() << " ok, "
996 << report
->not_ok
.size() << " not ok, "
997 << report
->unknown
.size() << " unknown"
1001 void DaemonServer::_maximize_ok_to_stop_set(
1002 const set
<int>& orig_osds
,
1004 const OSDMap
& osdmap
,
1006 offline_pg_report
*out_report
)
1008 dout(20) << "orig_osds " << orig_osds
<< " max " << max
<< dendl
;
1009 _check_offlines_pgs(orig_osds
, osdmap
, pgmap
, out_report
);
1010 if (!out_report
->ok_to_stop()) {
1013 if (orig_osds
.size() >= max
) {
1018 // semi-arbitrarily start with the first osd in the set
1019 offline_pg_report report
;
1020 set
<int> osds
= orig_osds
;
1021 int parent
= *osds
.begin();
1025 // identify the next parent
1026 int r
= osdmap
.crush
->get_immediate_parent_id(parent
, &parent
);
1028 return; // just go with what we have so far!
1031 // get candidate additions that are beneath this point in the tree
1033 r
= osdmap
.crush
->get_all_children(parent
, &children
);
1035 return; // just go with what we have so far!
1037 dout(20) << " parent " << parent
<< " children " << children
<< dendl
;
1039 // try adding in more osds
1040 int failed
= 0; // how many children we failed to add to our set
1041 for (auto o
: children
) {
1042 if (o
>= 0 && osdmap
.is_up(o
) && osds
.count(o
) == 0) {
1044 _check_offlines_pgs(osds
, osdmap
, pgmap
, &report
);
1045 if (!report
.ok_to_stop()) {
1050 *out_report
= report
;
1051 if (osds
.size() == max
) {
1052 dout(20) << " hit max" << dendl
;
1053 return; // yay, we hit the max
1059 // we hit some failures; go with what we have
1060 dout(20) << " hit some peer failures" << dendl
;
1066 bool DaemonServer::_handle_command(
1067 std::shared_ptr
<CommandContext
>& cmdctx
)
1070 bool admin_socket_cmd
= false;
1071 if (cmdctx
->m_tell
) {
1073 // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
1075 admin_socket_cmd
= (cmdctx
->m_tell
->fsid
!= uuid_d());
1079 auto priv
= m
->get_connection()->get_priv();
1080 auto session
= static_cast<MgrSession
*>(priv
.get());
1084 if (session
->inst
.name
== entity_name_t()) {
1085 session
->inst
.name
= m
->get_source();
1088 map
<string
,string
> param_str_map
;
1089 std::stringstream ss
;
1092 if (!cmdmap_from_json(cmdctx
->cmd
, &(cmdctx
->cmdmap
), ss
)) {
1093 cmdctx
->reply(-EINVAL
, ss
);
1098 cmd_getval(cmdctx
->cmdmap
, "prefix", prefix
);
1099 dout(10) << "decoded-size=" << cmdctx
->cmdmap
.size() << " prefix=" << prefix
<< dendl
;
1101 boost::scoped_ptr
<Formatter
> f
;
1104 if (boost::algorithm::ends_with(prefix
, "_json")) {
1107 format
= cmd_getval_or
<string
>(cmdctx
->cmdmap
, "format", "plain");
1109 f
.reset(Formatter::create(format
));
1112 // this is just for mgr commands - admin socket commands will fall
1113 // through and use the admin socket version of
1114 // get_command_descriptions
1115 if (prefix
== "get_command_descriptions" && !admin_socket_cmd
) {
1116 dout(10) << "reading commands from python modules" << dendl
;
1117 const auto py_commands
= py_modules
.get_commands();
1121 f
.open_object_section("command_descriptions");
1123 auto dump_cmd
= [&cmdnum
, &f
, m
](const MonCommand
&mc
){
1124 ostringstream secname
;
1125 secname
<< "cmd" << std::setfill('0') << std::setw(3) << cmdnum
;
1126 dump_cmddesc_to_json(&f
, m
->get_connection()->get_features(),
1127 secname
.str(), mc
.cmdstring
, mc
.helpstring
,
1128 mc
.module
, mc
.req_perms
, 0);
1132 for (const auto &pyc
: py_commands
) {
1136 for (const auto &mgr_cmd
: mgr_commands
) {
1140 f
.close_section(); // command_descriptions
1141 f
.flush(cmdctx
->odata
);
1142 cmdctx
->reply(0, ss
);
1147 const MonCommand
*mgr_cmd
= _get_mgrcommand(prefix
, mgr_commands
);
1148 _generate_command_map(cmdctx
->cmdmap
, param_str_map
);
1150 bool is_allowed
= false;
1151 ModuleCommand py_command
;
1152 if (admin_socket_cmd
) {
1153 // admin socket commands require all capabilities
1154 is_allowed
= session
->caps
.is_allow_all();
1155 } else if (!mgr_cmd
) {
1156 // Resolve the command to the name of the module that will
1157 // handle it (if the command exists)
1158 auto py_commands
= py_modules
.get_py_commands();
1159 for (const auto &pyc
: py_commands
) {
1160 auto pyc_prefix
= cmddesc_get_prefix(pyc
.cmdstring
);
1161 if (pyc_prefix
== prefix
) {
1167 MonCommand pyc
= {"", "", "py", py_command
.perm
};
1168 is_allowed
= _allowed_command(session
, "py", py_command
.module_name
,
1169 prefix
, cmdctx
->cmdmap
, param_str_map
,
1172 // validate user's permissions for requested command
1173 is_allowed
= _allowed_command(session
, mgr_cmd
->module
, "",
1174 prefix
, cmdctx
->cmdmap
, param_str_map
, mgr_cmd
);
1178 log_access_denied(cmdctx
, session
, ss
);
1179 cmdctx
->reply(-EACCES
, ss
);
1184 << "from='" << session
->inst
<< "' "
1185 << "entity='" << session
->entity_name
<< "' "
1186 << "cmd=" << cmdctx
->cmd
<< ": dispatch";
1188 if (admin_socket_cmd
) {
1189 cct
->get_admin_socket()->queue_tell_command(cmdctx
->m_tell
);
1194 // service map commands
1195 if (prefix
== "service dump") {
1197 f
.reset(Formatter::create("json-pretty"));
1198 cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
1199 f
->dump_object("service_map", service_map
);
1201 f
->flush(cmdctx
->odata
);
1202 cmdctx
->reply(0, ss
);
1205 if (prefix
== "service status") {
1207 f
.reset(Formatter::create("json-pretty"));
1208 // only include state from services that are in the persisted service map
1209 f
->open_object_section("service_status");
1210 for (auto& [type
, service
] : pending_service_map
.services
) {
1211 if (ServiceMap::is_normal_ceph_entity(type
)) {
1215 f
->open_object_section(type
.c_str());
1216 for (auto& q
: service
.daemons
) {
1217 f
->open_object_section(q
.first
.c_str());
1218 DaemonKey key
{type
, q
.first
};
1219 ceph_assert(daemon_state
.exists(key
));
1220 auto daemon
= daemon_state
.get(key
);
1221 std::lock_guard
l(daemon
->lock
);
1222 f
->dump_stream("status_stamp") << daemon
->service_status_stamp
;
1223 f
->dump_stream("last_beacon") << daemon
->last_service_beacon
;
1224 f
->open_object_section("status");
1225 for (auto& r
: daemon
->service_status
) {
1226 f
->dump_string(r
.first
.c_str(), r
.second
);
1234 f
->flush(cmdctx
->odata
);
1235 cmdctx
->reply(0, ss
);
1239 if (prefix
== "config set") {
1242 cmd_getval(cmdctx
->cmdmap
, "key", key
);
1243 cmd_getval(cmdctx
->cmdmap
, "value", val
);
1244 r
= cct
->_conf
.set_val(key
, val
, &ss
);
1246 cct
->_conf
.apply_changes(nullptr);
1248 cmdctx
->reply(0, ss
);
1255 if (prefix
== "pg scrub" ||
1256 prefix
== "pg repair" ||
1257 prefix
== "pg deep-scrub") {
1258 string scrubop
= prefix
.substr(3, string::npos
);
1262 cmd_getval(cmdctx
->cmdmap
, "pgid", pgidstr
);
1263 if (!pgid
.parse(pgidstr
.c_str())) {
1264 ss
<< "invalid pgid '" << pgidstr
<< "'";
1265 cmdctx
->reply(-EINVAL
, ss
);
1268 bool pg_exists
= false;
1269 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1270 pg_exists
= osdmap
.pg_exists(pgid
);
1273 ss
<< "pg " << pgid
<< " does not exist";
1274 cmdctx
->reply(-ENOENT
, ss
);
1277 int acting_primary
= -1;
1279 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1280 epoch
= osdmap
.get_epoch();
1281 osdmap
.get_primary_shard(pgid
, &acting_primary
, &spgid
);
1283 if (acting_primary
== -1) {
1284 ss
<< "pg " << pgid
<< " has no primary osd";
1285 cmdctx
->reply(-EAGAIN
, ss
);
1288 auto p
= osd_cons
.find(acting_primary
);
1289 if (p
== osd_cons
.end()) {
1290 ss
<< "pg " << pgid
<< " primary osd." << acting_primary
1291 << " is not currently connected";
1292 cmdctx
->reply(-EAGAIN
, ss
);
1295 for (auto& con
: p
->second
) {
1296 assert(HAVE_FEATURE(con
->get_features(), SERVER_OCTOPUS
));
1297 vector
<spg_t
> pgs
= { spgid
};
1298 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1301 scrubop
== "repair",
1302 scrubop
== "deep-scrub"));
1304 ss
<< "instructing pg " << spgid
<< " on osd." << acting_primary
1305 << " to " << scrubop
;
1306 cmdctx
->reply(0, ss
);
1308 } else if (prefix
== "osd scrub" ||
1309 prefix
== "osd deep-scrub" ||
1310 prefix
== "osd repair") {
1312 cmd_getval(cmdctx
->cmdmap
, "who", whostr
);
1313 vector
<string
> pvec
;
1314 get_str_vec(prefix
, pvec
);
1317 if (whostr
== "*" || whostr
== "all" || whostr
== "any") {
1318 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1319 for (int i
= 0; i
< osdmap
.get_max_osd(); i
++)
1320 if (osdmap
.is_up(i
)) {
1325 long osd
= parse_osd_id(whostr
.c_str(), &ss
);
1327 ss
<< "invalid osd '" << whostr
<< "'";
1328 cmdctx
->reply(-EINVAL
, ss
);
1331 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1332 if (osdmap
.is_up(osd
)) {
1337 ss
<< "osd." << osd
<< " is not up";
1338 cmdctx
->reply(-EAGAIN
, ss
);
1342 set
<int> sent_osds
, failed_osds
;
1343 for (auto osd
: osds
) {
1346 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1347 epoch
= osdmap
.get_epoch();
1348 auto p
= pgmap
.pg_by_osd
.find(osd
);
1349 if (p
!= pgmap
.pg_by_osd
.end()) {
1350 for (auto pgid
: p
->second
) {
1353 osdmap
.get_primary_shard(pgid
, &primary
, &spg
);
1354 if (primary
== osd
) {
1355 spgs
.push_back(spg
);
1360 auto p
= osd_cons
.find(osd
);
1361 if (p
== osd_cons
.end()) {
1362 failed_osds
.insert(osd
);
1364 sent_osds
.insert(osd
);
1365 for (auto& con
: p
->second
) {
1366 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1369 pvec
.back() == "repair",
1370 pvec
.back() == "deep-scrub"));
1374 if (failed_osds
.size() == osds
.size()) {
1375 ss
<< "failed to instruct osd(s) " << osds
<< " to " << pvec
.back()
1376 << " (not connected)";
1379 ss
<< "instructed osd(s) " << sent_osds
<< " to " << pvec
.back();
1380 if (!failed_osds
.empty()) {
1381 ss
<< "; osd(s) " << failed_osds
<< " were not connected";
1385 cmdctx
->reply(0, ss
);
1387 } else if (prefix
== "osd pool scrub" ||
1388 prefix
== "osd pool deep-scrub" ||
1389 prefix
== "osd pool repair") {
1390 vector
<string
> pool_names
;
1391 cmd_getval(cmdctx
->cmdmap
, "who", pool_names
);
1392 if (pool_names
.empty()) {
1393 ss
<< "must specify one or more pool names";
1394 cmdctx
->reply(-EINVAL
, ss
);
1398 map
<int32_t, vector
<pg_t
>> pgs_by_primary
; // legacy
1399 map
<int32_t, vector
<spg_t
>> spgs_by_primary
;
1400 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1401 epoch
= osdmap
.get_epoch();
1402 for (auto& pool_name
: pool_names
) {
1403 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1405 ss
<< "unrecognized pool '" << pool_name
<< "'";
1409 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1410 for (int i
= 0; i
< pool_pg_num
; i
++) {
1411 pg_t
pg(i
, pool_id
);
1414 auto got
= osdmap
.get_primary_shard(pg
, &primary
, &spg
);
1417 pgs_by_primary
[primary
].push_back(pg
);
1418 spgs_by_primary
[primary
].push_back(spg
);
1423 cmdctx
->reply(r
, ss
);
1426 for (auto& it
: spgs_by_primary
) {
1427 auto primary
= it
.first
;
1428 auto p
= osd_cons
.find(primary
);
1429 if (p
== osd_cons
.end()) {
1430 ss
<< "osd." << primary
<< " is not currently connected";
1431 cmdctx
->reply(-EAGAIN
, ss
);
1434 for (auto& con
: p
->second
) {
1435 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1438 prefix
== "osd pool repair",
1439 prefix
== "osd pool deep-scrub"));
1442 cmdctx
->reply(0, "");
1444 } else if (prefix
== "osd reweight-by-pg" ||
1445 prefix
== "osd reweight-by-utilization" ||
1446 prefix
== "osd test-reweight-by-pg" ||
1447 prefix
== "osd test-reweight-by-utilization") {
1449 prefix
== "osd reweight-by-pg" || prefix
== "osd test-reweight-by-pg";
1451 prefix
== "osd test-reweight-by-pg" ||
1452 prefix
== "osd test-reweight-by-utilization";
1453 int64_t oload
= cmd_getval_or
<int64_t>(cmdctx
->cmdmap
, "oload", 120);
1455 vector
<string
> poolnames
;
1456 cmd_getval(cmdctx
->cmdmap
, "pools", poolnames
);
1457 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1458 for (const auto& poolname
: poolnames
) {
1459 int64_t pool
= osdmap
.lookup_pg_pool_name(poolname
);
1461 ss
<< "pool '" << poolname
<< "' does not exist";
1468 cmdctx
->reply(r
, ss
);
1472 double max_change
= g_conf().get_val
<double>("mon_reweight_max_change");
1473 cmd_getval(cmdctx
->cmdmap
, "max_change", max_change
);
1474 if (max_change
<= 0.0) {
1475 ss
<< "max_change " << max_change
<< " must be positive";
1476 cmdctx
->reply(-EINVAL
, ss
);
1479 int64_t max_osds
= g_conf().get_val
<int64_t>("mon_reweight_max_osds");
1480 cmd_getval(cmdctx
->cmdmap
, "max_osds", max_osds
);
1481 if (max_osds
<= 0) {
1482 ss
<< "max_osds " << max_osds
<< " must be positive";
1483 cmdctx
->reply(-EINVAL
, ss
);
1486 bool no_increasing
= false;
1487 cmd_getval_compat_cephbool(cmdctx
->cmdmap
, "no_increasing", no_increasing
);
1489 mempool::osdmap::map
<int32_t, uint32_t> new_weights
;
1490 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
&osdmap
, const PGMap
& pgmap
) {
1491 return reweight::by_utilization(osdmap
, pgmap
,
1496 pools
.empty() ? NULL
: &pools
,
1499 &ss
, &out_str
, f
.get());
1502 dout(10) << "reweight::by_utilization: finished with " << out_str
<< dendl
;
1505 f
->flush(cmdctx
->odata
);
1507 cmdctx
->odata
.append(out_str
);
1510 ss
<< "FAILED reweight-by-pg";
1511 cmdctx
->reply(r
, ss
);
1513 } else if (r
== 0 || dry_run
) {
1515 cmdctx
->reply(r
, ss
);
1518 json_spirit::Object json_object
;
1519 for (const auto& osd_weight
: new_weights
) {
1520 json_spirit::Config::add(json_object
,
1521 std::to_string(osd_weight
.first
),
1522 std::to_string(osd_weight
.second
));
1524 string s
= json_spirit::write(json_object
);
1525 std::replace(begin(s
), end(s
), '\"', '\'');
1528 "\"prefix\": \"osd reweightn\", "
1529 "\"weights\": \"" + s
+ "\""
1531 auto on_finish
= new ReplyOnFinish(cmdctx
);
1532 monc
->start_mon_command({cmd
}, {},
1533 &on_finish
->from_mon
, &on_finish
->outs
, on_finish
);
1536 } else if (prefix
== "osd df") {
1537 string method
, filter
;
1538 cmd_getval(cmdctx
->cmdmap
, "output_method", method
);
1539 cmd_getval(cmdctx
->cmdmap
, "filter", filter
);
1541 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1542 // sanity check filter(s)
1543 if (!filter
.empty() &&
1544 osdmap
.lookup_pg_pool_name(filter
) < 0 &&
1545 !osdmap
.crush
->class_exists(filter
) &&
1546 !osdmap
.crush
->name_exists(filter
)) {
1547 rs
<< "'" << filter
<< "' not a pool, crush node or device class name";
1550 print_osd_utilization(osdmap
, pgmap
, ss
,
1551 f
.get(), method
== "tree", filter
);
1552 cmdctx
->odata
.append(ss
);
1555 cmdctx
->reply(r
, rs
);
1557 } else if (prefix
== "osd pool stats") {
1559 cmd_getval(cmdctx
->cmdmap
, "pool_name", pool_name
);
1560 int64_t poolid
= -ENOENT
;
1561 bool one_pool
= false;
1562 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1563 if (!pool_name
.empty()) {
1564 poolid
= osdmap
.lookup_pg_pool_name(pool_name
);
1566 ceph_assert(poolid
== -ENOENT
);
1567 ss
<< "unrecognized pool '" << pool_name
<< "'";
1574 f
->open_array_section("pool_stats");
1576 if (osdmap
.get_pools().empty()) {
1577 ss
<< "there are no pools!";
1581 for (auto &p
: osdmap
.get_pools()) {
1585 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, f
.get(), &rs
);
1593 f
->flush(cmdctx
->odata
);
1595 cmdctx
->odata
.append(rs
.str());
1599 if (r
!= -EOPNOTSUPP
) {
1600 cmdctx
->reply(r
, ss
);
1603 } else if (prefix
== "osd safe-to-destroy" ||
1604 prefix
== "osd destroy" ||
1605 prefix
== "osd purge") {
1608 if (prefix
== "osd safe-to-destroy") {
1610 cmd_getval(cmdctx
->cmdmap
, "ids", ids
);
1611 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1612 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1614 if (!r
&& osds
.empty()) {
1615 ss
<< "must specify one or more OSDs";
1620 if (!cmd_getval(cmdctx
->cmdmap
, "id", id
)) {
1622 ss
<< "must specify OSD id";
1628 cmdctx
->reply(r
, ss
);
1631 set
<int> active_osds
, missing_stats
, stored_pgs
, safe_to_destroy
;
1632 int affected_pgs
= 0;
1633 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1634 if (pg_map
.num_pg_unknown
> 0) {
1635 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; cannot draw"
1636 << " any conclusions";
1640 int num_active_clean
= 0;
1641 for (auto& p
: pg_map
.num_pg_by_state
) {
1642 unsigned want
= PG_STATE_ACTIVE
|PG_STATE_CLEAN
;
1643 if ((p
.first
& want
) == want
) {
1644 num_active_clean
+= p
.second
;
1647 for (auto osd
: osds
) {
1648 if (!osdmap
.exists(osd
)) {
1649 safe_to_destroy
.insert(osd
);
1650 continue; // clearly safe to destroy
1652 auto q
= pg_map
.num_pg_by_osd
.find(osd
);
1653 if (q
!= pg_map
.num_pg_by_osd
.end()) {
1654 if (q
->second
.acting
> 0 || q
->second
.up_not_acting
> 0) {
1655 active_osds
.insert(osd
);
1656 // XXX: For overlapping PGs, this counts them again
1657 affected_pgs
+= q
->second
.acting
+ q
->second
.up_not_acting
;
1661 if (num_active_clean
< pg_map
.num_pg
) {
1662 // all pgs aren't active+clean; we need to be careful.
1663 auto p
= pg_map
.osd_stat
.find(osd
);
1664 if (p
== pg_map
.osd_stat
.end() || !osdmap
.is_up(osd
)) {
1665 missing_stats
.insert(osd
);
1667 } else if (p
->second
.num_pgs
> 0) {
1668 stored_pgs
.insert(osd
);
1672 safe_to_destroy
.insert(osd
);
1675 if (r
&& prefix
== "osd safe-to-destroy") {
1676 cmdctx
->reply(r
, ss
); // regardless of formatter
1679 if (!r
&& (!active_osds
.empty() ||
1680 !missing_stats
.empty() || !stored_pgs
.empty())) {
1681 if (!safe_to_destroy
.empty()) {
1682 ss
<< "OSD(s) " << safe_to_destroy
1683 << " are safe to destroy without reducing data durability. ";
1685 if (!active_osds
.empty()) {
1686 ss
<< "OSD(s) " << active_osds
<< " have " << affected_pgs
1687 << " pgs currently mapped to them. ";
1689 if (!missing_stats
.empty()) {
1690 ss
<< "OSD(s) " << missing_stats
<< " have no reported stats, and not all"
1691 << " PGs are active+clean; we cannot draw any conclusions. ";
1693 if (!stored_pgs
.empty()) {
1694 ss
<< "OSD(s) " << stored_pgs
<< " last reported they still store some PG"
1695 << " data, and not all PGs are active+clean; we cannot be sure they"
1696 << " aren't still needed.";
1698 if (!active_osds
.empty() || !stored_pgs
.empty()) {
1705 if (prefix
== "osd safe-to-destroy") {
1707 ss
<< "OSD(s) " << osds
<< " are safe to destroy without reducing data"
1711 f
->open_object_section("osd_status");
1712 f
->open_array_section("safe_to_destroy");
1713 for (auto i
: safe_to_destroy
)
1714 f
->dump_int("osd", i
);
1716 f
->open_array_section("active");
1717 for (auto i
: active_osds
)
1718 f
->dump_int("osd", i
);
1720 f
->open_array_section("missing_stats");
1721 for (auto i
: missing_stats
)
1722 f
->dump_int("osd", i
);
1724 f
->open_array_section("stored_pgs");
1725 for (auto i
: stored_pgs
)
1726 f
->dump_int("osd", i
);
1728 f
->close_section(); // osd_status
1729 f
->flush(cmdctx
->odata
);
1731 std::stringstream().swap(ss
);
1733 cmdctx
->reply(r
, ss
);
1739 cmd_getval(cmdctx
->cmdmap
, "force", force
);
1742 cmd_getval(cmdctx
->cmdmap
, "yes_i_really_mean_it", force
);
1745 ss
<< "\nYou can proceed by passing --force, but be warned that"
1746 " this will likely mean real, permanent data loss.";
1752 cmdctx
->reply(r
, ss
);
1757 "\"prefix\": \"" + prefix
+ "-actual\", "
1758 "\"id\": " + stringify(osds
) + ", "
1759 "\"yes_i_really_mean_it\": true"
1761 auto on_finish
= new ReplyOnFinish(cmdctx
);
1762 monc
->start_mon_command({cmd
}, {}, nullptr, &on_finish
->outs
, on_finish
);
1764 } else if (prefix
== "osd ok-to-stop") {
1766 cmd_getval(cmdctx
->cmdmap
, "ids", ids
);
1769 cmd_getval(cmdctx
->cmdmap
, "max", max
);
1771 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1772 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1774 if (!r
&& osds
.empty()) {
1775 ss
<< "must specify one or more OSDs";
1778 if (max
< (int)osds
.size()) {
1782 cmdctx
->reply(r
, ss
);
1785 offline_pg_report out_report
;
1786 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1787 _maximize_ok_to_stop_set(
1788 osds
, max
, osdmap
, pg_map
,
1792 f
.reset(Formatter::create("json"));
1794 f
->dump_object("ok_to_stop", out_report
);
1795 f
->flush(cmdctx
->odata
);
1796 cmdctx
->odata
.append("\n");
1797 if (!out_report
.unknown
.empty()) {
1798 ss
<< out_report
.unknown
.size() << " pgs have unknown state; "
1799 << "cannot draw any conclusions";
1800 cmdctx
->reply(-EAGAIN
, ss
);
1802 if (!out_report
.ok_to_stop()) {
1803 ss
<< "unsafe to stop osd(s) at this time (" << out_report
.not_ok
.size() << " PGs are or would become offline)";
1804 cmdctx
->reply(-EBUSY
, ss
);
1806 cmdctx
->reply(0, ss
);
1809 } else if (prefix
== "pg force-recovery" ||
1810 prefix
== "pg force-backfill" ||
1811 prefix
== "pg cancel-force-recovery" ||
1812 prefix
== "pg cancel-force-backfill" ||
1813 prefix
== "osd pool force-recovery" ||
1814 prefix
== "osd pool force-backfill" ||
1815 prefix
== "osd pool cancel-force-recovery" ||
1816 prefix
== "osd pool cancel-force-backfill") {
1818 get_str_vec(prefix
, vs
);
1819 auto& granularity
= vs
.front();
1820 auto& forceop
= vs
.back();
1823 // figure out actual op just once
1825 if (forceop
== "force-recovery") {
1826 actual_op
= OFR_RECOVERY
;
1827 } else if (forceop
== "force-backfill") {
1828 actual_op
= OFR_BACKFILL
;
1829 } else if (forceop
== "cancel-force-backfill") {
1830 actual_op
= OFR_BACKFILL
| OFR_CANCEL
;
1831 } else if (forceop
== "cancel-force-recovery") {
1832 actual_op
= OFR_RECOVERY
| OFR_CANCEL
;
1835 set
<pg_t
> candidates
; // deduped
1836 if (granularity
== "pg") {
1837 // convert pg names to pgs, discard any invalid ones while at it
1838 vector
<string
> pgids
;
1839 cmd_getval(cmdctx
->cmdmap
, "pgid", pgids
);
1840 for (auto& i
: pgids
) {
1842 if (!pgid
.parse(i
.c_str())) {
1843 ss
<< "invlaid pgid '" << i
<< "'; ";
1847 candidates
.insert(pgid
);
1851 vector
<string
> pool_names
;
1852 cmd_getval(cmdctx
->cmdmap
, "who", pool_names
);
1853 if (pool_names
.empty()) {
1854 ss
<< "must specify one or more pool names";
1855 cmdctx
->reply(-EINVAL
, ss
);
1858 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1859 for (auto& pool_name
: pool_names
) {
1860 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1862 ss
<< "unrecognized pool '" << pool_name
<< "'";
1866 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1867 for (int i
= 0; i
< pool_pg_num
; i
++)
1868 candidates
.insert({(unsigned int)i
, (uint64_t)pool_id
});
1872 cmdctx
->reply(r
, ss
);
1877 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1878 for (auto& i
: candidates
) {
1879 auto it
= pg_map
.pg_stat
.find(i
);
1880 if (it
== pg_map
.pg_stat
.end()) {
1881 ss
<< "pg " << i
<< " does not exist; ";
1885 auto state
= it
->second
.state
;
1886 // discard pgs for which user requests are pointless
1887 switch (actual_op
) {
1889 if ((state
& (PG_STATE_DEGRADED
|
1890 PG_STATE_RECOVERY_WAIT
|
1891 PG_STATE_RECOVERING
)) == 0) {
1892 // don't return error, user script may be racing with cluster.
1894 ss
<< "pg " << i
<< " doesn't require recovery; ";
1896 } else if (state
& PG_STATE_FORCED_RECOVERY
) {
1897 ss
<< "pg " << i
<< " recovery already forced; ";
1898 // return error, as it may be a bug in user script
1904 if ((state
& (PG_STATE_DEGRADED
|
1905 PG_STATE_BACKFILL_WAIT
|
1906 PG_STATE_BACKFILLING
)) == 0) {
1907 ss
<< "pg " << i
<< " doesn't require backfilling; ";
1909 } else if (state
& PG_STATE_FORCED_BACKFILL
) {
1910 ss
<< "pg " << i
<< " backfill already forced; ";
1915 case OFR_BACKFILL
| OFR_CANCEL
:
1916 if ((state
& PG_STATE_FORCED_BACKFILL
) == 0) {
1917 ss
<< "pg " << i
<< " backfill not forced; ";
1921 case OFR_RECOVERY
| OFR_CANCEL
:
1922 if ((state
& PG_STATE_FORCED_RECOVERY
) == 0) {
1923 ss
<< "pg " << i
<< " recovery not forced; ";
1928 ceph_abort_msg("actual_op value is not supported");
1934 // respond with error only when no pgs are correct
1935 // yes, in case of mixed errors, only the last one will be emitted,
1936 // but the message presented will be fine
1937 if (pgs
.size() != 0) {
1938 // clear error to not confuse users/scripts
1942 // optimize the command -> messages conversion, use only one
1943 // message per distinct OSD
1944 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1945 // group pgs to process by osd
1946 map
<int, vector
<spg_t
>> osdpgs
;
1947 for (auto& pgid
: pgs
) {
1950 if (osdmap
.get_primary_shard(pgid
, &primary
, &spg
)) {
1951 osdpgs
[primary
].push_back(spg
);
1954 for (auto& i
: osdpgs
) {
1955 if (osdmap
.is_up(i
.first
)) {
1956 auto p
= osd_cons
.find(i
.first
);
1957 if (p
== osd_cons
.end()) {
1958 ss
<< "osd." << i
.first
<< " is not currently connected";
1962 for (auto& con
: p
->second
) {
1964 new MOSDForceRecovery(monc
->get_fsid(), i
.second
, actual_op
));
1966 ss
<< "instructing pg(s) " << i
.second
<< " on osd." << i
.first
1967 << " to " << forceop
<< "; ";
1972 cmdctx
->reply(r
, ss
);
1974 } else if (prefix
== "config show" ||
1975 prefix
== "config show-with-defaults") {
1977 cmd_getval(cmdctx
->cmdmap
, "who", who
);
1978 auto [key
, valid
] = DaemonKey::parse(who
);
1980 ss
<< "invalid daemon name: use <type>.<id>";
1981 cmdctx
->reply(-EINVAL
, ss
);
1984 DaemonStatePtr daemon
= daemon_state
.get(key
);
1986 ss
<< "no config state for daemon " << who
;
1987 cmdctx
->reply(-ENOENT
, ss
);
1991 std::lock_guard
l(daemon
->lock
);
1995 if (cmd_getval(cmdctx
->cmdmap
, "key", name
)) {
1996 // handle special options
1997 if (name
== "fsid") {
1998 cmdctx
->odata
.append(stringify(monc
->get_fsid()) + "\n");
1999 cmdctx
->reply(r
, ss
);
2002 auto p
= daemon
->config
.find(name
);
2003 if (p
!= daemon
->config
.end() &&
2004 !p
->second
.empty()) {
2005 cmdctx
->odata
.append(p
->second
.rbegin()->second
+ "\n");
2007 auto& defaults
= daemon
->_get_config_defaults();
2008 auto q
= defaults
.find(name
);
2009 if (q
!= defaults
.end()) {
2010 cmdctx
->odata
.append(q
->second
+ "\n");
2015 } else if (daemon
->config_defaults_bl
.length() > 0) {
2018 f
->open_array_section("config");
2020 tbl
.define_column("NAME", TextTable::LEFT
, TextTable::LEFT
);
2021 tbl
.define_column("VALUE", TextTable::LEFT
, TextTable::LEFT
);
2022 tbl
.define_column("SOURCE", TextTable::LEFT
, TextTable::LEFT
);
2023 tbl
.define_column("OVERRIDES", TextTable::LEFT
, TextTable::LEFT
);
2024 tbl
.define_column("IGNORES", TextTable::LEFT
, TextTable::LEFT
);
2026 if (prefix
== "config show") {
2028 for (auto& i
: daemon
->config
) {
2029 dout(20) << " " << i
.first
<< " -> " << i
.second
<< dendl
;
2030 if (i
.second
.empty()) {
2034 f
->open_object_section("value");
2035 f
->dump_string("name", i
.first
);
2036 f
->dump_string("value", i
.second
.rbegin()->second
);
2037 f
->dump_string("source", ceph_conf_level_name(
2038 i
.second
.rbegin()->first
));
2039 if (i
.second
.size() > 1) {
2040 f
->open_array_section("overrides");
2041 auto j
= i
.second
.rend();
2042 for (--j
; j
!= i
.second
.rbegin(); --j
) {
2043 f
->open_object_section("value");
2044 f
->dump_string("source", ceph_conf_level_name(j
->first
));
2045 f
->dump_string("value", j
->second
);
2050 if (daemon
->ignored_mon_config
.count(i
.first
)) {
2051 f
->dump_string("ignores", "mon");
2056 tbl
<< i
.second
.rbegin()->second
;
2057 tbl
<< ceph_conf_level_name(i
.second
.rbegin()->first
);
2058 if (i
.second
.size() > 1) {
2060 auto j
= i
.second
.rend();
2061 for (--j
; j
!= i
.second
.rbegin(); --j
) {
2062 if (j
->second
== i
.second
.rbegin()->second
) {
2063 ov
.push_front(string("(") + ceph_conf_level_name(j
->first
) +
2064 string("[") + j
->second
+ string("]") +
2067 ov
.push_front(ceph_conf_level_name(j
->first
) +
2068 string("[") + j
->second
+ string("]"));
2076 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
2077 tbl
<< TextTable::endrow
;
2081 // show-with-defaults
2082 auto& defaults
= daemon
->_get_config_defaults();
2083 for (auto& i
: defaults
) {
2085 f
->open_object_section("value");
2086 f
->dump_string("name", i
.first
);
2090 auto j
= daemon
->config
.find(i
.first
);
2091 if (j
!= daemon
->config
.end() && !j
->second
.empty()) {
2094 f
->dump_string("value", j
->second
.rbegin()->second
);
2095 f
->dump_string("source", ceph_conf_level_name(
2096 j
->second
.rbegin()->first
));
2097 if (j
->second
.size() > 1) {
2098 f
->open_array_section("overrides");
2099 auto k
= j
->second
.rend();
2100 for (--k
; k
!= j
->second
.rbegin(); --k
) {
2101 f
->open_object_section("value");
2102 f
->dump_string("source", ceph_conf_level_name(k
->first
));
2103 f
->dump_string("value", k
->second
);
2108 if (daemon
->ignored_mon_config
.count(i
.first
)) {
2109 f
->dump_string("ignores", "mon");
2113 tbl
<< j
->second
.rbegin()->second
;
2114 tbl
<< ceph_conf_level_name(j
->second
.rbegin()->first
);
2115 if (j
->second
.size() > 1) {
2117 auto k
= j
->second
.rend();
2118 for (--k
; k
!= j
->second
.rbegin(); --k
) {
2119 if (k
->second
== j
->second
.rbegin()->second
) {
2120 ov
.push_front(string("(") + ceph_conf_level_name(k
->first
) +
2121 string("[") + k
->second
+ string("]") +
2124 ov
.push_front(ceph_conf_level_name(k
->first
) +
2125 string("[") + k
->second
+ string("]"));
2132 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
2133 tbl
<< TextTable::endrow
;
2136 // only have default
2138 f
->dump_string("value", i
.second
);
2139 f
->dump_string("source", ceph_conf_level_name(CONF_DEFAULT
));
2143 tbl
<< ceph_conf_level_name(CONF_DEFAULT
);
2146 tbl
<< TextTable::endrow
;
2153 f
->flush(cmdctx
->odata
);
2155 cmdctx
->odata
.append(stringify(tbl
));
2158 cmdctx
->reply(r
, ss
);
2160 } else if (prefix
== "device ls") {
2164 f
->open_array_section("devices");
2165 daemon_state
.with_devices([&f
](const DeviceState
& dev
) {
2166 f
->dump_object("device", dev
);
2169 f
->flush(cmdctx
->odata
);
2171 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2172 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
2173 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
2174 tbl
.define_column("WEAR", TextTable::RIGHT
, TextTable::RIGHT
);
2175 tbl
.define_column("LIFE EXPECTANCY", TextTable::LEFT
, TextTable::LEFT
);
2176 auto now
= ceph_clock_now();
2177 daemon_state
.with_devices([&tbl
, now
](const DeviceState
& dev
) {
2179 for (auto& i
: dev
.attachments
) {
2183 h
+= std::get
<0>(i
) + ":" + std::get
<1>(i
);
2186 for (auto& i
: dev
.daemons
) {
2192 char wear_level_str
[16] = {0};
2193 if (dev
.wear_level
>= 0) {
2194 snprintf(wear_level_str
, sizeof(wear_level_str
)-1, "%d%%",
2195 (int)(100.1 * dev
.wear_level
));
2201 << dev
.get_life_expectancy_str(now
)
2202 << TextTable::endrow
;
2204 cmdctx
->odata
.append(stringify(tbl
));
2206 cmdctx
->reply(0, ss
);
2208 } else if (prefix
== "device ls-by-daemon") {
2210 cmd_getval(cmdctx
->cmdmap
, "who", who
);
2211 if (auto [k
, valid
] = DaemonKey::parse(who
); !valid
) {
2212 ss
<< who
<< " is not a valid daemon name";
2215 auto dm
= daemon_state
.get(k
);
2218 f
->open_array_section("devices");
2219 for (auto& i
: dm
->devices
) {
2220 daemon_state
.with_device(i
.first
, [&f
] (const DeviceState
& dev
) {
2221 f
->dump_object("device", dev
);
2225 f
->flush(cmdctx
->odata
);
2228 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2229 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
2230 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
,
2232 auto now
= ceph_clock_now();
2233 for (auto& i
: dm
->devices
) {
2234 daemon_state
.with_device(
2235 i
.first
, [&tbl
, now
] (const DeviceState
& dev
) {
2237 for (auto& i
: dev
.attachments
) {
2241 h
+= std::get
<0>(i
) + ":" + std::get
<1>(i
);
2245 << dev
.get_life_expectancy_str(now
)
2246 << TextTable::endrow
;
2249 cmdctx
->odata
.append(stringify(tbl
));
2253 ss
<< "daemon " << who
<< " not found";
2255 cmdctx
->reply(r
, ss
);
2257 } else if (prefix
== "device ls-by-host") {
2259 cmd_getval(cmdctx
->cmdmap
, "host", host
);
2261 daemon_state
.list_devids_by_server(host
, &devids
);
2263 f
->open_array_section("devices");
2264 for (auto& devid
: devids
) {
2265 daemon_state
.with_device(
2266 devid
, [&f
] (const DeviceState
& dev
) {
2267 f
->dump_object("device", dev
);
2271 f
->flush(cmdctx
->odata
);
2274 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2275 tbl
.define_column("DEV", TextTable::LEFT
, TextTable::LEFT
);
2276 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
2277 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
, TextTable::LEFT
);
2278 auto now
= ceph_clock_now();
2279 for (auto& devid
: devids
) {
2280 daemon_state
.with_device(
2281 devid
, [&tbl
, &host
, now
] (const DeviceState
& dev
) {
2283 for (auto& j
: dev
.attachments
) {
2284 if (std::get
<0>(j
) == host
) {
2288 n
+= std::get
<1>(j
);
2292 for (auto& i
: dev
.daemons
) {
2301 << dev
.get_life_expectancy_str(now
)
2302 << TextTable::endrow
;
2305 cmdctx
->odata
.append(stringify(tbl
));
2307 cmdctx
->reply(0, ss
);
2309 } else if (prefix
== "device info") {
2311 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2314 if (!daemon_state
.with_device(devid
,
2315 [&f
, &rs
] (const DeviceState
& dev
) {
2317 f
->dump_object("device", dev
);
2322 ss
<< "device " << devid
<< " not found";
2326 f
->flush(cmdctx
->odata
);
2328 cmdctx
->odata
.append(rs
.str());
2331 cmdctx
->reply(r
, ss
);
2333 } else if (prefix
== "device set-life-expectancy") {
2335 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2336 string from_str
, to_str
;
2337 cmd_getval(cmdctx
->cmdmap
, "from", from_str
);
2338 cmd_getval(cmdctx
->cmdmap
, "to", to_str
);
2340 if (!from
.parse(from_str
)) {
2341 ss
<< "unable to parse datetime '" << from_str
<< "'";
2343 cmdctx
->reply(r
, ss
);
2344 } else if (to_str
.size() && !to
.parse(to_str
)) {
2345 ss
<< "unable to parse datetime '" << to_str
<< "'";
2347 cmdctx
->reply(r
, ss
);
2349 map
<string
,string
> meta
;
2350 daemon_state
.with_device_create(
2352 [from
, to
, &meta
] (DeviceState
& dev
) {
2353 dev
.set_life_expectancy(from
, to
, ceph_clock_now());
2354 meta
= dev
.metadata
;
2356 json_spirit::Object json_object
;
2357 for (auto& i
: meta
) {
2358 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2361 json
.append(json_spirit::write(json_object
));
2364 "\"prefix\": \"config-key set\", "
2365 "\"key\": \"device/" + devid
+ "\""
2367 auto on_finish
= new ReplyOnFinish(cmdctx
);
2368 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2371 } else if (prefix
== "device rm-life-expectancy") {
2373 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2374 map
<string
,string
> meta
;
2375 if (daemon_state
.with_device_write(devid
, [&meta
] (DeviceState
& dev
) {
2376 dev
.rm_life_expectancy();
2377 meta
= dev
.metadata
;
2384 "\"prefix\": \"config-key rm\", "
2385 "\"key\": \"device/" + devid
+ "\""
2388 json_spirit::Object json_object
;
2389 for (auto& i
: meta
) {
2390 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2392 json
.append(json_spirit::write(json_object
));
2395 "\"prefix\": \"config-key set\", "
2396 "\"key\": \"device/" + devid
+ "\""
2399 auto on_finish
= new ReplyOnFinish(cmdctx
);
2400 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2402 cmdctx
->reply(0, ss
);
2407 ss
<< "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2410 f
->open_object_section("pg_info");
2411 f
->dump_bool("pg_ready", pgmap_ready
);
2414 // fall back to feeding command to PGMap
2415 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
2416 return process_pg_map_command(prefix
, cmdctx
->cmdmap
, pg_map
, osdmap
,
2417 f
.get(), &ss
, &cmdctx
->odata
);
2423 if (r
!= -EOPNOTSUPP
) {
2425 f
->flush(cmdctx
->odata
);
2427 cmdctx
->reply(r
, ss
);
2432 // Was the command unfound?
2433 if (py_command
.cmdstring
.empty()) {
2434 ss
<< "No handler found for '" << prefix
<< "'";
2435 dout(4) << "No handler found for '" << prefix
<< "'" << dendl
;
2436 cmdctx
->reply(-EINVAL
, ss
);
2440 // Validate that the module is active
2441 auto& mod_name
= py_command
.module_name
;
2442 if (!py_modules
.is_module_active(mod_name
)) {
2443 ss
<< "Module '" << mod_name
<< "' is not enabled/loaded (required by "
2444 "command '" << prefix
<< "'): use `ceph mgr module enable "
2445 << mod_name
<< "` to enable it";
2446 dout(4) << ss
.str() << dendl
;
2447 cmdctx
->reply(-EOPNOTSUPP
, ss
);
2451 dout(10) << "passing through command '" << prefix
<< "' size " << cmdctx
->cmdmap
.size() << dendl
;
2452 Finisher
& mod_finisher
= py_modules
.get_active_module_finisher(mod_name
);
2453 mod_finisher
.queue(new LambdaContext([this, cmdctx
, session
, py_command
, prefix
]
2455 std::stringstream ss
;
2457 dout(10) << "dispatching command '" << prefix
<< "' size " << cmdctx
->cmdmap
.size() << dendl
;
2459 // Validate that the module is enabled
2460 auto& py_handler_name
= py_command
.module_name
;
2461 PyModuleRef module
= py_modules
.get_module(py_handler_name
);
2462 ceph_assert(module
);
2463 if (!module
->is_enabled()) {
2464 ss
<< "Module '" << py_handler_name
<< "' is not enabled (required by "
2465 "command '" << prefix
<< "'): use `ceph mgr module enable "
2466 << py_handler_name
<< "` to enable it";
2467 dout(4) << ss
.str() << dendl
;
2468 cmdctx
->reply(-EOPNOTSUPP
, ss
);
2472 // Hack: allow the self-test method to run on unhealthy modules.
2473 // Fix this in future by creating a special path for self test rather
2474 // than having the hook be a normal module command.
2475 std::string self_test_prefix
= py_handler_name
+ " " + "self-test";
2477 // Validate that the module is healthy
2478 bool accept_command
;
2479 if (module
->is_loaded()) {
2480 if (module
->get_can_run() && !module
->is_failed()) {
2482 accept_command
= true;
2483 } else if (self_test_prefix
== prefix
) {
2484 // Unhealthy, but allow because it's a self test command
2485 accept_command
= true;
2487 accept_command
= false;
2488 ss
<< "Module '" << py_handler_name
<< "' has experienced an error and "
2489 "cannot handle commands: " << module
->get_error_string();
2492 // Module not loaded
2493 accept_command
= false;
2494 ss
<< "Module '" << py_handler_name
<< "' failed to load and "
2495 "cannot handle commands: " << module
->get_error_string();
2498 if (!accept_command
) {
2499 dout(4) << ss
.str() << dendl
;
2500 cmdctx
->reply(-EIO
, ss
);
2504 std::stringstream ds
;
2505 bufferlist inbl
= cmdctx
->data
;
2506 int r
= py_modules
.handle_command(py_command
, *session
, cmdctx
->cmdmap
,
2509 log_access_denied(cmdctx
, session
, ss
);
2512 cmdctx
->odata
.append(ds
);
2513 cmdctx
->reply(r
, ss
);
2514 dout(10) << " command returned " << r
<< dendl
;
2519 void DaemonServer::_prune_pending_service_map()
2521 utime_t cutoff
= ceph_clock_now();
2522 cutoff
-= g_conf().get_val
<double>("mgr_service_beacon_grace");
2523 auto p
= pending_service_map
.services
.begin();
2524 while (p
!= pending_service_map
.services
.end()) {
2525 auto q
= p
->second
.daemons
.begin();
2526 while (q
!= p
->second
.daemons
.end()) {
2527 DaemonKey key
{p
->first
, q
->first
};
2528 if (!daemon_state
.exists(key
)) {
2529 if (ServiceMap::is_normal_ceph_entity(p
->first
)) {
2530 dout(10) << "daemon " << key
<< " in service map but not in daemon state "
2531 << "index -- force pruning" << dendl
;
2532 q
= p
->second
.daemons
.erase(q
);
2533 pending_service_map_dirty
= pending_service_map
.epoch
;
2535 derr
<< "missing key " << key
<< dendl
;
2542 auto daemon
= daemon_state
.get(key
);
2543 std::lock_guard
l(daemon
->lock
);
2544 if (daemon
->last_service_beacon
== utime_t()) {
2545 // we must have just restarted; assume they are alive now.
2546 daemon
->last_service_beacon
= ceph_clock_now();
2550 if (daemon
->last_service_beacon
< cutoff
) {
2551 dout(10) << "pruning stale " << p
->first
<< "." << q
->first
2552 << " last_beacon " << daemon
->last_service_beacon
<< dendl
;
2553 q
= p
->second
.daemons
.erase(q
);
2554 pending_service_map_dirty
= pending_service_map
.epoch
;
2559 if (p
->second
.daemons
.empty()) {
2560 p
= pending_service_map
.services
.erase(p
);
2561 pending_service_map_dirty
= pending_service_map
.epoch
;
2568 void DaemonServer::send_report()
2571 if (ceph_clock_now() - started_at
> g_conf().get_val
<int64_t>("mgr_stats_period") * 4.0) {
2573 reported_osds
.clear();
2574 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
2575 << "potentially incomplete PG state to mon" << dendl
;
2577 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
2583 auto m
= ceph::make_message
<MMonMgrReport
>();
2584 m
->gid
= monc
->get_global_id();
2585 py_modules
.get_health_checks(&m
->health_checks
);
2586 py_modules
.get_progress_events(&m
->progress_events
);
2588 cluster_state
.with_mutable_pgmap([&](PGMap
& pg_map
) {
2589 cluster_state
.update_delta_stats();
2591 if (pending_service_map
.epoch
) {
2592 _prune_pending_service_map();
2593 if (pending_service_map_dirty
>= pending_service_map
.epoch
) {
2594 pending_service_map
.modified
= ceph_clock_now();
2595 encode(pending_service_map
, m
->service_map_bl
, CEPH_FEATURES_ALL
);
2596 dout(10) << "sending service_map e" << pending_service_map
.epoch
2598 pending_service_map
.epoch
++;
2602 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
2603 // FIXME: no easy way to get mon features here. this will do for
2604 // now, though, as long as we don't make a backward-incompat change.
2605 pg_map
.encode_digest(osdmap
, m
->get_data(), CEPH_FEATURES_ALL
);
2606 dout(10) << pg_map
<< dendl
;
2608 pg_map
.get_health_checks(g_ceph_context
, osdmap
,
2611 dout(10) << m
->health_checks
.checks
.size() << " health checks"
2613 dout(20) << "health checks:\n";
2614 JSONFormatter
jf(true);
2615 jf
.dump_object("health_checks", m
->health_checks
);
2618 if (osdmap
.require_osd_release
>= ceph_release_t::luminous
) {
2619 clog
->debug() << "pgmap v" << pg_map
.version
<< ": " << pg_map
;
2624 map
<daemon_metric
, unique_ptr
<DaemonHealthMetricCollector
>> accumulated
;
2625 for (auto service
: {"osd", "mon"} ) {
2626 auto daemons
= daemon_state
.get_by_service(service
);
2627 for (const auto& [key
,state
] : daemons
) {
2628 std::lock_guard l
{state
->lock
};
2629 for (const auto& metric
: state
->daemon_health_metrics
) {
2630 auto acc
= accumulated
.find(metric
.get_type());
2631 if (acc
== accumulated
.end()) {
2632 auto collector
= DaemonHealthMetricCollector::create(metric
.get_type());
2634 derr
<< __func__
<< " " << key
2635 << " sent me an unknown health metric: "
2636 << std::hex
<< static_cast<uint8_t>(metric
.get_type())
2637 << std::dec
<< dendl
;
2640 tie(acc
, std::ignore
) = accumulated
.emplace(metric
.get_type(),
2641 std::move(collector
));
2643 acc
->second
->update(key
, metric
);
2647 for (const auto& acc
: accumulated
) {
2648 acc
.second
->summarize(m
->health_checks
);
2650 // TODO? We currently do not notify the PyModules
2651 // TODO: respect needs_send, so we send the report only if we are asked to do
2652 // so, or the state is updated.
2653 monc
->send_mon_message(std::move(m
));
2656 void DaemonServer::adjust_pgs()
2659 unsigned max
= std::max
<int64_t>(1, g_conf()->mon_osd_max_creating_pgs
);
2660 double max_misplaced
= g_conf().get_val
<double>("target_max_misplaced_ratio");
2661 bool aggro
= g_conf().get_val
<bool>("mgr_debug_aggressive_pg_num_changes");
2663 map
<string
,unsigned> pg_num_to_set
;
2664 map
<string
,unsigned> pgp_num_to_set
;
2665 set
<pg_t
> upmaps_to_clear
;
2666 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
2667 unsigned creating_or_unknown
= 0;
2668 for (auto& i
: pg_map
.num_pg_by_state
) {
2669 if ((i
.first
& (PG_STATE_CREATING
)) ||
2671 creating_or_unknown
+= i
.second
;
2674 unsigned left
= max
;
2675 if (creating_or_unknown
>= max
) {
2678 left
-= creating_or_unknown
;
2679 dout(10) << "creating_or_unknown " << creating_or_unknown
2680 << " max_creating " << max
2684 // FIXME: These checks are fundamentally racy given that adjust_pgs()
2685 // can run more frequently than we get updated pg stats from OSDs. We
2686 // may make multiple adjustments with stale informaiton.
2687 double misplaced_ratio
, degraded_ratio
;
2688 double inactive_pgs_ratio
, unknown_pgs_ratio
;
2689 pg_map
.get_recovery_stats(&misplaced_ratio
, °raded_ratio
,
2690 &inactive_pgs_ratio
, &unknown_pgs_ratio
);
2691 dout(20) << "misplaced_ratio " << misplaced_ratio
2692 << " degraded_ratio " << degraded_ratio
2693 << " inactive_pgs_ratio " << inactive_pgs_ratio
2694 << " unknown_pgs_ratio " << unknown_pgs_ratio
2695 << "; target_max_misplaced_ratio " << max_misplaced
2698 for (auto& i
: osdmap
.get_pools()) {
2699 const pg_pool_t
& p
= i
.second
;
2702 if (p
.get_pg_num_target() != p
.get_pg_num()) {
2703 dout(20) << "pool " << i
.first
2704 << " pg_num " << p
.get_pg_num()
2705 << " target " << p
.get_pg_num_target()
2707 if (p
.has_flag(pg_pool_t::FLAG_CREATING
)) {
2708 dout(10) << "pool " << i
.first
2709 << " pg_num_target " << p
.get_pg_num_target()
2710 << " pg_num " << p
.get_pg_num()
2711 << " - still creating initial pgs"
2713 } else if (p
.get_pg_num_target() < p
.get_pg_num()) {
2714 // pg_num decrease (merge)
2715 pg_t
merge_source(p
.get_pg_num() - 1, i
.first
);
2716 pg_t merge_target
= merge_source
.get_parent();
2719 if (p
.get_pg_num() != p
.get_pg_num_pending()) {
2720 dout(10) << "pool " << i
.first
2721 << " pg_num_target " << p
.get_pg_num_target()
2722 << " pg_num " << p
.get_pg_num()
2723 << " - decrease and pg_num_pending != pg_num, waiting"
2726 } else if (p
.get_pg_num() == p
.get_pgp_num()) {
2727 dout(10) << "pool " << i
.first
2728 << " pg_num_target " << p
.get_pg_num_target()
2729 << " pg_num " << p
.get_pg_num()
2730 << " - decrease blocked by pgp_num "
2735 vector
<int32_t> source_acting
;
2736 for (auto &merge_participant
: {merge_source
, merge_target
}) {
2737 bool is_merge_source
= merge_participant
== merge_source
;
2738 if (osdmap
.have_pg_upmaps(merge_participant
)) {
2739 dout(10) << "pool " << i
.first
2740 << " pg_num_target " << p
.get_pg_num_target()
2741 << " pg_num " << p
.get_pg_num()
2742 << (is_merge_source
? " - merge source " : " - merge target ")
2743 << merge_participant
2744 << " has upmap" << dendl
;
2745 upmaps_to_clear
.insert(merge_participant
);
2748 auto q
= pg_map
.pg_stat
.find(merge_participant
);
2749 if (q
== pg_map
.pg_stat
.end()) {
2750 dout(10) << "pool " << i
.first
2751 << " pg_num_target " << p
.get_pg_num_target()
2752 << " pg_num " << p
.get_pg_num()
2753 << " - no state for " << merge_participant
2754 << (is_merge_source
? " (merge source)" : " (merge target)")
2757 } else if ((q
->second
.state
& (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) !=
2758 (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) {
2759 dout(10) << "pool " << i
.first
2760 << " pg_num_target " << p
.get_pg_num_target()
2761 << " pg_num " << p
.get_pg_num()
2762 << (is_merge_source
? " - merge source " : " - merge target ")
2763 << merge_participant
2764 << " not clean (" << pg_state_string(q
->second
.state
)
2768 if (is_merge_source
) {
2769 source_acting
= q
->second
.acting
;
2770 } else if (ok
&& q
->second
.acting
!= source_acting
) {
2771 dout(10) << "pool " << i
.first
2772 << " pg_num_target " << p
.get_pg_num_target()
2773 << " pg_num " << p
.get_pg_num()
2774 << (is_merge_source
? " - merge source " : " - merge target ")
2775 << merge_participant
2776 << " acting does not match (source " << source_acting
2777 << " != target " << q
->second
.acting
2784 unsigned target
= p
.get_pg_num() - 1;
2785 dout(10) << "pool " << i
.first
2786 << " pg_num_target " << p
.get_pg_num_target()
2787 << " pg_num " << p
.get_pg_num()
2789 << " (merging " << merge_source
2790 << " and " << merge_target
2792 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
2795 } else if (p
.get_pg_num_target() > p
.get_pg_num()) {
2796 // pg_num increase (split)
2798 auto q
= pg_map
.num_pg_by_pool_state
.find(i
.first
);
2799 if (q
!= pg_map
.num_pg_by_pool_state
.end()) {
2800 for (auto& j
: q
->second
) {
2801 if ((j
.first
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
)) == 0) {
2802 dout(20) << "pool " << i
.first
<< " has " << j
.second
2803 << " pgs in " << pg_state_string(j
.first
)
2812 unsigned pg_gap
= p
.get_pg_num() - p
.get_pgp_num();
2813 unsigned max_jump
= cct
->_conf
->mgr_max_pg_num_change
;
2815 dout(10) << "pool " << i
.first
2816 << " pg_num_target " << p
.get_pg_num_target()
2817 << " pg_num " << p
.get_pg_num()
2818 << " - not all pgs active"
2820 } else if (pg_gap
>= max_jump
) {
2821 dout(10) << "pool " << i
.first
2822 << " pg_num " << p
.get_pg_num()
2823 << " - pgp_num " << p
.get_pgp_num()
2824 << " gap >= max_pg_num_change " << max_jump
2825 << " - must scale pgp_num first"
2828 unsigned add
= std::min(
2829 std::min(left
, max_jump
- pg_gap
),
2830 p
.get_pg_num_target() - p
.get_pg_num());
2831 unsigned target
= p
.get_pg_num() + add
;
2833 dout(10) << "pool " << i
.first
2834 << " pg_num_target " << p
.get_pg_num_target()
2835 << " pg_num " << p
.get_pg_num()
2836 << " -> " << target
<< dendl
;
2837 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
2843 unsigned target
= std::min(p
.get_pg_num_pending(),
2844 p
.get_pgp_num_target());
2845 if (target
!= p
.get_pgp_num()) {
2846 dout(20) << "pool " << i
.first
2847 << " pgp_num_target " << p
.get_pgp_num_target()
2848 << " pgp_num " << p
.get_pgp_num()
2849 << " -> " << target
<< dendl
;
2850 if (target
> p
.get_pgp_num() &&
2851 p
.get_pgp_num() == p
.get_pg_num()) {
2852 dout(10) << "pool " << i
.first
2853 << " pgp_num_target " << p
.get_pgp_num_target()
2854 << " pgp_num " << p
.get_pgp_num()
2855 << " - increase blocked by pg_num " << p
.get_pg_num()
2857 } else if (!aggro
&& (inactive_pgs_ratio
> 0 ||
2858 degraded_ratio
> 0 ||
2859 unknown_pgs_ratio
> 0)) {
2860 dout(10) << "pool " << i
.first
2861 << " pgp_num_target " << p
.get_pgp_num_target()
2862 << " pgp_num " << p
.get_pgp_num()
2863 << " - inactive|degraded|unknown pgs, deferring pgp_num"
2864 << " update" << dendl
;
2865 } else if (!aggro
&& (misplaced_ratio
> max_misplaced
)) {
2866 dout(10) << "pool " << i
.first
2867 << " pgp_num_target " << p
.get_pgp_num_target()
2868 << " pgp_num " << p
.get_pgp_num()
2869 << " - misplaced_ratio " << misplaced_ratio
2870 << " > max " << max_misplaced
2871 << ", deferring pgp_num update" << dendl
;
2873 // NOTE: this calculation assumes objects are
2874 // basically uniformly distributed across all PGs
2875 // (regardless of pool), which is probably not
2876 // perfectly correct, but it's a start. make no
2877 // single adjustment that's more than half of the
2878 // max_misplaced, to somewhat limit the magnitude of
2879 // our potential error here.
2881 static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP
= 1;
2882 pool_stat_t s
= pg_map
.get_pg_pool_sum_stat(i
.first
);
2884 // pool is (virtually) empty; just jump to final pgp_num?
2885 (p
.get_pgp_num_target() > p
.get_pgp_num() &&
2886 s
.stats
.sum
.num_objects
<= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP
*
2887 p
.get_pgp_num_target()))) {
2891 std::min
<double>(max_misplaced
- misplaced_ratio
,
2892 max_misplaced
/ 2.0);
2893 unsigned estmax
= std::max
<unsigned>(
2894 (double)p
.get_pg_num() * room
, 1u);
2895 unsigned next_min
= 0;
2896 if (p
.get_pgp_num() > estmax
) {
2897 next_min
= p
.get_pgp_num() - estmax
;
2899 next
= std::clamp(target
,
2901 p
.get_pgp_num() + estmax
);
2902 dout(20) << " room " << room
<< " estmax " << estmax
2903 << " delta " << (target
-p
.get_pgp_num())
2904 << " next " << next
<< dendl
;
2905 if (p
.get_pgp_num_target() == p
.get_pg_num_target() &&
2906 p
.get_pgp_num_target() < p
.get_pg_num()) {
2907 // since pgp_num is tracking pg_num, ceph is handling
2908 // pgp_num. so, be responsible: don't let pgp_num get
2909 // too far out ahead of merges (if we are merging).
2910 // this avoids moving lots of unmerged pgs onto a
2911 // small number of OSDs where we might blow out the
2913 unsigned max_outpace_merges
=
2914 std::max
<unsigned>(8, p
.get_pg_num() * max_misplaced
);
2915 if (next
+ max_outpace_merges
< p
.get_pg_num()) {
2916 next
= p
.get_pg_num() - max_outpace_merges
;
2917 dout(10) << " using next " << next
2918 << " to avoid outpacing merges (max_outpace_merges "
2919 << max_outpace_merges
<< ")" << dendl
;
2923 if (next
!= p
.get_pgp_num()) {
2924 dout(10) << "pool " << i
.first
2925 << " pgp_num_target " << p
.get_pgp_num_target()
2926 << " pgp_num " << p
.get_pgp_num()
2927 << " -> " << next
<< dendl
;
2928 pgp_num_to_set
[osdmap
.get_pool_name(i
.first
)] = next
;
2937 for (auto i
: pg_num_to_set
) {
2940 "\"prefix\": \"osd pool set\", "
2941 "\"pool\": \"" + i
.first
+ "\", "
2942 "\"var\": \"pg_num_actual\", "
2943 "\"val\": \"" + stringify(i
.second
) + "\""
2945 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2947 for (auto i
: pgp_num_to_set
) {
2950 "\"prefix\": \"osd pool set\", "
2951 "\"pool\": \"" + i
.first
+ "\", "
2952 "\"var\": \"pgp_num_actual\", "
2953 "\"val\": \"" + stringify(i
.second
) + "\""
2955 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2957 for (auto pg
: upmaps_to_clear
) {
2960 "\"prefix\": \"osd rm-pg-upmap\", "
2961 "\"pgid\": \"" + stringify(pg
) + "\""
2963 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2966 "\"prefix\": \"osd rm-pg-upmap-items\", "
2967 "\"pgid\": \"" + stringify(pg
) + "\"" +
2969 monc
->start_mon_command({cmd2
}, {}, nullptr, nullptr, nullptr);
2973 void DaemonServer::got_service_map()
2975 std::lock_guard
l(lock
);
2977 cluster_state
.with_servicemap([&](const ServiceMap
& service_map
) {
2978 if (pending_service_map
.epoch
== 0) {
2979 // we just started up
2980 dout(10) << "got initial map e" << service_map
.epoch
<< dendl
;
2981 ceph_assert(pending_service_map_dirty
== 0);
2982 pending_service_map
= service_map
;
2983 pending_service_map
.epoch
= service_map
.epoch
+ 1;
2984 } else if (pending_service_map
.epoch
<= service_map
.epoch
) {
2985 // we just started up but got one more not our own map
2986 dout(10) << "got newer initial map e" << service_map
.epoch
<< dendl
;
2987 ceph_assert(pending_service_map_dirty
== 0);
2988 pending_service_map
= service_map
;
2989 pending_service_map
.epoch
= service_map
.epoch
+ 1;
2991 // we already active and therefore must have persisted it,
2992 // which means ours is the same or newer.
2993 dout(10) << "got updated map e" << service_map
.epoch
<< dendl
;
2997 // cull missing daemons, populate new ones
2998 std::set
<std::string
> types
;
2999 for (auto& [type
, service
] : pending_service_map
.services
) {
3000 if (ServiceMap::is_normal_ceph_entity(type
)) {
3006 std::set
<std::string
> names
;
3007 for (auto& q
: service
.daemons
) {
3008 names
.insert(q
.first
);
3009 DaemonKey key
{type
, q
.first
};
3010 if (!daemon_state
.exists(key
)) {
3011 auto daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
3013 daemon
->set_metadata(q
.second
.metadata
);
3014 daemon
->service_daemon
= true;
3015 daemon_state
.insert(daemon
);
3016 dout(10) << "added missing " << key
<< dendl
;
3019 daemon_state
.cull(type
, names
);
3021 daemon_state
.cull_services(types
);
3024 void DaemonServer::got_mgr_map()
3026 std::lock_guard
l(lock
);
3027 set
<std::string
> have
;
3028 cluster_state
.with_mgrmap([&](const MgrMap
& mgrmap
) {
3029 auto md_update
= [&] (DaemonKey key
) {
3030 std::ostringstream oss
;
3031 auto c
= new MetadataUpdate(daemon_state
, key
);
3032 // FIXME remove post-nautilus: include 'id' for luminous mons
3033 oss
<< "{\"prefix\": \"mgr metadata\", \"who\": \""
3034 << key
.name
<< "\", \"id\": \"" << key
.name
<< "\"}";
3035 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
3037 if (mgrmap
.active_name
.size()) {
3038 DaemonKey key
{"mgr", mgrmap
.active_name
};
3039 have
.insert(mgrmap
.active_name
);
3040 if (!daemon_state
.exists(key
) && !daemon_state
.is_updating(key
)) {
3042 dout(10) << "triggered addition of " << key
<< " via metadata update" << dendl
;
3045 for (auto& i
: mgrmap
.standbys
) {
3046 DaemonKey key
{"mgr", i
.second
.name
};
3047 have
.insert(i
.second
.name
);
3048 if (!daemon_state
.exists(key
) && !daemon_state
.is_updating(key
)) {
3050 dout(10) << "triggered addition of " << key
<< " via metadata update" << dendl
;
3054 daemon_state
.cull("mgr", have
);
// Observer hook: returns the NULL-terminated array of config option names
// this object wants change notifications for (consumed by
// handle_conf_change(), which reacts to the stats threshold/period keys).
// NOTE(review): the extraction this chunk came from dropped the remainder
// of this definition (any further KEYS entries, the array terminator and
// the return statement), so only the first key is visible here -- do not
// assume the list ends at "mgr_stats_threshold"; confirm against upstream.
3057 const char** DaemonServer::get_tracked_conf_keys() const
3059 static const char *KEYS
[] = {
3060 "mgr_stats_threshold",
3068 void DaemonServer::handle_conf_change(const ConfigProxy
& conf
,
3069 const std::set
<std::string
> &changed
)
3072 if (changed
.count("mgr_stats_threshold") || changed
.count("mgr_stats_period")) {
3073 dout(4) << "Updating stats threshold/period on "
3074 << daemon_connections
.size() << " clients" << dendl
;
3075 // Send a fresh MMgrConfigure to all clients, so that they can follow
3076 // the new policy for transmitting stats
3077 finisher
.queue(new LambdaContext([this](int r
) {
3078 std::lock_guard
l(lock
);
3079 for (auto &c
: daemon_connections
) {
3086 void DaemonServer::_send_configure(ConnectionRef c
)
3088 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
3090 auto configure
= make_message
<MMgrConfigure
>();
3091 configure
->stats_period
= g_conf().get_val
<int64_t>("mgr_stats_period");
3092 configure
->stats_threshold
= g_conf().get_val
<int64_t>("mgr_stats_threshold");
3094 if (c
->peer_is_osd()) {
3095 configure
->osd_perf_metric_queries
=
3096 osd_perf_metric_collector
.get_queries();
3097 } else if (c
->peer_is_mds()) {
3098 configure
->metric_config_message
=
3099 MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector
.get_queries()));
3102 c
->send_message2(configure
);
3105 MetricQueryID
DaemonServer::add_osd_perf_query(
3106 const OSDPerfMetricQuery
&query
,
3107 const std::optional
<OSDPerfMetricLimit
> &limit
)
3109 return osd_perf_metric_collector
.add_query(query
, limit
);
3112 int DaemonServer::remove_osd_perf_query(MetricQueryID query_id
)
3114 return osd_perf_metric_collector
.remove_query(query_id
);
3117 int DaemonServer::get_osd_perf_counters(OSDPerfCollector
*collector
)
3119 return osd_perf_metric_collector
.get_counters(collector
);
3122 MetricQueryID
DaemonServer::add_mds_perf_query(
3123 const MDSPerfMetricQuery
&query
,
3124 const std::optional
<MDSPerfMetricLimit
> &limit
)
3126 return mds_perf_metric_collector
.add_query(query
, limit
);
3129 int DaemonServer::remove_mds_perf_query(MetricQueryID query_id
)
3131 return mds_perf_metric_collector
.remove_query(query_id
);
3134 void DaemonServer::reregister_mds_perf_queries()
3136 mds_perf_metric_collector
.reregister_queries();
3139 int DaemonServer::get_mds_perf_counters(MDSPerfCollector
*collector
)
3141 return mds_perf_metric_collector
.get_counters(collector
);