]> git.proxmox.com Git - ceph.git/blame - ceph/src/mgr/DaemonServer.cc
import ceph 14.2.5
[ceph.git] / ceph / src / mgr / DaemonServer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14#include "DaemonServer.h"
b32b8144 15#include "mgr/Mgr.h"
7c673cae 16
b32b8144 17#include "include/stringify.h"
31f18b77 18#include "include/str_list.h"
7c673cae 19#include "auth/RotatingKeyRing.h"
7c673cae
FG
20#include "json_spirit/json_spirit_writer.h"
21
c07f9fc5 22#include "mgr/mgr_commands.h"
11fdf7f2
TL
23#include "mgr/DaemonHealthMetricCollector.h"
24#include "mgr/OSDPerfMetricCollector.h"
c07f9fc5
FG
25#include "mon/MonCommand.h"
26
7c673cae 27#include "messages/MMgrOpen.h"
11fdf7f2 28#include "messages/MMgrClose.h"
7c673cae 29#include "messages/MMgrConfigure.h"
31f18b77 30#include "messages/MMonMgrReport.h"
7c673cae
FG
31#include "messages/MCommand.h"
32#include "messages/MCommandReply.h"
33#include "messages/MPGStats.h"
34#include "messages/MOSDScrub.h"
11fdf7f2 35#include "messages/MOSDScrub2.h"
c07f9fc5 36#include "messages/MOSDForceRecovery.h"
224ce89b 37#include "common/errno.h"
11fdf7f2 38#include "common/pick_address.h"
7c673cae
FG
39
40#define dout_context g_ceph_context
41#define dout_subsys ceph_subsys_mgr
42#undef dout_prefix
43#define dout_prefix *_dout << "mgr.server " << __func__ << " "
44
c07f9fc5
FG
45
46
7c673cae
FG
47DaemonServer::DaemonServer(MonClient *monc_,
48 Finisher &finisher_,
49 DaemonStateIndex &daemon_state_,
50 ClusterState &cluster_state_,
3efd9988 51 PyModuleRegistry &py_modules_,
7c673cae
FG
52 LogChannelRef clog_,
53 LogChannelRef audit_clog_)
54 : Dispatcher(g_ceph_context),
55 client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
11fdf7f2 56 g_conf().get_val<Option::size_t>("mgr_client_bytes"))),
7c673cae 57 client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
11fdf7f2 58 g_conf().get_val<uint64_t>("mgr_client_messages"))),
7c673cae 59 osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
11fdf7f2 60 g_conf().get_val<Option::size_t>("mgr_osd_bytes"))),
7c673cae 61 osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
11fdf7f2 62 g_conf().get_val<uint64_t>("mgr_osd_messages"))),
7c673cae 63 mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
11fdf7f2 64 g_conf().get_val<Option::size_t>("mgr_mds_bytes"))),
7c673cae 65 mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
11fdf7f2 66 g_conf().get_val<uint64_t>("mgr_mds_messages"))),
7c673cae 67 mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
11fdf7f2 68 g_conf().get_val<Option::size_t>("mgr_mon_bytes"))),
7c673cae 69 mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
11fdf7f2 70 g_conf().get_val<uint64_t>("mgr_mon_messages"))),
7c673cae
FG
71 msgr(nullptr),
72 monc(monc_),
73 finisher(finisher_),
74 daemon_state(daemon_state_),
75 cluster_state(cluster_state_),
76 py_modules(py_modules_),
77 clog(clog_),
78 audit_clog(audit_clog_),
3efd9988 79 lock("DaemonServer"),
11fdf7f2
TL
80 pgmap_ready(false),
81 timer(g_ceph_context, lock),
82 shutting_down(false),
83 tick_event(nullptr),
84 osd_perf_metric_collector_listener(this),
85 osd_perf_metric_collector(osd_perf_metric_collector_listener)
3efd9988 86{
11fdf7f2 87 g_conf().add_observer(this);
3efd9988 88}
7c673cae
FG
89
// Tear down the server: release the messenger created in init() and stop
// observing config changes.  Throttlers are unique_ptr members and clean
// themselves up.
DaemonServer::~DaemonServer() {
  delete msgr;
  g_conf().remove_observer(this);
}
94
11fdf7f2 95int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
7c673cae
FG
96{
97 // Initialize Messenger
11fdf7f2
TL
98 std::string public_msgr_type = g_conf()->ms_public_type.empty() ?
99 g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
7c673cae
FG
100 msgr = Messenger::create(g_ceph_context, public_msgr_type,
101 entity_name_t::MGR(gid),
102 "mgr",
103 getpid(), 0);
104 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
105
11fdf7f2
TL
106 msgr->set_auth_client(monc);
107
7c673cae
FG
108 // throttle clients
109 msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
110 client_byte_throttler.get(),
111 client_msg_throttler.get());
112
113 // servers
114 msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
115 osd_byte_throttler.get(),
116 osd_msg_throttler.get());
117 msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
118 mds_byte_throttler.get(),
119 mds_msg_throttler.get());
120 msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
121 mon_byte_throttler.get(),
122 mon_msg_throttler.get());
123
11fdf7f2
TL
124 entity_addrvec_t addrs;
125 int r = pick_addresses(cct, CEPH_PICK_ADDRESS_PUBLIC, &addrs);
126 if (r < 0) {
127 return r;
128 }
129 dout(20) << __func__ << " will bind to " << addrs << dendl;
130 r = msgr->bindv(addrs);
7c673cae 131 if (r < 0) {
11fdf7f2 132 derr << "unable to bind mgr to " << addrs << dendl;
7c673cae
FG
133 return r;
134 }
135
136 msgr->set_myname(entity_name_t::MGR(gid));
11fdf7f2 137 msgr->set_addr_unknowns(client_addrs);
7c673cae
FG
138
139 msgr->start();
140 msgr->add_dispatcher_tail(this);
141
11fdf7f2
TL
142 msgr->set_auth_server(monc);
143 monc->set_handle_authentication_dispatcher(this);
144
224ce89b
WB
145 started_at = ceph_clock_now();
146
11fdf7f2
TL
147 std::lock_guard l(lock);
148 timer.init();
149
150 schedule_tick_locked(
151 g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());
152
7c673cae
FG
153 return 0;
154}
155
11fdf7f2 156entity_addrvec_t DaemonServer::get_myaddrs() const
7c673cae 157{
11fdf7f2 158 return msgr->get_myaddrs();
7c673cae
FG
159}
160
11fdf7f2 161KeyStore *DaemonServer::ms_get_auth1_authorizer_keystore()
7c673cae 162{
11fdf7f2
TL
163 return monc->rotating_secrets.get();
164}
7c673cae 165
11fdf7f2
TL
// Called by the messenger once a peer has authenticated.  Creates the
// MgrSession attached to the connection, applies/parses the peer's caps,
// and registers OSD connections in osd_cons for command routing.
// Returns 1 on success, -EPERM on caps failure, 0 when no caps were
// presented (and allow_all was not set).
int DaemonServer::ms_handle_authentication(Connection *con)
{
  int ret = 0;
  MgrSession *s = new MgrSession(cct);
  con->set_priv(s);  // connection owns a ref; session lives with the con
  s->inst.addr = con->get_peer_addr();
  s->entity_name = con->peer_name;
  dout(10) << __func__ << " new session " << s << " con " << con
	   << " entity " << con->peer_name
	   << " addr " << con->get_peer_addrs()
	   << dendl;

  AuthCapsInfo &caps_info = con->get_peer_caps_info();
  if (caps_info.allow_all) {
    dout(10) << " session " << s << " " << s->entity_name
	     << " allow_all" << dendl;
    s->caps.set_allow_all();
  }

  if (caps_info.caps.length() > 0) {
    auto p = caps_info.caps.cbegin();
    string str;
    try {
      decode(str, p);
    }
    catch (buffer::error& e) {
      ret = -EPERM;
    }
    // NOTE(review): if decode() threw, str is empty here yet parse() is
    // still attempted; a successful parse of "" would overwrite the
    // -EPERM set above — confirm caps.parse("") always fails.
    bool success = s->caps.parse(str);
    if (success) {
      dout(10) << " session " << s << " " << s->entity_name
	       << " has caps " << s->caps << " '" << str << "'" << dendl;
      ret = 1;
    } else {
      dout(10) << " session " << s << " " << s->entity_name
	       << " failed to parse caps '" << str << "'" << dendl;
      ret = -EPERM;
    }
  }

  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    // Track OSD connections so scrub/recovery commands can be routed.
    std::lock_guard l(lock);
    s->osd_id = atoi(s->entity_name.get_id().c_str());
    dout(10) << "registering osd." << s->osd_id << " session "
	     << s << " con " << con << dendl;
    osd_cons[s->osd_id].insert(con);
  }

  return ret;
}
216
11fdf7f2
TL
// Build an authorizer for an outgoing connection of the given peer type.
// Mon connections are authenticated by MonClient itself, so we return
// true without producing an authorizer for them.
bool DaemonServer::ms_get_authorizer(
  int dest_type,
  AuthAuthorizer **authorizer)
{
  dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;

  if (dest_type == CEPH_ENTITY_TYPE_MON) {
    return true;
  }

  *authorizer = monc->build_authorizer(dest_type);
  dout(20) << "got authorizer " << *authorizer << dendl;
  return *authorizer != NULL;
}
231
31f18b77
FG
// Messenger callback on connection reset.  For OSD peers, drop the
// connection from osd_cons and daemon_connections so we stop routing
// commands / stats reconfiguration to it.  Always returns false (no
// reconnect handling here).
bool DaemonServer::ms_handle_reset(Connection *con)
{
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    auto priv = con->get_priv();
    auto session = static_cast<MgrSession*>(priv.get());
    if (!session) {
      return false;
    }
    std::lock_guard l(lock);
    dout(10) << "unregistering osd." << session->osd_id
	     << " session " << session << " con " << con << dendl;
    osd_cons[session->osd_id].erase(con);

    auto iter = daemon_connections.find(con);
    if (iter != daemon_connections.end()) {
      daemon_connections.erase(iter);
    }
  }
  return false;
}
252
7c673cae
FG
// Messenger callback when a peer refuses our connection attempt.
bool DaemonServer::ms_handle_refused(Connection *con)
{
  // do nothing for now
  return false;
}
258
// Top-level message dispatch.  Returns true iff the message was consumed.
bool DaemonServer::ms_dispatch(Message *m)
{
  // Note that we do *not* take ::lock here, in order to avoid
  // serializing all message handling.  It's up to each handler
  // to take whatever locks it needs.
  switch (m->get_type()) {
    case MSG_PGSTATS:
      cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
      // First stats from each OSD feed the "is the PGMap ready yet" check.
      maybe_ready(m->get_source().num());
      m->put();
      return true;
    case MSG_MGR_REPORT:
      return handle_report(static_cast<MMgrReport*>(m));
    case MSG_MGR_OPEN:
      return handle_open(static_cast<MMgrOpen*>(m));
    case MSG_MGR_CLOSE:
      return handle_close(static_cast<MMgrClose*>(m));
    case MSG_COMMAND:
      return handle_command(static_cast<MCommand*>(m));
    default:
      dout(1) << "Unhandled message type " << m->get_type() << dendl;
      return false;
  };
}
283
224ce89b
WB
// Record that osd_id has sent its first pg stats.  Once every up OSD has
// reported in, mark the PGMap ready and immediately push a report to the
// mon rather than waiting for the next tick.  A lock-free fast path is
// used once pgmap_ready is set.
void DaemonServer::maybe_ready(int32_t osd_id)
{
  if (pgmap_ready.load()) {
    // Fast path: we don't need to take lock because pgmap_ready
    // is already set
  } else {
    std::lock_guard l(lock);

    if (reported_osds.find(osd_id) == reported_osds.end()) {
      dout(4) << "initial report from osd " << osd_id << dendl;
      reported_osds.insert(osd_id);
      std::set<int32_t> up_osds;

      cluster_state.with_osdmap([&](const OSDMap& osdmap) {
          osdmap.get_up_osds(up_osds);
        });

      // Which up OSDs have not yet sent their first stats?
      std::set<int32_t> unreported_osds;
      std::set_difference(up_osds.begin(), up_osds.end(),
                          reported_osds.begin(), reported_osds.end(),
                          std::inserter(unreported_osds, unreported_osds.begin()));

      if (unreported_osds.size() == 0) {
        dout(4) << "all osds have reported, sending PG state to mon" << dendl;
        pgmap_ready = true;
        reported_osds.clear();
        // Avoid waiting for next tick
        send_report();
      } else {
        dout(4) << "still waiting for " << unreported_osds.size() << " osds"
                   " to report in before PGMap is ready" << dendl;
      }
    }
  }
}
319
11fdf7f2
TL
// Periodic work: push a report to the mon, run pg adjustment, and re-arm
// the timer.  Invoked from the SafeTimer, which holds ::lock (satisfying
// schedule_tick_locked's precondition).
void DaemonServer::tick()
{
  dout(10) << dendl;
  send_report();
  adjust_pgs();

  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());
}
329
// Currently modules do not set health checks in response to events delivered to
// all modules (e.g. notify) so we do not risk a thundering hurd situation here.
// if this pattern emerges in the future, this scheduler could be modified to
// fire after all modules have had a chance to set their health checks.
//
// Re-arm the tick timer to fire in delay_sec seconds.  Caller must hold
// ::lock.  Any previously scheduled tick is cancelled first; nothing is
// scheduled once shutdown has begun.
void DaemonServer::schedule_tick_locked(double delay_sec)
{
  ceph_assert(lock.is_locked_by_me());

  if (tick_event) {
    timer.cancel_event(tick_event);
    tick_event = nullptr;
  }

  // on shutdown start rejecting explicit requests to send reports that may
  // originate from python land which may still be running.
  if (shutting_down)
    return;

  tick_event = timer.add_event_after(delay_sec,
      new FunctionContext([this](int r) {
	  tick();
  }));
}
353
// Public wrapper around schedule_tick_locked() that acquires ::lock.
void DaemonServer::schedule_tick(double delay_sec)
{
  std::lock_guard l(lock);
  schedule_tick_locked(delay_sec);
}
359
// Invoked when the set of OSD perf metric queries changes; pushes a new
// configuration to connected OSDs via the finisher (off the caller's
// thread/locks).
void DaemonServer::handle_osd_perf_metric_query_updated()
{
  dout(10) << dendl;

  // Send a fresh MMgrConfigure to all clients, so that they can follow
  // the new policy for transmitting stats
  finisher.queue(new FunctionContext([this](int r) {
	std::lock_guard l(lock);
	for (auto &c : daemon_connections) {
	  if (c->peer_is_osd()) {
	    _send_configure(c);
	  }
	}
      }));
}
375
7c673cae
FG
// Stop the messenger and cluster state, then mark shutdown and stop the
// tick timer under ::lock.
void DaemonServer::shutdown()
{
  dout(10) << "begin" << dendl;
  msgr->shutdown();
  msgr->wait();
  cluster_state.shutdown();
  dout(10) << "done" << dendl;

  // NOTE(review): shutting_down is only set after the messenger has been
  // fully drained; confirm no timer/finisher callback relies on seeing it
  // earlier in the shutdown sequence.
  std::lock_guard l(lock);
  shutting_down = true;
  timer.shutdown();
}
388
11fdf7f2
TL
389static DaemonKey key_from_service(
390 const std::string& service_name,
391 int peer_type,
392 const std::string& daemon_name)
393{
394 if (!service_name.empty()) {
395 return DaemonKey(service_name, daemon_name);
396 } else {
397 return DaemonKey(ceph_entity_type_name(peer_type), daemon_name);
398 }
399}
7c673cae 400
11fdf7f2
TL
401static bool key_from_string(
402 const std::string& name,
403 DaemonKey *out)
404{
405 auto p = name.find('.');
406 if (p == std::string::npos) {
407 return false;
408 }
409 out->first = name.substr(0, p);
410 out->second = name.substr(p + 1);
411 return true;
412}
7c673cae
FG
413
// Handle a daemon/client session open (MMgrOpen): send our stats
// configuration, create or refresh the DaemonState for the sender, and —
// for self-declared service daemons — register it in the pending service
// map.  Always consumes the message.
bool DaemonServer::handle_open(MMgrOpen *m)
{
  std::lock_guard l(lock);

  DaemonKey key = key_from_service(m->service_name,
				   m->get_connection()->get_peer_type(),
				   m->daemon_name);

  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  // Tell the peer how/what to report before anything else.
  _send_configure(m->get_connection());

  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    daemon = daemon_state.get(key);
  }
  // Service daemons (e.g. rgw, rbd-mirror) may be unknown to us; create
  // their state eagerly since no metadata fetch path exists for them.
  if (m->service_daemon && !daemon) {
    dout(4) << "constructing new DaemonState for " << key << dendl;
    daemon = std::make_shared<DaemonState>(daemon_state.types);
    daemon->key = key;
    daemon->service_daemon = true;
    daemon_state.insert(daemon);
  }
  if (daemon) {
    dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
    std::lock_guard l(daemon->lock);
    // A (re)open resets the perf counter schema; fresh declarations follow.
    daemon->perf_counters.clear();

    if (m->service_daemon) {
      daemon->set_metadata(m->daemon_metadata);
      daemon->service_status = m->daemon_status;

      utime_t now = ceph_clock_now();
      auto d = pending_service_map.get_daemon(m->service_name,
					      m->daemon_name);
      // Only re-register when the daemon instance (gid) actually changed.
      if (d->gid != (uint64_t)m->get_source().num()) {
	dout(10) << "registering " << key << " in pending_service_map" << dendl;
	d->gid = m->get_source().num();
	d->addr = m->get_source_addr();
	d->start_epoch = pending_service_map.epoch;
	d->start_stamp = now;
	d->metadata = m->daemon_metadata;
	pending_service_map_dirty = pending_service_map.epoch;
      }
    }

    // The daemon may piggyback its effective config (and the mon options
    // it ignored) on the open message.
    auto p = m->config_bl.cbegin();
    if (p != m->config_bl.end()) {
      decode(daemon->config, p);
      decode(daemon->ignored_mon_config, p);
      dout(20) << " got config " << daemon->config
	       << " ignored " << daemon->ignored_mon_config << dendl;
    }
    // Defaults are kept encoded and decoded lazily on demand.
    daemon->config_defaults_bl = m->config_defaults_bl;
    daemon->config_defaults.clear();
    dout(20) << " got config_defaults_bl " << daemon->config_defaults_bl.length()
	     << " bytes" << dendl;
  }

  if (m->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT &&
      m->service_name.empty())
  {
    // Store in set of the daemon/service connections, i.e. those
    // connections that require an update in the event of stats
    // configuration changes.
    daemon_connections.insert(m->get_connection());
  }

  m->put();
  return true;
}
485
11fdf7f2
TL
// Handle a daemon's MMgrClose: drop its DaemonState and, for service
// daemons, remove it from the pending service map.  The message itself is
// echoed back to the sender as the acknowledgement.
bool DaemonServer::handle_close(MMgrClose *m)
{
  std::lock_guard l(lock);

  DaemonKey key = key_from_service(m->service_name,
				   m->get_connection()->get_peer_type(),
				   m->daemon_name);
  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  if (daemon_state.exists(key)) {
    DaemonStatePtr daemon = daemon_state.get(key);
    daemon_state.rm(key);
    {
      std::lock_guard l(daemon->lock);
      if (daemon->service_daemon) {
	pending_service_map.rm_daemon(m->service_name, m->daemon_name);
	pending_service_map_dirty = pending_service_map.epoch;
      }
    }
  }

  // send same message back as a reply
  m->get_connection()->send_message(m);
  return true;
}
511
7c673cae
FG
// Handle a periodic stats report (MMgrReport): update the sender's
// DaemonState (perf counters, config, service status, health metrics),
// notify python modules of perf schema changes, and feed OSD perf metric
// reports to the collector.  If we have no metadata for the sender yet,
// the report is rejected, a metadata fetch is kicked off in the
// background, and the connection is dropped so the daemon re-opens later.
bool DaemonServer::handle_report(MMgrReport *m)
{
  DaemonKey key;
  if (!m->service_name.empty()) {
    key.first = m->service_name;
  } else {
    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
  }
  key.second = m->daemon_name;

  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
      m->service_name.empty()) {
    // Clients should not be sending us stats unless they are declaring
    // themselves to be a daemon for some service.
    dout(4) << "rejecting report from non-daemon client " << m->daemon_name
	    << dendl;
    m->get_connection()->mark_down();
    m->put();
    return true;
  }

  // Look up the DaemonState
  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    dout(20) << "updating existing DaemonState for " << key << dendl;
    daemon = daemon_state.get(key);
  } else {
    // we don't know the hostname at this stage, reject MMgrReport here.
    dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
	    << dendl;

    // issue metadata request in background
    if (!daemon_state.is_updating(key) &&
	(key.first == "osd" || key.first == "mds" || key.first == "mon")) {

      std::ostringstream oss;
      auto c = new MetadataUpdate(daemon_state, key);
      if (key.first == "osd") {
        oss << "{\"prefix\": \"osd metadata\", \"id\": "
            << key.second<< "}";

      } else if (key.first == "mds") {
        c->set_default("addr", stringify(m->get_source_addr()));
        oss << "{\"prefix\": \"mds metadata\", \"who\": \""
            << key.second << "\"}";

      } else if (key.first == "mon") {
        oss << "{\"prefix\": \"mon metadata\", \"id\": \""
            << key.second << "\"}";
      } else {
	ceph_abort();
      }

      monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
    }

    {
      std::lock_guard l(lock);
      // kill session
      auto priv = m->get_connection()->get_priv();
      auto session = static_cast<MgrSession*>(priv.get());
      if (!session) {
	return false;
      }
      m->get_connection()->mark_down();

      dout(10) << "unregistering osd." << session->osd_id
	       << " session " << session << " con " << m->get_connection() << dendl;

      if (osd_cons.find(session->osd_id) != osd_cons.end()) {
	osd_cons[session->osd_id].erase(m->get_connection());
      }

      auto iter = daemon_connections.find(m->get_connection());
      if (iter != daemon_connections.end()) {
	daemon_connections.erase(iter);
      }
    }

    // NOTE(review): this path returns false without m->put(); confirm the
    // messenger releases unhandled messages, otherwise this leaks a ref.
    return false;
  }

  // Update the DaemonState
  ceph_assert(daemon != nullptr);
  {
    std::lock_guard l(daemon->lock);
    auto &daemon_counters = daemon->perf_counters;
    daemon_counters.update(m);

    // Reports may piggyback updated config, like opens do.
    auto p = m->config_bl.cbegin();
    if (p != m->config_bl.end()) {
      decode(daemon->config, p);
      decode(daemon->ignored_mon_config, p);
      dout(20) << " got config " << daemon->config
	       << " ignored " << daemon->ignored_mon_config << dendl;
    }

    if (daemon->service_daemon) {
      utime_t now = ceph_clock_now();
      if (m->daemon_status) {
	daemon->service_status = *m->daemon_status;
	daemon->service_status_stamp = now;
      }
      // Every report counts as a liveness beacon for the service map.
      daemon->last_service_beacon = now;
    } else if (m->daemon_status) {
      derr << "got status from non-daemon " << key << dendl;
    }
    if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
      // only OSD and MON send health_checks to me now
      daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
      dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
	       << dendl;
    }
  }

  // if there are any schema updates, notify the python modules
  if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
    ostringstream oss;
    oss << key.first << '.' << key.second;
    py_modules.notify_all("perf_schema_update", oss.str());
  }

  if (m->get_connection()->peer_is_osd()) {
    osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
  }

  m->put();
  return true;
}
643
7c673cae
FG
644
645void DaemonServer::_generate_command_map(
11fdf7f2 646 cmdmap_t& cmdmap,
7c673cae
FG
647 map<string,string> &param_str_map)
648{
11fdf7f2 649 for (auto p = cmdmap.begin();
7c673cae
FG
650 p != cmdmap.end(); ++p) {
651 if (p->first == "prefix")
652 continue;
653 if (p->first == "caps") {
654 vector<string> cv;
655 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
656 cv.size() % 2 == 0) {
657 for (unsigned i = 0; i < cv.size(); i += 2) {
658 string k = string("caps_") + cv[i];
659 param_str_map[k] = cv[i + 1];
660 }
661 continue;
662 }
663 }
664 param_str_map[p->first] = cmd_vartype_stringify(p->second);
665 }
666}
667
c07f9fc5 668const MonCommand *DaemonServer::_get_mgrcommand(
7c673cae 669 const string &cmd_prefix,
c07f9fc5 670 const std::vector<MonCommand> &cmds)
7c673cae 671{
c07f9fc5
FG
672 const MonCommand *this_cmd = nullptr;
673 for (const auto &cmd : cmds) {
674 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
675 this_cmd = &cmd;
7c673cae
FG
676 break;
677 }
678 }
679 return this_cmd;
680}
681
// Check whether session s is permitted to run the given command.  Mons
// bypass the check entirely; otherwise the session's caps are evaluated
// against the command's required r/w/x permissions for this module and
// prefix, using the flattened parameter map for cap constraints.
bool DaemonServer::_allowed_command(
  MgrSession *s,
  const string &module,
  const string &prefix,
  const cmdmap_t& cmdmap,
  const map<string,string>& param_str_map,
  const MonCommand *this_cmd) {

  if (s->entity_name.is_mon()) {
    // mon is all-powerful. even when it is forwarding commands on behalf of
    // old clients; we expect the mon is validating commands before proxying!
    return true;
  }

  bool cmd_r = this_cmd->requires_perm('r');
  bool cmd_w = this_cmd->requires_perm('w');
  bool cmd_x = this_cmd->requires_perm('x');

  bool capable = s->caps.is_capable(
      g_ceph_context,
      CEPH_ENTITY_TYPE_MGR,
      s->entity_name,
      module, prefix, param_str_map,
      cmd_r, cmd_w, cmd_x,
      s->get_peer_addr());

  dout(10) << " " << s->entity_name << " "
	   << (capable ? "" : "not ") << "capable" << dendl;
  return capable;
}
712
11fdf7f2
TL
/**
 * The working data for processing an MCommand.  This lives in
 * a class to enable passing it into other threads for processing
 * outside of the thread/locks that called handle_command.
 *
 * Holds a raw owning pointer to the message (put() in the destructor),
 * so instances must not be copied; they are always held via shared_ptr.
 */
