1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
*/
14 #include "DaemonServer.h"
17 #include "include/stringify.h"
18 #include "include/str_list.h"
19 #include "auth/RotatingKeyRing.h"
20 #include "json_spirit/json_spirit_writer.h"
22 #include "mgr/mgr_commands.h"
23 #include "mgr/DaemonHealthMetricCollector.h"
24 #include "mgr/OSDPerfMetricCollector.h"
25 #include "mon/MonCommand.h"
27 #include "messages/MMgrOpen.h"
28 #include "messages/MMgrClose.h"
29 #include "messages/MMgrConfigure.h"
30 #include "messages/MMonMgrReport.h"
31 #include "messages/MCommand.h"
32 #include "messages/MCommandReply.h"
33 #include "messages/MPGStats.h"
34 #include "messages/MOSDScrub.h"
35 #include "messages/MOSDScrub2.h"
36 #include "messages/MOSDForceRecovery.h"
37 #include "common/errno.h"
38 #include "common/pick_address.h"
40 #define dout_context g_ceph_context
41 #define dout_subsys ceph_subsys_mgr
43 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
47 DaemonServer::DaemonServer(MonClient
*monc_
,
49 DaemonStateIndex
&daemon_state_
,
50 ClusterState
&cluster_state_
,
51 PyModuleRegistry
&py_modules_
,
53 LogChannelRef audit_clog_
)
54 : Dispatcher(g_ceph_context
),
55 client_byte_throttler(new Throttle(g_ceph_context
, "mgr_client_bytes",
56 g_conf().get_val
<Option::size_t>("mgr_client_bytes"))),
57 client_msg_throttler(new Throttle(g_ceph_context
, "mgr_client_messages",
58 g_conf().get_val
<uint64_t>("mgr_client_messages"))),
59 osd_byte_throttler(new Throttle(g_ceph_context
, "mgr_osd_bytes",
60 g_conf().get_val
<Option::size_t>("mgr_osd_bytes"))),
61 osd_msg_throttler(new Throttle(g_ceph_context
, "mgr_osd_messsages",
62 g_conf().get_val
<uint64_t>("mgr_osd_messages"))),
63 mds_byte_throttler(new Throttle(g_ceph_context
, "mgr_mds_bytes",
64 g_conf().get_val
<Option::size_t>("mgr_mds_bytes"))),
65 mds_msg_throttler(new Throttle(g_ceph_context
, "mgr_mds_messsages",
66 g_conf().get_val
<uint64_t>("mgr_mds_messages"))),
67 mon_byte_throttler(new Throttle(g_ceph_context
, "mgr_mon_bytes",
68 g_conf().get_val
<Option::size_t>("mgr_mon_bytes"))),
69 mon_msg_throttler(new Throttle(g_ceph_context
, "mgr_mon_messsages",
70 g_conf().get_val
<uint64_t>("mgr_mon_messages"))),
74 daemon_state(daemon_state_
),
75 cluster_state(cluster_state_
),
76 py_modules(py_modules_
),
78 audit_clog(audit_clog_
),
81 timer(g_ceph_context
, lock
),
84 osd_perf_metric_collector_listener(this),
85 osd_perf_metric_collector(osd_perf_metric_collector_listener
)
87 g_conf().add_observer(this);
90 DaemonServer::~DaemonServer() {
92 g_conf().remove_observer(this);
95 int DaemonServer::init(uint64_t gid
, entity_addrvec_t client_addrs
)
97 // Initialize Messenger
98 std::string public_msgr_type
= g_conf()->ms_public_type
.empty() ?
99 g_conf().get_val
<std::string
>("ms_type") : g_conf()->ms_public_type
;
100 msgr
= Messenger::create(g_ceph_context
, public_msgr_type
,
101 entity_name_t::MGR(gid
),
104 msgr
->set_default_policy(Messenger::Policy::stateless_server(0));
106 msgr
->set_auth_client(monc
);
109 msgr
->set_policy_throttlers(entity_name_t::TYPE_CLIENT
,
110 client_byte_throttler
.get(),
111 client_msg_throttler
.get());
114 msgr
->set_policy_throttlers(entity_name_t::TYPE_OSD
,
115 osd_byte_throttler
.get(),
116 osd_msg_throttler
.get());
117 msgr
->set_policy_throttlers(entity_name_t::TYPE_MDS
,
118 mds_byte_throttler
.get(),
119 mds_msg_throttler
.get());
120 msgr
->set_policy_throttlers(entity_name_t::TYPE_MON
,
121 mon_byte_throttler
.get(),
122 mon_msg_throttler
.get());
124 entity_addrvec_t addrs
;
125 int r
= pick_addresses(cct
, CEPH_PICK_ADDRESS_PUBLIC
, &addrs
);
129 dout(20) << __func__
<< " will bind to " << addrs
<< dendl
;
130 r
= msgr
->bindv(addrs
);
132 derr
<< "unable to bind mgr to " << addrs
<< dendl
;
136 msgr
->set_myname(entity_name_t::MGR(gid
));
137 msgr
->set_addr_unknowns(client_addrs
);
140 msgr
->add_dispatcher_tail(this);
142 msgr
->set_auth_server(monc
);
143 monc
->set_handle_authentication_dispatcher(this);
145 started_at
= ceph_clock_now();
147 std::lock_guard
l(lock
);
150 schedule_tick_locked(
151 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
156 entity_addrvec_t
DaemonServer::get_myaddrs() const
158 return msgr
->get_myaddrs();
161 KeyStore
*DaemonServer::ms_get_auth1_authorizer_keystore()
163 return monc
->rotating_secrets
.get();
166 int DaemonServer::ms_handle_authentication(Connection
*con
)
169 MgrSession
*s
= new MgrSession(cct
);
171 s
->inst
.addr
= con
->get_peer_addr();
172 s
->entity_name
= con
->peer_name
;
173 dout(10) << __func__
<< " new session " << s
<< " con " << con
174 << " entity " << con
->peer_name
175 << " addr " << con
->get_peer_addrs()
178 AuthCapsInfo
&caps_info
= con
->get_peer_caps_info();
179 if (caps_info
.allow_all
) {
180 dout(10) << " session " << s
<< " " << s
->entity_name
181 << " allow_all" << dendl
;
182 s
->caps
.set_allow_all();
185 if (caps_info
.caps
.length() > 0) {
186 auto p
= caps_info
.caps
.cbegin();
191 catch (buffer::error
& e
) {
194 bool success
= s
->caps
.parse(str
);
196 dout(10) << " session " << s
<< " " << s
->entity_name
197 << " has caps " << s
->caps
<< " '" << str
<< "'" << dendl
;
200 dout(10) << " session " << s
<< " " << s
->entity_name
201 << " failed to parse caps '" << str
<< "'" << dendl
;
206 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
207 std::lock_guard
l(lock
);
208 s
->osd_id
= atoi(s
->entity_name
.get_id().c_str());
209 dout(10) << "registering osd." << s
->osd_id
<< " session "
210 << s
<< " con " << con
<< dendl
;
211 osd_cons
[s
->osd_id
].insert(con
);
217 bool DaemonServer::ms_get_authorizer(
219 AuthAuthorizer
**authorizer
)
221 dout(10) << "type=" << ceph_entity_type_name(dest_type
) << dendl
;
223 if (dest_type
== CEPH_ENTITY_TYPE_MON
) {
227 *authorizer
= monc
->build_authorizer(dest_type
);
228 dout(20) << "got authorizer " << *authorizer
<< dendl
;
229 return *authorizer
!= NULL
;
232 bool DaemonServer::ms_handle_reset(Connection
*con
)
234 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_OSD
) {
235 auto priv
= con
->get_priv();
236 auto session
= static_cast<MgrSession
*>(priv
.get());
240 std::lock_guard
l(lock
);
241 dout(10) << "unregistering osd." << session
->osd_id
242 << " session " << session
<< " con " << con
<< dendl
;
243 osd_cons
[session
->osd_id
].erase(con
);
245 auto iter
= daemon_connections
.find(con
);
246 if (iter
!= daemon_connections
.end()) {
247 daemon_connections
.erase(iter
);
253 bool DaemonServer::ms_handle_refused(Connection
*con
)
255 // do nothing for now
259 bool DaemonServer::ms_dispatch(Message
*m
)
261 // Note that we do *not* take ::lock here, in order to avoid
262 // serializing all message handling. It's up to each handler
263 // to take whatever locks it needs.
264 switch (m
->get_type()) {
266 cluster_state
.ingest_pgstats(static_cast<MPGStats
*>(m
));
267 maybe_ready(m
->get_source().num());
271 return handle_report(static_cast<MMgrReport
*>(m
));
273 return handle_open(static_cast<MMgrOpen
*>(m
));
275 return handle_close(static_cast<MMgrClose
*>(m
));
277 return handle_command(static_cast<MCommand
*>(m
));
279 dout(1) << "Unhandled message type " << m
->get_type() << dendl
;
284 void DaemonServer::maybe_ready(int32_t osd_id
)
286 if (pgmap_ready
.load()) {
287 // Fast path: we don't need to take lock because pgmap_ready
290 std::lock_guard
l(lock
);
292 if (reported_osds
.find(osd_id
) == reported_osds
.end()) {
293 dout(4) << "initial report from osd " << osd_id
<< dendl
;
294 reported_osds
.insert(osd_id
);
295 std::set
<int32_t> up_osds
;
297 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
298 osdmap
.get_up_osds(up_osds
);
301 std::set
<int32_t> unreported_osds
;
302 std::set_difference(up_osds
.begin(), up_osds
.end(),
303 reported_osds
.begin(), reported_osds
.end(),
304 std::inserter(unreported_osds
, unreported_osds
.begin()));
306 if (unreported_osds
.size() == 0) {
307 dout(4) << "all osds have reported, sending PG state to mon" << dendl
;
309 reported_osds
.clear();
310 // Avoid waiting for next tick
313 dout(4) << "still waiting for " << unreported_osds
.size() << " osds"
314 " to report in before PGMap is ready" << dendl
;
320 void DaemonServer::tick()
326 schedule_tick_locked(
327 g_conf().get_val
<std::chrono::seconds
>("mgr_tick_period").count());
330 // Currently modules do not set health checks in response to events delivered to
331 // all modules (e.g. notify) so we do not risk a thundering hurd situation here.
332 // if this pattern emerges in the future, this scheduler could be modified to
333 // fire after all modules have had a chance to set their health checks.
334 void DaemonServer::schedule_tick_locked(double delay_sec
)
336 ceph_assert(lock
.is_locked_by_me());
339 timer
.cancel_event(tick_event
);
340 tick_event
= nullptr;
343 // on shutdown start rejecting explicit requests to send reports that may
344 // originate from python land which may still be running.
348 tick_event
= timer
.add_event_after(delay_sec
,
349 new FunctionContext([this](int r
) {
354 void DaemonServer::schedule_tick(double delay_sec
)
356 std::lock_guard
l(lock
);
357 schedule_tick_locked(delay_sec
);
360 void DaemonServer::handle_osd_perf_metric_query_updated()
364 // Send a fresh MMgrConfigure to all clients, so that they can follow
365 // the new policy for transmitting stats
366 finisher
.queue(new FunctionContext([this](int r
) {
367 std::lock_guard
l(lock
);
368 for (auto &c
: daemon_connections
) {
369 if (c
->peer_is_osd()) {
376 void DaemonServer::shutdown()
378 dout(10) << "begin" << dendl
;
381 dout(10) << "done" << dendl
;
383 std::lock_guard
l(lock
);
384 shutting_down
= true;
388 static DaemonKey
key_from_service(
389 const std::string
& service_name
,
391 const std::string
& daemon_name
)
393 if (!service_name
.empty()) {
394 return DaemonKey(service_name
, daemon_name
);
396 return DaemonKey(ceph_entity_type_name(peer_type
), daemon_name
);
400 static bool key_from_string(
401 const std::string
& name
,
404 auto p
= name
.find('.');
405 if (p
== std::string::npos
) {
408 out
->first
= name
.substr(0, p
);
409 out
->second
= name
.substr(p
+ 1);
413 bool DaemonServer::handle_open(MMgrOpen
*m
)
415 std::lock_guard
l(lock
);
417 DaemonKey key
= key_from_service(m
->service_name
,
418 m
->get_connection()->get_peer_type(),
421 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
423 _send_configure(m
->get_connection());
425 DaemonStatePtr daemon
;
426 if (daemon_state
.exists(key
)) {
427 daemon
= daemon_state
.get(key
);
429 if (m
->service_daemon
&& !daemon
) {
430 dout(4) << "constructing new DaemonState for " << key
<< dendl
;
431 daemon
= std::make_shared
<DaemonState
>(daemon_state
.types
);
433 daemon
->service_daemon
= true;
434 if (m
->daemon_metadata
.count("hostname")) {
435 daemon
->hostname
= m
->daemon_metadata
["hostname"];
437 daemon_state
.insert(daemon
);
440 dout(20) << "updating existing DaemonState for " << m
->daemon_name
<< dendl
;
441 std::lock_guard
l(daemon
->lock
);
442 daemon
->perf_counters
.clear();
444 if (m
->service_daemon
) {
445 daemon
->set_metadata(m
->daemon_metadata
);
446 daemon
->service_status
= m
->daemon_status
;
448 utime_t now
= ceph_clock_now();
449 auto d
= pending_service_map
.get_daemon(m
->service_name
,
451 if (d
->gid
!= (uint64_t)m
->get_source().num()) {
452 dout(10) << "registering " << key
<< " in pending_service_map" << dendl
;
453 d
->gid
= m
->get_source().num();
454 d
->addr
= m
->get_source_addr();
455 d
->start_epoch
= pending_service_map
.epoch
;
456 d
->start_stamp
= now
;
457 d
->metadata
= m
->daemon_metadata
;
458 pending_service_map_dirty
= pending_service_map
.epoch
;
462 auto p
= m
->config_bl
.cbegin();
463 if (p
!= m
->config_bl
.end()) {
464 decode(daemon
->config
, p
);
465 decode(daemon
->ignored_mon_config
, p
);
466 dout(20) << " got config " << daemon
->config
467 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
469 daemon
->config_defaults_bl
= m
->config_defaults_bl
;
470 daemon
->config_defaults
.clear();
471 dout(20) << " got config_defaults_bl " << daemon
->config_defaults_bl
.length()
472 << " bytes" << dendl
;
475 if (m
->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT
&&
476 m
->service_name
.empty())
478 // Store in set of the daemon/service connections, i.e. those
479 // connections that require an update in the event of stats
480 // configuration changes.
481 daemon_connections
.insert(m
->get_connection());
488 bool DaemonServer::handle_close(MMgrClose
*m
)
490 std::lock_guard
l(lock
);
492 DaemonKey key
= key_from_service(m
->service_name
,
493 m
->get_connection()->get_peer_type(),
495 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
497 if (daemon_state
.exists(key
)) {
498 DaemonStatePtr daemon
= daemon_state
.get(key
);
499 daemon_state
.rm(key
);
501 std::lock_guard
l(daemon
->lock
);
502 if (daemon
->service_daemon
) {
503 pending_service_map
.rm_daemon(m
->service_name
, m
->daemon_name
);
504 pending_service_map_dirty
= pending_service_map
.epoch
;
509 // send same message back as a reply
510 m
->get_connection()->send_message(m
);
514 bool DaemonServer::handle_report(MMgrReport
*m
)
517 if (!m
->service_name
.empty()) {
518 key
.first
= m
->service_name
;
520 key
.first
= ceph_entity_type_name(m
->get_connection()->get_peer_type());
522 key
.second
= m
->daemon_name
;
524 dout(4) << "from " << m
->get_connection() << " " << key
<< dendl
;
526 if (m
->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT
&&
527 m
->service_name
.empty()) {
528 // Clients should not be sending us stats unless they are declaring
529 // themselves to be a daemon for some service.
530 dout(4) << "rejecting report from non-daemon client " << m
->daemon_name
532 m
->get_connection()->mark_down();
537 // Look up the DaemonState
538 DaemonStatePtr daemon
;
539 if (daemon_state
.exists(key
)) {
540 dout(20) << "updating existing DaemonState for " << key
<< dendl
;
541 daemon
= daemon_state
.get(key
);
543 // we don't know the hostname at this stage, reject MMgrReport here.
544 dout(5) << "rejecting report from " << key
<< ", since we do not have its metadata now."
547 // issue metadata request in background
548 if (!daemon_state
.is_updating(key
) &&
549 (key
.first
== "osd" || key
.first
== "mds" || key
.first
== "mon")) {
551 std::ostringstream oss
;
552 auto c
= new MetadataUpdate(daemon_state
, key
);
553 if (key
.first
== "osd") {
554 oss
<< "{\"prefix\": \"osd metadata\", \"id\": "
557 } else if (key
.first
== "mds") {
558 c
->set_default("addr", stringify(m
->get_source_addr()));
559 oss
<< "{\"prefix\": \"mds metadata\", \"who\": \""
560 << key
.second
<< "\"}";
562 } else if (key
.first
== "mon") {
563 oss
<< "{\"prefix\": \"mon metadata\", \"id\": \""
564 << key
.second
<< "\"}";
569 monc
->start_mon_command({oss
.str()}, {}, &c
->outbl
, &c
->outs
, c
);
573 std::lock_guard
l(lock
);
575 auto priv
= m
->get_connection()->get_priv();
576 auto session
= static_cast<MgrSession
*>(priv
.get());
580 m
->get_connection()->mark_down();
582 dout(10) << "unregistering osd." << session
->osd_id
583 << " session " << session
<< " con " << m
->get_connection() << dendl
;
585 if (osd_cons
.find(session
->osd_id
) != osd_cons
.end()) {
586 osd_cons
[session
->osd_id
].erase(m
->get_connection());
589 auto iter
= daemon_connections
.find(m
->get_connection());
590 if (iter
!= daemon_connections
.end()) {
591 daemon_connections
.erase(iter
);
598 // Update the DaemonState
599 ceph_assert(daemon
!= nullptr);
601 std::lock_guard
l(daemon
->lock
);
602 auto &daemon_counters
= daemon
->perf_counters
;
603 daemon_counters
.update(m
);
605 auto p
= m
->config_bl
.cbegin();
606 if (p
!= m
->config_bl
.end()) {
607 decode(daemon
->config
, p
);
608 decode(daemon
->ignored_mon_config
, p
);
609 dout(20) << " got config " << daemon
->config
610 << " ignored " << daemon
->ignored_mon_config
<< dendl
;
613 if (daemon
->service_daemon
) {
614 utime_t now
= ceph_clock_now();
615 if (m
->daemon_status
) {
616 daemon
->service_status
= *m
->daemon_status
;
617 daemon
->service_status_stamp
= now
;
619 daemon
->last_service_beacon
= now
;
620 } else if (m
->daemon_status
) {
621 derr
<< "got status from non-daemon " << key
<< dendl
;
623 if (m
->get_connection()->peer_is_osd() || m
->get_connection()->peer_is_mon()) {
624 // only OSD and MON send health_checks to me now
625 daemon
->daemon_health_metrics
= std::move(m
->daemon_health_metrics
);
626 dout(10) << "daemon_health_metrics " << daemon
->daemon_health_metrics
631 // if there are any schema updates, notify the python modules
632 if (!m
->declare_types
.empty() || !m
->undeclare_types
.empty()) {
634 oss
<< key
.first
<< '.' << key
.second
;
635 py_modules
.notify_all("perf_schema_update", oss
.str());
638 if (m
->get_connection()->peer_is_osd()) {
639 osd_perf_metric_collector
.process_reports(m
->osd_perf_metric_reports
);
647 void DaemonServer::_generate_command_map(
649 map
<string
,string
> ¶m_str_map
)
651 for (auto p
= cmdmap
.begin();
652 p
!= cmdmap
.end(); ++p
) {
653 if (p
->first
== "prefix")
655 if (p
->first
== "caps") {
657 if (cmd_getval(g_ceph_context
, cmdmap
, "caps", cv
) &&
658 cv
.size() % 2 == 0) {
659 for (unsigned i
= 0; i
< cv
.size(); i
+= 2) {
660 string k
= string("caps_") + cv
[i
];
661 param_str_map
[k
] = cv
[i
+ 1];
666 param_str_map
[p
->first
] = cmd_vartype_stringify(p
->second
);
670 const MonCommand
*DaemonServer::_get_mgrcommand(
671 const string
&cmd_prefix
,
672 const std::vector
<MonCommand
> &cmds
)
674 const MonCommand
*this_cmd
= nullptr;
675 for (const auto &cmd
: cmds
) {
676 if (cmd
.cmdstring
.compare(0, cmd_prefix
.size(), cmd_prefix
) == 0) {
684 bool DaemonServer::_allowed_command(
686 const string
&module
,
687 const string
&prefix
,
688 const cmdmap_t
& cmdmap
,
689 const map
<string
,string
>& param_str_map
,
690 const MonCommand
*this_cmd
) {
692 if (s
->entity_name
.is_mon()) {
693 // mon is all-powerful. even when it is forwarding commands on behalf of
694 // old clients; we expect the mon is validating commands before proxying!
698 bool cmd_r
= this_cmd
->requires_perm('r');
699 bool cmd_w
= this_cmd
->requires_perm('w');
700 bool cmd_x
= this_cmd
->requires_perm('x');
702 bool capable
= s
->caps
.is_capable(
704 CEPH_ENTITY_TYPE_MGR
,
706 module
, prefix
, param_str_map
,
710 dout(10) << " " << s
->entity_name
<< " "
711 << (capable
? "" : "not ") << "capable" << dendl
;
716 * The working data for processing an MCommand. This lives in
717 * a class to enable passing it into other threads for processing
718 * outside of the thread/locks that called handle_command.
720 class CommandContext
{
726 explicit CommandContext(MCommand
*m_
)
734 void reply(int r
, const std::stringstream
&ss
) {
738 void reply(int r
, const std::string
&rs
) {
739 // Let the connection drop as soon as we've sent our response
740 ConnectionRef con
= m
->get_connection();
742 con
->mark_disposable();
746 dout(4) << __func__
<< " success" << dendl
;
748 derr
<< __func__
<< " " << cpp_strerror(r
) << " " << rs
<< dendl
;
751 MCommandReply
*reply
= new MCommandReply(r
, rs
);
752 reply
->set_tid(m
->get_tid());
753 reply
->set_data(odata
);
754 con
->send_message(reply
);
760 * A context for receiving a bufferlist/error string from a background
761 * function and then calling back to a CommandContext when it's done
763 class ReplyOnFinish
: public Context
{
764 std::shared_ptr
<CommandContext
> cmdctx
;
770 explicit ReplyOnFinish(const std::shared_ptr
<CommandContext
> &cmdctx_
)
773 void finish(int r
) override
{
774 cmdctx
->odata
.claim_append(from_mon
);
775 cmdctx
->reply(r
, outs
);
779 bool DaemonServer::handle_command(MCommand
*m
)
781 std::lock_guard
l(lock
);
782 std::shared_ptr
<CommandContext
> cmdctx
= std::make_shared
<CommandContext
>(m
);
784 return _handle_command(m
, cmdctx
);
785 } catch (const bad_cmd_get
& e
) {
786 cmdctx
->reply(-EINVAL
, e
.what());
791 bool DaemonServer::_handle_command(
793 std::shared_ptr
<CommandContext
>& cmdctx
)
795 auto priv
= m
->get_connection()->get_priv();
796 auto session
= static_cast<MgrSession
*>(priv
.get());
800 if (session
->inst
.name
== entity_name_t())
801 session
->inst
.name
= m
->get_source();
804 boost::scoped_ptr
<Formatter
> f
;
805 map
<string
,string
> param_str_map
;
806 std::stringstream ss
;
809 if (!cmdmap_from_json(m
->cmd
, &(cmdctx
->cmdmap
), ss
)) {
810 cmdctx
->reply(-EINVAL
, ss
);
815 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "format", format
, string("plain"));
816 f
.reset(Formatter::create(format
));
820 cmd_getval(cct
, cmdctx
->cmdmap
, "prefix", prefix
);
822 dout(4) << "decoded " << cmdctx
->cmdmap
.size() << dendl
;
823 dout(4) << "prefix=" << prefix
<< dendl
;
825 if (prefix
== "get_command_descriptions") {
826 dout(10) << "reading commands from python modules" << dendl
;
827 const auto py_commands
= py_modules
.get_commands();
831 f
.open_object_section("command_descriptions");
833 auto dump_cmd
= [&cmdnum
, &f
, m
](const MonCommand
&mc
){
834 ostringstream secname
;
835 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
836 dump_cmddesc_to_json(&f
, m
->get_connection()->get_features(),
837 secname
.str(), mc
.cmdstring
, mc
.helpstring
,
838 mc
.module
, mc
.req_perms
, 0);
842 for (const auto &pyc
: py_commands
) {
846 for (const auto &mgr_cmd
: mgr_commands
) {
850 f
.close_section(); // command_descriptions
851 f
.flush(cmdctx
->odata
);
852 cmdctx
->reply(0, ss
);
857 const MonCommand
*mgr_cmd
= _get_mgrcommand(prefix
, mgr_commands
);
858 _generate_command_map(cmdctx
->cmdmap
, param_str_map
);
862 MonCommand py_command
= {"", "", "py", "rw"};
863 is_allowed
= _allowed_command(session
, py_command
.module
,
864 prefix
, cmdctx
->cmdmap
, param_str_map
, &py_command
);
866 // validate user's permissions for requested command
867 is_allowed
= _allowed_command(session
, mgr_cmd
->module
,
868 prefix
, cmdctx
->cmdmap
, param_str_map
, mgr_cmd
);
871 dout(1) << " access denied" << dendl
;
872 audit_clog
->info() << "from='" << session
->inst
<< "' "
873 << "entity='" << session
->entity_name
<< "' "
874 << "cmd=" << m
->cmd
<< ": access denied";
875 ss
<< "access denied: does your client key have mgr caps? "
876 "See http://docs.ceph.com/docs/master/mgr/administrator/"
877 "#client-authentication";
878 cmdctx
->reply(-EACCES
, ss
);
883 << "from='" << session
->inst
<< "' "
884 << "entity='" << session
->entity_name
<< "' "
885 << "cmd=" << m
->cmd
<< ": dispatch";
888 // service map commands
889 if (prefix
== "service dump") {
891 f
.reset(Formatter::create("json-pretty"));
892 cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
893 f
->dump_object("service_map", service_map
);
895 f
->flush(cmdctx
->odata
);
896 cmdctx
->reply(0, ss
);
899 if (prefix
== "service status") {
901 f
.reset(Formatter::create("json-pretty"));
902 // only include state from services that are in the persisted service map
903 f
->open_object_section("service_status");
904 for (auto& p
: pending_service_map
.services
) {
905 f
->open_object_section(p
.first
.c_str());
906 for (auto& q
: p
.second
.daemons
) {
907 f
->open_object_section(q
.first
.c_str());
908 DaemonKey
key(p
.first
, q
.first
);
909 ceph_assert(daemon_state
.exists(key
));
910 auto daemon
= daemon_state
.get(key
);
911 std::lock_guard
l(daemon
->lock
);
912 f
->dump_stream("status_stamp") << daemon
->service_status_stamp
;
913 f
->dump_stream("last_beacon") << daemon
->last_service_beacon
;
914 f
->open_object_section("status");
915 for (auto& r
: daemon
->service_status
) {
916 f
->dump_string(r
.first
.c_str(), r
.second
);
924 f
->flush(cmdctx
->odata
);
925 cmdctx
->reply(0, ss
);
929 if (prefix
== "config set") {
932 cmd_getval(cct
, cmdctx
->cmdmap
, "key", key
);
933 cmd_getval(cct
, cmdctx
->cmdmap
, "value", val
);
934 r
= cct
->_conf
.set_val(key
, val
, &ss
);
936 cct
->_conf
.apply_changes(nullptr);
938 cmdctx
->reply(0, ss
);
945 if (prefix
== "pg scrub" ||
946 prefix
== "pg repair" ||
947 prefix
== "pg deep-scrub") {
948 string scrubop
= prefix
.substr(3, string::npos
);
952 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pgid", pgidstr
);
953 if (!pgid
.parse(pgidstr
.c_str())) {
954 ss
<< "invalid pgid '" << pgidstr
<< "'";
955 cmdctx
->reply(-EINVAL
, ss
);
958 bool pg_exists
= false;
959 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
960 pg_exists
= osdmap
.pg_exists(pgid
);
963 ss
<< "pg " << pgid
<< " does not exist";
964 cmdctx
->reply(-ENOENT
, ss
);
967 int acting_primary
= -1;
969 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
970 epoch
= osdmap
.get_epoch();
971 osdmap
.get_primary_shard(pgid
, &acting_primary
, &spgid
);
973 if (acting_primary
== -1) {
974 ss
<< "pg " << pgid
<< " has no primary osd";
975 cmdctx
->reply(-EAGAIN
, ss
);
978 auto p
= osd_cons
.find(acting_primary
);
979 if (p
== osd_cons
.end()) {
980 ss
<< "pg " << pgid
<< " primary osd." << acting_primary
981 << " is not currently connected";
982 cmdctx
->reply(-EAGAIN
, ss
);
985 for (auto& con
: p
->second
) {
986 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
987 vector
<spg_t
> pgs
= { spgid
};
988 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
992 scrubop
== "deep-scrub"));
994 vector
<pg_t
> pgs
= { pgid
};
995 con
->send_message(new MOSDScrub(monc
->get_fsid(),
998 scrubop
== "deep-scrub"));
1001 ss
<< "instructing pg " << spgid
<< " on osd." << acting_primary
1002 << " to " << scrubop
;
1003 cmdctx
->reply(0, ss
);
1005 } else if (prefix
== "osd scrub" ||
1006 prefix
== "osd deep-scrub" ||
1007 prefix
== "osd repair") {
1009 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", whostr
);
1010 vector
<string
> pvec
;
1011 get_str_vec(prefix
, pvec
);
1014 if (whostr
== "*" || whostr
== "all" || whostr
== "any") {
1015 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1016 for (int i
= 0; i
< osdmap
.get_max_osd(); i
++)
1017 if (osdmap
.is_up(i
)) {
1022 long osd
= parse_osd_id(whostr
.c_str(), &ss
);
1024 ss
<< "invalid osd '" << whostr
<< "'";
1025 cmdctx
->reply(-EINVAL
, ss
);
1028 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1029 if (osdmap
.is_up(osd
)) {
1034 ss
<< "osd." << osd
<< " is not up";
1035 cmdctx
->reply(-EAGAIN
, ss
);
1039 set
<int> sent_osds
, failed_osds
;
1040 for (auto osd
: osds
) {
1043 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1044 epoch
= osdmap
.get_epoch();
1045 auto p
= pgmap
.pg_by_osd
.find(osd
);
1046 if (p
!= pgmap
.pg_by_osd
.end()) {
1047 for (auto pgid
: p
->second
) {
1050 osdmap
.get_primary_shard(pgid
, &primary
, &spg
);
1051 if (primary
== osd
) {
1052 spgs
.push_back(spg
);
1057 auto p
= osd_cons
.find(osd
);
1058 if (p
== osd_cons
.end()) {
1059 failed_osds
.insert(osd
);
1061 sent_osds
.insert(osd
);
1062 for (auto& con
: p
->second
) {
1063 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
1064 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1067 pvec
.back() == "repair",
1068 pvec
.back() == "deep-scrub"));
1070 con
->send_message(new MOSDScrub(monc
->get_fsid(),
1071 pvec
.back() == "repair",
1072 pvec
.back() == "deep-scrub"));
1077 if (failed_osds
.size() == osds
.size()) {
1078 ss
<< "failed to instruct osd(s) " << osds
<< " to " << pvec
.back()
1079 << " (not connected)";
1082 ss
<< "instructed osd(s) " << sent_osds
<< " to " << pvec
.back();
1083 if (!failed_osds
.empty()) {
1084 ss
<< "; osd(s) " << failed_osds
<< " were not connected";
1088 cmdctx
->reply(0, ss
);
1090 } else if (prefix
== "osd pool scrub" ||
1091 prefix
== "osd pool deep-scrub" ||
1092 prefix
== "osd pool repair") {
1093 vector
<string
> pool_names
;
1094 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", pool_names
);
1095 if (pool_names
.empty()) {
1096 ss
<< "must specify one or more pool names";
1097 cmdctx
->reply(-EINVAL
, ss
);
1101 map
<int32_t, vector
<pg_t
>> pgs_by_primary
; // legacy
1102 map
<int32_t, vector
<spg_t
>> spgs_by_primary
;
1103 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1104 epoch
= osdmap
.get_epoch();
1105 for (auto& pool_name
: pool_names
) {
1106 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1108 ss
<< "unrecognized pool '" << pool_name
<< "'";
1112 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1113 for (int i
= 0; i
< pool_pg_num
; i
++) {
1114 pg_t
pg(i
, pool_id
);
1117 auto got
= osdmap
.get_primary_shard(pg
, &primary
, &spg
);
1120 pgs_by_primary
[primary
].push_back(pg
);
1121 spgs_by_primary
[primary
].push_back(spg
);
1126 cmdctx
->reply(r
, ss
);
1129 for (auto& it
: spgs_by_primary
) {
1130 auto primary
= it
.first
;
1131 auto p
= osd_cons
.find(primary
);
1132 if (p
== osd_cons
.end()) {
1133 ss
<< "osd." << primary
<< " is not currently connected";
1134 cmdctx
->reply(-EAGAIN
, ss
);
1137 for (auto& con
: p
->second
) {
1138 if (HAVE_FEATURE(con
->get_features(), SERVER_MIMIC
)) {
1139 con
->send_message(new MOSDScrub2(monc
->get_fsid(),
1142 prefix
== "osd pool repair",
1143 prefix
== "osd pool deep-scrub"));
1146 auto q
= pgs_by_primary
.find(primary
);
1147 ceph_assert(q
!= pgs_by_primary
.end());
1148 con
->send_message(new MOSDScrub(monc
->get_fsid(),
1150 prefix
== "osd pool repair",
1151 prefix
== "osd pool deep-scrub"));
1155 cmdctx
->reply(0, "");
1157 } else if (prefix
== "osd reweight-by-pg" ||
1158 prefix
== "osd reweight-by-utilization" ||
1159 prefix
== "osd test-reweight-by-pg" ||
1160 prefix
== "osd test-reweight-by-utilization") {
1162 prefix
== "osd reweight-by-pg" || prefix
== "osd test-reweight-by-pg";
1164 prefix
== "osd test-reweight-by-pg" ||
1165 prefix
== "osd test-reweight-by-utilization";
1167 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "oload", oload
, int64_t(120));
1169 vector
<string
> poolnames
;
1170 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pools", poolnames
);
1171 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1172 for (const auto& poolname
: poolnames
) {
1173 int64_t pool
= osdmap
.lookup_pg_pool_name(poolname
);
1175 ss
<< "pool '" << poolname
<< "' does not exist";
1182 cmdctx
->reply(r
, ss
);
1186 double max_change
= g_conf().get_val
<double>("mon_reweight_max_change");
1187 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_change", max_change
);
1188 if (max_change
<= 0.0) {
1189 ss
<< "max_change " << max_change
<< " must be positive";
1190 cmdctx
->reply(-EINVAL
, ss
);
1193 int64_t max_osds
= g_conf().get_val
<int64_t>("mon_reweight_max_osds");
1194 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "max_osds", max_osds
);
1195 if (max_osds
<= 0) {
1196 ss
<< "max_osds " << max_osds
<< " must be positive";
1197 cmdctx
->reply(-EINVAL
, ss
);
1200 bool no_increasing
= false;
1201 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "no_increasing", no_increasing
);
1203 mempool::osdmap::map
<int32_t, uint32_t> new_weights
;
1204 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
&osdmap
, const PGMap
& pgmap
) {
1205 return reweight::by_utilization(osdmap
, pgmap
,
1210 pools
.empty() ? NULL
: &pools
,
1213 &ss
, &out_str
, f
.get());
1216 dout(10) << "reweight::by_utilization: finished with " << out_str
<< dendl
;
1219 f
->flush(cmdctx
->odata
);
1221 cmdctx
->odata
.append(out_str
);
1224 ss
<< "FAILED reweight-by-pg";
1225 cmdctx
->reply(r
, ss
);
1227 } else if (r
== 0 || dry_run
) {
1229 cmdctx
->reply(r
, ss
);
1232 json_spirit::Object json_object
;
1233 for (const auto& osd_weight
: new_weights
) {
1234 json_spirit::Config::add(json_object
,
1235 std::to_string(osd_weight
.first
),
1236 std::to_string(osd_weight
.second
));
1238 string s
= json_spirit::write(json_object
);
1239 std::replace(begin(s
), end(s
), '\"', '\'');
1242 "\"prefix\": \"osd reweightn\", "
1243 "\"weights\": \"" + s
+ "\""
1245 auto on_finish
= new ReplyOnFinish(cmdctx
);
1246 monc
->start_mon_command({cmd
}, {},
1247 &on_finish
->from_mon
, &on_finish
->outs
, on_finish
);
1250 } else if (prefix
== "osd df") {
1252 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "output_method", method
);
1255 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "filter_by", filter_by
);
1256 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "filter", filter
);
1257 if (filter_by
.empty() != filter
.empty()) {
1258 cmdctx
->reply(-EINVAL
, "you must specify both 'filter_by' and 'filter'");
1262 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pgmap
) {
1265 // sanity check filter(s)
1266 if (filter_by
== "class") {
1267 if (!osdmap
.crush
->class_exists(filter
)) {
1268 rs
<< "specified class '" << filter
<< "' does not exist";
1271 class_name
= filter
;
1273 if (filter_by
== "name") {
1274 if (!osdmap
.crush
->name_exists(filter
)) {
1275 rs
<< "specified name '" << filter
<< "' does not exist";
1280 print_osd_utilization(osdmap
, pgmap
, ss
,
1281 f
.get(), method
== "tree",
1282 class_name
, item_name
);
1284 cmdctx
->odata
.append(ss
);
1287 cmdctx
->reply(r
, rs
);
1289 } else if (prefix
== "osd pool stats") {
1291 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pool_name", pool_name
);
1292 int64_t poolid
= -ENOENT
;
1293 bool one_pool
= false;
1294 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1295 if (!pool_name
.empty()) {
1296 poolid
= osdmap
.lookup_pg_pool_name(pool_name
);
1298 ceph_assert(poolid
== -ENOENT
);
1299 ss
<< "unrecognized pool '" << pool_name
<< "'";
1306 f
->open_array_section("pool_stats");
1308 if (osdmap
.get_pools().empty()) {
1309 ss
<< "there are no pools!";
1313 for (auto &p
: osdmap
.get_pools()) {
1317 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, f
.get(), &rs
);
1325 f
->flush(cmdctx
->odata
);
1327 cmdctx
->odata
.append(rs
.str());
1331 if (r
!= -EOPNOTSUPP
) {
1332 cmdctx
->reply(r
, ss
);
1335 } else if (prefix
== "osd safe-to-destroy" ||
1336 prefix
== "osd destroy" ||
1337 prefix
== "osd purge") {
1340 if (prefix
== "osd safe-to-destroy") {
1342 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "ids", ids
);
1343 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1344 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1346 if (!r
&& osds
.empty()) {
1347 ss
<< "must specify one or more OSDs";
1352 if (!cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "id", id
)) {
1354 ss
<< "must specify OSD id";
1360 cmdctx
->reply(r
, ss
);
1363 set
<int> active_osds
, missing_stats
, stored_pgs
, safe_to_destroy
;
1364 int affected_pgs
= 0;
1365 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1366 if (pg_map
.num_pg_unknown
> 0) {
1367 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; cannot draw"
1368 << " any conclusions";
1372 int num_active_clean
= 0;
1373 for (auto& p
: pg_map
.num_pg_by_state
) {
1374 unsigned want
= PG_STATE_ACTIVE
|PG_STATE_CLEAN
;
1375 if ((p
.first
& want
) == want
) {
1376 num_active_clean
+= p
.second
;
1379 for (auto osd
: osds
) {
1380 if (!osdmap
.exists(osd
)) {
1381 safe_to_destroy
.insert(osd
);
1382 continue; // clearly safe to destroy
1384 auto q
= pg_map
.num_pg_by_osd
.find(osd
);
1385 if (q
!= pg_map
.num_pg_by_osd
.end()) {
1386 if (q
->second
.acting
> 0 || q
->second
.up
> 0) {
1387 active_osds
.insert(osd
);
1388 affected_pgs
+= q
->second
.acting
+ q
->second
.up
;
1392 if (num_active_clean
< pg_map
.num_pg
) {
1393 // all pgs aren't active+clean; we need to be careful.
1394 auto p
= pg_map
.osd_stat
.find(osd
);
1395 if (p
== pg_map
.osd_stat
.end()) {
1396 missing_stats
.insert(osd
);
1398 } else if (p
->second
.num_pgs
> 0) {
1399 stored_pgs
.insert(osd
);
1403 safe_to_destroy
.insert(osd
);
1406 if (r
&& prefix
== "osd safe-to-destroy") {
1407 cmdctx
->reply(r
, ss
); // regardless of formatter
1410 if (!r
&& (!active_osds
.empty() ||
1411 !missing_stats
.empty() || !stored_pgs
.empty())) {
1412 if (!safe_to_destroy
.empty()) {
1413 ss
<< "OSD(s) " << safe_to_destroy
1414 << " are safe to destroy without reducing data durability. ";
1416 if (!active_osds
.empty()) {
1417 ss
<< "OSD(s) " << active_osds
<< " have " << affected_pgs
1418 << " pgs currently mapped to them. ";
1420 if (!missing_stats
.empty()) {
1421 ss
<< "OSD(s) " << missing_stats
<< " have no reported stats, and not all"
1422 << " PGs are active+clean; we cannot draw any conclusions. ";
1424 if (!stored_pgs
.empty()) {
1425 ss
<< "OSD(s) " << stored_pgs
<< " last reported they still store some PG"
1426 << " data, and not all PGs are active+clean; we cannot be sure they"
1427 << " aren't still needed.";
1429 if (!active_osds
.empty() || !stored_pgs
.empty()) {
1436 if (prefix
== "osd safe-to-destroy") {
1438 ss
<< "OSD(s) " << osds
<< " are safe to destroy without reducing data"
1440 safe_to_destroy
.swap(osds
);
1443 f
->open_object_section("osd_status");
1444 f
->open_array_section("safe_to_destroy");
1445 for (auto i
: safe_to_destroy
)
1446 f
->dump_int("osd", i
);
1448 f
->open_array_section("active");
1449 for (auto i
: active_osds
)
1450 f
->dump_int("osd", i
);
1452 f
->open_array_section("missing_stats");
1453 for (auto i
: missing_stats
)
1454 f
->dump_int("osd", i
);
1456 f
->open_array_section("stored_pgs");
1457 for (auto i
: stored_pgs
)
1458 f
->dump_int("osd", i
);
1460 f
->close_section(); // osd_status
1461 f
->flush(cmdctx
->odata
);
1463 std::stringstream().swap(ss
);
1465 cmdctx
->reply(r
, ss
);
1471 cmd_getval(cct
, cmdctx
->cmdmap
, "force", force
);
1474 cmd_getval(cct
, cmdctx
->cmdmap
, "yes_i_really_mean_it", force
);
1477 ss
<< "\nYou can proceed by passing --force, but be warned that"
1478 " this will likely mean real, permanent data loss.";
1484 cmdctx
->reply(r
, ss
);
1489 "\"prefix\": \"" + prefix
+ "-actual\", "
1490 "\"id\": " + stringify(osds
) + ", "
1491 "\"yes_i_really_mean_it\": true"
1493 auto on_finish
= new ReplyOnFinish(cmdctx
);
1494 monc
->start_mon_command({cmd
}, {}, nullptr, &on_finish
->outs
, on_finish
);
1496 } else if (prefix
== "osd ok-to-stop") {
1498 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "ids", ids
);
1501 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1502 r
= osdmap
.parse_osd_id_list(ids
, &osds
, &ss
);
1504 if (!r
&& osds
.empty()) {
1505 ss
<< "must specify one or more OSDs";
1509 cmdctx
->reply(r
, ss
);
1512 map
<pg_t
,int> pg_delta
; // pgid -> net acting set size change
1513 int dangerous_pgs
= 0;
1514 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
1515 if (pg_map
.num_pg_unknown
> 0) {
1516 ss
<< pg_map
.num_pg_unknown
<< " pgs have unknown state; "
1517 << "cannot draw any conclusions";
1521 for (auto osd
: osds
) {
1522 auto p
= pg_map
.pg_by_osd
.find(osd
);
1523 if (p
!= pg_map
.pg_by_osd
.end()) {
1524 for (auto& pgid
: p
->second
) {
1529 for (auto& p
: pg_delta
) {
1530 auto q
= pg_map
.pg_stat
.find(p
.first
);
1531 if (q
== pg_map
.pg_stat
.end()) {
1532 ss
<< "missing information about " << p
.first
<< "; cannot draw"
1533 << " any conclusions";
1537 if (!(q
->second
.state
& PG_STATE_ACTIVE
) ||
1538 (q
->second
.state
& PG_STATE_DEGRADED
)) {
1539 // we don't currently have a good way to tell *how* degraded
1540 // a degraded PG is, so we have to assume we cannot remove
1541 // any more replicas/shards.
1545 const pg_pool_t
*pi
= osdmap
.get_pg_pool(p
.first
.pool());
1547 ++dangerous_pgs
; // pool is creating or deleting
1549 if (q
->second
.acting
.size() + p
.second
< pi
->min_size
) {
1556 cmdctx
->reply(r
, ss
);
1559 if (dangerous_pgs
) {
1560 ss
<< dangerous_pgs
<< " PGs are already degraded or might become "
1562 cmdctx
->reply(-EBUSY
, ss
);
1565 ss
<< "OSD(s) " << osds
<< " are ok to stop without reducing"
1566 << " availability, provided there are no other concurrent failures"
1567 << " or interventions. " << pg_delta
.size() << " PGs are likely to be"
1568 << " degraded (but remain available) as a result.";
1569 cmdctx
->reply(0, ss
);
1571 } else if (prefix
== "pg force-recovery" ||
1572 prefix
== "pg force-backfill" ||
1573 prefix
== "pg cancel-force-recovery" ||
1574 prefix
== "pg cancel-force-backfill" ||
1575 prefix
== "osd pool force-recovery" ||
1576 prefix
== "osd pool force-backfill" ||
1577 prefix
== "osd pool cancel-force-recovery" ||
1578 prefix
== "osd pool cancel-force-backfill") {
1580 get_str_vec(prefix
, vs
);
1581 auto& granularity
= vs
.front();
1582 auto& forceop
= vs
.back();
1585 // figure out actual op just once
1587 if (forceop
== "force-recovery") {
1588 actual_op
= OFR_RECOVERY
;
1589 } else if (forceop
== "force-backfill") {
1590 actual_op
= OFR_BACKFILL
;
1591 } else if (forceop
== "cancel-force-backfill") {
1592 actual_op
= OFR_BACKFILL
| OFR_CANCEL
;
1593 } else if (forceop
== "cancel-force-recovery") {
1594 actual_op
= OFR_RECOVERY
| OFR_CANCEL
;
1597 set
<pg_t
> candidates
; // deduped
1598 if (granularity
== "pg") {
1599 // covnert pg names to pgs, discard any invalid ones while at it
1600 vector
<string
> pgids
;
1601 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "pgid", pgids
);
1602 for (auto& i
: pgids
) {
1604 if (!pgid
.parse(i
.c_str())) {
1605 ss
<< "invlaid pgid '" << i
<< "'; ";
1609 candidates
.insert(pgid
);
1613 vector
<string
> pool_names
;
1614 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", pool_names
);
1615 if (pool_names
.empty()) {
1616 ss
<< "must specify one or more pool names";
1617 cmdctx
->reply(-EINVAL
, ss
);
1620 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1621 for (auto& pool_name
: pool_names
) {
1622 auto pool_id
= osdmap
.lookup_pg_pool_name(pool_name
);
1624 ss
<< "unrecognized pool '" << pool_name
<< "'";
1628 auto pool_pg_num
= osdmap
.get_pg_num(pool_id
);
1629 for (int i
= 0; i
< pool_pg_num
; i
++)
1630 candidates
.insert({(unsigned int)i
, (uint64_t)pool_id
});
1634 cmdctx
->reply(r
, ss
);
1639 cluster_state
.with_pgmap([&](const PGMap
& pg_map
) {
1640 for (auto& i
: candidates
) {
1641 auto it
= pg_map
.pg_stat
.find(i
);
1642 if (it
== pg_map
.pg_stat
.end()) {
1643 ss
<< "pg " << i
<< " does not exist; ";
1647 auto state
= it
->second
.state
;
1648 // discard pgs for which user requests are pointless
1649 switch (actual_op
) {
1651 if ((state
& (PG_STATE_DEGRADED
|
1652 PG_STATE_RECOVERY_WAIT
|
1653 PG_STATE_RECOVERING
)) == 0) {
1654 // don't return error, user script may be racing with cluster.
1656 ss
<< "pg " << i
<< " doesn't require recovery; ";
1658 } else if (state
& PG_STATE_FORCED_RECOVERY
) {
1659 ss
<< "pg " << i
<< " recovery already forced; ";
1660 // return error, as it may be a bug in user script
1666 if ((state
& (PG_STATE_DEGRADED
|
1667 PG_STATE_BACKFILL_WAIT
|
1668 PG_STATE_BACKFILLING
)) == 0) {
1669 ss
<< "pg " << i
<< " doesn't require backfilling; ";
1671 } else if (state
& PG_STATE_FORCED_BACKFILL
) {
1672 ss
<< "pg " << i
<< " backfill already forced; ";
1677 case OFR_BACKFILL
| OFR_CANCEL
:
1678 if ((state
& PG_STATE_FORCED_BACKFILL
) == 0) {
1679 ss
<< "pg " << i
<< " backfill not forced; ";
1683 case OFR_RECOVERY
| OFR_CANCEL
:
1684 if ((state
& PG_STATE_FORCED_RECOVERY
) == 0) {
1685 ss
<< "pg " << i
<< " recovery not forced; ";
1690 ceph_abort_msg("actual_op value is not supported");
1696 // respond with error only when no pgs are correct
1697 // yes, in case of mixed errors, only the last one will be emitted,
1698 // but the message presented will be fine
1699 if (pgs
.size() != 0) {
1700 // clear error to not confuse users/scripts
1704 // optimize the command -> messages conversion, use only one
1705 // message per distinct OSD
1706 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
1707 // group pgs to process by osd
1708 map
<int, vector
<spg_t
>> osdpgs
;
1709 for (auto& pgid
: pgs
) {
1712 if (osdmap
.get_primary_shard(pgid
, &primary
, &spg
)) {
1713 osdpgs
[primary
].push_back(spg
);
1716 for (auto& i
: osdpgs
) {
1717 if (osdmap
.is_up(i
.first
)) {
1718 auto p
= osd_cons
.find(i
.first
);
1719 if (p
== osd_cons
.end()) {
1720 ss
<< "osd." << i
.first
<< " is not currently connected";
1724 for (auto& con
: p
->second
) {
1726 new MOSDForceRecovery(monc
->get_fsid(), i
.second
, actual_op
));
1728 ss
<< "instructing pg(s) " << i
.second
<< " on osd." << i
.first
1729 << " to " << forceop
<< "; ";
1734 cmdctx
->reply(r
, ss
);
1736 } else if (prefix
== "config show" ||
1737 prefix
== "config show-with-defaults") {
1739 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", who
);
1741 auto dot
= who
.find('.');
1743 key
.first
= who
.substr(0, dot
);
1744 key
.second
= who
.substr(dot
+ 1);
1745 DaemonStatePtr daemon
= daemon_state
.get(key
);
1748 ss
<< "no config state for daemon " << who
;
1749 cmdctx
->reply(-ENOENT
, ss
);
1753 std::lock_guard
l(daemon
->lock
);
1755 if (cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "key", name
)) {
1756 auto p
= daemon
->config
.find(name
);
1757 if (p
!= daemon
->config
.end() &&
1758 !p
->second
.empty()) {
1759 cmdctx
->odata
.append(p
->second
.rbegin()->second
+ "\n");
1761 auto& defaults
= daemon
->_get_config_defaults();
1762 auto q
= defaults
.find(name
);
1763 if (q
!= defaults
.end()) {
1764 cmdctx
->odata
.append(q
->second
+ "\n");
1769 } else if (daemon
->config_defaults_bl
.length() > 0) {
1772 f
->open_array_section("config");
1774 tbl
.define_column("NAME", TextTable::LEFT
, TextTable::LEFT
);
1775 tbl
.define_column("VALUE", TextTable::LEFT
, TextTable::LEFT
);
1776 tbl
.define_column("SOURCE", TextTable::LEFT
, TextTable::LEFT
);
1777 tbl
.define_column("OVERRIDES", TextTable::LEFT
, TextTable::LEFT
);
1778 tbl
.define_column("IGNORES", TextTable::LEFT
, TextTable::LEFT
);
1780 if (prefix
== "config show") {
1782 for (auto& i
: daemon
->config
) {
1783 dout(20) << " " << i
.first
<< " -> " << i
.second
<< dendl
;
1784 if (i
.second
.empty()) {
1788 f
->open_object_section("value");
1789 f
->dump_string("name", i
.first
);
1790 f
->dump_string("value", i
.second
.rbegin()->second
);
1791 f
->dump_string("source", ceph_conf_level_name(
1792 i
.second
.rbegin()->first
));
1793 if (i
.second
.size() > 1) {
1794 f
->open_array_section("overrides");
1795 auto j
= i
.second
.rend();
1796 for (--j
; j
!= i
.second
.rbegin(); --j
) {
1797 f
->open_object_section("value");
1798 f
->dump_string("source", ceph_conf_level_name(j
->first
));
1799 f
->dump_string("value", j
->second
);
1804 if (daemon
->ignored_mon_config
.count(i
.first
)) {
1805 f
->dump_string("ignores", "mon");
1810 tbl
<< i
.second
.rbegin()->second
;
1811 tbl
<< ceph_conf_level_name(i
.second
.rbegin()->first
);
1812 if (i
.second
.size() > 1) {
1814 auto j
= i
.second
.rend();
1815 for (--j
; j
!= i
.second
.rbegin(); --j
) {
1816 if (j
->second
== i
.second
.rbegin()->second
) {
1817 ov
.push_front(string("(") + ceph_conf_level_name(j
->first
) +
1818 string("[") + j
->second
+ string("]") +
1821 ov
.push_front(ceph_conf_level_name(j
->first
) +
1822 string("[") + j
->second
+ string("]"));
1830 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
1831 tbl
<< TextTable::endrow
;
1835 // show-with-defaults
1836 auto& defaults
= daemon
->_get_config_defaults();
1837 for (auto& i
: defaults
) {
1839 f
->open_object_section("value");
1840 f
->dump_string("name", i
.first
);
1844 auto j
= daemon
->config
.find(i
.first
);
1845 if (j
!= daemon
->config
.end() && !j
->second
.empty()) {
1848 f
->dump_string("value", j
->second
.rbegin()->second
);
1849 f
->dump_string("source", ceph_conf_level_name(
1850 j
->second
.rbegin()->first
));
1851 if (j
->second
.size() > 1) {
1852 f
->open_array_section("overrides");
1853 auto k
= j
->second
.rend();
1854 for (--k
; k
!= j
->second
.rbegin(); --k
) {
1855 f
->open_object_section("value");
1856 f
->dump_string("source", ceph_conf_level_name(k
->first
));
1857 f
->dump_string("value", k
->second
);
1862 if (daemon
->ignored_mon_config
.count(i
.first
)) {
1863 f
->dump_string("ignores", "mon");
1867 tbl
<< j
->second
.rbegin()->second
;
1868 tbl
<< ceph_conf_level_name(j
->second
.rbegin()->first
);
1869 if (j
->second
.size() > 1) {
1871 auto k
= j
->second
.rend();
1872 for (--k
; k
!= j
->second
.rbegin(); --k
) {
1873 if (k
->second
== j
->second
.rbegin()->second
) {
1874 ov
.push_front(string("(") + ceph_conf_level_name(k
->first
) +
1875 string("[") + k
->second
+ string("]") +
1878 ov
.push_front(ceph_conf_level_name(k
->first
) +
1879 string("[") + k
->second
+ string("]"));
1886 tbl
<< (daemon
->ignored_mon_config
.count(i
.first
) ? "mon" : "");
1887 tbl
<< TextTable::endrow
;
1890 // only have default
1892 f
->dump_string("value", i
.second
);
1893 f
->dump_string("source", ceph_conf_level_name(CONF_DEFAULT
));
1897 tbl
<< ceph_conf_level_name(CONF_DEFAULT
);
1900 tbl
<< TextTable::endrow
;
1907 f
->flush(cmdctx
->odata
);
1909 cmdctx
->odata
.append(stringify(tbl
));
1912 cmdctx
->reply(r
, ss
);
1914 } else if (prefix
== "device ls") {
1918 f
->open_array_section("devices");
1919 daemon_state
.with_devices([&f
](const DeviceState
& dev
) {
1920 f
->dump_object("device", dev
);
1923 f
->flush(cmdctx
->odata
);
1925 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
1926 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
1927 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
1928 tbl
.define_column("LIFE EXPECTANCY", TextTable::LEFT
, TextTable::LEFT
);
1929 auto now
= ceph_clock_now();
1930 daemon_state
.with_devices([&tbl
, now
](const DeviceState
& dev
) {
1932 for (auto& i
: dev
.devnames
) {
1936 h
+= i
.first
+ ":" + i
.second
;
1939 for (auto& i
: dev
.daemons
) {
1948 << dev
.get_life_expectancy_str(now
)
1949 << TextTable::endrow
;
1951 cmdctx
->odata
.append(stringify(tbl
));
1953 cmdctx
->reply(0, ss
);
1955 } else if (prefix
== "device ls-by-daemon") {
1957 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "who", who
);
1959 if (!key_from_string(who
, &k
)) {
1960 ss
<< who
<< " is not a valid daemon name";
1963 auto dm
= daemon_state
.get(k
);
1966 f
->open_array_section("devices");
1967 for (auto& i
: dm
->devices
) {
1968 daemon_state
.with_device(i
.first
, [&f
] (const DeviceState
& dev
) {
1969 f
->dump_object("device", dev
);
1973 f
->flush(cmdctx
->odata
);
1976 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
1977 tbl
.define_column("HOST:DEV", TextTable::LEFT
, TextTable::LEFT
);
1978 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
,
1980 auto now
= ceph_clock_now();
1981 for (auto& i
: dm
->devices
) {
1982 daemon_state
.with_device(
1983 i
.first
, [&tbl
, now
] (const DeviceState
& dev
) {
1985 for (auto& i
: dev
.devnames
) {
1989 h
+= i
.first
+ ":" + i
.second
;
1993 << dev
.get_life_expectancy_str(now
)
1994 << TextTable::endrow
;
1997 cmdctx
->odata
.append(stringify(tbl
));
2001 ss
<< "daemon " << who
<< " not found";
2003 cmdctx
->reply(r
, ss
);
2005 } else if (prefix
== "device ls-by-host") {
2007 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "host", host
);
2009 daemon_state
.list_devids_by_server(host
, &devids
);
2011 f
->open_array_section("devices");
2012 for (auto& devid
: devids
) {
2013 daemon_state
.with_device(
2014 devid
, [&f
] (const DeviceState
& dev
) {
2015 f
->dump_object("device", dev
);
2019 f
->flush(cmdctx
->odata
);
2022 tbl
.define_column("DEVICE", TextTable::LEFT
, TextTable::LEFT
);
2023 tbl
.define_column("DEV", TextTable::LEFT
, TextTable::LEFT
);
2024 tbl
.define_column("DAEMONS", TextTable::LEFT
, TextTable::LEFT
);
2025 tbl
.define_column("EXPECTED FAILURE", TextTable::LEFT
, TextTable::LEFT
);
2026 auto now
= ceph_clock_now();
2027 for (auto& devid
: devids
) {
2028 daemon_state
.with_device(
2029 devid
, [&tbl
, &host
, now
] (const DeviceState
& dev
) {
2031 for (auto& j
: dev
.devnames
) {
2032 if (j
.first
== host
) {
2040 for (auto& i
: dev
.daemons
) {
2049 << dev
.get_life_expectancy_str(now
)
2050 << TextTable::endrow
;
2053 cmdctx
->odata
.append(stringify(tbl
));
2055 cmdctx
->reply(0, ss
);
2057 } else if (prefix
== "device info") {
2059 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "devid", devid
);
2062 if (!daemon_state
.with_device(devid
,
2063 [&f
, &rs
] (const DeviceState
& dev
) {
2065 f
->dump_object("device", dev
);
2070 ss
<< "device " << devid
<< " not found";
2074 f
->flush(cmdctx
->odata
);
2076 cmdctx
->odata
.append(rs
.str());
2079 cmdctx
->reply(r
, ss
);
2081 } else if (prefix
== "device set-life-expectancy") {
2083 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "devid", devid
);
2084 string from_str
, to_str
;
2085 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "from", from_str
);
2086 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "to", to_str
);
2088 if (!from
.parse(from_str
)) {
2089 ss
<< "unable to parse datetime '" << from_str
<< "'";
2091 cmdctx
->reply(r
, ss
);
2092 } else if (to_str
.size() && !to
.parse(to_str
)) {
2093 ss
<< "unable to parse datetime '" << to_str
<< "'";
2095 cmdctx
->reply(r
, ss
);
2097 map
<string
,string
> meta
;
2098 daemon_state
.with_device_create(
2100 [from
, to
, &meta
] (DeviceState
& dev
) {
2101 dev
.set_life_expectancy(from
, to
, ceph_clock_now());
2102 meta
= dev
.metadata
;
2104 json_spirit::Object json_object
;
2105 for (auto& i
: meta
) {
2106 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2109 json
.append(json_spirit::write(json_object
));
2112 "\"prefix\": \"config-key set\", "
2113 "\"key\": \"device/" + devid
+ "\""
2115 auto on_finish
= new ReplyOnFinish(cmdctx
);
2116 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2119 } else if (prefix
== "device rm-life-expectancy") {
2121 cmd_getval(g_ceph_context
, cmdctx
->cmdmap
, "devid", devid
);
2122 map
<string
,string
> meta
;
2123 if (daemon_state
.with_device_write(devid
, [&meta
] (DeviceState
& dev
) {
2124 dev
.rm_life_expectancy();
2125 meta
= dev
.metadata
;
2132 "\"prefix\": \"config-key rm\", "
2133 "\"key\": \"device/" + devid
+ "\""
2136 json_spirit::Object json_object
;
2137 for (auto& i
: meta
) {
2138 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
2140 json
.append(json_spirit::write(json_object
));
2143 "\"prefix\": \"config-key set\", "
2144 "\"key\": \"device/" + devid
+ "\""
2147 auto on_finish
= new ReplyOnFinish(cmdctx
);
2148 monc
->start_mon_command({cmd
}, json
, nullptr, nullptr, on_finish
);
2150 cmdctx
->reply(0, ss
);
2155 ss
<< "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2158 f
->open_object_section("pg_info");
2159 f
->dump_bool("pg_ready", pgmap_ready
);
2162 // fall back to feeding command to PGMap
2163 r
= cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
2164 return process_pg_map_command(prefix
, cmdctx
->cmdmap
, pg_map
, osdmap
,
2165 f
.get(), &ss
, &cmdctx
->odata
);
2171 if (r
!= -EOPNOTSUPP
) {
2173 f
->flush(cmdctx
->odata
);
2175 cmdctx
->reply(r
, ss
);
2180 // Resolve the command to the name of the module that will
2181 // handle it (if the command exists)
2182 std::string handler_name
;
2183 auto py_commands
= py_modules
.get_py_commands();
2184 for (const auto &pyc
: py_commands
) {
2185 auto pyc_prefix
= cmddesc_get_prefix(pyc
.cmdstring
);
2186 if (pyc_prefix
== prefix
) {
2187 handler_name
= pyc
.module_name
;
2192 // Was the command unfound?
2193 if (handler_name
.empty()) {
2194 ss
<< "No handler found for '" << prefix
<< "'";
2195 dout(4) << "No handler found for '" << prefix
<< "'" << dendl
;
2196 cmdctx
->reply(-EINVAL
, ss
);
2200 dout(4) << "passing through " << cmdctx
->cmdmap
.size() << dendl
;
2201 finisher
.queue(new FunctionContext([this, cmdctx
, handler_name
, prefix
](int r_
) {
2202 std::stringstream ss
;
2204 // Validate that the module is enabled
2205 PyModuleRef module
= py_modules
.get_module(handler_name
);
2206 ceph_assert(module
);
2207 if (!module
->is_enabled()) {
2208 ss
<< "Module '" << handler_name
<< "' is not enabled (required by "
2209 "command '" << prefix
<< "'): use `ceph mgr module enable "
2210 << handler_name
<< "` to enable it";
2211 dout(4) << ss
.str() << dendl
;
2212 cmdctx
->reply(-EOPNOTSUPP
, ss
);
2216 // Hack: allow the self-test method to run on unhealthy modules.
2217 // Fix this in future by creating a special path for self test rather
2218 // than having the hook be a normal module command.
2219 std::string self_test_prefix
= handler_name
+ " " + "self-test";
2221 // Validate that the module is healthy
2222 bool accept_command
;
2223 if (module
->is_loaded()) {
2224 if (module
->get_can_run() && !module
->is_failed()) {
2226 accept_command
= true;
2227 } else if (self_test_prefix
== prefix
) {
2228 // Unhealthy, but allow because it's a self test command
2229 accept_command
= true;
2231 accept_command
= false;
2232 ss
<< "Module '" << handler_name
<< "' has experienced an error and "
2233 "cannot handle commands: " << module
->get_error_string();
2236 // Module not loaded
2237 accept_command
= false;
2238 ss
<< "Module '" << handler_name
<< "' failed to load and "
2239 "cannot handle commands: " << module
->get_error_string();
2242 if (!accept_command
) {
2243 dout(4) << ss
.str() << dendl
;
2244 cmdctx
->reply(-EIO
, ss
);
2248 std::stringstream ds
;
2249 bufferlist inbl
= cmdctx
->m
->get_data();
2250 int r
= py_modules
.handle_command(handler_name
, cmdctx
->cmdmap
, inbl
, &ds
, &ss
);
2251 cmdctx
->odata
.append(ds
);
2252 cmdctx
->reply(r
, ss
);
2257 void DaemonServer::_prune_pending_service_map()
2259 utime_t cutoff
= ceph_clock_now();
2260 cutoff
-= g_conf().get_val
<double>("mgr_service_beacon_grace");
2261 auto p
= pending_service_map
.services
.begin();
2262 while (p
!= pending_service_map
.services
.end()) {
2263 auto q
= p
->second
.daemons
.begin();
2264 while (q
!= p
->second
.daemons
.end()) {
2265 DaemonKey
key(p
->first
, q
->first
);
2266 if (!daemon_state
.exists(key
)) {
2267 derr
<< "missing key " << key
<< dendl
;
2271 auto daemon
= daemon_state
.get(key
);
2272 std::lock_guard
l(daemon
->lock
);
2273 if (daemon
->last_service_beacon
== utime_t()) {
2274 // we must have just restarted; assume they are alive now.
2275 daemon
->last_service_beacon
= ceph_clock_now();
2279 if (daemon
->last_service_beacon
< cutoff
) {
2280 dout(10) << "pruning stale " << p
->first
<< "." << q
->first
2281 << " last_beacon " << daemon
->last_service_beacon
<< dendl
;
2282 q
= p
->second
.daemons
.erase(q
);
2283 pending_service_map_dirty
= pending_service_map
.epoch
;
2288 if (p
->second
.daemons
.empty()) {
2289 p
= pending_service_map
.services
.erase(p
);
2290 pending_service_map_dirty
= pending_service_map
.epoch
;
2297 void DaemonServer::send_report()
2300 if (ceph_clock_now() - started_at
> g_conf().get_val
<int64_t>("mgr_stats_period") * 4.0) {
2302 reported_osds
.clear();
2303 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
2304 << "potentially incomplete PG state to mon" << dendl
;
2306 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
2312 auto m
= new MMonMgrReport();
2313 py_modules
.get_health_checks(&m
->health_checks
);
2314 py_modules
.get_progress_events(&m
->progress_events
);
2316 cluster_state
.with_mutable_pgmap([&](PGMap
& pg_map
) {
2317 cluster_state
.update_delta_stats();
2319 if (pending_service_map
.epoch
) {
2320 _prune_pending_service_map();
2321 if (pending_service_map_dirty
>= pending_service_map
.epoch
) {
2322 pending_service_map
.modified
= ceph_clock_now();
2323 encode(pending_service_map
, m
->service_map_bl
, CEPH_FEATURES_ALL
);
2324 dout(10) << "sending service_map e" << pending_service_map
.epoch
2326 pending_service_map
.epoch
++;
2330 cluster_state
.with_osdmap([&](const OSDMap
& osdmap
) {
2331 // FIXME: no easy way to get mon features here. this will do for
2332 // now, though, as long as we don't make a backward-incompat change.
2333 pg_map
.encode_digest(osdmap
, m
->get_data(), CEPH_FEATURES_ALL
);
2334 dout(10) << pg_map
<< dendl
;
2336 pg_map
.get_health_checks(g_ceph_context
, osdmap
,
2339 dout(10) << m
->health_checks
.checks
.size() << " health checks"
2341 dout(20) << "health checks:\n";
2342 JSONFormatter
jf(true);
2343 jf
.dump_object("health_checks", m
->health_checks
);
2346 if (osdmap
.require_osd_release
>= CEPH_RELEASE_LUMINOUS
) {
2347 clog
->debug() << "pgmap v" << pg_map
.version
<< ": " << pg_map
;
2352 map
<daemon_metric
, unique_ptr
<DaemonHealthMetricCollector
>> accumulated
;
2353 for (auto service
: {"osd", "mon"} ) {
2354 auto daemons
= daemon_state
.get_by_service(service
);
2355 for (const auto& [key
,state
] : daemons
) {
2356 std::lock_guard l
{state
->lock
};
2357 for (const auto& metric
: state
->daemon_health_metrics
) {
2358 auto acc
= accumulated
.find(metric
.get_type());
2359 if (acc
== accumulated
.end()) {
2360 auto collector
= DaemonHealthMetricCollector::create(metric
.get_type());
2362 derr
<< __func__
<< " " << key
.first
<< "." << key
.second
2363 << " sent me an unknown health metric: "
2364 << std::hex
<< static_cast<uint8_t>(metric
.get_type())
2365 << std::dec
<< dendl
;
2368 dout(20) << " + " << state
->key
<< " "
2370 tie(acc
, std::ignore
) = accumulated
.emplace(metric
.get_type(),
2371 std::move(collector
));
2373 acc
->second
->update(key
, metric
);
2377 for (const auto& acc
: accumulated
) {
2378 acc
.second
->summarize(m
->health_checks
);
2380 // TODO? We currently do not notify the PyModules
2381 // TODO: respect needs_send, so we send the report only if we are asked to do
2382 // so, or the state is updated.
2383 monc
->send_mon_message(m
);
2386 void DaemonServer::adjust_pgs()
2389 unsigned max
= std::max
<int64_t>(1, g_conf()->mon_osd_max_creating_pgs
);
2390 double max_misplaced
= g_conf().get_val
<double>("target_max_misplaced_ratio");
2391 bool aggro
= g_conf().get_val
<bool>("mgr_debug_aggressive_pg_num_changes");
2393 map
<string
,unsigned> pg_num_to_set
;
2394 map
<string
,unsigned> pgp_num_to_set
;
2395 set
<pg_t
> upmaps_to_clear
;
2396 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
, const PGMap
& pg_map
) {
2397 unsigned creating_or_unknown
= 0;
2398 for (auto& i
: pg_map
.num_pg_by_state
) {
2399 if ((i
.first
& (PG_STATE_CREATING
)) ||
2401 creating_or_unknown
+= i
.second
;
2404 unsigned left
= max
;
2405 if (creating_or_unknown
>= max
) {
2408 left
-= creating_or_unknown
;
2409 dout(10) << "creating_or_unknown " << creating_or_unknown
2410 << " max_creating " << max
2414 // FIXME: These checks are fundamentally racy given that adjust_pgs()
2415 // can run more frequently than we get updated pg stats from OSDs. We
2416 // may make multiple adjustments with stale informaiton.
2417 double misplaced_ratio
, degraded_ratio
;
2418 double inactive_pgs_ratio
, unknown_pgs_ratio
;
2419 pg_map
.get_recovery_stats(&misplaced_ratio
, °raded_ratio
,
2420 &inactive_pgs_ratio
, &unknown_pgs_ratio
);
2421 dout(20) << "misplaced_ratio " << misplaced_ratio
2422 << " degraded_ratio " << degraded_ratio
2423 << " inactive_pgs_ratio " << inactive_pgs_ratio
2424 << " unknown_pgs_ratio " << unknown_pgs_ratio
2425 << "; target_max_misplaced_ratio " << max_misplaced
2428 for (auto& i
: osdmap
.get_pools()) {
2429 const pg_pool_t
& p
= i
.second
;
2432 if (p
.get_pg_num_target() != p
.get_pg_num()) {
2433 dout(20) << "pool " << i
.first
2434 << " pg_num " << p
.get_pg_num()
2435 << " target " << p
.get_pg_num_target()
2437 if (p
.has_flag(pg_pool_t::FLAG_CREATING
)) {
2438 dout(10) << "pool " << i
.first
2439 << " pg_num_target " << p
.get_pg_num_target()
2440 << " pg_num " << p
.get_pg_num()
2441 << " - still creating initial pgs"
2443 } else if (p
.get_pg_num_target() < p
.get_pg_num()) {
2444 // pg_num decrease (merge)
2445 pg_t
merge_source(p
.get_pg_num() - 1, i
.first
);
2446 pg_t merge_target
= merge_source
.get_parent();
2449 if (p
.get_pg_num() != p
.get_pg_num_pending()) {
2450 dout(10) << "pool " << i
.first
2451 << " pg_num_target " << p
.get_pg_num_target()
2452 << " pg_num " << p
.get_pg_num()
2453 << " - decrease and pg_num_pending != pg_num, waiting"
2456 } else if (p
.get_pg_num() == p
.get_pgp_num()) {
2457 dout(10) << "pool " << i
.first
2458 << " pg_num_target " << p
.get_pg_num_target()
2459 << " pg_num " << p
.get_pg_num()
2460 << " - decrease blocked by pgp_num "
2465 for (auto &merge_participant
: {merge_source
, merge_target
}) {
2466 bool is_merge_source
= merge_participant
== merge_source
;
2467 if (osdmap
.have_pg_upmaps(merge_participant
)) {
2468 dout(10) << "pool " << i
.first
2469 << " pg_num_target " << p
.get_pg_num_target()
2470 << " pg_num " << p
.get_pg_num()
2471 << (is_merge_source
? " - merge source " : " - merge target ")
2472 << merge_participant
2473 << " has upmap" << dendl
;
2474 upmaps_to_clear
.insert(merge_participant
);
2477 auto q
= pg_map
.pg_stat
.find(merge_participant
);
2478 if (q
== pg_map
.pg_stat
.end()) {
2479 dout(10) << "pool " << i
.first
2480 << " pg_num_target " << p
.get_pg_num_target()
2481 << " pg_num " << p
.get_pg_num()
2482 << " - no state for " << merge_participant
2483 << (is_merge_source
? " (merge source)" : " (merge target)")
2486 } else if ((q
->second
.state
& (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) !=
2487 (PG_STATE_ACTIVE
| PG_STATE_CLEAN
)) {
2488 dout(10) << "pool " << i
.first
2489 << " pg_num_target " << p
.get_pg_num_target()
2490 << " pg_num " << p
.get_pg_num()
2491 << (is_merge_source
? " - merge source " : " - merge target ")
2492 << merge_participant
2493 << " not clean (" << pg_state_string(q
->second
.state
)
2500 unsigned target
= p
.get_pg_num() - 1;
2501 dout(10) << "pool " << i
.first
2502 << " pg_num_target " << p
.get_pg_num_target()
2503 << " pg_num " << p
.get_pg_num()
2505 << " (merging " << merge_source
2506 << " and " << merge_target
2508 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
2510 } else if (p
.get_pg_num_target() > p
.get_pg_num()) {
2511 // pg_num increase (split)
2513 auto q
= pg_map
.num_pg_by_pool_state
.find(i
.first
);
2514 if (q
!= pg_map
.num_pg_by_pool_state
.end()) {
2515 for (auto& j
: q
->second
) {
2516 if ((j
.first
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
)) == 0) {
2517 dout(20) << "pool " << i
.first
<< " has " << j
.second
2518 << " pgs in " << pg_state_string(j
.first
)
2528 dout(10) << "pool " << i
.first
2529 << " pg_num_target " << p
.get_pg_num_target()
2530 << " pg_num " << p
.get_pg_num()
2531 << " - not all pgs active"
2534 unsigned add
= std::min(
2536 p
.get_pg_num_target() - p
.get_pg_num());
2537 unsigned target
= p
.get_pg_num() + add
;
2539 dout(10) << "pool " << i
.first
2540 << " pg_num_target " << p
.get_pg_num_target()
2541 << " pg_num " << p
.get_pg_num()
2542 << " -> " << target
<< dendl
;
2543 pg_num_to_set
[osdmap
.get_pool_name(i
.first
)] = target
;
2549 unsigned target
= std::min(p
.get_pg_num_pending(),
2550 p
.get_pgp_num_target());
2551 if (target
!= p
.get_pgp_num()) {
2552 dout(20) << "pool " << i
.first
2553 << " pgp_num_target " << p
.get_pgp_num_target()
2554 << " pgp_num " << p
.get_pgp_num()
2555 << " -> " << target
<< dendl
;
2556 if (target
> p
.get_pgp_num() &&
2557 p
.get_pgp_num() == p
.get_pg_num()) {
2558 dout(10) << "pool " << i
.first
2559 << " pgp_num_target " << p
.get_pgp_num_target()
2560 << " pgp_num " << p
.get_pgp_num()
2561 << " - increase blocked by pg_num " << p
.get_pg_num()
2563 } else if (!aggro
&& (inactive_pgs_ratio
> 0 ||
2564 degraded_ratio
> 0 ||
2565 unknown_pgs_ratio
> 0)) {
2566 dout(10) << "pool " << i
.first
2567 << " pgp_num_target " << p
.get_pgp_num_target()
2568 << " pgp_num " << p
.get_pgp_num()
2569 << " - inactive|degraded|unknown pgs, deferring pgp_num"
2570 << " update" << dendl
;
2571 } else if (!aggro
&& (misplaced_ratio
> max_misplaced
)) {
2572 dout(10) << "pool " << i
.first
2573 << " pgp_num_target " << p
.get_pgp_num_target()
2574 << " pgp_num " << p
.get_pgp_num()
2575 << " - misplaced_ratio " << misplaced_ratio
2576 << " > max " << max_misplaced
2577 << ", deferring pgp_num update" << dendl
;
2579 // NOTE: this calculation assumes objects are
2580 // basically uniformly distributed across all PGs
2581 // (regardless of pool), which is probably not
2582 // perfectly correct, but it's a start. make no
2583 // single adjustment that's more than half of the
2584 // max_misplaced, to somewhat limit the magnitude of
2585 // our potential error here.
2591 std::min
<double>(max_misplaced
- misplaced_ratio
,
2592 misplaced_ratio
/ 2.0);
2593 unsigned estmax
= std::max
<unsigned>(
2594 (double)p
.get_pg_num() * room
, 1u);
2595 int delta
= target
- p
.get_pgp_num();
2596 next
= p
.get_pgp_num();
2598 next
+= std::max
<int>(-estmax
, delta
);
2600 next
+= std::min
<int>(estmax
, delta
);
2602 dout(20) << " room " << room
<< " estmax " << estmax
2603 << " delta " << delta
<< " next " << next
<< dendl
;
2604 if (p
.get_pgp_num_target() == p
.get_pg_num_target()) {
2605 // since pgp_num is tracking pg_num, ceph is handling
2606 // pgp_num. so, be responsible: don't let pgp_num get
2607 // too far out ahead of merges (if we are merging).
2608 // this avoids moving lots of unmerged pgs onto a
2609 // small number of OSDs where we might blow out the
2611 unsigned max_outpace_merges
=
2612 std::max
<unsigned>(8, p
.get_pg_num() * max_misplaced
);
2613 if (next
+ max_outpace_merges
< p
.get_pg_num()) {
2614 next
= p
.get_pg_num() - max_outpace_merges
;
2615 dout(10) << " using next " << next
2616 << " to avoid outpacing merges (max_outpace_merges "
2617 << max_outpace_merges
<< ")" << dendl
;
2621 dout(10) << "pool " << i
.first
2622 << " pgp_num_target " << p
.get_pgp_num_target()
2623 << " pgp_num " << p
.get_pgp_num()
2624 << " -> " << next
<< dendl
;
2625 pgp_num_to_set
[osdmap
.get_pool_name(i
.first
)] = next
;
2633 for (auto i
: pg_num_to_set
) {
2636 "\"prefix\": \"osd pool set\", "
2637 "\"pool\": \"" + i
.first
+ "\", "
2638 "\"var\": \"pg_num_actual\", "
2639 "\"val\": \"" + stringify(i
.second
) + "\""
2641 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2643 for (auto i
: pgp_num_to_set
) {
2646 "\"prefix\": \"osd pool set\", "
2647 "\"pool\": \"" + i
.first
+ "\", "
2648 "\"var\": \"pgp_num_actual\", "
2649 "\"val\": \"" + stringify(i
.second
) + "\""
2651 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2653 for (auto pg
: upmaps_to_clear
) {
2656 "\"prefix\": \"osd rm-pg-upmap\", "
2657 "\"pgid\": \"" + stringify(pg
) + "\""
2659 monc
->start_mon_command({cmd
}, {}, nullptr, nullptr, nullptr);
2662 "\"prefix\": \"osd rm-pg-upmap-items\", "
2663 "\"pgid\": \"" + stringify(pg
) + "\"" +
2665 monc
->start_mon_command({cmd2
}, {}, nullptr, nullptr, nullptr);
// Called when a new ServiceMap arrives from the mon.  Seeds (or advances)
// pending_service_map and reconciles daemon_state with the map's contents:
// registers service daemons we have never seen, and culls ones that are gone.
// Takes the DaemonServer lock for the duration.
void DaemonServer::got_service_map()
{
  std::lock_guard l(lock);

  cluster_state.with_servicemap([&](const ServiceMap& service_map) {
      if (pending_service_map.epoch == 0) {
	// we just started up
	dout(10) << "got initial map e" << service_map.epoch << dendl;
	pending_service_map = service_map;
      } else {
	// we were already active and therefore must have persisted it,
	// which means ours is the same or newer.
	dout(10) << "got updated map e" << service_map.epoch << dendl;
      }
      // The next map we publish is always one past what the mon sent us.
      pending_service_map.epoch = service_map.epoch + 1;
    });

  // cull missing daemons, populate new ones
  for (auto& p : pending_service_map.services) {
    // Names of daemons present in the map for this service type; anything in
    // daemon_state for this type but NOT in this set is stale and culled below.
    std::set<std::string> names;
    for (auto& q : p.second.daemons) {
      names.insert(q.first);
      DaemonKey key(p.first, q.first);
      if (!daemon_state.exists(key)) {
	// First time we see this daemon: create state for it from the
	// metadata carried in the service map.
	auto daemon = std::make_shared<DaemonState>(daemon_state.types);
	daemon->key = key;  // NOTE(review): reconstructed line — verify against upstream
	daemon->set_metadata(q.second.metadata);
	if (q.second.metadata.count("hostname")) {
	  daemon->hostname = q.second.metadata["hostname"];
	}
	// Mark as a service daemon (registered via the service map, not a
	// MMgrOpen session).
	daemon->service_daemon = true;
	daemon_state.insert(daemon);
	dout(10) << "added missing " << key << dendl;
      }
    }
    // Drop state for daemons of this service type no longer in the map.
    daemon_state.cull(p.first, names);
  }
}
// Called when a new MgrMap arrives.  Ensures daemon_state has an entry for
// the active mgr and every standby by asynchronously fetching their metadata
// from the mon ("mgr metadata"), and culls mgr entries no longer in the map.
// Takes the DaemonServer lock for the duration.
void DaemonServer::got_mgr_map()
{
  std::lock_guard l(lock);
  // Names of mgr daemons present in the current map; used for the cull below.
  set<std::string> have;
  cluster_state.with_mgrmap([&](const MgrMap& mgrmap) {
      // Kick off an async "mgr metadata" mon command; MetadataUpdate is a
      // completion that inserts the result into daemon_state when it lands.
      auto md_update = [&] (DaemonKey key) {
        std::ostringstream oss;
        auto c = new MetadataUpdate(daemon_state, key);
	// FIXME remove post-nautilus: include 'id' for luminous mons
        oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
	    << key.second << "\", \"id\": \"" << key.second << "\"}";
        monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
      };

      if (mgrmap.active_name.size()) {
	DaemonKey key("mgr", mgrmap.active_name);
	have.insert(mgrmap.active_name);
	// Only fetch metadata if we neither have state nor a fetch in flight.
	if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
	  md_update(key);  // NOTE(review): reconstructed call — verify against upstream
	  dout(10) << "triggered addition of " << key
		   << " via metadata update" << dendl;
	}
      }
      for (auto& i : mgrmap.standbys) {
        DaemonKey key("mgr", i.second.name);
	have.insert(i.second.name);
	if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
	  md_update(key);  // NOTE(review): reconstructed call — verify against upstream
	  dout(10) << "triggered addition of " << key
		   << " via metadata update" << dendl;
	}
      }
    });
  // Drop mgr daemons that are no longer active or standby in the map.
  daemon_state.cull("mgr", have);
}
// Config observer hook: the list of config keys whose changes we want
// handle_conf_change() to be notified about.  Must be nullptr-terminated.
const char** DaemonServer::get_tracked_conf_keys() const
{
  static const char *KEYS[] = {
    "mgr_stats_threshold",
    // NOTE(review): entries below reconstructed from handle_conf_change(),
    // which reacts to both keys — verify against upstream.
    "mgr_stats_period",
    nullptr
  };
  return KEYS;
}
// Config observer hook: reacts to changes of the keys listed in
// get_tracked_conf_keys().  When the stats threshold/period changes,
// re-sends an MMgrConfigure to every connected daemon so they adopt the
// new reporting policy.  The work is queued on the finisher (not done
// inline) because this may be invoked from a context where taking the
// DaemonServer lock directly would be unsafe.
void DaemonServer::handle_conf_change(const ConfigProxy& conf,
				      const std::set<std::string> &changed)
{
  if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
    dout(4) << "Updating stats threshold/period on "
	    << daemon_connections.size() << " clients" << dendl;
    // Send a fresh MMgrConfigure to all clients, so that they can follow
    // the new policy for transmitting stats
    finisher.queue(new FunctionContext([this](int r) {
      std::lock_guard l(lock);
      for (auto &c : daemon_connections) {
	_send_configure(c);  // NOTE(review): reconstructed loop body — verify against upstream
      }
    }));
  }
}
// Send an MMgrConfigure to one daemon connection, telling it the current
// stats reporting period/threshold.  OSD peers additionally receive the
// active set of OSD perf metric queries.  Caller must hold the lock.
void DaemonServer::_send_configure(ConnectionRef c)
{
  ceph_assert(lock.is_locked_by_me());

  auto configure = new MMgrConfigure();
  configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
  configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");

  if (c->peer_is_osd()) {
    // Only OSDs evaluate perf metric queries; push the current set.
    configure->osd_perf_metric_queries =
      osd_perf_metric_collector.get_queries();
  }

  c->send_message(configure);
}
// Register a new OSD perf metric query (with an optional result limit)
// with the collector; returns the query ID used to fetch or remove it.
// Delegates entirely to osd_perf_metric_collector.
OSDPerfMetricQueryID DaemonServer::add_osd_perf_query(
    const OSDPerfMetricQuery &query,
    const std::optional<OSDPerfMetricLimit> &limit)
{
  return osd_perf_metric_collector.add_query(query, limit);
}
// Unregister a previously added OSD perf metric query.  Returns the
// collector's status code (0 on success).
int DaemonServer::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
{
  return osd_perf_metric_collector.remove_query(query_id);
}
// Fetch the accumulated counters for an OSD perf metric query into
// *counters.  Returns the collector's status code (0 on success).
int DaemonServer::get_osd_perf_counters(
    OSDPerfMetricQueryID query_id,
    std::map<OSDPerfMetricKey, PerformanceCounters> *counters)
{
  return osd_perf_metric_collector.get_counters(query_id, counters);
}