// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
14 #include "DaemonServer.h"
15 #include <boost/algorithm/string.hpp>
18 #include "include/stringify.h"
19 #include "include/str_list.h"
20 #include "auth/RotatingKeyRing.h"
21 #include "json_spirit/json_spirit_writer.h"
23 #include "mgr/mgr_commands.h"
24 #include "mgr/DaemonHealthMetricCollector.h"
25 #include "mgr/OSDPerfMetricCollector.h"
26 #include "mgr/MDSPerfMetricCollector.h"
27 #include "mon/MonCommand.h"
29 #include "messages/MMgrOpen.h"
30 #include "messages/MMgrUpdate.h"
31 #include "messages/MMgrClose.h"
32 #include "messages/MMgrConfigure.h"
33 #include "messages/MMonMgrReport.h"
34 #include "messages/MCommand.h"
35 #include "messages/MCommandReply.h"
36 #include "messages/MMgrCommand.h"
37 #include "messages/MMgrCommandReply.h"
38 #include "messages/MPGStats.h"
39 #include "messages/MOSDScrub.h"
40 #include "messages/MOSDScrub2.h"
41 #include "messages/MOSDForceRecovery.h"
42 #include "common/errno.h"
43 #include "common/pick_address.h"
45 #define dout_context g_ceph_context
46 #define dout_subsys ceph_subsys_mgr
48 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
50 using namespace TOPNSPC::common
;
53 using std::ostringstream
;
55 using std::stringstream
;
57 using std::unique_ptr
;
/**
 * Deep-equality for two map-like containers.
 *
 * Returns true iff both maps have the same size and every (key, value)
 * pair in @p lhs matches the pair at the same position in @p rhs.  Because
 * both arguments are ordered identically (e.g. std::map), positional
 * comparison is equivalent to key-wise comparison.
 *
 * Note: the element lambda takes its arguments by const reference; taking
 * them by value (as the original did) copied every pair on each comparison.
 */