class CommandContext {
public:
  MCommand *m;       // owned; released in the destructor
  bufferlist odata;  // reply payload accumulated by command handlers
  cmdmap_t cmdmap;   // parsed command arguments

  explicit CommandContext(MCommand *m_)
    : m(m_) {
  }

  ~CommandContext() {
    m->put();
  }

  /// Reply using the text accumulated in a stringstream.
  void reply(int r, const std::stringstream &ss) {
    reply(r, ss.str());
  }

  /// Send an MCommandReply carrying r, rs, and odata back to the sender.
  void reply(int r, const std::string &rs) {
    // Let the connection drop as soon as we've sent our response
    ConnectionRef con = m->get_connection();
    if (con) {
      con->mark_disposable();
    }

    if (r == 0) {
      dout(4) << __func__ << " success" << dendl;
    } else {
      derr << __func__ << " " << cpp_strerror(r) << " " << rs << dendl;
    }
    if (con) {
      MCommandReply *reply = new MCommandReply(r, rs);
      reply->set_tid(m->get_tid());
      reply->set_data(odata);
      con->send_message(reply);
    }
  }
};
7c673cae 756
11fdf7f2
TL
/**
 * A context for receiving a bufferlist/error string from a background
 * function and then calling back to a CommandContext when it's done
 */
