1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "DaemonServer.h"
17 #include "include/stringify.h"
18 #include "include/str_list.h"
19 #include "auth/RotatingKeyRing.h"
20 #include "json_spirit/json_spirit_writer.h"
22 #include "mgr/mgr_commands.h"
23 #include "mgr/DaemonHealthMetricCollector.h"
24 #include "mgr/OSDPerfMetricCollector.h"
25 #include "mon/MonCommand.h"
27 #include "messages/MMgrOpen.h"
28 #include "messages/MMgrClose.h"
29 #include "messages/MMgrConfigure.h"
30 #include "messages/MMonMgrReport.h"
31 #include "messages/MCommand.h"
32 #include "messages/MCommandReply.h"
33 #include "messages/MPGStats.h"
34 #include "messages/MOSDScrub.h"
35 #include "messages/MOSDScrub2.h"
36 #include "messages/MOSDForceRecovery.h"
37 #include "common/errno.h"
38 #include "common/pick_address.h"
40 #define dout_context g_ceph_context
41 #define dout_subsys ceph_subsys_mgr
43 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
// Constructor: wires references to the mgr's core subsystems (monc, daemon
// state index, cluster state, python module registry, audit log channel) and
// builds one byte-throttle + one message-throttle per peer type, each sized
// from the corresponding mgr_* config option.
// NOTE(review): this chunk is reflowed and partially elided (embedded original
// line numbers jump, e.g. 70 -> 74), so some parameters/initializers are not
// visible here.
// NOTE(review): the throttle *labels* "mgr_osd_messsages", "mgr_mds_messsages"
// and "mgr_mon_messsages" carry a triple-s typo, while the config keys read
// via get_val are spelled correctly — confirm against upstream before renaming,
// since the label may be externally visible in perf/throttle output.
47 DaemonServer::DaemonServer(MonClient
*monc_
,
49 DaemonStateIndex
&daemon_state_
,
50 ClusterState
&cluster_state_
,
51 PyModuleRegistry
&py_modules_
,
53 LogChannelRef audit_clog_
)
54 : Dispatcher(g_ceph_context
),
55 client_byte_throttler(new Throttle(g_ceph_context
, "mgr_client_bytes",
56 g_conf().get_val
<Option::size_t>("mgr_client_bytes"))),
57 client_msg_throttler(new Throttle(g_ceph_context
, "mgr_client_messages",
58 g_conf().get_val
<uint64_t>("mgr_client_messages"))),
59 osd_byte_throttler(new Throttle(g_ceph_context
, "mgr_osd_bytes",
60 g_conf().get_val
<Option::size_t>("mgr_osd_bytes"))),
61 osd_msg_throttler(new Throttle(g_ceph_context
, "mgr_osd_messsages",
62 g_conf().get_val
<uint64_t>("mgr_osd_messages"))),
63 mds_byte_throttler(new Throttle(g_ceph_context
, "mgr_mds_bytes",
64 g_conf().get_val
<Option::size_t>("mgr_mds_bytes"))),
65 mds_msg_throttler(new Throttle(g_ceph_context
, "mgr_mds_messsages",
66 g_conf().get_val
<uint64_t>("mgr_mds_messages"))),
67 mon_byte_throttler(new Throttle(g_ceph_context
, "mgr_mon_bytes",
68 g_conf().get_val
<Option::size_t>("mgr_mon_bytes"))),
69 mon_msg_throttler(new Throttle(g_ceph_context
, "mgr_mon_messsages",
70 g_conf().get_val
<uint64_t>("mgr_mon_messages"))),
74 daemon_state(daemon_state_
),
75 cluster_state(cluster_state_
),
76 py_modules(py_modules_
),
78 audit_clog(audit_clog_
),
81 timer(g_ceph_context
, lock
),
84 osd_perf_metric_collector_listener(this),
85 osd_perf_metric_collector(osd_perf_metric_collector_listener
)
// Register as a config observer so option changes are delivered to us;
// unregistered again in the destructor.
87 g_conf().add_observer(this);
// Destructor: undo the add_observer(this) done in the constructor so the
// config system does not call back into a destroyed object.
90 DaemonServer::~DaemonServer() {
92 g_conf().remove_observer(this);
// Initialize the server-side Messenger for the given mgr gid:
//  - create the messenger with the configured public transport type,
//  - install per-peer-type byte/message throttlers (client/OSD/MDS/mon),
//  - pick and bind the public address(es),
//  - register this object as dispatcher and auth server, and
//  - schedule the first periodic tick (mgr_tick_period).
// Returns an int status; error paths are partially elided in this chunk
// (embedded line numbers jump), so the exact return values are not all
// visible here — TODO confirm against upstream.
95 int DaemonServer::init(uint64_t gid
, entity_addrvec_t client_addrs
)
97 // Initialize Messenger: prefer ms_public_type if set, else fall back to
// the generic ms_type option.
98 std::string public_msgr_type
= g_conf()->ms_public_type
.empty() ?
99 g_conf().get_val
<std::string
>("ms_type") : g_conf()->ms_public_type
;
100 msgr
= Messenger::create(g_ceph_context
, public_msgr_type
,
101 entity_name_t::MGR(gid
),
// Daemons connect and disconnect at will; keep no session state server-side.
104 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
106 msgr
->set_auth_client(monc
);
// Throttle inbound traffic per peer type using the throttlers built in the
// constructor.
109 msgr
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
110 client_byte_throttler
.get(),
111 client_msg_throttler
.get());
114 msgr
->set_policy_throttlers(entity_name_t::TYPE_OSD
,
115 osd_byte_throttler
.get(),
116 osd_msg_throttler
.get());
117 msgr
->set_policy_throttlers(entity_name_t::TYPE_MDS
,
118 mds_byte_throttler
.get(),
119 mds_msg_throttler
.get());
120 msgr
->set_policy_throttlers(entity_name_t::TYPE_MON
,
121 mon_byte_throttler
.get(),
122 mon_msg_throttler
.get());
// Choose the public address(es) to bind to.
124 entity_addrvec_t addrs
;
125 int r
= pick_addresses(cct
, CEPH_PICK_ADDRESS_PUBLIC
, &addrs
);
129 dout(20) << __func__
<< " will bind to " << addrs
<< dendl
;
130 r
= msgr
->bindv(addrs
);
132 derr
<< "unable to bind mgr to " << addrs
<< dendl
;
136 msgr
->set_myname(entity_name_t::MGR(gid
));
// Fill in any still-unknown local addresses from what the monitor saw.
137 msgr
->set_addr_unknowns(client_addrs
);
140 msgr
->add_dispatcher_tail(this);
142 msgr
->set_auth_server(monc
);
143 monc
->set_handle_authentication_dispatcher(this);
145 started_at
= ceph_clock_now();
147 std::lock_guard
l(lock
);
// Kick off the periodic tick loop.
150 schedule_tick_locked(
151 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
// Accessor: the address vector our messenger is bound to.
156 entity_addrvec_t
DaemonServer::get_myaddrs() const
158 return msgr
->get_myaddrs();
// Expose the mon client's rotating secrets as the keystore used for
// legacy (auth v1) authorizer verification. Non-owning pointer.
161 KeyStore
*DaemonServer::ms_get_auth1_authorizer_keystore()
163 return monc
->rotating_secrets
.get();
// Called when a connection has authenticated: build an MgrSession for it,
// record the peer's identity/address, parse its caps (or allow_all), and —
// for OSD peers — register the connection in osd_cons under lock.
// NOTE(review): parts of this function (the caps-decode try block, failure
// return paths, and attaching the session to the connection) are elided in
// this chunk; the visible flow below is not complete.
166 int DaemonServer::ms_handle_authentication(Connection
*con
)
// Session object for this peer; ownership/attachment to `con` happens on an
// elided line — presumably via con->set_priv — TODO confirm.
169 MgrSession
*s
= new MgrSession(cct
);
171 s
->inst
.addr
= con
->get_peer_addr();
172 s
->entity_name
= con
->peer_name
;
173 dout(10) << __func__
<< " new session " << s
<< " con " << con
174 << " entity " << con
->peer_name
175 << " addr " << con
->get_peer_addrs()
// Caps from the authentication handshake: either a blanket allow_all or an
// encoded caps string that must be parsed.
178 AuthCapsInfo
&caps_info
= con
->get_peer_caps_info();
179 if (caps_info
.allow_all
) {
180 dout(10) << " session " << s
<< " " << s
->entity_name
181 << " allow_all" << dendl
;
182 s
->caps
.set_allow_all();
185 if (caps_info
.caps
.length() > 0) {
186 auto p
= caps_info
.caps
.cbegin();
// Decode of the caps string into `str` is elided; a buffer decode error is
// handled here.
191 catch (buffer::error
& e
) {
194 bool success
= s
->caps
.parse(str
);
196 dout(10) << " session " << s
<< " " << s
->entity_name
197 << " has caps " << s
->caps
<< " '" << str
<< "'" << dendl
;
200 dout(10) << " session " << s
<< " " << s
->entity_name
201 << " failed to parse caps '" << str
<< "'" << dendl
;
// OSD peers: derive the osd id from the entity name and index this
// connection so scrub/recovery commands can reach it later.
206 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
207 std::lock_guard
l(lock
);
208 s
->osd_id
= atoi(s
->entity_name
.get_id().c_str());
209 dout(10) << "registering osd." << s
->osd_id
<< " session "
210 << s
<< " con " << con
<< dendl
;
211 osd_cons
[s
->osd_id
].insert(con
);
// Build an outgoing authorizer via the mon client. The visible code
// special-cases CEPH_ENTITY_TYPE_MON (body elided) and returns whether an
// authorizer was produced.
217 bool DaemonServer::ms_get_authorizer(
219 AuthAuthorizer
**authorizer
)
221 dout(10) << "type=" << ceph_entity_type_name(dest_type
) << dendl
;
223 if (dest_type
== CEPH_ENTITY_TYPE_MON
) {
227 *authorizer
= monc
->build_authorizer(dest_type
);
228 dout(20) << "got authorizer " << *authorizer
<< dendl
;
229 return *authorizer
!= NULL
;
// Connection reset: for OSD peers, drop the connection from osd_cons and from
// the daemon_connections set, under lock. (Null-session guard and return
// value are elided in this chunk.)
232 bool DaemonServer::ms_handle_reset(Connection
*con
)
234 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
235 auto priv
= con
->get_priv();
236 auto session
= static_cast<MgrSession
*>(priv
.get());
240 std::lock_guard
l(lock
);
241 dout(10) << "unregistering osd." << session
->osd_id
242 << " session " << session
<< " con " << con
<< dendl
;
243 osd_cons
[session
->osd_id
].erase(con
);
// Also forget this connection for stats-configure broadcasts.
245 auto iter
= daemon_connections
.find(con
);
246 if (iter
!= daemon_connections
.end()) {
247 daemon_connections
.erase(iter
);
// Dispatcher hook for refused connections — intentionally a no-op.
// (The return statement is elided in this chunk.)
253 bool DaemonServer::ms_handle_refused(Connection
*con
)
255 // do nothing for now
// Top-level message dispatch. Routes PG stats, mgr reports, open/close and
// commands to their handlers. The case labels themselves are elided in this
// chunk; each visible branch corresponds to one message type.
259 bool DaemonServer::ms_dispatch(Message
*m
)
261 // Note that we do *not* take ::lock here, in order to avoid
262 // serializing all message handling. It's up to each handler
263 // to take whatever locks it needs.
264 switch (m
->get_type()) {
// MPGStats: fold into cluster state, then check readiness of the PGMap.
266 cluster_state
.ingest_pgstats(static_cast<MPGStats
*>(m
));
267 maybe_ready(m
->get_source().num());
271 return handle_report(static_cast<MMgrReport
*>(m
));
273 return handle_open(static_cast<MMgrOpen
*>(m
));
275 return handle_close(static_cast<MMgrClose
*>(m
));
277 return handle_command(static_cast<MCommand
*>(m
));
// Unknown types are logged (default branch).
279 dout(1) << "Unhandled message type " << m
->get_type() << dendl
;
// Track which OSDs have sent their first stats report; once every up OSD has
// reported, the PGMap is considered ready and PG state is pushed to the mon.
// @param osd_id  the OSD that just reported.
284 void DaemonServer::maybe_ready(int32_t osd_id
)
286 if (pgmap_ready
.load()) {
287 // Fast path: we don't need to take lock because pgmap_ready
// is only ever set, never cleared (per the visible logic) — the slow path
// below runs only before readiness.
290 std::lock_guard
l(lock
);
292 if (reported_osds
.find(osd_id
) == reported_osds
.end()) {
293 dout(4) << "initial report from osd " << osd_id
<< dendl
;
294 reported_osds
.insert(osd_id
);
295 std::set
<int32_t> up_osds
;
297 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
298 osdmap
.get_up_osds(up_osds
);
// Which up OSDs have not yet reported?
301 std::set
<int32_t> unreported_osds
;
302 std::set_difference(up_osds
.begin(), up_osds
.end(),
303 reported_osds
.begin(), reported_osds
.end(),
304 std::inserter(unreported_osds
, unreported_osds
.begin()));
306 if (unreported_osds
.size() == 0) {
307 dout(4) << "all osds have reported, sending PG state to mon" << dendl
;
309 reported_osds
.clear();
310 // Avoid waiting for next tick
313 dout(4) << "still waiting for " << unreported_osds
.size() << " osds"
314 " to report in before PGMap is ready" << dendl
;
// Periodic tick: (body partially elided) re-arms itself for the next
// mgr_tick_period interval.
320 void DaemonServer::tick()
326 schedule_tick_locked(
327 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
330 // Currently modules do not set health checks in response to events delivered to
331 // all modules (e.g. notify) so we do not risk a thundering herd situation here.
332 // if this pattern emerges in the future, this scheduler could be modified to
333 // fire after all modules have had a chance to set their health checks.
// Arm (or re-arm) the tick timer; caller must hold `lock`.
// @param delay_sec  seconds until the next tick fires.
334 void DaemonServer::schedule_tick_locked(double delay_sec
)
336 ceph_assert(lock
.is_locked_by_me());
// Cancel any pending tick so at most one is outstanding.
339 timer
.cancel_event(tick_event
);
340 tick_event
= nullptr;
343 // on shutdown start rejecting explicit requests to send reports that may
344 // originate from python land which may still be running.
348 tick_event
= timer
.add_event_after(delay_sec
,
349 new FunctionContext([this](int r
) {
// Public wrapper around schedule_tick_locked: takes `lock` first.
354 void DaemonServer::schedule_tick(double delay_sec
)
356 std::lock_guard
l(lock
);
357 schedule_tick_locked(delay_sec
);
// Invoked when the set of OSD perf-metric queries changes: asynchronously
// (via the finisher, to avoid lock cycles) re-send configuration to every
// connected OSD daemon so they adopt the new stats policy.
360 void DaemonServer::handle_osd_perf_metric_query_updated()
364 // Send a fresh MMgrConfigure to all clients, so that they can follow
365 // the new policy for transmitting stats
366 finisher
.queue(new FunctionContext([this](int r
) {
367 std::lock_guard
l(lock
);
368 for (auto &c
: daemon_connections
) {
// Only OSD peers are reconfigured here (the send itself is elided).
369 if (c
->peer_is_osd()) {
// Shut down: stop cluster-state machinery, then mark shutting_down under
// lock so in-flight tick scheduling / report requests are rejected.
// (Messenger shutdown lines are elided in this chunk.)
376 void DaemonServer::shutdown()
378 dout(10) << "begin" << dendl
;
381 cluster_state
.shutdown();
382 dout(10) << "done" << dendl
;
384 std::lock_guard
l(lock
);
385 shutting_down
= true;
// Build a DaemonKey for a reporting daemon: use the self-declared
// service_name when present, otherwise fall back to the peer entity type name
// (e.g. "osd", "mds"). File-local helper.
389 static DaemonKey
key_from_service(
390 const std::string
& service_name
,
392 const std::string
& daemon_name
)
394 if (!service_name
.empty()) {
395 return DaemonKey(service_name
, daemon_name
);
397 return DaemonKey(ceph_entity_type_name(peer_type
), daemon_name
);
// Parse "service.id" into a DaemonKey (out->first = text before the first
// '.', out->second = the rest). Returns false when no '.' is found (the
// early-return branch is elided here). File-local helper.
401 static bool key_from_string(
402 const std::string
& name
,
405 auto p
= name
.find('.');
406 if (p
== std::string::npos
) {
409 out
->first
= name
.substr(0, p
);
410 out
->second
= name
.substr(p
+ 1);
// Handle MMgrOpen: a daemon/client announces itself. Sends back stats
// configuration, creates or refreshes the DaemonState for the sender,
// registers self-declared service daemons in the pending service map, decodes
// any piggy-backed config, and records daemon connections for later
// stats-config broadcasts. Runs fully under `lock`.
414 bool DaemonServer::handle_open(MMgrOpen
*m
)
416 std::lock_guard
l(lock
);
418 DaemonKey key
= key_from_service(m
->service_name
,
419 m
->get_connection()->get_peer_type(),
422 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
// Tell the sender how/what to report.
424 _send_configure(m
->get_connection());
426 DaemonStatePtr daemon
;
427 if (daemon_state
.exists(key
)) {
428 daemon
= daemon_state
.get(key
);
// Self-declared service daemons get a DaemonState created eagerly.
430 if (m
->service_daemon
&& !daemon
) {
431 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
432 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
434 daemon
->service_daemon
= true;
435 daemon_state
.insert(daemon
);
438 dout(20) << "updating existing DaemonState for " << m
->daemon_name
<< dendl
;
439 std::lock_guard
l(daemon
->lock
);
// A (re)open resets perf counters; the sender will re-declare its schema.
440 daemon
->perf_counters
.clear();
442 if (m
->service_daemon
) {
443 daemon
->set_metadata(m
->daemon_metadata
);
444 daemon
->service_status
= m
->daemon_status
;
// Register in the pending service map if the gid changed (i.e. this is a
// new incarnation of the daemon).
446 utime_t now
= ceph_clock_now();
447 auto d
= pending_service_map
.get_daemon(m
->service_name
,
449 if (d
->gid
!= (uint64_t)m
->get_source().num()) {
450 dout(10) << "registering " << key
<< " in pending_service_map" << dendl
;
451 d
->gid
= m
->get_source().num();
452 d
->addr
= m
->get_source_addr();
453 d
->start_epoch
= pending_service_map
.epoch
;
454 d
->start_stamp
= now
;
455 d
->metadata
= m
->daemon_metadata
;
456 pending_service_map_dirty
= pending_service_map
.epoch
;
// Piggy-backed config blob: decode the daemon's effective config and the
// mon options it ignored.
460 auto p
= m
->config_bl
.cbegin();
461 if (p
!= m
->config_bl
.end()) {
462 decode(daemon
->config
, p
);
463 decode(daemon
->ignored_mon_config
, p
);
464 dout(20) << " got config " << daemon
->config
465 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
// Defaults are kept encoded; decoded lazily elsewhere.
467 daemon
->config_defaults_bl
= m
->config_defaults_bl
;
468 daemon
->config_defaults
.clear();
469 dout(20) << " got config_defaults_bl " << daemon
->config_defaults_bl
.length()
470 << " bytes" << dendl
;
473 if (m
->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT
&&
474 m
->service_name
.empty())
476 // Store in set of the daemon/service connections, i.e. those
477 // connections that require an update in the event of stats
478 // configuration changes.
479 daemon_connections
.insert(m
->get_connection());
// Handle MMgrClose: a daemon is going away. Remove its DaemonState and, for
// service daemons, drop it from the pending service map; finally echo the
// message back as an acknowledgement.
486 bool DaemonServer::handle_close(MMgrClose
*m
)
488 std::lock_guard
l(lock
);
490 DaemonKey key
= key_from_service(m
->service_name
,
491 m
->get_connection()->get_peer_type(),
493 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
495 if (daemon_state
.exists(key
)) {
496 DaemonStatePtr daemon
= daemon_state
.get(key
);
497 daemon_state
.rm(key
);
499 std::lock_guard
l(daemon
->lock
);
500 if (daemon
->service_daemon
) {
501 pending_service_map
.rm_daemon(m
->service_name
, m
->daemon_name
);
502 pending_service_map_dirty
= pending_service_map
.epoch
;
507 // send same message back as a reply
508 m
->get_connection()->send_message(m
);
// Handle MMgrReport (periodic stats from a daemon):
//  - derive the DaemonKey (self-declared service name or peer type),
//  - reject stats from plain clients that declared no service,
//  - if we have no DaemonState yet, reject and kick off a background
//    "<type> metadata" mon command to fetch it,
//  - otherwise fold perf counters / config / service status / health metrics
//    into the DaemonState, notify python modules on schema changes, and feed
//    OSD perf metric reports to the collector.
// NOTE(review): several branches (rejection returns, session checks) are
// elided in this chunk; the embedded line numbers jump accordingly.
512 bool DaemonServer::handle_report(MMgrReport
*m
)
515 if (!m
->service_name
.empty()) {
516 key
.first
= m
->service_name
;
518 key
.first
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
520 key
.second
= m
->daemon_name
;
522 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
524 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
525 m
->service_name
.empty()) {
526 // Clients should not be sending us stats unless they are declaring
527 // themselves to be a daemon for some service.
528 dout(4) << "rejecting report from non-daemon client " << m
->daemon_name
530 m
->get_connection()->mark_down();
535 // Look up the DaemonState
536 DaemonStatePtr daemon
;
537 if (daemon_state
.exists(key
)) {
538 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
539 daemon
= daemon_state
.get(key
);
541 // we don't know the hostname at this stage, reject MMgrReport here.
542 dout(5) << "rejecting report from " << key
<< ", since we do not have its metadata now."
545 // issue metadata request in background
546 if (!daemon_state
.is_updating(key
) &&
547 (key
.first
== "osd" || key
.first
== "mds" || key
.first
== "mon")) {
549 std::ostringstream oss
;
// MetadataUpdate completes the DaemonState when the mon command returns.
550 auto c
= new MetadataUpdate(daemon_state
, key
);
551 if (key
.first
== "osd") {
552 oss
<< "{\"prefix\": \"osd metadata\", \"id\": "
555 } else if (key
.first
== "mds") {
556 c
->set_default("addr", stringify(m
->get_source_addr()));
557 oss
<< "{\"prefix\": \"mds metadata\", \"who\": \""
558 << key
.second
<< "\"}";
560 } else if (key
.first
== "mon") {
561 oss
<< "{\"prefix\": \"mon metadata\", \"id\": \""
562 << key
.second
<< "\"}";
567 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
// Rejection path: tear down the sender's registration under lock.
571 std::lock_guard
l(lock
);
573 auto priv
= m
->get_connection()->get_priv();
574 auto session
= static_cast<MgrSession
*>(priv
.get());
578 m
->get_connection()->mark_down();
580 dout(10) << "unregistering osd." << session
->osd_id
581 << " session " << session
<< " con " << m
->get_connection() << dendl
;
583 if (osd_cons
.find(session
->osd_id
) != osd_cons
.end()) {
584 osd_cons
[session
->osd_id
].erase(m
->get_connection());
587 auto iter
= daemon_connections
.find(m
->get_connection());
588 if (iter
!= daemon_connections
.end()) {
589 daemon_connections
.erase(iter
);
596 // Update the DaemonState
597 ceph_assert(daemon
!= nullptr);
599 std::lock_guard
l(daemon
->lock
);
600 auto &daemon_counters
= daemon
->perf_counters
;
601 daemon_counters
.update(m
);
// Piggy-backed config updates, same encoding as in handle_open.
603 auto p
= m
->config_bl
.cbegin();
604 if (p
!= m
->config_bl
.end()) {
605 decode(daemon
->config
, p
);
606 decode(daemon
->ignored_mon_config
, p
);
607 dout(20) << " got config " << daemon
->config
608 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
611 if (daemon
->service_daemon
) {
612 utime_t now
= ceph_clock_now();
613 if (m
->daemon_status
) {
614 daemon
->service_status
= *m
->daemon_status
;
615 daemon
->service_status_stamp
= now
;
// Every report from a service daemon counts as a liveness beacon.
617 daemon
->last_service_beacon
= now
;
618 } else if (m
->daemon_status
) {
619 derr
<< "got status from non-daemon " << key
<< dendl
;
621 if (m
->get_connection()->peer_is_osd() || m
->get_connection()->peer_is_mon()) {
622 // only OSD and MON send health_checks to me now
623 daemon
->daemon_health_metrics
= std::move(m
->daemon_health_metrics
);
624 dout(10) << "daemon_health_metrics " << daemon
->daemon_health_metrics
629 // if there are any schema updates, notify the python modules
630 if (!m
->declare_types
.empty() || !m
->undeclare_types
.empty()) {
632 oss
<< key
.first
<< '.' << key
.second
;
633 py_modules
.notify_all("perf_schema_update", oss
.str());
636 if (m
->get_connection()->peer_is_osd()) {
637 osd_perf_metric_collector
.process_reports(m
->osd_perf_metric_reports
);
// Flatten a parsed cmdmap into string key/value pairs for capability
// matching. "prefix" is skipped; a "caps" vector of alternating key/value
// entries is expanded into "caps_<key>" entries; everything else is
// stringified as-is.
645 void DaemonServer::_generate_command_map(
647 map
<string
,string
> &param_str_map
)
649 for (auto p
= cmdmap
.begin();
650 p
!= cmdmap
.end(); ++p
) {
// The command prefix itself is not a parameter.
651 if (p
->first
== "prefix")
653 if (p
->first
== "caps") {
655 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
656 cv
.size() % 2 == 0) {
// Alternating [key, value, key, value, ...] pairs.
657 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
658 string k
= string("caps_") + cv
[i
];
659 param_str_map
[k
] = cv
[i
+ 1];
664 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
// Look up a MonCommand whose cmdstring starts with the given prefix.
// Returns nullptr when no command matches (the match/return lines are
// partially elided in this chunk).
668 const MonCommand
*DaemonServer::_get_mgrcommand(
669 const string
&cmd_prefix
,
670 const std::vector
<MonCommand
> &cmds
)
672 const MonCommand
*this_cmd
= nullptr;
673 for (const auto &cmd
: cmds
) {
// Prefix match against the command descriptor string.
674 if (cmd
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
// Capability check for a command: mons bypass the check entirely; everyone
// else must have mgr caps covering the command's r/w/x requirements for this
// module/prefix/parameters.
682 bool DaemonServer::_allowed_command(
684 const string
&module
,
685 const string
&prefix
,
686 const cmdmap_t
& cmdmap
,
687 const map
<string
,string
>& param_str_map
,
688 const MonCommand
*this_cmd
) {
690 if (s
->entity_name
.is_mon()) {
691 // mon is all-powerful. even when it is forwarding commands on behalf of
692 // old clients; we expect the mon is validating commands before proxying!
// Which permission bits does this command require?
696 bool cmd_r
= this_cmd
->requires_perm('r');
697 bool cmd_w
= this_cmd
->requires_perm('w');
698 bool cmd_x
= this_cmd
->requires_perm('x');
700 bool capable
= s
->caps
.is_capable(
702 CEPH_ENTITY_TYPE_MGR
,
704 module
, prefix
, param_str_map
,
708 dout(10) << " " << s
->entity_name
<< " "
709 << (capable
? "" : "not ") << "capable" << dendl
;
// (Doc comment for the class below; the /** opener is elided in this chunk.)
714 * The working data for processing an MCommand. This lives in
715 * a class to enable passing it into other threads for processing
716 * outside of the thread/locks that called handle_command.
// Members (declarations elided here) visibly include the MCommand *m, an
// output bufferlist `odata`, and a cmdmap — confirm against upstream.
718 class CommandContext
{
724 explicit CommandContext(MCommand
*m_
)
// Convenience overload: reply with a stringstream status message.
732 void reply(int r
, const std::stringstream
&ss
) {
736 void reply(int r
, const std::string
&rs
) {
737 // Let the connection drop as soon as we've sent our response
738 ConnectionRef con
= m
->get_connection();
740 con
->mark_disposable();
// Log outcome at different levels depending on success/failure.
744 dout(4) << __func__
<< " success" << dendl
;
746 derr
<< __func__
<< " " << cpp_strerror(r
) << " " << rs
<< dendl
;
// Build and send the MCommandReply carrying the status string and payload.
749 MCommandReply
*reply
= new MCommandReply(r
, rs
);
750 reply
->set_tid(m
->get_tid());
751 reply
->set_data(odata
);
752 con
->send_message(reply
);
// (Doc comment for the class below; the /** opener is elided in this chunk.)
758 * A context for receiving a bufferlist/error string from a background
759 * function and then calling back to a CommandContext when it's done
761 class ReplyOnFinish
: public Context
{
762 std::shared_ptr
<CommandContext
> cmdctx
;
// Public members (elided here) visibly include `from_mon` (bufferlist) and
// `outs` (status string), filled in by start_mon_command — TODO confirm.
768 explicit ReplyOnFinish(const std::shared_ptr
<CommandContext
> &cmdctx_
)
771 void finish(int r
) override
{
// Append the mon's output payload and forward status to the caller.
772 cmdctx
->odata
.claim_append(from_mon
);
773 cmdctx
->reply(r
, outs
);
// Handle MCommand: wrap it in a shared CommandContext and delegate to
// _handle_command; a bad_cmd_get during parsing is translated into an
// -EINVAL reply rather than propagating. (The `try` opener is elided.)
777 bool DaemonServer::handle_command(MCommand
*m
)
779 std::lock_guard
l(lock
);
780 std::shared_ptr
<CommandContext
> cmdctx
= std::make_shared
<CommandContext
>(m
);
782 return _handle_command(m
, cmdctx
);
783 } catch (const bad_cmd_get
& e
) {
784 cmdctx
->reply(-EINVAL
, e
.what());
789 bool DaemonServer::_handle_command(
791 std::shared_ptr
<CommandContext
>& cmdctx
)
793 auto priv
= m
->get_connection()->get_priv();
794 auto session
= static_cast<MgrSession
*>(priv
.get());
798 if (session
->inst
.name
== entity_name_t())
799 session
->inst
.name
= m
->get_source();
802 boost::scoped_ptr
<Formatter
> f
;
803 map
<string
,string
> param_str_map
;
804 std::stringstream ss
;
807 if (!cmdmap_from_json(m
->cmd
, &(cmdctx
->cmdmap
), ss
)) {
808 cmdctx
->reply(-EINVAL
, ss
);
813 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "format", format
, string("plain"));
814 f
.reset(Formatter::create(format
));
818 cmd_getval(cct
, cmdctx
->cmdmap
, "prefix", prefix
);
820 dout(4) << "decoded " << cmdctx
->cmdmap
.size() << dendl
;
821 dout(4) << "prefix=" << prefix
<< dendl
;
823 if (prefix
== "get_command_descriptions") {
824 dout(10) << "reading commands from python modules" << dendl
;
825 const auto py_commands
= py_modules
.get_commands();
829 f
.open_object_section("command_descriptions");
831 auto dump_cmd
= [&cmdnum
, &f
, m
](const MonCommand
&mc
){
832 ostringstream secname
;
833 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
834 dump_cmddesc_to_json(&f
, m
->get_connection()->get_features(),
835 secname
.str(), mc
.cmdstring
, mc
.helpstring
,
836 mc
.module
, mc
.req_perms
, 0);
840 for (const auto &pyc
: py_commands
) {
844 for (const auto &mgr_cmd
: mgr_commands
) {
848 f
.close_section(); // command_descriptions
849 f
.flush(cmdctx
->odata
);
850 cmdctx
->reply(0, ss
);
855 const MonCommand
*mgr_cmd
= _get_mgrcommand(prefix
, mgr_commands
);
856 _generate_command_map(cmdctx
->cmdmap
, param_str_map
);
860 MonCommand py_command
= {"", "", "py", "rw"};
861 is_allowed
= _allowed_command(session
, py_command
.module
,
862 prefix
, cmdctx
->cmdmap
, param_str_map
, &py_command
);
864 // validate user's permissions for requested command
865 is_allowed
= _allowed_command(session
, mgr_cmd
->module
,
866 prefix
, cmdctx
->cmdmap
, param_str_map
, mgr_cmd
);
869 dout(1) << " access denied" << dendl
;
870 audit_clog
->info() << "from='" << session
->inst
<< "' "
871 << "entity='" << session
->entity_name
<< "' "
872 << "cmd=" << m
->cmd
<< ": access denied";
873 ss
<< "access denied: does your client key have mgr caps? "
874 "See http://docs.ceph.com/docs/master/mgr/administrator/"
875 "#client-authentication";
876 cmdctx
->reply(-EACCES
, ss
);
881 << "from='" << session
->inst
<< "' "
882 << "entity='" << session
->entity_name
<< "' "
883 << "cmd=" << m
->cmd
<< ": dispatch";
886 // service map commands
887 if (prefix
== "service dump") {
889 f
.reset(Formatter::create("json-pretty"));
890 cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
891 f
->dump_object("service_map", service_map
);
893 f
->flush(cmdctx
->odata
);
894 cmdctx
->reply(0, ss
);
897 if (prefix
== "service status") {
899 f
.reset(Formatter::create("json-pretty"));
900 // only include state from services that are in the persisted service map
901 f
->open_object_section("service_status");
902 for (auto& p
: pending_service_map
.services
) {
903 f
->open_object_section(p
.first
.c_str());
904 for (auto& q
: p
.second
.daemons
) {
905 f
->open_object_section(q
.first
.c_str());
906 DaemonKey
key(p
.first
, q
.first
);
907 ceph_assert(daemon_state
.exists(key
));
908 auto daemon
= daemon_state
.get(key
);
909 std::lock_guard
l(daemon
->lock
);
910 f
->dump_stream("status_stamp") << daemon
->service_status_stamp
;
911 f
->dump_stream("last_beacon") << daemon
->last_service_beacon
;
912 f
->open_object_section("status");
913 for (auto& r
: daemon
->service_status
) {
914 f
->dump_string(r
.first
.c_str(), r
.second
);
922 f
->flush(cmdctx
->odata
);
923 cmdctx
->reply(0, ss
);
927 if (prefix
== "config set") {
930 cmd_getval(cct
, cmdctx
->cmdmap
, "key", key
);
931 cmd_getval(cct
, cmdctx
->cmdmap
, "value", val
);
932 r
= cct
->_conf
.set_val(key
, val
, &ss
);
934 cct
->_conf
.apply_changes(nullptr);
936 cmdctx
->reply(0, ss
);
943 if (prefix
== "pg scrub" ||
944 prefix
== "pg repair" ||
945 prefix
== "pg deep-scrub") {
946 string scrubop
= prefix
.substr(3, string::npos
);
950 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pgid", pgidstr
);
951 if (!pgid
.parse(pgidstr
.c_str())) {
952 ss
<< "invalid pgid '" << pgidstr
<< "'";
953 cmdctx
->reply(-EINVAL
, ss
);
956 bool pg_exists
= false;
957 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
958 pg_exists
= osdmap
.pg_exists(pgid
);
961 ss
<< "pg " << pgid
<< " does not exist";
962 cmdctx
->reply(-ENOENT
, ss
);
965 int acting_primary
= -1;
967 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
968 epoch
= osdmap
.get_epoch();
969 osdmap
.get_primary_shard(pgid
, &acting_primary
, &spgid
);
971 if (acting_primary
== -1) {
972 ss
<< "pg " << pgid
<< " has no primary osd";
973 cmdctx
->reply(-EAGAIN
, ss
);
976 auto p
= osd_cons
.find(acting_primary
);
977 if (p
== osd_cons
.end()) {
978 ss
<< "pg " << pgid
<< " primary osd." << acting_primary
979 << " is not currently connected";
980 cmdctx
->reply(-EAGAIN
, ss
);
983 for (auto& con
: p
->second
) {
984 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
985 vector
<spg_t
> pgs
= { spgid
};
986 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
990 scrubop
== "deep-scrub"));
992 vector
<pg_t
> pgs
= { pgid
};
993 con
->send_message(new MOSDScrub(monc
->get_fsid(),
996 scrubop
== "deep-scrub"));
999 ss
<< "instructing pg " << spgid
<< " on osd." << acting_primary
1000 << " to " << scrubop
;
1001 cmdctx
->reply(0, ss
);
1003 } else if (prefix
== "osd scrub" ||
1004 prefix
== "osd deep-scrub" ||
1005 prefix
== "osd repair") {
1007 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", whostr
);
1008 vector
<string
> pvec
;
1009 get_str_vec(prefix
, pvec
);
1012 if (whostr
== "*" || whostr
== "all" || whostr
== "any") {
1013 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1014 for (int i
= 0; i
< osdmap
.get_max_osd(); i
++)
1015 if (osdmap
.is_up(i
)) {
1020 long osd
= parse_osd_id(whostr
.c_str(), &ss
);
1022 ss
<< "invalid osd '" << whostr
<< "'";
1023 cmdctx
->reply(-EINVAL
, ss
);
1026 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1027 if (osdmap
.is_up(osd
)) {
1032 ss
<< "osd." << osd
<< " is not up";
1033 cmdctx
->reply(-EAGAIN
, ss
);
1037 set
<int> sent_osds
, failed_osds
;
1038 for (auto osd
: osds
) {
1041 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1042 epoch
= osdmap
.get_epoch();
1043 auto p
= pgmap
.pg_by_osd
.find(osd
);
1044 if (p
!= pgmap
.pg_by_osd
.end()) {
1045 for (auto pgid
: p
->second
) {
1048 osdmap
.get_primary_shard(pgid
, &primary
, &spg
);
1049 if (primary
== osd
) {
1050 spgs
.push_back(spg
);
1055 auto p
= osd_cons
.find(osd
);
1056 if (p
== osd_cons
.end()) {
1057 failed_osds
.insert(osd
);
1059 sent_osds
.insert(osd
);
1060 for (auto& con
: p
->second
) {
1061 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
1062 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1065 pvec
.back() == "repair",
1066 pvec
.back() == "deep-scrub"));
1068 con
->send_message(new MOSDScrub(monc
->get_fsid(),
1069 pvec
.back() == "repair",
1070 pvec
.back() == "deep-scrub"));
1075 if (failed_osds
.size() == osds
.size()) {
1076 ss
<< "failed to instruct osd(s) " << osds
<< " to " << pvec
.back()
1077 << " (not connected)";
1080 ss
<< "instructed osd(s) " << sent_osds
<< " to " << pvec
.back();
1081 if (!failed_osds
.empty()) {
1082 ss
<< "; osd(s) " << failed_osds
<< " were not connected";
1086 cmdctx
->reply(0, ss
);
1088 } else if (prefix
== "osd pool scrub" ||
1089 prefix
== "osd pool deep-scrub" ||
1090 prefix
== "osd pool repair") {
1091 vector
<string
> pool_names
;
1092 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", pool_names
);
1093 if (pool_names
.empty()) {
1094 ss
<< "must specify one or more pool names";
1095 cmdctx
->reply(-EINVAL
, ss
);
1099 map
<int32_t, vector
<pg_t
>> pgs_by_primary
; // legacy
1100 map
<int32_t, vector
<spg_t
>> spgs_by_primary
;
1101 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1102 epoch
= osdmap
.get_epoch();
1103 for (auto& pool_name
: pool_names
) {
1104 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1106 ss
<< "unrecognized pool '" << pool_name
<< "'";
1110 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1111 for (int i
= 0; i
< pool_pg_num
; i
++) {
1112 pg_t
pg(i
, pool_id
);
1115 auto got
= osdmap
.get_primary_shard(pg
, &primary
, &spg
);
1118 pgs_by_primary
[primary
].push_back(pg
);
1119 spgs_by_primary
[primary
].push_back(spg
);
1124 cmdctx
->reply(r
, ss
);
1127 for (auto& it
: spgs_by_primary
) {
1128 auto primary
= it
.first
;
1129 auto p
= osd_cons
.find(primary
);
1130 if (p
== osd_cons
.end()) {
1131 ss
<< "osd." << primary
<< " is not currently connected";
1132 cmdctx
->reply(-EAGAIN
, ss
);
1135 for (auto& con
: p
->second
) {
1136 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
1137 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1140 prefix
== "osd pool repair",
1141 prefix
== "osd pool deep-scrub"));
1144 auto q
= pgs_by_primary
.find(primary
);
1145 ceph_assert(q
!= pgs_by_primary
.end());
1146 con
->send_message(new MOSDScrub(monc
->get_fsid(),
1148 prefix
== "osd pool repair",
1149 prefix
== "osd pool deep-scrub"));
1153 cmdctx
->reply(0, "");
1155 } else if (prefix
== "osd reweight-by-pg" ||
1156 prefix
== "osd reweight-by-utilization" ||
1157 prefix
== "osd test-reweight-by-pg" ||
1158 prefix
== "osd test-reweight-by-utilization") {
1160 prefix
== "osd reweight-by-pg" || prefix
== "osd test-reweight-by-pg";
1162 prefix
== "osd test-reweight-by-pg" ||
1163 prefix
== "osd test-reweight-by-utilization";
1165 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "oload", oload
, int64_t(120));
1167 vector
<string
> poolnames
;
1168 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pools", poolnames
);
1169 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1170 for (const auto& poolname
: poolnames
) {
1171 int64_t pool
= osdmap
.lookup_pg_pool_name(poolname
);
1173 ss
<< "pool '" << poolname
<< "' does not exist";
1180 cmdctx
->reply(r
, ss
);
1184 double max_change
= g_conf().get_val
<double>("mon_reweight_max_change");
1185 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_change", max_change
);
1186 if (max_change
<= 0.0) {
1187 ss
<< "max_change " << max_change
<< " must be positive";
1188 cmdctx
->reply(-EINVAL
, ss
);
1191 int64_t max_osds
= g_conf().get_val
<int64_t>("mon_reweight_max_osds");
1192 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_osds", max_osds
);
1193 if (max_osds
<= 0) {
1194 ss
<< "max_osds " << max_osds
<< " must be positive";
1195 cmdctx
->reply(-EINVAL
, ss
);
1198 bool no_increasing
= false;
1199 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "no_increasing", no_increasing
);
1201 mempool::osdmap::map
<int32_t, uint32_t> new_weights
;
1202 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
&osdmap
, const PGMap
& pgmap
) {
1203 return reweight::by_utilization(osdmap
, pgmap
,
1208 pools
.empty() ? NULL
: &pools
,
1211 &ss
, &out_str
, f
.get());
1214 dout(10) << "reweight::by_utilization: finished with " << out_str
<< dendl
;
1217 f
->flush(cmdctx
->odata
);
1219 cmdctx
->odata
.append(out_str
);
1222 ss
<< "FAILED reweight-by-pg";
1223 cmdctx
->reply(r
, ss
);
1225 } else if (r
== 0 || dry_run
) {
1227 cmdctx
->reply(r
, ss
);
1230 json_spirit::Object json_object
;
1231 for (const auto& osd_weight
: new_weights
) {
1232 json_spirit::Config::add(json_object
,
1233 std::to_string(osd_weight
.first
),
1234 std::to_string(osd_weight
.second
));
1236 string s
= json_spirit::write(json_object
);
1237 std::replace(begin(s
), end(s
), '\"', '\'');
1240 "\"prefix\": \"osd reweightn\", "
1241 "\"weights\": \"" + s
+ "\""
1243 auto on_finish
= new ReplyOnFinish(cmdctx
);
1244 monc
->start_mon_command({cmd
}, {},
1245 &on_finish
->from_mon
, &on_finish
->outs
, on_finish
);
1248 } else if (prefix
== "osd df") {
1250 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "output_method", method
);
1253 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "filter_by", filter_by
);
1254 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "filter", filter
);
1255 if (filter_by
.empty() != filter
.empty()) {
1256 cmdctx
->reply(-EINVAL
, "you must specify both 'filter_by' and 'filter'");
1260 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1263 // sanity check filter(s)
1264 if (filter_by
== "class") {
1265 if (!osdmap
.crush
->class_exists(filter
)) {
1266 rs
<< "specified class '" << filter
<< "' does not exist";
1269 class_name
= filter
;
1271 if (filter_by
== "name") {
1272 if (!osdmap
.crush
->name_exists(filter
)) {
1273 rs
<< "specified name '" << filter
<< "' does not exist";
1278 print_osd_utilization(osdmap
, pgmap
, ss
,
1279 f
.get(), method
== "tree",
1280 class_name
, item_name
);
1282 cmdctx
->odata
.append(ss
);
1285 cmdctx
->reply(r
, rs
);
1287 } else if (prefix
== "osd pool stats") {
1289 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pool_name", pool_name
);
1290 int64_t poolid
= -ENOENT
;
1291 bool one_pool
= false;
1292 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1293 if (!pool_name
.empty()) {
1294 poolid
= osdmap
.lookup_pg_pool_name(pool_name
);
1296 ceph_assert(poolid
== -ENOENT
);
1297 ss
<< "unrecognized pool '" << pool_name
<< "'";
1304 f
->open_array_section("pool_stats");
1306 if (osdmap
.get_pools().empty()) {
1307 ss
<< "there are no pools!";
1311 for (auto &p
: osdmap
.get_pools()) {
1315 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, f
.get(), &rs
);
1323 f
->flush(cmdctx
->odata
);
1325 cmdctx
->odata
.append(rs
.str());
1329 if (r
!= -EOPNOTSUPP
) {
1330 cmdctx
->reply(r
, ss
);
1333 } else if (prefix
== "osd safe-to-destroy" ||
1334 prefix
== "osd destroy" ||
1335 prefix
== "osd purge") {
1338 if (prefix
== "osd safe-to-destroy") {
1340 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "ids", ids
);
1341 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1342 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1344 if (!r
&& osds
.empty()) {
1345 ss
<< "must specify one or more OSDs";
1350 if (!cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "id", id
)) {
1352 ss
<< "must specify OSD id";
1358 cmdctx
->reply(r
, ss
);
1361 set
<int> active_osds
, missing_stats
, stored_pgs
, safe_to_destroy
;
1362 int affected_pgs
= 0;
1363 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1364 if (pg_map
.num_pg_unknown
> 0) {
1365 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; cannot draw"
1366 << " any conclusions";
1370 int num_active_clean
= 0;
1371 for (auto& p
: pg_map
.num_pg_by_state
) {
1372 unsigned want
= PG_STATE_ACTIVE
|PG_STATE_CLEAN
;
1373 if ((p
.first
& want
) == want
) {
1374 num_active_clean
+= p
.second
;
1377 for (auto osd
: osds
) {
1378 if (!osdmap
.exists(osd
)) {
1379 safe_to_destroy
.insert(osd
);
1380 continue; // clearly safe to destroy
1382 auto q
= pg_map
.num_pg_by_osd
.find(osd
);
1383 if (q
!= pg_map
.num_pg_by_osd
.end()) {
1384 if (q
->second
.acting
> 0 || q
->second
.up_not_acting
> 0) {
1385 active_osds
.insert(osd
);
1386 // XXX: For overlapping PGs, this counts them again
1387 affected_pgs
+= q
->second
.acting
+ q
->second
.up_not_acting
;
1391 if (num_active_clean
< pg_map
.num_pg
) {
1392 // all pgs aren't active+clean; we need to be careful.
1393 auto p
= pg_map
.osd_stat
.find(osd
);
1394 if (p
== pg_map
.osd_stat
.end() || !osdmap
.is_up(osd
)) {
1395 missing_stats
.insert(osd
);
1397 } else if (p
->second
.num_pgs
> 0) {
1398 stored_pgs
.insert(osd
);
1402 safe_to_destroy
.insert(osd
);
1405 if (r
&& prefix
== "osd safe-to-destroy") {
1406 cmdctx
->reply(r
, ss
); // regardless of formatter
1409 if (!r
&& (!active_osds
.empty() ||
1410 !missing_stats
.empty() || !stored_pgs
.empty())) {
1411 if (!safe_to_destroy
.empty()) {
1412 ss
<< "OSD(s) " << safe_to_destroy
1413 << " are safe to destroy without reducing data durability. ";
1415 if (!active_osds
.empty()) {
1416 ss
<< "OSD(s) " << active_osds
<< " have " << affected_pgs
1417 << " pgs currently mapped to them. ";
1419 if (!missing_stats
.empty()) {
1420 ss
<< "OSD(s) " << missing_stats
<< " have no reported stats, and not all"
1421 << " PGs are active+clean; we cannot draw any conclusions. ";
1423 if (!stored_pgs
.empty()) {
1424 ss
<< "OSD(s) " << stored_pgs
<< " last reported they still store some PG"
1425 << " data, and not all PGs are active+clean; we cannot be sure they"
1426 << " aren't still needed.";
1428 if (!active_osds
.empty() || !stored_pgs
.empty()) {
1435 if (prefix
== "osd safe-to-destroy") {
1437 ss
<< "OSD(s) " << osds
<< " are safe to destroy without reducing data"
1441 f
->open_object_section("osd_status");
1442 f
->open_array_section("safe_to_destroy");
1443 for (auto i
: safe_to_destroy
)
1444 f
->dump_int("osd", i
);
1446 f
->open_array_section("active");
1447 for (auto i
: active_osds
)
1448 f
->dump_int("osd", i
);
1450 f
->open_array_section("missing_stats");
1451 for (auto i
: missing_stats
)
1452 f
->dump_int("osd", i
);
1454 f
->open_array_section("stored_pgs");
1455 for (auto i
: stored_pgs
)
1456 f
->dump_int("osd", i
);
1458 f
->close_section(); // osd_status
1459 f
->flush(cmdctx
->odata
);
1461 std::stringstream().swap(ss
);
1463 cmdctx
->reply(r
, ss
);
1469 cmd_getval(cct
, cmdctx
->cmdmap
, "force", force
);
1472 cmd_getval(cct
, cmdctx
->cmdmap
, "yes_i_really_mean_it", force
);
1475 ss
<< "\nYou can proceed by passing --force, but be warned that"
1476 " this will likely mean real, permanent data loss.";
1482 cmdctx
->reply(r
, ss
);
1487 "\"prefix\": \"" + prefix
+ "-actual\", "
1488 "\"id\": " + stringify(osds
) + ", "
1489 "\"yes_i_really_mean_it\": true"
1491 auto on_finish
= new ReplyOnFinish(cmdctx
);
1492 monc
->start_mon_command({cmd
}, {}, nullptr, &on_finish
->outs
, on_finish
);
1494 } else if (prefix
== "osd ok-to-stop") {
1496 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "ids", ids
);
1499 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1500 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1502 if (!r
&& osds
.empty()) {
1503 ss
<< "must specify one or more OSDs";
1507 cmdctx
->reply(r
, ss
);
1510 int touched_pgs
= 0;
1511 int dangerous_pgs
= 0;
1512 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1513 if (pg_map
.num_pg_unknown
> 0) {
1514 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; "
1515 << "cannot draw any conclusions";
1519 for (const auto& q
: pg_map
.pg_stat
) {
1520 set
<int32_t> pg_acting
; // net acting sets (with no missing if degraded)
1522 if (q
.second
.state
& PG_STATE_DEGRADED
) {
1523 for (auto& anm
: q
.second
.avail_no_missing
) {
1524 if (osds
.count(anm
.osd
)) {
1528 pg_acting
.insert(anm
.osd
);
1531 for (auto& a
: q
.second
.acting
) {
1532 if (osds
.count(a
)) {
1536 pg_acting
.insert(a
);
1543 if (!(q
.second
.state
& PG_STATE_ACTIVE
) ||
1544 (q
.second
.state
& PG_STATE_DEGRADED
)) {
1548 const pg_pool_t
*pi
= osdmap
.get_pg_pool(q
.first
.pool());
1550 ++dangerous_pgs
; // pool is creating or deleting
1552 if (pg_acting
.size() < pi
->min_size
) {
1559 cmdctx
->reply(r
, ss
);
1562 if (dangerous_pgs
) {
1563 ss
<< dangerous_pgs
<< " PGs are already too degraded, would become"
1564 << " too degraded or might become unavailable";
1565 cmdctx
->reply(-EBUSY
, ss
);
1568 ss
<< "OSD(s) " << osds
<< " are ok to stop without reducing"
1569 << " availability or risking data, provided there are no other concurrent failures"
1570 << " or interventions." << std::endl
;
1571 ss
<< touched_pgs
<< " PGs are likely to be"
1572 << " degraded (but remain available) as a result.";
1573 cmdctx
->reply(0, ss
);
1575 } else if (prefix
== "pg force-recovery" ||
1576 prefix
== "pg force-backfill" ||
1577 prefix
== "pg cancel-force-recovery" ||
1578 prefix
== "pg cancel-force-backfill" ||
1579 prefix
== "osd pool force-recovery" ||
1580 prefix
== "osd pool force-backfill" ||
1581 prefix
== "osd pool cancel-force-recovery" ||
1582 prefix
== "osd pool cancel-force-backfill") {
1584 get_str_vec(prefix
, vs
);
1585 auto& granularity
= vs
.front();
1586 auto& forceop
= vs
.back();
1589 // figure out actual op just once
1591 if (forceop
== "force-recovery") {
1592 actual_op
= OFR_RECOVERY
;
1593 } else if (forceop
== "force-backfill") {
1594 actual_op
= OFR_BACKFILL
;
1595 } else if (forceop
== "cancel-force-backfill") {
1596 actual_op
= OFR_BACKFILL
| OFR_CANCEL
;
1597 } else if (forceop
== "cancel-force-recovery") {
1598 actual_op
= OFR_RECOVERY
| OFR_CANCEL
;
1601 set
<pg_t
> candidates
; // deduped
1602 if (granularity
== "pg") {
1603 // covnert pg names to pgs, discard any invalid ones while at it
1604 vector
<string
> pgids
;
1605 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pgid", pgids
);
1606 for (auto& i
: pgids
) {
1608 if (!pgid
.parse(i
.c_str())) {
1609 ss
<< "invlaid pgid '" << i
<< "'; ";
1613 candidates
.insert(pgid
);
1617 vector
<string
> pool_names
;
1618 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", pool_names
);
1619 if (pool_names
.empty()) {
1620 ss
<< "must specify one or more pool names";
1621 cmdctx
->reply(-EINVAL
, ss
);
1624 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1625 for (auto& pool_name
: pool_names
) {
1626 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1628 ss
<< "unrecognized pool '" << pool_name
<< "'";
1632 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1633 for (int i
= 0; i
< pool_pg_num
; i
++)
1634 candidates
.insert({(unsigned int)i
, (uint64_t)pool_id
});
1638 cmdctx
->reply(r
, ss
);
1643 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1644 for (auto& i
: candidates
) {
1645 auto it
= pg_map
.pg_stat
.find(i
);
1646 if (it
== pg_map
.pg_stat
.end()) {
1647 ss
<< "pg " << i
<< " does not exist; ";
1651 auto state
= it
->second
.state
;
1652 // discard pgs for which user requests are pointless
1653 switch (actual_op
) {
1655 if ((state
& (PG_STATE_DEGRADED
|
1656 PG_STATE_RECOVERY_WAIT
|
1657 PG_STATE_RECOVERING
)) == 0) {
1658 // don't return error, user script may be racing with cluster.
1660 ss
<< "pg " << i
<< " doesn't require recovery; ";
1662 } else if (state
& PG_STATE_FORCED_RECOVERY
) {
1663 ss
<< "pg " << i
<< " recovery already forced; ";
1664 // return error, as it may be a bug in user script
1670 if ((state
& (PG_STATE_DEGRADED
|
1671 PG_STATE_BACKFILL_WAIT
|
1672 PG_STATE_BACKFILLING
)) == 0) {
1673 ss
<< "pg " << i
<< " doesn't require backfilling; ";
1675 } else if (state
& PG_STATE_FORCED_BACKFILL
) {
1676 ss
<< "pg " << i
<< " backfill already forced; ";
1681 case OFR_BACKFILL
| OFR_CANCEL
:
1682 if ((state
& PG_STATE_FORCED_BACKFILL
) == 0) {
1683 ss
<< "pg " << i
<< " backfill not forced; ";
1687 case OFR_RECOVERY
| OFR_CANCEL
:
1688 if ((state
& PG_STATE_FORCED_RECOVERY
) == 0) {
1689 ss
<< "pg " << i
<< " recovery not forced; ";
1694 ceph_abort_msg("actual_op value is not supported");
1700 // respond with error only when no pgs are correct
1701 // yes, in case of mixed errors, only the last one will be emitted,
1702 // but the message presented will be fine
1703 if (pgs
.size() != 0) {
1704 // clear error to not confuse users/scripts
1708 // optimize the command -> messages conversion, use only one
1709 // message per distinct OSD
1710 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1711 // group pgs to process by osd
1712 map
<int, vector
<spg_t
>> osdpgs
;
1713 for (auto& pgid
: pgs
) {
1716 if (osdmap
.get_primary_shard(pgid
, &primary
, &spg
)) {
1717 osdpgs
[primary
].push_back(spg
);
1720 for (auto& i
: osdpgs
) {
1721 if (osdmap
.is_up(i
.first
)) {
1722 auto p
= osd_cons
.find(i
.first
);
1723 if (p
== osd_cons
.end()) {
1724 ss
<< "osd." << i
.first
<< " is not currently connected";
1728 for (auto& con
: p
->second
) {
1730 new MOSDForceRecovery(monc
->get_fsid(), i
.second
, actual_op
));
1732 ss
<< "instructing pg(s) " << i
.second
<< " on osd." << i
.first
1733 << " to " << forceop
<< "; ";
1738 cmdctx
->reply(r
, ss
);
1740 } else if (prefix
== "config show" ||
1741 prefix
== "config show-with-defaults") {
1743 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", who
);
1745 auto dot
= who
.find('.');
1747 key
.first
= who
.substr(0, dot
);
1748 key
.second
= who
.substr(dot
+ 1);
1749 DaemonStatePtr daemon
= daemon_state
.get(key
);
1752 ss
<< "no config state for daemon " << who
;
1753 cmdctx
->reply(-ENOENT
, ss
);
1757 std::lock_guard
l(daemon
->lock
);
1759 if (cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "key", name
)) {
1760 auto p
= daemon
->config
.find(name
);
1761 if (p
!= daemon
->config
.end() &&
1762 !p
->second
.empty()) {
1763 cmdctx
->odata
.append(p
->second
.rbegin()->second
+ "\n");
1765 auto& defaults
= daemon
->_get_config_defaults();
1766 auto q
= defaults
.find(name
);
1767 if (q
!= defaults
.end()) {
1768 cmdctx
->odata
.append(q
->second
+ "\n");
1773 } else if (daemon
->config_defaults_bl
.length() > 0) {
1776 f
->open_array_section("config");
1778 tbl
.define_column("NAME", TextTable::LEFT
, TextTable::LEFT
);
1779 tbl
.define_column("VALUE", TextTable::LEFT
, TextTable::LEFT
);
1780 tbl
.define_column("SOURCE", TextTable::LEFT
, TextTable::LEFT
);
1781 tbl
.define_column("OVERRIDES", TextTable::LEFT
, TextTable::LEFT
);
1782 tbl
.define_column("IGNORES", TextTable::LEFT
, TextTable::LEFT
);
1784 if (prefix
== "config show") {
1786 for (auto& i
: daemon
->config
) {
1787 dout(20) << " " << i
.first
<< " -> " << i
.second
<< dendl
;
1788 if (i
.second
.empty()) {
1792 f
->open_object_section("value");
1793 f
->dump_string("name", i
.first
);
1794 f
->dump_string("value", i
.second
.rbegin()->second
);
1795 f
->dump_string("source", ceph_conf_level_name(
1796 i
.second
.rbegin()->first
));
1797 if (i
.second
.size() > 1) {
1798 f
->open_array_section("overrides");
1799 auto j
= i
.second
.rend();
1800 for (--j
; j
!= i
.second
.rbegin(); --j
) {
1801 f
->open_object_section("value");
1802 f
->dump_string("source", ceph_conf_level_name(j
->first
));
1803 f
->dump_string("value", j
->second
);
1808 if (daemon
->ignored_mon_config
.count(i
.first
)) {
1809 f
->dump_string("ignores", "mon");
1814 tbl
<< i
.second
.rbegin()->second
;
1815 tbl
<< ceph_conf_level_name(i
.second
.rbegin()->first
);
1816 if (i
.second
.size() > 1) {
1818 auto j
= i
.second
.rend();
1819 for (--j
; j
!= i
.second
.rbegin(); --j
) {
1820 if (j
->second
== i
.second
.rbegin()->second
) {
1821 ov
.push_front(string("(") + ceph_conf_level_name(j
->first
) +
1822 string("[") + j
->second
+ string("]") +
1825 ov
.push_front(ceph_conf_level_name(j
->first
) +
1826 string("[") + j
->second
+ string("]"));
1834 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
1835 tbl
<< TextTable::endrow
;
1839 // show-with-defaults
1840 auto& defaults
= daemon
->_get_config_defaults();
1841 for (auto& i
: defaults
) {
1843 f
->open_object_section("value");
1844 f
->dump_string("name", i
.first
);
1848 auto j
= daemon
->config
.find(i
.first
);
1849 if (j
!= daemon
->config
.end() && !j
->second
.empty()) {
1852 f
->dump_string("value", j
->second
.rbegin()->second
);
1853 f
->dump_string("source", ceph_conf_level_name(
1854 j
->second
.rbegin()->first
));
1855 if (j
->second
.size() > 1) {
1856 f
->open_array_section("overrides");
1857 auto k
= j
->second
.rend();
1858 for (--k
; k
!= j
->second
.rbegin(); --k
) {
1859 f
->open_object_section("value");
1860 f
->dump_string("source", ceph_conf_level_name(k
->first
));
1861 f
->dump_string("value", k
->second
);
1866 if (daemon
->ignored_mon_config
.count(i
.first
)) {
1867 f
->dump_string("ignores", "mon");
1871 tbl
<< j
->second
.rbegin()->second
;
1872 tbl
<< ceph_conf_level_name(j
->second
.rbegin()->first
);
1873 if (j
->second
.size() > 1) {
1875 auto k
= j
->second
.rend();
1876 for (--k
; k
!= j
->second
.rbegin(); --k
) {
1877 if (k
->second
== j
->second
.rbegin()->second
) {
1878 ov
.push_front(string("(") + ceph_conf_level_name(k
->first
) +
1879 string("[") + k
->second
+ string("]") +
1882 ov
.push_front(ceph_conf_level_name(k
->first
) +
1883 string("[") + k
->second
+ string("]"));
1890 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
1891 tbl
<< TextTable::endrow
;
1894 // only have default
1896 f
->dump_string("value", i
.second
);
1897 f
->dump_string("source", ceph_conf_level_name(CONF_DEFAULT
));
1901 tbl
<< ceph_conf_level_name(CONF_DEFAULT
);
1904 tbl
<< TextTable::endrow
;
1911 f
->flush(cmdctx
->odata
);
1913 cmdctx
->odata
.append(stringify(tbl
));
1916 cmdctx
->reply(r
, ss
);
1918 } else if (prefix
== "device ls") {
1922 f
->open_array_section("devices");
1923 daemon_state
.with_devices([&f
](const DeviceState
& dev
) {
1924 f
->dump_object("device", dev
);
1927 f
->flush(cmdctx
->odata
);
1929 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
1930 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
1931 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
1932 tbl
.define_column("LIFE EXPECTANCY", TextTable::LEFT
, TextTable::LEFT
);
1933 auto now
= ceph_clock_now();
1934 daemon_state
.with_devices([&tbl
, now
](const DeviceState
& dev
) {
1936 for (auto& i
: dev
.devnames
) {
1940 h
+= i
.first
+ ":" + i
.second
;
1943 for (auto& i
: dev
.daemons
) {
1952 << dev
.get_life_expectancy_str(now
)
1953 << TextTable::endrow
;
1955 cmdctx
->odata
.append(stringify(tbl
));
1957 cmdctx
->reply(0, ss
);
1959 } else if (prefix
== "device ls-by-daemon") {
1961 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", who
);
1963 if (!key_from_string(who
, &k
)) {
1964 ss
<< who
<< " is not a valid daemon name";
1967 auto dm
= daemon_state
.get(k
);
1970 f
->open_array_section("devices");
1971 for (auto& i
: dm
->devices
) {
1972 daemon_state
.with_device(i
.first
, [&f
] (const DeviceState
& dev
) {
1973 f
->dump_object("device", dev
);
1977 f
->flush(cmdctx
->odata
);
1980 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
1981 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
1982 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
,
1984 auto now
= ceph_clock_now();
1985 for (auto& i
: dm
->devices
) {
1986 daemon_state
.with_device(
1987 i
.first
, [&tbl
, now
] (const DeviceState
& dev
) {
1989 for (auto& i
: dev
.devnames
) {
1993 h
+= i
.first
+ ":" + i
.second
;
1997 << dev
.get_life_expectancy_str(now
)
1998 << TextTable::endrow
;
2001 cmdctx
->odata
.append(stringify(tbl
));
2005 ss
<< "daemon " << who
<< " not found";
2007 cmdctx
->reply(r
, ss
);
2009 } else if (prefix
== "device ls-by-host") {
2011 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "host", host
);
2013 daemon_state
.list_devids_by_server(host
, &devids
);
2015 f
->open_array_section("devices");
2016 for (auto& devid
: devids
) {
2017 daemon_state
.with_device(
2018 devid
, [&f
] (const DeviceState
& dev
) {
2019 f
->dump_object("device", dev
);
2023 f
->flush(cmdctx
->odata
);
2026 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2027 tbl
.define_column("DEV", TextTable::LEFT
, TextTable::LEFT
);
2028 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
2029 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
, TextTable::LEFT
);
2030 auto now
= ceph_clock_now();
2031 for (auto& devid
: devids
) {
2032 daemon_state
.with_device(
2033 devid
, [&tbl
, &host
, now
] (const DeviceState
& dev
) {
2035 for (auto& j
: dev
.devnames
) {
2036 if (j
.first
== host
) {
2044 for (auto& i
: dev
.daemons
) {
2053 << dev
.get_life_expectancy_str(now
)
2054 << TextTable::endrow
;
2057 cmdctx
->odata
.append(stringify(tbl
));
2059 cmdctx
->reply(0, ss
);
2061 } else if (prefix
== "device info") {
2063 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "devid", devid
);
2066 if (!daemon_state
.with_device(devid
,
2067 [&f
, &rs
] (const DeviceState
& dev
) {
2069 f
->dump_object("device", dev
);
2074 ss
<< "device " << devid
<< " not found";
2078 f
->flush(cmdctx
->odata
);
2080 cmdctx
->odata
.append(rs
.str());
2083 cmdctx
->reply(r
, ss
);
2085 } else if (prefix
== "device set-life-expectancy") {
2087 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "devid", devid
);
2088 string from_str
, to_str
;
2089 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "from", from_str
);
2090 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "to", to_str
);
2092 if (!from
.parse(from_str
)) {
2093 ss
<< "unable to parse datetime '" << from_str
<< "'";
2095 cmdctx
->reply(r
, ss
);
2096 } else if (to_str
.size() && !to
.parse(to_str
)) {
2097 ss
<< "unable to parse datetime '" << to_str
<< "'";
2099 cmdctx
->reply(r
, ss
);
2101 map
<string
,string
> meta
;
2102 daemon_state
.with_device_create(
2104 [from
, to
, &meta
] (DeviceState
& dev
) {
2105 dev
.set_life_expectancy(from
, to
, ceph_clock_now());
2106 meta
= dev
.metadata
;
2108 json_spirit::Object json_object
;
2109 for (auto& i
: meta
) {
2110 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2113 json
.append(json_spirit::write(json_object
));
2116 "\"prefix\": \"config-key set\", "
2117 "\"key\": \"device/" + devid
+ "\""
2119 auto on_finish
= new ReplyOnFinish(cmdctx
);
2120 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2123 } else if (prefix
== "device rm-life-expectancy") {
2125 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "devid", devid
);
2126 map
<string
,string
> meta
;
2127 if (daemon_state
.with_device_write(devid
, [&meta
] (DeviceState
& dev
) {
2128 dev
.rm_life_expectancy();
2129 meta
= dev
.metadata
;
2136 "\"prefix\": \"config-key rm\", "
2137 "\"key\": \"device/" + devid
+ "\""
2140 json_spirit::Object json_object
;
2141 for (auto& i
: meta
) {
2142 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2144 json
.append(json_spirit::write(json_object
));
2147 "\"prefix\": \"config-key set\", "
2148 "\"key\": \"device/" + devid
+ "\""
2151 auto on_finish
= new ReplyOnFinish(cmdctx
);
2152 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2154 cmdctx
->reply(0, ss
);
2159 ss
<< "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2162 f
->open_object_section("pg_info");
2163 f
->dump_bool("pg_ready", pgmap_ready
);
2166 // fall back to feeding command to PGMap
2167 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
2168 return process_pg_map_command(prefix
, cmdctx
->cmdmap
, pg_map
, osdmap
,
2169 f
.get(), &ss
, &cmdctx
->odata
);
2175 if (r
!= -EOPNOTSUPP
) {
2177 f
->flush(cmdctx
->odata
);
2179 cmdctx
->reply(r
, ss
);
2184 // Resolve the command to the name of the module that will
2185 // handle it (if the command exists)
2186 std::string handler_name
;
2187 auto py_commands
= py_modules
.get_py_commands();
2188 for (const auto &pyc
: py_commands
) {
2189 auto pyc_prefix
= cmddesc_get_prefix(pyc
.cmdstring
);
2190 if (pyc_prefix
== prefix
) {
2191 handler_name
= pyc
.module_name
;
2196 // Was the command unfound?
2197 if (handler_name
.empty()) {
2198 ss
<< "No handler found for '" << prefix
<< "'";
2199 dout(4) << "No handler found for '" << prefix
<< "'" << dendl
;
2200 cmdctx
->reply(-EINVAL
, ss
);
2204 dout(4) << "passing through " << cmdctx
->cmdmap
.size() << dendl
;
2205 finisher
.queue(new FunctionContext([this, cmdctx
, handler_name
, prefix
](int r_
) {
2206 std::stringstream ss
;
2208 // Validate that the module is enabled
2209 PyModuleRef module
= py_modules
.get_module(handler_name
);
2210 ceph_assert(module
);
2211 if (!module
->is_enabled()) {
2212 ss
<< "Module '" << handler_name
<< "' is not enabled (required by "
2213 "command '" << prefix
<< "'): use `ceph mgr module enable "
2214 << handler_name
<< "` to enable it";
2215 dout(4) << ss
.str() << dendl
;
2216 cmdctx
->reply(-EOPNOTSUPP
, ss
);
2220 // Hack: allow the self-test method to run on unhealthy modules.
2221 // Fix this in future by creating a special path for self test rather
2222 // than having the hook be a normal module command.
2223 std::string self_test_prefix
= handler_name
+ " " + "self-test";
2225 // Validate that the module is healthy
2226 bool accept_command
;
2227 if (module
->is_loaded()) {
2228 if (module
->get_can_run() && !module
->is_failed()) {
2230 accept_command
= true;
2231 } else if (self_test_prefix
== prefix
) {
2232 // Unhealthy, but allow because it's a self test command
2233 accept_command
= true;
2235 accept_command
= false;
2236 ss
<< "Module '" << handler_name
<< "' has experienced an error and "
2237 "cannot handle commands: " << module
->get_error_string();
2240 // Module not loaded
2241 accept_command
= false;
2242 ss
<< "Module '" << handler_name
<< "' failed to load and "
2243 "cannot handle commands: " << module
->get_error_string();
2246 if (!accept_command
) {
2247 dout(4) << ss
.str() << dendl
;
2248 cmdctx
->reply(-EIO
, ss
);
2252 std::stringstream ds
;
2253 bufferlist inbl
= cmdctx
->m
->get_data();
2254 int r
= py_modules
.handle_command(handler_name
, cmdctx
->cmdmap
, inbl
, &ds
, &ss
);
2255 cmdctx
->odata
.append(ds
);
2256 cmdctx
->reply(r
, ss
);
2261 void DaemonServer::_prune_pending_service_map()
2263 utime_t cutoff
= ceph_clock_now();
2264 cutoff
-= g_conf().get_val
<double>("mgr_service_beacon_grace");
2265 auto p
= pending_service_map
.services
.begin();
2266 while (p
!= pending_service_map
.services
.end()) {
2267 auto q
= p
->second
.daemons
.begin();
2268 while (q
!= p
->second
.daemons
.end()) {
2269 DaemonKey
key(p
->first
, q
->first
);
2270 if (!daemon_state
.exists(key
)) {
2271 derr
<< "missing key " << key
<< dendl
;
2275 auto daemon
= daemon_state
.get(key
);
2276 std::lock_guard
l(daemon
->lock
);
2277 if (daemon
->last_service_beacon
== utime_t()) {
2278 // we must have just restarted; assume they are alive now.
2279 daemon
->last_service_beacon
= ceph_clock_now();
2283 if (daemon
->last_service_beacon
< cutoff
) {
2284 dout(10) << "pruning stale " << p
->first
<< "." << q
->first
2285 << " last_beacon " << daemon
->last_service_beacon
<< dendl
;
2286 q
= p
->second
.daemons
.erase(q
);
2287 pending_service_map_dirty
= pending_service_map
.epoch
;
2292 if (p
->second
.daemons
.empty()) {
2293 p
= pending_service_map
.services
.erase(p
);
2294 pending_service_map_dirty
= pending_service_map
.epoch
;
2301 void DaemonServer::send_report()
2304 if (ceph_clock_now() - started_at
> g_conf().get_val
<int64_t>("mgr_stats_period") * 4.0) {
2306 reported_osds
.clear();
2307 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
2308 << "potentially incomplete PG state to mon" << dendl
;
2310 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
2316 auto m
= new MMonMgrReport();
2317 py_modules
.get_health_checks(&m
->health_checks
);
2318 py_modules
.get_progress_events(&m
->progress_events
);
2320 cluster_state
.with_mutable_pgmap([&](PGMap
& pg_map
) {
2321 cluster_state
.update_delta_stats();
2323 if (pending_service_map
.epoch
) {
2324 _prune_pending_service_map();
2325 if (pending_service_map_dirty
>= pending_service_map
.epoch
) {
2326 pending_service_map
.modified
= ceph_clock_now();
2327 encode(pending_service_map
, m
->service_map_bl
, CEPH_FEATURES_ALL
);
2328 dout(10) << "sending service_map e" << pending_service_map
.epoch
2330 pending_service_map
.epoch
++;
2334 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
2335 // FIXME: no easy way to get mon features here. this will do for
2336 // now, though, as long as we don't make a backward-incompat change.
2337 pg_map
.encode_digest(osdmap
, m
->get_data(), CEPH_FEATURES_ALL
);
2338 dout(10) << pg_map
<< dendl
;
2340 pg_map
.get_health_checks(g_ceph_context
, osdmap
,
2343 dout(10) << m
->health_checks
.checks
.size() << " health checks"
2345 dout(20) << "health checks:\n";
2346 JSONFormatter
jf(true);
2347 jf
.dump_object("health_checks", m
->health_checks
);
2350 if (osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2351 clog
->debug() << "pgmap v" << pg_map
.version
<< ": " << pg_map
;
2356 map
<daemon_metric
, unique_ptr
<DaemonHealthMetricCollector
>> accumulated
;
2357 for (auto service
: {"osd", "mon"} ) {
2358 auto daemons
= daemon_state
.get_by_service(service
);
2359 for (const auto& [key
,state
] : daemons
) {
2360 std::lock_guard l
{state
->lock
};
2361 for (const auto& metric
: state
->daemon_health_metrics
) {
2362 auto acc
= accumulated
.find(metric
.get_type());
2363 if (acc
== accumulated
.end()) {
2364 auto collector
= DaemonHealthMetricCollector::create(metric
.get_type());
2366 derr
<< __func__
<< " " << key
.first
<< "." << key
.second
2367 << " sent me an unknown health metric: "
2368 << std::hex
<< static_cast<uint8_t>(metric
.get_type())
2369 << std::dec
<< dendl
;
2372 dout(20) << " + " << state
->key
<< " "
2374 tie(acc
, std::ignore
) = accumulated
.emplace(metric
.get_type(),
2375 std::move(collector
));
2377 acc
->second
->update(key
, metric
);
2381 for (const auto& acc
: accumulated
) {
2382 acc
.second
->summarize(m
->health_checks
);
2384 // TODO? We currently do not notify the PyModules
2385 // TODO: respect needs_send, so we send the report only if we are asked to do
2386 // so, or the state is updated.
2387 monc
->send_mon_message(m
);
2390 void DaemonServer::adjust_pgs()
2393 unsigned max
= std::max
<int64_t>(1, g_conf()->mon_osd_max_creating_pgs
);
2394 double max_misplaced
= g_conf().get_val
<double>("target_max_misplaced_ratio");
2395 bool aggro
= g_conf().get_val
<bool>("mgr_debug_aggressive_pg_num_changes");
2397 map
<string
,unsigned> pg_num_to_set
;
2398 map
<string
,unsigned> pgp_num_to_set
;
2399 set
<pg_t
> upmaps_to_clear
;
2400 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
2401 unsigned creating_or_unknown
= 0;
2402 for (auto& i
: pg_map
.num_pg_by_state
) {
2403 if ((i
.first
& (PG_STATE_CREATING
)) ||
2405 creating_or_unknown
+= i
.second
;
2408 unsigned left
= max
;
2409 if (creating_or_unknown
>= max
) {
2412 left
-= creating_or_unknown
;
2413 dout(10) << "creating_or_unknown " << creating_or_unknown
2414 << " max_creating " << max
2418 // FIXME: These checks are fundamentally racy given that adjust_pgs()
2419 // can run more frequently than we get updated pg stats from OSDs. We
2420 // may make multiple adjustments with stale informaiton.
2421 double misplaced_ratio
, degraded_ratio
;
2422 double inactive_pgs_ratio
, unknown_pgs_ratio
;
2423 pg_map
.get_recovery_stats(&misplaced_ratio
, °raded_ratio
,
2424 &inactive_pgs_ratio
, &unknown_pgs_ratio
);
2425 dout(20) << "misplaced_ratio " << misplaced_ratio
2426 << " degraded_ratio " << degraded_ratio
2427 << " inactive_pgs_ratio " << inactive_pgs_ratio
2428 << " unknown_pgs_ratio " << unknown_pgs_ratio
2429 << "; target_max_misplaced_ratio " << max_misplaced
2432 for (auto& i
: osdmap
.get_pools()) {
2433 const pg_pool_t
& p
= i
.second
;
2436 if (p
.get_pg_num_target() != p
.get_pg_num()) {
2437 dout(20) << "pool " << i
.first
2438 << " pg_num " << p
.get_pg_num()
2439 << " target " << p
.get_pg_num_target()
2441 if (p
.has_flag(pg_pool_t::FLAG_CREATING
)) {
2442 dout(10) << "pool " << i
.first
2443 << " pg_num_target " << p
.get_pg_num_target()
2444 << " pg_num " << p
.get_pg_num()
2445 << " - still creating initial pgs"
2447 } else if (p
.get_pg_num_target() < p
.get_pg_num()) {
2448 // pg_num decrease (merge)
2449 pg_t
merge_source(p
.get_pg_num() - 1, i
.first
);
2450 pg_t merge_target
= merge_source
.get_parent();
2453 if (p
.get_pg_num() != p
.get_pg_num_pending()) {
2454 dout(10) << "pool " << i
.first
2455 << " pg_num_target " << p
.get_pg_num_target()
2456 << " pg_num " << p
.get_pg_num()
2457 << " - decrease and pg_num_pending != pg_num, waiting"
2460 } else if (p
.get_pg_num() == p
.get_pgp_num()) {
2461 dout(10) << "pool " << i
.first
2462 << " pg_num_target " << p
.get_pg_num_target()
2463 << " pg_num " << p
.get_pg_num()
2464 << " - decrease blocked by pgp_num "
2469 for (auto &merge_participant
: {merge_source
, merge_target
}) {
2470 bool is_merge_source
= merge_participant
== merge_source
;
2471 if (osdmap
.have_pg_upmaps(merge_participant
)) {
2472 dout(10) << "pool " << i
.first
2473 << " pg_num_target " << p
.get_pg_num_target()
2474 << " pg_num " << p
.get_pg_num()
2475 << (is_merge_source
? " - merge source " : " - merge target ")
2476 << merge_participant
2477 << " has upmap" << dendl
;
2478 upmaps_to_clear
.insert(merge_participant
);
2481 auto q
= pg_map
.pg_stat
.find(merge_participant
);
2482 if (q
== pg_map
.pg_stat
.end()) {
2483 dout(10) << "pool " << i
.first
2484 << " pg_num_target " << p
.get_pg_num_target()
2485 << " pg_num " << p
.get_pg_num()
2486 << " - no state for " << merge_participant
2487 << (is_merge_source
? " (merge source)" : " (merge target)")
2490 } else if ((q
->second
.state
& (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) !=
2491 (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) {
2492 dout(10) << "pool " << i
.first
2493 << " pg_num_target " << p
.get_pg_num_target()
2494 << " pg_num " << p
.get_pg_num()
2495 << (is_merge_source
? " - merge source " : " - merge target ")
2496 << merge_participant
2497 << " not clean (" << pg_state_string(q
->second
.state
)
2504 unsigned target
= p
.get_pg_num() - 1;
2505 dout(10) << "pool " << i
.first
2506 << " pg_num_target " << p
.get_pg_num_target()
2507 << " pg_num " << p
.get_pg_num()
2509 << " (merging " << merge_source
2510 << " and " << merge_target
2512 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
2514 } else if (p
.get_pg_num_target() > p
.get_pg_num()) {
2515 // pg_num increase (split)
2517 auto q
= pg_map
.num_pg_by_pool_state
.find(i
.first
);
2518 if (q
!= pg_map
.num_pg_by_pool_state
.end()) {
2519 for (auto& j
: q
->second
) {
2520 if ((j
.first
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
)) == 0) {
2521 dout(20) << "pool " << i
.first
<< " has " << j
.second
2522 << " pgs in " << pg_state_string(j
.first
)
2532 dout(10) << "pool " << i
.first
2533 << " pg_num_target " << p
.get_pg_num_target()
2534 << " pg_num " << p
.get_pg_num()
2535 << " - not all pgs active"
2538 unsigned add
= std::min(
2540 p
.get_pg_num_target() - p
.get_pg_num());
2541 unsigned target
= p
.get_pg_num() + add
;
2543 dout(10) << "pool " << i
.first
2544 << " pg_num_target " << p
.get_pg_num_target()
2545 << " pg_num " << p
.get_pg_num()
2546 << " -> " << target
<< dendl
;
2547 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
2553 unsigned target
= std::min(p
.get_pg_num_pending(),
2554 p
.get_pgp_num_target());
2555 if (target
!= p
.get_pgp_num()) {
2556 dout(20) << "pool " << i
.first
2557 << " pgp_num_target " << p
.get_pgp_num_target()
2558 << " pgp_num " << p
.get_pgp_num()
2559 << " -> " << target
<< dendl
;
2560 if (target
> p
.get_pgp_num() &&
2561 p
.get_pgp_num() == p
.get_pg_num()) {
2562 dout(10) << "pool " << i
.first
2563 << " pgp_num_target " << p
.get_pgp_num_target()
2564 << " pgp_num " << p
.get_pgp_num()
2565 << " - increase blocked by pg_num " << p
.get_pg_num()
2567 } else if (!aggro
&& (inactive_pgs_ratio
> 0 ||
2568 degraded_ratio
> 0 ||
2569 unknown_pgs_ratio
> 0)) {
2570 dout(10) << "pool " << i
.first
2571 << " pgp_num_target " << p
.get_pgp_num_target()
2572 << " pgp_num " << p
.get_pgp_num()
2573 << " - inactive|degraded|unknown pgs, deferring pgp_num"
2574 << " update" << dendl
;
2575 } else if (!aggro
&& (misplaced_ratio
> max_misplaced
)) {
2576 dout(10) << "pool " << i
.first
2577 << " pgp_num_target " << p
.get_pgp_num_target()
2578 << " pgp_num " << p
.get_pgp_num()
2579 << " - misplaced_ratio " << misplaced_ratio
2580 << " > max " << max_misplaced
2581 << ", deferring pgp_num update" << dendl
;
2583 // NOTE: this calculation assumes objects are
2584 // basically uniformly distributed across all PGs
2585 // (regardless of pool), which is probably not
2586 // perfectly correct, but it's a start. make no
2587 // single adjustment that's more than half of the
2588 // max_misplaced, to somewhat limit the magnitude of
2589 // our potential error here.
2592 pool_stat_t s
= pg_map
.get_pg_pool_sum_stat(i
.first
);
2594 // pool is (virtually) empty; just jump to final pgp_num?
2595 (p
.get_pgp_num_target() > p
.get_pgp_num() &&
2596 s
.stats
.sum
.num_objects
<= p
.get_pgp_num_target())) {
2600 std::min
<double>(max_misplaced
- misplaced_ratio
,
2601 max_misplaced
/ 2.0);
2602 unsigned estmax
= std::max
<unsigned>(
2603 (double)p
.get_pg_num() * room
, 1u);
2604 int delta
= target
- p
.get_pgp_num();
2605 next
= p
.get_pgp_num();
2607 next
+= std::max
<int>(-estmax
, delta
);
2609 next
+= std::min
<int>(estmax
, delta
);
2611 dout(20) << " room " << room
<< " estmax " << estmax
2612 << " delta " << delta
<< " next " << next
<< dendl
;
2613 if (p
.get_pgp_num_target() == p
.get_pg_num_target() &&
2614 p
.get_pgp_num_target() < p
.get_pg_num()) {
2615 // since pgp_num is tracking pg_num, ceph is handling
2616 // pgp_num. so, be responsible: don't let pgp_num get
2617 // too far out ahead of merges (if we are merging).
2618 // this avoids moving lots of unmerged pgs onto a
2619 // small number of OSDs where we might blow out the
2621 unsigned max_outpace_merges
=
2622 std::max
<unsigned>(8, p
.get_pg_num() * max_misplaced
);
2623 if (next
+ max_outpace_merges
< p
.get_pg_num()) {
2624 next
= p
.get_pg_num() - max_outpace_merges
;
2625 dout(10) << " using next " << next
2626 << " to avoid outpacing merges (max_outpace_merges "
2627 << max_outpace_merges
<< ")" << dendl
;
2631 dout(10) << "pool " << i
.first
2632 << " pgp_num_target " << p
.get_pgp_num_target()
2633 << " pgp_num " << p
.get_pgp_num()
2634 << " -> " << next
<< dendl
;
2635 pgp_num_to_set
[osdmap
.get_pool_name(i
.first
)] = next
;
2643 for (auto i
: pg_num_to_set
) {
2646 "\"prefix\": \"osd pool set\", "
2647 "\"pool\": \"" + i
.first
+ "\", "
2648 "\"var\": \"pg_num_actual\", "
2649 "\"val\": \"" + stringify(i
.second
) + "\""
2651 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2653 for (auto i
: pgp_num_to_set
) {
2656 "\"prefix\": \"osd pool set\", "
2657 "\"pool\": \"" + i
.first
+ "\", "
2658 "\"var\": \"pgp_num_actual\", "
2659 "\"val\": \"" + stringify(i
.second
) + "\""
2661 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2663 for (auto pg
: upmaps_to_clear
) {
2666 "\"prefix\": \"osd rm-pg-upmap\", "
2667 "\"pgid\": \"" + stringify(pg
) + "\""
2669 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2672 "\"prefix\": \"osd rm-pg-upmap-items\", "
2673 "\"pgid\": \"" + stringify(pg
) + "\"" +
2675 monc
->start_mon_command({cmd2
}, {}, nullptr, nullptr, nullptr);
void DaemonServer::got_service_map()
{
  // Called when a new ServiceMap arrives from the monitors.  Merges it into
  // pending_service_map and makes daemon_state reflect exactly the set of
  // service daemons the map describes.
  std::lock_guard l(lock);

  cluster_state.with_servicemap([&](const ServiceMap& service_map) {
      if (pending_service_map.epoch == 0) {
        // we just started up
        dout(10) << "got initial map e" << service_map.epoch << dendl;
        pending_service_map = service_map;
      } else {
        // we were already active and therefore must have persisted it,
        // which means ours is the same or newer.
        dout(10) << "got updated map e" << service_map.epoch << dendl;
      }
      // The epoch we will publish next must supersede the one just received.
      pending_service_map.epoch = service_map.epoch + 1;
    });

  // cull missing daemons, populate new ones
  for (auto& p : pending_service_map.services) {
    std::set<std::string> names;
    for (auto& q : p.second.daemons) {
      names.insert(q.first);
      DaemonKey key(p.first, q.first);
      if (!daemon_state.exists(key)) {
        // Daemon appears in the service map but we hold no state for it yet:
        // create an entry seeded with the metadata carried in the map.
        auto daemon = std::make_shared<DaemonState>(daemon_state.types);
        daemon->key = key;  // NOTE(review): reconstructed line — confirm against upstream
        daemon->set_metadata(q.second.metadata);
        daemon->service_daemon = true;
        daemon_state.insert(daemon);
        dout(10) << "added missing " << key << dendl;
      }
    }
    // Drop state for daemons of this service that are no longer in the map.
    daemon_state.cull(p.first, names);
  }
}
void DaemonServer::got_mgr_map()
{
  // Called when a new MgrMap arrives.  Ensures daemon_state has an entry
  // (populated via an async "mgr metadata" mon command) for the active mgr
  // and every standby, then culls mgr entries no longer present in the map.
  std::lock_guard l(lock);
  set<std::string> have;
  cluster_state.with_mgrmap([&](const MgrMap& mgrmap) {
      // Fire an asynchronous "mgr metadata" mon command; the MetadataUpdate
      // completion inserts the result into daemon_state when it arrives.
      auto md_update = [&] (DaemonKey key) {
        std::ostringstream oss;
        auto c = new MetadataUpdate(daemon_state, key);
        // FIXME remove post-nautilus: include 'id' for luminous mons
        oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
            << key.second << "\", \"id\": \"" << key.second << "\"}";
        monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
      };
      if (mgrmap.active_name.size()) {
        DaemonKey key("mgr", mgrmap.active_name);
        have.insert(mgrmap.active_name);
        // Skip if we already know this daemon or an update is in flight.
        if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
          md_update(key);  // NOTE(review): reconstructed call — confirm against upstream
          dout(10) << "triggered addition of " << key
                   << " via metadata update" << dendl;
        }
      }
      for (auto& i : mgrmap.standbys) {
        DaemonKey key("mgr", i.second.name);
        have.insert(i.second.name);
        if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
          md_update(key);  // NOTE(review): reconstructed call — confirm against upstream
          dout(10) << "triggered addition of " << key
                   << " via metadata update" << dendl;
        }
      }
    });
  daemon_state.cull("mgr", have);
}
2748 const char** DaemonServer::get_tracked_conf_keys() const
2750 static const char *KEYS
[] = {
2751 "mgr_stats_threshold",
void DaemonServer::handle_conf_change(const ConfigProxy& conf,
                                      const std::set<std::string> &changed)
{
  // Config-observer callback: react to runtime changes of the keys
  // returned by get_tracked_conf_keys().
  if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
    dout(4) << "Updating stats threshold/period on "
            << daemon_connections.size() << " clients" << dendl;
    // Send a fresh MMgrConfigure to all clients, so that they can follow
    // the new policy for transmitting stats
    // (queued on the finisher so we re-take `lock` outside this callback)
    finisher.queue(new FunctionContext([this](int r) {
      std::lock_guard l(lock);
      for (auto &c : daemon_connections) {
        _send_configure(c);  // NOTE(review): loop body reconstructed — confirm against upstream
      }
    }));
  }
}
void DaemonServer::_send_configure(ConnectionRef c)
{
  // Send an MMgrConfigure message to one connected daemon/client, telling
  // it the current stats reporting period and threshold.
  // Caller must hold `lock` (asserted below).
  ceph_assert(lock.is_locked_by_me());

  auto configure = new MMgrConfigure();
  configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
  configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");

  if (c->peer_is_osd()) {
    // OSDs additionally receive the current set of perf metric queries.
    configure->osd_perf_metric_queries =
      osd_perf_metric_collector.get_queries();
  }

  c->send_message(configure);
}
OSDPerfMetricQueryID DaemonServer::add_osd_perf_query(
    const OSDPerfMetricQuery &query,
    const std::optional<OSDPerfMetricLimit> &limit)
{
  // Register a new OSD perf metric query (with an optional result limit);
  // thin delegation to the collector, which returns the new query's id.
  return osd_perf_metric_collector.add_query(query, limit);
}
int DaemonServer::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
{
  // Deregister a previously added OSD perf metric query; thin delegation
  // to the collector, whose return code is passed through.
  return osd_perf_metric_collector.remove_query(query_id);
}
int DaemonServer::get_osd_perf_counters(
    OSDPerfMetricQueryID query_id,
    std::map<OSDPerfMetricKey, PerformanceCounters> *counters)
{
  // Fetch the counters collected so far for the given query into *counters;
  // thin delegation to the collector, whose return code is passed through.
  return osd_perf_metric_collector.get_counters(query_id, counters);
}