// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
14 #include "DaemonServer.h"
15 #include <boost/algorithm/string.hpp>
18 #include "include/stringify.h"
19 #include "include/str_list.h"
20 #include "auth/RotatingKeyRing.h"
21 #include "json_spirit/json_spirit_writer.h"
23 #include "mgr/mgr_commands.h"
24 #include "mgr/DaemonHealthMetricCollector.h"
25 #include "mgr/OSDPerfMetricCollector.h"
26 #include "mgr/MDSPerfMetricCollector.h"
27 #include "mon/MonCommand.h"
29 #include "messages/MMgrOpen.h"
30 #include "messages/MMgrUpdate.h"
31 #include "messages/MMgrClose.h"
32 #include "messages/MMgrConfigure.h"
33 #include "messages/MMonMgrReport.h"
34 #include "messages/MCommand.h"
35 #include "messages/MCommandReply.h"
36 #include "messages/MMgrCommand.h"
37 #include "messages/MMgrCommandReply.h"
38 #include "messages/MPGStats.h"
39 #include "messages/MOSDScrub2.h"
40 #include "messages/MOSDForceRecovery.h"
41 #include "common/errno.h"
42 #include "common/pick_address.h"
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_mgr
47 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
49 using namespace TOPNSPC::common
;
52 using std::ostringstream
;
54 using std::stringstream
;
56 using std::unique_ptr
;
// Compare two map-like containers for exact equality of keys and
// mapped values.  Both containers must iterate in the same order for
// the element-wise comparison to be meaningful (true for two std::map
// instances with the same comparator).
//
// @param lhs  first map
// @param rhs  second map
// @return true iff both maps have the same size and identical
//         (key, value) pairs in iteration order.
template <typename Map>
bool map_compare(Map const &lhs, Map const &rhs)
{
  // Take the pairs by const reference: the original lambda took them
  // by value, copying every element pair (expensive for e.g.
  // map<string,string>) on each comparison.
  return lhs.size() == rhs.size()
    && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                  [] (const auto &a, const auto &b) {
                    return a.first == b.first && a.second == b.second;
                  });
}
67 DaemonServer::DaemonServer(MonClient
*monc_
,
69 DaemonStateIndex
&daemon_state_
,
70 ClusterState
&cluster_state_
,
71 PyModuleRegistry
&py_modules_
,
73 LogChannelRef audit_clog_
)
74 : Dispatcher(g_ceph_context
),
75 client_byte_throttler(new Throttle(g_ceph_context
, "mgr_client_bytes",
76 g_conf().get_val
<Option::size_t>("mgr_client_bytes"))),
77 client_msg_throttler(new Throttle(g_ceph_context
, "mgr_client_messages",
78 g_conf().get_val
<uint64_t>("mgr_client_messages"))),
79 osd_byte_throttler(new Throttle(g_ceph_context
, "mgr_osd_bytes",
80 g_conf().get_val
<Option::size_t>("mgr_osd_bytes"))),
81 osd_msg_throttler(new Throttle(g_ceph_context
, "mgr_osd_messsages",
82 g_conf().get_val
<uint64_t>("mgr_osd_messages"))),
83 mds_byte_throttler(new Throttle(g_ceph_context
, "mgr_mds_bytes",
84 g_conf().get_val
<Option::size_t>("mgr_mds_bytes"))),
85 mds_msg_throttler(new Throttle(g_ceph_context
, "mgr_mds_messsages",
86 g_conf().get_val
<uint64_t>("mgr_mds_messages"))),
87 mon_byte_throttler(new Throttle(g_ceph_context
, "mgr_mon_bytes",
88 g_conf().get_val
<Option::size_t>("mgr_mon_bytes"))),
89 mon_msg_throttler(new Throttle(g_ceph_context
, "mgr_mon_messsages",
90 g_conf().get_val
<uint64_t>("mgr_mon_messages"))),
94 daemon_state(daemon_state_
),
95 cluster_state(cluster_state_
),
96 py_modules(py_modules_
),
98 audit_clog(audit_clog_
),
100 timer(g_ceph_context
, lock
),
101 shutting_down(false),
103 osd_perf_metric_collector_listener(this),
104 osd_perf_metric_collector(osd_perf_metric_collector_listener
),
105 mds_perf_metric_collector_listener(this),
106 mds_perf_metric_collector(mds_perf_metric_collector_listener
)
108 g_conf().add_observer(this);
111 DaemonServer::~DaemonServer() {
113 g_conf().remove_observer(this);
116 int DaemonServer::init(uint64_t gid
, entity_addrvec_t client_addrs
)
118 // Initialize Messenger
119 std::string public_msgr_type
= g_conf()->ms_public_type
.empty() ?
120 g_conf().get_val
<std::string
>("ms_type") : g_conf()->ms_public_type
;
121 msgr
= Messenger::create(g_ceph_context
, public_msgr_type
,
122 entity_name_t::MGR(gid
),
124 Messenger::get_pid_nonce());
125 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
127 msgr
->set_auth_client(monc
);
130 msgr
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
131 client_byte_throttler
.get(),
132 client_msg_throttler
.get());
135 msgr
->set_policy_throttlers(entity_name_t::TYPE_OSD
,
136 osd_byte_throttler
.get(),
137 osd_msg_throttler
.get());
138 msgr
->set_policy_throttlers(entity_name_t::TYPE_MDS
,
139 mds_byte_throttler
.get(),
140 mds_msg_throttler
.get());
141 msgr
->set_policy_throttlers(entity_name_t::TYPE_MON
,
142 mon_byte_throttler
.get(),
143 mon_msg_throttler
.get());
145 entity_addrvec_t addrs
;
146 int r
= pick_addresses(cct
, CEPH_PICK_ADDRESS_PUBLIC
, &addrs
);
150 dout(20) << __func__
<< " will bind to " << addrs
<< dendl
;
151 r
= msgr
->bindv(addrs
);
153 derr
<< "unable to bind mgr to " << addrs
<< dendl
;
157 msgr
->set_myname(entity_name_t::MGR(gid
));
158 msgr
->set_addr_unknowns(client_addrs
);
161 msgr
->add_dispatcher_tail(this);
163 msgr
->set_auth_server(monc
);
164 monc
->set_handle_authentication_dispatcher(this);
166 started_at
= ceph_clock_now();
168 std::lock_guard
l(lock
);
171 schedule_tick_locked(
172 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
177 entity_addrvec_t
DaemonServer::get_myaddrs() const
179 return msgr
->get_myaddrs();
182 int DaemonServer::ms_handle_authentication(Connection
*con
)
184 auto s
= ceph::make_ref
<MgrSession
>(cct
);
186 s
->inst
.addr
= con
->get_peer_addr();
187 s
->entity_name
= con
->peer_name
;
188 dout(10) << __func__
<< " new session " << s
<< " con " << con
189 << " entity " << con
->peer_name
190 << " addr " << con
->get_peer_addrs()
193 AuthCapsInfo
&caps_info
= con
->get_peer_caps_info();
194 if (caps_info
.allow_all
) {
195 dout(10) << " session " << s
<< " " << s
->entity_name
196 << " allow_all" << dendl
;
197 s
->caps
.set_allow_all();
198 } else if (caps_info
.caps
.length() > 0) {
199 auto p
= caps_info
.caps
.cbegin();
204 catch (buffer::error
& e
) {
205 dout(10) << " session " << s
<< " " << s
->entity_name
206 << " failed to decode caps" << dendl
;
209 if (!s
->caps
.parse(str
)) {
210 dout(10) << " session " << s
<< " " << s
->entity_name
211 << " failed to parse caps '" << str
<< "'" << dendl
;
214 dout(10) << " session " << s
<< " " << s
->entity_name
215 << " has caps " << s
->caps
<< " '" << str
<< "'" << dendl
;
218 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
219 std::lock_guard
l(lock
);
220 s
->osd_id
= atoi(s
->entity_name
.get_id().c_str());
221 dout(10) << "registering osd." << s
->osd_id
<< " session "
222 << s
<< " con " << con
<< dendl
;
223 osd_cons
[s
->osd_id
].insert(con
);
229 bool DaemonServer::ms_handle_reset(Connection
*con
)
231 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
232 auto priv
= con
->get_priv();
233 auto session
= static_cast<MgrSession
*>(priv
.get());
237 std::lock_guard
l(lock
);
238 dout(10) << "unregistering osd." << session
->osd_id
239 << " session " << session
<< " con " << con
<< dendl
;
240 osd_cons
[session
->osd_id
].erase(con
);
242 auto iter
= daemon_connections
.find(con
);
243 if (iter
!= daemon_connections
.end()) {
244 daemon_connections
.erase(iter
);
250 bool DaemonServer::ms_handle_refused(Connection
*con
)
252 // do nothing for now
256 bool DaemonServer::ms_dispatch2(const ref_t
<Message
>& m
)
258 // Note that we do *not* take ::lock here, in order to avoid
259 // serializing all message handling. It's up to each handler
260 // to take whatever locks it needs.
261 switch (m
->get_type()) {
263 cluster_state
.ingest_pgstats(ref_cast
<MPGStats
>(m
));
264 maybe_ready(m
->get_source().num());
267 return handle_report(ref_cast
<MMgrReport
>(m
));
269 return handle_open(ref_cast
<MMgrOpen
>(m
));
271 return handle_update(ref_cast
<MMgrUpdate
>(m
));
273 return handle_close(ref_cast
<MMgrClose
>(m
));
275 return handle_command(ref_cast
<MCommand
>(m
));
276 case MSG_MGR_COMMAND
:
277 return handle_command(ref_cast
<MMgrCommand
>(m
));
279 dout(1) << "Unhandled message type " << m
->get_type() << dendl
;
284 void DaemonServer::dump_pg_ready(ceph::Formatter
*f
)
286 f
->dump_bool("pg_ready", pgmap_ready
.load());
289 void DaemonServer::maybe_ready(int32_t osd_id
)
291 if (pgmap_ready
.load()) {
292 // Fast path: we don't need to take lock because pgmap_ready
295 std::lock_guard
l(lock
);
297 if (reported_osds
.find(osd_id
) == reported_osds
.end()) {
298 dout(4) << "initial report from osd " << osd_id
<< dendl
;
299 reported_osds
.insert(osd_id
);
300 std::set
<int32_t> up_osds
;
302 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
303 osdmap
.get_up_osds(up_osds
);
306 std::set
<int32_t> unreported_osds
;
307 std::set_difference(up_osds
.begin(), up_osds
.end(),
308 reported_osds
.begin(), reported_osds
.end(),
309 std::inserter(unreported_osds
, unreported_osds
.begin()));
311 if (unreported_osds
.size() == 0) {
312 dout(4) << "all osds have reported, sending PG state to mon" << dendl
;
314 reported_osds
.clear();
315 // Avoid waiting for next tick
318 dout(4) << "still waiting for " << unreported_osds
.size() << " osds"
319 " to report in before PGMap is ready" << dendl
;
325 void DaemonServer::tick()
331 schedule_tick_locked(
332 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
335 // Currently modules do not set health checks in response to events delivered to
336 // all modules (e.g. notify) so we do not risk a thundering hurd situation here.
337 // if this pattern emerges in the future, this scheduler could be modified to
338 // fire after all modules have had a chance to set their health checks.
339 void DaemonServer::schedule_tick_locked(double delay_sec
)
341 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
344 timer
.cancel_event(tick_event
);
345 tick_event
= nullptr;
348 // on shutdown start rejecting explicit requests to send reports that may
349 // originate from python land which may still be running.
353 tick_event
= timer
.add_event_after(delay_sec
,
354 new LambdaContext([this](int r
) {
359 void DaemonServer::schedule_tick(double delay_sec
)
361 std::lock_guard
l(lock
);
362 schedule_tick_locked(delay_sec
);
365 void DaemonServer::handle_osd_perf_metric_query_updated()
369 // Send a fresh MMgrConfigure to all clients, so that they can follow
370 // the new policy for transmitting stats
371 finisher
.queue(new LambdaContext([this](int r
) {
372 std::lock_guard
l(lock
);
373 for (auto &c
: daemon_connections
) {
374 if (c
->peer_is_osd()) {
381 void DaemonServer::handle_mds_perf_metric_query_updated()
385 // Send a fresh MMgrConfigure to all clients, so that they can follow
386 // the new policy for transmitting stats
387 finisher
.queue(new LambdaContext([this](int r
) {
388 std::lock_guard
l(lock
);
389 for (auto &c
: daemon_connections
) {
390 if (c
->peer_is_mds()) {
397 void DaemonServer::shutdown()
399 dout(10) << "begin" << dendl
;
402 cluster_state
.shutdown();
403 dout(10) << "done" << dendl
;
405 std::lock_guard
l(lock
);
406 shutting_down
= true;
410 static DaemonKey
key_from_service(
411 const std::string
& service_name
,
413 const std::string
& daemon_name
)
415 if (!service_name
.empty()) {
416 return DaemonKey
{service_name
, daemon_name
};
418 return DaemonKey
{ceph_entity_type_name(peer_type
), daemon_name
};
422 void DaemonServer::fetch_missing_metadata(const DaemonKey
& key
,
423 const entity_addr_t
& addr
)
425 if (!daemon_state
.is_updating(key
) &&
426 (key
.type
== "osd" || key
.type
== "mds" || key
.type
== "mon")) {
427 std::ostringstream oss
;
428 auto c
= new MetadataUpdate(daemon_state
, key
);
429 if (key
.type
== "osd") {
430 oss
<< "{\"prefix\": \"osd metadata\", \"id\": "
432 } else if (key
.type
== "mds") {
433 c
->set_default("addr", stringify(addr
));
434 oss
<< "{\"prefix\": \"mds metadata\", \"who\": \""
435 << key
.name
<< "\"}";
436 } else if (key
.type
== "mon") {
437 oss
<< "{\"prefix\": \"mon metadata\", \"id\": \""
438 << key
.name
<< "\"}";
442 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
446 bool DaemonServer::handle_open(const ref_t
<MMgrOpen
>& m
)
448 std::unique_lock
l(lock
);
450 DaemonKey key
= key_from_service(m
->service_name
,
451 m
->get_connection()->get_peer_type(),
454 auto con
= m
->get_connection();
455 dout(10) << "from " << key
<< " " << con
->get_peer_addr() << dendl
;
457 _send_configure(con
);
459 DaemonStatePtr daemon
;
460 if (daemon_state
.exists(key
)) {
461 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
462 daemon
= daemon_state
.get(key
);
465 if (m
->service_daemon
) {
466 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
467 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
469 daemon
->service_daemon
= true;
470 daemon_state
.insert(daemon
);
472 /* A normal Ceph daemon has connected but we are or should be waiting on
473 * metadata for it. Close the session so that it tries to reconnect.
475 dout(2) << "ignoring open from " << key
<< " " << con
->get_peer_addr()
476 << "; not ready for session (expect reconnect)" << dendl
;
479 fetch_missing_metadata(key
, m
->get_source_addr());
484 if (m
->service_daemon
) {
485 // update the metadata through the daemon state index to
486 // ensure it's kept up-to-date
487 daemon_state
.update_metadata(daemon
, m
->daemon_metadata
);
490 std::lock_guard
l(daemon
->lock
);
491 daemon
->perf_counters
.clear();
493 daemon
->service_daemon
= m
->service_daemon
;
494 if (m
->service_daemon
) {
495 daemon
->service_status
= m
->daemon_status
;
497 utime_t now
= ceph_clock_now();
498 auto [d
, added
] = pending_service_map
.get_daemon(m
->service_name
,
500 if (added
|| d
->gid
!= (uint64_t)m
->get_source().num()) {
501 dout(10) << "registering " << key
<< " in pending_service_map" << dendl
;
502 d
->gid
= m
->get_source().num();
503 d
->addr
= m
->get_source_addr();
504 d
->start_epoch
= pending_service_map
.epoch
;
505 d
->start_stamp
= now
;
506 d
->metadata
= m
->daemon_metadata
;
507 pending_service_map_dirty
= pending_service_map
.epoch
;
511 auto p
= m
->config_bl
.cbegin();
512 if (p
!= m
->config_bl
.end()) {
513 decode(daemon
->config
, p
);
514 decode(daemon
->ignored_mon_config
, p
);
515 dout(20) << " got config " << daemon
->config
516 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
518 daemon
->config_defaults_bl
= m
->config_defaults_bl
;
519 daemon
->config_defaults
.clear();
520 dout(20) << " got config_defaults_bl " << daemon
->config_defaults_bl
.length()
521 << " bytes" << dendl
;
524 if (con
->get_peer_type() != entity_name_t::TYPE_CLIENT
&&
525 m
->service_name
.empty())
527 // Store in set of the daemon/service connections, i.e. those
528 // connections that require an update in the event of stats
529 // configuration changes.
530 daemon_connections
.insert(con
);
536 bool DaemonServer::handle_update(const ref_t
<MMgrUpdate
>& m
)
539 if (!m
->service_name
.empty()) {
540 key
.type
= m
->service_name
;
542 key
.type
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
544 key
.name
= m
->daemon_name
;
546 dout(10) << "from " << m
->get_connection() << " " << key
<< dendl
;
548 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
549 m
->service_name
.empty()) {
550 // Clients should not be sending us update request
551 dout(10) << "rejecting update request from non-daemon client " << m
->daemon_name
553 clog
->warn() << "rejecting report from non-daemon client " << m
->daemon_name
554 << " at " << m
->get_connection()->get_peer_addrs();
555 m
->get_connection()->mark_down();
561 std::unique_lock
locker(lock
);
563 DaemonStatePtr daemon
;
564 // Look up the DaemonState
565 if (daemon_state
.exists(key
)) {
566 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
568 daemon
= daemon_state
.get(key
);
569 if (m
->need_metadata_update
&&
570 !m
->daemon_metadata
.empty()) {
571 daemon_state
.update_metadata(daemon
, m
->daemon_metadata
);
579 bool DaemonServer::handle_close(const ref_t
<MMgrClose
>& m
)
581 std::lock_guard
l(lock
);
583 DaemonKey key
= key_from_service(m
->service_name
,
584 m
->get_connection()->get_peer_type(),
586 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
588 if (daemon_state
.exists(key
)) {
589 DaemonStatePtr daemon
= daemon_state
.get(key
);
590 daemon_state
.rm(key
);
592 std::lock_guard
l(daemon
->lock
);
593 if (daemon
->service_daemon
) {
594 pending_service_map
.rm_daemon(m
->service_name
, m
->daemon_name
);
595 pending_service_map_dirty
= pending_service_map
.epoch
;
600 // send same message back as a reply
601 m
->get_connection()->send_message2(m
);
605 void DaemonServer::update_task_status(
607 const std::map
<std::string
,std::string
>& task_status
)
609 dout(10) << "got task status from " << key
<< dendl
;
611 [[maybe_unused
]] auto [daemon
, added
] =
612 pending_service_map
.get_daemon(key
.type
, key
.name
);
613 if (daemon
->task_status
!= task_status
) {
614 daemon
->task_status
= task_status
;
615 pending_service_map_dirty
= pending_service_map
.epoch
;
619 bool DaemonServer::handle_report(const ref_t
<MMgrReport
>& m
)
622 if (!m
->service_name
.empty()) {
623 key
.type
= m
->service_name
;
625 key
.type
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
627 key
.name
= m
->daemon_name
;
629 dout(10) << "from " << m
->get_connection() << " " << key
<< dendl
;
631 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
632 m
->service_name
.empty()) {
633 // Clients should not be sending us stats unless they are declaring
634 // themselves to be a daemon for some service.
635 dout(10) << "rejecting report from non-daemon client " << m
->daemon_name
637 clog
->warn() << "rejecting report from non-daemon client " << m
->daemon_name
638 << " at " << m
->get_connection()->get_peer_addrs();
639 m
->get_connection()->mark_down();
645 std::unique_lock
locker(lock
);
647 DaemonStatePtr daemon
;
648 // Look up the DaemonState
649 if (daemon
= daemon_state
.get(key
); daemon
!= nullptr) {
650 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
654 // we don't know the hostname at this stage, reject MMgrReport here.
655 dout(5) << "rejecting report from " << key
<< ", since we do not have its metadata now."
657 // issue metadata request in background
658 fetch_missing_metadata(key
, m
->get_source_addr());
663 auto priv
= m
->get_connection()->get_priv();
664 auto session
= static_cast<MgrSession
*>(priv
.get());
668 m
->get_connection()->mark_down();
670 dout(10) << "unregistering osd." << session
->osd_id
671 << " session " << session
<< " con " << m
->get_connection() << dendl
;
673 if (osd_cons
.find(session
->osd_id
) != osd_cons
.end()) {
674 osd_cons
[session
->osd_id
].erase(m
->get_connection());
677 auto iter
= daemon_connections
.find(m
->get_connection());
678 if (iter
!= daemon_connections
.end()) {
679 daemon_connections
.erase(iter
);
685 // Update the DaemonState
686 ceph_assert(daemon
!= nullptr);
688 std::lock_guard
l(daemon
->lock
);
689 auto &daemon_counters
= daemon
->perf_counters
;
690 daemon_counters
.update(*m
.get());
692 auto p
= m
->config_bl
.cbegin();
693 if (p
!= m
->config_bl
.end()) {
694 decode(daemon
->config
, p
);
695 decode(daemon
->ignored_mon_config
, p
);
696 dout(20) << " got config " << daemon
->config
697 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
700 utime_t now
= ceph_clock_now();
701 if (daemon
->service_daemon
) {
702 if (m
->daemon_status
) {
703 daemon
->service_status_stamp
= now
;
704 daemon
->service_status
= *m
->daemon_status
;
706 daemon
->last_service_beacon
= now
;
707 } else if (m
->daemon_status
) {
708 derr
<< "got status from non-daemon " << key
<< dendl
;
710 // update task status
711 if (m
->task_status
) {
712 update_task_status(key
, *m
->task_status
);
713 daemon
->last_service_beacon
= now
;
715 if (m
->get_connection()->peer_is_osd() || m
->get_connection()->peer_is_mon()) {
716 // only OSD and MON send health_checks to me now
717 daemon
->daemon_health_metrics
= std::move(m
->daemon_health_metrics
);
718 dout(10) << "daemon_health_metrics " << daemon
->daemon_health_metrics
724 // if there are any schema updates, notify the python modules
725 /* no users currently
726 if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
727 py_modules.notify_all("perf_schema_update", ceph::to_string(key));
731 if (m
->get_connection()->peer_is_osd()) {
732 osd_perf_metric_collector
.process_reports(m
->osd_perf_metric_reports
);
735 if (m
->metric_report_message
) {
736 const MetricReportMessage
&message
= *m
->metric_report_message
;
737 boost::apply_visitor(HandlePayloadVisitor(this), message
.payload
);
744 void DaemonServer::_generate_command_map(
746 map
<string
,string
> ¶m_str_map
)
748 for (auto p
= cmdmap
.begin();
749 p
!= cmdmap
.end(); ++p
) {
750 if (p
->first
== "prefix")
752 if (p
->first
== "caps") {
754 if (cmd_getval(cmdmap
, "caps", cv
) &&
755 cv
.size() % 2 == 0) {
756 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
757 string k
= string("caps_") + cv
[i
];
758 param_str_map
[k
] = cv
[i
+ 1];
763 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
767 const MonCommand
*DaemonServer::_get_mgrcommand(
768 const string
&cmd_prefix
,
769 const std::vector
<MonCommand
> &cmds
)
771 const MonCommand
*this_cmd
= nullptr;
772 for (const auto &cmd
: cmds
) {
773 if (cmd
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
781 bool DaemonServer::_allowed_command(
783 const string
&service
,
784 const string
&module
,
785 const string
&prefix
,
786 const cmdmap_t
& cmdmap
,
787 const map
<string
,string
>& param_str_map
,
788 const MonCommand
*this_cmd
) {
790 if (s
->entity_name
.is_mon()) {
791 // mon is all-powerful. even when it is forwarding commands on behalf of
792 // old clients; we expect the mon is validating commands before proxying!
796 bool cmd_r
= this_cmd
->requires_perm('r');
797 bool cmd_w
= this_cmd
->requires_perm('w');
798 bool cmd_x
= this_cmd
->requires_perm('x');
800 bool capable
= s
->caps
.is_capable(
803 service
, module
, prefix
, param_str_map
,
807 dout(10) << " " << s
->entity_name
<< " "
808 << (capable
? "" : "not ") << "capable" << dendl
;
813 * The working data for processing an MCommand. This lives in
814 * a class to enable passing it into other threads for processing
815 * outside of the thread/locks that called handle_command.
817 class CommandContext
{
819 ceph::ref_t
<MCommand
> m_tell
;
820 ceph::ref_t
<MMgrCommand
> m_mgr
;
821 const std::vector
<std::string
>& cmd
; ///< ref into m_tell or m_mgr
822 const bufferlist
& data
; ///< ref into m_tell or m_mgr
826 explicit CommandContext(ceph::ref_t
<MCommand
> m
)
827 : m_tell
{std::move(m
)},
829 data(m_tell
->get_data()) {
831 explicit CommandContext(ceph::ref_t
<MMgrCommand
> m
)
832 : m_mgr
{std::move(m
)},
834 data(m_mgr
->get_data()) {
837 void reply(int r
, const std::stringstream
&ss
) {
841 void reply(int r
, const std::string
&rs
) {
842 // Let the connection drop as soon as we've sent our response
843 ConnectionRef con
= m_tell
? m_tell
->get_connection()
844 : m_mgr
->get_connection();
846 con
->mark_disposable();
850 dout(20) << "success" << dendl
;
852 derr
<< __func__
<< " " << cpp_strerror(r
) << " " << rs
<< dendl
;
856 MCommandReply
*reply
= new MCommandReply(r
, rs
);
857 reply
->set_tid(m_tell
->get_tid());
858 reply
->set_data(odata
);
859 con
->send_message(reply
);
861 MMgrCommandReply
*reply
= new MMgrCommandReply(r
, rs
);
862 reply
->set_tid(m_mgr
->get_tid());
863 reply
->set_data(odata
);
864 con
->send_message(reply
);
871 * A context for receiving a bufferlist/error string from a background
872 * function and then calling back to a CommandContext when it's done
874 class ReplyOnFinish
: public Context
{
875 std::shared_ptr
<CommandContext
> cmdctx
;
881 explicit ReplyOnFinish(const std::shared_ptr
<CommandContext
> &cmdctx_
)
884 void finish(int r
) override
{
885 cmdctx
->odata
.claim_append(from_mon
);
886 cmdctx
->reply(r
, outs
);
890 bool DaemonServer::handle_command(const ref_t
<MCommand
>& m
)
892 std::lock_guard
l(lock
);
893 auto cmdctx
= std::make_shared
<CommandContext
>(m
);
895 return _handle_command(cmdctx
);
896 } catch (const bad_cmd_get
& e
) {
897 cmdctx
->reply(-EINVAL
, e
.what());
902 bool DaemonServer::handle_command(const ref_t
<MMgrCommand
>& m
)
904 std::lock_guard
l(lock
);
905 auto cmdctx
= std::make_shared
<CommandContext
>(m
);
907 return _handle_command(cmdctx
);
908 } catch (const bad_cmd_get
& e
) {
909 cmdctx
->reply(-EINVAL
, e
.what());
914 void DaemonServer::log_access_denied(
915 std::shared_ptr
<CommandContext
>& cmdctx
,
916 MgrSession
* session
, std::stringstream
& ss
) {
917 dout(1) << " access denied" << dendl
;
918 audit_clog
->info() << "from='" << session
->inst
<< "' "
919 << "entity='" << session
->entity_name
<< "' "
920 << "cmd=" << cmdctx
->cmd
<< ": access denied";
921 ss
<< "access denied: does your client key have mgr caps? "
922 "See http://docs.ceph.com/en/latest/mgr/administrator/"
923 "#client-authentication";
926 void DaemonServer::_check_offlines_pgs(
927 const set
<int>& osds
,
928 const OSDMap
& osdmap
,
930 offline_pg_report
*report
)
933 *report
= offline_pg_report();
936 for (const auto& q
: pgmap
.pg_stat
) {
937 set
<int32_t> pg_acting
; // net acting sets (with no missing if degraded)
939 if (q
.second
.state
== 0) {
940 report
->unknown
.insert(q
.first
);
943 if (q
.second
.state
& PG_STATE_DEGRADED
) {
944 for (auto& anm
: q
.second
.avail_no_missing
) {
945 if (osds
.count(anm
.osd
)) {
949 if (anm
.osd
!= CRUSH_ITEM_NONE
) {
950 pg_acting
.insert(anm
.osd
);
954 for (auto& a
: q
.second
.acting
) {
959 if (a
!= CRUSH_ITEM_NONE
) {
967 const pg_pool_t
*pi
= osdmap
.get_pg_pool(q
.first
.pool());
968 bool dangerous
= false;
970 report
->bad_no_pool
.insert(q
.first
); // pool is creating or deleting
973 if (!(q
.second
.state
& PG_STATE_ACTIVE
)) {
974 report
->bad_already_inactive
.insert(q
.first
);
977 if (pg_acting
.size() < pi
->min_size
) {
978 report
->bad_become_inactive
.insert(q
.first
);
982 report
->not_ok
.insert(q
.first
);
984 report
->ok
.insert(q
.first
);
985 if (q
.second
.state
& PG_STATE_DEGRADED
) {
986 report
->ok_become_more_degraded
.insert(q
.first
);
988 report
->ok_become_degraded
.insert(q
.first
);
992 dout(20) << osds
<< " -> " << report
->ok
.size() << " ok, "
993 << report
->not_ok
.size() << " not ok, "
994 << report
->unknown
.size() << " unknown"
998 void DaemonServer::_maximize_ok_to_stop_set(
999 const set
<int>& orig_osds
,
1001 const OSDMap
& osdmap
,
1003 offline_pg_report
*out_report
)
1005 dout(20) << "orig_osds " << orig_osds
<< " max " << max
<< dendl
;
1006 _check_offlines_pgs(orig_osds
, osdmap
, pgmap
, out_report
);
1007 if (!out_report
->ok_to_stop()) {
1010 if (orig_osds
.size() >= max
) {
1015 // semi-arbitrarily start with the first osd in the set
1016 offline_pg_report report
;
1017 set
<int> osds
= orig_osds
;
1018 int parent
= *osds
.begin();
1022 // identify the next parent
1023 int r
= osdmap
.crush
->get_immediate_parent_id(parent
, &parent
);
1025 return; // just go with what we have so far!
1028 // get candidate additions that are beneath this point in the tree
1030 r
= osdmap
.crush
->get_all_children(parent
, &children
);
1032 return; // just go with what we have so far!
1034 dout(20) << " parent " << parent
<< " children " << children
<< dendl
;
1036 // try adding in more osds
1037 int failed
= 0; // how many children we failed to add to our set
1038 for (auto o
: children
) {
1039 if (o
>= 0 && osdmap
.is_up(o
) && osds
.count(o
) == 0) {
1041 _check_offlines_pgs(osds
, osdmap
, pgmap
, &report
);
1042 if (!report
.ok_to_stop()) {
1047 *out_report
= report
;
1048 if (osds
.size() == max
) {
1049 dout(20) << " hit max" << dendl
;
1050 return; // yay, we hit the max
1056 // we hit some failures; go with what we have
1057 dout(20) << " hit some peer failures" << dendl
;
1063 bool DaemonServer::_handle_command(
1064 std::shared_ptr
<CommandContext
>& cmdctx
)
1067 bool admin_socket_cmd
= false;
1068 if (cmdctx
->m_tell
) {
1070 // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
1072 admin_socket_cmd
= (cmdctx
->m_tell
->fsid
!= uuid_d());
1076 auto priv
= m
->get_connection()->get_priv();
1077 auto session
= static_cast<MgrSession
*>(priv
.get());
1081 if (session
->inst
.name
== entity_name_t()) {
1082 session
->inst
.name
= m
->get_source();
1085 map
<string
,string
> param_str_map
;
1086 std::stringstream ss
;
1089 if (!cmdmap_from_json(cmdctx
->cmd
, &(cmdctx
->cmdmap
), ss
)) {
1090 cmdctx
->reply(-EINVAL
, ss
);
1095 cmd_getval(cmdctx
->cmdmap
, "prefix", prefix
);
1096 dout(10) << "decoded-size=" << cmdctx
->cmdmap
.size() << " prefix=" << prefix
<< dendl
;
1098 boost::scoped_ptr
<Formatter
> f
;
1101 if (boost::algorithm::ends_with(prefix
, "_json")) {
1104 format
= cmd_getval_or
<string
>(cmdctx
->cmdmap
, "format", "plain");
1106 f
.reset(Formatter::create(format
));
1109 // this is just for mgr commands - admin socket commands will fall
1110 // through and use the admin socket version of
1111 // get_command_descriptions
1112 if (prefix
== "get_command_descriptions" && !admin_socket_cmd
) {
1113 dout(10) << "reading commands from python modules" << dendl
;
1114 const auto py_commands
= py_modules
.get_commands();
1118 f
.open_object_section("command_descriptions");
1120 auto dump_cmd
= [&cmdnum
, &f
, m
](const MonCommand
&mc
){
1121 ostringstream secname
;
1122 secname
<< "cmd" << std::setfill('0') << std::setw(3) << cmdnum
;
1123 dump_cmddesc_to_json(&f
, m
->get_connection()->get_features(),
1124 secname
.str(), mc
.cmdstring
, mc
.helpstring
,
1125 mc
.module
, mc
.req_perms
, 0);
1129 for (const auto &pyc
: py_commands
) {
1133 for (const auto &mgr_cmd
: mgr_commands
) {
1137 f
.close_section(); // command_descriptions
1138 f
.flush(cmdctx
->odata
);
1139 cmdctx
->reply(0, ss
);
1144 const MonCommand
*mgr_cmd
= _get_mgrcommand(prefix
, mgr_commands
);
1145 _generate_command_map(cmdctx
->cmdmap
, param_str_map
);
1147 bool is_allowed
= false;
1148 ModuleCommand py_command
;
1149 if (admin_socket_cmd
) {
1150 // admin socket commands require all capabilities
1151 is_allowed
= session
->caps
.is_allow_all();
1152 } else if (!mgr_cmd
) {
1153 // Resolve the command to the name of the module that will
1154 // handle it (if the command exists)
1155 auto py_commands
= py_modules
.get_py_commands();
1156 for (const auto &pyc
: py_commands
) {
1157 auto pyc_prefix
= cmddesc_get_prefix(pyc
.cmdstring
);
1158 if (pyc_prefix
== prefix
) {
1164 MonCommand pyc
= {"", "", "py", py_command
.perm
};
1165 is_allowed
= _allowed_command(session
, "py", py_command
.module_name
,
1166 prefix
, cmdctx
->cmdmap
, param_str_map
,
1169 // validate user's permissions for requested command
1170 is_allowed
= _allowed_command(session
, mgr_cmd
->module
, "",
1171 prefix
, cmdctx
->cmdmap
, param_str_map
, mgr_cmd
);
1175 log_access_denied(cmdctx
, session
, ss
);
1176 cmdctx
->reply(-EACCES
, ss
);
1181 << "from='" << session
->inst
<< "' "
1182 << "entity='" << session
->entity_name
<< "' "
1183 << "cmd=" << cmdctx
->cmd
<< ": dispatch";
1185 if (admin_socket_cmd
) {
1186 cct
->get_admin_socket()->queue_tell_command(cmdctx
->m_tell
);
1191 // service map commands
1192 if (prefix
== "service dump") {
1194 f
.reset(Formatter::create("json-pretty"));
1195 cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
1196 f
->dump_object("service_map", service_map
);
1198 f
->flush(cmdctx
->odata
);
1199 cmdctx
->reply(0, ss
);
1202 if (prefix
== "service status") {
1204 f
.reset(Formatter::create("json-pretty"));
1205 // only include state from services that are in the persisted service map
1206 f
->open_object_section("service_status");
1207 for (auto& [type
, service
] : pending_service_map
.services
) {
1208 if (ServiceMap::is_normal_ceph_entity(type
)) {
1212 f
->open_object_section(type
.c_str());
1213 for (auto& q
: service
.daemons
) {
1214 f
->open_object_section(q
.first
.c_str());
1215 DaemonKey key
{type
, q
.first
};
1216 ceph_assert(daemon_state
.exists(key
));
1217 auto daemon
= daemon_state
.get(key
);
1218 std::lock_guard
l(daemon
->lock
);
1219 f
->dump_stream("status_stamp") << daemon
->service_status_stamp
;
1220 f
->dump_stream("last_beacon") << daemon
->last_service_beacon
;
1221 f
->open_object_section("status");
1222 for (auto& r
: daemon
->service_status
) {
1223 f
->dump_string(r
.first
.c_str(), r
.second
);
1231 f
->flush(cmdctx
->odata
);
1232 cmdctx
->reply(0, ss
);
1236 if (prefix
== "config set") {
1239 cmd_getval(cmdctx
->cmdmap
, "key", key
);
1240 cmd_getval(cmdctx
->cmdmap
, "value", val
);
1241 r
= cct
->_conf
.set_val(key
, val
, &ss
);
1243 cct
->_conf
.apply_changes(nullptr);
1245 cmdctx
->reply(0, ss
);
1252 if (prefix
== "pg scrub" ||
1253 prefix
== "pg repair" ||
1254 prefix
== "pg deep-scrub") {
1255 string scrubop
= prefix
.substr(3, string::npos
);
1259 cmd_getval(cmdctx
->cmdmap
, "pgid", pgidstr
);
1260 if (!pgid
.parse(pgidstr
.c_str())) {
1261 ss
<< "invalid pgid '" << pgidstr
<< "'";
1262 cmdctx
->reply(-EINVAL
, ss
);
1265 bool pg_exists
= false;
1266 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1267 pg_exists
= osdmap
.pg_exists(pgid
);
1270 ss
<< "pg " << pgid
<< " does not exist";
1271 cmdctx
->reply(-ENOENT
, ss
);
1274 int acting_primary
= -1;
1276 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1277 epoch
= osdmap
.get_epoch();
1278 osdmap
.get_primary_shard(pgid
, &acting_primary
, &spgid
);
1280 if (acting_primary
== -1) {
1281 ss
<< "pg " << pgid
<< " has no primary osd";
1282 cmdctx
->reply(-EAGAIN
, ss
);
1285 auto p
= osd_cons
.find(acting_primary
);
1286 if (p
== osd_cons
.end()) {
1287 ss
<< "pg " << pgid
<< " primary osd." << acting_primary
1288 << " is not currently connected";
1289 cmdctx
->reply(-EAGAIN
, ss
);
1292 for (auto& con
: p
->second
) {
1293 assert(HAVE_FEATURE(con
->get_features(), SERVER_OCTOPUS
));
1294 vector
<spg_t
> pgs
= { spgid
};
1295 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1298 scrubop
== "repair",
1299 scrubop
== "deep-scrub"));
1301 ss
<< "instructing pg " << spgid
<< " on osd." << acting_primary
1302 << " to " << scrubop
;
1303 cmdctx
->reply(0, ss
);
1305 } else if (prefix
== "osd scrub" ||
1306 prefix
== "osd deep-scrub" ||
1307 prefix
== "osd repair") {
1309 cmd_getval(cmdctx
->cmdmap
, "who", whostr
);
1310 vector
<string
> pvec
;
1311 get_str_vec(prefix
, pvec
);
1314 if (whostr
== "*" || whostr
== "all" || whostr
== "any") {
1315 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1316 for (int i
= 0; i
< osdmap
.get_max_osd(); i
++)
1317 if (osdmap
.is_up(i
)) {
1322 long osd
= parse_osd_id(whostr
.c_str(), &ss
);
1324 ss
<< "invalid osd '" << whostr
<< "'";
1325 cmdctx
->reply(-EINVAL
, ss
);
1328 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1329 if (osdmap
.is_up(osd
)) {
1334 ss
<< "osd." << osd
<< " is not up";
1335 cmdctx
->reply(-EAGAIN
, ss
);
1339 set
<int> sent_osds
, failed_osds
;
1340 for (auto osd
: osds
) {
1343 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1344 epoch
= osdmap
.get_epoch();
1345 auto p
= pgmap
.pg_by_osd
.find(osd
);
1346 if (p
!= pgmap
.pg_by_osd
.end()) {
1347 for (auto pgid
: p
->second
) {
1350 osdmap
.get_primary_shard(pgid
, &primary
, &spg
);
1351 if (primary
== osd
) {
1352 spgs
.push_back(spg
);
1357 auto p
= osd_cons
.find(osd
);
1358 if (p
== osd_cons
.end()) {
1359 failed_osds
.insert(osd
);
1361 sent_osds
.insert(osd
);
1362 for (auto& con
: p
->second
) {
1363 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1366 pvec
.back() == "repair",
1367 pvec
.back() == "deep-scrub"));
1371 if (failed_osds
.size() == osds
.size()) {
1372 ss
<< "failed to instruct osd(s) " << osds
<< " to " << pvec
.back()
1373 << " (not connected)";
1376 ss
<< "instructed osd(s) " << sent_osds
<< " to " << pvec
.back();
1377 if (!failed_osds
.empty()) {
1378 ss
<< "; osd(s) " << failed_osds
<< " were not connected";
1382 cmdctx
->reply(0, ss
);
1384 } else if (prefix
== "osd pool scrub" ||
1385 prefix
== "osd pool deep-scrub" ||
1386 prefix
== "osd pool repair") {
1387 vector
<string
> pool_names
;
1388 cmd_getval(cmdctx
->cmdmap
, "who", pool_names
);
1389 if (pool_names
.empty()) {
1390 ss
<< "must specify one or more pool names";
1391 cmdctx
->reply(-EINVAL
, ss
);
1395 map
<int32_t, vector
<pg_t
>> pgs_by_primary
; // legacy
1396 map
<int32_t, vector
<spg_t
>> spgs_by_primary
;
1397 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1398 epoch
= osdmap
.get_epoch();
1399 for (auto& pool_name
: pool_names
) {
1400 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1402 ss
<< "unrecognized pool '" << pool_name
<< "'";
1406 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1407 for (int i
= 0; i
< pool_pg_num
; i
++) {
1408 pg_t
pg(i
, pool_id
);
1411 auto got
= osdmap
.get_primary_shard(pg
, &primary
, &spg
);
1414 pgs_by_primary
[primary
].push_back(pg
);
1415 spgs_by_primary
[primary
].push_back(spg
);
1420 cmdctx
->reply(r
, ss
);
1423 for (auto& it
: spgs_by_primary
) {
1424 auto primary
= it
.first
;
1425 auto p
= osd_cons
.find(primary
);
1426 if (p
== osd_cons
.end()) {
1427 ss
<< "osd." << primary
<< " is not currently connected";
1428 cmdctx
->reply(-EAGAIN
, ss
);
1431 for (auto& con
: p
->second
) {
1432 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1435 prefix
== "osd pool repair",
1436 prefix
== "osd pool deep-scrub"));
1439 cmdctx
->reply(0, "");
1441 } else if (prefix
== "osd reweight-by-pg" ||
1442 prefix
== "osd reweight-by-utilization" ||
1443 prefix
== "osd test-reweight-by-pg" ||
1444 prefix
== "osd test-reweight-by-utilization") {
1446 prefix
== "osd reweight-by-pg" || prefix
== "osd test-reweight-by-pg";
1448 prefix
== "osd test-reweight-by-pg" ||
1449 prefix
== "osd test-reweight-by-utilization";
1450 int64_t oload
= cmd_getval_or
<int64_t>(cmdctx
->cmdmap
, "oload", 120);
1452 vector
<string
> poolnames
;
1453 cmd_getval(cmdctx
->cmdmap
, "pools", poolnames
);
1454 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1455 for (const auto& poolname
: poolnames
) {
1456 int64_t pool
= osdmap
.lookup_pg_pool_name(poolname
);
1458 ss
<< "pool '" << poolname
<< "' does not exist";
1465 cmdctx
->reply(r
, ss
);
1469 double max_change
= g_conf().get_val
<double>("mon_reweight_max_change");
1470 cmd_getval(cmdctx
->cmdmap
, "max_change", max_change
);
1471 if (max_change
<= 0.0) {
1472 ss
<< "max_change " << max_change
<< " must be positive";
1473 cmdctx
->reply(-EINVAL
, ss
);
1476 int64_t max_osds
= g_conf().get_val
<int64_t>("mon_reweight_max_osds");
1477 cmd_getval(cmdctx
->cmdmap
, "max_osds", max_osds
);
1478 if (max_osds
<= 0) {
1479 ss
<< "max_osds " << max_osds
<< " must be positive";
1480 cmdctx
->reply(-EINVAL
, ss
);
1483 bool no_increasing
= false;
1484 cmd_getval_compat_cephbool(cmdctx
->cmdmap
, "no_increasing", no_increasing
);
1486 mempool::osdmap::map
<int32_t, uint32_t> new_weights
;
1487 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
&osdmap
, const PGMap
& pgmap
) {
1488 return reweight::by_utilization(osdmap
, pgmap
,
1493 pools
.empty() ? NULL
: &pools
,
1496 &ss
, &out_str
, f
.get());
1499 dout(10) << "reweight::by_utilization: finished with " << out_str
<< dendl
;
1502 f
->flush(cmdctx
->odata
);
1504 cmdctx
->odata
.append(out_str
);
1507 ss
<< "FAILED reweight-by-pg";
1508 cmdctx
->reply(r
, ss
);
1510 } else if (r
== 0 || dry_run
) {
1512 cmdctx
->reply(r
, ss
);
1515 json_spirit::Object json_object
;
1516 for (const auto& osd_weight
: new_weights
) {
1517 json_spirit::Config::add(json_object
,
1518 std::to_string(osd_weight
.first
),
1519 std::to_string(osd_weight
.second
));
1521 string s
= json_spirit::write(json_object
);
1522 std::replace(begin(s
), end(s
), '\"', '\'');
1525 "\"prefix\": \"osd reweightn\", "
1526 "\"weights\": \"" + s
+ "\""
1528 auto on_finish
= new ReplyOnFinish(cmdctx
);
1529 monc
->start_mon_command({cmd
}, {},
1530 &on_finish
->from_mon
, &on_finish
->outs
, on_finish
);
1533 } else if (prefix
== "osd df") {
1534 string method
, filter
;
1535 cmd_getval(cmdctx
->cmdmap
, "output_method", method
);
1536 cmd_getval(cmdctx
->cmdmap
, "filter", filter
);
1538 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1539 // sanity check filter(s)
1540 if (!filter
.empty() &&
1541 osdmap
.lookup_pg_pool_name(filter
) < 0 &&
1542 !osdmap
.crush
->class_exists(filter
) &&
1543 !osdmap
.crush
->name_exists(filter
)) {
1544 rs
<< "'" << filter
<< "' not a pool, crush node or device class name";
1547 print_osd_utilization(osdmap
, pgmap
, ss
,
1548 f
.get(), method
== "tree", filter
);
1549 cmdctx
->odata
.append(ss
);
1552 cmdctx
->reply(r
, rs
);
1554 } else if (prefix
== "osd pool stats") {
1556 cmd_getval(cmdctx
->cmdmap
, "pool_name", pool_name
);
1557 int64_t poolid
= -ENOENT
;
1558 bool one_pool
= false;
1559 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1560 if (!pool_name
.empty()) {
1561 poolid
= osdmap
.lookup_pg_pool_name(pool_name
);
1563 ceph_assert(poolid
== -ENOENT
);
1564 ss
<< "unrecognized pool '" << pool_name
<< "'";
1571 f
->open_array_section("pool_stats");
1573 if (osdmap
.get_pools().empty()) {
1574 ss
<< "there are no pools!";
1578 for (auto &p
: osdmap
.get_pools()) {
1582 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, f
.get(), &rs
);
1590 f
->flush(cmdctx
->odata
);
1592 cmdctx
->odata
.append(rs
.str());
1596 if (r
!= -EOPNOTSUPP
) {
1597 cmdctx
->reply(r
, ss
);
1600 } else if (prefix
== "osd safe-to-destroy" ||
1601 prefix
== "osd destroy" ||
1602 prefix
== "osd purge") {
1605 if (prefix
== "osd safe-to-destroy") {
1607 cmd_getval(cmdctx
->cmdmap
, "ids", ids
);
1608 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1609 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1611 if (!r
&& osds
.empty()) {
1612 ss
<< "must specify one or more OSDs";
1617 if (!cmd_getval(cmdctx
->cmdmap
, "id", id
)) {
1619 ss
<< "must specify OSD id";
1625 cmdctx
->reply(r
, ss
);
1628 set
<int> active_osds
, missing_stats
, stored_pgs
, safe_to_destroy
;
1629 int affected_pgs
= 0;
1630 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1631 if (pg_map
.num_pg_unknown
> 0) {
1632 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; cannot draw"
1633 << " any conclusions";
1637 int num_active_clean
= 0;
1638 for (auto& p
: pg_map
.num_pg_by_state
) {
1639 unsigned want
= PG_STATE_ACTIVE
|PG_STATE_CLEAN
;
1640 if ((p
.first
& want
) == want
) {
1641 num_active_clean
+= p
.second
;
1644 for (auto osd
: osds
) {
1645 if (!osdmap
.exists(osd
)) {
1646 safe_to_destroy
.insert(osd
);
1647 continue; // clearly safe to destroy
1649 auto q
= pg_map
.num_pg_by_osd
.find(osd
);
1650 if (q
!= pg_map
.num_pg_by_osd
.end()) {
1651 if (q
->second
.acting
> 0 || q
->second
.up_not_acting
> 0) {
1652 active_osds
.insert(osd
);
1653 // XXX: For overlapping PGs, this counts them again
1654 affected_pgs
+= q
->second
.acting
+ q
->second
.up_not_acting
;
1658 if (num_active_clean
< pg_map
.num_pg
) {
1659 // all pgs aren't active+clean; we need to be careful.
1660 auto p
= pg_map
.osd_stat
.find(osd
);
1661 if (p
== pg_map
.osd_stat
.end() || !osdmap
.is_up(osd
)) {
1662 missing_stats
.insert(osd
);
1664 } else if (p
->second
.num_pgs
> 0) {
1665 stored_pgs
.insert(osd
);
1669 safe_to_destroy
.insert(osd
);
1672 if (r
&& prefix
== "osd safe-to-destroy") {
1673 cmdctx
->reply(r
, ss
); // regardless of formatter
1676 if (!r
&& (!active_osds
.empty() ||
1677 !missing_stats
.empty() || !stored_pgs
.empty())) {
1678 if (!safe_to_destroy
.empty()) {
1679 ss
<< "OSD(s) " << safe_to_destroy
1680 << " are safe to destroy without reducing data durability. ";
1682 if (!active_osds
.empty()) {
1683 ss
<< "OSD(s) " << active_osds
<< " have " << affected_pgs
1684 << " pgs currently mapped to them. ";
1686 if (!missing_stats
.empty()) {
1687 ss
<< "OSD(s) " << missing_stats
<< " have no reported stats, and not all"
1688 << " PGs are active+clean; we cannot draw any conclusions. ";
1690 if (!stored_pgs
.empty()) {
1691 ss
<< "OSD(s) " << stored_pgs
<< " last reported they still store some PG"
1692 << " data, and not all PGs are active+clean; we cannot be sure they"
1693 << " aren't still needed.";
1695 if (!active_osds
.empty() || !stored_pgs
.empty()) {
1702 if (prefix
== "osd safe-to-destroy") {
1704 ss
<< "OSD(s) " << osds
<< " are safe to destroy without reducing data"
1708 f
->open_object_section("osd_status");
1709 f
->open_array_section("safe_to_destroy");
1710 for (auto i
: safe_to_destroy
)
1711 f
->dump_int("osd", i
);
1713 f
->open_array_section("active");
1714 for (auto i
: active_osds
)
1715 f
->dump_int("osd", i
);
1717 f
->open_array_section("missing_stats");
1718 for (auto i
: missing_stats
)
1719 f
->dump_int("osd", i
);
1721 f
->open_array_section("stored_pgs");
1722 for (auto i
: stored_pgs
)
1723 f
->dump_int("osd", i
);
1725 f
->close_section(); // osd_status
1726 f
->flush(cmdctx
->odata
);
1728 std::stringstream().swap(ss
);
1730 cmdctx
->reply(r
, ss
);
1736 cmd_getval(cmdctx
->cmdmap
, "force", force
);
1739 cmd_getval(cmdctx
->cmdmap
, "yes_i_really_mean_it", force
);
1742 ss
<< "\nYou can proceed by passing --force, but be warned that"
1743 " this will likely mean real, permanent data loss.";
1749 cmdctx
->reply(r
, ss
);
1754 "\"prefix\": \"" + prefix
+ "-actual\", "
1755 "\"id\": " + stringify(osds
) + ", "
1756 "\"yes_i_really_mean_it\": true"
1758 auto on_finish
= new ReplyOnFinish(cmdctx
);
1759 monc
->start_mon_command({cmd
}, {}, nullptr, &on_finish
->outs
, on_finish
);
1761 } else if (prefix
== "osd ok-to-stop") {
1763 cmd_getval(cmdctx
->cmdmap
, "ids", ids
);
1766 cmd_getval(cmdctx
->cmdmap
, "max", max
);
1768 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1769 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1771 if (!r
&& osds
.empty()) {
1772 ss
<< "must specify one or more OSDs";
1775 if (max
< (int)osds
.size()) {
1779 cmdctx
->reply(r
, ss
);
1782 offline_pg_report out_report
;
1783 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1784 _maximize_ok_to_stop_set(
1785 osds
, max
, osdmap
, pg_map
,
1789 f
.reset(Formatter::create("json"));
1791 f
->dump_object("ok_to_stop", out_report
);
1792 f
->flush(cmdctx
->odata
);
1793 cmdctx
->odata
.append("\n");
1794 if (!out_report
.unknown
.empty()) {
1795 ss
<< out_report
.unknown
.size() << " pgs have unknown state; "
1796 << "cannot draw any conclusions";
1797 cmdctx
->reply(-EAGAIN
, ss
);
1799 if (!out_report
.ok_to_stop()) {
1800 ss
<< "unsafe to stop osd(s) at this time (" << out_report
.not_ok
.size() << " PGs are or would become offline)";
1801 cmdctx
->reply(-EBUSY
, ss
);
1803 cmdctx
->reply(0, ss
);
1806 } else if (prefix
== "pg force-recovery" ||
1807 prefix
== "pg force-backfill" ||
1808 prefix
== "pg cancel-force-recovery" ||
1809 prefix
== "pg cancel-force-backfill" ||
1810 prefix
== "osd pool force-recovery" ||
1811 prefix
== "osd pool force-backfill" ||
1812 prefix
== "osd pool cancel-force-recovery" ||
1813 prefix
== "osd pool cancel-force-backfill") {
1815 get_str_vec(prefix
, vs
);
1816 auto& granularity
= vs
.front();
1817 auto& forceop
= vs
.back();
1820 // figure out actual op just once
1822 if (forceop
== "force-recovery") {
1823 actual_op
= OFR_RECOVERY
;
1824 } else if (forceop
== "force-backfill") {
1825 actual_op
= OFR_BACKFILL
;
1826 } else if (forceop
== "cancel-force-backfill") {
1827 actual_op
= OFR_BACKFILL
| OFR_CANCEL
;
1828 } else if (forceop
== "cancel-force-recovery") {
1829 actual_op
= OFR_RECOVERY
| OFR_CANCEL
;
1832 set
<pg_t
> candidates
; // deduped
1833 if (granularity
== "pg") {
1834 // covnert pg names to pgs, discard any invalid ones while at it
1835 vector
<string
> pgids
;
1836 cmd_getval(cmdctx
->cmdmap
, "pgid", pgids
);
1837 for (auto& i
: pgids
) {
1839 if (!pgid
.parse(i
.c_str())) {
1840 ss
<< "invlaid pgid '" << i
<< "'; ";
1844 candidates
.insert(pgid
);
1848 vector
<string
> pool_names
;
1849 cmd_getval(cmdctx
->cmdmap
, "who", pool_names
);
1850 if (pool_names
.empty()) {
1851 ss
<< "must specify one or more pool names";
1852 cmdctx
->reply(-EINVAL
, ss
);
1855 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1856 for (auto& pool_name
: pool_names
) {
1857 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1859 ss
<< "unrecognized pool '" << pool_name
<< "'";
1863 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1864 for (int i
= 0; i
< pool_pg_num
; i
++)
1865 candidates
.insert({(unsigned int)i
, (uint64_t)pool_id
});
1869 cmdctx
->reply(r
, ss
);
1874 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1875 for (auto& i
: candidates
) {
1876 auto it
= pg_map
.pg_stat
.find(i
);
1877 if (it
== pg_map
.pg_stat
.end()) {
1878 ss
<< "pg " << i
<< " does not exist; ";
1882 auto state
= it
->second
.state
;
1883 // discard pgs for which user requests are pointless
1884 switch (actual_op
) {
1886 if ((state
& (PG_STATE_DEGRADED
|
1887 PG_STATE_RECOVERY_WAIT
|
1888 PG_STATE_RECOVERING
)) == 0) {
1889 // don't return error, user script may be racing with cluster.
1891 ss
<< "pg " << i
<< " doesn't require recovery; ";
1893 } else if (state
& PG_STATE_FORCED_RECOVERY
) {
1894 ss
<< "pg " << i
<< " recovery already forced; ";
1895 // return error, as it may be a bug in user script
1901 if ((state
& (PG_STATE_DEGRADED
|
1902 PG_STATE_BACKFILL_WAIT
|
1903 PG_STATE_BACKFILLING
)) == 0) {
1904 ss
<< "pg " << i
<< " doesn't require backfilling; ";
1906 } else if (state
& PG_STATE_FORCED_BACKFILL
) {
1907 ss
<< "pg " << i
<< " backfill already forced; ";
1912 case OFR_BACKFILL
| OFR_CANCEL
:
1913 if ((state
& PG_STATE_FORCED_BACKFILL
) == 0) {
1914 ss
<< "pg " << i
<< " backfill not forced; ";
1918 case OFR_RECOVERY
| OFR_CANCEL
:
1919 if ((state
& PG_STATE_FORCED_RECOVERY
) == 0) {
1920 ss
<< "pg " << i
<< " recovery not forced; ";
1925 ceph_abort_msg("actual_op value is not supported");
1931 // respond with error only when no pgs are correct
1932 // yes, in case of mixed errors, only the last one will be emitted,
1933 // but the message presented will be fine
1934 if (pgs
.size() != 0) {
1935 // clear error to not confuse users/scripts
1939 // optimize the command -> messages conversion, use only one
1940 // message per distinct OSD
1941 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1942 // group pgs to process by osd
1943 map
<int, vector
<spg_t
>> osdpgs
;
1944 for (auto& pgid
: pgs
) {
1947 if (osdmap
.get_primary_shard(pgid
, &primary
, &spg
)) {
1948 osdpgs
[primary
].push_back(spg
);
1951 for (auto& i
: osdpgs
) {
1952 if (osdmap
.is_up(i
.first
)) {
1953 auto p
= osd_cons
.find(i
.first
);
1954 if (p
== osd_cons
.end()) {
1955 ss
<< "osd." << i
.first
<< " is not currently connected";
1959 for (auto& con
: p
->second
) {
1961 new MOSDForceRecovery(monc
->get_fsid(), i
.second
, actual_op
));
1963 ss
<< "instructing pg(s) " << i
.second
<< " on osd." << i
.first
1964 << " to " << forceop
<< "; ";
1969 cmdctx
->reply(r
, ss
);
1971 } else if (prefix
== "config show" ||
1972 prefix
== "config show-with-defaults") {
1974 cmd_getval(cmdctx
->cmdmap
, "who", who
);
1975 auto [key
, valid
] = DaemonKey::parse(who
);
1977 ss
<< "invalid daemon name: use <type>.<id>";
1978 cmdctx
->reply(-EINVAL
, ss
);
1981 DaemonStatePtr daemon
= daemon_state
.get(key
);
1983 ss
<< "no config state for daemon " << who
;
1984 cmdctx
->reply(-ENOENT
, ss
);
1988 std::lock_guard
l(daemon
->lock
);
1992 if (cmd_getval(cmdctx
->cmdmap
, "key", name
)) {
1993 // handle special options
1994 if (name
== "fsid") {
1995 cmdctx
->odata
.append(stringify(monc
->get_fsid()) + "\n");
1996 cmdctx
->reply(r
, ss
);
1999 auto p
= daemon
->config
.find(name
);
2000 if (p
!= daemon
->config
.end() &&
2001 !p
->second
.empty()) {
2002 cmdctx
->odata
.append(p
->second
.rbegin()->second
+ "\n");
2004 auto& defaults
= daemon
->_get_config_defaults();
2005 auto q
= defaults
.find(name
);
2006 if (q
!= defaults
.end()) {
2007 cmdctx
->odata
.append(q
->second
+ "\n");
2012 } else if (daemon
->config_defaults_bl
.length() > 0) {
2015 f
->open_array_section("config");
2017 tbl
.define_column("NAME", TextTable::LEFT
, TextTable::LEFT
);
2018 tbl
.define_column("VALUE", TextTable::LEFT
, TextTable::LEFT
);
2019 tbl
.define_column("SOURCE", TextTable::LEFT
, TextTable::LEFT
);
2020 tbl
.define_column("OVERRIDES", TextTable::LEFT
, TextTable::LEFT
);
2021 tbl
.define_column("IGNORES", TextTable::LEFT
, TextTable::LEFT
);
2023 if (prefix
== "config show") {
2025 for (auto& i
: daemon
->config
) {
2026 dout(20) << " " << i
.first
<< " -> " << i
.second
<< dendl
;
2027 if (i
.second
.empty()) {
2031 f
->open_object_section("value");
2032 f
->dump_string("name", i
.first
);
2033 f
->dump_string("value", i
.second
.rbegin()->second
);
2034 f
->dump_string("source", ceph_conf_level_name(
2035 i
.second
.rbegin()->first
));
2036 if (i
.second
.size() > 1) {
2037 f
->open_array_section("overrides");
2038 auto j
= i
.second
.rend();
2039 for (--j
; j
!= i
.second
.rbegin(); --j
) {
2040 f
->open_object_section("value");
2041 f
->dump_string("source", ceph_conf_level_name(j
->first
));
2042 f
->dump_string("value", j
->second
);
2047 if (daemon
->ignored_mon_config
.count(i
.first
)) {
2048 f
->dump_string("ignores", "mon");
2053 tbl
<< i
.second
.rbegin()->second
;
2054 tbl
<< ceph_conf_level_name(i
.second
.rbegin()->first
);
2055 if (i
.second
.size() > 1) {
2057 auto j
= i
.second
.rend();
2058 for (--j
; j
!= i
.second
.rbegin(); --j
) {
2059 if (j
->second
== i
.second
.rbegin()->second
) {
2060 ov
.push_front(string("(") + ceph_conf_level_name(j
->first
) +
2061 string("[") + j
->second
+ string("]") +
2064 ov
.push_front(ceph_conf_level_name(j
->first
) +
2065 string("[") + j
->second
+ string("]"));
2073 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
2074 tbl
<< TextTable::endrow
;
2078 // show-with-defaults
2079 auto& defaults
= daemon
->_get_config_defaults();
2080 for (auto& i
: defaults
) {
2082 f
->open_object_section("value");
2083 f
->dump_string("name", i
.first
);
2087 auto j
= daemon
->config
.find(i
.first
);
2088 if (j
!= daemon
->config
.end() && !j
->second
.empty()) {
2091 f
->dump_string("value", j
->second
.rbegin()->second
);
2092 f
->dump_string("source", ceph_conf_level_name(
2093 j
->second
.rbegin()->first
));
2094 if (j
->second
.size() > 1) {
2095 f
->open_array_section("overrides");
2096 auto k
= j
->second
.rend();
2097 for (--k
; k
!= j
->second
.rbegin(); --k
) {
2098 f
->open_object_section("value");
2099 f
->dump_string("source", ceph_conf_level_name(k
->first
));
2100 f
->dump_string("value", k
->second
);
2105 if (daemon
->ignored_mon_config
.count(i
.first
)) {
2106 f
->dump_string("ignores", "mon");
2110 tbl
<< j
->second
.rbegin()->second
;
2111 tbl
<< ceph_conf_level_name(j
->second
.rbegin()->first
);
2112 if (j
->second
.size() > 1) {
2114 auto k
= j
->second
.rend();
2115 for (--k
; k
!= j
->second
.rbegin(); --k
) {
2116 if (k
->second
== j
->second
.rbegin()->second
) {
2117 ov
.push_front(string("(") + ceph_conf_level_name(k
->first
) +
2118 string("[") + k
->second
+ string("]") +
2121 ov
.push_front(ceph_conf_level_name(k
->first
) +
2122 string("[") + k
->second
+ string("]"));
2129 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
2130 tbl
<< TextTable::endrow
;
2133 // only have default
2135 f
->dump_string("value", i
.second
);
2136 f
->dump_string("source", ceph_conf_level_name(CONF_DEFAULT
));
2140 tbl
<< ceph_conf_level_name(CONF_DEFAULT
);
2143 tbl
<< TextTable::endrow
;
2150 f
->flush(cmdctx
->odata
);
2152 cmdctx
->odata
.append(stringify(tbl
));
2155 cmdctx
->reply(r
, ss
);
2157 } else if (prefix
== "device ls") {
2161 f
->open_array_section("devices");
2162 daemon_state
.with_devices([&f
](const DeviceState
& dev
) {
2163 f
->dump_object("device", dev
);
2166 f
->flush(cmdctx
->odata
);
2168 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2169 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
2170 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
2171 tbl
.define_column("WEAR", TextTable::RIGHT
, TextTable::RIGHT
);
2172 tbl
.define_column("LIFE EXPECTANCY", TextTable::LEFT
, TextTable::LEFT
);
2173 auto now
= ceph_clock_now();
2174 daemon_state
.with_devices([&tbl
, now
](const DeviceState
& dev
) {
2176 for (auto& i
: dev
.attachments
) {
2180 h
+= std::get
<0>(i
) + ":" + std::get
<1>(i
);
2183 for (auto& i
: dev
.daemons
) {
2189 char wear_level_str
[16] = {0};
2190 if (dev
.wear_level
>= 0) {
2191 snprintf(wear_level_str
, sizeof(wear_level_str
)-1, "%d%%",
2192 (int)(100.1 * dev
.wear_level
));
2198 << dev
.get_life_expectancy_str(now
)
2199 << TextTable::endrow
;
2201 cmdctx
->odata
.append(stringify(tbl
));
2203 cmdctx
->reply(0, ss
);
2205 } else if (prefix
== "device ls-by-daemon") {
2207 cmd_getval(cmdctx
->cmdmap
, "who", who
);
2208 if (auto [k
, valid
] = DaemonKey::parse(who
); !valid
) {
2209 ss
<< who
<< " is not a valid daemon name";
2212 auto dm
= daemon_state
.get(k
);
2215 f
->open_array_section("devices");
2216 for (auto& i
: dm
->devices
) {
2217 daemon_state
.with_device(i
.first
, [&f
] (const DeviceState
& dev
) {
2218 f
->dump_object("device", dev
);
2222 f
->flush(cmdctx
->odata
);
2225 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2226 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
2227 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
,
2229 auto now
= ceph_clock_now();
2230 for (auto& i
: dm
->devices
) {
2231 daemon_state
.with_device(
2232 i
.first
, [&tbl
, now
] (const DeviceState
& dev
) {
2234 for (auto& i
: dev
.attachments
) {
2238 h
+= std::get
<0>(i
) + ":" + std::get
<1>(i
);
2242 << dev
.get_life_expectancy_str(now
)
2243 << TextTable::endrow
;
2246 cmdctx
->odata
.append(stringify(tbl
));
2250 ss
<< "daemon " << who
<< " not found";
2252 cmdctx
->reply(r
, ss
);
2254 } else if (prefix
== "device ls-by-host") {
2256 cmd_getval(cmdctx
->cmdmap
, "host", host
);
2258 daemon_state
.list_devids_by_server(host
, &devids
);
2260 f
->open_array_section("devices");
2261 for (auto& devid
: devids
) {
2262 daemon_state
.with_device(
2263 devid
, [&f
] (const DeviceState
& dev
) {
2264 f
->dump_object("device", dev
);
2268 f
->flush(cmdctx
->odata
);
2271 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2272 tbl
.define_column("DEV", TextTable::LEFT
, TextTable::LEFT
);
2273 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
2274 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
, TextTable::LEFT
);
2275 auto now
= ceph_clock_now();
2276 for (auto& devid
: devids
) {
2277 daemon_state
.with_device(
2278 devid
, [&tbl
, &host
, now
] (const DeviceState
& dev
) {
2280 for (auto& j
: dev
.attachments
) {
2281 if (std::get
<0>(j
) == host
) {
2285 n
+= std::get
<1>(j
);
2289 for (auto& i
: dev
.daemons
) {
2298 << dev
.get_life_expectancy_str(now
)
2299 << TextTable::endrow
;
2302 cmdctx
->odata
.append(stringify(tbl
));
2304 cmdctx
->reply(0, ss
);
2306 } else if (prefix
== "device info") {
2308 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2311 if (!daemon_state
.with_device(devid
,
2312 [&f
, &rs
] (const DeviceState
& dev
) {
2314 f
->dump_object("device", dev
);
2319 ss
<< "device " << devid
<< " not found";
2323 f
->flush(cmdctx
->odata
);
2325 cmdctx
->odata
.append(rs
.str());
2328 cmdctx
->reply(r
, ss
);
2330 } else if (prefix
== "device set-life-expectancy") {
2332 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2333 string from_str
, to_str
;
2334 cmd_getval(cmdctx
->cmdmap
, "from", from_str
);
2335 cmd_getval(cmdctx
->cmdmap
, "to", to_str
);
2337 if (!from
.parse(from_str
)) {
2338 ss
<< "unable to parse datetime '" << from_str
<< "'";
2340 cmdctx
->reply(r
, ss
);
2341 } else if (to_str
.size() && !to
.parse(to_str
)) {
2342 ss
<< "unable to parse datetime '" << to_str
<< "'";
2344 cmdctx
->reply(r
, ss
);
2346 map
<string
,string
> meta
;
2347 daemon_state
.with_device_create(
2349 [from
, to
, &meta
] (DeviceState
& dev
) {
2350 dev
.set_life_expectancy(from
, to
, ceph_clock_now());
2351 meta
= dev
.metadata
;
2353 json_spirit::Object json_object
;
2354 for (auto& i
: meta
) {
2355 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2358 json
.append(json_spirit::write(json_object
));
2361 "\"prefix\": \"config-key set\", "
2362 "\"key\": \"device/" + devid
+ "\""
2364 auto on_finish
= new ReplyOnFinish(cmdctx
);
2365 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2368 } else if (prefix
== "device rm-life-expectancy") {
2370 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2371 map
<string
,string
> meta
;
2372 if (daemon_state
.with_device_write(devid
, [&meta
] (DeviceState
& dev
) {
2373 dev
.rm_life_expectancy();
2374 meta
= dev
.metadata
;
2381 "\"prefix\": \"config-key rm\", "
2382 "\"key\": \"device/" + devid
+ "\""
2385 json_spirit::Object json_object
;
2386 for (auto& i
: meta
) {
2387 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2389 json
.append(json_spirit::write(json_object
));
2392 "\"prefix\": \"config-key set\", "
2393 "\"key\": \"device/" + devid
+ "\""
2396 auto on_finish
= new ReplyOnFinish(cmdctx
);
2397 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2399 cmdctx
->reply(0, ss
);
2404 ss
<< "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2407 f
->open_object_section("pg_info");
2408 f
->dump_bool("pg_ready", pgmap_ready
);
2411 // fall back to feeding command to PGMap
2412 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
2413 return process_pg_map_command(prefix
, cmdctx
->cmdmap
, pg_map
, osdmap
,
2414 f
.get(), &ss
, &cmdctx
->odata
);
2420 if (r
!= -EOPNOTSUPP
) {
2422 f
->flush(cmdctx
->odata
);
2424 cmdctx
->reply(r
, ss
);
2429 // Was the command unfound?
2430 if (py_command
.cmdstring
.empty()) {
2431 ss
<< "No handler found for '" << prefix
<< "'";
2432 dout(4) << "No handler found for '" << prefix
<< "'" << dendl
;
2433 cmdctx
->reply(-EINVAL
, ss
);
2437 // Validate that the module is active
2438 auto& mod_name
= py_command
.module_name
;
2439 if (!py_modules
.is_module_active(mod_name
)) {
2440 ss
<< "Module '" << mod_name
<< "' is not enabled/loaded (required by "
2441 "command '" << prefix
<< "'): use `ceph mgr module enable "
2442 << mod_name
<< "` to enable it";
2443 dout(4) << ss
.str() << dendl
;
2444 cmdctx
->reply(-EOPNOTSUPP
, ss
);
2448 dout(10) << "passing through command '" << prefix
<< "' size " << cmdctx
->cmdmap
.size() << dendl
;
2449 Finisher
& mod_finisher
= py_modules
.get_active_module_finisher(mod_name
);
2450 mod_finisher
.queue(new LambdaContext([this, cmdctx
, session
, py_command
, prefix
]
2452 std::stringstream ss
;
2454 dout(10) << "dispatching command '" << prefix
<< "' size " << cmdctx
->cmdmap
.size() << dendl
;
2456 // Validate that the module is enabled
2457 auto& py_handler_name
= py_command
.module_name
;
2458 PyModuleRef module
= py_modules
.get_module(py_handler_name
);
2459 ceph_assert(module
);
2460 if (!module
->is_enabled()) {
2461 ss
<< "Module '" << py_handler_name
<< "' is not enabled (required by "
2462 "command '" << prefix
<< "'): use `ceph mgr module enable "
2463 << py_handler_name
<< "` to enable it";
2464 dout(4) << ss
.str() << dendl
;
2465 cmdctx
->reply(-EOPNOTSUPP
, ss
);
2469 // Hack: allow the self-test method to run on unhealthy modules.
2470 // Fix this in future by creating a special path for self test rather
2471 // than having the hook be a normal module command.
2472 std::string self_test_prefix
= py_handler_name
+ " " + "self-test";
2474 // Validate that the module is healthy
2475 bool accept_command
;
2476 if (module
->is_loaded()) {
2477 if (module
->get_can_run() && !module
->is_failed()) {
2479 accept_command
= true;
2480 } else if (self_test_prefix
== prefix
) {
2481 // Unhealthy, but allow because it's a self test command
2482 accept_command
= true;
2484 accept_command
= false;
2485 ss
<< "Module '" << py_handler_name
<< "' has experienced an error and "
2486 "cannot handle commands: " << module
->get_error_string();
2489 // Module not loaded
2490 accept_command
= false;
2491 ss
<< "Module '" << py_handler_name
<< "' failed to load and "
2492 "cannot handle commands: " << module
->get_error_string();
2495 if (!accept_command
) {
2496 dout(4) << ss
.str() << dendl
;
2497 cmdctx
->reply(-EIO
, ss
);
2501 std::stringstream ds
;
2502 bufferlist inbl
= cmdctx
->data
;
2503 int r
= py_modules
.handle_command(py_command
, *session
, cmdctx
->cmdmap
,
2506 log_access_denied(cmdctx
, session
, ss
);
2509 cmdctx
->odata
.append(ds
);
2510 cmdctx
->reply(r
, ss
);
2511 dout(10) << " command returned " << r
<< dendl
;
// Prune pending_service_map: erase daemons that are missing from the daemon
// state index or whose last service beacon is older than the
// "mgr_service_beacon_grace" window, and erase services left empty.  Every
// erase marks the pending map dirty (pending_service_map_dirty = epoch).
//
// NOTE(review): this chunk is a lossy extraction of the original source --
// the leading numbers are the original file's line numbers, and gaps in
// them mean lines (braces, `else`, loop-advance statements, etc.) were
// dropped.  Only comments are added here; restore the missing lines from
// the original file before compiling.
2516 void DaemonServer::_prune_pending_service_map()
// Beacons older than now - mgr_service_beacon_grace count as stale.
2518 utime_t cutoff
= ceph_clock_now();
2519 cutoff
-= g_conf().get_val
<double>("mgr_service_beacon_grace");
// Walk every (service, daemon) pair in the pending map.
2520 auto p
= pending_service_map
.services
.begin();
2521 while (p
!= pending_service_map
.services
.end()) {
2522 auto q
= p
->second
.daemons
.begin();
2523 while (q
!= p
->second
.daemons
.end()) {
2524 DaemonKey key
{p
->first
, q
->first
};
// Daemon is in the service map but not in the daemon state index.
2525 if (!daemon_state
.exists(key
)) {
// Normal ceph entities are authoritative in daemon_state: force-prune.
2526 if (ServiceMap::is_normal_ceph_entity(p
->first
)) {
2527 dout(10) << "daemon " << key
<< " in service map but not in daemon state "
2528 << "index -- force pruning" << dendl
;
2529 q
= p
->second
.daemons
.erase(q
)
;
2530 pending_service_map_dirty
= pending_service_map
.epoch
;
// Non-ceph (registered) services: just log the inconsistency.
2532 derr
<< "missing key " << key
<< dendl
;
// Daemon exists: judge it by its beacon age.
2539 auto daemon
= daemon_state
.get(key
);
2540 std::lock_guard
l(daemon
->lock
);
2541 if (daemon
->last_service_beacon
== utime_t()) {
2542 // we must have just restarted; assume they are alive now.
2543 daemon
->last_service_beacon
= ceph_clock_now();
// Stale beacon -> prune this daemon from the pending map.
2547 if (daemon
->last_service_beacon
< cutoff
) {
2548 dout(10) << "pruning stale " << p
->first
<< "." << q
->first
2549 << " last_beacon " << daemon
->last_service_beacon
<< dendl
;
2550 q
= p
->second
.daemons
.erase(q
);
2551 pending_service_map_dirty
= pending_service_map
.epoch
;
// A service with no daemons left is dropped entirely.
2556 if (p
->second
.daemons
.empty()) {
2557 p
= pending_service_map
.services
.erase(p
);
2558 pending_service_map_dirty
= pending_service_map
.epoch
;
// Assemble and send an MMonMgrReport to the monitor: python-module health
// checks and progress events, the (pruned) pending service map when it has
// changed, a digest of the PGMap, and health checks accumulated from
// per-daemon health metrics of osd/mon daemons.
// NOTE(review): lossy extraction -- gaps in the embedded original line
// numbers mean lines were dropped; comments only are added here.
2565 void DaemonServer::send_report()
// After 4 stats periods of uptime, stop waiting for laggard OSDs and
// report whatever PG state we have.
2568 if (ceph_clock_now() - started_at
> g_conf().get_val
<int64_t>("mgr_stats_period") * 4.0) {
2570 reported_osds
.clear();
2571 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
2572 << "potentially incomplete PG state to mon" << dendl
;
2574 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
// Build the report message.
2580 auto m
= ceph::make_message
<MMonMgrReport
>();
2581 m
->gid
= monc
->get_global_id();
2582 py_modules
.get_health_checks(&m
->health_checks
);
2583 py_modules
.get_progress_events(&m
->progress_events
);
// Refresh delta stats and fill in service map + pg digest under the
// (mutable) pgmap lock.
2585 cluster_state
.with_mutable_pgmap([&](PGMap
& pg_map
) {
2586 cluster_state
.update_delta_stats();
// Only encode/send the service map when it changed since the last send.
2588 if (pending_service_map
.epoch
) {
2589 _prune_pending_service_map();
2590 if (pending_service_map_dirty
>= pending_service_map
.epoch
) {
2591 pending_service_map
.modified
= ceph_clock_now();
2592 encode(pending_service_map
, m
->service_map_bl
, CEPH_FEATURES_ALL
);
2593 dout(10) << "sending service_map e" << pending_service_map
.epoch
2595 pending_service_map
.epoch
++;
2599 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
2600 // FIXME: no easy way to get mon features here. this will do for
2601 // now, though, as long as we don't make a backward-incompat change.
2602 pg_map
.encode_digest(osdmap
, m
->get_data(), CEPH_FEATURES_ALL
);
2603 dout(10) << pg_map
<< dendl
;
2605 pg_map
.get_health_checks(g_ceph_context
, osdmap
,
2608 dout(10) << m
->health_checks
.checks
.size() << " health checks"
2610 dout(20) << "health checks:\n";
2611 JSONFormatter
jf(true);
2612 jf
.dump_object("health_checks", m
->health_checks
);
2615 if (osdmap
.require_osd_release
>= ceph_release_t::luminous
) {
2616 clog
->debug() << "pgmap v" << pg_map
.version
<< ": " << pg_map
;
// Fold each daemon's health metrics (osd + mon) into one collector per
// metric type; unknown metric types are logged and skipped.
2621 map
<daemon_metric
, unique_ptr
<DaemonHealthMetricCollector
>> accumulated
;
2622 for (auto service
: {"osd", "mon"} ) {
2623 auto daemons
= daemon_state
.get_by_service(service
);
2624 for (const auto& [key
,state
] : daemons
) {
2625 std::lock_guard l
{state
->lock
};
2626 for (const auto& metric
: state
->daemon_health_metrics
) {
2627 auto acc
= accumulated
.find(metric
.get_type());
2628 if (acc
== accumulated
.end()) {
2629 auto collector
= DaemonHealthMetricCollector::create(metric
.get_type());
2631 derr
<< __func__
<< " " << key
2632 << " sent me an unknown health metric: "
2633 << std::hex
<< static_cast<uint8_t>(metric
.get_type())
2634 << std::dec
<< dendl
;
2637 tie(acc
, std::ignore
) = accumulated
.emplace(metric
.get_type(),
2638 std::move(collector
));
2640 acc
->second
->update(key
, metric
);
// Summarize each accumulated metric into the report's health checks.
2644 for (const auto& acc
: accumulated
) {
2645 acc
.second
->summarize(m
->health_checks
);
2647 // TODO? We currently do not notify the PyModules
2648 // TODO: respect needs_send, so we send the report only if we are asked to do
2649 // so, or the state is updated.
2650 monc
->send_mon_message(std::move(m
));
// Walk all pools and nudge pg_num/pgp_num toward their targets: queue pool
// merges (pg_num decrease) only when both merge source and target PGs are
// active+clean with matching acting sets and no pg-upmaps; queue splits
// (pg_num increase) rate-limited by mon_osd_max_creating_pgs and
// mgr_max_pg_num_change; advance pgp_num gradually, bounded by
// target_max_misplaced_ratio (unless mgr_debug_aggressive_pg_num_changes
// is set).  Changes are applied via "osd pool set" and "osd rm-pg-upmap*"
// mon commands at the end.
// NOTE(review): lossy extraction -- gaps in the embedded original line
// numbers mean dropped lines (braces, `return`/`continue`, parts of
// conditions, some declarations); comments only are added here.
2653 void DaemonServer::adjust_pgs()
2656 unsigned max
= std::max
<int64_t>(1, g_conf()->mon_osd_max_creating_pgs
);
2657 double max_misplaced
= g_conf().get_val
<double>("target_max_misplaced_ratio");
2658 bool aggro
= g_conf().get_val
<bool>("mgr_debug_aggressive_pg_num_changes");
// Commands to issue are collected here and sent after the locks drop.
2660 map
<string
,unsigned> pg_num_to_set
;
2661 map
<string
,unsigned> pgp_num_to_set
;
2662 set
<pg_t
> upmaps_to_clear
;
2663 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
// Count PGs currently creating (or unknown) to rate-limit new splits.
2664 unsigned creating_or_unknown
= 0;
2665 for (auto& i
: pg_map
.num_pg_by_state
) {
2666 if ((i
.first
& (PG_STATE_CREATING
)) ||
2668 creating_or_unknown
+= i
.second
;
2671 unsigned left
= max
;
2672 if (creating_or_unknown
>= max
) {
2675 left
-= creating_or_unknown
;
2676 dout(10) << "creating_or_unknown " << creating_or_unknown
2677 << " max_creating " << max
2681 // FIXME: These checks are fundamentally racy given that adjust_pgs()
2682 // can run more frequently than we get updated pg stats from OSDs. We
2683 // may make multiple adjustments with stale informaiton.
2684 double misplaced_ratio
, degraded_ratio
;
2685 double inactive_pgs_ratio
, unknown_pgs_ratio
;
// NOTE(review): "&degraded_ratio" below was corrupted to "°raded_ratio"
// ("&deg" was decoded as the degree sign by whatever produced this
// extraction) -- restore the identifier from the original source.
2686 pg_map
.get_recovery_stats(&misplaced_ratio
, °raded_ratio
,
2687 &inactive_pgs_ratio
, &unknown_pgs_ratio
);
2688 dout(20) << "misplaced_ratio " << misplaced_ratio
2689 << " degraded_ratio " << degraded_ratio
2690 << " inactive_pgs_ratio " << inactive_pgs_ratio
2691 << " unknown_pgs_ratio " << unknown_pgs_ratio
2692 << "; target_max_misplaced_ratio " << max_misplaced
// Per-pool examination.
2695 for (auto& i
: osdmap
.get_pools()) {
2696 const pg_pool_t
& p
= i
.second
;
// pg_num adjustment: only when the target differs from the current value.
2699 if (p
.get_pg_num_target() != p
.get_pg_num()) {
2700 dout(20) << "pool " << i
.first
2701 << " pg_num " << p
.get_pg_num()
2702 << " target " << p
.get_pg_num_target()
2704 if (p
.has_flag(pg_pool_t::FLAG_CREATING
)) {
2705 dout(10) << "pool " << i
.first
2706 << " pg_num_target " << p
.get_pg_num_target()
2707 << " pg_num " << p
.get_pg_num()
2708 << " - still creating initial pgs"
2710 } else if (p
.get_pg_num_target() < p
.get_pg_num()) {
2711 // pg_num decrease (merge)
2712 pg_t
merge_source(p
.get_pg_num() - 1, i
.first
);
2713 pg_t merge_target
= merge_source
.get_parent();
2716 if (p
.get_pg_num() != p
.get_pg_num_pending()) {
2717 dout(10) << "pool " << i
.first
2718 << " pg_num_target " << p
.get_pg_num_target()
2719 << " pg_num " << p
.get_pg_num()
2720 << " - decrease and pg_num_pending != pg_num, waiting"
2723 } else if (p
.get_pg_num() == p
.get_pgp_num()) {
2724 dout(10) << "pool " << i
.first
2725 << " pg_num_target " << p
.get_pg_num_target()
2726 << " pg_num " << p
.get_pg_num()
2727 << " - decrease blocked by pgp_num "
// Both merge participants must be clean, upmap-free, and share the
// same acting set before we dare shrink pg_num.
2732 vector
<int32_t> source_acting
;
2733 for (auto &merge_participant
: {merge_source
, merge_target
}) {
2734 bool is_merge_source
= merge_participant
== merge_source
;
2735 if (osdmap
.have_pg_upmaps(merge_participant
)) {
2736 dout(10) << "pool " << i
.first
2737 << " pg_num_target " << p
.get_pg_num_target()
2738 << " pg_num " << p
.get_pg_num()
2739 << (is_merge_source
? " - merge source " : " - merge target ")
2740 << merge_participant
2741 << " has upmap" << dendl
;
2742 upmaps_to_clear
.insert(merge_participant
);
2745 auto q
= pg_map
.pg_stat
.find(merge_participant
);
2746 if (q
== pg_map
.pg_stat
.end()) {
2747 dout(10) << "pool " << i
.first
2748 << " pg_num_target " << p
.get_pg_num_target()
2749 << " pg_num " << p
.get_pg_num()
2750 << " - no state for " << merge_participant
2751 << (is_merge_source
? " (merge source)" : " (merge target)")
2754 } else if ((q
->second
.state
& (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) !=
2755 (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) {
2756 dout(10) << "pool " << i
.first
2757 << " pg_num_target " << p
.get_pg_num_target()
2758 << " pg_num " << p
.get_pg_num()
2759 << (is_merge_source
? " - merge source " : " - merge target ")
2760 << merge_participant
2761 << " not clean (" << pg_state_string(q
->second
.state
)
2765 if (is_merge_source
) {
2766 source_acting
= q
->second
.acting
;
2767 } else if (ok
&& q
->second
.acting
!= source_acting
) {
2768 dout(10) << "pool " << i
.first
2769 << " pg_num_target " << p
.get_pg_num_target()
2770 << " pg_num " << p
.get_pg_num()
2771 << (is_merge_source
? " - merge source " : " - merge target ")
2772 << merge_participant
2773 << " acting does not match (source " << source_acting
2774 << " != target " << q
->second
.acting
// All checks passed: queue a one-step pg_num decrease.
2781 unsigned target
= p
.get_pg_num() - 1;
2782 dout(10) << "pool " << i
.first
2783 << " pg_num_target " << p
.get_pg_num_target()
2784 << " pg_num " << p
.get_pg_num()
2786 << " (merging " << merge_source
2787 << " and " << merge_target
2789 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
2792 } else if (p
.get_pg_num_target() > p
.get_pg_num()) {
2793 // pg_num increase (split)
2795 auto q
= pg_map
.num_pg_by_pool_state
.find(i
.first
);
2796 if (q
!= pg_map
.num_pg_by_pool_state
.end()) {
2797 for (auto& j
: q
->second
) {
2798 if ((j
.first
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
)) == 0) {
2799 dout(20) << "pool " << i
.first
<< " has " << j
.second
2800 << " pgs in " << pg_state_string(j
.first
)
// Splits are throttled by the pg_num/pgp_num gap and the global
// creating-pg budget ("left").
2809 unsigned pg_gap
= p
.get_pg_num() - p
.get_pgp_num();
2810 unsigned max_jump
= cct
->_conf
->mgr_max_pg_num_change
;
2812 dout(10) << "pool " << i
.first
2813 << " pg_num_target " << p
.get_pg_num_target()
2814 << " pg_num " << p
.get_pg_num()
2815 << " - not all pgs active"
2817 } else if (pg_gap
>= max_jump
) {
2818 dout(10) << "pool " << i
.first
2819 << " pg_num " << p
.get_pg_num()
2820 << " - pgp_num " << p
.get_pgp_num()
2821 << " gap >= max_pg_num_change " << max_jump
2822 << " - must scale pgp_num first"
2825 unsigned add
= std::min(
2826 std::min(left
, max_jump
- pg_gap
),
2827 p
.get_pg_num_target() - p
.get_pg_num());
2828 unsigned target
= p
.get_pg_num() + add
;
2830 dout(10) << "pool " << i
.first
2831 << " pg_num_target " << p
.get_pg_num_target()
2832 << " pg_num " << p
.get_pg_num()
2833 << " -> " << target
<< dendl
;
2834 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
// pgp_num adjustment toward min(pg_num_pending, pgp_num_target).
2840 unsigned target
= std::min(p
.get_pg_num_pending(),
2841 p
.get_pgp_num_target());
2842 if (target
!= p
.get_pgp_num()) {
2843 dout(20) << "pool " << i
.first
2844 << " pgp_num_target " << p
.get_pgp_num_target()
2845 << " pgp_num " << p
.get_pgp_num()
2846 << " -> " << target
<< dendl
;
2847 if (target
> p
.get_pgp_num() &&
2848 p
.get_pgp_num() == p
.get_pg_num()) {
2849 dout(10) << "pool " << i
.first
2850 << " pgp_num_target " << p
.get_pgp_num_target()
2851 << " pgp_num " << p
.get_pgp_num()
2852 << " - increase blocked by pg_num " << p
.get_pg_num()
2854 } else if (!aggro
&& (inactive_pgs_ratio
> 0 ||
2855 degraded_ratio
> 0 ||
2856 unknown_pgs_ratio
> 0)) {
2857 dout(10) << "pool " << i
.first
2858 << " pgp_num_target " << p
.get_pgp_num_target()
2859 << " pgp_num " << p
.get_pgp_num()
2860 << " - inactive|degraded|unknown pgs, deferring pgp_num"
2861 << " update" << dendl
;
2862 } else if (!aggro
&& (misplaced_ratio
> max_misplaced
)) {
2863 dout(10) << "pool " << i
.first
2864 << " pgp_num_target " << p
.get_pgp_num_target()
2865 << " pgp_num " << p
.get_pgp_num()
2866 << " - misplaced_ratio " << misplaced_ratio
2867 << " > max " << max_misplaced
2868 << ", deferring pgp_num update" << dendl
;
2870 // NOTE: this calculation assumes objects are
2871 // basically uniformly distributed across all PGs
2872 // (regardless of pool), which is probably not
2873 // perfectly correct, but it's a start. make no
2874 // single adjustment that's more than half of the
2875 // max_misplaced, to somewhat limit the magnitude of
2876 // our potential error here.
2878 static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP
= 1;
2879 pool_stat_t s
= pg_map
.get_pg_pool_sum_stat(i
.first
);
2881 // pool is (virtually) empty; just jump to final pgp_num?
2882 (p
.get_pgp_num_target() > p
.get_pgp_num() &&
2883 s
.stats
.sum
.num_objects
<= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP
*
2884 p
.get_pgp_num_target()))) {
// Cap the step so at most half of max_misplaced worth of data moves.
2888 std::min
<double>(max_misplaced
- misplaced_ratio
,
2889 max_misplaced
/ 2.0);
2890 unsigned estmax
= std::max
<unsigned>(
2891 (double)p
.get_pg_num() * room
, 1u);
2892 unsigned next_min
= 0;
2893 if (p
.get_pgp_num() > estmax
) {
2894 next_min
= p
.get_pgp_num() - estmax
;
2896 next
= std::clamp(target
,
2898 p
.get_pgp_num() + estmax
);
2899 dout(20) << " room " << room
<< " estmax " << estmax
2900 << " delta " << (target
-p
.get_pgp_num())
2901 << " next " << next
<< dendl
;
2902 if (p
.get_pgp_num_target() == p
.get_pg_num_target() &&
2903 p
.get_pgp_num_target() < p
.get_pg_num()) {
2904 // since pgp_num is tracking pg_num, ceph is handling
2905 // pgp_num. so, be responsible: don't let pgp_num get
2906 // too far out ahead of merges (if we are merging).
2907 // this avoids moving lots of unmerged pgs onto a
2908 // small number of OSDs where we might blow out the
2910 unsigned max_outpace_merges
=
2911 std::max
<unsigned>(8, p
.get_pg_num() * max_misplaced
);
2912 if (next
+ max_outpace_merges
< p
.get_pg_num()) {
2913 next
= p
.get_pg_num() - max_outpace_merges
;
2914 dout(10) << " using next " << next
2915 << " to avoid outpacing merges (max_outpace_merges "
2916 << max_outpace_merges
<< ")" << dendl
;
2920 if (next
!= p
.get_pgp_num()) {
2921 dout(10) << "pool " << i
.first
2922 << " pgp_num_target " << p
.get_pgp_num_target()
2923 << " pgp_num " << p
.get_pgp_num()
2924 << " -> " << next
<< dendl
;
2925 pgp_num_to_set
[osdmap
.get_pool_name(i
.first
)] = next
;
// Issue the queued mon commands now that the osdmap/pgmap locks dropped.
2934 for (auto i
: pg_num_to_set
) {
2937 "\"prefix\": \"osd pool set\", "
2938 "\"pool\": \"" + i
.first
+ "\", "
2939 "\"var\": \"pg_num_actual\", "
2940 "\"val\": \"" + stringify(i
.second
) + "\""
2942 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2944 for (auto i
: pgp_num_to_set
) {
2947 "\"prefix\": \"osd pool set\", "
2948 "\"pool\": \"" + i
.first
+ "\", "
2949 "\"var\": \"pgp_num_actual\", "
2950 "\"val\": \"" + stringify(i
.second
) + "\""
2952 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
// Clear both plain upmaps and upmap-items on merge participants.
2954 for (auto pg
: upmaps_to_clear
) {
2957 "\"prefix\": \"osd rm-pg-upmap\", "
2958 "\"pgid\": \"" + stringify(pg
) + "\""
2960 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2963 "\"prefix\": \"osd rm-pg-upmap-items\", "
2964 "\"pgid\": \"" + stringify(pg
) + "\"" +
2966 monc
->start_mon_command({cmd2
}, {}, nullptr, nullptr, nullptr);
// Synchronize pending_service_map with the authoritative ServiceMap from
// ClusterState (adopting it with epoch+1 on startup), then reconcile the
// daemon state index for registered (non-ceph-entity) services: insert
// missing service daemons, cull vanished ones per service, and cull whole
// service types no longer present.
// NOTE(review): lossy extraction -- gaps in the embedded line numbers mean
// some statements are missing (e.g. between 2997 and 3003, and at 3009);
// comments only are added here; restore dropped lines from the original.
2970 void DaemonServer::got_service_map()
2972 std::lock_guard
l(lock
);
2974 cluster_state
.with_servicemap([&](const ServiceMap
& service_map
) {
2975 if (pending_service_map
.epoch
== 0) {
2976 // we just started up
2977 dout(10) << "got initial map e" << service_map
.epoch
<< dendl
;
2978 ceph_assert(pending_service_map_dirty
== 0);
2979 pending_service_map
= service_map
;
2980 pending_service_map
.epoch
= service_map
.epoch
+ 1;
2981 } else if (pending_service_map
.epoch
<= service_map
.epoch
) {
2982 // we just started up but got one more not our own map
2983 dout(10) << "got newer initial map e" << service_map
.epoch
<< dendl
;
2984 ceph_assert(pending_service_map_dirty
== 0);
2985 pending_service_map
= service_map
;
2986 pending_service_map
.epoch
= service_map
.epoch
+ 1;
2988 // we already active and therefore must have persisted it,
2989 // which means ours is the same or newer.
2990 dout(10) << "got updated map e" << service_map
.epoch
<< dendl
;
2994 // cull missing daemons, populate new ones
2995 std::set
<std::string
> types
;
2996 for (auto& [type
, service
] : pending_service_map
.services
) {
// Normal ceph entities (osd/mon/mds/...) are handled elsewhere; this
// reconciliation is for registered services only.
2997 if (ServiceMap::is_normal_ceph_entity(type
)) {
3003 std::set
<std::string
> names
;
3004 for (auto& q
: service
.daemons
) {
3005 names
.insert(q
.first
);
3006 DaemonKey key
{type
, q
.first
};
// Service daemon present in the map but unknown to the state index:
// create a state entry for it from the map's metadata.
3007 if (!daemon_state
.exists(key
)) {
3008 auto daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
3010 daemon
->set_metadata(q
.second
.metadata
);
3011 daemon
->service_daemon
= true;
3012 daemon_state
.insert(daemon
);
3013 dout(10) << "added missing " << key
<< dendl
;
// Drop state entries for this service type not named in the map.
3016 daemon_state
.cull(type
, names
);
3018 daemon_state
.cull_services(types
);
// On a new MgrMap: for the active mgr and each standby that is neither in
// the daemon state index nor already being refreshed, kick off an async
// "mgr metadata" mon command whose completion (MetadataUpdate) populates
// daemon_state; finally cull "mgr" entries not present in the map.
// NOTE(review): lossy extraction -- the statements at original lines 3038
// and 3046 (inside the two `if` bodies) are missing here; the dout lines
// imply they invoke md_update(key) -- confirm against the original source.
3021 void DaemonServer::got_mgr_map()
3023 std::lock_guard
l(lock
);
// Names of all mgrs (active + standby) seen in this map.
3024 set
<std::string
> have
;
3025 cluster_state
.with_mgrmap([&](const MgrMap
& mgrmap
) {
// Helper: asynchronously fetch a mgr daemon's metadata from the mon.
3026 auto md_update
= [&] (DaemonKey key
) {
3027 std::ostringstream oss
;
3028 auto c
= new MetadataUpdate(daemon_state
, key
);
3029 // FIXME remove post-nautilus: include 'id' for luminous mons
3030 oss
<< "{\"prefix\": \"mgr metadata\", \"who\": \""
3031 << key
.name
<< "\", \"id\": \"" << key
.name
<< "\"}";
3032 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
3034 if (mgrmap
.active_name
.size()) {
3035 DaemonKey key
{"mgr", mgrmap
.active_name
};
3036 have
.insert(mgrmap
.active_name
);
3037 if (!daemon_state
.exists(key
) && !daemon_state
.is_updating(key
)) {
3039 dout(10) << "triggered addition of " << key
<< " via metadata update" << dendl
;
3042 for (auto& i
: mgrmap
.standbys
) {
3043 DaemonKey key
{"mgr", i
.second
.name
};
3044 have
.insert(i
.second
.name
);
3045 if (!daemon_state
.exists(key
) && !daemon_state
.is_updating(key
)) {
3047 dout(10) << "triggered addition of " << key
<< " via metadata update" << dendl
;
// Drop state for mgrs that disappeared from the map.
3051 daemon_state
.cull("mgr", have
);
// Config-observer hook: list of config keys this server wants change
// notifications for (see handle_conf_change below).
// NOTE(review): lossy extraction -- the KEYS array is truncated here
// (original lines 3058-3064 and the return statement were dropped;
// presumably more keys and the conventional trailing NULL terminator
// follow -- confirm against the original source).
3054 const char** DaemonServer::get_tracked_conf_keys() const
3056 static const char *KEYS
[] = {
3057 "mgr_stats_threshold",
// Config-observer hook: when the stats threshold/period keys change, queue
// a finisher job that (under `lock`) walks all daemon connections so each
// client receives the new stats policy.
// NOTE(review): lossy extraction -- the body of the connection loop is cut
// off here (presumably it calls _send_configure(c) for each connection --
// confirm against the original source).
3065 void DaemonServer::handle_conf_change(const ConfigProxy
& conf
,
3066 const std::set
<std::string
> &changed
)
3069 if (changed
.count("mgr_stats_threshold") || changed
.count("mgr_stats_period")) {
3070 dout(4) << "Updating stats threshold/period on "
3071 << daemon_connections
.size() << " clients" << dendl
;
3072 // Send a fresh MMgrConfigure to all clients, so that they can follow
3073 // the new policy for transmitting stats
3074 finisher
.queue(new LambdaContext([this](int r
) {
3075 std::lock_guard
l(lock
);
3076 for (auto &c
: daemon_connections
) {
3083 void DaemonServer::_send_configure(ConnectionRef c
)
3085 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
3087 auto configure
= make_message
<MMgrConfigure
>();
3088 configure
->stats_period
= g_conf().get_val
<int64_t>("mgr_stats_period");
3089 configure
->stats_threshold
= g_conf().get_val
<int64_t>("mgr_stats_threshold");
3091 if (c
->peer_is_osd()) {
3092 configure
->osd_perf_metric_queries
=
3093 osd_perf_metric_collector
.get_queries();
3094 } else if (c
->peer_is_mds()) {
3095 configure
->metric_config_message
=
3096 MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector
.get_queries()));
3099 c
->send_message2(configure
);
3102 MetricQueryID
DaemonServer::add_osd_perf_query(
3103 const OSDPerfMetricQuery
&query
,
3104 const std::optional
<OSDPerfMetricLimit
> &limit
)
3106 return osd_perf_metric_collector
.add_query(query
, limit
);
3109 int DaemonServer::remove_osd_perf_query(MetricQueryID query_id
)
3111 return osd_perf_metric_collector
.remove_query(query_id
);
3114 int DaemonServer::get_osd_perf_counters(OSDPerfCollector
*collector
)
3116 return osd_perf_metric_collector
.get_counters(collector
);
3119 MetricQueryID
DaemonServer::add_mds_perf_query(
3120 const MDSPerfMetricQuery
&query
,
3121 const std::optional
<MDSPerfMetricLimit
> &limit
)
3123 return mds_perf_metric_collector
.add_query(query
, limit
);
3126 int DaemonServer::remove_mds_perf_query(MetricQueryID query_id
)
3128 return mds_perf_metric_collector
.remove_query(query_id
);
3131 void DaemonServer::reregister_mds_perf_queries()
3133 mds_perf_metric_collector
.reregister_queries();
3136 int DaemonServer::get_mds_perf_counters(MDSPerfCollector
*collector
)
3138 return mds_perf_metric_collector
.get_counters(collector
);