class ReplyOnFinish : public Context {
  std::shared_ptr<CommandContext> cmdctx;

public:
  bufferlist from_mon;  // output payload filled in by the async operation
  string outs;          // human-readable status from the async operation

  explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_)
    : cmdctx(cmdctx_)
    {}
  void finish(int r) override {
    // Forward the collected output and status to the original requester.
    cmdctx->odata.claim_append(from_mon);
    cmdctx->reply(r, outs);
  }
};
7c673cae 776
11fdf7f2
TL
// Entry point for MCommand messages.  Wraps the message in a
// CommandContext (which takes ownership) and delegates to
// _handle_command, translating a bad_cmd_get — thrown on malformed or
// missing arguments — into an -EINVAL reply to the client.
bool DaemonServer::handle_command(MCommand *m)
{
  std::lock_guard l(lock);
  std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
  try {
    return _handle_command(m, cmdctx);
  } catch (const bad_cmd_get& e) {
    cmdctx->reply(-EINVAL, e.what());
    return true;
  }
}
7c673cae 788
11fdf7f2
TL
789bool DaemonServer::_handle_command(
790 MCommand *m,
791 std::shared_ptr<CommandContext>& cmdctx)
792{
793 auto priv = m->get_connection()->get_priv();
794 auto session = static_cast<MgrSession*>(priv.get());
7c673cae
FG
795 if (!session) {
796 return true;
797 }
7c673cae
FG
798 if (session->inst.name == entity_name_t())
799 session->inst.name = m->get_source();
800
801 std::string format;
802 boost::scoped_ptr<Formatter> f;
803 map<string,string> param_str_map;
11fdf7f2
TL
804 std::stringstream ss;
805 int r = 0;
7c673cae
FG
806
807 if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
808 cmdctx->reply(-EINVAL, ss);
809 return true;
810 }
811
812 {
813 cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
814 f.reset(Formatter::create(format));
815 }
816
11fdf7f2 817 string prefix;
7c673cae
FG
818 cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
819
820 dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
821 dout(4) << "prefix=" << prefix << dendl;
822
823 if (prefix == "get_command_descriptions") {
7c673cae 824 dout(10) << "reading commands from python modules" << dendl;
c07f9fc5 825 const auto py_commands = py_modules.get_commands();
7c673cae 826
c07f9fc5 827 int cmdnum = 0;
7c673cae
FG
828 JSONFormatter f;
829 f.open_object_section("command_descriptions");
c07f9fc5 830
11fdf7f2 831 auto dump_cmd = [&cmdnum, &f, m](const MonCommand &mc){
7c673cae
FG
832 ostringstream secname;
833 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
11fdf7f2
TL
834 dump_cmddesc_to_json(&f, m->get_connection()->get_features(),
835 secname.str(), mc.cmdstring, mc.helpstring,
836 mc.module, mc.req_perms, 0);
7c673cae 837 cmdnum++;
c07f9fc5
FG
838 };
839
840 for (const auto &pyc : py_commands) {
841 dump_cmd(pyc);
7c673cae 842 }
7c673cae 843
c07f9fc5
FG
844 for (const auto &mgr_cmd : mgr_commands) {
845 dump_cmd(mgr_cmd);
7c673cae 846 }
c07f9fc5 847
7c673cae
FG
848 f.close_section(); // command_descriptions
849 f.flush(cmdctx->odata);
850 cmdctx->reply(0, ss);
851 return true;
852 }
853
854 // lookup command
c07f9fc5 855 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
7c673cae 856 _generate_command_map(cmdctx->cmdmap, param_str_map);
11fdf7f2
TL
857
858 bool is_allowed;
7c673cae 859 if (!mgr_cmd) {
11fdf7f2
TL
860 MonCommand py_command = {"", "", "py", "rw"};
861 is_allowed = _allowed_command(session, py_command.module,
862 prefix, cmdctx->cmdmap, param_str_map, &py_command);
7c673cae
FG
863 } else {
864 // validate user's permissions for requested command
11fdf7f2
TL
865 is_allowed = _allowed_command(session, mgr_cmd->module,
866 prefix, cmdctx->cmdmap, param_str_map, mgr_cmd);
867 }
868 if (!is_allowed) {
7c673cae
FG
869 dout(1) << " access denied" << dendl;
870 audit_clog->info() << "from='" << session->inst << "' "
871 << "entity='" << session->entity_name << "' "
872 << "cmd=" << m->cmd << ": access denied";
11fdf7f2
TL
873 ss << "access denied: does your client key have mgr caps? "
874 "See http://docs.ceph.com/docs/master/mgr/administrator/"
875 "#client-authentication";
7c673cae
FG
876 cmdctx->reply(-EACCES, ss);
877 return true;
7c673cae
FG
878 }
879
880 audit_clog->debug()
881 << "from='" << session->inst << "' "
882 << "entity='" << session->entity_name << "' "
883 << "cmd=" << m->cmd << ": dispatch";
884
224ce89b
WB
885 // ----------------
886 // service map commands
887 if (prefix == "service dump") {
888 if (!f)
889 f.reset(Formatter::create("json-pretty"));
890 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
891 f->dump_object("service_map", service_map);
892 });
893 f->flush(cmdctx->odata);
894 cmdctx->reply(0, ss);
895 return true;
896 }
897 if (prefix == "service status") {
898 if (!f)
899 f.reset(Formatter::create("json-pretty"));
900 // only include state from services that are in the persisted service map
901 f->open_object_section("service_status");
f64942e4 902 for (auto& p : pending_service_map.services) {
224ce89b
WB
903 f->open_object_section(p.first.c_str());
904 for (auto& q : p.second.daemons) {
905 f->open_object_section(q.first.c_str());
906 DaemonKey key(p.first, q.first);
11fdf7f2 907 ceph_assert(daemon_state.exists(key));
224ce89b 908 auto daemon = daemon_state.get(key);
11fdf7f2 909 std::lock_guard l(daemon->lock);
224ce89b
WB
910 f->dump_stream("status_stamp") << daemon->service_status_stamp;
911 f->dump_stream("last_beacon") << daemon->last_service_beacon;
912 f->open_object_section("status");
913 for (auto& r : daemon->service_status) {
914 f->dump_string(r.first.c_str(), r.second);
915 }
916 f->close_section();
917 f->close_section();
918 }
919 f->close_section();
920 }
921 f->close_section();
922 f->flush(cmdctx->odata);
923 cmdctx->reply(0, ss);
924 return true;
925 }
926
3efd9988
FG
927 if (prefix == "config set") {
928 std::string key;
929 std::string val;
930 cmd_getval(cct, cmdctx->cmdmap, "key", key);
931 cmd_getval(cct, cmdctx->cmdmap, "value", val);
11fdf7f2 932 r = cct->_conf.set_val(key, val, &ss);
3efd9988 933 if (r == 0) {
11fdf7f2 934 cct->_conf.apply_changes(nullptr);
3efd9988
FG
935 }
936 cmdctx->reply(0, ss);
937 return true;
938 }
939
7c673cae
FG
940 // -----------
941 // PG commands
942
943 if (prefix == "pg scrub" ||
944 prefix == "pg repair" ||
945 prefix == "pg deep-scrub") {
946 string scrubop = prefix.substr(3, string::npos);
947 pg_t pgid;
11fdf7f2 948 spg_t spgid;
7c673cae
FG
949 string pgidstr;
950 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
951 if (!pgid.parse(pgidstr.c_str())) {
952 ss << "invalid pgid '" << pgidstr << "'";
953 cmdctx->reply(-EINVAL, ss);
954 return true;
955 }
956 bool pg_exists = false;
957 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
958 pg_exists = osdmap.pg_exists(pgid);
959 });
960 if (!pg_exists) {
11fdf7f2 961 ss << "pg " << pgid << " does not exist";
7c673cae
FG
962 cmdctx->reply(-ENOENT, ss);
963 return true;
964 }
965 int acting_primary = -1;
11fdf7f2 966 epoch_t epoch;
7c673cae 967 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
11fdf7f2
TL
968 epoch = osdmap.get_epoch();
969 osdmap.get_primary_shard(pgid, &acting_primary, &spgid);
7c673cae
FG
970 });
971 if (acting_primary == -1) {
972 ss << "pg " << pgid << " has no primary osd";
973 cmdctx->reply(-EAGAIN, ss);
974 return true;
975 }
31f18b77
FG
976 auto p = osd_cons.find(acting_primary);
977 if (p == osd_cons.end()) {
978 ss << "pg " << pgid << " primary osd." << acting_primary
979 << " is not currently connected";
980 cmdctx->reply(-EAGAIN, ss);
f64942e4 981 return true;
31f18b77 982 }
31f18b77 983 for (auto& con : p->second) {
11fdf7f2
TL
984 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
985 vector<spg_t> pgs = { spgid };
986 con->send_message(new MOSDScrub2(monc->get_fsid(),
987 epoch,
988 pgs,
989 scrubop == "repair",
990 scrubop == "deep-scrub"));
991 } else {
992 vector<pg_t> pgs = { pgid };
993 con->send_message(new MOSDScrub(monc->get_fsid(),
994 pgs,
995 scrubop == "repair",
996 scrubop == "deep-scrub"));
997 }
31f18b77 998 }
11fdf7f2 999 ss << "instructing pg " << spgid << " on osd." << acting_primary
31f18b77
FG
1000 << " to " << scrubop;
1001 cmdctx->reply(0, ss);
1002 return true;
1003 } else if (prefix == "osd scrub" ||
1004 prefix == "osd deep-scrub" ||
1005 prefix == "osd repair") {
1006 string whostr;
1007 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
1008 vector<string> pvec;
1009 get_str_vec(prefix, pvec);
1010
1011 set<int> osds;
224ce89b 1012 if (whostr == "*" || whostr == "all" || whostr == "any") {
31f18b77
FG
1013 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1014 for (int i = 0; i < osdmap.get_max_osd(); i++)
1015 if (osdmap.is_up(i)) {
1016 osds.insert(i);
1017 }
1018 });
1019 } else {
1020 long osd = parse_osd_id(whostr.c_str(), &ss);
1021 if (osd < 0) {
1022 ss << "invalid osd '" << whostr << "'";
1023 cmdctx->reply(-EINVAL, ss);
1024 return true;
1025 }
1026 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1027 if (osdmap.is_up(osd)) {
1028 osds.insert(osd);
1029 }
1030 });
1031 if (osds.empty()) {
1032 ss << "osd." << osd << " is not up";
1033 cmdctx->reply(-EAGAIN, ss);
1034 return true;
1035 }
1036 }
1037 set<int> sent_osds, failed_osds;
1038 for (auto osd : osds) {
11fdf7f2
TL
1039 vector<spg_t> spgs;
1040 epoch_t epoch;
1041 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1042 epoch = osdmap.get_epoch();
1043 auto p = pgmap.pg_by_osd.find(osd);
1044 if (p != pgmap.pg_by_osd.end()) {
1045 for (auto pgid : p->second) {
1046 int primary;
1047 spg_t spg;
1048 osdmap.get_primary_shard(pgid, &primary, &spg);
1049 if (primary == osd) {
1050 spgs.push_back(spg);
1051 }
1052 }
1053 }
1054 });
31f18b77
FG
1055 auto p = osd_cons.find(osd);
1056 if (p == osd_cons.end()) {
1057 failed_osds.insert(osd);
1058 } else {
1059 sent_osds.insert(osd);
1060 for (auto& con : p->second) {
11fdf7f2
TL
1061 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
1062 con->send_message(new MOSDScrub2(monc->get_fsid(),
1063 epoch,
1064 spgs,
1065 pvec.back() == "repair",
1066 pvec.back() == "deep-scrub"));
1067 } else {
1068 con->send_message(new MOSDScrub(monc->get_fsid(),
1069 pvec.back() == "repair",
1070 pvec.back() == "deep-scrub"));
1071 }
31f18b77
FG
1072 }
1073 }
1074 }
1075 if (failed_osds.size() == osds.size()) {
1076 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
1077 << " (not connected)";
1078 r = -EAGAIN;
1079 } else {
1080 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
1081 if (!failed_osds.empty()) {
1082 ss << "; osd(s) " << failed_osds << " were not connected";
1083 }
1084 r = 0;
1085 }
7c673cae
FG
1086 cmdctx->reply(0, ss);
1087 return true;
11fdf7f2
TL
1088 } else if (prefix == "osd pool scrub" ||
1089 prefix == "osd pool deep-scrub" ||
1090 prefix == "osd pool repair") {
1091 vector<string> pool_names;
1092 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", pool_names);
1093 if (pool_names.empty()) {
1094 ss << "must specify one or more pool names";
1095 cmdctx->reply(-EINVAL, ss);
1096 return true;
1097 }
1098 epoch_t epoch;
1099 map<int32_t, vector<pg_t>> pgs_by_primary; // legacy
1100 map<int32_t, vector<spg_t>> spgs_by_primary;
1101 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1102 epoch = osdmap.get_epoch();
1103 for (auto& pool_name : pool_names) {
1104 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1105 if (pool_id < 0) {
1106 ss << "unrecognized pool '" << pool_name << "'";
1107 r = -ENOENT;
1108 return;
1109 }
1110 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1111 for (int i = 0; i < pool_pg_num; i++) {
1112 pg_t pg(i, pool_id);
1113 int primary;
1114 spg_t spg;
1115 auto got = osdmap.get_primary_shard(pg, &primary, &spg);
1116 if (!got)
1117 continue;
1118 pgs_by_primary[primary].push_back(pg);
1119 spgs_by_primary[primary].push_back(spg);
1120 }
1121 }
1122 });
1123 if (r < 0) {
1124 cmdctx->reply(r, ss);
1125 return true;
1126 }
1127 for (auto& it : spgs_by_primary) {
1128 auto primary = it.first;
1129 auto p = osd_cons.find(primary);
1130 if (p == osd_cons.end()) {
1131 ss << "osd." << primary << " is not currently connected";
1132 cmdctx->reply(-EAGAIN, ss);
1133 return true;
1134 }
1135 for (auto& con : p->second) {
1136 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
1137 con->send_message(new MOSDScrub2(monc->get_fsid(),
1138 epoch,
1139 it.second,
1140 prefix == "osd pool repair",
1141 prefix == "osd pool deep-scrub"));
1142 } else {
1143 // legacy
1144 auto q = pgs_by_primary.find(primary);
1145 ceph_assert(q != pgs_by_primary.end());
1146 con->send_message(new MOSDScrub(monc->get_fsid(),
1147 q->second,
1148 prefix == "osd pool repair",
1149 prefix == "osd pool deep-scrub"));
1150 }
1151 }
1152 }
1153 cmdctx->reply(0, "");
1154 return true;
7c673cae
FG
1155 } else if (prefix == "osd reweight-by-pg" ||
1156 prefix == "osd reweight-by-utilization" ||
1157 prefix == "osd test-reweight-by-pg" ||
1158 prefix == "osd test-reweight-by-utilization") {
1159 bool by_pg =
1160 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
1161 bool dry_run =
1162 prefix == "osd test-reweight-by-pg" ||
1163 prefix == "osd test-reweight-by-utilization";
1164 int64_t oload;
1165 cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
1166 set<int64_t> pools;
1167 vector<string> poolnames;
1168 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
1169 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1170 for (const auto& poolname : poolnames) {
1171 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
1172 if (pool < 0) {
1173 ss << "pool '" << poolname << "' does not exist";
1174 r = -ENOENT;
1175 }
1176 pools.insert(pool);
1177 }
1178 });
1179 if (r) {
1180 cmdctx->reply(r, ss);
1181 return true;
1182 }
11fdf7f2
TL
1183
1184 double max_change = g_conf().get_val<double>("mon_reweight_max_change");
7c673cae
FG
1185 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
1186 if (max_change <= 0.0) {
1187 ss << "max_change " << max_change << " must be positive";
1188 cmdctx->reply(-EINVAL, ss);
1189 return true;
1190 }
11fdf7f2 1191 int64_t max_osds = g_conf().get_val<int64_t>("mon_reweight_max_osds");
7c673cae
FG
1192 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
1193 if (max_osds <= 0) {
1194 ss << "max_osds " << max_osds << " must be positive";
1195 cmdctx->reply(-EINVAL, ss);
1196 return true;
1197 }
11fdf7f2 1198 bool no_increasing = false;
7c673cae
FG
1199 cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
1200 string out_str;
1201 mempool::osdmap::map<int32_t, uint32_t> new_weights;
11fdf7f2
TL
1202 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap &osdmap, const PGMap& pgmap) {
1203 return reweight::by_utilization(osdmap, pgmap,
1204 oload,
1205 max_change,
1206 max_osds,
1207 by_pg,
1208 pools.empty() ? NULL : &pools,
1209 no_increasing,
1210 &new_weights,
1211 &ss, &out_str, f.get());
7c673cae
FG
1212 });
1213 if (r >= 0) {
1214 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
1215 }
1216 if (f) {
1217 f->flush(cmdctx->odata);
1218 } else {
1219 cmdctx->odata.append(out_str);
1220 }
1221 if (r < 0) {
1222 ss << "FAILED reweight-by-pg";
1223 cmdctx->reply(r, ss);
1224 return true;
1225 } else if (r == 0 || dry_run) {
1226 ss << "no change";
1227 cmdctx->reply(r, ss);
1228 return true;
1229 } else {
1230 json_spirit::Object json_object;
1231 for (const auto& osd_weight : new_weights) {
1232 json_spirit::Config::add(json_object,
1233 std::to_string(osd_weight.first),
1234 std::to_string(osd_weight.second));
1235 }
1236 string s = json_spirit::write(json_object);
1237 std::replace(begin(s), end(s), '\"', '\'');
1238 const string cmd =
1239 "{"
1240 "\"prefix\": \"osd reweightn\", "
1241 "\"weights\": \"" + s + "\""
1242 "}";
1243 auto on_finish = new ReplyOnFinish(cmdctx);
1244 monc->start_mon_command({cmd}, {},
1245 &on_finish->from_mon, &on_finish->outs, on_finish);
1246 return true;
1247 }
31f18b77
FG
1248 } else if (prefix == "osd df") {
1249 string method;
1250 cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
11fdf7f2
TL
1251 string filter_by;
1252 string filter;
1253 cmd_getval(g_ceph_context, cmdctx->cmdmap, "filter_by", filter_by);
1254 cmd_getval(g_ceph_context, cmdctx->cmdmap, "filter", filter);
1255 if (filter_by.empty() != filter.empty()) {
1256 cmdctx->reply(-EINVAL, "you must specify both 'filter_by' and 'filter'");
1257 return true;
1258 }
1259 stringstream rs;
1260 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1261 string class_name;
1262 string item_name;
1263 // sanity check filter(s)
1264 if (filter_by == "class") {
1265 if (!osdmap.crush->class_exists(filter)) {
1266 rs << "specified class '" << filter << "' does not exist";
1267 return -EINVAL;
1268 }
1269 class_name = filter;
1270 }
1271 if (filter_by == "name") {
1272 if (!osdmap.crush->name_exists(filter)) {
1273 rs << "specified name '" << filter << "' does not exist";
1274 return -EINVAL;
1275 }
1276 item_name = filter;
1277 }
1278 print_osd_utilization(osdmap, pgmap, ss,
1279 f.get(), method == "tree",
1280 class_name, item_name);
1281
1282 cmdctx->odata.append(ss);
1283 return 0;
31f18b77 1284 });
11fdf7f2 1285 cmdctx->reply(r, rs);
31f18b77 1286 return true;
11fdf7f2
TL
1287 } else if (prefix == "osd pool stats") {
1288 string pool_name;
1289 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pool_name", pool_name);
1290 int64_t poolid = -ENOENT;
1291 bool one_pool = false;
1292 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1293 if (!pool_name.empty()) {
1294 poolid = osdmap.lookup_pg_pool_name(pool_name);
1295 if (poolid < 0) {
1296 ceph_assert(poolid == -ENOENT);
1297 ss << "unrecognized pool '" << pool_name << "'";
1298 return -ENOENT;
1299 }
1300 one_pool = true;
1301 }
1302 stringstream rs;
1303 if (f)
1304 f->open_array_section("pool_stats");
1305 else {
1306 if (osdmap.get_pools().empty()) {
1307 ss << "there are no pools!";
1308 goto stats_out;
1309 }
1310 }
1311 for (auto &p : osdmap.get_pools()) {
1312 if (!one_pool) {
1313 poolid = p.first;
1314 }
1315 pg_map.dump_pool_stats_and_io_rate(poolid, osdmap, f.get(), &rs);
1316 if (one_pool) {
1317 break;
1318 }
1319 }
1320 stats_out:
1321 if (f) {
1322 f->close_section();
1323 f->flush(cmdctx->odata);
1324 } else {
1325 cmdctx->odata.append(rs.str());
1326 }
1327 return 0;
35e4c445 1328 });
11fdf7f2
TL
1329 if (r != -EOPNOTSUPP) {
1330 cmdctx->reply(r, ss);
1331 return true;
1332 }
1333 } else if (prefix == "osd safe-to-destroy" ||
1334 prefix == "osd destroy" ||
1335 prefix == "osd purge") {
1336 set<int> osds;
1337 int r = 0;
1338 if (prefix == "osd safe-to-destroy") {
1339 vector<string> ids;
1340 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1341 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1342 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1343 });
1344 if (!r && osds.empty()) {
1345 ss << "must specify one or more OSDs";
1346 r = -EINVAL;
1347 }
1348 } else {
1349 int64_t id;
1350 if (!cmd_getval(g_ceph_context, cmdctx->cmdmap, "id", id)) {
1351 r = -EINVAL;
1352 ss << "must specify OSD id";
1353 } else {
1354 osds.insert(id);
1355 }
35e4c445
FG
1356 }
1357 if (r < 0) {
1358 cmdctx->reply(r, ss);
1359 return true;
1360 }
11fdf7f2 1361 set<int> active_osds, missing_stats, stored_pgs, safe_to_destroy;
35e4c445 1362 int affected_pgs = 0;
11fdf7f2 1363 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
35e4c445
FG
1364 if (pg_map.num_pg_unknown > 0) {
1365 ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
1366 << " any conclusions";
1367 r = -EAGAIN;
1368 return;
1369 }
1370 int num_active_clean = 0;
1371 for (auto& p : pg_map.num_pg_by_state) {
1372 unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
1373 if ((p.first & want) == want) {
1374 num_active_clean += p.second;
1375 }
1376 }
11fdf7f2
TL
1377 for (auto osd : osds) {
1378 if (!osdmap.exists(osd)) {
1379 safe_to_destroy.insert(osd);
1380 continue; // clearly safe to destroy
1381 }
1382 auto q = pg_map.num_pg_by_osd.find(osd);
1383 if (q != pg_map.num_pg_by_osd.end()) {
81eedcae 1384 if (q->second.acting > 0 || q->second.up_not_acting > 0) {
11fdf7f2 1385 active_osds.insert(osd);
81eedcae
TL
1386 // XXX: For overlapping PGs, this counts them again
1387 affected_pgs += q->second.acting + q->second.up_not_acting;
11fdf7f2 1388 continue;
35e4c445 1389 }
11fdf7f2
TL
1390 }
1391 if (num_active_clean < pg_map.num_pg) {
1392 // all pgs aren't active+clean; we need to be careful.
1393 auto p = pg_map.osd_stat.find(osd);
81eedcae 1394 if (p == pg_map.osd_stat.end() || !osdmap.is_up(osd)) {
11fdf7f2
TL
1395 missing_stats.insert(osd);
1396 continue;
1397 } else if (p->second.num_pgs > 0) {
1398 stored_pgs.insert(osd);
1399 continue;
1400 }
1401 }
1402 safe_to_destroy.insert(osd);
1403 }
35e4c445 1404 });
11fdf7f2
TL
1405 if (r && prefix == "osd safe-to-destroy") {
1406 cmdctx->reply(r, ss); // regardless of formatter
1407 return true;
1408 }
1409 if (!r && (!active_osds.empty() ||
1410 !missing_stats.empty() || !stored_pgs.empty())) {
1411 if (!safe_to_destroy.empty()) {
1412 ss << "OSD(s) " << safe_to_destroy
1413 << " are safe to destroy without reducing data durability. ";
1414 }
1415 if (!active_osds.empty()) {
1416 ss << "OSD(s) " << active_osds << " have " << affected_pgs
1417 << " pgs currently mapped to them. ";
1418 }
1419 if (!missing_stats.empty()) {
1420 ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
1421 << " PGs are active+clean; we cannot draw any conclusions. ";
1422 }
1423 if (!stored_pgs.empty()) {
1424 ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
1425 << " data, and not all PGs are active+clean; we cannot be sure they"
1426 << " aren't still needed.";
1427 }
1428 if (!active_osds.empty() || !stored_pgs.empty()) {
1429 r = -EBUSY;
1430 } else {
1431 r = -EAGAIN;
1432 }
1433 }
1434
1435 if (prefix == "osd safe-to-destroy") {
1436 if (!r) {
1437 ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1438 << " durability.";
11fdf7f2
TL
1439 }
1440 if (f) {
1441 f->open_object_section("osd_status");
1442 f->open_array_section("safe_to_destroy");
1443 for (auto i : safe_to_destroy)
1444 f->dump_int("osd", i);
1445 f->close_section();
1446 f->open_array_section("active");
1447 for (auto i : active_osds)
1448 f->dump_int("osd", i);
1449 f->close_section();
1450 f->open_array_section("missing_stats");
1451 for (auto i : missing_stats)
1452 f->dump_int("osd", i);
1453 f->close_section();
1454 f->open_array_section("stored_pgs");
1455 for (auto i : stored_pgs)
1456 f->dump_int("osd", i);
1457 f->close_section();
1458 f->close_section(); // osd_status
1459 f->flush(cmdctx->odata);
1460 r = 0;
1461 std::stringstream().swap(ss);
1462 }
1463 cmdctx->reply(r, ss);
1464 return true;
1465 }
1466
1467 if (r) {
1468 bool force = false;
1469 cmd_getval(cct, cmdctx->cmdmap, "force", force);
1470 if (!force) {
1471 // Backward compat
1472 cmd_getval(cct, cmdctx->cmdmap, "yes_i_really_mean_it", force);
1473 }
1474 if (!force) {
1475 ss << "\nYou can proceed by passing --force, but be warned that"
1476 " this will likely mean real, permanent data loss.";
1477 } else {
1478 r = 0;
1479 }
35e4c445
FG
1480 }
1481 if (r) {
1482 cmdctx->reply(r, ss);
1483 return true;
1484 }
11fdf7f2
TL
1485 const string cmd =
1486 "{"
1487 "\"prefix\": \"" + prefix + "-actual\", "
1488 "\"id\": " + stringify(osds) + ", "
1489 "\"yes_i_really_mean_it\": true"
1490 "}";
1491 auto on_finish = new ReplyOnFinish(cmdctx);
1492 monc->start_mon_command({cmd}, {}, nullptr, &on_finish->outs, on_finish);
35e4c445
FG
1493 return true;
1494 } else if (prefix == "osd ok-to-stop") {
1495 vector<string> ids;
1496 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1497 set<int> osds;
1498 int r;
1499 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1500 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1501 });
1502 if (!r && osds.empty()) {
1503 ss << "must specify one or more OSDs";
1504 r = -EINVAL;
1505 }
1506 if (r < 0) {
1507 cmdctx->reply(r, ss);
1508 return true;
1509 }
81eedcae 1510 int touched_pgs = 0;
35e4c445 1511 int dangerous_pgs = 0;
11fdf7f2
TL
1512 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1513 if (pg_map.num_pg_unknown > 0) {
1514 ss << pg_map.num_pg_unknown << " pgs have unknown state; "
1515 << "cannot draw any conclusions";
1516 r = -EAGAIN;
1517 return;
1518 }
81eedcae
TL
1519 for (const auto& q : pg_map.pg_stat) {
1520 set<int32_t> pg_acting; // net acting sets (with no missing if degraded)
1521 bool found = false;
1522 if (q.second.state & PG_STATE_DEGRADED) {
1523 for (auto& anm : q.second.avail_no_missing) {
1524 if (osds.count(anm.osd)) {
1525 found = true;
1526 continue;
1527 }
1528 pg_acting.insert(anm.osd);
1529 }
1530 } else {
1531 for (auto& a : q.second.acting) {
1532 if (osds.count(a)) {
1533 found = true;
1534 continue;
1535 }
1536 pg_acting.insert(a);
35e4c445 1537 }
11fdf7f2 1538 }
81eedcae
TL
1539 if (!found) {
1540 continue;
11fdf7f2 1541 }
81eedcae
TL
1542 touched_pgs++;
1543 if (!(q.second.state & PG_STATE_ACTIVE) ||
1544 (q.second.state & PG_STATE_DEGRADED)) {
11fdf7f2
TL
1545 ++dangerous_pgs;
1546 continue;
1547 }
81eedcae 1548 const pg_pool_t *pi = osdmap.get_pg_pool(q.first.pool());
11fdf7f2
TL
1549 if (!pi) {
1550 ++dangerous_pgs; // pool is creating or deleting
1551 } else {
81eedcae 1552 if (pg_acting.size() < pi->min_size) {
11fdf7f2 1553 ++dangerous_pgs;
35e4c445 1554 }
11fdf7f2
TL
1555 }
1556 }
35e4c445
FG
1557 });
1558 if (r) {
1559 cmdctx->reply(r, ss);
1560 return true;
1561 }
1562 if (dangerous_pgs) {
81eedcae
TL
1563 ss << dangerous_pgs << " PGs are already too degraded, would become"
1564 << " too degraded or might become unavailable";
35e4c445
FG
1565 cmdctx->reply(-EBUSY, ss);
1566 return true;
1567 }
1568 ss << "OSD(s) " << osds << " are ok to stop without reducing"
81eedcae
TL
1569 << " availability or risking data, provided there are no other concurrent failures"
1570 << " or interventions." << std::endl;
1571 ss << touched_pgs << " PGs are likely to be"
35e4c445
FG
1572 << " degraded (but remain available) as a result.";
1573 cmdctx->reply(0, ss);
1574 return true;
c07f9fc5 1575 } else if (prefix == "pg force-recovery" ||
11fdf7f2
TL
1576 prefix == "pg force-backfill" ||
1577 prefix == "pg cancel-force-recovery" ||
1578 prefix == "pg cancel-force-backfill" ||
1579 prefix == "osd pool force-recovery" ||
1580 prefix == "osd pool force-backfill" ||
1581 prefix == "osd pool cancel-force-recovery" ||
1582 prefix == "osd pool cancel-force-backfill") {
1583 vector<string> vs;
1584 get_str_vec(prefix, vs);
1585 auto& granularity = vs.front();
1586 auto& forceop = vs.back();
1587 vector<pg_t> pgs;
c07f9fc5
FG
1588
1589 // figure out actual op just once
1590 int actual_op = 0;
1591 if (forceop == "force-recovery") {
1592 actual_op = OFR_RECOVERY;
1593 } else if (forceop == "force-backfill") {
1594 actual_op = OFR_BACKFILL;
1595 } else if (forceop == "cancel-force-backfill") {
1596 actual_op = OFR_BACKFILL | OFR_CANCEL;
1597 } else if (forceop == "cancel-force-recovery") {
1598 actual_op = OFR_RECOVERY | OFR_CANCEL;
1599 }
1600
11fdf7f2
TL
1601 set<pg_t> candidates; // deduped
1602 if (granularity == "pg") {
 1603 // convert pg names to pgs, discard any invalid ones while at it
1604 vector<string> pgids;
1605 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgids);
1606 for (auto& i : pgids) {
1607 pg_t pgid;
1608 if (!pgid.parse(i.c_str())) {
1609 ss << "invlaid pgid '" << i << "'; ";
1610 r = -EINVAL;
1611 continue;
1612 }
1613 candidates.insert(pgid);
c07f9fc5 1614 }
11fdf7f2
TL
1615 } else {
1616 // per pool
1617 vector<string> pool_names;
1618 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", pool_names);
1619 if (pool_names.empty()) {
1620 ss << "must specify one or more pool names";
1621 cmdctx->reply(-EINVAL, ss);
1622 return true;
1623 }
1624 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1625 for (auto& pool_name : pool_names) {
1626 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1627 if (pool_id < 0) {
1628 ss << "unrecognized pool '" << pool_name << "'";
1629 r = -ENOENT;
1630 return;
1631 }
1632 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1633 for (int i = 0; i < pool_pg_num; i++)
1634 candidates.insert({(unsigned int)i, (uint64_t)pool_id});
1635 }
c07f9fc5 1636 });
11fdf7f2
TL
1637 if (r < 0) {
1638 cmdctx->reply(r, ss);
1639 return true;
1640 }
c07f9fc5
FG
1641 }
1642
11fdf7f2
TL
1643 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1644 for (auto& i : candidates) {
1645 auto it = pg_map.pg_stat.find(i);
1646 if (it == pg_map.pg_stat.end()) {
1647 ss << "pg " << i << " does not exist; ";
1648 r = -ENOENT;
1649 continue;
1650 }
1651 auto state = it->second.state;
1652 // discard pgs for which user requests are pointless
1653 switch (actual_op) {
1654 case OFR_RECOVERY:
1655 if ((state & (PG_STATE_DEGRADED |
1656 PG_STATE_RECOVERY_WAIT |
1657 PG_STATE_RECOVERING)) == 0) {
1658 // don't return error, user script may be racing with cluster.
1659 // not fatal.
1660 ss << "pg " << i << " doesn't require recovery; ";
1661 continue;
1662 } else if (state & PG_STATE_FORCED_RECOVERY) {
1663 ss << "pg " << i << " recovery already forced; ";
1664 // return error, as it may be a bug in user script
1665 r = -EINVAL;
1666 continue;
1667 }
1668 break;
1669 case OFR_BACKFILL:
1670 if ((state & (PG_STATE_DEGRADED |
1671 PG_STATE_BACKFILL_WAIT |
1672 PG_STATE_BACKFILLING)) == 0) {
1673 ss << "pg " << i << " doesn't require backfilling; ";
1674 continue;
1675 } else if (state & PG_STATE_FORCED_BACKFILL) {
1676 ss << "pg " << i << " backfill already forced; ";
1677 r = -EINVAL;
1678 continue;
1679 }
1680 break;
1681 case OFR_BACKFILL | OFR_CANCEL:
1682 if ((state & PG_STATE_FORCED_BACKFILL) == 0) {
1683 ss << "pg " << i << " backfill not forced; ";
1684 continue;
1685 }
1686 break;
1687 case OFR_RECOVERY | OFR_CANCEL:
1688 if ((state & PG_STATE_FORCED_RECOVERY) == 0) {
1689 ss << "pg " << i << " recovery not forced; ";
1690 continue;
1691 }
1692 break;
1693 default:
1694 ceph_abort_msg("actual_op value is not supported");
1695 }
1696 pgs.push_back(i);
1697 } // for
1698 });
1699
c07f9fc5
FG
1700 // respond with error only when no pgs are correct
1701 // yes, in case of mixed errors, only the last one will be emitted,
1702 // but the message presented will be fine
11fdf7f2 1703 if (pgs.size() != 0) {
c07f9fc5
FG
1704 // clear error to not confuse users/scripts
1705 r = 0;
1706 }
1707
11fdf7f2
TL
1708 // optimize the command -> messages conversion, use only one
1709 // message per distinct OSD
c07f9fc5 1710 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
11fdf7f2
TL
1711 // group pgs to process by osd
1712 map<int, vector<spg_t>> osdpgs;
1713 for (auto& pgid : pgs) {
1714 int primary;
1715 spg_t spg;
1716 if (osdmap.get_primary_shard(pgid, &primary, &spg)) {
1717 osdpgs[primary].push_back(spg);
1718 }
1719 }
1720 for (auto& i : osdpgs) {
1721 if (osdmap.is_up(i.first)) {
1722 auto p = osd_cons.find(i.first);
1723 if (p == osd_cons.end()) {
1724 ss << "osd." << i.first << " is not currently connected";
1725 r = -EAGAIN;
1726 continue;
1727 }
1728 for (auto& con : p->second) {
1729 con->send_message(
1730 new MOSDForceRecovery(monc->get_fsid(), i.second, actual_op));
1731 }
1732 ss << "instructing pg(s) " << i.second << " on osd." << i.first
1733 << " to " << forceop << "; ";
1734 }
1735 }
1736 });
1737 ss << std::endl;
1738 cmdctx->reply(r, ss);
1739 return true;
1740 } else if (prefix == "config show" ||
1741 prefix == "config show-with-defaults") {
1742 string who;
1743 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", who);
1744 int r = 0;
1745 auto dot = who.find('.');
1746 DaemonKey key;
1747 key.first = who.substr(0, dot);
1748 key.second = who.substr(dot + 1);
1749 DaemonStatePtr daemon = daemon_state.get(key);
1750 string name;
1751 if (!daemon) {
1752 ss << "no config state for daemon " << who;
1753 cmdctx->reply(-ENOENT, ss);
1754 return true;
1755 }
1756
1757 std::lock_guard l(daemon->lock);
1758
1759 if (cmd_getval(g_ceph_context, cmdctx->cmdmap, "key", name)) {
1760 auto p = daemon->config.find(name);
1761 if (p != daemon->config.end() &&
1762 !p->second.empty()) {
1763 cmdctx->odata.append(p->second.rbegin()->second + "\n");
1764 } else {
1765 auto& defaults = daemon->_get_config_defaults();
1766 auto q = defaults.find(name);
1767 if (q != defaults.end()) {
1768 cmdctx->odata.append(q->second + "\n");
1769 } else {
1770 r = -ENOENT;
1771 }
1772 }
1773 } else if (daemon->config_defaults_bl.length() > 0) {
1774 TextTable tbl;
1775 if (f) {
1776 f->open_array_section("config");
1777 } else {
1778 tbl.define_column("NAME", TextTable::LEFT, TextTable::LEFT);
1779 tbl.define_column("VALUE", TextTable::LEFT, TextTable::LEFT);
1780 tbl.define_column("SOURCE", TextTable::LEFT, TextTable::LEFT);
1781 tbl.define_column("OVERRIDES", TextTable::LEFT, TextTable::LEFT);
1782 tbl.define_column("IGNORES", TextTable::LEFT, TextTable::LEFT);
1783 }
1784 if (prefix == "config show") {
1785 // show
1786 for (auto& i : daemon->config) {
1787 dout(20) << " " << i.first << " -> " << i.second << dendl;
1788 if (i.second.empty()) {
c07f9fc5
FG
1789 continue;
1790 }
11fdf7f2
TL
1791 if (f) {
1792 f->open_object_section("value");
1793 f->dump_string("name", i.first);
1794 f->dump_string("value", i.second.rbegin()->second);
1795 f->dump_string("source", ceph_conf_level_name(
1796 i.second.rbegin()->first));
1797 if (i.second.size() > 1) {
1798 f->open_array_section("overrides");
1799 auto j = i.second.rend();
1800 for (--j; j != i.second.rbegin(); --j) {
1801 f->open_object_section("value");
1802 f->dump_string("source", ceph_conf_level_name(j->first));
1803 f->dump_string("value", j->second);
1804 f->close_section();
1805 }
1806 f->close_section();
1807 }
1808 if (daemon->ignored_mon_config.count(i.first)) {
1809 f->dump_string("ignores", "mon");
1810 }
1811 f->close_section();
1812 } else {
1813 tbl << i.first;
1814 tbl << i.second.rbegin()->second;
1815 tbl << ceph_conf_level_name(i.second.rbegin()->first);
1816 if (i.second.size() > 1) {
1817 list<string> ov;
1818 auto j = i.second.rend();
1819 for (--j; j != i.second.rbegin(); --j) {
1820 if (j->second == i.second.rbegin()->second) {
1821 ov.push_front(string("(") + ceph_conf_level_name(j->first) +
1822 string("[") + j->second + string("]") +
1823 string(")"));
1824 } else {
1825 ov.push_front(ceph_conf_level_name(j->first) +
1826 string("[") + j->second + string("]"));
1827
1828 }
1829 }
1830 tbl << ov;
1831 } else {
1832 tbl << "";
1833 }
1834 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
1835 tbl << TextTable::endrow;
1836 }
1837 }
1838 } else {
1839 // show-with-defaults
1840 auto& defaults = daemon->_get_config_defaults();
1841 for (auto& i : defaults) {
1842 if (f) {
1843 f->open_object_section("value");
1844 f->dump_string("name", i.first);
1845 } else {
1846 tbl << i.first;
1847 }
1848 auto j = daemon->config.find(i.first);
1849 if (j != daemon->config.end() && !j->second.empty()) {
1850 // have config
1851 if (f) {
1852 f->dump_string("value", j->second.rbegin()->second);
1853 f->dump_string("source", ceph_conf_level_name(
1854 j->second.rbegin()->first));
1855 if (j->second.size() > 1) {
1856 f->open_array_section("overrides");
1857 auto k = j->second.rend();
1858 for (--k; k != j->second.rbegin(); --k) {
1859 f->open_object_section("value");
1860 f->dump_string("source", ceph_conf_level_name(k->first));
1861 f->dump_string("value", k->second);
1862 f->close_section();
1863 }
1864 f->close_section();
1865 }
1866 if (daemon->ignored_mon_config.count(i.first)) {
1867 f->dump_string("ignores", "mon");
1868 }
1869 f->close_section();
1870 } else {
1871 tbl << j->second.rbegin()->second;
1872 tbl << ceph_conf_level_name(j->second.rbegin()->first);
1873 if (j->second.size() > 1) {
1874 list<string> ov;
1875 auto k = j->second.rend();
1876 for (--k; k != j->second.rbegin(); --k) {
1877 if (k->second == j->second.rbegin()->second) {
1878 ov.push_front(string("(") + ceph_conf_level_name(k->first) +
1879 string("[") + k->second + string("]") +
1880 string(")"));
1881 } else {
1882 ov.push_front(ceph_conf_level_name(k->first) +
1883 string("[") + k->second + string("]"));
1884 }
1885 }
1886 tbl << ov;
1887 } else {
1888 tbl << "";
1889 }
1890 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
1891 tbl << TextTable::endrow;
1892 }
1893 } else {
1894 // only have default
1895 if (f) {
1896 f->dump_string("value", i.second);
1897 f->dump_string("source", ceph_conf_level_name(CONF_DEFAULT));
1898 f->close_section();
1899 } else {
1900 tbl << i.second;
1901 tbl << ceph_conf_level_name(CONF_DEFAULT);
1902 tbl << "";
1903 tbl << "";
1904 tbl << TextTable::endrow;
1905 }
c07f9fc5 1906 }
c07f9fc5
FG
1907 }
1908 }
11fdf7f2
TL
1909 if (f) {
1910 f->close_section();
1911 f->flush(cmdctx->odata);
1912 } else {
1913 cmdctx->odata.append(stringify(tbl));
1914 }
1915 }
c07f9fc5
FG
1916 cmdctx->reply(r, ss);
1917 return true;
11fdf7f2
TL
1918 } else if (prefix == "device ls") {
1919 set<string> devids;
1920 TextTable tbl;
1921 if (f) {
1922 f->open_array_section("devices");
1923 daemon_state.with_devices([&f](const DeviceState& dev) {
1924 f->dump_object("device", dev);
1925 });
1926 f->close_section();
1927 f->flush(cmdctx->odata);
1928 } else {
1929 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
1930 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
1931 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
1932 tbl.define_column("LIFE EXPECTANCY", TextTable::LEFT, TextTable::LEFT);
1933 auto now = ceph_clock_now();
1934 daemon_state.with_devices([&tbl, now](const DeviceState& dev) {
1935 string h;
1936 for (auto& i : dev.devnames) {
1937 if (h.size()) {
1938 h += " ";
1939 }
1940 h += i.first + ":" + i.second;
1941 }
1942 string d;
1943 for (auto& i : dev.daemons) {
1944 if (d.size()) {
1945 d += " ";
1946 }
1947 d += to_string(i);
1948 }
1949 tbl << dev.devid
1950 << h
1951 << d
1952 << dev.get_life_expectancy_str(now)
1953 << TextTable::endrow;
1954 });
1955 cmdctx->odata.append(stringify(tbl));
1956 }
1957 cmdctx->reply(0, ss);
1958 return true;
1959 } else if (prefix == "device ls-by-daemon") {
1960 string who;
1961 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", who);
1962 DaemonKey k;
1963 if (!key_from_string(who, &k)) {
1964 ss << who << " is not a valid daemon name";
1965 r = -EINVAL;
1966 } else {
1967 auto dm = daemon_state.get(k);
1968 if (dm) {
1969 if (f) {
1970 f->open_array_section("devices");
1971 for (auto& i : dm->devices) {
1972 daemon_state.with_device(i.first, [&f] (const DeviceState& dev) {
1973 f->dump_object("device", dev);
1974 });
1975 }
1976 f->close_section();
1977 f->flush(cmdctx->odata);
1978 } else {
1979 TextTable tbl;
1980 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
1981 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
1982 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT,
1983 TextTable::LEFT);
1984 auto now = ceph_clock_now();
1985 for (auto& i : dm->devices) {
1986 daemon_state.with_device(
1987 i.first, [&tbl, now] (const DeviceState& dev) {
1988 string h;
1989 for (auto& i : dev.devnames) {
1990 if (h.size()) {
1991 h += " ";
1992 }
1993 h += i.first + ":" + i.second;
1994 }
1995 tbl << dev.devid
1996 << h
1997 << dev.get_life_expectancy_str(now)
1998 << TextTable::endrow;
1999 });
2000 }
2001 cmdctx->odata.append(stringify(tbl));
2002 }
2003 } else {
2004 r = -ENOENT;
2005 ss << "daemon " << who << " not found";
2006 }
2007 cmdctx->reply(r, ss);
2008 }
2009 } else if (prefix == "device ls-by-host") {
2010 string host;
2011 cmd_getval(g_ceph_context, cmdctx->cmdmap, "host", host);
2012 set<string> devids;
2013 daemon_state.list_devids_by_server(host, &devids);
2014 if (f) {
2015 f->open_array_section("devices");
2016 for (auto& devid : devids) {
2017 daemon_state.with_device(
2018 devid, [&f] (const DeviceState& dev) {
2019 f->dump_object("device", dev);
7c673cae 2020 });
11fdf7f2
TL
2021 }
2022 f->close_section();
2023 f->flush(cmdctx->odata);
2024 } else {
2025 TextTable tbl;
2026 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2027 tbl.define_column("DEV", TextTable::LEFT, TextTable::LEFT);
2028 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
2029 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT, TextTable::LEFT);
2030 auto now = ceph_clock_now();
2031 for (auto& devid : devids) {
2032 daemon_state.with_device(
2033 devid, [&tbl, &host, now] (const DeviceState& dev) {
2034 string n;
2035 for (auto& j : dev.devnames) {
2036 if (j.first == host) {
2037 if (n.size()) {
2038 n += " ";
2039 }
2040 n += j.second;
2041 }
2042 }
2043 string d;
2044 for (auto& i : dev.daemons) {
2045 if (d.size()) {
2046 d += " ";
2047 }
2048 d += to_string(i);
2049 }
2050 tbl << dev.devid
2051 << n
2052 << d
2053 << dev.get_life_expectancy_str(now)
2054 << TextTable::endrow;
2055 });
2056 }
2057 cmdctx->odata.append(stringify(tbl));
2058 }
2059 cmdctx->reply(0, ss);
2060 return true;
2061 } else if (prefix == "device info") {
2062 string devid;
2063 cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
2064 int r = 0;
2065 ostringstream rs;
2066 if (!daemon_state.with_device(devid,
2067 [&f, &rs] (const DeviceState& dev) {
2068 if (f) {
2069 f->dump_object("device", dev);
2070 } else {
2071 dev.print(rs);
2072 }
2073 })) {
2074 ss << "device " << devid << " not found";
2075 r = -ENOENT;
2076 } else {
2077 if (f) {
2078 f->flush(cmdctx->odata);
2079 } else {
2080 cmdctx->odata.append(rs.str());
2081 }
2082 }
2083 cmdctx->reply(r, ss);
2084 return true;
2085 } else if (prefix == "device set-life-expectancy") {
2086 string devid;
2087 cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
2088 string from_str, to_str;
2089 cmd_getval(g_ceph_context, cmdctx->cmdmap, "from", from_str);
2090 cmd_getval(g_ceph_context, cmdctx->cmdmap, "to", to_str);
2091 utime_t from, to;
2092 if (!from.parse(from_str)) {
2093 ss << "unable to parse datetime '" << from_str << "'";
2094 r = -EINVAL;
2095 cmdctx->reply(r, ss);
2096 } else if (to_str.size() && !to.parse(to_str)) {
2097 ss << "unable to parse datetime '" << to_str << "'";
2098 r = -EINVAL;
2099 cmdctx->reply(r, ss);
2100 } else {
2101 map<string,string> meta;
2102 daemon_state.with_device_create(
2103 devid,
2104 [from, to, &meta] (DeviceState& dev) {
2105 dev.set_life_expectancy(from, to, ceph_clock_now());
2106 meta = dev.metadata;
2107 });
2108 json_spirit::Object json_object;
2109 for (auto& i : meta) {
2110 json_spirit::Config::add(json_object, i.first, i.second);
2111 }
2112 bufferlist json;
2113 json.append(json_spirit::write(json_object));
2114 const string cmd =
2115 "{"
2116 "\"prefix\": \"config-key set\", "
2117 "\"key\": \"device/" + devid + "\""
2118 "}";
2119 auto on_finish = new ReplyOnFinish(cmdctx);
2120 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2121 }
2122 return true;
2123 } else if (prefix == "device rm-life-expectancy") {
2124 string devid;
2125 cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
2126 map<string,string> meta;
2127 if (daemon_state.with_device_write(devid, [&meta] (DeviceState& dev) {
2128 dev.rm_life_expectancy();
2129 meta = dev.metadata;
2130 })) {
2131 string cmd;
2132 bufferlist json;
2133 if (meta.empty()) {
2134 cmd =
2135 "{"
2136 "\"prefix\": \"config-key rm\", "
2137 "\"key\": \"device/" + devid + "\""
2138 "}";
2139 } else {
2140 json_spirit::Object json_object;
2141 for (auto& i : meta) {
2142 json_spirit::Config::add(json_object, i.first, i.second);
2143 }
2144 json.append(json_spirit::write(json_object));
2145 cmd =
2146 "{"
2147 "\"prefix\": \"config-key set\", "
2148 "\"key\": \"device/" + devid + "\""
2149 "}";
2150 }
2151 auto on_finish = new ReplyOnFinish(cmdctx);
2152 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2153 } else {
2154 cmdctx->reply(0, ss);
2155 }
2156 return true;
2157 } else {
2158 if (!pgmap_ready) {
2159 ss << "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2160 }
2161 if (f) {
2162 f->open_object_section("pg_info");
2163 f->dump_bool("pg_ready", pgmap_ready);
2164 }
2165
2166 // fall back to feeding command to PGMap
2167 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
2168 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
2169 f.get(), &ss, &cmdctx->odata);
7c673cae
FG
2170 });
2171
11fdf7f2
TL
2172 if (f) {
2173 f->close_section();
2174 }
7c673cae 2175 if (r != -EOPNOTSUPP) {
11fdf7f2
TL
2176 if (f) {
2177 f->flush(cmdctx->odata);
2178 }
7c673cae
FG
2179 cmdctx->reply(r, ss);
2180 return true;
2181 }
2182 }
2183
11fdf7f2
TL
2184 // Resolve the command to the name of the module that will
2185 // handle it (if the command exists)
2186 std::string handler_name;
c07f9fc5 2187 auto py_commands = py_modules.get_py_commands();
7c673cae
FG
2188 for (const auto &pyc : py_commands) {
2189 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
7c673cae 2190 if (pyc_prefix == prefix) {
11fdf7f2 2191 handler_name = pyc.module_name;
7c673cae
FG
2192 break;
2193 }
2194 }
2195
11fdf7f2
TL
2196 // Was the command unfound?
2197 if (handler_name.empty()) {
7c673cae
FG
2198 ss << "No handler found for '" << prefix << "'";
2199 dout(4) << "No handler found for '" << prefix << "'" << dendl;
2200 cmdctx->reply(-EINVAL, ss);
2201 return true;
7c673cae 2202 }
11fdf7f2
TL
2203
2204 dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
2205 finisher.queue(new FunctionContext([this, cmdctx, handler_name, prefix](int r_) {
2206 std::stringstream ss;
2207
2208 // Validate that the module is enabled
2209 PyModuleRef module = py_modules.get_module(handler_name);
2210 ceph_assert(module);
2211 if (!module->is_enabled()) {
2212 ss << "Module '" << handler_name << "' is not enabled (required by "
2213 "command '" << prefix << "'): use `ceph mgr module enable "
2214 << handler_name << "` to enable it";
2215 dout(4) << ss.str() << dendl;
2216 cmdctx->reply(-EOPNOTSUPP, ss);
2217 return;
2218 }
2219
2220 // Hack: allow the self-test method to run on unhealthy modules.
2221 // Fix this in future by creating a special path for self test rather
2222 // than having the hook be a normal module command.
2223 std::string self_test_prefix = handler_name + " " + "self-test";
2224
2225 // Validate that the module is healthy
2226 bool accept_command;
2227 if (module->is_loaded()) {
2228 if (module->get_can_run() && !module->is_failed()) {
2229 // Healthy module
2230 accept_command = true;
2231 } else if (self_test_prefix == prefix) {
2232 // Unhealthy, but allow because it's a self test command
2233 accept_command = true;
2234 } else {
2235 accept_command = false;
2236 ss << "Module '" << handler_name << "' has experienced an error and "
2237 "cannot handle commands: " << module->get_error_string();
2238 }
2239 } else {
2240 // Module not loaded
2241 accept_command = false;
2242 ss << "Module '" << handler_name << "' failed to load and "
2243 "cannot handle commands: " << module->get_error_string();
2244 }
2245
2246 if (!accept_command) {
2247 dout(4) << ss.str() << dendl;
2248 cmdctx->reply(-EIO, ss);
2249 return;
2250 }
2251
2252 std::stringstream ds;
2253 bufferlist inbl = cmdctx->m->get_data();
2254 int r = py_modules.handle_command(handler_name, cmdctx->cmdmap, inbl, &ds, &ss);
2255 cmdctx->odata.append(ds);
2256 cmdctx->reply(r, ss);
2257 }));
2258 return true;
7c673cae 2259}
31f18b77 2260
224ce89b
WB
// Drop entries from pending_service_map for daemons that have stopped
// sending service beacons.  A daemon is considered stale once its
// last_service_beacon is older than the mgr_service_beacon_grace window;
// pruning marks the pending map dirty so send_report() will publish the
// updated map to the monitors.
void DaemonServer::_prune_pending_service_map()
{
  // cutoff = now - grace period; anything last seen before this is stale
  utime_t cutoff = ceph_clock_now();
  cutoff -= g_conf().get_val<double>("mgr_service_beacon_grace");
  auto p = pending_service_map.services.begin();
  while (p != pending_service_map.services.end()) {
    auto q = p->second.daemons.begin();
    while (q != p->second.daemons.end()) {
      DaemonKey key(p->first, q->first);
      if (!daemon_state.exists(key)) {
        // no DaemonState tracked for this daemon; log and leave the
        // service-map entry alone rather than pruning on missing data
        derr << "missing key " << key << dendl;
        ++q;
        continue;
      }
      auto daemon = daemon_state.get(key);
      std::lock_guard l(daemon->lock);
      if (daemon->last_service_beacon == utime_t()) {
        // we must have just restarted; assume they are alive now.
        daemon->last_service_beacon = ceph_clock_now();
        ++q;
        continue;
      }
      if (daemon->last_service_beacon < cutoff) {
        dout(10) << "pruning stale " << p->first << "." << q->first
                 << " last_beacon " << daemon->last_service_beacon << dendl;
        // erase() returns the next valid iterator (safe erase-while-iterating)
        q = p->second.daemons.erase(q);
        pending_service_map_dirty = pending_service_map.epoch;
      } else {
        ++q;
      }
    }
    if (p->second.daemons.empty()) {
      // service has no live daemons left; drop the whole service entry
      p = pending_service_map.services.erase(p);
      pending_service_map_dirty = pending_service_map.epoch;
    } else {
      ++p;
    }
  }
}
2300
31f18b77
FG
2301void DaemonServer::send_report()
2302{
224ce89b 2303 if (!pgmap_ready) {
11fdf7f2 2304 if (ceph_clock_now() - started_at > g_conf().get_val<int64_t>("mgr_stats_period") * 4.0) {
224ce89b
WB
2305 pgmap_ready = true;
2306 reported_osds.clear();
2307 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
2308 << "potentially incomplete PG state to mon" << dendl;
2309 } else {
2310 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
2311 << dendl;
2312 return;
2313 }
2314 }
2315
31f18b77 2316 auto m = new MMonMgrReport();
c07f9fc5 2317 py_modules.get_health_checks(&m->health_checks);
11fdf7f2 2318 py_modules.get_progress_events(&m->progress_events);
c07f9fc5 2319
11fdf7f2 2320 cluster_state.with_mutable_pgmap([&](PGMap& pg_map) {
31f18b77
FG
2321 cluster_state.update_delta_stats();
2322
224ce89b
WB
2323 if (pending_service_map.epoch) {
2324 _prune_pending_service_map();
2325 if (pending_service_map_dirty >= pending_service_map.epoch) {
2326 pending_service_map.modified = ceph_clock_now();
11fdf7f2 2327 encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
224ce89b
WB
2328 dout(10) << "sending service_map e" << pending_service_map.epoch
2329 << dendl;
2330 pending_service_map.epoch++;
2331 }
2332 }
2333
31f18b77
FG
2334 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
2335 // FIXME: no easy way to get mon features here. this will do for
2336 // now, though, as long as we don't make a backward-incompat change.
2337 pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
2338 dout(10) << pg_map << dendl;
224ce89b
WB
2339
2340 pg_map.get_health_checks(g_ceph_context, osdmap,
2341 &m->health_checks);
c07f9fc5 2342
224ce89b
WB
2343 dout(10) << m->health_checks.checks.size() << " health checks"
2344 << dendl;
2345 dout(20) << "health checks:\n";
2346 JSONFormatter jf(true);
2347 jf.dump_object("health_checks", m->health_checks);
2348 jf.flush(*_dout);
2349 *_dout << dendl;
a8e16298
TL
2350 if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2351 clog->debug() << "pgmap v" << pg_map.version << ": " << pg_map;
2352 }
31f18b77
FG
2353 });
2354 });
b32b8144 2355
11fdf7f2
TL
2356 map<daemon_metric, unique_ptr<DaemonHealthMetricCollector>> accumulated;
2357 for (auto service : {"osd", "mon"} ) {
2358 auto daemons = daemon_state.get_by_service(service);
2359 for (const auto& [key,state] : daemons) {
2360 std::lock_guard l{state->lock};
2361 for (const auto& metric : state->daemon_health_metrics) {
2362 auto acc = accumulated.find(metric.get_type());
2363 if (acc == accumulated.end()) {
2364 auto collector = DaemonHealthMetricCollector::create(metric.get_type());
2365 if (!collector) {
2366 derr << __func__ << " " << key.first << "." << key.second
2367 << " sent me an unknown health metric: "
2368 << std::hex << static_cast<uint8_t>(metric.get_type())
2369 << std::dec << dendl;
2370 continue;
2371 }
2372 dout(20) << " + " << state->key << " "
2373 << metric << dendl;
2374 tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
2375 std::move(collector));
2376 }
2377 acc->second->update(key, metric);
b32b8144 2378 }
b32b8144
FG
2379 }
2380 }
2381 for (const auto& acc : accumulated) {
2382 acc.second->summarize(m->health_checks);
2383 }
31f18b77
FG
2384 // TODO? We currently do not notify the PyModules
2385 // TODO: respect needs_send, so we send the report only if we are asked to do
2386 // so, or the state is updated.
2387 monc->send_mon_message(m);
2388}
224ce89b 2389
11fdf7f2
TL
// Drive pools' actual pg_num/pgp_num toward their targets, one safe step
// at a time.  Decreases (merges) only proceed when both merge participants
// are active+clean and free of upmaps; increases (splits) only when all of
// the pool's PGs are active, and are throttled by mon_osd_max_creating_pgs.
// pgp_num follows pg_num, limited so a single adjustment moves at most
// ~half of target_max_misplaced_ratio worth of data.  Resulting changes
// are issued as "osd pool set ... pg_num_actual/pgp_num_actual" mon
// commands; blocking upmaps are cleared via "osd rm-pg-upmap[-items]".
void DaemonServer::adjust_pgs()
{
  dout(20) << dendl;
  unsigned max = std::max<int64_t>(1, g_conf()->mon_osd_max_creating_pgs);
  double max_misplaced = g_conf().get_val<double>("target_max_misplaced_ratio");
  // debug knob: skip all the safety throttles below
  bool aggro = g_conf().get_val<bool>("mgr_debug_aggressive_pg_num_changes");

  map<string,unsigned> pg_num_to_set;
  map<string,unsigned> pgp_num_to_set;
  set<pg_t> upmaps_to_clear;
  cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
    // budget for new PG creations: max minus PGs already creating/unknown
    unsigned creating_or_unknown = 0;
    for (auto& i : pg_map.num_pg_by_state) {
      if ((i.first & (PG_STATE_CREATING)) ||
          i.first == 0) {
        creating_or_unknown += i.second;
      }
    }
    unsigned left = max;
    if (creating_or_unknown >= max) {
      return;
    }
    left -= creating_or_unknown;
    dout(10) << "creating_or_unknown " << creating_or_unknown
             << " max_creating " << max
             << " left " << left
             << dendl;

    // FIXME: These checks are fundamentally racy given that adjust_pgs()
    // can run more frequently than we get updated pg stats from OSDs. We
    // may make multiple adjustments with stale information.
    double misplaced_ratio, degraded_ratio;
    double inactive_pgs_ratio, unknown_pgs_ratio;
    pg_map.get_recovery_stats(&misplaced_ratio, &degraded_ratio,
                              &inactive_pgs_ratio, &unknown_pgs_ratio);
    dout(20) << "misplaced_ratio " << misplaced_ratio
             << " degraded_ratio " << degraded_ratio
             << " inactive_pgs_ratio " << inactive_pgs_ratio
             << " unknown_pgs_ratio " << unknown_pgs_ratio
             << "; target_max_misplaced_ratio " << max_misplaced
             << dendl;

    for (auto& i : osdmap.get_pools()) {
      const pg_pool_t& p = i.second;

      // adjust pg_num?
      if (p.get_pg_num_target() != p.get_pg_num()) {
        dout(20) << "pool " << i.first
                 << " pg_num " << p.get_pg_num()
                 << " target " << p.get_pg_num_target()
                 << dendl;
        if (p.has_flag(pg_pool_t::FLAG_CREATING)) {
          // don't touch a pool that is still creating its initial PGs
          dout(10) << "pool " << i.first
                   << " pg_num_target " << p.get_pg_num_target()
                   << " pg_num " << p.get_pg_num()
                   << " - still creating initial pgs"
                   << dendl;
        } else if (p.get_pg_num_target() < p.get_pg_num()) {
          // pg_num decrease (merge)
          // merge one PG at a time: the last PG merges into its parent
          pg_t merge_source(p.get_pg_num() - 1, i.first);
          pg_t merge_target = merge_source.get_parent();
          bool ok = true;

          if (p.get_pg_num() != p.get_pg_num_pending()) {
            // a previous merge is still in flight
            dout(10) << "pool " << i.first
                     << " pg_num_target " << p.get_pg_num_target()
                     << " pg_num " << p.get_pg_num()
                     << " - decrease and pg_num_pending != pg_num, waiting"
                     << dendl;
            ok = false;
          } else if (p.get_pg_num() == p.get_pgp_num()) {
            // pgp_num must shrink first so data is off the merge source
            dout(10) << "pool " << i.first
                     << " pg_num_target " << p.get_pg_num_target()
                     << " pg_num " << p.get_pg_num()
                     << " - decrease blocked by pgp_num "
                     << p.get_pgp_num()
                     << dendl;
            ok = false;
          }
          // both participants must be upmap-free and active+clean
          for (auto &merge_participant : {merge_source, merge_target}) {
            bool is_merge_source = merge_participant == merge_source;
            if (osdmap.have_pg_upmaps(merge_participant)) {
              dout(10) << "pool " << i.first
                       << " pg_num_target " << p.get_pg_num_target()
                       << " pg_num " << p.get_pg_num()
                       << (is_merge_source ? " - merge source " : " - merge target ")
                       << merge_participant
                       << " has upmap" << dendl;
              upmaps_to_clear.insert(merge_participant);
              ok = false;
            }
            auto q = pg_map.pg_stat.find(merge_participant);
            if (q == pg_map.pg_stat.end()) {
              dout(10) << "pool " << i.first
                       << " pg_num_target " << p.get_pg_num_target()
                       << " pg_num " << p.get_pg_num()
                       << " - no state for " << merge_participant
                       << (is_merge_source ? " (merge source)" : " (merge target)")
                       << dendl;
              ok = false;
            } else if ((q->second.state & (PG_STATE_ACTIVE | PG_STATE_CLEAN)) !=
                       (PG_STATE_ACTIVE | PG_STATE_CLEAN)) {
              dout(10) << "pool " << i.first
                       << " pg_num_target " << p.get_pg_num_target()
                       << " pg_num " << p.get_pg_num()
                       << (is_merge_source ? " - merge source " : " - merge target ")
                       << merge_participant
                       << " not clean (" << pg_state_string(q->second.state)
                       << ")" << dendl;
              ok = false;
            }
          }

          if (ok) {
            unsigned target = p.get_pg_num() - 1;
            dout(10) << "pool " << i.first
                     << " pg_num_target " << p.get_pg_num_target()
                     << " pg_num " << p.get_pg_num()
                     << " -> " << target
                     << " (merging " << merge_source
                     << " and " << merge_target
                     << ")" << dendl;
            pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
          }
        } else if (p.get_pg_num_target() > p.get_pg_num()) {
          // pg_num increase (split)
          // require every PG in the pool to be active (or peered)
          bool active = true;
          auto q = pg_map.num_pg_by_pool_state.find(i.first);
          if (q != pg_map.num_pg_by_pool_state.end()) {
            for (auto& j : q->second) {
              if ((j.first & (PG_STATE_ACTIVE|PG_STATE_PEERED)) == 0) {
                dout(20) << "pool " << i.first << " has " << j.second
                         << " pgs in " << pg_state_string(j.first)
                         << dendl;
                active = false;
                break;
              }
            }
          } else {
            active = false;
          }
          if (!active) {
            dout(10) << "pool " << i.first
                     << " pg_num_target " << p.get_pg_num_target()
                     << " pg_num " << p.get_pg_num()
                     << " - not all pgs active"
                     << dendl;
          } else {
            // split as much as the remaining creation budget allows
            unsigned add = std::min(
              left,
              p.get_pg_num_target() - p.get_pg_num());
            unsigned target = p.get_pg_num() + add;
            left -= add;
            dout(10) << "pool " << i.first
                     << " pg_num_target " << p.get_pg_num_target()
                     << " pg_num " << p.get_pg_num()
                     << " -> " << target << dendl;
            pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
          }
        }
      }

      // adjust pgp_num?
      // pgp_num may never exceed the (pending) pg_num
      unsigned target = std::min(p.get_pg_num_pending(),
                                 p.get_pgp_num_target());
      if (target != p.get_pgp_num()) {
        dout(20) << "pool " << i.first
                 << " pgp_num_target " << p.get_pgp_num_target()
                 << " pgp_num " << p.get_pgp_num()
                 << " -> " << target << dendl;
        if (target > p.get_pgp_num() &&
            p.get_pgp_num() == p.get_pg_num()) {
          dout(10) << "pool " << i.first
                   << " pgp_num_target " << p.get_pgp_num_target()
                   << " pgp_num " << p.get_pgp_num()
                   << " - increase blocked by pg_num " << p.get_pg_num()
                   << dendl;
        } else if (!aggro && (inactive_pgs_ratio > 0 ||
                              degraded_ratio > 0 ||
                              unknown_pgs_ratio > 0)) {
          // don't shuffle data while the cluster is unhealthy
          dout(10) << "pool " << i.first
                   << " pgp_num_target " << p.get_pgp_num_target()
                   << " pgp_num " << p.get_pgp_num()
                   << " - inactive|degraded|unknown pgs, deferring pgp_num"
                   << " update" << dendl;
        } else if (!aggro && (misplaced_ratio > max_misplaced)) {
          // already over the misplaced budget; wait for recovery
          dout(10) << "pool " << i.first
                   << " pgp_num_target " << p.get_pgp_num_target()
                   << " pgp_num " << p.get_pgp_num()
                   << " - misplaced_ratio " << misplaced_ratio
                   << " > max " << max_misplaced
                   << ", deferring pgp_num update" << dendl;
        } else {
          // NOTE: this calculation assumes objects are
          // basically uniformly distributed across all PGs
          // (regardless of pool), which is probably not
          // perfectly correct, but it's a start. make no
          // single adjustment that's more than half of the
          // max_misplaced, to somewhat limit the magnitude of
          // our potential error here.
          int next;

          pool_stat_t s = pg_map.get_pg_pool_sum_stat(i.first);
          if (aggro ||
              // pool is (virtually) empty; just jump to final pgp_num?
              (p.get_pgp_num_target() > p.get_pgp_num() &&
               s.stats.sum.num_objects <= p.get_pgp_num_target())) {
            next = target;
          } else {
            // step size bounded by the remaining misplaced headroom,
            // capped at half of max_misplaced
            double room =
              std::min<double>(max_misplaced - misplaced_ratio,
                               max_misplaced / 2.0);
            unsigned estmax = std::max<unsigned>(
              (double)p.get_pg_num() * room, 1u);
            int delta = target - p.get_pgp_num();
            next = p.get_pgp_num();
            if (delta < 0) {
              next += std::max<int>(-estmax, delta);
            } else {
              next += std::min<int>(estmax, delta);
            }
            dout(20) << " room " << room << " estmax " << estmax
                     << " delta " << delta << " next " << next << dendl;
            if (p.get_pgp_num_target() == p.get_pg_num_target() &&
                p.get_pgp_num_target() < p.get_pg_num()) {
              // since pgp_num is tracking pg_num, ceph is handling
              // pgp_num. so, be responsible: don't let pgp_num get
              // too far out ahead of merges (if we are merging).
              // this avoids moving lots of unmerged pgs onto a
              // small number of OSDs where we might blow out the
              // per-osd pg max.
              unsigned max_outpace_merges =
                std::max<unsigned>(8, p.get_pg_num() * max_misplaced);
              if (next + max_outpace_merges < p.get_pg_num()) {
                next = p.get_pg_num() - max_outpace_merges;
                dout(10) << " using next " << next
                         << " to avoid outpacing merges (max_outpace_merges "
                         << max_outpace_merges << ")" << dendl;
              }
            }
          }
          dout(10) << "pool " << i.first
                   << " pgp_num_target " << p.get_pgp_num_target()
                   << " pgp_num " << p.get_pgp_num()
                   << " -> " << next << dendl;
          pgp_num_to_set[osdmap.get_pool_name(i.first)] = next;
        }
      }
      if (left == 0) {
        // creation budget exhausted; stop scanning pools this round
        return;
      }
    }
  });
  // issue the accumulated adjustments as mon commands (fire-and-forget)
  for (auto i : pg_num_to_set) {
    const string cmd =
      "{"
      "\"prefix\": \"osd pool set\", "
      "\"pool\": \"" + i.first + "\", "
      "\"var\": \"pg_num_actual\", "
      "\"val\": \"" + stringify(i.second) + "\""
      "}";
    monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
  }
  for (auto i : pgp_num_to_set) {
    const string cmd =
      "{"
      "\"prefix\": \"osd pool set\", "
      "\"pool\": \"" + i.first + "\", "
      "\"var\": \"pgp_num_actual\", "
      "\"val\": \"" + stringify(i.second) + "\""
      "}";
    monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
  }
  // clear both kinds of upmap entries for PGs blocking a merge
  for (auto pg : upmaps_to_clear) {
    const string cmd =
      "{"
      "\"prefix\": \"osd rm-pg-upmap\", "
      "\"pgid\": \"" + stringify(pg) + "\""
      "}";
    monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
    const string cmd2 =
      "{"
      "\"prefix\": \"osd rm-pg-upmap-items\", "
      "\"pgid\": \"" + stringify(pg) + "\"" +
      "}";
    monc->start_mon_command({cmd2}, {}, nullptr, nullptr, nullptr);
  }
}
2678
224ce89b
WB
// Called when a new ServiceMap is received from the monitors.  Seeds (or
// advances) pending_service_map so that our next published epoch is one
// past the mon's, then reconciles daemon_state with the map: daemons in
// the map but unknown to us are added, and tracked daemons no longer in
// the map are culled.
void DaemonServer::got_service_map()
{
  std::lock_guard l(lock);

  cluster_state.with_servicemap([&](const ServiceMap& service_map) {
    if (pending_service_map.epoch == 0) {
      // we just started up
      dout(10) << "got initial map e" << service_map.epoch << dendl;
      pending_service_map = service_map;
    } else {
      // we were already active and therefore must have persisted it,
      // which means ours is the same or newer.
      dout(10) << "got updated map e" << service_map.epoch << dendl;
    }
    // next map we publish will be one epoch ahead of the mon's copy
    pending_service_map.epoch = service_map.epoch + 1;
  });

  // cull missing daemons, populate new ones
  for (auto& p : pending_service_map.services) {
    std::set<std::string> names;
    for (auto& q : p.second.daemons) {
      names.insert(q.first);
      DaemonKey key(p.first, q.first);
      if (!daemon_state.exists(key)) {
        // daemon appears in the service map but we have no state for it
        auto daemon = std::make_shared<DaemonState>(daemon_state.types);
        daemon->key = key;
        daemon->set_metadata(q.second.metadata);
        daemon->service_daemon = true;
        daemon_state.insert(daemon);
        dout(10) << "added missing " << key << dendl;
      }
    }
    // drop tracked daemons of this service that are no longer in the map
    daemon_state.cull(p.first, names);
  }
}
3efd9988 2714
11fdf7f2
TL
// Called when a new MgrMap is received.  For the active mgr and every
// standby that we are not already tracking (and not already fetching),
// kick off an async "mgr metadata" mon command whose completion inserts
// the daemon into daemon_state; then cull mgr daemons no longer present
// in the map.
void DaemonServer::got_mgr_map()
{
  std::lock_guard l(lock);
  set<std::string> have;
  cluster_state.with_mgrmap([&](const MgrMap& mgrmap) {
    auto md_update = [&] (DaemonKey key) {
      std::ostringstream oss;
      // MetadataUpdate completes asynchronously and feeds daemon_state
      auto c = new MetadataUpdate(daemon_state, key);
      // FIXME remove post-nautilus: include 'id' for luminous mons
      oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
          << key.second << "\", \"id\": \"" << key.second << "\"}";
      monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
    };
    if (mgrmap.active_name.size()) {
      DaemonKey key("mgr", mgrmap.active_name);
      have.insert(mgrmap.active_name);
      if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
        md_update(key);
        dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
      }
    }
    for (auto& i : mgrmap.standbys) {
      DaemonKey key("mgr", i.second.name);
      have.insert(i.second.name);
      if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
        md_update(key);
        dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
      }
    }
  });
  // drop any tracked mgr daemons that are no longer in the MgrMap
  daemon_state.cull("mgr", have);
}
3efd9988
FG
2747
2748const char** DaemonServer::get_tracked_conf_keys() const
2749{
2750 static const char *KEYS[] = {
2751 "mgr_stats_threshold",
2752 "mgr_stats_period",
2753 nullptr
2754 };
2755
2756 return KEYS;
2757}
2758
11fdf7f2
TL
2759void DaemonServer::handle_conf_change(const ConfigProxy& conf,
2760 const std::set <std::string> &changed)
3efd9988 2761{
3efd9988
FG
2762
2763 if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
2764 dout(4) << "Updating stats threshold/period on "
2765 << daemon_connections.size() << " clients" << dendl;
2766 // Send a fresh MMgrConfigure to all clients, so that they can follow
2767 // the new policy for transmitting stats
11fdf7f2
TL
2768 finisher.queue(new FunctionContext([this](int r) {
2769 std::lock_guard l(lock);
2770 for (auto &c : daemon_connections) {
2771 _send_configure(c);
2772 }
2773 }));
3efd9988
FG
2774 }
2775}
2776
2777void DaemonServer::_send_configure(ConnectionRef c)
2778{
11fdf7f2 2779 ceph_assert(lock.is_locked_by_me());
3efd9988
FG
2780
2781 auto configure = new MMgrConfigure();
11fdf7f2
TL
2782 configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
2783 configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");
2784
2785 if (c->peer_is_osd()) {
2786 configure->osd_perf_metric_queries =
2787 osd_perf_metric_collector.get_queries();
2788 }
2789
3efd9988
FG
2790 c->send_message(configure);
2791}
2792
11fdf7f2
TL
// Register an OSD perf metric query (with an optional result limit) with
// the collector; returns the query id callers use to fetch/remove it.
// Thin delegation to OSDPerfMetricCollector.
OSDPerfMetricQueryID DaemonServer::add_osd_perf_query(
    const OSDPerfMetricQuery &query,
    const std::optional<OSDPerfMetricLimit> &limit)
{
  return osd_perf_metric_collector.add_query(query, limit);
}
2799
// Unregister a previously added OSD perf metric query.  Returns the
// collector's error code (0 on success).
int DaemonServer::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
{
  return osd_perf_metric_collector.remove_query(query_id);
}
2804
// Fetch the accumulated counters for a registered OSD perf metric query
// into *counters.  Returns the collector's error code (0 on success).
int DaemonServer::get_osd_perf_counters(
    OSDPerfMetricQueryID query_id,
    std::map<OSDPerfMetricKey, PerformanceCounters> *counters)
{
  return osd_perf_metric_collector.get_counters(query_id, counters);
}