// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#include "DaemonServer.h"
#include <boost/algorithm/string.hpp>
#include "mgr/Mgr.h"

#include "include/stringify.h"
#include "include/str_list.h"
#include "auth/RotatingKeyRing.h"
#include "json_spirit/json_spirit_writer.h"

#include "mgr/mgr_commands.h"
#include "mgr/DaemonHealthMetricCollector.h"
#include "mgr/OSDPerfMetricCollector.h"
#include "mgr/MDSPerfMetricCollector.h"
#include "mon/MonCommand.h"

#include "messages/MMgrOpen.h"
#include "messages/MMgrClose.h"
#include "messages/MMgrConfigure.h"
#include "messages/MMonMgrReport.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
#include "messages/MMgrCommand.h"
#include "messages/MMgrCommandReply.h"
#include "messages/MPGStats.h"
#include "messages/MOSDScrub.h"
#include "messages/MOSDScrub2.h"
#include "messages/MOSDForceRecovery.h"
#include "common/errno.h"
#include "common/pick_address.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mgr
#undef dout_prefix
#define dout_prefix *_dout << "mgr.server " << __func__ << " "
using namespace TOPNSPC::common;
namespace {
  template <typename Map>
  bool map_compare(Map const &lhs, Map const &rhs) {
    return lhs.size() == rhs.size()
      && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                    [] (auto a, auto b) { return a.first == b.first && a.second == b.second; });
  }
}

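// Note: DaemonServer keeps one byte throttler and one message-count
// throttler per peer type (client, OSD, MDS, MON).  They are constructed
// below from the corresponding mgr_* config options and attached to the
// messenger's policies in init().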
DaemonServer::DaemonServer(MonClient *monc_,
                           Finisher &finisher_,
                           DaemonStateIndex &daemon_state_,
                           ClusterState &cluster_state_,
                           PyModuleRegistry &py_modules_,
                           LogChannelRef clog_,
                           LogChannelRef audit_clog_)
    : Dispatcher(g_ceph_context),
      client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
            g_conf().get_val<Option::size_t>("mgr_client_bytes"))),
      client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
            g_conf().get_val<uint64_t>("mgr_client_messages"))),
      osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
            g_conf().get_val<Option::size_t>("mgr_osd_bytes"))),
      osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
            g_conf().get_val<uint64_t>("mgr_osd_messages"))),
      mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
            g_conf().get_val<Option::size_t>("mgr_mds_bytes"))),
      mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
            g_conf().get_val<uint64_t>("mgr_mds_messages"))),
      mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
            g_conf().get_val<Option::size_t>("mgr_mon_bytes"))),
      mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
            g_conf().get_val<uint64_t>("mgr_mon_messages"))),
      msgr(nullptr),
      monc(monc_),
      finisher(finisher_),
      daemon_state(daemon_state_),
      cluster_state(cluster_state_),
      py_modules(py_modules_),
      clog(clog_),
      audit_clog(audit_clog_),
      pgmap_ready(false),
      timer(g_ceph_context, lock),
      shutting_down(false),
      tick_event(nullptr),
      osd_perf_metric_collector_listener(this),
      osd_perf_metric_collector(osd_perf_metric_collector_listener),
      mds_perf_metric_collector_listener(this),
      mds_perf_metric_collector(mds_perf_metric_collector_listener)
{
  g_conf().add_observer(this);
}

DaemonServer::~DaemonServer() {
  delete msgr;
  g_conf().remove_observer(this);
}

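// init(): create the public messenger, install the per-peer-type
// throttlers, bind to the configured public address(es), wire
// authentication up to the MonClient, and start the periodic tick timer.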
int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
{
  // Initialize Messenger
  std::string public_msgr_type = g_conf()->ms_public_type.empty() ?
    g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
  msgr = Messenger::create(g_ceph_context, public_msgr_type,
                           entity_name_t::MGR(gid),
                           "mgr",
                           Messenger::get_pid_nonce());
  msgr->set_default_policy(Messenger::Policy::stateless_server(0));

  msgr->set_auth_client(monc);

  // throttle clients
  msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
                              client_byte_throttler.get(),
                              client_msg_throttler.get());

  // servers
  msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
                              osd_byte_throttler.get(),
                              osd_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
                              mds_byte_throttler.get(),
                              mds_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
                              mon_byte_throttler.get(),
                              mon_msg_throttler.get());

  entity_addrvec_t addrs;
  int r = pick_addresses(cct, CEPH_PICK_ADDRESS_PUBLIC, &addrs);
  if (r < 0) {
    return r;
  }
  dout(20) << __func__ << " will bind to " << addrs << dendl;
  r = msgr->bindv(addrs);
  if (r < 0) {
    derr << "unable to bind mgr to " << addrs << dendl;
    return r;
  }

  msgr->set_myname(entity_name_t::MGR(gid));
  msgr->set_addr_unknowns(client_addrs);

  msgr->start();
  msgr->add_dispatcher_tail(this);

  msgr->set_auth_server(monc);
  monc->set_handle_authentication_dispatcher(this);

  started_at = ceph_clock_now();

  std::lock_guard l(lock);
  timer.init();

  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());

  return 0;
}

entity_addrvec_t DaemonServer::get_myaddrs() const
{
  return msgr->get_myaddrs();
}

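// Runs once a connection has authenticated: build an MgrSession, record the
// peer's caps (allow-all or parsed from the encoded caps string), and track
// OSD connections in osd_cons so later commands can be routed to them.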
int DaemonServer::ms_handle_authentication(Connection *con)
{
  auto s = ceph::make_ref<MgrSession>(cct);
  con->set_priv(s);
  s->inst.addr = con->get_peer_addr();
  s->entity_name = con->peer_name;
  dout(10) << __func__ << " new session " << s << " con " << con
           << " entity " << con->peer_name
           << " addr " << con->get_peer_addrs()
           << dendl;

  AuthCapsInfo &caps_info = con->get_peer_caps_info();
  if (caps_info.allow_all) {
    dout(10) << " session " << s << " " << s->entity_name
             << " allow_all" << dendl;
    s->caps.set_allow_all();
  } else if (caps_info.caps.length() > 0) {
    auto p = caps_info.caps.cbegin();
    string str;
    try {
      decode(str, p);
    }
    catch (buffer::error& e) {
      dout(10) << " session " << s << " " << s->entity_name
               << " failed to decode caps" << dendl;
      return -EACCES;
    }
    if (!s->caps.parse(str)) {
      dout(10) << " session " << s << " " << s->entity_name
               << " failed to parse caps '" << str << "'" << dendl;
      return -EACCES;
    }
    dout(10) << " session " << s << " " << s->entity_name
             << " has caps " << s->caps << " '" << str << "'" << dendl;
  }

  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    std::lock_guard l(lock);
    s->osd_id = atoi(s->entity_name.get_id().c_str());
    dout(10) << "registering osd." << s->osd_id << " session "
             << s << " con " << con << dendl;
    osd_cons[s->osd_id].insert(con);
  }

  return 1;
}

bool DaemonServer::ms_handle_reset(Connection *con)
{
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    auto priv = con->get_priv();
    auto session = static_cast<MgrSession*>(priv.get());
    if (!session) {
      return false;
    }
    std::lock_guard l(lock);
    dout(10) << "unregistering osd." << session->osd_id
             << " session " << session << " con " << con << dendl;
    osd_cons[session->osd_id].erase(con);

    auto iter = daemon_connections.find(con);
    if (iter != daemon_connections.end()) {
      daemon_connections.erase(iter);
    }
  }
  return false;
}

bool DaemonServer::ms_handle_refused(Connection *con)
{
  // do nothing for now
  return false;
}

bool DaemonServer::ms_dispatch2(const ref_t<Message>& m)
{
  // Note that we do *not* take ::lock here, in order to avoid
  // serializing all message handling.  It's up to each handler
  // to take whatever locks it needs.
  switch (m->get_type()) {
  case MSG_PGSTATS:
    cluster_state.ingest_pgstats(ref_cast<MPGStats>(m));
    maybe_ready(m->get_source().num());
    return true;
  case MSG_MGR_REPORT:
    return handle_report(ref_cast<MMgrReport>(m));
  case MSG_MGR_OPEN:
    return handle_open(ref_cast<MMgrOpen>(m));
  case MSG_MGR_CLOSE:
    return handle_close(ref_cast<MMgrClose>(m));
  case MSG_COMMAND:
    return handle_command(ref_cast<MCommand>(m));
  case MSG_MGR_COMMAND:
    return handle_command(ref_cast<MMgrCommand>(m));
  default:
    dout(1) << "Unhandled message type " << m->get_type() << dendl;
    return false;
  };
}

void DaemonServer::dump_pg_ready(ceph::Formatter *f)
{
  f->dump_bool("pg_ready", pgmap_ready.load());
}

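// maybe_ready(): PG state is only sent to the monitors once every OSD that
// the OSDMap considers "up" has submitted at least one stats report; until
// then pgmap_ready stays false.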
void DaemonServer::maybe_ready(int32_t osd_id)
{
  if (pgmap_ready.load()) {
    // Fast path: we don't need to take lock because pgmap_ready
    // is already set
  } else {
    std::lock_guard l(lock);

    if (reported_osds.find(osd_id) == reported_osds.end()) {
      dout(4) << "initial report from osd " << osd_id << dendl;
      reported_osds.insert(osd_id);
      std::set<int32_t> up_osds;

      cluster_state.with_osdmap([&](const OSDMap& osdmap) {
          osdmap.get_up_osds(up_osds);
        });

      std::set<int32_t> unreported_osds;
      std::set_difference(up_osds.begin(), up_osds.end(),
                          reported_osds.begin(), reported_osds.end(),
                          std::inserter(unreported_osds, unreported_osds.begin()));

      if (unreported_osds.size() == 0) {
        dout(4) << "all osds have reported, sending PG state to mon" << dendl;
        pgmap_ready = true;
        reported_osds.clear();
        // Avoid waiting for next tick
        send_report();
      } else {
        dout(4) << "still waiting for " << unreported_osds.size() << " osds"
                   " to report in before PGMap is ready" << dendl;
      }
    }
  }
}

void DaemonServer::tick()
{
  dout(10) << dendl;
  send_report();
  adjust_pgs();

  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());
}

// Currently modules do not set health checks in response to events delivered to
// all modules (e.g. notify) so we do not risk a thundering herd situation here.
// If this pattern emerges in the future, this scheduler could be modified to
// fire after all modules have had a chance to set their health checks.
void DaemonServer::schedule_tick_locked(double delay_sec)
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));

  if (tick_event) {
    timer.cancel_event(tick_event);
    tick_event = nullptr;
  }

  // on shutdown start rejecting explicit requests to send reports that may
  // originate from python land which may still be running.
  if (shutting_down)
    return;

  tick_event = timer.add_event_after(delay_sec,
    new LambdaContext([this](int r) {
      tick();
  }));
}

void DaemonServer::schedule_tick(double delay_sec)
{
  std::lock_guard l(lock);
  schedule_tick_locked(delay_sec);
}

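// When the set of OSD perf metric queries changes, queue a finisher job that
// re-sends MMgrConfigure to every connected OSD so that they follow the new
// stats reporting policy.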
void DaemonServer::handle_osd_perf_metric_query_updated()
{
  dout(10) << dendl;

  // Send a fresh MMgrConfigure to all clients, so that they can follow
  // the new policy for transmitting stats
  finisher.queue(new LambdaContext([this](int r) {
    std::lock_guard l(lock);
    for (auto &c : daemon_connections) {
      if (c->peer_is_osd()) {
        _send_configure(c);
      }
    }
  }));
}

void DaemonServer::handle_mds_perf_metric_query_updated()
{
  dout(10) << dendl;

  // Send a fresh MMgrConfigure to all clients, so that they can follow
  // the new policy for transmitting stats
  finisher.queue(new LambdaContext([this](int r) {
    std::lock_guard l(lock);
    for (auto &c : daemon_connections) {
      if (c->peer_is_mds()) {
        _send_configure(c);
      }
    }
  }));
}

void DaemonServer::shutdown()
{
  dout(10) << "begin" << dendl;
  msgr->shutdown();
  msgr->wait();
  cluster_state.shutdown();
  dout(10) << "done" << dendl;

  std::lock_guard l(lock);
  shutting_down = true;
  timer.shutdown();
}

static DaemonKey key_from_service(
  const std::string& service_name,
  int peer_type,
  const std::string& daemon_name)
{
  if (!service_name.empty()) {
    return DaemonKey{service_name, daemon_name};
  } else {
    return DaemonKey{ceph_entity_type_name(peer_type), daemon_name};
  }
}

void DaemonServer::fetch_missing_metadata(const DaemonKey& key,
                                          const entity_addr_t& addr)
{
  if (!daemon_state.is_updating(key) &&
      (key.type == "osd" || key.type == "mds" || key.type == "mon")) {
    std::ostringstream oss;
    auto c = new MetadataUpdate(daemon_state, key);
    if (key.type == "osd") {
      oss << "{\"prefix\": \"osd metadata\", \"id\": "
          << key.name << "}";
    } else if (key.type == "mds") {
      c->set_default("addr", stringify(addr));
      oss << "{\"prefix\": \"mds metadata\", \"who\": \""
          << key.name << "\"}";
    } else if (key.type == "mon") {
      oss << "{\"prefix\": \"mon metadata\", \"id\": \""
          << key.name << "\"}";
    } else {
      ceph_abort();
    }
    monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
  }
}

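// handle_open(): a daemon (or a client registering as a service daemon)
// announces itself.  Service daemons get a DaemonState entry and a slot in
// the pending service map right away; regular daemons whose metadata has not
// been fetched yet are disconnected and expected to reconnect later.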
bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
{
  std::unique_lock l(lock);

  DaemonKey key = key_from_service(m->service_name,
                                   m->get_connection()->get_peer_type(),
                                   m->daemon_name);

  auto con = m->get_connection();
  dout(10) << "from " << key << " " << con->get_peer_addr() << dendl;

  _send_configure(con);

  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    dout(20) << "updating existing DaemonState for " << key << dendl;
    daemon = daemon_state.get(key);
  }
  if (!daemon) {
    if (m->service_daemon) {
      dout(4) << "constructing new DaemonState for " << key << dendl;
      daemon = std::make_shared<DaemonState>(daemon_state.types);
      daemon->key = key;
      daemon->service_daemon = true;
      daemon_state.insert(daemon);
    } else {
      /* A normal Ceph daemon has connected but we are or should be waiting on
       * metadata for it. Close the session so that it tries to reconnect.
       */
      dout(2) << "ignoring open from " << key << " " << con->get_peer_addr()
              << "; not ready for session (expect reconnect)" << dendl;
      con->mark_down();
      l.unlock();
      fetch_missing_metadata(key, m->get_source_addr());
      return true;
    }
  }
  if (daemon) {
    if (m->service_daemon) {
      // update the metadata through the daemon state index to
      // ensure it's kept up-to-date
      daemon_state.update_metadata(daemon, m->daemon_metadata);
    }

    std::lock_guard l(daemon->lock);
    daemon->perf_counters.clear();

    daemon->service_daemon = m->service_daemon;
    if (m->service_daemon) {
      daemon->service_status = m->daemon_status;

      utime_t now = ceph_clock_now();
      auto [d, added] = pending_service_map.get_daemon(m->service_name,
                                                       m->daemon_name);
      if (added || d->gid != (uint64_t)m->get_source().num()) {
        dout(10) << "registering " << key << " in pending_service_map" << dendl;
        d->gid = m->get_source().num();
        d->addr = m->get_source_addr();
        d->start_epoch = pending_service_map.epoch;
        d->start_stamp = now;
        d->metadata = m->daemon_metadata;
        pending_service_map_dirty = pending_service_map.epoch;
      }
    }

    auto p = m->config_bl.cbegin();
    if (p != m->config_bl.end()) {
      decode(daemon->config, p);
      decode(daemon->ignored_mon_config, p);
      dout(20) << " got config " << daemon->config
               << " ignored " << daemon->ignored_mon_config << dendl;
    }
    daemon->config_defaults_bl = m->config_defaults_bl;
    daemon->config_defaults.clear();
    dout(20) << " got config_defaults_bl " << daemon->config_defaults_bl.length()
             << " bytes" << dendl;
  }

  if (con->get_peer_type() != entity_name_t::TYPE_CLIENT &&
      m->service_name.empty())
  {
    // Store in set of the daemon/service connections, i.e. those
    // connections that require an update in the event of stats
    // configuration changes.
    daemon_connections.insert(con);
  }

  return true;
}

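// handle_close(): a service daemon is going away; drop its DaemonState and
// its pending service map entry, then echo the message back as an ack.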
bool DaemonServer::handle_close(const ref_t<MMgrClose>& m)
{
  std::lock_guard l(lock);

  DaemonKey key = key_from_service(m->service_name,
                                   m->get_connection()->get_peer_type(),
                                   m->daemon_name);
  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  if (daemon_state.exists(key)) {
    DaemonStatePtr daemon = daemon_state.get(key);
    daemon_state.rm(key);
    {
      std::lock_guard l(daemon->lock);
      if (daemon->service_daemon) {
        pending_service_map.rm_daemon(m->service_name, m->daemon_name);
        pending_service_map_dirty = pending_service_map.epoch;
      }
    }
  }

  // send same message back as a reply
  m->get_connection()->send_message2(m);
  return true;
}

void DaemonServer::update_task_status(
  DaemonKey key,
  const std::map<std::string,std::string>& task_status)
{
  dout(10) << "got task status from " << key << dendl;

  [[maybe_unused]] auto [daemon, added] =
    pending_service_map.get_daemon(key.type, key.name);
  if (daemon->task_status != task_status) {
    daemon->task_status = task_status;
    pending_service_map_dirty = pending_service_map.epoch;
  }
}

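// handle_report(): ingest a daemon's periodic MMgrReport -- perf counters,
// config, service status/beacons, task status and health metrics -- and
// notify the python modules when the perf counter schema changes.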
bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
{
  DaemonKey key;
  if (!m->service_name.empty()) {
    key.type = m->service_name;
  } else {
    key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
  }
  key.name = m->daemon_name;

  dout(10) << "from " << m->get_connection() << " " << key << dendl;

  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
      m->service_name.empty()) {
    // Clients should not be sending us stats unless they are declaring
    // themselves to be a daemon for some service.
    dout(10) << "rejecting report from non-daemon client " << m->daemon_name
             << dendl;
    clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
                 << " at " << m->get_connection()->get_peer_addrs();
    m->get_connection()->mark_down();
    return true;
  }


  {
    std::unique_lock locker(lock);

    DaemonStatePtr daemon;
    // Look up the DaemonState
    if (daemon_state.exists(key)) {
      dout(20) << "updating existing DaemonState for " << key << dendl;
      daemon = daemon_state.get(key);
    } else {
      locker.unlock();

      // we don't know the hostname at this stage, reject MMgrReport here.
      dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
              << dendl;
      // issue metadata request in background
      fetch_missing_metadata(key, m->get_source_addr());

      locker.lock();

      // kill session
      auto priv = m->get_connection()->get_priv();
      auto session = static_cast<MgrSession*>(priv.get());
      if (!session) {
        return false;
      }
      m->get_connection()->mark_down();

      dout(10) << "unregistering osd." << session->osd_id
               << " session " << session << " con " << m->get_connection() << dendl;

      if (osd_cons.find(session->osd_id) != osd_cons.end()) {
        osd_cons[session->osd_id].erase(m->get_connection());
      }

      auto iter = daemon_connections.find(m->get_connection());
      if (iter != daemon_connections.end()) {
        daemon_connections.erase(iter);
      }

      return false;
    }

    // Update the DaemonState
    ceph_assert(daemon != nullptr);
    {
      std::lock_guard l(daemon->lock);
      auto &daemon_counters = daemon->perf_counters;
      daemon_counters.update(*m.get());

      auto p = m->config_bl.cbegin();
      if (p != m->config_bl.end()) {
        decode(daemon->config, p);
        decode(daemon->ignored_mon_config, p);
        dout(20) << " got config " << daemon->config
                 << " ignored " << daemon->ignored_mon_config << dendl;
      }

      utime_t now = ceph_clock_now();
      if (daemon->service_daemon) {
        if (m->daemon_status) {
          daemon->service_status_stamp = now;
          daemon->service_status = *m->daemon_status;
        }
        daemon->last_service_beacon = now;
      } else if (m->daemon_status) {
        derr << "got status from non-daemon " << key << dendl;
      }
      // update task status
      if (m->task_status) {
        update_task_status(key, *m->task_status);
        daemon->last_service_beacon = now;
      }
      if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
        // only OSD and MON send health_checks to me now
        daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
        dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
                 << dendl;
      }
    }
  }

  // if there are any schema updates, notify the python modules
  if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
    py_modules.notify_all("perf_schema_update", ceph::to_string(key));
  }

  if (m->get_connection()->peer_is_osd()) {
    osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
  }

  if (m->metric_report_message) {
    const MetricReportMessage &message = *m->metric_report_message;
    boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
  }

  return true;
}


689void DaemonServer::_generate_command_map(
11fdf7f2 690 cmdmap_t& cmdmap,
7c673cae
FG
691 map<string,string> &param_str_map)
692{
11fdf7f2 693 for (auto p = cmdmap.begin();
7c673cae
FG
694 p != cmdmap.end(); ++p) {
695 if (p->first == "prefix")
696 continue;
697 if (p->first == "caps") {
698 vector<string> cv;
9f95a23c 699 if (cmd_getval(cmdmap, "caps", cv) &&
7c673cae
FG
700 cv.size() % 2 == 0) {
701 for (unsigned i = 0; i < cv.size(); i += 2) {
702 string k = string("caps_") + cv[i];
703 param_str_map[k] = cv[i + 1];
704 }
705 continue;
706 }
707 }
708 param_str_map[p->first] = cmd_vartype_stringify(p->second);
709 }
710}
711
c07f9fc5 712const MonCommand *DaemonServer::_get_mgrcommand(
7c673cae 713 const string &cmd_prefix,
c07f9fc5 714 const std::vector<MonCommand> &cmds)
7c673cae 715{
c07f9fc5
FG
716 const MonCommand *this_cmd = nullptr;
717 for (const auto &cmd : cmds) {
718 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
719 this_cmd = &cmd;
7c673cae
FG
720 break;
721 }
722 }
723 return this_cmd;
724}
725
726bool DaemonServer::_allowed_command(
727 MgrSession *s,
92f5a8d4 728 const string &service,
7c673cae
FG
729 const string &module,
730 const string &prefix,
11fdf7f2 731 const cmdmap_t& cmdmap,
7c673cae 732 const map<string,string>& param_str_map,
c07f9fc5 733 const MonCommand *this_cmd) {
7c673cae
FG
734
735 if (s->entity_name.is_mon()) {
736 // mon is all-powerful. even when it is forwarding commands on behalf of
737 // old clients; we expect the mon is validating commands before proxying!
738 return true;
739 }
740
741 bool cmd_r = this_cmd->requires_perm('r');
742 bool cmd_w = this_cmd->requires_perm('w');
743 bool cmd_x = this_cmd->requires_perm('x');
744
745 bool capable = s->caps.is_capable(
746 g_ceph_context,
7c673cae 747 s->entity_name,
92f5a8d4 748 service, module, prefix, param_str_map,
11fdf7f2
TL
749 cmd_r, cmd_w, cmd_x,
750 s->get_peer_addr());
7c673cae
FG
751
752 dout(10) << " " << s->entity_name << " "
753 << (capable ? "" : "not ") << "capable" << dendl;
754 return capable;
755}
756
11fdf7f2
TL
757/**
758 * The working data for processing an MCommand. This lives in
759 * a class to enable passing it into other threads for processing
760 * outside of the thread/locks that called handle_command.
761 */
762class CommandContext {
763public:
9f95a23c
TL
764 ceph::ref_t<MCommand> m_tell;
765 ceph::ref_t<MMgrCommand> m_mgr;
766 const std::vector<std::string>& cmd; ///< ref into m_tell or m_mgr
767 const bufferlist& data; ///< ref into m_tell or m_mgr
11fdf7f2
TL
768 bufferlist odata;
769 cmdmap_t cmdmap;
770
9f95a23c
TL
771 explicit CommandContext(ceph::ref_t<MCommand> m)
772 : m_tell{std::move(m)},
773 cmd(m_tell->cmd),
774 data(m_tell->get_data()) {
11fdf7f2 775 }
9f95a23c
TL
776 explicit CommandContext(ceph::ref_t<MMgrCommand> m)
777 : m_mgr{std::move(m)},
778 cmd(m_mgr->cmd),
779 data(m_mgr->get_data()) {
11fdf7f2 780 }
7c673cae 781
11fdf7f2
TL
782 void reply(int r, const std::stringstream &ss) {
783 reply(r, ss.str());
784 }
7c673cae 785
11fdf7f2
TL
786 void reply(int r, const std::string &rs) {
787 // Let the connection drop as soon as we've sent our response
9f95a23c
TL
788 ConnectionRef con = m_tell ? m_tell->get_connection()
789 : m_mgr->get_connection();
11fdf7f2
TL
790 if (con) {
791 con->mark_disposable();
7c673cae
FG
792 }
793
11fdf7f2 794 if (r == 0) {
9f95a23c 795 dout(20) << "success" << dendl;
11fdf7f2
TL
796 } else {
797 derr << __func__ << " " << cpp_strerror(r) << " " << rs << dendl;
7c673cae 798 }
11fdf7f2 799 if (con) {
9f95a23c
TL
800 if (m_tell) {
801 MCommandReply *reply = new MCommandReply(r, rs);
802 reply->set_tid(m_tell->get_tid());
803 reply->set_data(odata);
804 con->send_message(reply);
805 } else {
806 MMgrCommandReply *reply = new MMgrCommandReply(r, rs);
807 reply->set_tid(m_mgr->get_tid());
808 reply->set_data(odata);
809 con->send_message(reply);
810 }
7c673cae 811 }
11fdf7f2
TL
812 }
813};
7c673cae 814
11fdf7f2
TL
815/**
816 * A context for receiving a bufferlist/error string from a background
817 * function and then calling back to a CommandContext when it's done
818 */
819class ReplyOnFinish : public Context {
820 std::shared_ptr<CommandContext> cmdctx;
7c673cae 821
11fdf7f2
TL
822public:
823 bufferlist from_mon;
824 string outs;
7c673cae 825
11fdf7f2
TL
826 explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_)
827 : cmdctx(cmdctx_)
7c673cae 828 {}
11fdf7f2
TL
829 void finish(int r) override {
830 cmdctx->odata.claim_append(from_mon);
831 cmdctx->reply(r, outs);
832 }
833};
7c673cae 834
9f95a23c 835bool DaemonServer::handle_command(const ref_t<MCommand>& m)
11fdf7f2
TL
836{
837 std::lock_guard l(lock);
1911f103
TL
838 auto cmdctx = std::make_shared<CommandContext>(m);
839 try {
840 return _handle_command(cmdctx);
841 } catch (const bad_cmd_get& e) {
842 cmdctx->reply(-EINVAL, e.what());
9f95a23c 843 return true;
9f95a23c
TL
844 }
845}
846
847bool DaemonServer::handle_command(const ref_t<MMgrCommand>& m)
848{
849 std::lock_guard l(lock);
850 auto cmdctx = std::make_shared<CommandContext>(m);
11fdf7f2 851 try {
9f95a23c 852 return _handle_command(cmdctx);
11fdf7f2
TL
853 } catch (const bad_cmd_get& e) {
854 cmdctx->reply(-EINVAL, e.what());
855 return true;
856 }
857}
7c673cae 858
92f5a8d4
TL
859void DaemonServer::log_access_denied(
860 std::shared_ptr<CommandContext>& cmdctx,
861 MgrSession* session, std::stringstream& ss) {
862 dout(1) << " access denied" << dendl;
863 audit_clog->info() << "from='" << session->inst << "' "
864 << "entity='" << session->entity_name << "' "
9f95a23c 865 << "cmd=" << cmdctx->cmd << ": access denied";
92f5a8d4 866 ss << "access denied: does your client key have mgr caps? "
f67539c2 867 "See http://docs.ceph.com/en/latest/mgr/administrator/"
92f5a8d4
TL
868 "#client-authentication";
869}
870
f67539c2
TL
871void DaemonServer::_check_offlines_pgs(
872 const set<int>& osds,
873 const OSDMap& osdmap,
874 const PGMap& pgmap,
875 offline_pg_report *report)
876{
877 // reset output
878 *report = offline_pg_report();
879 report->osds = osds;
880
881 for (const auto& q : pgmap.pg_stat) {
882 set<int32_t> pg_acting; // net acting sets (with no missing if degraded)
883 bool found = false;
884 if (q.second.state == 0) {
885 report->unknown.insert(q.first);
886 continue;
887 }
888 if (q.second.state & PG_STATE_DEGRADED) {
889 for (auto& anm : q.second.avail_no_missing) {
890 if (osds.count(anm.osd)) {
891 found = true;
892 continue;
893 }
894 if (anm.osd != CRUSH_ITEM_NONE) {
895 pg_acting.insert(anm.osd);
896 }
897 }
898 } else {
899 for (auto& a : q.second.acting) {
900 if (osds.count(a)) {
901 found = true;
902 continue;
903 }
904 if (a != CRUSH_ITEM_NONE) {
905 pg_acting.insert(a);
906 }
907 }
908 }
909 if (!found) {
910 continue;
911 }
912 const pg_pool_t *pi = osdmap.get_pg_pool(q.first.pool());
913 bool dangerous = false;
914 if (!pi) {
915 report->bad_no_pool.insert(q.first); // pool is creating or deleting
916 dangerous = true;
917 }
918 if (!(q.second.state & PG_STATE_ACTIVE)) {
919 report->bad_already_inactive.insert(q.first);
920 dangerous = true;
921 }
922 if (pg_acting.size() < pi->min_size) {
923 report->bad_become_inactive.insert(q.first);
924 dangerous = true;
925 }
926 if (dangerous) {
927 report->not_ok.insert(q.first);
928 } else {
929 report->ok.insert(q.first);
930 if (q.second.state & PG_STATE_DEGRADED) {
931 report->ok_become_more_degraded.insert(q.first);
932 } else {
933 report->ok_become_degraded.insert(q.first);
934 }
935 }
936 }
937 dout(20) << osds << " -> " << report->ok.size() << " ok, "
938 << report->not_ok.size() << " not ok, "
939 << report->unknown.size() << " unknown"
940 << dendl;
941}
942
943void DaemonServer::_maximize_ok_to_stop_set(
944 const set<int>& orig_osds,
945 unsigned max,
946 const OSDMap& osdmap,
947 const PGMap& pgmap,
948 offline_pg_report *out_report)
949{
950 dout(20) << "orig_osds " << orig_osds << " max " << max << dendl;
951 _check_offlines_pgs(orig_osds, osdmap, pgmap, out_report);
952 if (!out_report->ok_to_stop()) {
953 return;
954 }
955 if (orig_osds.size() >= max) {
956 // already at max
957 return;
958 }
959
960 // semi-arbitrarily start with the first osd in the set
961 offline_pg_report report;
962 set<int> osds = orig_osds;
963 int parent = *osds.begin();
964 set<int> children;
965
966 while (true) {
967 // identify the next parent
968 int r = osdmap.crush->get_immediate_parent_id(parent, &parent);
969 if (r < 0) {
970 return; // just go with what we have so far!
971 }
972
973 // get candidate additions that are beneath this point in the tree
974 children.clear();
975 r = osdmap.crush->get_all_children(parent, &children);
976 if (r < 0) {
977 return; // just go with what we have so far!
978 }
979 dout(20) << " parent " << parent << " children " << children << dendl;
980
981 // try adding in more osds
982 int failed = 0; // how many children we failed to add to our set
983 for (auto o : children) {
984 if (o >= 0 && osdmap.is_up(o) && osds.count(o) == 0) {
985 osds.insert(o);
986 _check_offlines_pgs(osds, osdmap, pgmap, &report);
987 if (!report.ok_to_stop()) {
988 osds.erase(o);
989 ++failed;
990 continue;
991 }
992 *out_report = report;
993 if (osds.size() == max) {
994 dout(20) << " hit max" << dendl;
995 return; // yay, we hit the max
996 }
997 }
998 }
999
1000 if (failed) {
1001 // we hit some failures; go with what we have
1002 dout(20) << " hit some peer failures" << dendl;
1003 return;
1004 }
1005 }
1006}
1007
11fdf7f2 1008bool DaemonServer::_handle_command(
11fdf7f2
TL
1009 std::shared_ptr<CommandContext>& cmdctx)
1010{
9f95a23c 1011 MessageRef m;
1911f103 1012 bool admin_socket_cmd = false;
9f95a23c
TL
1013 if (cmdctx->m_tell) {
1014 m = cmdctx->m_tell;
1911f103
TL
1015 // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
1016 // command.
1017 admin_socket_cmd = (cmdctx->m_tell->fsid != uuid_d());
9f95a23c
TL
1018 } else {
1019 m = cmdctx->m_mgr;
1020 }
11fdf7f2
TL
1021 auto priv = m->get_connection()->get_priv();
1022 auto session = static_cast<MgrSession*>(priv.get());
7c673cae
FG
1023 if (!session) {
1024 return true;
1025 }
9f95a23c 1026 if (session->inst.name == entity_name_t()) {
7c673cae 1027 session->inst.name = m->get_source();
9f95a23c 1028 }
7c673cae 1029
7c673cae 1030 map<string,string> param_str_map;
11fdf7f2
TL
1031 std::stringstream ss;
1032 int r = 0;
7c673cae 1033
9f95a23c 1034 if (!cmdmap_from_json(cmdctx->cmd, &(cmdctx->cmdmap), ss)) {
7c673cae
FG
1035 cmdctx->reply(-EINVAL, ss);
1036 return true;
1037 }
1038
11fdf7f2 1039 string prefix;
9f95a23c 1040 cmd_getval(cmdctx->cmdmap, "prefix", prefix);
f67539c2 1041 dout(10) << "decoded-size=" << cmdctx->cmdmap.size() << " prefix=" << prefix << dendl;
7c673cae 1042
f67539c2
TL
1043 boost::scoped_ptr<Formatter> f;
1044 {
1045 std::string format;
1046 if (boost::algorithm::ends_with(prefix, "_json")) {
1047 format = "json";
1048 } else {
1049 cmd_getval(cmdctx->cmdmap, "format", format, string("plain"));
1050 }
1051 f.reset(Formatter::create(format));
1052 }
7c673cae 1053
1911f103
TL
1054 // this is just for mgr commands - admin socket commands will fall
1055 // through and use the admin socket version of
1056 // get_command_descriptions
1057 if (prefix == "get_command_descriptions" && !admin_socket_cmd) {
7c673cae 1058 dout(10) << "reading commands from python modules" << dendl;
c07f9fc5 1059 const auto py_commands = py_modules.get_commands();
7c673cae 1060
c07f9fc5 1061 int cmdnum = 0;
7c673cae
FG
1062 JSONFormatter f;
1063 f.open_object_section("command_descriptions");
c07f9fc5 1064
11fdf7f2 1065 auto dump_cmd = [&cmdnum, &f, m](const MonCommand &mc){
7c673cae
FG
1066 ostringstream secname;
1067 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
11fdf7f2
TL
1068 dump_cmddesc_to_json(&f, m->get_connection()->get_features(),
1069 secname.str(), mc.cmdstring, mc.helpstring,
1070 mc.module, mc.req_perms, 0);
7c673cae 1071 cmdnum++;
c07f9fc5
FG
1072 };
1073
1074 for (const auto &pyc : py_commands) {
1075 dump_cmd(pyc);
7c673cae 1076 }
7c673cae 1077
c07f9fc5
FG
1078 for (const auto &mgr_cmd : mgr_commands) {
1079 dump_cmd(mgr_cmd);
7c673cae 1080 }
c07f9fc5 1081
7c673cae
FG
1082 f.close_section(); // command_descriptions
1083 f.flush(cmdctx->odata);
1084 cmdctx->reply(0, ss);
1085 return true;
1086 }
1087
1088 // lookup command
c07f9fc5 1089 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
7c673cae 1090 _generate_command_map(cmdctx->cmdmap, param_str_map);
11fdf7f2 1091
92f5a8d4
TL
1092 bool is_allowed = false;
1093 ModuleCommand py_command;
1911f103
TL
1094 if (admin_socket_cmd) {
1095 // admin socket commands require all capabilities
1096 is_allowed = session->caps.is_allow_all();
1097 } else if (!mgr_cmd) {
92f5a8d4
TL
1098 // Resolve the command to the name of the module that will
1099 // handle it (if the command exists)
1100 auto py_commands = py_modules.get_py_commands();
1101 for (const auto &pyc : py_commands) {
1102 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
1103 if (pyc_prefix == prefix) {
1104 py_command = pyc;
1105 break;
1106 }
1107 }
1108
1109 MonCommand pyc = {"", "", "py", py_command.perm};
1110 is_allowed = _allowed_command(session, "py", py_command.module_name,
1111 prefix, cmdctx->cmdmap, param_str_map,
1112 &pyc);
7c673cae
FG
1113 } else {
1114 // validate user's permissions for requested command
92f5a8d4 1115 is_allowed = _allowed_command(session, mgr_cmd->module, "",
11fdf7f2
TL
1116 prefix, cmdctx->cmdmap, param_str_map, mgr_cmd);
1117 }
92f5a8d4 1118
11fdf7f2 1119 if (!is_allowed) {
92f5a8d4 1120 log_access_denied(cmdctx, session, ss);
7c673cae
FG
1121 cmdctx->reply(-EACCES, ss);
1122 return true;
7c673cae
FG
1123 }
1124
1125 audit_clog->debug()
1126 << "from='" << session->inst << "' "
1127 << "entity='" << session->entity_name << "' "
9f95a23c 1128 << "cmd=" << cmdctx->cmd << ": dispatch";
7c673cae 1129
1911f103
TL
1130 if (admin_socket_cmd) {
1131 cct->get_admin_socket()->queue_tell_command(cmdctx->m_tell);
1132 return true;
1133 }
1134
224ce89b
WB
1135 // ----------------
1136 // service map commands
1137 if (prefix == "service dump") {
1138 if (!f)
1139 f.reset(Formatter::create("json-pretty"));
1140 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
1141 f->dump_object("service_map", service_map);
1142 });
1143 f->flush(cmdctx->odata);
1144 cmdctx->reply(0, ss);
1145 return true;
1146 }
1147 if (prefix == "service status") {
1148 if (!f)
1149 f.reset(Formatter::create("json-pretty"));
1150 // only include state from services that are in the persisted service map
1151 f->open_object_section("service_status");
9f95a23c
TL
1152 for (auto& [type, service] : pending_service_map.services) {
1153 if (ServiceMap::is_normal_ceph_entity(type)) {
1154 continue;
1155 }
1156
1157 f->open_object_section(type.c_str());
1158 for (auto& q : service.daemons) {
224ce89b 1159 f->open_object_section(q.first.c_str());
9f95a23c 1160 DaemonKey key{type, q.first};
11fdf7f2 1161 ceph_assert(daemon_state.exists(key));
224ce89b 1162 auto daemon = daemon_state.get(key);
11fdf7f2 1163 std::lock_guard l(daemon->lock);
224ce89b
WB
1164 f->dump_stream("status_stamp") << daemon->service_status_stamp;
1165 f->dump_stream("last_beacon") << daemon->last_service_beacon;
1166 f->open_object_section("status");
1167 for (auto& r : daemon->service_status) {
1168 f->dump_string(r.first.c_str(), r.second);
1169 }
1170 f->close_section();
1171 f->close_section();
1172 }
1173 f->close_section();
1174 }
1175 f->close_section();
1176 f->flush(cmdctx->odata);
1177 cmdctx->reply(0, ss);
1178 return true;
1179 }
1180
3efd9988
FG
1181 if (prefix == "config set") {
1182 std::string key;
1183 std::string val;
9f95a23c
TL
1184 cmd_getval(cmdctx->cmdmap, "key", key);
1185 cmd_getval(cmdctx->cmdmap, "value", val);
11fdf7f2 1186 r = cct->_conf.set_val(key, val, &ss);
3efd9988 1187 if (r == 0) {
11fdf7f2 1188 cct->_conf.apply_changes(nullptr);
3efd9988
FG
1189 }
1190 cmdctx->reply(0, ss);
1191 return true;
1192 }
1193
7c673cae
FG
1194 // -----------
1195 // PG commands
1196
1197 if (prefix == "pg scrub" ||
1198 prefix == "pg repair" ||
1199 prefix == "pg deep-scrub") {
1200 string scrubop = prefix.substr(3, string::npos);
1201 pg_t pgid;
11fdf7f2 1202 spg_t spgid;
7c673cae 1203 string pgidstr;
9f95a23c 1204 cmd_getval(cmdctx->cmdmap, "pgid", pgidstr);
7c673cae
FG
1205 if (!pgid.parse(pgidstr.c_str())) {
1206 ss << "invalid pgid '" << pgidstr << "'";
1207 cmdctx->reply(-EINVAL, ss);
1208 return true;
1209 }
1210 bool pg_exists = false;
1211 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1212 pg_exists = osdmap.pg_exists(pgid);
1213 });
1214 if (!pg_exists) {
11fdf7f2 1215 ss << "pg " << pgid << " does not exist";
7c673cae
FG
1216 cmdctx->reply(-ENOENT, ss);
1217 return true;
1218 }
1219 int acting_primary = -1;
11fdf7f2 1220 epoch_t epoch;
7c673cae 1221 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
11fdf7f2
TL
1222 epoch = osdmap.get_epoch();
1223 osdmap.get_primary_shard(pgid, &acting_primary, &spgid);
7c673cae
FG
1224 });
1225 if (acting_primary == -1) {
1226 ss << "pg " << pgid << " has no primary osd";
1227 cmdctx->reply(-EAGAIN, ss);
1228 return true;
1229 }
31f18b77
FG
1230 auto p = osd_cons.find(acting_primary);
1231 if (p == osd_cons.end()) {
1232 ss << "pg " << pgid << " primary osd." << acting_primary
1233 << " is not currently connected";
1234 cmdctx->reply(-EAGAIN, ss);
f64942e4 1235 return true;
31f18b77 1236 }
31f18b77 1237 for (auto& con : p->second) {
11fdf7f2
TL
1238 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
1239 vector<spg_t> pgs = { spgid };
1240 con->send_message(new MOSDScrub2(monc->get_fsid(),
1241 epoch,
1242 pgs,
1243 scrubop == "repair",
1244 scrubop == "deep-scrub"));
1245 } else {
1246 vector<pg_t> pgs = { pgid };
1247 con->send_message(new MOSDScrub(monc->get_fsid(),
1248 pgs,
1249 scrubop == "repair",
1250 scrubop == "deep-scrub"));
1251 }
31f18b77 1252 }
11fdf7f2 1253 ss << "instructing pg " << spgid << " on osd." << acting_primary
31f18b77
FG
1254 << " to " << scrubop;
1255 cmdctx->reply(0, ss);
1256 return true;
1257 } else if (prefix == "osd scrub" ||
1258 prefix == "osd deep-scrub" ||
1259 prefix == "osd repair") {
1260 string whostr;
9f95a23c 1261 cmd_getval(cmdctx->cmdmap, "who", whostr);
31f18b77
FG
1262 vector<string> pvec;
1263 get_str_vec(prefix, pvec);
1264
1265 set<int> osds;
224ce89b 1266 if (whostr == "*" || whostr == "all" || whostr == "any") {
31f18b77
FG
1267 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1268 for (int i = 0; i < osdmap.get_max_osd(); i++)
1269 if (osdmap.is_up(i)) {
1270 osds.insert(i);
1271 }
1272 });
1273 } else {
1274 long osd = parse_osd_id(whostr.c_str(), &ss);
1275 if (osd < 0) {
1276 ss << "invalid osd '" << whostr << "'";
1277 cmdctx->reply(-EINVAL, ss);
1278 return true;
1279 }
1280 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1281 if (osdmap.is_up(osd)) {
1282 osds.insert(osd);
1283 }
1284 });
1285 if (osds.empty()) {
1286 ss << "osd." << osd << " is not up";
1287 cmdctx->reply(-EAGAIN, ss);
1288 return true;
1289 }
1290 }
1291 set<int> sent_osds, failed_osds;
1292 for (auto osd : osds) {
11fdf7f2
TL
1293 vector<spg_t> spgs;
1294 epoch_t epoch;
1295 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1296 epoch = osdmap.get_epoch();
1297 auto p = pgmap.pg_by_osd.find(osd);
1298 if (p != pgmap.pg_by_osd.end()) {
1299 for (auto pgid : p->second) {
1300 int primary;
1301 spg_t spg;
1302 osdmap.get_primary_shard(pgid, &primary, &spg);
1303 if (primary == osd) {
1304 spgs.push_back(spg);
1305 }
1306 }
1307 }
1308 });
31f18b77
FG
1309 auto p = osd_cons.find(osd);
1310 if (p == osd_cons.end()) {
1311 failed_osds.insert(osd);
1312 } else {
1313 sent_osds.insert(osd);
1314 for (auto& con : p->second) {
11fdf7f2
TL
1315 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
1316 con->send_message(new MOSDScrub2(monc->get_fsid(),
1317 epoch,
1318 spgs,
1319 pvec.back() == "repair",
1320 pvec.back() == "deep-scrub"));
1321 } else {
1322 con->send_message(new MOSDScrub(monc->get_fsid(),
1323 pvec.back() == "repair",
1324 pvec.back() == "deep-scrub"));
1325 }
31f18b77
FG
1326 }
1327 }
1328 }
1329 if (failed_osds.size() == osds.size()) {
1330 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
1331 << " (not connected)";
1332 r = -EAGAIN;
1333 } else {
1334 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
1335 if (!failed_osds.empty()) {
1336 ss << "; osd(s) " << failed_osds << " were not connected";
1337 }
1338 r = 0;
1339 }
7c673cae
FG
1340 cmdctx->reply(0, ss);
1341 return true;
11fdf7f2
TL
1342 } else if (prefix == "osd pool scrub" ||
1343 prefix == "osd pool deep-scrub" ||
1344 prefix == "osd pool repair") {
1345 vector<string> pool_names;
9f95a23c 1346 cmd_getval(cmdctx->cmdmap, "who", pool_names);
11fdf7f2
TL
1347 if (pool_names.empty()) {
1348 ss << "must specify one or more pool names";
1349 cmdctx->reply(-EINVAL, ss);
1350 return true;
1351 }
1352 epoch_t epoch;
1353 map<int32_t, vector<pg_t>> pgs_by_primary; // legacy
1354 map<int32_t, vector<spg_t>> spgs_by_primary;
1355 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1356 epoch = osdmap.get_epoch();
1357 for (auto& pool_name : pool_names) {
1358 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1359 if (pool_id < 0) {
1360 ss << "unrecognized pool '" << pool_name << "'";
1361 r = -ENOENT;
1362 return;
1363 }
1364 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1365 for (int i = 0; i < pool_pg_num; i++) {
1366 pg_t pg(i, pool_id);
1367 int primary;
1368 spg_t spg;
1369 auto got = osdmap.get_primary_shard(pg, &primary, &spg);
1370 if (!got)
1371 continue;
1372 pgs_by_primary[primary].push_back(pg);
1373 spgs_by_primary[primary].push_back(spg);
1374 }
1375 }
1376 });
1377 if (r < 0) {
1378 cmdctx->reply(r, ss);
1379 return true;
1380 }
1381 for (auto& it : spgs_by_primary) {
1382 auto primary = it.first;
1383 auto p = osd_cons.find(primary);
1384 if (p == osd_cons.end()) {
1385 ss << "osd." << primary << " is not currently connected";
1386 cmdctx->reply(-EAGAIN, ss);
1387 return true;
1388 }
1389 for (auto& con : p->second) {
1390 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
1391 con->send_message(new MOSDScrub2(monc->get_fsid(),
1392 epoch,
1393 it.second,
1394 prefix == "osd pool repair",
1395 prefix == "osd pool deep-scrub"));
1396 } else {
1397 // legacy
1398 auto q = pgs_by_primary.find(primary);
1399 ceph_assert(q != pgs_by_primary.end());
1400 con->send_message(new MOSDScrub(monc->get_fsid(),
1401 q->second,
1402 prefix == "osd pool repair",
1403 prefix == "osd pool deep-scrub"));
1404 }
1405 }
1406 }
1407 cmdctx->reply(0, "");
1408 return true;
7c673cae
FG
1409 } else if (prefix == "osd reweight-by-pg" ||
1410 prefix == "osd reweight-by-utilization" ||
1411 prefix == "osd test-reweight-by-pg" ||
1412 prefix == "osd test-reweight-by-utilization") {
1413 bool by_pg =
1414 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
1415 bool dry_run =
1416 prefix == "osd test-reweight-by-pg" ||
1417 prefix == "osd test-reweight-by-utilization";
1418 int64_t oload;
9f95a23c 1419 cmd_getval(cmdctx->cmdmap, "oload", oload, int64_t(120));
7c673cae
FG
1420 set<int64_t> pools;
1421 vector<string> poolnames;
9f95a23c 1422 cmd_getval(cmdctx->cmdmap, "pools", poolnames);
7c673cae
FG
1423 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1424 for (const auto& poolname : poolnames) {
1425 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
1426 if (pool < 0) {
1427 ss << "pool '" << poolname << "' does not exist";
1428 r = -ENOENT;
1429 }
1430 pools.insert(pool);
1431 }
1432 });
1433 if (r) {
1434 cmdctx->reply(r, ss);
1435 return true;
1436 }
11fdf7f2
TL
1437
1438 double max_change = g_conf().get_val<double>("mon_reweight_max_change");
9f95a23c 1439 cmd_getval(cmdctx->cmdmap, "max_change", max_change);
7c673cae
FG
1440 if (max_change <= 0.0) {
1441 ss << "max_change " << max_change << " must be positive";
1442 cmdctx->reply(-EINVAL, ss);
1443 return true;
1444 }
11fdf7f2 1445 int64_t max_osds = g_conf().get_val<int64_t>("mon_reweight_max_osds");
9f95a23c 1446 cmd_getval(cmdctx->cmdmap, "max_osds", max_osds);
7c673cae
FG
1447 if (max_osds <= 0) {
1448 ss << "max_osds " << max_osds << " must be positive";
1449 cmdctx->reply(-EINVAL, ss);
1450 return true;
1451 }
11fdf7f2 1452 bool no_increasing = false;
9f95a23c 1453 cmd_getval(cmdctx->cmdmap, "no_increasing", no_increasing);
7c673cae
FG
1454 string out_str;
1455 mempool::osdmap::map<int32_t, uint32_t> new_weights;
11fdf7f2
TL
1456 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap &osdmap, const PGMap& pgmap) {
1457 return reweight::by_utilization(osdmap, pgmap,
1458 oload,
1459 max_change,
1460 max_osds,
1461 by_pg,
1462 pools.empty() ? NULL : &pools,
1463 no_increasing,
1464 &new_weights,
1465 &ss, &out_str, f.get());
7c673cae
FG
1466 });
1467 if (r >= 0) {
1468 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
1469 }
1470 if (f) {
1471 f->flush(cmdctx->odata);
1472 } else {
1473 cmdctx->odata.append(out_str);
1474 }
1475 if (r < 0) {
1476 ss << "FAILED reweight-by-pg";
1477 cmdctx->reply(r, ss);
1478 return true;
1479 } else if (r == 0 || dry_run) {
1480 ss << "no change";
1481 cmdctx->reply(r, ss);
1482 return true;
1483 } else {
1484 json_spirit::Object json_object;
1485 for (const auto& osd_weight : new_weights) {
1486 json_spirit::Config::add(json_object,
1487 std::to_string(osd_weight.first),
1488 std::to_string(osd_weight.second));
1489 }
1490 string s = json_spirit::write(json_object);
1491 std::replace(begin(s), end(s), '\"', '\'');
1492 const string cmd =
1493 "{"
1494 "\"prefix\": \"osd reweightn\", "
1495 "\"weights\": \"" + s + "\""
1496 "}";
1497 auto on_finish = new ReplyOnFinish(cmdctx);
1498 monc->start_mon_command({cmd}, {},
1499 &on_finish->from_mon, &on_finish->outs, on_finish);
1500 return true;
1501 }
31f18b77 1502 } else if (prefix == "osd df") {
9f95a23c
TL
1503 string method, filter;
1504 cmd_getval(cmdctx->cmdmap, "output_method", method);
1505 cmd_getval(cmdctx->cmdmap, "filter", filter);
11fdf7f2
TL
1506 stringstream rs;
1507 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
11fdf7f2 1508 // sanity check filter(s)
9f95a23c
TL
1509 if (!filter.empty() &&
1510 osdmap.lookup_pg_pool_name(filter) < 0 &&
1511 !osdmap.crush->class_exists(filter) &&
1512 !osdmap.crush->name_exists(filter)) {
1513 rs << "'" << filter << "' not a pool, crush node or device class name";
1514 return -EINVAL;
11fdf7f2
TL
1515 }
1516 print_osd_utilization(osdmap, pgmap, ss,
9f95a23c 1517 f.get(), method == "tree", filter);
11fdf7f2
TL
1518 cmdctx->odata.append(ss);
1519 return 0;
31f18b77 1520 });
11fdf7f2 1521 cmdctx->reply(r, rs);
31f18b77 1522 return true;
11fdf7f2
TL
1523 } else if (prefix == "osd pool stats") {
1524 string pool_name;
9f95a23c 1525 cmd_getval(cmdctx->cmdmap, "pool_name", pool_name);
11fdf7f2
TL
1526 int64_t poolid = -ENOENT;
1527 bool one_pool = false;
1528 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1529 if (!pool_name.empty()) {
1530 poolid = osdmap.lookup_pg_pool_name(pool_name);
1531 if (poolid < 0) {
1532 ceph_assert(poolid == -ENOENT);
1533 ss << "unrecognized pool '" << pool_name << "'";
1534 return -ENOENT;
1535 }
1536 one_pool = true;
1537 }
1538 stringstream rs;
1539 if (f)
1540 f->open_array_section("pool_stats");
1541 else {
1542 if (osdmap.get_pools().empty()) {
1543 ss << "there are no pools!";
1544 goto stats_out;
1545 }
1546 }
1547 for (auto &p : osdmap.get_pools()) {
1548 if (!one_pool) {
1549 poolid = p.first;
1550 }
1551 pg_map.dump_pool_stats_and_io_rate(poolid, osdmap, f.get(), &rs);
1552 if (one_pool) {
1553 break;
1554 }
1555 }
1556 stats_out:
1557 if (f) {
1558 f->close_section();
1559 f->flush(cmdctx->odata);
1560 } else {
1561 cmdctx->odata.append(rs.str());
1562 }
1563 return 0;
35e4c445 1564 });
11fdf7f2
TL
1565 if (r != -EOPNOTSUPP) {
1566 cmdctx->reply(r, ss);
1567 return true;
1568 }
1569 } else if (prefix == "osd safe-to-destroy" ||
1570 prefix == "osd destroy" ||
1571 prefix == "osd purge") {
1572 set<int> osds;
1573 int r = 0;
1574 if (prefix == "osd safe-to-destroy") {
1575 vector<string> ids;
9f95a23c 1576 cmd_getval(cmdctx->cmdmap, "ids", ids);
11fdf7f2
TL
1577 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1578 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1579 });
1580 if (!r && osds.empty()) {
1581 ss << "must specify one or more OSDs";
1582 r = -EINVAL;
1583 }
1584 } else {
1585 int64_t id;
9f95a23c 1586 if (!cmd_getval(cmdctx->cmdmap, "id", id)) {
11fdf7f2
TL
1587 r = -EINVAL;
1588 ss << "must specify OSD id";
1589 } else {
1590 osds.insert(id);
1591 }
35e4c445
FG
1592 }
1593 if (r < 0) {
1594 cmdctx->reply(r, ss);
1595 return true;
1596 }
11fdf7f2 1597 set<int> active_osds, missing_stats, stored_pgs, safe_to_destroy;
35e4c445 1598 int affected_pgs = 0;
11fdf7f2 1599 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
35e4c445
FG
1600 if (pg_map.num_pg_unknown > 0) {
1601 ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
1602 << " any conclusions";
1603 r = -EAGAIN;
1604 return;
1605 }
1606 int num_active_clean = 0;
1607 for (auto& p : pg_map.num_pg_by_state) {
1608 unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
1609 if ((p.first & want) == want) {
1610 num_active_clean += p.second;
1611 }
1612 }
11fdf7f2
TL
1613 for (auto osd : osds) {
1614 if (!osdmap.exists(osd)) {
1615 safe_to_destroy.insert(osd);
1616 continue; // clearly safe to destroy
1617 }
1618 auto q = pg_map.num_pg_by_osd.find(osd);
1619 if (q != pg_map.num_pg_by_osd.end()) {
81eedcae 1620 if (q->second.acting > 0 || q->second.up_not_acting > 0) {
11fdf7f2 1621 active_osds.insert(osd);
81eedcae
TL
1622 // XXX: For overlapping PGs, this counts them again
1623 affected_pgs += q->second.acting + q->second.up_not_acting;
11fdf7f2 1624 continue;
35e4c445 1625 }
11fdf7f2
TL
1626 }
1627 if (num_active_clean < pg_map.num_pg) {
1628 // all pgs aren't active+clean; we need to be careful.
1629 auto p = pg_map.osd_stat.find(osd);
81eedcae 1630 if (p == pg_map.osd_stat.end() || !osdmap.is_up(osd)) {
11fdf7f2
TL
1631 missing_stats.insert(osd);
1632 continue;
1633 } else if (p->second.num_pgs > 0) {
1634 stored_pgs.insert(osd);
1635 continue;
1636 }
1637 }
1638 safe_to_destroy.insert(osd);
1639 }
35e4c445 1640 });
11fdf7f2
TL
1641 if (r && prefix == "osd safe-to-destroy") {
1642 cmdctx->reply(r, ss); // regardless of formatter
1643 return true;
1644 }
1645 if (!r && (!active_osds.empty() ||
1646 !missing_stats.empty() || !stored_pgs.empty())) {
1647 if (!safe_to_destroy.empty()) {
1648 ss << "OSD(s) " << safe_to_destroy
1649 << " are safe to destroy without reducing data durability. ";
1650 }
1651 if (!active_osds.empty()) {
1652 ss << "OSD(s) " << active_osds << " have " << affected_pgs
1653 << " pgs currently mapped to them. ";
1654 }
1655 if (!missing_stats.empty()) {
1656 ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
1657 << " PGs are active+clean; we cannot draw any conclusions. ";
1658 }
1659 if (!stored_pgs.empty()) {
1660 ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
1661 << " data, and not all PGs are active+clean; we cannot be sure they"
1662 << " aren't still needed.";
1663 }
1664 if (!active_osds.empty() || !stored_pgs.empty()) {
1665 r = -EBUSY;
1666 } else {
1667 r = -EAGAIN;
1668 }
1669 }
1670
1671 if (prefix == "osd safe-to-destroy") {
1672 if (!r) {
1673 ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1674 << " durability.";
11fdf7f2
TL
1675 }
1676 if (f) {
1677 f->open_object_section("osd_status");
1678 f->open_array_section("safe_to_destroy");
1679 for (auto i : safe_to_destroy)
1680 f->dump_int("osd", i);
1681 f->close_section();
1682 f->open_array_section("active");
1683 for (auto i : active_osds)
1684 f->dump_int("osd", i);
1685 f->close_section();
1686 f->open_array_section("missing_stats");
1687 for (auto i : missing_stats)
1688 f->dump_int("osd", i);
1689 f->close_section();
1690 f->open_array_section("stored_pgs");
1691 for (auto i : stored_pgs)
1692 f->dump_int("osd", i);
1693 f->close_section();
1694 f->close_section(); // osd_status
1695 f->flush(cmdctx->odata);
1696 r = 0;
1697 std::stringstream().swap(ss);
1698 }
1699 cmdctx->reply(r, ss);
1700 return true;
1701 }
1702
1703 if (r) {
1704 bool force = false;
9f95a23c 1705 cmd_getval(cmdctx->cmdmap, "force", force);
11fdf7f2
TL
1706 if (!force) {
1707 // Backward compat
9f95a23c 1708 cmd_getval(cmdctx->cmdmap, "yes_i_really_mean_it", force);
11fdf7f2
TL
1709 }
1710 if (!force) {
1711 ss << "\nYou can proceed by passing --force, but be warned that"
1712 " this will likely mean real, permanent data loss.";
1713 } else {
1714 r = 0;
1715 }
35e4c445
FG
1716 }
1717 if (r) {
1718 cmdctx->reply(r, ss);
1719 return true;
1720 }
11fdf7f2
TL
1721 const string cmd =
1722 "{"
1723 "\"prefix\": \"" + prefix + "-actual\", "
1724 "\"id\": " + stringify(osds) + ", "
1725 "\"yes_i_really_mean_it\": true"
1726 "}";
1727 auto on_finish = new ReplyOnFinish(cmdctx);
1728 monc->start_mon_command({cmd}, {}, nullptr, &on_finish->outs, on_finish);
35e4c445
FG
1729 return true;
1730 } else if (prefix == "osd ok-to-stop") {
1731 vector<string> ids;
9f95a23c 1732 cmd_getval(cmdctx->cmdmap, "ids", ids);
35e4c445 1733 set<int> osds;
f67539c2
TL
1734 int64_t max = 1;
1735 cmd_getval(cmdctx->cmdmap, "max", max);
35e4c445
FG
1736 int r;
1737 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1738 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1739 });
1740 if (!r && osds.empty()) {
1741 ss << "must specify one or more OSDs";
1742 r = -EINVAL;
1743 }
f67539c2
TL
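      // Added note: ensure the cap covers at least the explicitly named OSDs;
      // `max` only controls how far _maximize_ok_to_stop_set may grow the
      // candidate set beyond them.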
1744 if (max < (int)osds.size()) {
1745 max = osds.size();
1746 }
35e4c445
FG
1747 if (r < 0) {
1748 cmdctx->reply(r, ss);
1749 return true;
1750 }
f67539c2 1751 offline_pg_report out_report;
11fdf7f2 1752 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
f67539c2
TL
1753 _maximize_ok_to_stop_set(
1754 osds, max, osdmap, pg_map,
1755 &out_report);
35e4c445 1756 });
f67539c2
TL
1757 if (!f) {
1758 f.reset(Formatter::create("json"));
35e4c445 1759 }
f67539c2
TL
1760 f->dump_object("ok_to_stop", out_report);
1761 f->flush(cmdctx->odata);
1762 cmdctx->odata.append("\n");
1763 if (!out_report.unknown.empty()) {
1764 ss << out_report.unknown.size() << " pgs have unknown state; "
1765 << "cannot draw any conclusions";
1766	      cmdctx->reply(-EAGAIN, ss);
	      return true;
1767	    }
1768 if (!out_report.ok_to_stop()) {
1769 ss << "unsafe to stop osd(s) at this time (" << out_report.not_ok.size() << " PGs are or would become offline)";
35e4c445 1770 cmdctx->reply(-EBUSY, ss);
f67539c2
TL
1771 } else {
1772 cmdctx->reply(0, ss);
35e4c445 1773 }
35e4c445 1774 return true;
c07f9fc5 1775 } else if (prefix == "pg force-recovery" ||
11fdf7f2
TL
1776 prefix == "pg force-backfill" ||
1777 prefix == "pg cancel-force-recovery" ||
1778 prefix == "pg cancel-force-backfill" ||
1779 prefix == "osd pool force-recovery" ||
1780 prefix == "osd pool force-backfill" ||
1781 prefix == "osd pool cancel-force-recovery" ||
1782 prefix == "osd pool cancel-force-backfill") {
1783 vector<string> vs;
1784 get_str_vec(prefix, vs);
1785 auto& granularity = vs.front();
1786 auto& forceop = vs.back();
1787 vector<pg_t> pgs;
c07f9fc5
FG
1788
1789 // figure out actual op just once
1790 int actual_op = 0;
1791 if (forceop == "force-recovery") {
1792 actual_op = OFR_RECOVERY;
1793 } else if (forceop == "force-backfill") {
1794 actual_op = OFR_BACKFILL;
1795 } else if (forceop == "cancel-force-backfill") {
1796 actual_op = OFR_BACKFILL | OFR_CANCEL;
1797 } else if (forceop == "cancel-force-recovery") {
1798 actual_op = OFR_RECOVERY | OFR_CANCEL;
1799 }
1800
11fdf7f2
TL
1801 set<pg_t> candidates; // deduped
1802 if (granularity == "pg") {
1803	      // convert pg names to pgs, discard any invalid ones while at it
1804 vector<string> pgids;
9f95a23c 1805 cmd_getval(cmdctx->cmdmap, "pgid", pgids);
11fdf7f2
TL
1806 for (auto& i : pgids) {
1807 pg_t pgid;
1808 if (!pgid.parse(i.c_str())) {
1809 ss << "invlaid pgid '" << i << "'; ";
1810 r = -EINVAL;
1811 continue;
1812 }
1813 candidates.insert(pgid);
c07f9fc5 1814 }
11fdf7f2
TL
1815 } else {
1816 // per pool
1817 vector<string> pool_names;
9f95a23c 1818 cmd_getval(cmdctx->cmdmap, "who", pool_names);
11fdf7f2
TL
1819 if (pool_names.empty()) {
1820 ss << "must specify one or more pool names";
1821 cmdctx->reply(-EINVAL, ss);
1822 return true;
1823 }
1824 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1825 for (auto& pool_name : pool_names) {
1826 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1827 if (pool_id < 0) {
1828 ss << "unrecognized pool '" << pool_name << "'";
1829 r = -ENOENT;
1830 return;
1831 }
1832 auto pool_pg_num = osdmap.get_pg_num(pool_id);
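	      // Added illustration: a pool with id 5 and pg_num 8 contributes
	      // candidate pgids 5.0 through 5.7 in the loop below.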
1833 for (int i = 0; i < pool_pg_num; i++)
1834 candidates.insert({(unsigned int)i, (uint64_t)pool_id});
1835 }
c07f9fc5 1836 });
11fdf7f2
TL
1837 if (r < 0) {
1838 cmdctx->reply(r, ss);
1839 return true;
1840 }
c07f9fc5
FG
1841 }
1842
11fdf7f2
TL
1843 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1844 for (auto& i : candidates) {
1845 auto it = pg_map.pg_stat.find(i);
1846 if (it == pg_map.pg_stat.end()) {
1847 ss << "pg " << i << " does not exist; ";
1848 r = -ENOENT;
1849 continue;
1850 }
1851 auto state = it->second.state;
1852 // discard pgs for which user requests are pointless
1853 switch (actual_op) {
1854 case OFR_RECOVERY:
1855 if ((state & (PG_STATE_DEGRADED |
1856 PG_STATE_RECOVERY_WAIT |
1857 PG_STATE_RECOVERING)) == 0) {
1858 // don't return error, user script may be racing with cluster.
1859 // not fatal.
1860 ss << "pg " << i << " doesn't require recovery; ";
1861 continue;
1862 } else if (state & PG_STATE_FORCED_RECOVERY) {
1863 ss << "pg " << i << " recovery already forced; ";
1864 // return error, as it may be a bug in user script
1865 r = -EINVAL;
1866 continue;
1867 }
1868 break;
1869 case OFR_BACKFILL:
1870 if ((state & (PG_STATE_DEGRADED |
1871 PG_STATE_BACKFILL_WAIT |
1872 PG_STATE_BACKFILLING)) == 0) {
1873 ss << "pg " << i << " doesn't require backfilling; ";
1874 continue;
1875 } else if (state & PG_STATE_FORCED_BACKFILL) {
1876 ss << "pg " << i << " backfill already forced; ";
1877 r = -EINVAL;
1878 continue;
1879 }
1880 break;
1881 case OFR_BACKFILL | OFR_CANCEL:
1882 if ((state & PG_STATE_FORCED_BACKFILL) == 0) {
1883 ss << "pg " << i << " backfill not forced; ";
1884 continue;
1885 }
1886 break;
1887 case OFR_RECOVERY | OFR_CANCEL:
1888 if ((state & PG_STATE_FORCED_RECOVERY) == 0) {
1889 ss << "pg " << i << " recovery not forced; ";
1890 continue;
1891 }
1892 break;
1893 default:
1894 ceph_abort_msg("actual_op value is not supported");
1895 }
1896 pgs.push_back(i);
1897 } // for
1898 });
1899
c07f9fc5
FG
1900 // respond with error only when no pgs are correct
1901 // yes, in case of mixed errors, only the last one will be emitted,
1902 // but the message presented will be fine
11fdf7f2 1903 if (pgs.size() != 0) {
c07f9fc5
FG
1904 // clear error to not confuse users/scripts
1905 r = 0;
1906 }
1907
11fdf7f2
TL
1908 // optimize the command -> messages conversion, use only one
1909 // message per distinct OSD
c07f9fc5 1910 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
11fdf7f2
TL
1911 // group pgs to process by osd
1912 map<int, vector<spg_t>> osdpgs;
1913 for (auto& pgid : pgs) {
1914 int primary;
1915 spg_t spg;
1916 if (osdmap.get_primary_shard(pgid, &primary, &spg)) {
1917 osdpgs[primary].push_back(spg);
1918 }
1919 }
1920 for (auto& i : osdpgs) {
1921 if (osdmap.is_up(i.first)) {
1922 auto p = osd_cons.find(i.first);
1923 if (p == osd_cons.end()) {
1924 ss << "osd." << i.first << " is not currently connected";
1925 r = -EAGAIN;
1926 continue;
1927 }
1928 for (auto& con : p->second) {
1929 con->send_message(
1930 new MOSDForceRecovery(monc->get_fsid(), i.second, actual_op));
1931 }
1932 ss << "instructing pg(s) " << i.second << " on osd." << i.first
1933 << " to " << forceop << "; ";
1934 }
1935 }
1936 });
1937 ss << std::endl;
1938 cmdctx->reply(r, ss);
1939 return true;
1940 } else if (prefix == "config show" ||
1941 prefix == "config show-with-defaults") {
1942 string who;
9f95a23c
TL
1943 cmd_getval(cmdctx->cmdmap, "who", who);
1944 auto [key, valid] = DaemonKey::parse(who);
1945 if (!valid) {
1946 ss << "invalid daemon name: use <type>.<id>";
1947 cmdctx->reply(-EINVAL, ss);
1948 return true;
1949 }
11fdf7f2 1950 DaemonStatePtr daemon = daemon_state.get(key);
11fdf7f2
TL
1951 if (!daemon) {
1952 ss << "no config state for daemon " << who;
1953 cmdctx->reply(-ENOENT, ss);
1954 return true;
1955 }
1956
1957 std::lock_guard l(daemon->lock);
1958
9f95a23c
TL
1959 int r = 0;
1960 string name;
1961 if (cmd_getval(cmdctx->cmdmap, "key", name)) {
f6b5b4d7
TL
1962 // handle special options
1963 if (name == "fsid") {
1964 cmdctx->odata.append(stringify(monc->get_fsid()) + "\n");
1965 cmdctx->reply(r, ss);
1966 return true;
1967 }
11fdf7f2
TL
1968 auto p = daemon->config.find(name);
1969 if (p != daemon->config.end() &&
1970 !p->second.empty()) {
1971 cmdctx->odata.append(p->second.rbegin()->second + "\n");
1972 } else {
1973 auto& defaults = daemon->_get_config_defaults();
1974 auto q = defaults.find(name);
1975 if (q != defaults.end()) {
1976 cmdctx->odata.append(q->second + "\n");
1977 } else {
1978 r = -ENOENT;
1979 }
1980 }
1981 } else if (daemon->config_defaults_bl.length() > 0) {
1982 TextTable tbl;
1983 if (f) {
1984 f->open_array_section("config");
1985 } else {
1986 tbl.define_column("NAME", TextTable::LEFT, TextTable::LEFT);
1987 tbl.define_column("VALUE", TextTable::LEFT, TextTable::LEFT);
1988 tbl.define_column("SOURCE", TextTable::LEFT, TextTable::LEFT);
1989 tbl.define_column("OVERRIDES", TextTable::LEFT, TextTable::LEFT);
1990 tbl.define_column("IGNORES", TextTable::LEFT, TextTable::LEFT);
1991 }
1992 if (prefix == "config show") {
1993 // show
1994 for (auto& i : daemon->config) {
1995 dout(20) << " " << i.first << " -> " << i.second << dendl;
1996 if (i.second.empty()) {
c07f9fc5
FG
1997 continue;
1998 }
11fdf7f2
TL
1999 if (f) {
2000 f->open_object_section("value");
2001 f->dump_string("name", i.first);
2002 f->dump_string("value", i.second.rbegin()->second);
2003 f->dump_string("source", ceph_conf_level_name(
2004 i.second.rbegin()->first));
2005 if (i.second.size() > 1) {
2006 f->open_array_section("overrides");
2007 auto j = i.second.rend();
2008 for (--j; j != i.second.rbegin(); --j) {
2009 f->open_object_section("value");
2010 f->dump_string("source", ceph_conf_level_name(j->first));
2011 f->dump_string("value", j->second);
2012 f->close_section();
2013 }
2014 f->close_section();
2015 }
2016 if (daemon->ignored_mon_config.count(i.first)) {
2017 f->dump_string("ignores", "mon");
2018 }
2019 f->close_section();
2020 } else {
2021 tbl << i.first;
2022 tbl << i.second.rbegin()->second;
2023 tbl << ceph_conf_level_name(i.second.rbegin()->first);
2024 if (i.second.size() > 1) {
2025 list<string> ov;
2026 auto j = i.second.rend();
2027 for (--j; j != i.second.rbegin(); --j) {
2028 if (j->second == i.second.rbegin()->second) {
2029 ov.push_front(string("(") + ceph_conf_level_name(j->first) +
2030 string("[") + j->second + string("]") +
2031 string(")"));
2032 } else {
2033 ov.push_front(ceph_conf_level_name(j->first) +
2034 string("[") + j->second + string("]"));
2035
2036 }
2037 }
2038 tbl << ov;
2039 } else {
2040 tbl << "";
2041 }
2042 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
2043 tbl << TextTable::endrow;
2044 }
2045 }
2046 } else {
2047 // show-with-defaults
2048 auto& defaults = daemon->_get_config_defaults();
2049 for (auto& i : defaults) {
2050 if (f) {
2051 f->open_object_section("value");
2052 f->dump_string("name", i.first);
2053 } else {
2054 tbl << i.first;
2055 }
2056 auto j = daemon->config.find(i.first);
2057 if (j != daemon->config.end() && !j->second.empty()) {
2058 // have config
2059 if (f) {
2060 f->dump_string("value", j->second.rbegin()->second);
2061 f->dump_string("source", ceph_conf_level_name(
2062 j->second.rbegin()->first));
2063 if (j->second.size() > 1) {
2064 f->open_array_section("overrides");
2065 auto k = j->second.rend();
2066 for (--k; k != j->second.rbegin(); --k) {
2067 f->open_object_section("value");
2068 f->dump_string("source", ceph_conf_level_name(k->first));
2069 f->dump_string("value", k->second);
2070 f->close_section();
2071 }
2072 f->close_section();
2073 }
2074 if (daemon->ignored_mon_config.count(i.first)) {
2075 f->dump_string("ignores", "mon");
2076 }
2077 f->close_section();
2078 } else {
2079 tbl << j->second.rbegin()->second;
2080 tbl << ceph_conf_level_name(j->second.rbegin()->first);
2081 if (j->second.size() > 1) {
2082 list<string> ov;
2083 auto k = j->second.rend();
2084 for (--k; k != j->second.rbegin(); --k) {
2085 if (k->second == j->second.rbegin()->second) {
2086 ov.push_front(string("(") + ceph_conf_level_name(k->first) +
2087 string("[") + k->second + string("]") +
2088 string(")"));
2089 } else {
2090 ov.push_front(ceph_conf_level_name(k->first) +
2091 string("[") + k->second + string("]"));
2092 }
2093 }
2094 tbl << ov;
2095 } else {
2096 tbl << "";
2097 }
2098 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
2099 tbl << TextTable::endrow;
2100 }
2101 } else {
2102 // only have default
2103 if (f) {
2104 f->dump_string("value", i.second);
2105 f->dump_string("source", ceph_conf_level_name(CONF_DEFAULT));
2106 f->close_section();
2107 } else {
2108 tbl << i.second;
2109 tbl << ceph_conf_level_name(CONF_DEFAULT);
2110 tbl << "";
2111 tbl << "";
2112 tbl << TextTable::endrow;
2113 }
c07f9fc5 2114 }
c07f9fc5
FG
2115 }
2116 }
11fdf7f2
TL
2117 if (f) {
2118 f->close_section();
2119 f->flush(cmdctx->odata);
2120 } else {
2121 cmdctx->odata.append(stringify(tbl));
2122 }
2123 }
c07f9fc5
FG
2124 cmdctx->reply(r, ss);
2125 return true;
11fdf7f2
TL
2126 } else if (prefix == "device ls") {
2127 set<string> devids;
2128 TextTable tbl;
2129 if (f) {
2130 f->open_array_section("devices");
2131 daemon_state.with_devices([&f](const DeviceState& dev) {
2132 f->dump_object("device", dev);
2133 });
2134 f->close_section();
2135 f->flush(cmdctx->odata);
2136 } else {
2137 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2138 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
2139 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
f67539c2 2140 tbl.define_column("WEAR", TextTable::RIGHT, TextTable::RIGHT);
11fdf7f2
TL
2141 tbl.define_column("LIFE EXPECTANCY", TextTable::LEFT, TextTable::LEFT);
2142 auto now = ceph_clock_now();
2143 daemon_state.with_devices([&tbl, now](const DeviceState& dev) {
2144 string h;
9f95a23c 2145 for (auto& i : dev.attachments) {
11fdf7f2
TL
2146 if (h.size()) {
2147 h += " ";
2148 }
9f95a23c 2149 h += std::get<0>(i) + ":" + std::get<1>(i);
11fdf7f2
TL
2150 }
2151 string d;
2152 for (auto& i : dev.daemons) {
2153 if (d.size()) {
2154 d += " ";
2155 }
2156 d += to_string(i);
2157 }
f67539c2
TL
2158 char wear_level_str[16] = {0};
2159 if (dev.wear_level >= 0) {
2160 snprintf(wear_level_str, sizeof(wear_level_str)-1, "%d%%",
2161 (int)(100.1 * dev.wear_level));
2162 }
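	// Added note: wear_level here is treated as a 0.0-1.0 fraction, so e.g.
	// 0.42 renders as "42%"; the 100.1 multiplier appears to guard against
	// floating-point truncation dropping a whole percent.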
11fdf7f2
TL
2163 tbl << dev.devid
2164 << h
2165 << d
f67539c2 2166 << wear_level_str
11fdf7f2
TL
2167 << dev.get_life_expectancy_str(now)
2168 << TextTable::endrow;
2169 });
2170 cmdctx->odata.append(stringify(tbl));
2171 }
2172 cmdctx->reply(0, ss);
2173 return true;
2174 } else if (prefix == "device ls-by-daemon") {
2175 string who;
9f95a23c
TL
2176 cmd_getval(cmdctx->cmdmap, "who", who);
2177 if (auto [k, valid] = DaemonKey::parse(who); !valid) {
11fdf7f2
TL
2178 ss << who << " is not a valid daemon name";
2179 r = -EINVAL;
2180 } else {
2181 auto dm = daemon_state.get(k);
2182 if (dm) {
2183 if (f) {
2184 f->open_array_section("devices");
2185 for (auto& i : dm->devices) {
2186 daemon_state.with_device(i.first, [&f] (const DeviceState& dev) {
2187 f->dump_object("device", dev);
2188 });
2189 }
2190 f->close_section();
2191 f->flush(cmdctx->odata);
2192 } else {
2193 TextTable tbl;
2194 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2195 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
2196 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT,
2197 TextTable::LEFT);
2198 auto now = ceph_clock_now();
2199 for (auto& i : dm->devices) {
2200 daemon_state.with_device(
2201 i.first, [&tbl, now] (const DeviceState& dev) {
2202 string h;
9f95a23c 2203 for (auto& i : dev.attachments) {
11fdf7f2
TL
2204 if (h.size()) {
2205 h += " ";
2206 }
9f95a23c 2207 h += std::get<0>(i) + ":" + std::get<1>(i);
11fdf7f2
TL
2208 }
2209 tbl << dev.devid
2210 << h
2211 << dev.get_life_expectancy_str(now)
2212 << TextTable::endrow;
2213 });
2214 }
2215 cmdctx->odata.append(stringify(tbl));
2216 }
2217 } else {
2218 r = -ENOENT;
2219 ss << "daemon " << who << " not found";
2220 }
2221 cmdctx->reply(r, ss);
2222 }
2223 } else if (prefix == "device ls-by-host") {
2224 string host;
9f95a23c 2225 cmd_getval(cmdctx->cmdmap, "host", host);
11fdf7f2
TL
2226 set<string> devids;
2227 daemon_state.list_devids_by_server(host, &devids);
2228 if (f) {
2229 f->open_array_section("devices");
2230 for (auto& devid : devids) {
2231 daemon_state.with_device(
2232 devid, [&f] (const DeviceState& dev) {
2233 f->dump_object("device", dev);
7c673cae 2234 });
11fdf7f2
TL
2235 }
2236 f->close_section();
2237 f->flush(cmdctx->odata);
2238 } else {
2239 TextTable tbl;
2240 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2241 tbl.define_column("DEV", TextTable::LEFT, TextTable::LEFT);
2242 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
2243 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT, TextTable::LEFT);
2244 auto now = ceph_clock_now();
2245 for (auto& devid : devids) {
2246 daemon_state.with_device(
2247 devid, [&tbl, &host, now] (const DeviceState& dev) {
2248 string n;
9f95a23c
TL
2249 for (auto& j : dev.attachments) {
2250 if (std::get<0>(j) == host) {
11fdf7f2
TL
2251 if (n.size()) {
2252 n += " ";
2253 }
9f95a23c 2254 n += std::get<1>(j);
11fdf7f2
TL
2255 }
2256 }
2257 string d;
2258 for (auto& i : dev.daemons) {
2259 if (d.size()) {
2260 d += " ";
2261 }
2262 d += to_string(i);
2263 }
2264 tbl << dev.devid
2265 << n
2266 << d
2267 << dev.get_life_expectancy_str(now)
2268 << TextTable::endrow;
2269 });
2270 }
2271 cmdctx->odata.append(stringify(tbl));
2272 }
2273 cmdctx->reply(0, ss);
2274 return true;
2275 } else if (prefix == "device info") {
2276 string devid;
9f95a23c 2277 cmd_getval(cmdctx->cmdmap, "devid", devid);
11fdf7f2
TL
2278 int r = 0;
2279 ostringstream rs;
2280 if (!daemon_state.with_device(devid,
2281 [&f, &rs] (const DeviceState& dev) {
2282 if (f) {
2283 f->dump_object("device", dev);
2284 } else {
2285 dev.print(rs);
2286 }
2287 })) {
2288 ss << "device " << devid << " not found";
2289 r = -ENOENT;
2290 } else {
2291 if (f) {
2292 f->flush(cmdctx->odata);
2293 } else {
2294 cmdctx->odata.append(rs.str());
2295 }
2296 }
2297 cmdctx->reply(r, ss);
2298 return true;
2299 } else if (prefix == "device set-life-expectancy") {
2300 string devid;
9f95a23c 2301 cmd_getval(cmdctx->cmdmap, "devid", devid);
11fdf7f2 2302 string from_str, to_str;
9f95a23c
TL
2303 cmd_getval(cmdctx->cmdmap, "from", from_str);
2304 cmd_getval(cmdctx->cmdmap, "to", to_str);
11fdf7f2
TL
2305 utime_t from, to;
2306 if (!from.parse(from_str)) {
2307 ss << "unable to parse datetime '" << from_str << "'";
2308 r = -EINVAL;
2309 cmdctx->reply(r, ss);
2310 } else if (to_str.size() && !to.parse(to_str)) {
2311 ss << "unable to parse datetime '" << to_str << "'";
2312 r = -EINVAL;
2313 cmdctx->reply(r, ss);
2314 } else {
2315 map<string,string> meta;
2316 daemon_state.with_device_create(
2317 devid,
2318 [from, to, &meta] (DeviceState& dev) {
2319 dev.set_life_expectancy(from, to, ceph_clock_now());
2320 meta = dev.metadata;
2321 });
2322 json_spirit::Object json_object;
2323 for (auto& i : meta) {
2324 json_spirit::Config::add(json_object, i.first, i.second);
2325 }
2326 bufferlist json;
2327 json.append(json_spirit::write(json_object));
2328 const string cmd =
2329 "{"
2330 "\"prefix\": \"config-key set\", "
2331 "\"key\": \"device/" + devid + "\""
2332 "}";
2333 auto on_finish = new ReplyOnFinish(cmdctx);
2334 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2335 }
2336 return true;
2337 } else if (prefix == "device rm-life-expectancy") {
2338 string devid;
9f95a23c 2339 cmd_getval(cmdctx->cmdmap, "devid", devid);
11fdf7f2
TL
2340 map<string,string> meta;
2341 if (daemon_state.with_device_write(devid, [&meta] (DeviceState& dev) {
2342 dev.rm_life_expectancy();
2343 meta = dev.metadata;
2344 })) {
2345 string cmd;
2346 bufferlist json;
2347 if (meta.empty()) {
2348 cmd =
2349 "{"
2350 "\"prefix\": \"config-key rm\", "
2351 "\"key\": \"device/" + devid + "\""
2352 "}";
2353 } else {
2354 json_spirit::Object json_object;
2355 for (auto& i : meta) {
2356 json_spirit::Config::add(json_object, i.first, i.second);
2357 }
2358 json.append(json_spirit::write(json_object));
2359 cmd =
2360 "{"
2361 "\"prefix\": \"config-key set\", "
2362 "\"key\": \"device/" + devid + "\""
2363 "}";
2364 }
2365 auto on_finish = new ReplyOnFinish(cmdctx);
2366 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2367 } else {
2368 cmdctx->reply(0, ss);
2369 }
2370 return true;
2371 } else {
2372 if (!pgmap_ready) {
2373 ss << "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2374 }
2375 if (f) {
2376 f->open_object_section("pg_info");
2377 f->dump_bool("pg_ready", pgmap_ready);
2378 }
2379
2380 // fall back to feeding command to PGMap
2381 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
2382 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
2383 f.get(), &ss, &cmdctx->odata);
7c673cae
FG
2384 });
2385
11fdf7f2
TL
2386 if (f) {
2387 f->close_section();
2388 }
7c673cae 2389 if (r != -EOPNOTSUPP) {
11fdf7f2
TL
2390 if (f) {
2391 f->flush(cmdctx->odata);
2392 }
7c673cae
FG
2393 cmdctx->reply(r, ss);
2394 return true;
2395 }
2396 }
2397
11fdf7f2 2398	  // Was the command not found?
92f5a8d4 2399 if (py_command.cmdstring.empty()) {
7c673cae
FG
2400 ss << "No handler found for '" << prefix << "'";
2401 dout(4) << "No handler found for '" << prefix << "'" << dendl;
2402 cmdctx->reply(-EINVAL, ss);
2403 return true;
7c673cae 2404 }
11fdf7f2 2405
f67539c2 2406 dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
9f95a23c
TL
2407 finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]
2408 (int r_) mutable {
11fdf7f2
TL
2409 std::stringstream ss;
2410
f67539c2
TL
2411 dout(10) << "dispatching command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
2412
11fdf7f2 2413 // Validate that the module is enabled
92f5a8d4
TL
2414 auto& py_handler_name = py_command.module_name;
2415 PyModuleRef module = py_modules.get_module(py_handler_name);
11fdf7f2
TL
2416 ceph_assert(module);
2417 if (!module->is_enabled()) {
92f5a8d4 2418 ss << "Module '" << py_handler_name << "' is not enabled (required by "
11fdf7f2 2419 "command '" << prefix << "'): use `ceph mgr module enable "
92f5a8d4 2420 << py_handler_name << "` to enable it";
11fdf7f2
TL
2421 dout(4) << ss.str() << dendl;
2422 cmdctx->reply(-EOPNOTSUPP, ss);
2423 return;
2424 }
2425
2426 // Hack: allow the self-test method to run on unhealthy modules.
2427 // Fix this in future by creating a special path for self test rather
2428 // than having the hook be a normal module command.
92f5a8d4 2429 std::string self_test_prefix = py_handler_name + " " + "self-test";
11fdf7f2
TL
2430
2431 // Validate that the module is healthy
2432 bool accept_command;
2433 if (module->is_loaded()) {
2434 if (module->get_can_run() && !module->is_failed()) {
2435 // Healthy module
2436 accept_command = true;
2437 } else if (self_test_prefix == prefix) {
2438 // Unhealthy, but allow because it's a self test command
2439 accept_command = true;
2440 } else {
2441 accept_command = false;
92f5a8d4 2442 ss << "Module '" << py_handler_name << "' has experienced an error and "
11fdf7f2
TL
2443 "cannot handle commands: " << module->get_error_string();
2444 }
2445 } else {
2446 // Module not loaded
2447 accept_command = false;
92f5a8d4 2448 ss << "Module '" << py_handler_name << "' failed to load and "
11fdf7f2
TL
2449 "cannot handle commands: " << module->get_error_string();
2450 }
2451
2452 if (!accept_command) {
2453 dout(4) << ss.str() << dendl;
2454 cmdctx->reply(-EIO, ss);
2455 return;
2456 }
2457
2458 std::stringstream ds;
9f95a23c 2459 bufferlist inbl = cmdctx->data;
92f5a8d4
TL
2460 int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
2461 inbl, &ds, &ss);
2462 if (r == -EACCES) {
2463 log_access_denied(cmdctx, session, ss);
2464 }
2465
11fdf7f2
TL
2466 cmdctx->odata.append(ds);
2467 cmdctx->reply(r, ss);
f67539c2 2468 dout(10) << " command returned " << r << dendl;
11fdf7f2
TL
2469 }));
2470 return true;
7c673cae 2471}
31f18b77 2472
224ce89b
WB
2473void DaemonServer::_prune_pending_service_map()
2474{
2475 utime_t cutoff = ceph_clock_now();
11fdf7f2 2476 cutoff -= g_conf().get_val<double>("mgr_service_beacon_grace");
224ce89b
WB
2477 auto p = pending_service_map.services.begin();
2478 while (p != pending_service_map.services.end()) {
2479 auto q = p->second.daemons.begin();
2480 while (q != p->second.daemons.end()) {
9f95a23c 2481 DaemonKey key{p->first, q->first};
224ce89b 2482 if (!daemon_state.exists(key)) {
1911f103
TL
2483 if (ServiceMap::is_normal_ceph_entity(p->first)) {
2484 dout(10) << "daemon " << key << " in service map but not in daemon state "
2485 << "index -- force pruning" << dendl;
2486 q = p->second.daemons.erase(q);
2487 pending_service_map_dirty = pending_service_map.epoch;
2488 } else {
2489 derr << "missing key " << key << dendl;
2490 ++q;
2491 }
2492
2493 continue;
224ce89b 2494 }
1911f103 2495
224ce89b 2496 auto daemon = daemon_state.get(key);
11fdf7f2 2497 std::lock_guard l(daemon->lock);
224ce89b
WB
2498 if (daemon->last_service_beacon == utime_t()) {
2499 // we must have just restarted; assume they are alive now.
2500 daemon->last_service_beacon = ceph_clock_now();
2501 ++q;
2502 continue;
2503 }
2504 if (daemon->last_service_beacon < cutoff) {
2505 dout(10) << "pruning stale " << p->first << "." << q->first
2506 << " last_beacon " << daemon->last_service_beacon << dendl;
2507 q = p->second.daemons.erase(q);
2508 pending_service_map_dirty = pending_service_map.epoch;
2509 } else {
2510 ++q;
2511 }
2512 }
2513 if (p->second.daemons.empty()) {
2514 p = pending_service_map.services.erase(p);
2515 pending_service_map_dirty = pending_service_map.epoch;
2516 } else {
2517 ++p;
2518 }
2519 }
2520}
2521
31f18b77
FG
2522void DaemonServer::send_report()
2523{
224ce89b 2524 if (!pgmap_ready) {
11fdf7f2 2525 if (ceph_clock_now() - started_at > g_conf().get_val<int64_t>("mgr_stats_period") * 4.0) {
224ce89b
WB
2526 pgmap_ready = true;
2527 reported_osds.clear();
2528 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
2529 << "potentially incomplete PG state to mon" << dendl;
2530 } else {
2531 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
2532 << dendl;
2533 return;
2534 }
2535 }
2536
9f95a23c 2537 auto m = ceph::make_message<MMonMgrReport>();
f67539c2 2538 m->gid = monc->get_global_id();
c07f9fc5 2539 py_modules.get_health_checks(&m->health_checks);
11fdf7f2 2540 py_modules.get_progress_events(&m->progress_events);
c07f9fc5 2541
11fdf7f2 2542 cluster_state.with_mutable_pgmap([&](PGMap& pg_map) {
31f18b77
FG
2543 cluster_state.update_delta_stats();
2544
224ce89b
WB
2545 if (pending_service_map.epoch) {
2546 _prune_pending_service_map();
2547 if (pending_service_map_dirty >= pending_service_map.epoch) {
2548 pending_service_map.modified = ceph_clock_now();
11fdf7f2 2549 encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
224ce89b
WB
2550 dout(10) << "sending service_map e" << pending_service_map.epoch
2551 << dendl;
2552 pending_service_map.epoch++;
2553 }
2554 }
2555
31f18b77
FG
2556 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
2557 // FIXME: no easy way to get mon features here. this will do for
2558 // now, though, as long as we don't make a backward-incompat change.
2559 pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
2560 dout(10) << pg_map << dendl;
224ce89b
WB
2561
2562 pg_map.get_health_checks(g_ceph_context, osdmap,
2563 &m->health_checks);
c07f9fc5 2564
224ce89b
WB
2565 dout(10) << m->health_checks.checks.size() << " health checks"
2566 << dendl;
2567 dout(20) << "health checks:\n";
2568 JSONFormatter jf(true);
2569 jf.dump_object("health_checks", m->health_checks);
2570 jf.flush(*_dout);
2571 *_dout << dendl;
9f95a23c 2572 if (osdmap.require_osd_release >= ceph_release_t::luminous) {
a8e16298
TL
2573 clog->debug() << "pgmap v" << pg_map.version << ": " << pg_map;
2574 }
31f18b77
FG
2575 });
2576 });
b32b8144 2577
11fdf7f2
TL
2578 map<daemon_metric, unique_ptr<DaemonHealthMetricCollector>> accumulated;
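  // Added note: fold per-daemon health metrics into one collector per metric
  // type; each collector is summarized into m->health_checks after the loop.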
2579 for (auto service : {"osd", "mon"} ) {
2580 auto daemons = daemon_state.get_by_service(service);
2581 for (const auto& [key,state] : daemons) {
2582 std::lock_guard l{state->lock};
2583 for (const auto& metric : state->daemon_health_metrics) {
2584 auto acc = accumulated.find(metric.get_type());
2585 if (acc == accumulated.end()) {
2586 auto collector = DaemonHealthMetricCollector::create(metric.get_type());
2587 if (!collector) {
9f95a23c 2588 derr << __func__ << " " << key
11fdf7f2
TL
2589 << " sent me an unknown health metric: "
2590 << std::hex << static_cast<uint8_t>(metric.get_type())
2591 << std::dec << dendl;
2592 continue;
2593 }
2594 dout(20) << " + " << state->key << " "
2595 << metric << dendl;
2596 tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
2597 std::move(collector));
2598 }
2599 acc->second->update(key, metric);
b32b8144 2600 }
b32b8144
FG
2601 }
2602 }
2603 for (const auto& acc : accumulated) {
2604 acc.second->summarize(m->health_checks);
2605 }
31f18b77
FG
2606 // TODO? We currently do not notify the PyModules
2607 // TODO: respect needs_send, so we send the report only if we are asked to do
2608 // so, or the state is updated.
9f95a23c 2609 monc->send_mon_message(std::move(m));
31f18b77 2610}
224ce89b 2611
11fdf7f2
TL
2612void DaemonServer::adjust_pgs()
2613{
2614 dout(20) << dendl;
2615 unsigned max = std::max<int64_t>(1, g_conf()->mon_osd_max_creating_pgs);
2616 double max_misplaced = g_conf().get_val<double>("target_max_misplaced_ratio");
2617 bool aggro = g_conf().get_val<bool>("mgr_debug_aggressive_pg_num_changes");
2618
2619 map<string,unsigned> pg_num_to_set;
2620 map<string,unsigned> pgp_num_to_set;
2621 set<pg_t> upmaps_to_clear;
2622 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
2623 unsigned creating_or_unknown = 0;
2624 for (auto& i : pg_map.num_pg_by_state) {
2625 if ((i.first & (PG_STATE_CREATING)) ||
2626 i.first == 0) {
2627 creating_or_unknown += i.second;
2628 }
2629 }
2630 unsigned left = max;
2631 if (creating_or_unknown >= max) {
2632 return;
2633 }
2634 left -= creating_or_unknown;
2635 dout(10) << "creating_or_unknown " << creating_or_unknown
2636 << " max_creating " << max
2637 << " left " << left
2638 << dendl;
2639
2640 // FIXME: These checks are fundamentally racy given that adjust_pgs()
2641 // can run more frequently than we get updated pg stats from OSDs. We
2642	    // may make multiple adjustments with stale information.
2643 double misplaced_ratio, degraded_ratio;
2644 double inactive_pgs_ratio, unknown_pgs_ratio;
2645 pg_map.get_recovery_stats(&misplaced_ratio, &degraded_ratio,
2646 &inactive_pgs_ratio, &unknown_pgs_ratio);
2647 dout(20) << "misplaced_ratio " << misplaced_ratio
2648 << " degraded_ratio " << degraded_ratio
2649 << " inactive_pgs_ratio " << inactive_pgs_ratio
2650 << " unknown_pgs_ratio " << unknown_pgs_ratio
2651 << "; target_max_misplaced_ratio " << max_misplaced
2652 << dendl;
2653
2654 for (auto& i : osdmap.get_pools()) {
2655 const pg_pool_t& p = i.second;
2656
2657 // adjust pg_num?
2658 if (p.get_pg_num_target() != p.get_pg_num()) {
2659 dout(20) << "pool " << i.first
2660 << " pg_num " << p.get_pg_num()
2661 << " target " << p.get_pg_num_target()
2662 << dendl;
2663 if (p.has_flag(pg_pool_t::FLAG_CREATING)) {
2664 dout(10) << "pool " << i.first
2665 << " pg_num_target " << p.get_pg_num_target()
2666 << " pg_num " << p.get_pg_num()
2667 << " - still creating initial pgs"
2668 << dendl;
2669 } else if (p.get_pg_num_target() < p.get_pg_num()) {
2670 // pg_num decrease (merge)
2671 pg_t merge_source(p.get_pg_num() - 1, i.first);
2672 pg_t merge_target = merge_source.get_parent();
2673 bool ok = true;
2674
2675 if (p.get_pg_num() != p.get_pg_num_pending()) {
2676 dout(10) << "pool " << i.first
2677 << " pg_num_target " << p.get_pg_num_target()
2678 << " pg_num " << p.get_pg_num()
2679 << " - decrease and pg_num_pending != pg_num, waiting"
2680 << dendl;
2681 ok = false;
2682 } else if (p.get_pg_num() == p.get_pgp_num()) {
2683 dout(10) << "pool " << i.first
2684 << " pg_num_target " << p.get_pg_num_target()
2685 << " pg_num " << p.get_pg_num()
2686 << " - decrease blocked by pgp_num "
2687 << p.get_pgp_num()
2688 << dendl;
2689 ok = false;
2690 }
9f95a23c 2691 vector<int32_t> source_acting;
11fdf7f2
TL
2692 for (auto &merge_participant : {merge_source, merge_target}) {
2693 bool is_merge_source = merge_participant == merge_source;
2694 if (osdmap.have_pg_upmaps(merge_participant)) {
2695 dout(10) << "pool " << i.first
2696 << " pg_num_target " << p.get_pg_num_target()
2697 << " pg_num " << p.get_pg_num()
2698 << (is_merge_source ? " - merge source " : " - merge target ")
2699 << merge_participant
2700 << " has upmap" << dendl;
2701 upmaps_to_clear.insert(merge_participant);
2702 ok = false;
2703 }
2704 auto q = pg_map.pg_stat.find(merge_participant);
2705 if (q == pg_map.pg_stat.end()) {
2706 dout(10) << "pool " << i.first
2707 << " pg_num_target " << p.get_pg_num_target()
2708 << " pg_num " << p.get_pg_num()
2709 << " - no state for " << merge_participant
2710 << (is_merge_source ? " (merge source)" : " (merge target)")
2711 << dendl;
2712 ok = false;
2713 } else if ((q->second.state & (PG_STATE_ACTIVE | PG_STATE_CLEAN)) !=
2714 (PG_STATE_ACTIVE | PG_STATE_CLEAN)) {
2715 dout(10) << "pool " << i.first
2716 << " pg_num_target " << p.get_pg_num_target()
2717 << " pg_num " << p.get_pg_num()
2718 << (is_merge_source ? " - merge source " : " - merge target ")
2719 << merge_participant
2720 << " not clean (" << pg_state_string(q->second.state)
2721 << ")" << dendl;
2722 ok = false;
2723 }
9f95a23c
TL
2724 if (is_merge_source) {
2725 source_acting = q->second.acting;
2726 } else if (ok && q->second.acting != source_acting) {
2727 dout(10) << "pool " << i.first
2728 << " pg_num_target " << p.get_pg_num_target()
2729 << " pg_num " << p.get_pg_num()
2730 << (is_merge_source ? " - merge source " : " - merge target ")
2731 << merge_participant
2732 << " acting does not match (source " << source_acting
2733 << " != target " << q->second.acting
2734 << ")" << dendl;
2735 ok = false;
2736 }
11fdf7f2
TL
2737 }
2738
2739 if (ok) {
2740 unsigned target = p.get_pg_num() - 1;
2741 dout(10) << "pool " << i.first
2742 << " pg_num_target " << p.get_pg_num_target()
2743 << " pg_num " << p.get_pg_num()
2744 << " -> " << target
2745 << " (merging " << merge_source
2746 << " and " << merge_target
2747 << ")" << dendl;
2748 pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
9f95a23c 2749 continue;
11fdf7f2
TL
2750 }
2751 } else if (p.get_pg_num_target() > p.get_pg_num()) {
2752 // pg_num increase (split)
2753 bool active = true;
2754 auto q = pg_map.num_pg_by_pool_state.find(i.first);
2755 if (q != pg_map.num_pg_by_pool_state.end()) {
2756 for (auto& j : q->second) {
2757 if ((j.first & (PG_STATE_ACTIVE|PG_STATE_PEERED)) == 0) {
2758 dout(20) << "pool " << i.first << " has " << j.second
2759 << " pgs in " << pg_state_string(j.first)
2760 << dendl;
2761 active = false;
2762 break;
2763 }
2764 }
2765 } else {
2766 active = false;
2767 }
2768 if (!active) {
2769 dout(10) << "pool " << i.first
2770 << " pg_num_target " << p.get_pg_num_target()
2771 << " pg_num " << p.get_pg_num()
2772 << " - not all pgs active"
2773 << dendl;
2774 } else {
2775 unsigned add = std::min(
2776 left,
2777 p.get_pg_num_target() - p.get_pg_num());
2778 unsigned target = p.get_pg_num() + add;
2779 left -= add;
2780 dout(10) << "pool " << i.first
2781 << " pg_num_target " << p.get_pg_num_target()
2782 << " pg_num " << p.get_pg_num()
2783 << " -> " << target << dendl;
2784 pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
2785 }
2786 }
2787 }
2788
2789 // adjust pgp_num?
2790 unsigned target = std::min(p.get_pg_num_pending(),
2791 p.get_pgp_num_target());
2792 if (target != p.get_pgp_num()) {
2793 dout(20) << "pool " << i.first
2794 << " pgp_num_target " << p.get_pgp_num_target()
2795 << " pgp_num " << p.get_pgp_num()
2796 << " -> " << target << dendl;
2797 if (target > p.get_pgp_num() &&
2798 p.get_pgp_num() == p.get_pg_num()) {
2799 dout(10) << "pool " << i.first
2800 << " pgp_num_target " << p.get_pgp_num_target()
2801 << " pgp_num " << p.get_pgp_num()
2802 << " - increase blocked by pg_num " << p.get_pg_num()
2803 << dendl;
2804 } else if (!aggro && (inactive_pgs_ratio > 0 ||
2805 degraded_ratio > 0 ||
2806 unknown_pgs_ratio > 0)) {
2807 dout(10) << "pool " << i.first
2808 << " pgp_num_target " << p.get_pgp_num_target()
2809 << " pgp_num " << p.get_pgp_num()
2810 << " - inactive|degraded|unknown pgs, deferring pgp_num"
2811 << " update" << dendl;
2812 } else if (!aggro && (misplaced_ratio > max_misplaced)) {
2813 dout(10) << "pool " << i.first
2814 << " pgp_num_target " << p.get_pgp_num_target()
2815 << " pgp_num " << p.get_pgp_num()
2816 << " - misplaced_ratio " << misplaced_ratio
2817 << " > max " << max_misplaced
2818 << ", deferring pgp_num update" << dendl;
2819 } else {
2820 // NOTE: this calculation assumes objects are
2821 // basically uniformly distributed across all PGs
2822 // (regardless of pool), which is probably not
2823 // perfectly correct, but it's a start. make no
2824 // single adjustment that's more than half of the
2825 // max_misplaced, to somewhat limit the magnitude of
2826 // our potential error here.
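	    // Added worked example (illustrative values): with max_misplaced 0.05,
	    // misplaced_ratio 0.01 and pg_num 1024, room = min(0.04, 0.025) = 0.025
	    // and estmax = 25, so pgp_num moves by at most 25 in this pass.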
2827 int next;
9f95a23c 2828 static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP = 1;
81eedcae
TL
2829 pool_stat_t s = pg_map.get_pg_pool_sum_stat(i.first);
2830 if (aggro ||
2831 // pool is (virtually) empty; just jump to final pgp_num?
2832 (p.get_pgp_num_target() > p.get_pgp_num() &&
9f95a23c
TL
2833 s.stats.sum.num_objects <= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP *
2834 p.get_pgp_num_target()))) {
11fdf7f2
TL
2835 next = target;
2836 } else {
2837 double room =
2838 std::min<double>(max_misplaced - misplaced_ratio,
81eedcae 2839 max_misplaced / 2.0);
11fdf7f2
TL
2840 unsigned estmax = std::max<unsigned>(
2841 (double)p.get_pg_num() * room, 1u);
9f95a23c
TL
2842 next = std::clamp(target,
2843 p.get_pgp_num() - estmax,
2844 p.get_pgp_num() + estmax);
11fdf7f2 2845 dout(20) << " room " << room << " estmax " << estmax
9f95a23c
TL
2846 << " delta " << (target-p.get_pgp_num())
2847 << " next " << next << dendl;
81eedcae
TL
2848 if (p.get_pgp_num_target() == p.get_pg_num_target() &&
2849 p.get_pgp_num_target() < p.get_pg_num()) {
11fdf7f2
TL
2850 // since pgp_num is tracking pg_num, ceph is handling
2851 // pgp_num. so, be responsible: don't let pgp_num get
2852 // too far out ahead of merges (if we are merging).
2853 // this avoids moving lots of unmerged pgs onto a
2854 // small number of OSDs where we might blow out the
2855 // per-osd pg max.
2856 unsigned max_outpace_merges =
2857 std::max<unsigned>(8, p.get_pg_num() * max_misplaced);
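		  // Added worked example (illustrative values): pg_num 1024 with
		  // max_misplaced 0.05 gives max_outpace_merges = max(8, 51) = 51,
		  // so pgp_num is never pushed more than 51 below pg_num here.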
2858 if (next + max_outpace_merges < p.get_pg_num()) {
2859 next = p.get_pg_num() - max_outpace_merges;
2860 dout(10) << " using next " << next
2861 << " to avoid outpacing merges (max_outpace_merges "
2862 << max_outpace_merges << ")" << dendl;
2863 }
2864 }
2865 }
2866 dout(10) << "pool " << i.first
2867 << " pgp_num_target " << p.get_pgp_num_target()
2868 << " pgp_num " << p.get_pgp_num()
2869 << " -> " << next << dendl;
2870 pgp_num_to_set[osdmap.get_pool_name(i.first)] = next;
2871 }
2872 }
2873 if (left == 0) {
2874 return;
2875 }
2876 }
2877 });
2878 for (auto i : pg_num_to_set) {
2879 const string cmd =
2880 "{"
2881 "\"prefix\": \"osd pool set\", "
2882 "\"pool\": \"" + i.first + "\", "
2883 "\"var\": \"pg_num_actual\", "
2884 "\"val\": \"" + stringify(i.second) + "\""
2885 "}";
2886 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2887 }
2888 for (auto i : pgp_num_to_set) {
2889 const string cmd =
2890 "{"
2891 "\"prefix\": \"osd pool set\", "
2892 "\"pool\": \"" + i.first + "\", "
2893 "\"var\": \"pgp_num_actual\", "
2894 "\"val\": \"" + stringify(i.second) + "\""
2895 "}";
2896 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2897 }
2898 for (auto pg : upmaps_to_clear) {
2899 const string cmd =
2900 "{"
2901 "\"prefix\": \"osd rm-pg-upmap\", "
2902 "\"pgid\": \"" + stringify(pg) + "\""
2903 "}";
2904 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2905 const string cmd2 =
2906 "{"
2907 "\"prefix\": \"osd rm-pg-upmap-items\", "
2908 "\"pgid\": \"" + stringify(pg) + "\"" +
2909 "}";
2910 monc->start_mon_command({cmd2}, {}, nullptr, nullptr, nullptr);
2911 }
2912}
2913
224ce89b
WB
2914void DaemonServer::got_service_map()
2915{
11fdf7f2 2916 std::lock_guard l(lock);
224ce89b
WB
2917
2918 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
2919 if (pending_service_map.epoch == 0) {
2920 // we just started up
2921 dout(10) << "got initial map e" << service_map.epoch << dendl;
2922 pending_service_map = service_map;
f91f0fd5 2923 pending_service_map.epoch = service_map.epoch + 1;
224ce89b
WB
2924 } else {
2925	      // we were already active and therefore must have persisted it,
2926 // which means ours is the same or newer.
2927 dout(10) << "got updated map e" << service_map.epoch << dendl;
f91f0fd5 2928 ceph_assert(pending_service_map.epoch > service_map.epoch);
224ce89b 2929 }
224ce89b
WB
2930 });
2931
2932 // cull missing daemons, populate new ones
92f5a8d4 2933 std::set<std::string> types;
9f95a23c
TL
2934 for (auto& [type, service] : pending_service_map.services) {
2935 if (ServiceMap::is_normal_ceph_entity(type)) {
2936 continue;
2937 }
2938
2939 types.insert(type);
92f5a8d4 2940
224ce89b 2941 std::set<std::string> names;
9f95a23c 2942 for (auto& q : service.daemons) {
224ce89b 2943 names.insert(q.first);
9f95a23c 2944 DaemonKey key{type, q.first};
224ce89b
WB
2945 if (!daemon_state.exists(key)) {
2946 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
2947 daemon->key = key;
11fdf7f2 2948 daemon->set_metadata(q.second.metadata);
224ce89b
WB
2949 daemon->service_daemon = true;
2950 daemon_state.insert(daemon);
2951 dout(10) << "added missing " << key << dendl;
2952 }
2953 }
9f95a23c 2954 daemon_state.cull(type, names);
224ce89b 2955 }
92f5a8d4 2956 daemon_state.cull_services(types);
224ce89b 2957}
3efd9988 2958
11fdf7f2
TL
2959void DaemonServer::got_mgr_map()
2960{
2961 std::lock_guard l(lock);
2962 set<std::string> have;
2963 cluster_state.with_mgrmap([&](const MgrMap& mgrmap) {
2964 auto md_update = [&] (DaemonKey key) {
2965 std::ostringstream oss;
2966 auto c = new MetadataUpdate(daemon_state, key);
2967 // FIXME remove post-nautilus: include 'id' for luminous mons
2968 oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
9f95a23c 2969 << key.name << "\", \"id\": \"" << key.name << "\"}";
11fdf7f2
TL
2970 monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
2971 };
2972 if (mgrmap.active_name.size()) {
9f95a23c 2973 DaemonKey key{"mgr", mgrmap.active_name};
11fdf7f2
TL
2974 have.insert(mgrmap.active_name);
2975 if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
2976 md_update(key);
2977 dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
2978 }
2979 }
2980 for (auto& i : mgrmap.standbys) {
9f95a23c 2981 DaemonKey key{"mgr", i.second.name};
11fdf7f2
TL
2982 have.insert(i.second.name);
2983 if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
2984 md_update(key);
2985 dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
2986 }
2987 }
2988 });
2989 daemon_state.cull("mgr", have);
2990}
3efd9988
FG
2991
2992const char** DaemonServer::get_tracked_conf_keys() const
2993{
2994 static const char *KEYS[] = {
2995 "mgr_stats_threshold",
2996 "mgr_stats_period",
2997 nullptr
2998 };
2999
3000 return KEYS;
3001}
3002
11fdf7f2
TL
3003void DaemonServer::handle_conf_change(const ConfigProxy& conf,
3004 const std::set <std::string> &changed)
3efd9988 3005{
3efd9988
FG
3006
3007 if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
3008 dout(4) << "Updating stats threshold/period on "
3009 << daemon_connections.size() << " clients" << dendl;
3010 // Send a fresh MMgrConfigure to all clients, so that they can follow
3011 // the new policy for transmitting stats
9f95a23c 3012 finisher.queue(new LambdaContext([this](int r) {
11fdf7f2
TL
3013 std::lock_guard l(lock);
3014 for (auto &c : daemon_connections) {
3015 _send_configure(c);
3016 }
3017 }));
3efd9988
FG
3018 }
3019}
3020
3021void DaemonServer::_send_configure(ConnectionRef c)
3022{
9f95a23c 3023 ceph_assert(ceph_mutex_is_locked_by_me(lock));
3efd9988 3024
9f95a23c 3025 auto configure = make_message<MMgrConfigure>();
11fdf7f2
TL
3026 configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
3027 configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");
3028
3029 if (c->peer_is_osd()) {
3030 configure->osd_perf_metric_queries =
3031 osd_perf_metric_collector.get_queries();
f67539c2
TL
3032 } else if (c->peer_is_mds()) {
3033 configure->metric_config_message =
3034 MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector.get_queries()));
11fdf7f2
TL
3035 }
3036
9f95a23c 3037 c->send_message2(configure);
3efd9988
FG
3038}
3039
9f95a23c 3040MetricQueryID DaemonServer::add_osd_perf_query(
11fdf7f2
TL
3041 const OSDPerfMetricQuery &query,
3042 const std::optional<OSDPerfMetricLimit> &limit)
3043{
3044 return osd_perf_metric_collector.add_query(query, limit);
3045}
3046
9f95a23c 3047int DaemonServer::remove_osd_perf_query(MetricQueryID query_id)
11fdf7f2
TL
3048{
3049 return osd_perf_metric_collector.remove_query(query_id);
3050}
3051
f67539c2
TL
3052int DaemonServer::get_osd_perf_counters(OSDPerfCollector *collector)
3053{
3054 return osd_perf_metric_collector.get_counters(collector);
3055}
3056
3057MetricQueryID DaemonServer::add_mds_perf_query(
3058 const MDSPerfMetricQuery &query,
3059 const std::optional<MDSPerfMetricLimit> &limit)
3060{
3061 return mds_perf_metric_collector.add_query(query, limit);
3062}
3063
3064int DaemonServer::remove_mds_perf_query(MetricQueryID query_id)
3065{
3066 return mds_perf_metric_collector.remove_query(query_id);
3067}
3068
3069int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
11fdf7f2 3070{
f67539c2 3071 return mds_perf_metric_collector.get_counters(collector);
11fdf7f2 3072}