template <typename Map>
bool map_compare(Map const &lhs, Map const &rhs)
{
  // Size check first: std::equal requires rhs to be at least as long as lhs.
  return lhs.size() == rhs.size()
    && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                  [] (const auto &a, const auto &b) {
                    return a.first == b.first && a.second == b.second;
                  });
}
68 DaemonServer::DaemonServer(MonClient
*monc_
,
70 DaemonStateIndex
&daemon_state_
,
71 ClusterState
&cluster_state_
,
72 PyModuleRegistry
&py_modules_
,
74 LogChannelRef audit_clog_
)
75 : Dispatcher(g_ceph_context
),
76 client_byte_throttler(new Throttle(g_ceph_context
, "mgr_client_bytes",
77 g_conf().get_val
<Option::size_t>("mgr_client_bytes"))),
78 client_msg_throttler(new Throttle(g_ceph_context
, "mgr_client_messages",
79 g_conf().get_val
<uint64_t>("mgr_client_messages"))),
80 osd_byte_throttler(new Throttle(g_ceph_context
, "mgr_osd_bytes",
81 g_conf().get_val
<Option::size_t>("mgr_osd_bytes"))),
82 osd_msg_throttler(new Throttle(g_ceph_context
, "mgr_osd_messsages",
83 g_conf().get_val
<uint64_t>("mgr_osd_messages"))),
84 mds_byte_throttler(new Throttle(g_ceph_context
, "mgr_mds_bytes",
85 g_conf().get_val
<Option::size_t>("mgr_mds_bytes"))),
86 mds_msg_throttler(new Throttle(g_ceph_context
, "mgr_mds_messsages",
87 g_conf().get_val
<uint64_t>("mgr_mds_messages"))),
88 mon_byte_throttler(new Throttle(g_ceph_context
, "mgr_mon_bytes",
89 g_conf().get_val
<Option::size_t>("mgr_mon_bytes"))),
90 mon_msg_throttler(new Throttle(g_ceph_context
, "mgr_mon_messsages",
91 g_conf().get_val
<uint64_t>("mgr_mon_messages"))),
95 daemon_state(daemon_state_
),
96 cluster_state(cluster_state_
),
97 py_modules(py_modules_
),
99 audit_clog(audit_clog_
),
101 timer(g_ceph_context
, lock
),
102 shutting_down(false),
104 osd_perf_metric_collector_listener(this),
105 osd_perf_metric_collector(osd_perf_metric_collector_listener
),
106 mds_perf_metric_collector_listener(this),
107 mds_perf_metric_collector(mds_perf_metric_collector_listener
)
109 g_conf().add_observer(this);
112 DaemonServer::~DaemonServer() {
114 g_conf().remove_observer(this);
117 int DaemonServer::init(uint64_t gid
, entity_addrvec_t client_addrs
)
119 // Initialize Messenger
120 std::string public_msgr_type
= g_conf()->ms_public_type
.empty() ?
121 g_conf().get_val
<std::string
>("ms_type") : g_conf()->ms_public_type
;
122 msgr
= Messenger::create(g_ceph_context
, public_msgr_type
,
123 entity_name_t::MGR(gid
),
125 Messenger::get_pid_nonce());
126 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
128 msgr
->set_auth_client(monc
);
131 msgr
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
132 client_byte_throttler
.get(),
133 client_msg_throttler
.get());
136 msgr
->set_policy_throttlers(entity_name_t::TYPE_OSD
,
137 osd_byte_throttler
.get(),
138 osd_msg_throttler
.get());
139 msgr
->set_policy_throttlers(entity_name_t::TYPE_MDS
,
140 mds_byte_throttler
.get(),
141 mds_msg_throttler
.get());
142 msgr
->set_policy_throttlers(entity_name_t::TYPE_MON
,
143 mon_byte_throttler
.get(),
144 mon_msg_throttler
.get());
146 entity_addrvec_t addrs
;
147 int r
= pick_addresses(cct
, CEPH_PICK_ADDRESS_PUBLIC
, &addrs
);
151 dout(20) << __func__
<< " will bind to " << addrs
<< dendl
;
152 r
= msgr
->bindv(addrs
);
154 derr
<< "unable to bind mgr to " << addrs
<< dendl
;
158 msgr
->set_myname(entity_name_t::MGR(gid
));
159 msgr
->set_addr_unknowns(client_addrs
);
162 msgr
->add_dispatcher_tail(this);
164 msgr
->set_auth_server(monc
);
165 monc
->set_handle_authentication_dispatcher(this);
167 started_at
= ceph_clock_now();
169 std::lock_guard
l(lock
);
172 schedule_tick_locked(
173 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
178 entity_addrvec_t
DaemonServer::get_myaddrs() const
180 return msgr
->get_myaddrs();
183 int DaemonServer::ms_handle_authentication(Connection
*con
)
185 auto s
= ceph::make_ref
<MgrSession
>(cct
);
187 s
->inst
.addr
= con
->get_peer_addr();
188 s
->entity_name
= con
->peer_name
;
189 dout(10) << __func__
<< " new session " << s
<< " con " << con
190 << " entity " << con
->peer_name
191 << " addr " << con
->get_peer_addrs()
194 AuthCapsInfo
&caps_info
= con
->get_peer_caps_info();
195 if (caps_info
.allow_all
) {
196 dout(10) << " session " << s
<< " " << s
->entity_name
197 << " allow_all" << dendl
;
198 s
->caps
.set_allow_all();
199 } else if (caps_info
.caps
.length() > 0) {
200 auto p
= caps_info
.caps
.cbegin();
205 catch (buffer::error
& e
) {
206 dout(10) << " session " << s
<< " " << s
->entity_name
207 << " failed to decode caps" << dendl
;
210 if (!s
->caps
.parse(str
)) {
211 dout(10) << " session " << s
<< " " << s
->entity_name
212 << " failed to parse caps '" << str
<< "'" << dendl
;
215 dout(10) << " session " << s
<< " " << s
->entity_name
216 << " has caps " << s
->caps
<< " '" << str
<< "'" << dendl
;
219 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
220 std::lock_guard
l(lock
);
221 s
->osd_id
= atoi(s
->entity_name
.get_id().c_str());
222 dout(10) << "registering osd." << s
->osd_id
<< " session "
223 << s
<< " con " << con
<< dendl
;
224 osd_cons
[s
->osd_id
].insert(con
);
230 bool DaemonServer::ms_handle_reset(Connection
*con
)
232 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
233 auto priv
= con
->get_priv();
234 auto session
= static_cast<MgrSession
*>(priv
.get());
238 std::lock_guard
l(lock
);
239 dout(10) << "unregistering osd." << session
->osd_id
240 << " session " << session
<< " con " << con
<< dendl
;
241 osd_cons
[session
->osd_id
].erase(con
);
243 auto iter
= daemon_connections
.find(con
);
244 if (iter
!= daemon_connections
.end()) {
245 daemon_connections
.erase(iter
);
251 bool DaemonServer::ms_handle_refused(Connection
*con
)
253 // do nothing for now
257 bool DaemonServer::ms_dispatch2(const ref_t
<Message
>& m
)
259 // Note that we do *not* take ::lock here, in order to avoid
260 // serializing all message handling. It's up to each handler
261 // to take whatever locks it needs.
262 switch (m
->get_type()) {
264 cluster_state
.ingest_pgstats(ref_cast
<MPGStats
>(m
));
265 maybe_ready(m
->get_source().num());
268 return handle_report(ref_cast
<MMgrReport
>(m
));
270 return handle_open(ref_cast
<MMgrOpen
>(m
));
272 return handle_update(ref_cast
<MMgrUpdate
>(m
));
274 return handle_close(ref_cast
<MMgrClose
>(m
));
276 return handle_command(ref_cast
<MCommand
>(m
));
277 case MSG_MGR_COMMAND
:
278 return handle_command(ref_cast
<MMgrCommand
>(m
));
280 dout(1) << "Unhandled message type " << m
->get_type() << dendl
;
285 void DaemonServer::dump_pg_ready(ceph::Formatter
*f
)
287 f
->dump_bool("pg_ready", pgmap_ready
.load());
290 void DaemonServer::maybe_ready(int32_t osd_id
)
292 if (pgmap_ready
.load()) {
293 // Fast path: we don't need to take lock because pgmap_ready
296 std::lock_guard
l(lock
);
298 if (reported_osds
.find(osd_id
) == reported_osds
.end()) {
299 dout(4) << "initial report from osd " << osd_id
<< dendl
;
300 reported_osds
.insert(osd_id
);
301 std::set
<int32_t> up_osds
;
303 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
304 osdmap
.get_up_osds(up_osds
);
307 std::set
<int32_t> unreported_osds
;
308 std::set_difference(up_osds
.begin(), up_osds
.end(),
309 reported_osds
.begin(), reported_osds
.end(),
310 std::inserter(unreported_osds
, unreported_osds
.begin()));
312 if (unreported_osds
.size() == 0) {
313 dout(4) << "all osds have reported, sending PG state to mon" << dendl
;
315 reported_osds
.clear();
316 // Avoid waiting for next tick
319 dout(4) << "still waiting for " << unreported_osds
.size() << " osds"
320 " to report in before PGMap is ready" << dendl
;
326 void DaemonServer::tick()
332 schedule_tick_locked(
333 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
336 // Currently modules do not set health checks in response to events delivered to
337 // all modules (e.g. notify) so we do not risk a thundering hurd situation here.
338 // if this pattern emerges in the future, this scheduler could be modified to
339 // fire after all modules have had a chance to set their health checks.
340 void DaemonServer::schedule_tick_locked(double delay_sec
)
342 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
345 timer
.cancel_event(tick_event
);
346 tick_event
= nullptr;
349 // on shutdown start rejecting explicit requests to send reports that may
350 // originate from python land which may still be running.
354 tick_event
= timer
.add_event_after(delay_sec
,
355 new LambdaContext([this](int r
) {
360 void DaemonServer::schedule_tick(double delay_sec
)
362 std::lock_guard
l(lock
);
363 schedule_tick_locked(delay_sec
);
366 void DaemonServer::handle_osd_perf_metric_query_updated()
370 // Send a fresh MMgrConfigure to all clients, so that they can follow
371 // the new policy for transmitting stats
372 finisher
.queue(new LambdaContext([this](int r
) {
373 std::lock_guard
l(lock
);
374 for (auto &c
: daemon_connections
) {
375 if (c
->peer_is_osd()) {
382 void DaemonServer::handle_mds_perf_metric_query_updated()
386 // Send a fresh MMgrConfigure to all clients, so that they can follow
387 // the new policy for transmitting stats
388 finisher
.queue(new LambdaContext([this](int r
) {
389 std::lock_guard
l(lock
);
390 for (auto &c
: daemon_connections
) {
391 if (c
->peer_is_mds()) {
398 void DaemonServer::shutdown()
400 dout(10) << "begin" << dendl
;
403 cluster_state
.shutdown();
404 dout(10) << "done" << dendl
;
406 std::lock_guard
l(lock
);
407 shutting_down
= true;
411 static DaemonKey
key_from_service(
412 const std::string
& service_name
,
414 const std::string
& daemon_name
)
416 if (!service_name
.empty()) {
417 return DaemonKey
{service_name
, daemon_name
};
419 return DaemonKey
{ceph_entity_type_name(peer_type
), daemon_name
};
423 void DaemonServer::fetch_missing_metadata(const DaemonKey
& key
,
424 const entity_addr_t
& addr
)
426 if (!daemon_state
.is_updating(key
) &&
427 (key
.type
== "osd" || key
.type
== "mds" || key
.type
== "mon")) {
428 std::ostringstream oss
;
429 auto c
= new MetadataUpdate(daemon_state
, key
);
430 if (key
.type
== "osd") {
431 oss
<< "{\"prefix\": \"osd metadata\", \"id\": "
433 } else if (key
.type
== "mds") {
434 c
->set_default("addr", stringify(addr
));
435 oss
<< "{\"prefix\": \"mds metadata\", \"who\": \""
436 << key
.name
<< "\"}";
437 } else if (key
.type
== "mon") {
438 oss
<< "{\"prefix\": \"mon metadata\", \"id\": \""
439 << key
.name
<< "\"}";
443 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
447 bool DaemonServer::handle_open(const ref_t
<MMgrOpen
>& m
)
449 std::unique_lock
l(lock
);
451 DaemonKey key
= key_from_service(m
->service_name
,
452 m
->get_connection()->get_peer_type(),
455 auto con
= m
->get_connection();
456 dout(10) << "from " << key
<< " " << con
->get_peer_addr() << dendl
;
458 _send_configure(con
);
460 DaemonStatePtr daemon
;
461 if (daemon_state
.exists(key
)) {
462 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
463 daemon
= daemon_state
.get(key
);
466 if (m
->service_daemon
) {
467 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
468 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
470 daemon
->service_daemon
= true;
471 daemon_state
.insert(daemon
);
473 /* A normal Ceph daemon has connected but we are or should be waiting on
474 * metadata for it. Close the session so that it tries to reconnect.
476 dout(2) << "ignoring open from " << key
<< " " << con
->get_peer_addr()
477 << "; not ready for session (expect reconnect)" << dendl
;
480 fetch_missing_metadata(key
, m
->get_source_addr());
485 if (m
->service_daemon
) {
486 // update the metadata through the daemon state index to
487 // ensure it's kept up-to-date
488 daemon_state
.update_metadata(daemon
, m
->daemon_metadata
);
491 std::lock_guard
l(daemon
->lock
);
492 daemon
->perf_counters
.clear();
494 daemon
->service_daemon
= m
->service_daemon
;
495 if (m
->service_daemon
) {
496 daemon
->service_status
= m
->daemon_status
;
498 utime_t now
= ceph_clock_now();
499 auto [d
, added
] = pending_service_map
.get_daemon(m
->service_name
,
501 if (added
|| d
->gid
!= (uint64_t)m
->get_source().num()) {
502 dout(10) << "registering " << key
<< " in pending_service_map" << dendl
;
503 d
->gid
= m
->get_source().num();
504 d
->addr
= m
->get_source_addr();
505 d
->start_epoch
= pending_service_map
.epoch
;
506 d
->start_stamp
= now
;
507 d
->metadata
= m
->daemon_metadata
;
508 pending_service_map_dirty
= pending_service_map
.epoch
;
512 auto p
= m
->config_bl
.cbegin();
513 if (p
!= m
->config_bl
.end()) {
514 decode(daemon
->config
, p
);
515 decode(daemon
->ignored_mon_config
, p
);
516 dout(20) << " got config " << daemon
->config
517 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
519 daemon
->config_defaults_bl
= m
->config_defaults_bl
;
520 daemon
->config_defaults
.clear();
521 dout(20) << " got config_defaults_bl " << daemon
->config_defaults_bl
.length()
522 << " bytes" << dendl
;
525 if (con
->get_peer_type() != entity_name_t::TYPE_CLIENT
&&
526 m
->service_name
.empty())
528 // Store in set of the daemon/service connections, i.e. those
529 // connections that require an update in the event of stats
530 // configuration changes.
531 daemon_connections
.insert(con
);
537 bool DaemonServer::handle_update(const ref_t
<MMgrUpdate
>& m
)
540 if (!m
->service_name
.empty()) {
541 key
.type
= m
->service_name
;
543 key
.type
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
545 key
.name
= m
->daemon_name
;
547 dout(10) << "from " << m
->get_connection() << " " << key
<< dendl
;
549 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
550 m
->service_name
.empty()) {
551 // Clients should not be sending us update request
552 dout(10) << "rejecting update request from non-daemon client " << m
->daemon_name
554 clog
->warn() << "rejecting report from non-daemon client " << m
->daemon_name
555 << " at " << m
->get_connection()->get_peer_addrs();
556 m
->get_connection()->mark_down();
562 std::unique_lock
locker(lock
);
564 DaemonStatePtr daemon
;
565 // Look up the DaemonState
566 if (daemon_state
.exists(key
)) {
567 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
569 daemon
= daemon_state
.get(key
);
570 if (m
->need_metadata_update
&&
571 !m
->daemon_metadata
.empty()) {
572 daemon_state
.update_metadata(daemon
, m
->daemon_metadata
);
580 bool DaemonServer::handle_close(const ref_t
<MMgrClose
>& m
)
582 std::lock_guard
l(lock
);
584 DaemonKey key
= key_from_service(m
->service_name
,
585 m
->get_connection()->get_peer_type(),
587 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
589 if (daemon_state
.exists(key
)) {
590 DaemonStatePtr daemon
= daemon_state
.get(key
);
591 daemon_state
.rm(key
);
593 std::lock_guard
l(daemon
->lock
);
594 if (daemon
->service_daemon
) {
595 pending_service_map
.rm_daemon(m
->service_name
, m
->daemon_name
);
596 pending_service_map_dirty
= pending_service_map
.epoch
;
601 // send same message back as a reply
602 m
->get_connection()->send_message2(m
);
606 void DaemonServer::update_task_status(
608 const std::map
<std::string
,std::string
>& task_status
)
610 dout(10) << "got task status from " << key
<< dendl
;
612 [[maybe_unused
]] auto [daemon
, added
] =
613 pending_service_map
.get_daemon(key
.type
, key
.name
);
614 if (daemon
->task_status
!= task_status
) {
615 daemon
->task_status
= task_status
;
616 pending_service_map_dirty
= pending_service_map
.epoch
;
620 bool DaemonServer::handle_report(const ref_t
<MMgrReport
>& m
)
623 if (!m
->service_name
.empty()) {
624 key
.type
= m
->service_name
;
626 key
.type
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
628 key
.name
= m
->daemon_name
;
630 dout(10) << "from " << m
->get_connection() << " " << key
<< dendl
;
632 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
633 m
->service_name
.empty()) {
634 // Clients should not be sending us stats unless they are declaring
635 // themselves to be a daemon for some service.
636 dout(10) << "rejecting report from non-daemon client " << m
->daemon_name
638 clog
->warn() << "rejecting report from non-daemon client " << m
->daemon_name
639 << " at " << m
->get_connection()->get_peer_addrs();
640 m
->get_connection()->mark_down();
646 std::unique_lock
locker(lock
);
648 DaemonStatePtr daemon
;
649 // Look up the DaemonState
650 if (daemon_state
.exists(key
)) {
651 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
652 daemon
= daemon_state
.get(key
);
656 // we don't know the hostname at this stage, reject MMgrReport here.
657 dout(5) << "rejecting report from " << key
<< ", since we do not have its metadata now."
659 // issue metadata request in background
660 fetch_missing_metadata(key
, m
->get_source_addr());
665 auto priv
= m
->get_connection()->get_priv();
666 auto session
= static_cast<MgrSession
*>(priv
.get());
670 m
->get_connection()->mark_down();
672 dout(10) << "unregistering osd." << session
->osd_id
673 << " session " << session
<< " con " << m
->get_connection() << dendl
;
675 if (osd_cons
.find(session
->osd_id
) != osd_cons
.end()) {
676 osd_cons
[session
->osd_id
].erase(m
->get_connection());
679 auto iter
= daemon_connections
.find(m
->get_connection());
680 if (iter
!= daemon_connections
.end()) {
681 daemon_connections
.erase(iter
);
687 // Update the DaemonState
688 ceph_assert(daemon
!= nullptr);
690 std::lock_guard
l(daemon
->lock
);
691 auto &daemon_counters
= daemon
->perf_counters
;
692 daemon_counters
.update(*m
.get());
694 auto p
= m
->config_bl
.cbegin();
695 if (p
!= m
->config_bl
.end()) {
696 decode(daemon
->config
, p
);
697 decode(daemon
->ignored_mon_config
, p
);
698 dout(20) << " got config " << daemon
->config
699 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
702 utime_t now
= ceph_clock_now();
703 if (daemon
->service_daemon
) {
704 if (m
->daemon_status
) {
705 daemon
->service_status_stamp
= now
;
706 daemon
->service_status
= *m
->daemon_status
;
708 daemon
->last_service_beacon
= now
;
709 } else if (m
->daemon_status
) {
710 derr
<< "got status from non-daemon " << key
<< dendl
;
712 // update task status
713 if (m
->task_status
) {
714 update_task_status(key
, *m
->task_status
);
715 daemon
->last_service_beacon
= now
;
717 if (m
->get_connection()->peer_is_osd() || m
->get_connection()->peer_is_mon()) {
718 // only OSD and MON send health_checks to me now
719 daemon
->daemon_health_metrics
= std::move(m
->daemon_health_metrics
);
720 dout(10) << "daemon_health_metrics " << daemon
->daemon_health_metrics
726 // if there are any schema updates, notify the python modules
727 /* no users currently
728 if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
729 py_modules.notify_all("perf_schema_update", ceph::to_string(key));
733 if (m
->get_connection()->peer_is_osd()) {
734 osd_perf_metric_collector
.process_reports(m
->osd_perf_metric_reports
);
737 if (m
->metric_report_message
) {
738 const MetricReportMessage
&message
= *m
->metric_report_message
;
739 boost::apply_visitor(HandlePayloadVisitor(this), message
.payload
);
746 void DaemonServer::_generate_command_map(
748 map
<string
,string
> ¶m_str_map
)
750 for (auto p
= cmdmap
.begin();
751 p
!= cmdmap
.end(); ++p
) {
752 if (p
->first
== "prefix")
754 if (p
->first
== "caps") {
756 if (cmd_getval(cmdmap
, "caps", cv
) &&
757 cv
.size() % 2 == 0) {
758 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
759 string k
= string("caps_") + cv
[i
];
760 param_str_map
[k
] = cv
[i
+ 1];
765 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
769 const MonCommand
*DaemonServer::_get_mgrcommand(
770 const string
&cmd_prefix
,
771 const std::vector
<MonCommand
> &cmds
)
773 const MonCommand
*this_cmd
= nullptr;
774 for (const auto &cmd
: cmds
) {
775 if (cmd
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
783 bool DaemonServer::_allowed_command(
785 const string
&service
,
786 const string
&module
,
787 const string
&prefix
,
788 const cmdmap_t
& cmdmap
,
789 const map
<string
,string
>& param_str_map
,
790 const MonCommand
*this_cmd
) {
792 if (s
->entity_name
.is_mon()) {
793 // mon is all-powerful. even when it is forwarding commands on behalf of
794 // old clients; we expect the mon is validating commands before proxying!
798 bool cmd_r
= this_cmd
->requires_perm('r');
799 bool cmd_w
= this_cmd
->requires_perm('w');
800 bool cmd_x
= this_cmd
->requires_perm('x');
802 bool capable
= s
->caps
.is_capable(
805 service
, module
, prefix
, param_str_map
,
809 dout(10) << " " << s
->entity_name
<< " "
810 << (capable
? "" : "not ") << "capable" << dendl
;
815 * The working data for processing an MCommand. This lives in
816 * a class to enable passing it into other threads for processing
817 * outside of the thread/locks that called handle_command.
819 class CommandContext
{
821 ceph::ref_t
<MCommand
> m_tell
;
822 ceph::ref_t
<MMgrCommand
> m_mgr
;
823 const std::vector
<std::string
>& cmd
; ///< ref into m_tell or m_mgr
824 const bufferlist
& data
; ///< ref into m_tell or m_mgr
828 explicit CommandContext(ceph::ref_t
<MCommand
> m
)
829 : m_tell
{std::move(m
)},
831 data(m_tell
->get_data()) {
833 explicit CommandContext(ceph::ref_t
<MMgrCommand
> m
)
834 : m_mgr
{std::move(m
)},
836 data(m_mgr
->get_data()) {
839 void reply(int r
, const std::stringstream
&ss
) {
843 void reply(int r
, const std::string
&rs
) {
844 // Let the connection drop as soon as we've sent our response
845 ConnectionRef con
= m_tell
? m_tell
->get_connection()
846 : m_mgr
->get_connection();
848 con
->mark_disposable();
852 dout(20) << "success" << dendl
;
854 derr
<< __func__
<< " " << cpp_strerror(r
) << " " << rs
<< dendl
;
858 MCommandReply
*reply
= new MCommandReply(r
, rs
);
859 reply
->set_tid(m_tell
->get_tid());
860 reply
->set_data(odata
);
861 con
->send_message(reply
);
863 MMgrCommandReply
*reply
= new MMgrCommandReply(r
, rs
);
864 reply
->set_tid(m_mgr
->get_tid());
865 reply
->set_data(odata
);
866 con
->send_message(reply
);
873 * A context for receiving a bufferlist/error string from a background
874 * function and then calling back to a CommandContext when it's done
876 class ReplyOnFinish
: public Context
{
877 std::shared_ptr
<CommandContext
> cmdctx
;
883 explicit ReplyOnFinish(const std::shared_ptr
<CommandContext
> &cmdctx_
)
886 void finish(int r
) override
{
887 cmdctx
->odata
.claim_append(from_mon
);
888 cmdctx
->reply(r
, outs
);
892 bool DaemonServer::handle_command(const ref_t
<MCommand
>& m
)
894 std::lock_guard
l(lock
);
895 auto cmdctx
= std::make_shared
<CommandContext
>(m
);
897 return _handle_command(cmdctx
);
898 } catch (const bad_cmd_get
& e
) {
899 cmdctx
->reply(-EINVAL
, e
.what());
904 bool DaemonServer::handle_command(const ref_t
<MMgrCommand
>& m
)
906 std::lock_guard
l(lock
);
907 auto cmdctx
= std::make_shared
<CommandContext
>(m
);
909 return _handle_command(cmdctx
);
910 } catch (const bad_cmd_get
& e
) {
911 cmdctx
->reply(-EINVAL
, e
.what());
916 void DaemonServer::log_access_denied(
917 std::shared_ptr
<CommandContext
>& cmdctx
,
918 MgrSession
* session
, std::stringstream
& ss
) {
919 dout(1) << " access denied" << dendl
;
920 audit_clog
->info() << "from='" << session
->inst
<< "' "
921 << "entity='" << session
->entity_name
<< "' "
922 << "cmd=" << cmdctx
->cmd
<< ": access denied";
923 ss
<< "access denied: does your client key have mgr caps? "
924 "See http://docs.ceph.com/en/latest/mgr/administrator/"
925 "#client-authentication";
928 void DaemonServer::_check_offlines_pgs(
929 const set
<int>& osds
,
930 const OSDMap
& osdmap
,
932 offline_pg_report
*report
)
935 *report
= offline_pg_report();
938 for (const auto& q
: pgmap
.pg_stat
) {
939 set
<int32_t> pg_acting
; // net acting sets (with no missing if degraded)
941 if (q
.second
.state
== 0) {
942 report
->unknown
.insert(q
.first
);
945 if (q
.second
.state
& PG_STATE_DEGRADED
) {
946 for (auto& anm
: q
.second
.avail_no_missing
) {
947 if (osds
.count(anm
.osd
)) {
951 if (anm
.osd
!= CRUSH_ITEM_NONE
) {
952 pg_acting
.insert(anm
.osd
);
956 for (auto& a
: q
.second
.acting
) {
961 if (a
!= CRUSH_ITEM_NONE
) {
969 const pg_pool_t
*pi
= osdmap
.get_pg_pool(q
.first
.pool());
970 bool dangerous
= false;
972 report
->bad_no_pool
.insert(q
.first
); // pool is creating or deleting
975 if (!(q
.second
.state
& PG_STATE_ACTIVE
)) {
976 report
->bad_already_inactive
.insert(q
.first
);
979 if (pg_acting
.size() < pi
->min_size
) {
980 report
->bad_become_inactive
.insert(q
.first
);
984 report
->not_ok
.insert(q
.first
);
986 report
->ok
.insert(q
.first
);
987 if (q
.second
.state
& PG_STATE_DEGRADED
) {
988 report
->ok_become_more_degraded
.insert(q
.first
);
990 report
->ok_become_degraded
.insert(q
.first
);
994 dout(20) << osds
<< " -> " << report
->ok
.size() << " ok, "
995 << report
->not_ok
.size() << " not ok, "
996 << report
->unknown
.size() << " unknown"
1000 void DaemonServer::_maximize_ok_to_stop_set(
1001 const set
<int>& orig_osds
,
1003 const OSDMap
& osdmap
,
1005 offline_pg_report
*out_report
)
1007 dout(20) << "orig_osds " << orig_osds
<< " max " << max
<< dendl
;
1008 _check_offlines_pgs(orig_osds
, osdmap
, pgmap
, out_report
);
1009 if (!out_report
->ok_to_stop()) {
1012 if (orig_osds
.size() >= max
) {
1017 // semi-arbitrarily start with the first osd in the set
1018 offline_pg_report report
;
1019 set
<int> osds
= orig_osds
;
1020 int parent
= *osds
.begin();
1024 // identify the next parent
1025 int r
= osdmap
.crush
->get_immediate_parent_id(parent
, &parent
);
1027 return; // just go with what we have so far!
1030 // get candidate additions that are beneath this point in the tree
1032 r
= osdmap
.crush
->get_all_children(parent
, &children
);
1034 return; // just go with what we have so far!
1036 dout(20) << " parent " << parent
<< " children " << children
<< dendl
;
1038 // try adding in more osds
1039 int failed
= 0; // how many children we failed to add to our set
1040 for (auto o
: children
) {
1041 if (o
>= 0 && osdmap
.is_up(o
) && osds
.count(o
) == 0) {
1043 _check_offlines_pgs(osds
, osdmap
, pgmap
, &report
);
1044 if (!report
.ok_to_stop()) {
1049 *out_report
= report
;
1050 if (osds
.size() == max
) {
1051 dout(20) << " hit max" << dendl
;
1052 return; // yay, we hit the max
1058 // we hit some failures; go with what we have
1059 dout(20) << " hit some peer failures" << dendl
;
1065 bool DaemonServer::_handle_command(
1066 std::shared_ptr
<CommandContext
>& cmdctx
)
1069 bool admin_socket_cmd
= false;
1070 if (cmdctx
->m_tell
) {
1072 // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
1074 admin_socket_cmd
= (cmdctx
->m_tell
->fsid
!= uuid_d());
1078 auto priv
= m
->get_connection()->get_priv();
1079 auto session
= static_cast<MgrSession
*>(priv
.get());
1083 if (session
->inst
.name
== entity_name_t()) {
1084 session
->inst
.name
= m
->get_source();
1087 map
<string
,string
> param_str_map
;
1088 std::stringstream ss
;
1091 if (!cmdmap_from_json(cmdctx
->cmd
, &(cmdctx
->cmdmap
), ss
)) {
1092 cmdctx
->reply(-EINVAL
, ss
);
1097 cmd_getval(cmdctx
->cmdmap
, "prefix", prefix
);
1098 dout(10) << "decoded-size=" << cmdctx
->cmdmap
.size() << " prefix=" << prefix
<< dendl
;
1100 boost::scoped_ptr
<Formatter
> f
;
1103 if (boost::algorithm::ends_with(prefix
, "_json")) {
1106 format
= cmd_getval_or
<string
>(cmdctx
->cmdmap
, "format", "plain");
1108 f
.reset(Formatter::create(format
));
1111 // this is just for mgr commands - admin socket commands will fall
1112 // through and use the admin socket version of
1113 // get_command_descriptions
1114 if (prefix
== "get_command_descriptions" && !admin_socket_cmd
) {
1115 dout(10) << "reading commands from python modules" << dendl
;
1116 const auto py_commands
= py_modules
.get_commands();
1120 f
.open_object_section("command_descriptions");
1122 auto dump_cmd
= [&cmdnum
, &f
, m
](const MonCommand
&mc
){
1123 ostringstream secname
;
1124 secname
<< "cmd" << std::setfill('0') << std::setw(3) << cmdnum
;
1125 dump_cmddesc_to_json(&f
, m
->get_connection()->get_features(),
1126 secname
.str(), mc
.cmdstring
, mc
.helpstring
,
1127 mc
.module
, mc
.req_perms
, 0);
1131 for (const auto &pyc
: py_commands
) {
1135 for (const auto &mgr_cmd
: mgr_commands
) {
1139 f
.close_section(); // command_descriptions
1140 f
.flush(cmdctx
->odata
);
1141 cmdctx
->reply(0, ss
);
1146 const MonCommand
*mgr_cmd
= _get_mgrcommand(prefix
, mgr_commands
);
1147 _generate_command_map(cmdctx
->cmdmap
, param_str_map
);
1149 bool is_allowed
= false;
1150 ModuleCommand py_command
;
1151 if (admin_socket_cmd
) {
1152 // admin socket commands require all capabilities
1153 is_allowed
= session
->caps
.is_allow_all();
1154 } else if (!mgr_cmd
) {
1155 // Resolve the command to the name of the module that will
1156 // handle it (if the command exists)
1157 auto py_commands
= py_modules
.get_py_commands();
1158 for (const auto &pyc
: py_commands
) {
1159 auto pyc_prefix
= cmddesc_get_prefix(pyc
.cmdstring
);
1160 if (pyc_prefix
== prefix
) {
1166 MonCommand pyc
= {"", "", "py", py_command
.perm
};
1167 is_allowed
= _allowed_command(session
, "py", py_command
.module_name
,
1168 prefix
, cmdctx
->cmdmap
, param_str_map
,
1171 // validate user's permissions for requested command
1172 is_allowed
= _allowed_command(session
, mgr_cmd
->module
, "",
1173 prefix
, cmdctx
->cmdmap
, param_str_map
, mgr_cmd
);
1177 log_access_denied(cmdctx
, session
, ss
);
1178 cmdctx
->reply(-EACCES
, ss
);
1183 << "from='" << session
->inst
<< "' "
1184 << "entity='" << session
->entity_name
<< "' "
1185 << "cmd=" << cmdctx
->cmd
<< ": dispatch";
1187 if (admin_socket_cmd
) {
1188 cct
->get_admin_socket()->queue_tell_command(cmdctx
->m_tell
);
1193 // service map commands
1194 if (prefix
== "service dump") {
1196 f
.reset(Formatter::create("json-pretty"));
1197 cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
1198 f
->dump_object("service_map", service_map
);
1200 f
->flush(cmdctx
->odata
);
1201 cmdctx
->reply(0, ss
);
1204 if (prefix
== "service status") {
1206 f
.reset(Formatter::create("json-pretty"));
1207 // only include state from services that are in the persisted service map
1208 f
->open_object_section("service_status");
1209 for (auto& [type
, service
] : pending_service_map
.services
) {
1210 if (ServiceMap::is_normal_ceph_entity(type
)) {
1214 f
->open_object_section(type
.c_str());
1215 for (auto& q
: service
.daemons
) {
1216 f
->open_object_section(q
.first
.c_str());
1217 DaemonKey key
{type
, q
.first
};
1218 ceph_assert(daemon_state
.exists(key
));
1219 auto daemon
= daemon_state
.get(key
);
1220 std::lock_guard
l(daemon
->lock
);
1221 f
->dump_stream("status_stamp") << daemon
->service_status_stamp
;
1222 f
->dump_stream("last_beacon") << daemon
->last_service_beacon
;
1223 f
->open_object_section("status");
1224 for (auto& r
: daemon
->service_status
) {
1225 f
->dump_string(r
.first
.c_str(), r
.second
);
1233 f
->flush(cmdctx
->odata
);
1234 cmdctx
->reply(0, ss
);
1238 if (prefix
== "config set") {
1241 cmd_getval(cmdctx
->cmdmap
, "key", key
);
1242 cmd_getval(cmdctx
->cmdmap
, "value", val
);
1243 r
= cct
->_conf
.set_val(key
, val
, &ss
);
1245 cct
->_conf
.apply_changes(nullptr);
1247 cmdctx
->reply(0, ss
);
1254 if (prefix
== "pg scrub" ||
1255 prefix
== "pg repair" ||
1256 prefix
== "pg deep-scrub") {
1257 string scrubop
= prefix
.substr(3, string::npos
);
1261 cmd_getval(cmdctx
->cmdmap
, "pgid", pgidstr
);
1262 if (!pgid
.parse(pgidstr
.c_str())) {
1263 ss
<< "invalid pgid '" << pgidstr
<< "'";
1264 cmdctx
->reply(-EINVAL
, ss
);
1267 bool pg_exists
= false;
1268 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1269 pg_exists
= osdmap
.pg_exists(pgid
);
1272 ss
<< "pg " << pgid
<< " does not exist";
1273 cmdctx
->reply(-ENOENT
, ss
);
1276 int acting_primary
= -1;
1278 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1279 epoch
= osdmap
.get_epoch();
1280 osdmap
.get_primary_shard(pgid
, &acting_primary
, &spgid
);
1282 if (acting_primary
== -1) {
1283 ss
<< "pg " << pgid
<< " has no primary osd";
1284 cmdctx
->reply(-EAGAIN
, ss
);
1287 auto p
= osd_cons
.find(acting_primary
);
1288 if (p
== osd_cons
.end()) {
1289 ss
<< "pg " << pgid
<< " primary osd." << acting_primary
1290 << " is not currently connected";
1291 cmdctx
->reply(-EAGAIN
, ss
);
1294 for (auto& con
: p
->second
) {
1295 assert(HAVE_FEATURE(con
->get_features(), SERVER_OCTOPUS
));
1296 vector
<spg_t
> pgs
= { spgid
};
1297 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1300 scrubop
== "repair",
1301 scrubop
== "deep-scrub"));
1303 ss
<< "instructing pg " << spgid
<< " on osd." << acting_primary
1304 << " to " << scrubop
;
1305 cmdctx
->reply(0, ss
);
1307 } else if (prefix
== "osd scrub" ||
1308 prefix
== "osd deep-scrub" ||
1309 prefix
== "osd repair") {
1311 cmd_getval(cmdctx
->cmdmap
, "who", whostr
);
1312 vector
<string
> pvec
;
1313 get_str_vec(prefix
, pvec
);
1316 if (whostr
== "*" || whostr
== "all" || whostr
== "any") {
1317 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1318 for (int i
= 0; i
< osdmap
.get_max_osd(); i
++)
1319 if (osdmap
.is_up(i
)) {
1324 long osd
= parse_osd_id(whostr
.c_str(), &ss
);
1326 ss
<< "invalid osd '" << whostr
<< "'";
1327 cmdctx
->reply(-EINVAL
, ss
);
1330 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1331 if (osdmap
.is_up(osd
)) {
1336 ss
<< "osd." << osd
<< " is not up";
1337 cmdctx
->reply(-EAGAIN
, ss
);
1341 set
<int> sent_osds
, failed_osds
;
1342 for (auto osd
: osds
) {
1345 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1346 epoch
= osdmap
.get_epoch();
1347 auto p
= pgmap
.pg_by_osd
.find(osd
);
1348 if (p
!= pgmap
.pg_by_osd
.end()) {
1349 for (auto pgid
: p
->second
) {
1352 osdmap
.get_primary_shard(pgid
, &primary
, &spg
);
1353 if (primary
== osd
) {
1354 spgs
.push_back(spg
);
1359 auto p
= osd_cons
.find(osd
);
1360 if (p
== osd_cons
.end()) {
1361 failed_osds
.insert(osd
);
1363 sent_osds
.insert(osd
);
1364 for (auto& con
: p
->second
) {
1365 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
1366 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1369 pvec
.back() == "repair",
1370 pvec
.back() == "deep-scrub"));
1372 con
->send_message(new MOSDScrub(monc
->get_fsid(),
1373 pvec
.back() == "repair",
1374 pvec
.back() == "deep-scrub"));
1379 if (failed_osds
.size() == osds
.size()) {
1380 ss
<< "failed to instruct osd(s) " << osds
<< " to " << pvec
.back()
1381 << " (not connected)";
1384 ss
<< "instructed osd(s) " << sent_osds
<< " to " << pvec
.back();
1385 if (!failed_osds
.empty()) {
1386 ss
<< "; osd(s) " << failed_osds
<< " were not connected";
1390 cmdctx
->reply(0, ss
);
1392 } else if (prefix
== "osd pool scrub" ||
1393 prefix
== "osd pool deep-scrub" ||
1394 prefix
== "osd pool repair") {
1395 vector
<string
> pool_names
;
1396 cmd_getval(cmdctx
->cmdmap
, "who", pool_names
);
1397 if (pool_names
.empty()) {
1398 ss
<< "must specify one or more pool names";
1399 cmdctx
->reply(-EINVAL
, ss
);
1403 map
<int32_t, vector
<pg_t
>> pgs_by_primary
; // legacy
1404 map
<int32_t, vector
<spg_t
>> spgs_by_primary
;
1405 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1406 epoch
= osdmap
.get_epoch();
1407 for (auto& pool_name
: pool_names
) {
1408 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1410 ss
<< "unrecognized pool '" << pool_name
<< "'";
1414 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1415 for (int i
= 0; i
< pool_pg_num
; i
++) {
1416 pg_t
pg(i
, pool_id
);
1419 auto got
= osdmap
.get_primary_shard(pg
, &primary
, &spg
);
1422 pgs_by_primary
[primary
].push_back(pg
);
1423 spgs_by_primary
[primary
].push_back(spg
);
1428 cmdctx
->reply(r
, ss
);
1431 for (auto& it
: spgs_by_primary
) {
1432 auto primary
= it
.first
;
1433 auto p
= osd_cons
.find(primary
);
1434 if (p
== osd_cons
.end()) {
1435 ss
<< "osd." << primary
<< " is not currently connected";
1436 cmdctx
->reply(-EAGAIN
, ss
);
1439 for (auto& con
: p
->second
) {
1440 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
1441 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1444 prefix
== "osd pool repair",
1445 prefix
== "osd pool deep-scrub"));
1448 auto q
= pgs_by_primary
.find(primary
);
1449 ceph_assert(q
!= pgs_by_primary
.end());
1450 con
->send_message(new MOSDScrub(monc
->get_fsid(),
1452 prefix
== "osd pool repair",
1453 prefix
== "osd pool deep-scrub"));
1457 cmdctx
->reply(0, "");
1459 } else if (prefix
== "osd reweight-by-pg" ||
1460 prefix
== "osd reweight-by-utilization" ||
1461 prefix
== "osd test-reweight-by-pg" ||
1462 prefix
== "osd test-reweight-by-utilization") {
1464 prefix
== "osd reweight-by-pg" || prefix
== "osd test-reweight-by-pg";
1466 prefix
== "osd test-reweight-by-pg" ||
1467 prefix
== "osd test-reweight-by-utilization";
1468 int64_t oload
= cmd_getval_or
<int64_t>(cmdctx
->cmdmap
, "oload", 120);
1470 vector
<string
> poolnames
;
1471 cmd_getval(cmdctx
->cmdmap
, "pools", poolnames
);
1472 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1473 for (const auto& poolname
: poolnames
) {
1474 int64_t pool
= osdmap
.lookup_pg_pool_name(poolname
);
1476 ss
<< "pool '" << poolname
<< "' does not exist";
1483 cmdctx
->reply(r
, ss
);
1487 double max_change
= g_conf().get_val
<double>("mon_reweight_max_change");
1488 cmd_getval(cmdctx
->cmdmap
, "max_change", max_change
);
1489 if (max_change
<= 0.0) {
1490 ss
<< "max_change " << max_change
<< " must be positive";
1491 cmdctx
->reply(-EINVAL
, ss
);
1494 int64_t max_osds
= g_conf().get_val
<int64_t>("mon_reweight_max_osds");
1495 cmd_getval(cmdctx
->cmdmap
, "max_osds", max_osds
);
1496 if (max_osds
<= 0) {
1497 ss
<< "max_osds " << max_osds
<< " must be positive";
1498 cmdctx
->reply(-EINVAL
, ss
);
1501 bool no_increasing
= false;
1502 cmd_getval_compat_cephbool(cmdctx
->cmdmap
, "no_increasing", no_increasing
);
1504 mempool::osdmap::map
<int32_t, uint32_t> new_weights
;
1505 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
&osdmap
, const PGMap
& pgmap
) {
1506 return reweight::by_utilization(osdmap
, pgmap
,
1511 pools
.empty() ? NULL
: &pools
,
1514 &ss
, &out_str
, f
.get());
1517 dout(10) << "reweight::by_utilization: finished with " << out_str
<< dendl
;
1520 f
->flush(cmdctx
->odata
);
1522 cmdctx
->odata
.append(out_str
);
1525 ss
<< "FAILED reweight-by-pg";
1526 cmdctx
->reply(r
, ss
);
1528 } else if (r
== 0 || dry_run
) {
1530 cmdctx
->reply(r
, ss
);
1533 json_spirit::Object json_object
;
1534 for (const auto& osd_weight
: new_weights
) {
1535 json_spirit::Config::add(json_object
,
1536 std::to_string(osd_weight
.first
),
1537 std::to_string(osd_weight
.second
));
1539 string s
= json_spirit::write(json_object
);
1540 std::replace(begin(s
), end(s
), '\"', '\'');
1543 "\"prefix\": \"osd reweightn\", "
1544 "\"weights\": \"" + s
+ "\""
1546 auto on_finish
= new ReplyOnFinish(cmdctx
);
1547 monc
->start_mon_command({cmd
}, {},
1548 &on_finish
->from_mon
, &on_finish
->outs
, on_finish
);
1551 } else if (prefix
== "osd df") {
1552 string method
, filter
;
1553 cmd_getval(cmdctx
->cmdmap
, "output_method", method
);
1554 cmd_getval(cmdctx
->cmdmap
, "filter", filter
);
1556 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1557 // sanity check filter(s)
1558 if (!filter
.empty() &&
1559 osdmap
.lookup_pg_pool_name(filter
) < 0 &&
1560 !osdmap
.crush
->class_exists(filter
) &&
1561 !osdmap
.crush
->name_exists(filter
)) {
1562 rs
<< "'" << filter
<< "' not a pool, crush node or device class name";
1565 print_osd_utilization(osdmap
, pgmap
, ss
,
1566 f
.get(), method
== "tree", filter
);
1567 cmdctx
->odata
.append(ss
);
1570 cmdctx
->reply(r
, rs
);
1572 } else if (prefix
== "osd pool stats") {
1574 cmd_getval(cmdctx
->cmdmap
, "pool_name", pool_name
);
1575 int64_t poolid
= -ENOENT
;
1576 bool one_pool
= false;
1577 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1578 if (!pool_name
.empty()) {
1579 poolid
= osdmap
.lookup_pg_pool_name(pool_name
);
1581 ceph_assert(poolid
== -ENOENT
);
1582 ss
<< "unrecognized pool '" << pool_name
<< "'";
1589 f
->open_array_section("pool_stats");
1591 if (osdmap
.get_pools().empty()) {
1592 ss
<< "there are no pools!";
1596 for (auto &p
: osdmap
.get_pools()) {
1600 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, f
.get(), &rs
);
1608 f
->flush(cmdctx
->odata
);
1610 cmdctx
->odata
.append(rs
.str());
1614 if (r
!= -EOPNOTSUPP
) {
1615 cmdctx
->reply(r
, ss
);
1618 } else if (prefix
== "osd safe-to-destroy" ||
1619 prefix
== "osd destroy" ||
1620 prefix
== "osd purge") {
1623 if (prefix
== "osd safe-to-destroy") {
1625 cmd_getval(cmdctx
->cmdmap
, "ids", ids
);
1626 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1627 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1629 if (!r
&& osds
.empty()) {
1630 ss
<< "must specify one or more OSDs";
1635 if (!cmd_getval(cmdctx
->cmdmap
, "id", id
)) {
1637 ss
<< "must specify OSD id";
1643 cmdctx
->reply(r
, ss
);
1646 set
<int> active_osds
, missing_stats
, stored_pgs
, safe_to_destroy
;
1647 int affected_pgs
= 0;
1648 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1649 if (pg_map
.num_pg_unknown
> 0) {
1650 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; cannot draw"
1651 << " any conclusions";
1655 int num_active_clean
= 0;
1656 for (auto& p
: pg_map
.num_pg_by_state
) {
1657 unsigned want
= PG_STATE_ACTIVE
|PG_STATE_CLEAN
;
1658 if ((p
.first
& want
) == want
) {
1659 num_active_clean
+= p
.second
;
1662 for (auto osd
: osds
) {
1663 if (!osdmap
.exists(osd
)) {
1664 safe_to_destroy
.insert(osd
);
1665 continue; // clearly safe to destroy
1667 auto q
= pg_map
.num_pg_by_osd
.find(osd
);
1668 if (q
!= pg_map
.num_pg_by_osd
.end()) {
1669 if (q
->second
.acting
> 0 || q
->second
.up_not_acting
> 0) {
1670 active_osds
.insert(osd
);
1671 // XXX: For overlapping PGs, this counts them again
1672 affected_pgs
+= q
->second
.acting
+ q
->second
.up_not_acting
;
1676 if (num_active_clean
< pg_map
.num_pg
) {
1677 // all pgs aren't active+clean; we need to be careful.
1678 auto p
= pg_map
.osd_stat
.find(osd
);
1679 if (p
== pg_map
.osd_stat
.end() || !osdmap
.is_up(osd
)) {
1680 missing_stats
.insert(osd
);
1682 } else if (p
->second
.num_pgs
> 0) {
1683 stored_pgs
.insert(osd
);
1687 safe_to_destroy
.insert(osd
);
1690 if (r
&& prefix
== "osd safe-to-destroy") {
1691 cmdctx
->reply(r
, ss
); // regardless of formatter
1694 if (!r
&& (!active_osds
.empty() ||
1695 !missing_stats
.empty() || !stored_pgs
.empty())) {
1696 if (!safe_to_destroy
.empty()) {
1697 ss
<< "OSD(s) " << safe_to_destroy
1698 << " are safe to destroy without reducing data durability. ";
1700 if (!active_osds
.empty()) {
1701 ss
<< "OSD(s) " << active_osds
<< " have " << affected_pgs
1702 << " pgs currently mapped to them. ";
1704 if (!missing_stats
.empty()) {
1705 ss
<< "OSD(s) " << missing_stats
<< " have no reported stats, and not all"
1706 << " PGs are active+clean; we cannot draw any conclusions. ";
1708 if (!stored_pgs
.empty()) {
1709 ss
<< "OSD(s) " << stored_pgs
<< " last reported they still store some PG"
1710 << " data, and not all PGs are active+clean; we cannot be sure they"
1711 << " aren't still needed.";
1713 if (!active_osds
.empty() || !stored_pgs
.empty()) {
1720 if (prefix
== "osd safe-to-destroy") {
1722 ss
<< "OSD(s) " << osds
<< " are safe to destroy without reducing data"
1726 f
->open_object_section("osd_status");
1727 f
->open_array_section("safe_to_destroy");
1728 for (auto i
: safe_to_destroy
)
1729 f
->dump_int("osd", i
);
1731 f
->open_array_section("active");
1732 for (auto i
: active_osds
)
1733 f
->dump_int("osd", i
);
1735 f
->open_array_section("missing_stats");
1736 for (auto i
: missing_stats
)
1737 f
->dump_int("osd", i
);
1739 f
->open_array_section("stored_pgs");
1740 for (auto i
: stored_pgs
)
1741 f
->dump_int("osd", i
);
1743 f
->close_section(); // osd_status
1744 f
->flush(cmdctx
->odata
);
1746 std::stringstream().swap(ss
);
1748 cmdctx
->reply(r
, ss
);
1754 cmd_getval(cmdctx
->cmdmap
, "force", force
);
1757 cmd_getval(cmdctx
->cmdmap
, "yes_i_really_mean_it", force
);
1760 ss
<< "\nYou can proceed by passing --force, but be warned that"
1761 " this will likely mean real, permanent data loss.";
1767 cmdctx
->reply(r
, ss
);
1772 "\"prefix\": \"" + prefix
+ "-actual\", "
1773 "\"id\": " + stringify(osds
) + ", "
1774 "\"yes_i_really_mean_it\": true"
1776 auto on_finish
= new ReplyOnFinish(cmdctx
);
1777 monc
->start_mon_command({cmd
}, {}, nullptr, &on_finish
->outs
, on_finish
);
1779 } else if (prefix
== "osd ok-to-stop") {
1781 cmd_getval(cmdctx
->cmdmap
, "ids", ids
);
1784 cmd_getval(cmdctx
->cmdmap
, "max", max
);
1786 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1787 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1789 if (!r
&& osds
.empty()) {
1790 ss
<< "must specify one or more OSDs";
1793 if (max
< (int)osds
.size()) {
1797 cmdctx
->reply(r
, ss
);
1800 offline_pg_report out_report
;
1801 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1802 _maximize_ok_to_stop_set(
1803 osds
, max
, osdmap
, pg_map
,
1807 f
.reset(Formatter::create("json"));
1809 f
->dump_object("ok_to_stop", out_report
);
1810 f
->flush(cmdctx
->odata
);
1811 cmdctx
->odata
.append("\n");
1812 if (!out_report
.unknown
.empty()) {
1813 ss
<< out_report
.unknown
.size() << " pgs have unknown state; "
1814 << "cannot draw any conclusions";
1815 cmdctx
->reply(-EAGAIN
, ss
);
1817 if (!out_report
.ok_to_stop()) {
1818 ss
<< "unsafe to stop osd(s) at this time (" << out_report
.not_ok
.size() << " PGs are or would become offline)";
1819 cmdctx
->reply(-EBUSY
, ss
);
1821 cmdctx
->reply(0, ss
);
1824 } else if (prefix
== "pg force-recovery" ||
1825 prefix
== "pg force-backfill" ||
1826 prefix
== "pg cancel-force-recovery" ||
1827 prefix
== "pg cancel-force-backfill" ||
1828 prefix
== "osd pool force-recovery" ||
1829 prefix
== "osd pool force-backfill" ||
1830 prefix
== "osd pool cancel-force-recovery" ||
1831 prefix
== "osd pool cancel-force-backfill") {
1833 get_str_vec(prefix
, vs
);
1834 auto& granularity
= vs
.front();
1835 auto& forceop
= vs
.back();
1838 // figure out actual op just once
1840 if (forceop
== "force-recovery") {
1841 actual_op
= OFR_RECOVERY
;
1842 } else if (forceop
== "force-backfill") {
1843 actual_op
= OFR_BACKFILL
;
1844 } else if (forceop
== "cancel-force-backfill") {
1845 actual_op
= OFR_BACKFILL
| OFR_CANCEL
;
1846 } else if (forceop
== "cancel-force-recovery") {
1847 actual_op
= OFR_RECOVERY
| OFR_CANCEL
;
1850 set
<pg_t
> candidates
; // deduped
1851 if (granularity
== "pg") {
1852 // covnert pg names to pgs, discard any invalid ones while at it
1853 vector
<string
> pgids
;
1854 cmd_getval(cmdctx
->cmdmap
, "pgid", pgids
);
1855 for (auto& i
: pgids
) {
1857 if (!pgid
.parse(i
.c_str())) {
1858 ss
<< "invlaid pgid '" << i
<< "'; ";
1862 candidates
.insert(pgid
);
1866 vector
<string
> pool_names
;
1867 cmd_getval(cmdctx
->cmdmap
, "who", pool_names
);
1868 if (pool_names
.empty()) {
1869 ss
<< "must specify one or more pool names";
1870 cmdctx
->reply(-EINVAL
, ss
);
1873 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1874 for (auto& pool_name
: pool_names
) {
1875 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1877 ss
<< "unrecognized pool '" << pool_name
<< "'";
1881 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1882 for (int i
= 0; i
< pool_pg_num
; i
++)
1883 candidates
.insert({(unsigned int)i
, (uint64_t)pool_id
});
1887 cmdctx
->reply(r
, ss
);
1892 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1893 for (auto& i
: candidates
) {
1894 auto it
= pg_map
.pg_stat
.find(i
);
1895 if (it
== pg_map
.pg_stat
.end()) {
1896 ss
<< "pg " << i
<< " does not exist; ";
1900 auto state
= it
->second
.state
;
1901 // discard pgs for which user requests are pointless
1902 switch (actual_op
) {
1904 if ((state
& (PG_STATE_DEGRADED
|
1905 PG_STATE_RECOVERY_WAIT
|
1906 PG_STATE_RECOVERING
)) == 0) {
1907 // don't return error, user script may be racing with cluster.
1909 ss
<< "pg " << i
<< " doesn't require recovery; ";
1911 } else if (state
& PG_STATE_FORCED_RECOVERY
) {
1912 ss
<< "pg " << i
<< " recovery already forced; ";
1913 // return error, as it may be a bug in user script
1919 if ((state
& (PG_STATE_DEGRADED
|
1920 PG_STATE_BACKFILL_WAIT
|
1921 PG_STATE_BACKFILLING
)) == 0) {
1922 ss
<< "pg " << i
<< " doesn't require backfilling; ";
1924 } else if (state
& PG_STATE_FORCED_BACKFILL
) {
1925 ss
<< "pg " << i
<< " backfill already forced; ";
1930 case OFR_BACKFILL
| OFR_CANCEL
:
1931 if ((state
& PG_STATE_FORCED_BACKFILL
) == 0) {
1932 ss
<< "pg " << i
<< " backfill not forced; ";
1936 case OFR_RECOVERY
| OFR_CANCEL
:
1937 if ((state
& PG_STATE_FORCED_RECOVERY
) == 0) {
1938 ss
<< "pg " << i
<< " recovery not forced; ";
1943 ceph_abort_msg("actual_op value is not supported");
1949 // respond with error only when no pgs are correct
1950 // yes, in case of mixed errors, only the last one will be emitted,
1951 // but the message presented will be fine
1952 if (pgs
.size() != 0) {
1953 // clear error to not confuse users/scripts
1957 // optimize the command -> messages conversion, use only one
1958 // message per distinct OSD
1959 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1960 // group pgs to process by osd
1961 map
<int, vector
<spg_t
>> osdpgs
;
1962 for (auto& pgid
: pgs
) {
1965 if (osdmap
.get_primary_shard(pgid
, &primary
, &spg
)) {
1966 osdpgs
[primary
].push_back(spg
);
1969 for (auto& i
: osdpgs
) {
1970 if (osdmap
.is_up(i
.first
)) {
1971 auto p
= osd_cons
.find(i
.first
);
1972 if (p
== osd_cons
.end()) {
1973 ss
<< "osd." << i
.first
<< " is not currently connected";
1977 for (auto& con
: p
->second
) {
1979 new MOSDForceRecovery(monc
->get_fsid(), i
.second
, actual_op
));
1981 ss
<< "instructing pg(s) " << i
.second
<< " on osd." << i
.first
1982 << " to " << forceop
<< "; ";
1987 cmdctx
->reply(r
, ss
);
1989 } else if (prefix
== "config show" ||
1990 prefix
== "config show-with-defaults") {
1992 cmd_getval(cmdctx
->cmdmap
, "who", who
);
1993 auto [key
, valid
] = DaemonKey::parse(who
);
1995 ss
<< "invalid daemon name: use <type>.<id>";
1996 cmdctx
->reply(-EINVAL
, ss
);
1999 DaemonStatePtr daemon
= daemon_state
.get(key
);
2001 ss
<< "no config state for daemon " << who
;
2002 cmdctx
->reply(-ENOENT
, ss
);
2006 std::lock_guard
l(daemon
->lock
);
2010 if (cmd_getval(cmdctx
->cmdmap
, "key", name
)) {
2011 // handle special options
2012 if (name
== "fsid") {
2013 cmdctx
->odata
.append(stringify(monc
->get_fsid()) + "\n");
2014 cmdctx
->reply(r
, ss
);
2017 auto p
= daemon
->config
.find(name
);
2018 if (p
!= daemon
->config
.end() &&
2019 !p
->second
.empty()) {
2020 cmdctx
->odata
.append(p
->second
.rbegin()->second
+ "\n");
2022 auto& defaults
= daemon
->_get_config_defaults();
2023 auto q
= defaults
.find(name
);
2024 if (q
!= defaults
.end()) {
2025 cmdctx
->odata
.append(q
->second
+ "\n");
2030 } else if (daemon
->config_defaults_bl
.length() > 0) {
2033 f
->open_array_section("config");
2035 tbl
.define_column("NAME", TextTable::LEFT
, TextTable::LEFT
);
2036 tbl
.define_column("VALUE", TextTable::LEFT
, TextTable::LEFT
);
2037 tbl
.define_column("SOURCE", TextTable::LEFT
, TextTable::LEFT
);
2038 tbl
.define_column("OVERRIDES", TextTable::LEFT
, TextTable::LEFT
);
2039 tbl
.define_column("IGNORES", TextTable::LEFT
, TextTable::LEFT
);
2041 if (prefix
== "config show") {
2043 for (auto& i
: daemon
->config
) {
2044 dout(20) << " " << i
.first
<< " -> " << i
.second
<< dendl
;
2045 if (i
.second
.empty()) {
2049 f
->open_object_section("value");
2050 f
->dump_string("name", i
.first
);
2051 f
->dump_string("value", i
.second
.rbegin()->second
);
2052 f
->dump_string("source", ceph_conf_level_name(
2053 i
.second
.rbegin()->first
));
2054 if (i
.second
.size() > 1) {
2055 f
->open_array_section("overrides");
2056 auto j
= i
.second
.rend();
2057 for (--j
; j
!= i
.second
.rbegin(); --j
) {
2058 f
->open_object_section("value");
2059 f
->dump_string("source", ceph_conf_level_name(j
->first
));
2060 f
->dump_string("value", j
->second
);
2065 if (daemon
->ignored_mon_config
.count(i
.first
)) {
2066 f
->dump_string("ignores", "mon");
2071 tbl
<< i
.second
.rbegin()->second
;
2072 tbl
<< ceph_conf_level_name(i
.second
.rbegin()->first
);
2073 if (i
.second
.size() > 1) {
2075 auto j
= i
.second
.rend();
2076 for (--j
; j
!= i
.second
.rbegin(); --j
) {
2077 if (j
->second
== i
.second
.rbegin()->second
) {
2078 ov
.push_front(string("(") + ceph_conf_level_name(j
->first
) +
2079 string("[") + j
->second
+ string("]") +
2082 ov
.push_front(ceph_conf_level_name(j
->first
) +
2083 string("[") + j
->second
+ string("]"));
2091 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
2092 tbl
<< TextTable::endrow
;
2096 // show-with-defaults
2097 auto& defaults
= daemon
->_get_config_defaults();
2098 for (auto& i
: defaults
) {
2100 f
->open_object_section("value");
2101 f
->dump_string("name", i
.first
);
2105 auto j
= daemon
->config
.find(i
.first
);
2106 if (j
!= daemon
->config
.end() && !j
->second
.empty()) {
2109 f
->dump_string("value", j
->second
.rbegin()->second
);
2110 f
->dump_string("source", ceph_conf_level_name(
2111 j
->second
.rbegin()->first
));
2112 if (j
->second
.size() > 1) {
2113 f
->open_array_section("overrides");
2114 auto k
= j
->second
.rend();
2115 for (--k
; k
!= j
->second
.rbegin(); --k
) {
2116 f
->open_object_section("value");
2117 f
->dump_string("source", ceph_conf_level_name(k
->first
));
2118 f
->dump_string("value", k
->second
);
2123 if (daemon
->ignored_mon_config
.count(i
.first
)) {
2124 f
->dump_string("ignores", "mon");
2128 tbl
<< j
->second
.rbegin()->second
;
2129 tbl
<< ceph_conf_level_name(j
->second
.rbegin()->first
);
2130 if (j
->second
.size() > 1) {
2132 auto k
= j
->second
.rend();
2133 for (--k
; k
!= j
->second
.rbegin(); --k
) {
2134 if (k
->second
== j
->second
.rbegin()->second
) {
2135 ov
.push_front(string("(") + ceph_conf_level_name(k
->first
) +
2136 string("[") + k
->second
+ string("]") +
2139 ov
.push_front(ceph_conf_level_name(k
->first
) +
2140 string("[") + k
->second
+ string("]"));
2147 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
2148 tbl
<< TextTable::endrow
;
2151 // only have default
2153 f
->dump_string("value", i
.second
);
2154 f
->dump_string("source", ceph_conf_level_name(CONF_DEFAULT
));
2158 tbl
<< ceph_conf_level_name(CONF_DEFAULT
);
2161 tbl
<< TextTable::endrow
;
2168 f
->flush(cmdctx
->odata
);
2170 cmdctx
->odata
.append(stringify(tbl
));
2173 cmdctx
->reply(r
, ss
);
2175 } else if (prefix
== "device ls") {
2179 f
->open_array_section("devices");
2180 daemon_state
.with_devices([&f
](const DeviceState
& dev
) {
2181 f
->dump_object("device", dev
);
2184 f
->flush(cmdctx
->odata
);
2186 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2187 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
2188 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
2189 tbl
.define_column("WEAR", TextTable::RIGHT
, TextTable::RIGHT
);
2190 tbl
.define_column("LIFE EXPECTANCY", TextTable::LEFT
, TextTable::LEFT
);
2191 auto now
= ceph_clock_now();
2192 daemon_state
.with_devices([&tbl
, now
](const DeviceState
& dev
) {
2194 for (auto& i
: dev
.attachments
) {
2198 h
+= std::get
<0>(i
) + ":" + std::get
<1>(i
);
2201 for (auto& i
: dev
.daemons
) {
2207 char wear_level_str
[16] = {0};
2208 if (dev
.wear_level
>= 0) {
2209 snprintf(wear_level_str
, sizeof(wear_level_str
)-1, "%d%%",
2210 (int)(100.1 * dev
.wear_level
));
2216 << dev
.get_life_expectancy_str(now
)
2217 << TextTable::endrow
;
2219 cmdctx
->odata
.append(stringify(tbl
));
2221 cmdctx
->reply(0, ss
);
2223 } else if (prefix
== "device ls-by-daemon") {
2225 cmd_getval(cmdctx
->cmdmap
, "who", who
);
2226 if (auto [k
, valid
] = DaemonKey::parse(who
); !valid
) {
2227 ss
<< who
<< " is not a valid daemon name";
2230 auto dm
= daemon_state
.get(k
);
2233 f
->open_array_section("devices");
2234 for (auto& i
: dm
->devices
) {
2235 daemon_state
.with_device(i
.first
, [&f
] (const DeviceState
& dev
) {
2236 f
->dump_object("device", dev
);
2240 f
->flush(cmdctx
->odata
);
2243 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2244 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
2245 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
,
2247 auto now
= ceph_clock_now();
2248 for (auto& i
: dm
->devices
) {
2249 daemon_state
.with_device(
2250 i
.first
, [&tbl
, now
] (const DeviceState
& dev
) {
2252 for (auto& i
: dev
.attachments
) {
2256 h
+= std::get
<0>(i
) + ":" + std::get
<1>(i
);
2260 << dev
.get_life_expectancy_str(now
)
2261 << TextTable::endrow
;
2264 cmdctx
->odata
.append(stringify(tbl
));
2268 ss
<< "daemon " << who
<< " not found";
2270 cmdctx
->reply(r
, ss
);
2272 } else if (prefix
== "device ls-by-host") {
2274 cmd_getval(cmdctx
->cmdmap
, "host", host
);
2276 daemon_state
.list_devids_by_server(host
, &devids
);
2278 f
->open_array_section("devices");
2279 for (auto& devid
: devids
) {
2280 daemon_state
.with_device(
2281 devid
, [&f
] (const DeviceState
& dev
) {
2282 f
->dump_object("device", dev
);
2286 f
->flush(cmdctx
->odata
);
2289 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2290 tbl
.define_column("DEV", TextTable::LEFT
, TextTable::LEFT
);
2291 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
2292 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
, TextTable::LEFT
);
2293 auto now
= ceph_clock_now();
2294 for (auto& devid
: devids
) {
2295 daemon_state
.with_device(
2296 devid
, [&tbl
, &host
, now
] (const DeviceState
& dev
) {
2298 for (auto& j
: dev
.attachments
) {
2299 if (std::get
<0>(j
) == host
) {
2303 n
+= std::get
<1>(j
);
2307 for (auto& i
: dev
.daemons
) {
2316 << dev
.get_life_expectancy_str(now
)
2317 << TextTable::endrow
;
2320 cmdctx
->odata
.append(stringify(tbl
));
2322 cmdctx
->reply(0, ss
);
2324 } else if (prefix
== "device info") {
2326 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2329 if (!daemon_state
.with_device(devid
,
2330 [&f
, &rs
] (const DeviceState
& dev
) {
2332 f
->dump_object("device", dev
);
2337 ss
<< "device " << devid
<< " not found";
2341 f
->flush(cmdctx
->odata
);
2343 cmdctx
->odata
.append(rs
.str());
2346 cmdctx
->reply(r
, ss
);
2348 } else if (prefix
== "device set-life-expectancy") {
2350 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2351 string from_str
, to_str
;
2352 cmd_getval(cmdctx
->cmdmap
, "from", from_str
);
2353 cmd_getval(cmdctx
->cmdmap
, "to", to_str
);
2355 if (!from
.parse(from_str
)) {
2356 ss
<< "unable to parse datetime '" << from_str
<< "'";
2358 cmdctx
->reply(r
, ss
);
2359 } else if (to_str
.size() && !to
.parse(to_str
)) {
2360 ss
<< "unable to parse datetime '" << to_str
<< "'";
2362 cmdctx
->reply(r
, ss
);
2364 map
<string
,string
> meta
;
2365 daemon_state
.with_device_create(
2367 [from
, to
, &meta
] (DeviceState
& dev
) {
2368 dev
.set_life_expectancy(from
, to
, ceph_clock_now());
2369 meta
= dev
.metadata
;
2371 json_spirit::Object json_object
;
2372 for (auto& i
: meta
) {
2373 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2376 json
.append(json_spirit::write(json_object
));
2379 "\"prefix\": \"config-key set\", "
2380 "\"key\": \"device/" + devid
+ "\""
2382 auto on_finish
= new ReplyOnFinish(cmdctx
);
2383 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2386 } else if (prefix
== "device rm-life-expectancy") {
2388 cmd_getval(cmdctx
->cmdmap
, "devid", devid
);
2389 map
<string
,string
> meta
;
2390 if (daemon_state
.with_device_write(devid
, [&meta
] (DeviceState
& dev
) {
2391 dev
.rm_life_expectancy();
2392 meta
= dev
.metadata
;
2399 "\"prefix\": \"config-key rm\", "
2400 "\"key\": \"device/" + devid
+ "\""
2403 json_spirit::Object json_object
;
2404 for (auto& i
: meta
) {
2405 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2407 json
.append(json_spirit::write(json_object
));
2410 "\"prefix\": \"config-key set\", "
2411 "\"key\": \"device/" + devid
+ "\""
2414 auto on_finish
= new ReplyOnFinish(cmdctx
);
2415 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2417 cmdctx
->reply(0, ss
);
2422 ss
<< "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2425 f
->open_object_section("pg_info");
2426 f
->dump_bool("pg_ready", pgmap_ready
);
2429 // fall back to feeding command to PGMap
2430 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
2431 return process_pg_map_command(prefix
, cmdctx
->cmdmap
, pg_map
, osdmap
,
2432 f
.get(), &ss
, &cmdctx
->odata
);
2438 if (r
!= -EOPNOTSUPP
) {
2440 f
->flush(cmdctx
->odata
);
2442 cmdctx
->reply(r
, ss
);
2447 // Was the command unfound?
2448 if (py_command
.cmdstring
.empty()) {
2449 ss
<< "No handler found for '" << prefix
<< "'";
2450 dout(4) << "No handler found for '" << prefix
<< "'" << dendl
;
2451 cmdctx
->reply(-EINVAL
, ss
);
2455 dout(10) << "passing through command '" << prefix
<< "' size " << cmdctx
->cmdmap
.size() << dendl
;
2456 finisher
.queue(new LambdaContext([this, cmdctx
, session
, py_command
, prefix
]
2458 std::stringstream ss
;
2460 dout(10) << "dispatching command '" << prefix
<< "' size " << cmdctx
->cmdmap
.size() << dendl
;
2462 // Validate that the module is enabled
2463 auto& py_handler_name
= py_command
.module_name
;
2464 PyModuleRef module
= py_modules
.get_module(py_handler_name
);
2465 ceph_assert(module
);
2466 if (!module
->is_enabled()) {
2467 ss
<< "Module '" << py_handler_name
<< "' is not enabled (required by "
2468 "command '" << prefix
<< "'): use `ceph mgr module enable "
2469 << py_handler_name
<< "` to enable it";
2470 dout(4) << ss
.str() << dendl
;
2471 cmdctx
->reply(-EOPNOTSUPP
, ss
);
2475 // Hack: allow the self-test method to run on unhealthy modules.
2476 // Fix this in future by creating a special path for self test rather
2477 // than having the hook be a normal module command.
2478 std::string self_test_prefix
= py_handler_name
+ " " + "self-test";
2480 // Validate that the module is healthy
2481 bool accept_command
;
2482 if (module
->is_loaded()) {
2483 if (module
->get_can_run() && !module
->is_failed()) {
2485 accept_command
= true;
2486 } else if (self_test_prefix
== prefix
) {
2487 // Unhealthy, but allow because it's a self test command
2488 accept_command
= true;
2490 accept_command
= false;
2491 ss
<< "Module '" << py_handler_name
<< "' has experienced an error and "
2492 "cannot handle commands: " << module
->get_error_string();
2495 // Module not loaded
2496 accept_command
= false;
2497 ss
<< "Module '" << py_handler_name
<< "' failed to load and "
2498 "cannot handle commands: " << module
->get_error_string();
2501 if (!accept_command
) {
2502 dout(4) << ss
.str() << dendl
;
2503 cmdctx
->reply(-EIO
, ss
);
2507 std::stringstream ds
;
2508 bufferlist inbl
= cmdctx
->data
;
2509 int r
= py_modules
.handle_command(py_command
, *session
, cmdctx
->cmdmap
,
2512 log_access_denied(cmdctx
, session
, ss
);
2515 cmdctx
->odata
.append(ds
);
2516 cmdctx
->reply(r
, ss
);
2517 dout(10) << " command returned " << r
<< dendl
;
2522 void DaemonServer::_prune_pending_service_map()
2524 utime_t cutoff
= ceph_clock_now();
2525 cutoff
-= g_conf().get_val
<double>("mgr_service_beacon_grace");
2526 auto p
= pending_service_map
.services
.begin();
2527 while (p
!= pending_service_map
.services
.end()) {
2528 auto q
= p
->second
.daemons
.begin();
2529 while (q
!= p
->second
.daemons
.end()) {
2530 DaemonKey key
{p
->first
, q
->first
};
2531 if (!daemon_state
.exists(key
)) {
2532 if (ServiceMap::is_normal_ceph_entity(p
->first
)) {
2533 dout(10) << "daemon " << key
<< " in service map but not in daemon state "
2534 << "index -- force pruning" << dendl
;
2535 q
= p
->second
.daemons
.erase(q
);
2536 pending_service_map_dirty
= pending_service_map
.epoch
;
2538 derr
<< "missing key " << key
<< dendl
;
2545 auto daemon
= daemon_state
.get(key
);
2546 std::lock_guard
l(daemon
->lock
);
2547 if (daemon
->last_service_beacon
== utime_t()) {
2548 // we must have just restarted; assume they are alive now.
2549 daemon
->last_service_beacon
= ceph_clock_now();
2553 if (daemon
->last_service_beacon
< cutoff
) {
2554 dout(10) << "pruning stale " << p
->first
<< "." << q
->first
2555 << " last_beacon " << daemon
->last_service_beacon
<< dendl
;
2556 q
= p
->second
.daemons
.erase(q
);
2557 pending_service_map_dirty
= pending_service_map
.epoch
;
2562 if (p
->second
.daemons
.empty()) {
2563 p
= pending_service_map
.services
.erase(p
);
2564 pending_service_map_dirty
= pending_service_map
.epoch
;
2571 void DaemonServer::send_report()
2574 if (ceph_clock_now() - started_at
> g_conf().get_val
<int64_t>("mgr_stats_period") * 4.0) {
2576 reported_osds
.clear();
2577 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
2578 << "potentially incomplete PG state to mon" << dendl
;
2580 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
2586 auto m
= ceph::make_message
<MMonMgrReport
>();
2587 m
->gid
= monc
->get_global_id();
2588 py_modules
.get_health_checks(&m
->health_checks
);
2589 py_modules
.get_progress_events(&m
->progress_events
);
2591 cluster_state
.with_mutable_pgmap([&](PGMap
& pg_map
) {
2592 cluster_state
.update_delta_stats();
2594 if (pending_service_map
.epoch
) {
2595 _prune_pending_service_map();
2596 if (pending_service_map_dirty
>= pending_service_map
.epoch
) {
2597 pending_service_map
.modified
= ceph_clock_now();
2598 encode(pending_service_map
, m
->service_map_bl
, CEPH_FEATURES_ALL
);
2599 dout(10) << "sending service_map e" << pending_service_map
.epoch
2601 pending_service_map
.epoch
++;
2605 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
2606 // FIXME: no easy way to get mon features here. this will do for
2607 // now, though, as long as we don't make a backward-incompat change.
2608 pg_map
.encode_digest(osdmap
, m
->get_data(), CEPH_FEATURES_ALL
);
2609 dout(10) << pg_map
<< dendl
;
2611 pg_map
.get_health_checks(g_ceph_context
, osdmap
,
2614 dout(10) << m
->health_checks
.checks
.size() << " health checks"
2616 dout(20) << "health checks:\n";
2617 JSONFormatter
jf(true);
2618 jf
.dump_object("health_checks", m
->health_checks
);
2621 if (osdmap
.require_osd_release
>= ceph_release_t::luminous
) {
2622 clog
->debug() << "pgmap v" << pg_map
.version
<< ": " << pg_map
;
2627 map
<daemon_metric
, unique_ptr
<DaemonHealthMetricCollector
>> accumulated
;
2628 for (auto service
: {"osd", "mon"} ) {
2629 auto daemons
= daemon_state
.get_by_service(service
);
2630 for (const auto& [key
,state
] : daemons
) {
2631 std::lock_guard l
{state
->lock
};
2632 for (const auto& metric
: state
->daemon_health_metrics
) {
2633 auto acc
= accumulated
.find(metric
.get_type());
2634 if (acc
== accumulated
.end()) {
2635 auto collector
= DaemonHealthMetricCollector::create(metric
.get_type());
2637 derr
<< __func__
<< " " << key
2638 << " sent me an unknown health metric: "
2639 << std::hex
<< static_cast<uint8_t>(metric
.get_type())
2640 << std::dec
<< dendl
;
2643 dout(20) << " + " << state
->key
<< " "
2645 tie(acc
, std::ignore
) = accumulated
.emplace(metric
.get_type(),
2646 std::move(collector
));
2648 acc
->second
->update(key
, metric
);
2652 for (const auto& acc
: accumulated
) {
2653 acc
.second
->summarize(m
->health_checks
);
2655 // TODO? We currently do not notify the PyModules
2656 // TODO: respect needs_send, so we send the report only if we are asked to do
2657 // so, or the state is updated.
2658 monc
->send_mon_message(std::move(m
));
2661 void DaemonServer::adjust_pgs()
2664 unsigned max
= std::max
<int64_t>(1, g_conf()->mon_osd_max_creating_pgs
);
2665 double max_misplaced
= g_conf().get_val
<double>("target_max_misplaced_ratio");
2666 bool aggro
= g_conf().get_val
<bool>("mgr_debug_aggressive_pg_num_changes");
2668 map
<string
,unsigned> pg_num_to_set
;
2669 map
<string
,unsigned> pgp_num_to_set
;
2670 set
<pg_t
> upmaps_to_clear
;
2671 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
2672 unsigned creating_or_unknown
= 0;
2673 for (auto& i
: pg_map
.num_pg_by_state
) {
2674 if ((i
.first
& (PG_STATE_CREATING
)) ||
2676 creating_or_unknown
+= i
.second
;
2679 unsigned left
= max
;
2680 if (creating_or_unknown
>= max
) {
2683 left
-= creating_or_unknown
;
2684 dout(10) << "creating_or_unknown " << creating_or_unknown
2685 << " max_creating " << max
2689 // FIXME: These checks are fundamentally racy given that adjust_pgs()
2690 // can run more frequently than we get updated pg stats from OSDs. We
2691 // may make multiple adjustments with stale informaiton.
2692 double misplaced_ratio
, degraded_ratio
;
2693 double inactive_pgs_ratio
, unknown_pgs_ratio
;
2694 pg_map
.get_recovery_stats(&misplaced_ratio
, °raded_ratio
,
2695 &inactive_pgs_ratio
, &unknown_pgs_ratio
);
2696 dout(20) << "misplaced_ratio " << misplaced_ratio
2697 << " degraded_ratio " << degraded_ratio
2698 << " inactive_pgs_ratio " << inactive_pgs_ratio
2699 << " unknown_pgs_ratio " << unknown_pgs_ratio
2700 << "; target_max_misplaced_ratio " << max_misplaced
2703 for (auto& i
: osdmap
.get_pools()) {
2704 const pg_pool_t
& p
= i
.second
;
2707 if (p
.get_pg_num_target() != p
.get_pg_num()) {
2708 dout(20) << "pool " << i
.first
2709 << " pg_num " << p
.get_pg_num()
2710 << " target " << p
.get_pg_num_target()
2712 if (p
.has_flag(pg_pool_t::FLAG_CREATING
)) {
2713 dout(10) << "pool " << i
.first
2714 << " pg_num_target " << p
.get_pg_num_target()
2715 << " pg_num " << p
.get_pg_num()
2716 << " - still creating initial pgs"
2718 } else if (p
.get_pg_num_target() < p
.get_pg_num()) {
2719 // pg_num decrease (merge)
2720 pg_t
merge_source(p
.get_pg_num() - 1, i
.first
);
2721 pg_t merge_target
= merge_source
.get_parent();
2724 if (p
.get_pg_num() != p
.get_pg_num_pending()) {
2725 dout(10) << "pool " << i
.first
2726 << " pg_num_target " << p
.get_pg_num_target()
2727 << " pg_num " << p
.get_pg_num()
2728 << " - decrease and pg_num_pending != pg_num, waiting"
2731 } else if (p
.get_pg_num() == p
.get_pgp_num()) {
2732 dout(10) << "pool " << i
.first
2733 << " pg_num_target " << p
.get_pg_num_target()
2734 << " pg_num " << p
.get_pg_num()
2735 << " - decrease blocked by pgp_num "
2740 vector
<int32_t> source_acting
;
2741 for (auto &merge_participant
: {merge_source
, merge_target
}) {
2742 bool is_merge_source
= merge_participant
== merge_source
;
2743 if (osdmap
.have_pg_upmaps(merge_participant
)) {
2744 dout(10) << "pool " << i
.first
2745 << " pg_num_target " << p
.get_pg_num_target()
2746 << " pg_num " << p
.get_pg_num()
2747 << (is_merge_source
? " - merge source " : " - merge target ")
2748 << merge_participant
2749 << " has upmap" << dendl
;
2750 upmaps_to_clear
.insert(merge_participant
);
2753 auto q
= pg_map
.pg_stat
.find(merge_participant
);
2754 if (q
== pg_map
.pg_stat
.end()) {
2755 dout(10) << "pool " << i
.first
2756 << " pg_num_target " << p
.get_pg_num_target()
2757 << " pg_num " << p
.get_pg_num()
2758 << " - no state for " << merge_participant
2759 << (is_merge_source
? " (merge source)" : " (merge target)")
2762 } else if ((q
->second
.state
& (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) !=
2763 (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) {
2764 dout(10) << "pool " << i
.first
2765 << " pg_num_target " << p
.get_pg_num_target()
2766 << " pg_num " << p
.get_pg_num()
2767 << (is_merge_source
? " - merge source " : " - merge target ")
2768 << merge_participant
2769 << " not clean (" << pg_state_string(q
->second
.state
)
2773 if (is_merge_source
) {
2774 source_acting
= q
->second
.acting
;
2775 } else if (ok
&& q
->second
.acting
!= source_acting
) {
2776 dout(10) << "pool " << i
.first
2777 << " pg_num_target " << p
.get_pg_num_target()
2778 << " pg_num " << p
.get_pg_num()
2779 << (is_merge_source
? " - merge source " : " - merge target ")
2780 << merge_participant
2781 << " acting does not match (source " << source_acting
2782 << " != target " << q
->second
.acting
2789 unsigned target
= p
.get_pg_num() - 1;
2790 dout(10) << "pool " << i
.first
2791 << " pg_num_target " << p
.get_pg_num_target()
2792 << " pg_num " << p
.get_pg_num()
2794 << " (merging " << merge_source
2795 << " and " << merge_target
2797 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
2800 } else if (p
.get_pg_num_target() > p
.get_pg_num()) {
2801 // pg_num increase (split)
2803 auto q
= pg_map
.num_pg_by_pool_state
.find(i
.first
);
2804 if (q
!= pg_map
.num_pg_by_pool_state
.end()) {
2805 for (auto& j
: q
->second
) {
2806 if ((j
.first
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
)) == 0) {
2807 dout(20) << "pool " << i
.first
<< " has " << j
.second
2808 << " pgs in " << pg_state_string(j
.first
)
2817 unsigned pg_gap
= p
.get_pg_num() - p
.get_pgp_num();
2818 unsigned max_jump
= cct
->_conf
->mgr_max_pg_num_change
;
2820 dout(10) << "pool " << i
.first
2821 << " pg_num_target " << p
.get_pg_num_target()
2822 << " pg_num " << p
.get_pg_num()
2823 << " - not all pgs active"
2825 } else if (pg_gap
>= max_jump
) {
2826 dout(10) << "pool " << i
.first
2827 << " pg_num " << p
.get_pg_num()
2828 << " - pgp_num " << p
.get_pgp_num()
2829 << " gap >= max_pg_num_change " << max_jump
2830 << " - must scale pgp_num first"
2833 unsigned add
= std::min(
2834 std::min(left
, max_jump
- pg_gap
),
2835 p
.get_pg_num_target() - p
.get_pg_num());
2836 unsigned target
= p
.get_pg_num() + add
;
2838 dout(10) << "pool " << i
.first
2839 << " pg_num_target " << p
.get_pg_num_target()
2840 << " pg_num " << p
.get_pg_num()
2841 << " -> " << target
<< dendl
;
2842 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
2848 unsigned target
= std::min(p
.get_pg_num_pending(),
2849 p
.get_pgp_num_target());
2850 if (target
!= p
.get_pgp_num()) {
2851 dout(20) << "pool " << i
.first
2852 << " pgp_num_target " << p
.get_pgp_num_target()
2853 << " pgp_num " << p
.get_pgp_num()
2854 << " -> " << target
<< dendl
;
2855 if (target
> p
.get_pgp_num() &&
2856 p
.get_pgp_num() == p
.get_pg_num()) {
2857 dout(10) << "pool " << i
.first
2858 << " pgp_num_target " << p
.get_pgp_num_target()
2859 << " pgp_num " << p
.get_pgp_num()
2860 << " - increase blocked by pg_num " << p
.get_pg_num()
2862 } else if (!aggro
&& (inactive_pgs_ratio
> 0 ||
2863 degraded_ratio
> 0 ||
2864 unknown_pgs_ratio
> 0)) {
2865 dout(10) << "pool " << i
.first
2866 << " pgp_num_target " << p
.get_pgp_num_target()
2867 << " pgp_num " << p
.get_pgp_num()
2868 << " - inactive|degraded|unknown pgs, deferring pgp_num"
2869 << " update" << dendl
;
2870 } else if (!aggro
&& (misplaced_ratio
> max_misplaced
)) {
2871 dout(10) << "pool " << i
.first
2872 << " pgp_num_target " << p
.get_pgp_num_target()
2873 << " pgp_num " << p
.get_pgp_num()
2874 << " - misplaced_ratio " << misplaced_ratio
2875 << " > max " << max_misplaced
2876 << ", deferring pgp_num update" << dendl
;
2878 // NOTE: this calculation assumes objects are
2879 // basically uniformly distributed across all PGs
2880 // (regardless of pool), which is probably not
2881 // perfectly correct, but it's a start. make no
2882 // single adjustment that's more than half of the
2883 // max_misplaced, to somewhat limit the magnitude of
2884 // our potential error here.
2886 static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP
= 1;
2887 pool_stat_t s
= pg_map
.get_pg_pool_sum_stat(i
.first
);
2889 // pool is (virtually) empty; just jump to final pgp_num?
2890 (p
.get_pgp_num_target() > p
.get_pgp_num() &&
2891 s
.stats
.sum
.num_objects
<= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP
*
2892 p
.get_pgp_num_target()))) {
2896 std::min
<double>(max_misplaced
- misplaced_ratio
,
2897 max_misplaced
/ 2.0);
2898 unsigned estmax
= std::max
<unsigned>(
2899 (double)p
.get_pg_num() * room
, 1u);
2900 unsigned next_min
= 0;
2901 if (p
.get_pgp_num() > estmax
) {
2902 next_min
= p
.get_pgp_num() - estmax
;
2904 next
= std::clamp(target
,
2906 p
.get_pgp_num() + estmax
);
2907 dout(20) << " room " << room
<< " estmax " << estmax
2908 << " delta " << (target
-p
.get_pgp_num())
2909 << " next " << next
<< dendl
;
2910 if (p
.get_pgp_num_target() == p
.get_pg_num_target() &&
2911 p
.get_pgp_num_target() < p
.get_pg_num()) {
2912 // since pgp_num is tracking pg_num, ceph is handling
2913 // pgp_num. so, be responsible: don't let pgp_num get
2914 // too far out ahead of merges (if we are merging).
2915 // this avoids moving lots of unmerged pgs onto a
2916 // small number of OSDs where we might blow out the
2918 unsigned max_outpace_merges
=
2919 std::max
<unsigned>(8, p
.get_pg_num() * max_misplaced
);
2920 if (next
+ max_outpace_merges
< p
.get_pg_num()) {
2921 next
= p
.get_pg_num() - max_outpace_merges
;
2922 dout(10) << " using next " << next
2923 << " to avoid outpacing merges (max_outpace_merges "
2924 << max_outpace_merges
<< ")" << dendl
;
2928 if (next
!= p
.get_pgp_num()) {
2929 dout(10) << "pool " << i
.first
2930 << " pgp_num_target " << p
.get_pgp_num_target()
2931 << " pgp_num " << p
.get_pgp_num()
2932 << " -> " << next
<< dendl
;
2933 pgp_num_to_set
[osdmap
.get_pool_name(i
.first
)] = next
;
2942 for (auto i
: pg_num_to_set
) {
2945 "\"prefix\": \"osd pool set\", "
2946 "\"pool\": \"" + i
.first
+ "\", "
2947 "\"var\": \"pg_num_actual\", "
2948 "\"val\": \"" + stringify(i
.second
) + "\""
2950 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2952 for (auto i
: pgp_num_to_set
) {
2955 "\"prefix\": \"osd pool set\", "
2956 "\"pool\": \"" + i
.first
+ "\", "
2957 "\"var\": \"pgp_num_actual\", "
2958 "\"val\": \"" + stringify(i
.second
) + "\""
2960 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2962 for (auto pg
: upmaps_to_clear
) {
2965 "\"prefix\": \"osd rm-pg-upmap\", "
2966 "\"pgid\": \"" + stringify(pg
) + "\""
2968 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2971 "\"prefix\": \"osd rm-pg-upmap-items\", "
2972 "\"pgid\": \"" + stringify(pg
) + "\"" +
2974 monc
->start_mon_command({cmd2
}, {}, nullptr, nullptr, nullptr);
2978 void DaemonServer::got_service_map()
2980 std::lock_guard
l(lock
);
2982 cluster_state
.with_servicemap([&](const ServiceMap
& service_map
) {
2983 if (pending_service_map
.epoch
== 0) {
2984 // we just started up
2985 dout(10) << "got initial map e" << service_map
.epoch
<< dendl
;
2986 ceph_assert(pending_service_map_dirty
== 0);
2987 pending_service_map
= service_map
;
2988 pending_service_map
.epoch
= service_map
.epoch
+ 1;
2989 } else if (pending_service_map
.epoch
<= service_map
.epoch
) {
2990 // we just started up but got one more not our own map
2991 dout(10) << "got newer initial map e" << service_map
.epoch
<< dendl
;
2992 ceph_assert(pending_service_map_dirty
== 0);
2993 pending_service_map
= service_map
;
2994 pending_service_map
.epoch
= service_map
.epoch
+ 1;
2996 // we already active and therefore must have persisted it,
2997 // which means ours is the same or newer.
2998 dout(10) << "got updated map e" << service_map
.epoch
<< dendl
;
3002 // cull missing daemons, populate new ones
3003 std::set
<std::string
> types
;
3004 for (auto& [type
, service
] : pending_service_map
.services
) {
3005 if (ServiceMap::is_normal_ceph_entity(type
)) {
3011 std::set
<std::string
> names
;
3012 for (auto& q
: service
.daemons
) {
3013 names
.insert(q
.first
);
3014 DaemonKey key
{type
, q
.first
};
3015 if (!daemon_state
.exists(key
)) {
3016 auto daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
3018 daemon
->set_metadata(q
.second
.metadata
);
3019 daemon
->service_daemon
= true;
3020 daemon_state
.insert(daemon
);
3021 dout(10) << "added missing " << key
<< dendl
;
3024 daemon_state
.cull(type
, names
);
3026 daemon_state
.cull_services(types
);
3029 void DaemonServer::got_mgr_map()
3031 std::lock_guard
l(lock
);
3032 set
<std::string
> have
;
3033 cluster_state
.with_mgrmap([&](const MgrMap
& mgrmap
) {
3034 auto md_update
= [&] (DaemonKey key
) {
3035 std::ostringstream oss
;
3036 auto c
= new MetadataUpdate(daemon_state
, key
);
3037 // FIXME remove post-nautilus: include 'id' for luminous mons
3038 oss
<< "{\"prefix\": \"mgr metadata\", \"who\": \""
3039 << key
.name
<< "\", \"id\": \"" << key
.name
<< "\"}";
3040 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
3042 if (mgrmap
.active_name
.size()) {
3043 DaemonKey key
{"mgr", mgrmap
.active_name
};
3044 have
.insert(mgrmap
.active_name
);
3045 if (!daemon_state
.exists(key
) && !daemon_state
.is_updating(key
)) {
3047 dout(10) << "triggered addition of " << key
<< " via metadata update" << dendl
;
3050 for (auto& i
: mgrmap
.standbys
) {
3051 DaemonKey key
{"mgr", i
.second
.name
};
3052 have
.insert(i
.second
.name
);
3053 if (!daemon_state
.exists(key
) && !daemon_state
.is_updating(key
)) {
3055 dout(10) << "triggered addition of " << key
<< " via metadata update" << dendl
;
3059 daemon_state
.cull("mgr", have
);
3062 const char** DaemonServer::get_tracked_conf_keys() const
3064 static const char *KEYS
[] = {
3065 "mgr_stats_threshold",
3073 void DaemonServer::handle_conf_change(const ConfigProxy
& conf
,
3074 const std::set
<std::string
> &changed
)
3077 if (changed
.count("mgr_stats_threshold") || changed
.count("mgr_stats_period")) {
3078 dout(4) << "Updating stats threshold/period on "
3079 << daemon_connections
.size() << " clients" << dendl
;
3080 // Send a fresh MMgrConfigure to all clients, so that they can follow
3081 // the new policy for transmitting stats
3082 finisher
.queue(new LambdaContext([this](int r
) {
3083 std::lock_guard
l(lock
);
3084 for (auto &c
: daemon_connections
) {
3091 void DaemonServer::_send_configure(ConnectionRef c
)
3093 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
3095 auto configure
= make_message
<MMgrConfigure
>();
3096 configure
->stats_period
= g_conf().get_val
<int64_t>("mgr_stats_period");
3097 configure
->stats_threshold
= g_conf().get_val
<int64_t>("mgr_stats_threshold");
3099 if (c
->peer_is_osd()) {
3100 configure
->osd_perf_metric_queries
=
3101 osd_perf_metric_collector
.get_queries();
3102 } else if (c
->peer_is_mds()) {
3103 configure
->metric_config_message
=
3104 MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector
.get_queries()));
3107 c
->send_message2(configure
);
3110 MetricQueryID
DaemonServer::add_osd_perf_query(
3111 const OSDPerfMetricQuery
&query
,
3112 const std::optional
<OSDPerfMetricLimit
> &limit
)
3114 return osd_perf_metric_collector
.add_query(query
, limit
);
3117 int DaemonServer::remove_osd_perf_query(MetricQueryID query_id
)
3119 return osd_perf_metric_collector
.remove_query(query_id
);
3122 int DaemonServer::get_osd_perf_counters(OSDPerfCollector
*collector
)
3124 return osd_perf_metric_collector
.get_counters(collector
);
3127 MetricQueryID
DaemonServer::add_mds_perf_query(
3128 const MDSPerfMetricQuery
&query
,
3129 const std::optional
<MDSPerfMetricLimit
> &limit
)
3131 return mds_perf_metric_collector
.add_query(query
, limit
);
3134 int DaemonServer::remove_mds_perf_query(MetricQueryID query_id
)
3136 return mds_perf_metric_collector
.remove_query(query_id
);
3139 void DaemonServer::reregister_mds_perf_queries()
3141 mds_perf_metric_collector
.reregister_queries();
3144 int DaemonServer::get_mds_perf_counters(MDSPerfCollector
*collector
)
3146 return mds_perf_metric_collector
.get_counters(collector
);