]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/DaemonServer.cc
46c475394644fcb9051f1e2a557ef9e624620507
[ceph.git] / ceph / src / mgr / DaemonServer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "DaemonServer.h"
15 #include <boost/algorithm/string.hpp>
16 #include "mgr/Mgr.h"
17
18 #include "include/stringify.h"
19 #include "include/str_list.h"
20 #include "auth/RotatingKeyRing.h"
21 #include "json_spirit/json_spirit_writer.h"
22
23 #include "mgr/mgr_commands.h"
24 #include "mgr/DaemonHealthMetricCollector.h"
25 #include "mgr/OSDPerfMetricCollector.h"
26 #include "mgr/MDSPerfMetricCollector.h"
27 #include "mon/MonCommand.h"
28
29 #include "messages/MMgrOpen.h"
30 #include "messages/MMgrUpdate.h"
31 #include "messages/MMgrClose.h"
32 #include "messages/MMgrConfigure.h"
33 #include "messages/MMonMgrReport.h"
34 #include "messages/MCommand.h"
35 #include "messages/MCommandReply.h"
36 #include "messages/MMgrCommand.h"
37 #include "messages/MMgrCommandReply.h"
38 #include "messages/MPGStats.h"
39 #include "messages/MOSDScrub2.h"
40 #include "messages/MOSDForceRecovery.h"
41 #include "common/errno.h"
42 #include "common/pick_address.h"
43
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_mgr
46 #undef dout_prefix
47 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
48
49 using namespace TOPNSPC::common;
50
51 using std::list;
52 using std::ostringstream;
53 using std::string;
54 using std::stringstream;
55 using std::vector;
56 using std::unique_ptr;
57
namespace {
  // Compare two map-like containers for equality of both keys and values.
  //
  // The size check must come first: the three-range std::equal overload
  // used here requires the second range to be at least as long as the
  // first. Elements are taken by const reference to avoid copying each
  // key/value pair during the comparison.
  template <typename Map>
  bool map_compare(Map const &lhs, Map const &rhs) {
    return lhs.size() == rhs.size()
      && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                    [] (const auto& a, const auto& b) {
                      return a.first == b.first && a.second == b.second;
                    });
  }
}
66
67 DaemonServer::DaemonServer(MonClient *monc_,
68 Finisher &finisher_,
69 DaemonStateIndex &daemon_state_,
70 ClusterState &cluster_state_,
71 PyModuleRegistry &py_modules_,
72 LogChannelRef clog_,
73 LogChannelRef audit_clog_)
74 : Dispatcher(g_ceph_context),
75 client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
76 g_conf().get_val<Option::size_t>("mgr_client_bytes"))),
77 client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
78 g_conf().get_val<uint64_t>("mgr_client_messages"))),
79 osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
80 g_conf().get_val<Option::size_t>("mgr_osd_bytes"))),
81 osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
82 g_conf().get_val<uint64_t>("mgr_osd_messages"))),
83 mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
84 g_conf().get_val<Option::size_t>("mgr_mds_bytes"))),
85 mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
86 g_conf().get_val<uint64_t>("mgr_mds_messages"))),
87 mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
88 g_conf().get_val<Option::size_t>("mgr_mon_bytes"))),
89 mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
90 g_conf().get_val<uint64_t>("mgr_mon_messages"))),
91 msgr(nullptr),
92 monc(monc_),
93 finisher(finisher_),
94 daemon_state(daemon_state_),
95 cluster_state(cluster_state_),
96 py_modules(py_modules_),
97 clog(clog_),
98 audit_clog(audit_clog_),
99 pgmap_ready(false),
100 timer(g_ceph_context, lock),
101 shutting_down(false),
102 tick_event(nullptr),
103 osd_perf_metric_collector_listener(this),
104 osd_perf_metric_collector(osd_perf_metric_collector_listener),
105 mds_perf_metric_collector_listener(this),
106 mds_perf_metric_collector(mds_perf_metric_collector_listener)
107 {
108 g_conf().add_observer(this);
109 }
110
// Tear down: free the messenger created in init() (delete of nullptr is
// safe if init() never ran) and stop observing config changes.
DaemonServer::~DaemonServer() {
  delete msgr;
  g_conf().remove_observer(this);
}
115
/**
 * Bring up the mgr server messenger: create it, attach the per-peer-type
 * throttlers, bind to the public address(es), start dispatch, and kick
 * off the periodic tick.
 *
 * @param gid          this mgr's global id (used in its entity name)
 * @param client_addrs addresses to advertise for any still-unknown
 *                     address families
 * @return 0 on success, negative errno from address selection or bind
 */
int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
{
  // Initialize Messenger: ms_public_type overrides ms_type when set
  std::string public_msgr_type = g_conf()->ms_public_type.empty() ?
    g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
  msgr = Messenger::create(g_ceph_context, public_msgr_type,
			   entity_name_t::MGR(gid),
			   "mgr",
			   Messenger::get_pid_nonce());
  // stateless server: no session state kept for reconnecting peers
  msgr->set_default_policy(Messenger::Policy::stateless_server(0));

  msgr->set_auth_client(monc);

  // throttle clients
  msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
			      client_byte_throttler.get(),
			      client_msg_throttler.get());

  // servers
  msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
			      osd_byte_throttler.get(),
			      osd_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
			      mds_byte_throttler.get(),
			      mds_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
			      mon_byte_throttler.get(),
			      mon_msg_throttler.get());

  entity_addrvec_t addrs;
  int r = pick_addresses(cct, CEPH_PICK_ADDRESS_PUBLIC, &addrs);
  if (r < 0) {
    return r;
  }
  dout(20) << __func__ << " will bind to " << addrs << dendl;
  r = msgr->bindv(addrs);
  if (r < 0) {
    derr << "unable to bind mgr to " << addrs << dendl;
    return r;
  }

  msgr->set_myname(entity_name_t::MGR(gid));
  // fill in any address families bind didn't resolve
  msgr->set_addr_unknowns(client_addrs);

  msgr->start();
  msgr->add_dispatcher_tail(this);

  msgr->set_auth_server(monc);
  monc->set_handle_authentication_dispatcher(this);

  started_at = ceph_clock_now();

  std::lock_guard l(lock);
  timer.init();

  // first tick; subsequent ticks reschedule themselves
  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());

  return 0;
}
176
// Addresses the server messenger is bound to (valid after init()).
entity_addrvec_t DaemonServer::get_myaddrs() const
{
  return msgr->get_myaddrs();
}
181
/**
 * Called by the messenger when a peer has authenticated.  Creates the
 * MgrSession for the connection, parses the peer's mgr caps, and — for
 * OSD peers — records the connection in osd_cons for later lookup.
 *
 * @return 1 to accept the session, -EACCES if caps fail to decode/parse
 */
int DaemonServer::ms_handle_authentication(Connection *con)
{
  auto s = ceph::make_ref<MgrSession>(cct);
  con->set_priv(s);
  s->inst.addr = con->get_peer_addr();
  s->entity_name = con->peer_name;
  dout(10) << __func__ << " new session " << s << " con " << con
	   << " entity " << con->peer_name
	   << " addr " << con->get_peer_addrs()
	   << dendl;

  AuthCapsInfo &caps_info = con->get_peer_caps_info();
  if (caps_info.allow_all) {
    dout(10) << " session " << s << " " << s->entity_name
	     << " allow_all" << dendl;
    s->caps.set_allow_all();
  } else if (caps_info.caps.length() > 0) {
    // caps arrive as an encoded string; decode then parse into MgrCap
    auto p = caps_info.caps.cbegin();
    string str;
    try {
      decode(str, p);
    }
    catch (buffer::error& e) {
      dout(10) << " session " << s << " " << s->entity_name
	       << " failed to decode caps" << dendl;
      return -EACCES;
    }
    if (!s->caps.parse(str)) {
      dout(10) << " session " << s << " " << s->entity_name
	       << " failed to parse caps '" << str << "'" << dendl;
      return -EACCES;
    }
    dout(10) << " session " << s << " " << s->entity_name
	     << " has caps " << s->caps << " '" << str << "'" << dendl;
  }
  // NOTE(review): a peer with neither allow_all nor a caps blob falls
  // through with empty caps — commands will then be denied by
  // _allowed_command; confirm this is the intended default.

  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    // index OSD connections by id so we can send them commands later
    std::lock_guard l(lock);
    s->osd_id = atoi(s->entity_name.get_id().c_str());
    dout(10) << "registering osd." << s->osd_id << " session "
	     << s << " con " << con << dendl;
    osd_cons[s->osd_id].insert(con);
  }

  return 1;
}
228
// Messenger callback for a dropped connection.  For OSD peers, remove
// the connection from the osd_cons index and from daemon_connections so
// we stop pushing stats-configuration updates to it.  Always returns
// false (no session state to resurrect: we are a stateless server).
bool DaemonServer::ms_handle_reset(Connection *con)
{
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    auto priv = con->get_priv();
    auto session = static_cast<MgrSession*>(priv.get());
    if (!session) {
      return false;
    }
    std::lock_guard l(lock);
    dout(10) << "unregistering osd." << session->osd_id
	     << " session " << session << " con " << con << dendl;
    osd_cons[session->osd_id].erase(con);

    auto iter = daemon_connections.find(con);
    if (iter != daemon_connections.end()) {
      daemon_connections.erase(iter);
    }
  }
  return false;
}
249
// Messenger callback when an outgoing connection is refused.
// We take no action; returning false leaves default handling in place.
bool DaemonServer::ms_handle_refused(Connection *con)
{
  // do nothing for now
  return false;
}
255
/**
 * Message entry point: route each incoming message to its handler.
 *
 * @return true if the message was consumed, false to let another
 *         dispatcher try it
 */
bool DaemonServer::ms_dispatch2(const ref_t<Message>& m)
{
  // Note that we do *not* take ::lock here, in order to avoid
  // serializing all message handling. It's up to each handler
  // to take whatever locks it needs.
  switch (m->get_type()) {
  case MSG_PGSTATS:
    // PG stats feed the cluster PGMap; source num() is the OSD id
    cluster_state.ingest_pgstats(ref_cast<MPGStats>(m));
    maybe_ready(m->get_source().num());
    return true;
  case MSG_MGR_REPORT:
    return handle_report(ref_cast<MMgrReport>(m));
  case MSG_MGR_OPEN:
    return handle_open(ref_cast<MMgrOpen>(m));
  case MSG_MGR_UPDATE:
    return handle_update(ref_cast<MMgrUpdate>(m));
  case MSG_MGR_CLOSE:
    return handle_close(ref_cast<MMgrClose>(m));
  case MSG_COMMAND:
    return handle_command(ref_cast<MCommand>(m));
  case MSG_MGR_COMMAND:
    return handle_command(ref_cast<MMgrCommand>(m));
  default:
    dout(1) << "Unhandled message type " << m->get_type() << dendl;
    return false;
  };
}
283
// Emit whether the PGMap is ready (all up OSDs have reported at least
// once) into the given formatter.
void DaemonServer::dump_pg_ready(ceph::Formatter *f)
{
  f->dump_bool("pg_ready", pgmap_ready.load());
}
288
/**
 * Track which OSDs have sent their first pgstats after mgr startup.
 * Once every up OSD has reported, mark the PGMap ready and push an
 * immediate report to the mons instead of waiting for the next tick.
 *
 * @param osd_id id of the OSD whose stats just arrived
 */
void DaemonServer::maybe_ready(int32_t osd_id)
{
  if (pgmap_ready.load()) {
    // Fast path: we don't need to take lock because pgmap_ready
    // is already set
  } else {
    std::lock_guard l(lock);

    if (reported_osds.find(osd_id) == reported_osds.end()) {
      dout(4) << "initial report from osd " << osd_id << dendl;
      reported_osds.insert(osd_id);
      std::set<int32_t> up_osds;

      cluster_state.with_osdmap([&](const OSDMap& osdmap) {
          osdmap.get_up_osds(up_osds);
        });

      // which up OSDs have we still not heard from?
      std::set<int32_t> unreported_osds;
      std::set_difference(up_osds.begin(), up_osds.end(),
                          reported_osds.begin(), reported_osds.end(),
                          std::inserter(unreported_osds, unreported_osds.begin()));

      if (unreported_osds.size() == 0) {
        dout(4) << "all osds have reported, sending PG state to mon" << dendl;
        pgmap_ready = true;
        reported_osds.clear();  // bookkeeping no longer needed
        // Avoid waiting for next tick
        send_report();
      } else {
        dout(4) << "still waiting for " << unreported_osds.size() << " osds"
                   " to report in before PGMap is ready" << dendl;
      }
    }
  }
}
324
// Periodic timer body: push a report to the mons, run pg autoscaling
// adjustments, then re-arm the timer for the next mgr_tick_period.
// Runs with `lock` held (timer was constructed with it).
void DaemonServer::tick()
{
  dout(10) << dendl;
  send_report();
  adjust_pgs();

  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());
}
334
// Currently modules do not set health checks in response to events delivered to
// all modules (e.g. notify) so we do not risk a thundering herd situation here.
// if this pattern emerges in the future, this scheduler could be modified to
// fire after all modules have had a chance to set their health checks.
//
// (Re)arm the tick timer to fire in delay_sec.  Any pending tick is
// cancelled first, so at most one tick event is ever outstanding.
// Caller must hold `lock`.
void DaemonServer::schedule_tick_locked(double delay_sec)
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));

  if (tick_event) {
    timer.cancel_event(tick_event);
    tick_event = nullptr;
  }

  // on shutdown start rejecting explicit requests to send reports that may
  // originate from python land which may still be running.
  if (shutting_down)
    return;

  tick_event = timer.add_event_after(delay_sec,
      new LambdaContext([this](int r) {
	  tick();
      }));
}
358
// Public wrapper: take `lock`, then (re)arm the tick timer.
void DaemonServer::schedule_tick(double delay_sec)
{
  std::lock_guard l(lock);
  schedule_tick_locked(delay_sec);
}
364
// Listener callback: the set of OSD perf metric queries changed.
// Re-send MMgrConfigure to every connected OSD (via the finisher, so we
// don't do messenger work in the caller's context).
// NOTE(review): the lambda captures `this`; assumes the finisher is
// drained before this DaemonServer is destroyed — confirm.
void DaemonServer::handle_osd_perf_metric_query_updated()
{
  dout(10) << dendl;

  // Send a fresh MMgrConfigure to all clients, so that they can follow
  // the new policy for transmitting stats
  finisher.queue(new LambdaContext([this](int r) {
        std::lock_guard l(lock);
        for (auto &c : daemon_connections) {
          if (c->peer_is_osd()) {
            _send_configure(c);
          }
        }
      }));
}
380
// Listener callback: the set of MDS perf metric queries changed.
// Mirror of handle_osd_perf_metric_query_updated(), but targets MDS
// connections.
void DaemonServer::handle_mds_perf_metric_query_updated()
{
  dout(10) << dendl;

  // Send a fresh MMgrConfigure to all clients, so that they can follow
  // the new policy for transmitting stats
  finisher.queue(new LambdaContext([this](int r) {
        std::lock_guard l(lock);
        for (auto &c : daemon_connections) {
          if (c->peer_is_mds()) {
            _send_configure(c);
          }
        }
      }));
}
396
// Shut down: stop the messenger (and wait for its threads), shut down
// cluster state, then — under `lock` — flag shutting_down so no new
// ticks get scheduled, and stop the timer.
void DaemonServer::shutdown()
{
  dout(10) << "begin" << dendl;
  msgr->shutdown();
  msgr->wait();
  cluster_state.shutdown();
  dout(10) << "done" << dendl;

  std::lock_guard l(lock);
  shutting_down = true;
  timer.shutdown();
}
409
410 static DaemonKey key_from_service(
411 const std::string& service_name,
412 int peer_type,
413 const std::string& daemon_name)
414 {
415 if (!service_name.empty()) {
416 return DaemonKey{service_name, daemon_name};
417 } else {
418 return DaemonKey{ceph_entity_type_name(peer_type), daemon_name};
419 }
420 }
421
/**
 * Kick off an async mon command to fetch metadata for a daemon we know
 * about but have no metadata for.  Only osd/mds/mon types are handled,
 * and only if a fetch for that key is not already in flight.
 *
 * @param key  daemon to fetch metadata for
 * @param addr peer address, used as a fallback "addr" for mds entries
 */
void DaemonServer::fetch_missing_metadata(const DaemonKey& key,
                                          const entity_addr_t& addr)
{
  if (!daemon_state.is_updating(key) &&
      (key.type == "osd" || key.type == "mds" || key.type == "mon")) {
    std::ostringstream oss;
    auto c = new MetadataUpdate(daemon_state, key);
    if (key.type == "osd") {
      // osd id is numeric and passed unquoted
      oss << "{\"prefix\": \"osd metadata\", \"id\": "
          << key.name<< "}";
    } else if (key.type == "mds") {
      c->set_default("addr", stringify(addr));
      oss << "{\"prefix\": \"mds metadata\", \"who\": \""
          << key.name << "\"}";
    } else if (key.type == "mon") {
      oss << "{\"prefix\": \"mon metadata\", \"id\": \""
          << key.name << "\"}";
    } else {
      // unreachable: guarded by the type check above
      ceph_abort();
    }
    // completion context `c` installs the metadata and frees itself
    monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
  }
}
445
/**
 * Handle MMgrOpen: a daemon (or service daemon) is opening a stats
 * session with us.  Sends back the stats configuration, creates or
 * updates the DaemonState entry, registers service daemons in the
 * pending service map, and records the connection for future
 * reconfiguration pushes.
 *
 * For a regular daemon we have no metadata for yet, the session is
 * dropped and a metadata fetch is started; the daemon is expected to
 * reconnect.
 */
bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
{
  std::unique_lock l(lock);

  DaemonKey key = key_from_service(m->service_name,
				   m->get_connection()->get_peer_type(),
				   m->daemon_name);

  auto con = m->get_connection();
  dout(10) << "from " << key << " " << con->get_peer_addr() << dendl;

  // tell the peer how/when to send stats
  _send_configure(con);

  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    dout(20) << "updating existing DaemonState for " << key << dendl;
    daemon = daemon_state.get(key);
  }
  if (!daemon) {
    if (m->service_daemon) {
      // service daemons carry their own metadata; create state directly
      dout(4) << "constructing new DaemonState for " << key << dendl;
      daemon = std::make_shared<DaemonState>(daemon_state.types);
      daemon->key = key;
      daemon->service_daemon = true;
      daemon_state.insert(daemon);
    } else {
      /* A normal Ceph daemon has connected but we are or should be waiting on
       * metadata for it. Close the session so that it tries to reconnect.
       */
      dout(2) << "ignoring open from " << key << " " << con->get_peer_addr()
	      << "; not ready for session (expect reconnect)" << dendl;
      con->mark_down();
      // drop our lock before issuing the async mon command
      l.unlock();
      fetch_missing_metadata(key, m->get_source_addr());
      return true;
    }
  }
  if (daemon) {
    if (m->service_daemon) {
      // update the metadata through the daemon state index to
      // ensure it's kept up-to-date
      daemon_state.update_metadata(daemon, m->daemon_metadata);
    }

    std::lock_guard l(daemon->lock);
    // a (re)open resets any previously collected counters
    daemon->perf_counters.clear();

    daemon->service_daemon = m->service_daemon;
    if (m->service_daemon) {
      daemon->service_status = m->daemon_status;

      utime_t now = ceph_clock_now();
      auto [d, added] = pending_service_map.get_daemon(m->service_name,
						       m->daemon_name);
      // (re)register if new, or if the gid changed (daemon restarted)
      if (added || d->gid != (uint64_t)m->get_source().num()) {
	dout(10) << "registering " << key << " in pending_service_map" << dendl;
	d->gid = m->get_source().num();
	d->addr = m->get_source_addr();
	d->start_epoch = pending_service_map.epoch;
	d->start_stamp = now;
	d->metadata = m->daemon_metadata;
	pending_service_map_dirty = pending_service_map.epoch;
      }
    }

    // config_bl, when present, carries the daemon's current config and
    // the mon config entries it ignored
    auto p = m->config_bl.cbegin();
    if (p != m->config_bl.end()) {
      decode(daemon->config, p);
      decode(daemon->ignored_mon_config, p);
      dout(20) << " got config " << daemon->config
	       << " ignored " << daemon->ignored_mon_config << dendl;
    }
    // defaults are kept encoded; decoded lazily elsewhere
    daemon->config_defaults_bl = m->config_defaults_bl;
    daemon->config_defaults.clear();
    dout(20) << " got config_defaults_bl " << daemon->config_defaults_bl.length()
	     << " bytes" << dendl;
  }

  if (con->get_peer_type() != entity_name_t::TYPE_CLIENT &&
      m->service_name.empty())
  {
    // Store in set of the daemon/service connections, i.e. those
    // connections that require an update in the event of stats
    // configuration changes.
    daemon_connections.insert(con);
  }

  return true;
}
535
536 bool DaemonServer::handle_update(const ref_t<MMgrUpdate>& m)
537 {
538 DaemonKey key;
539 if (!m->service_name.empty()) {
540 key.type = m->service_name;
541 } else {
542 key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
543 }
544 key.name = m->daemon_name;
545
546 dout(10) << "from " << m->get_connection() << " " << key << dendl;
547
548 if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
549 m->service_name.empty()) {
550 // Clients should not be sending us update request
551 dout(10) << "rejecting update request from non-daemon client " << m->daemon_name
552 << dendl;
553 clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
554 << " at " << m->get_connection()->get_peer_addrs();
555 m->get_connection()->mark_down();
556 return true;
557 }
558
559
560 {
561 std::unique_lock locker(lock);
562
563 DaemonStatePtr daemon;
564 // Look up the DaemonState
565 if (daemon_state.exists(key)) {
566 dout(20) << "updating existing DaemonState for " << key << dendl;
567
568 daemon = daemon_state.get(key);
569 if (m->need_metadata_update &&
570 !m->daemon_metadata.empty()) {
571 daemon_state.update_metadata(daemon, m->daemon_metadata);
572 }
573 }
574 }
575
576 return true;
577 }
578
/**
 * Handle MMgrClose: a daemon is ending its session.  Remove its
 * DaemonState and, for service daemons, drop it from the pending
 * service map.  The same message is echoed back as an acknowledgement.
 */
bool DaemonServer::handle_close(const ref_t<MMgrClose>& m)
{
  std::lock_guard l(lock);

  DaemonKey key = key_from_service(m->service_name,
				   m->get_connection()->get_peer_type(),
				   m->daemon_name);
  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  if (daemon_state.exists(key)) {
    DaemonStatePtr daemon = daemon_state.get(key);
    daemon_state.rm(key);
    {
      // nested scope: daemon->lock must not outlive its use here
      std::lock_guard l(daemon->lock);
      if (daemon->service_daemon) {
	pending_service_map.rm_daemon(m->service_name, m->daemon_name);
	pending_service_map_dirty = pending_service_map.epoch;
      }
    }
  }

  // send same message back as a reply
  m->get_connection()->send_message2(m);
  return true;
}
604
/**
 * Record a daemon's task-status map into the pending service map,
 * marking the map dirty only when the status actually changed.
 * Called from handle_report() with DaemonServer::lock held, which
 * protects pending_service_map.
 */
void DaemonServer::update_task_status(
  DaemonKey key,
  const std::map<std::string,std::string>& task_status)
{
  dout(10) << "got task status from " << key << dendl;

  // get_daemon creates the entry if needed; `added` itself is unused
  [[maybe_unused]] auto [daemon, added] =
    pending_service_map.get_daemon(key.type, key.name);
  if (daemon->task_status != task_status) {
    daemon->task_status = task_status;
    pending_service_map_dirty = pending_service_map.epoch;
  }
}
618
/**
 * Handle MMgrReport: periodic stats/status from a daemon.
 *
 * Rejects reports from plain clients (no declared service).  If we have
 * no DaemonState for the sender (e.g. no metadata yet), the session is
 * torn down and a metadata fetch is started so the daemon reconnects
 * once we are ready.  Otherwise the report's perf counters, config,
 * service status, task status and health metrics are folded into the
 * DaemonState, and any perf-metric payloads are forwarded to the
 * collectors.
 */
bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
{
  DaemonKey key;
  if (!m->service_name.empty()) {
    key.type = m->service_name;
  } else {
    key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
  }
  key.name = m->daemon_name;

  dout(10) << "from " << m->get_connection() << " " << key << dendl;

  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
      m->service_name.empty()) {
    // Clients should not be sending us stats unless they are declaring
    // themselves to be a daemon for some service.
    dout(10) << "rejecting report from non-daemon client " << m->daemon_name
	     << dendl;
    clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
		 << " at " << m->get_connection()->get_peer_addrs();
    m->get_connection()->mark_down();
    return true;
  }


  {
    std::unique_lock locker(lock);

    DaemonStatePtr daemon;
    // Look up the DaemonState
    if (daemon = daemon_state.get(key); daemon != nullptr) {
      dout(20) << "updating existing DaemonState for " << key << dendl;
    } else {
      // drop our lock while issuing the async mon command
      locker.unlock();

      // we don't know the hostname at this stage, reject MMgrReport here.
      dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
	      << dendl;
      // issue metadata request in background
      fetch_missing_metadata(key, m->get_source_addr());

      locker.lock();

      // kill session
      auto priv = m->get_connection()->get_priv();
      auto session = static_cast<MgrSession*>(priv.get());
      if (!session) {
	return false;
      }
      m->get_connection()->mark_down();

      dout(10) << "unregistering osd." << session->osd_id
	       << " session " << session << " con " << m->get_connection() << dendl;

      // NOTE: osd_id may be 0 for non-OSD peers; the find() guard keeps
      // this harmless for them
      if (osd_cons.find(session->osd_id) != osd_cons.end()) {
	osd_cons[session->osd_id].erase(m->get_connection());
      }

      auto iter = daemon_connections.find(m->get_connection());
      if (iter != daemon_connections.end()) {
	daemon_connections.erase(iter);
      }

      return false;
    }

    // Update the DaemonState
    ceph_assert(daemon != nullptr);
    {
      std::lock_guard l(daemon->lock);
      auto &daemon_counters = daemon->perf_counters;
      daemon_counters.update(*m.get());

      // config_bl, when present, refreshes the daemon's config view
      auto p = m->config_bl.cbegin();
      if (p != m->config_bl.end()) {
	decode(daemon->config, p);
	decode(daemon->ignored_mon_config, p);
	dout(20) << " got config " << daemon->config
		 << " ignored " << daemon->ignored_mon_config << dendl;
      }

      utime_t now = ceph_clock_now();
      if (daemon->service_daemon) {
	if (m->daemon_status) {
	  daemon->service_status_stamp = now;
	  daemon->service_status = *m->daemon_status;
	}
	// every report counts as a liveness beacon
	daemon->last_service_beacon = now;
      } else if (m->daemon_status) {
	derr << "got status from non-daemon " << key << dendl;
      }
      // update task status
      if (m->task_status) {
	update_task_status(key, *m->task_status);
	daemon->last_service_beacon = now;
      }
      if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
	// only OSD and MON send health_checks to me now
	daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
	dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
		 << dendl;
      }
    }
  }

  // if there are any schema updates, notify the python modules
  /* no users currently
  if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
    py_modules.notify_all("perf_schema_update", ceph::to_string(key));
  }
  */

  if (m->get_connection()->peer_is_osd()) {
    osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
  }

  if (m->metric_report_message) {
    const MetricReportMessage &message = *m->metric_report_message;
    boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
  }

  return true;
}
742
743
744 void DaemonServer::_generate_command_map(
745 cmdmap_t& cmdmap,
746 map<string,string> &param_str_map)
747 {
748 for (auto p = cmdmap.begin();
749 p != cmdmap.end(); ++p) {
750 if (p->first == "prefix")
751 continue;
752 if (p->first == "caps") {
753 vector<string> cv;
754 if (cmd_getval(cmdmap, "caps", cv) &&
755 cv.size() % 2 == 0) {
756 for (unsigned i = 0; i < cv.size(); i += 2) {
757 string k = string("caps_") + cv[i];
758 param_str_map[k] = cv[i + 1];
759 }
760 continue;
761 }
762 }
763 param_str_map[p->first] = cmd_vartype_stringify(p->second);
764 }
765 }
766
767 const MonCommand *DaemonServer::_get_mgrcommand(
768 const string &cmd_prefix,
769 const std::vector<MonCommand> &cmds)
770 {
771 const MonCommand *this_cmd = nullptr;
772 for (const auto &cmd : cmds) {
773 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
774 this_cmd = &cmd;
775 break;
776 }
777 }
778 return this_cmd;
779 }
780
/**
 * Capability check for a command from the given session.
 *
 * Mons are always allowed (they validate before proxying).  Otherwise
 * the session's caps are tested against the r/w/x permissions the
 * command declares, in the context of the service/module/prefix and the
 * stringified parameters.
 *
 * @return true if the session may run the command
 */
bool DaemonServer::_allowed_command(
  MgrSession *s,
  const string &service,
  const string &module,
  const string &prefix,
  const cmdmap_t& cmdmap,
  const map<string,string>& param_str_map,
  const MonCommand *this_cmd) {

  if (s->entity_name.is_mon()) {
    // mon is all-powerful. even when it is forwarding commands on behalf of
    // old clients; we expect the mon is validating commands before proxying!
    return true;
  }

  // which permission bits does this command require?
  bool cmd_r = this_cmd->requires_perm('r');
  bool cmd_w = this_cmd->requires_perm('w');
  bool cmd_x = this_cmd->requires_perm('x');

  bool capable = s->caps.is_capable(
      g_ceph_context,
      s->entity_name,
      service, module, prefix, param_str_map,
      cmd_r, cmd_w, cmd_x,
      s->get_peer_addr());

  dout(10) << " " << s->entity_name << " "
	   << (capable ? "" : "not ") << "capable" << dendl;
  return capable;
}
811
812 /**
813 * The working data for processing an MCommand. This lives in
814 * a class to enable passing it into other threads for processing
815 * outside of the thread/locks that called handle_command.
816 */
class CommandContext {
public:
  // Exactly one of m_tell / m_mgr is set, depending on which message
  // type carried the command; cmd/data reference into whichever it is.
  ceph::ref_t<MCommand> m_tell;
  ceph::ref_t<MMgrCommand> m_mgr;
  const std::vector<std::string>& cmd;  ///< ref into m_tell or m_mgr
  const bufferlist& data;               ///< ref into m_tell or m_mgr
  bufferlist odata;                     ///< output payload for the reply
  cmdmap_t cmdmap;                      ///< parsed form of `cmd`

  explicit CommandContext(ceph::ref_t<MCommand> m)
    : m_tell{std::move(m)},
      cmd(m_tell->cmd),
      data(m_tell->get_data()) {
  }
  explicit CommandContext(ceph::ref_t<MMgrCommand> m)
    : m_mgr{std::move(m)},
      cmd(m_mgr->cmd),
      data(m_mgr->get_data()) {
  }

  // Convenience overload: reply with the contents of a stringstream.
  void reply(int r, const std::stringstream &ss) {
    reply(r, ss.str());
  }

  // Send the reply (matching the request's message type), attaching
  // odata and marking the connection disposable so it can drop once
  // the response is on the wire.
  void reply(int r, const std::string &rs) {
    // Let the connection drop as soon as we've sent our response
    ConnectionRef con = m_tell ? m_tell->get_connection()
      : m_mgr->get_connection();
    if (con) {
      con->mark_disposable();
    }

    if (r == 0) {
      dout(20) << "success" << dendl;
    } else {
      derr << __func__ << " " << cpp_strerror(r) << " " << rs << dendl;
    }
    if (con) {
      if (m_tell) {
	MCommandReply *reply = new MCommandReply(r, rs);
	reply->set_tid(m_tell->get_tid());
	reply->set_data(odata);
	con->send_message(reply);
      } else {
	MMgrCommandReply *reply = new MMgrCommandReply(r, rs);
	reply->set_tid(m_mgr->get_tid());
	reply->set_data(odata);
	con->send_message(reply);
      }
    }
  }
};
869
870 /**
871 * A context for receiving a bufferlist/error string from a background
872 * function and then calling back to a CommandContext when it's done
873 */
class ReplyOnFinish : public Context {
  std::shared_ptr<CommandContext> cmdctx;

public:
  bufferlist from_mon;  ///< filled by the background mon command
  string outs;          ///< status string from the background command

  explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_)
    : cmdctx(cmdctx_)
    {}
  // Append whatever the mon returned to the command's output and send
  // the reply with the mon's return code.
  void finish(int r) override {
    cmdctx->odata.claim_append(from_mon);
    cmdctx->reply(r, outs);
  }
};
889
// Entry point for MCommand (legacy "tell" path): wrap in a
// CommandContext and run _handle_command under `lock`, translating
// argument-parsing exceptions into an -EINVAL reply.
bool DaemonServer::handle_command(const ref_t<MCommand>& m)
{
  std::lock_guard l(lock);
  auto cmdctx = std::make_shared<CommandContext>(m);
  try {
    return _handle_command(cmdctx);
  } catch (const bad_cmd_get& e) {
    cmdctx->reply(-EINVAL, e.what());
    return true;
  }
}
901
// Entry point for MMgrCommand (native mgr command path): same flow as
// the MCommand overload above.
bool DaemonServer::handle_command(const ref_t<MMgrCommand>& m)
{
  std::lock_guard l(lock);
  auto cmdctx = std::make_shared<CommandContext>(m);
  try {
    return _handle_command(cmdctx);
  } catch (const bad_cmd_get& e) {
    cmdctx->reply(-EINVAL, e.what());
    return true;
  }
}
913
// Record a cap-check failure: emit an audit-log entry identifying the
// session and command, and append a user-facing hint to `ss` (which the
// caller sends back in the command reply).
void DaemonServer::log_access_denied(
    std::shared_ptr<CommandContext>& cmdctx,
    MgrSession* session, std::stringstream& ss) {
  dout(1) << " access denied" << dendl;
  audit_clog->info() << "from='" << session->inst << "' "
                     << "entity='" << session->entity_name << "' "
                     << "cmd=" << cmdctx->cmd << ":  access denied";
  ss << "access denied: does your client key have mgr caps? "
        "See http://docs.ceph.com/en/latest/mgr/administrator/"
        "#client-authentication";
}
925
926 void DaemonServer::_check_offlines_pgs(
927 const set<int>& osds,
928 const OSDMap& osdmap,
929 const PGMap& pgmap,
930 offline_pg_report *report)
931 {
932 // reset output
933 *report = offline_pg_report();
934 report->osds = osds;
935
936 for (const auto& q : pgmap.pg_stat) {
937 set<int32_t> pg_acting; // net acting sets (with no missing if degraded)
938 bool found = false;
939 if (q.second.state == 0) {
940 report->unknown.insert(q.first);
941 continue;
942 }
943 if (q.second.state & PG_STATE_DEGRADED) {
944 for (auto& anm : q.second.avail_no_missing) {
945 if (osds.count(anm.osd)) {
946 found = true;
947 continue;
948 }
949 if (anm.osd != CRUSH_ITEM_NONE) {
950 pg_acting.insert(anm.osd);
951 }
952 }
953 } else {
954 for (auto& a : q.second.acting) {
955 if (osds.count(a)) {
956 found = true;
957 continue;
958 }
959 if (a != CRUSH_ITEM_NONE) {
960 pg_acting.insert(a);
961 }
962 }
963 }
964 if (!found) {
965 continue;
966 }
967 const pg_pool_t *pi = osdmap.get_pg_pool(q.first.pool());
968 bool dangerous = false;
969 if (!pi) {
970 report->bad_no_pool.insert(q.first); // pool is creating or deleting
971 dangerous = true;
972 }
973 if (!(q.second.state & PG_STATE_ACTIVE)) {
974 report->bad_already_inactive.insert(q.first);
975 dangerous = true;
976 }
977 if (pg_acting.size() < pi->min_size) {
978 report->bad_become_inactive.insert(q.first);
979 dangerous = true;
980 }
981 if (dangerous) {
982 report->not_ok.insert(q.first);
983 } else {
984 report->ok.insert(q.first);
985 if (q.second.state & PG_STATE_DEGRADED) {
986 report->ok_become_more_degraded.insert(q.first);
987 } else {
988 report->ok_become_degraded.insert(q.first);
989 }
990 }
991 }
992 dout(20) << osds << " -> " << report->ok.size() << " ok, "
993 << report->not_ok.size() << " not ok, "
994 << report->unknown.size() << " unknown"
995 << dendl;
996 }
997
/**
 * Greedily grow an ok-to-stop OSD set from `orig_osds` up to `max`
 * OSDs, walking up the CRUSH tree.
 *
 * Starting from one of the original OSDs, repeatedly move to the
 * parent CRUSH bucket and try adding each up OSD beneath it; an
 * addition is kept only if the enlarged set is still ok to stop.
 * Stops at `max` OSDs, on any CRUSH lookup error, or after a level
 * where some candidate had to be rejected.
 *
 * @param orig_osds  the user-requested set (must itself be ok to stop)
 * @param max        upper bound on the size of the returned set
 * @param out_report report for the best (largest) acceptable set found
 */
void DaemonServer::_maximize_ok_to_stop_set(
  const set<int>& orig_osds,
  unsigned max,
  const OSDMap& osdmap,
  const PGMap& pgmap,
  offline_pg_report *out_report)
{
  dout(20) << "orig_osds " << orig_osds << " max " << max << dendl;
  _check_offlines_pgs(orig_osds, osdmap, pgmap, out_report);
  if (!out_report->ok_to_stop()) {
    // the original set already fails; nothing to maximize
    return;
  }
  if (orig_osds.size() >= max) {
    // already at max
    return;
  }

  // semi-arbitrarily start with the first osd in the set
  offline_pg_report report;
  set<int> osds = orig_osds;
  int parent = *osds.begin();
  set<int> children;

  while (true) {
    // identify the next parent
    int r = osdmap.crush->get_immediate_parent_id(parent, &parent);
    if (r < 0) {
      return; // just go with what we have so far!
    }

    // get candidate additions that are beneath this point in the tree
    children.clear();
    r = osdmap.crush->get_all_children(parent, &children);
    if (r < 0) {
      return; // just go with what we have so far!
    }
    dout(20) << " parent " << parent << " children " << children << dendl;

    // try adding in more osds
    int failed = 0;  // how many children we failed to add to our set
    for (auto o : children) {
      // only up OSDs (o >= 0 filters out CRUSH buckets) not already in set
      if (o >= 0 && osdmap.is_up(o) && osds.count(o) == 0) {
	osds.insert(o);
	_check_offlines_pgs(osds, osdmap, pgmap, &report);
	if (!report.ok_to_stop()) {
	  osds.erase(o);
	  ++failed;
	  continue;
	}
	// keep the enlarged set: publish its report as the best so far
	*out_report = report;
	if (osds.size() == max) {
	  dout(20) << " hit max" << dendl;
	  return; // yay, we hit the max
	}
      }
    }

    if (failed) {
      // we hit some failures; go with what we have
      dout(20) << " hit some peer failures" << dendl;
      return;
    }
  }
}
1062
1063 bool DaemonServer::_handle_command(
1064 std::shared_ptr<CommandContext>& cmdctx)
1065 {
1066 MessageRef m;
1067 bool admin_socket_cmd = false;
1068 if (cmdctx->m_tell) {
1069 m = cmdctx->m_tell;
1070 // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
1071 // command.
1072 admin_socket_cmd = (cmdctx->m_tell->fsid != uuid_d());
1073 } else {
1074 m = cmdctx->m_mgr;
1075 }
1076 auto priv = m->get_connection()->get_priv();
1077 auto session = static_cast<MgrSession*>(priv.get());
1078 if (!session) {
1079 return true;
1080 }
1081 if (session->inst.name == entity_name_t()) {
1082 session->inst.name = m->get_source();
1083 }
1084
1085 map<string,string> param_str_map;
1086 std::stringstream ss;
1087 int r = 0;
1088
1089 if (!cmdmap_from_json(cmdctx->cmd, &(cmdctx->cmdmap), ss)) {
1090 cmdctx->reply(-EINVAL, ss);
1091 return true;
1092 }
1093
1094 string prefix;
1095 cmd_getval(cmdctx->cmdmap, "prefix", prefix);
1096 dout(10) << "decoded-size=" << cmdctx->cmdmap.size() << " prefix=" << prefix << dendl;
1097
1098 boost::scoped_ptr<Formatter> f;
1099 {
1100 std::string format;
1101 if (boost::algorithm::ends_with(prefix, "_json")) {
1102 format = "json";
1103 } else {
1104 format = cmd_getval_or<string>(cmdctx->cmdmap, "format", "plain");
1105 }
1106 f.reset(Formatter::create(format));
1107 }
1108
1109 // this is just for mgr commands - admin socket commands will fall
1110 // through and use the admin socket version of
1111 // get_command_descriptions
1112 if (prefix == "get_command_descriptions" && !admin_socket_cmd) {
1113 dout(10) << "reading commands from python modules" << dendl;
1114 const auto py_commands = py_modules.get_commands();
1115
1116 int cmdnum = 0;
1117 JSONFormatter f;
1118 f.open_object_section("command_descriptions");
1119
1120 auto dump_cmd = [&cmdnum, &f, m](const MonCommand &mc){
1121 ostringstream secname;
1122 secname << "cmd" << std::setfill('0') << std::setw(3) << cmdnum;
1123 dump_cmddesc_to_json(&f, m->get_connection()->get_features(),
1124 secname.str(), mc.cmdstring, mc.helpstring,
1125 mc.module, mc.req_perms, 0);
1126 cmdnum++;
1127 };
1128
1129 for (const auto &pyc : py_commands) {
1130 dump_cmd(pyc);
1131 }
1132
1133 for (const auto &mgr_cmd : mgr_commands) {
1134 dump_cmd(mgr_cmd);
1135 }
1136
1137 f.close_section(); // command_descriptions
1138 f.flush(cmdctx->odata);
1139 cmdctx->reply(0, ss);
1140 return true;
1141 }
1142
1143 // lookup command
1144 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
1145 _generate_command_map(cmdctx->cmdmap, param_str_map);
1146
1147 bool is_allowed = false;
1148 ModuleCommand py_command;
1149 if (admin_socket_cmd) {
1150 // admin socket commands require all capabilities
1151 is_allowed = session->caps.is_allow_all();
1152 } else if (!mgr_cmd) {
1153 // Resolve the command to the name of the module that will
1154 // handle it (if the command exists)
1155 auto py_commands = py_modules.get_py_commands();
1156 for (const auto &pyc : py_commands) {
1157 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
1158 if (pyc_prefix == prefix) {
1159 py_command = pyc;
1160 break;
1161 }
1162 }
1163
1164 MonCommand pyc = {"", "", "py", py_command.perm};
1165 is_allowed = _allowed_command(session, "py", py_command.module_name,
1166 prefix, cmdctx->cmdmap, param_str_map,
1167 &pyc);
1168 } else {
1169 // validate user's permissions for requested command
1170 is_allowed = _allowed_command(session, mgr_cmd->module, "",
1171 prefix, cmdctx->cmdmap, param_str_map, mgr_cmd);
1172 }
1173
1174 if (!is_allowed) {
1175 log_access_denied(cmdctx, session, ss);
1176 cmdctx->reply(-EACCES, ss);
1177 return true;
1178 }
1179
1180 audit_clog->debug()
1181 << "from='" << session->inst << "' "
1182 << "entity='" << session->entity_name << "' "
1183 << "cmd=" << cmdctx->cmd << ": dispatch";
1184
1185 if (admin_socket_cmd) {
1186 cct->get_admin_socket()->queue_tell_command(cmdctx->m_tell);
1187 return true;
1188 }
1189
1190 // ----------------
1191 // service map commands
1192 if (prefix == "service dump") {
1193 if (!f)
1194 f.reset(Formatter::create("json-pretty"));
1195 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
1196 f->dump_object("service_map", service_map);
1197 });
1198 f->flush(cmdctx->odata);
1199 cmdctx->reply(0, ss);
1200 return true;
1201 }
1202 if (prefix == "service status") {
1203 if (!f)
1204 f.reset(Formatter::create("json-pretty"));
1205 // only include state from services that are in the persisted service map
1206 f->open_object_section("service_status");
1207 for (auto& [type, service] : pending_service_map.services) {
1208 if (ServiceMap::is_normal_ceph_entity(type)) {
1209 continue;
1210 }
1211
1212 f->open_object_section(type.c_str());
1213 for (auto& q : service.daemons) {
1214 f->open_object_section(q.first.c_str());
1215 DaemonKey key{type, q.first};
1216 ceph_assert(daemon_state.exists(key));
1217 auto daemon = daemon_state.get(key);
1218 std::lock_guard l(daemon->lock);
1219 f->dump_stream("status_stamp") << daemon->service_status_stamp;
1220 f->dump_stream("last_beacon") << daemon->last_service_beacon;
1221 f->open_object_section("status");
1222 for (auto& r : daemon->service_status) {
1223 f->dump_string(r.first.c_str(), r.second);
1224 }
1225 f->close_section();
1226 f->close_section();
1227 }
1228 f->close_section();
1229 }
1230 f->close_section();
1231 f->flush(cmdctx->odata);
1232 cmdctx->reply(0, ss);
1233 return true;
1234 }
1235
1236 if (prefix == "config set") {
1237 std::string key;
1238 std::string val;
1239 cmd_getval(cmdctx->cmdmap, "key", key);
1240 cmd_getval(cmdctx->cmdmap, "value", val);
1241 r = cct->_conf.set_val(key, val, &ss);
1242 if (r == 0) {
1243 cct->_conf.apply_changes(nullptr);
1244 }
1245 cmdctx->reply(0, ss);
1246 return true;
1247 }
1248
1249 // -----------
1250 // PG commands
1251
1252 if (prefix == "pg scrub" ||
1253 prefix == "pg repair" ||
1254 prefix == "pg deep-scrub") {
1255 string scrubop = prefix.substr(3, string::npos);
1256 pg_t pgid;
1257 spg_t spgid;
1258 string pgidstr;
1259 cmd_getval(cmdctx->cmdmap, "pgid", pgidstr);
1260 if (!pgid.parse(pgidstr.c_str())) {
1261 ss << "invalid pgid '" << pgidstr << "'";
1262 cmdctx->reply(-EINVAL, ss);
1263 return true;
1264 }
1265 bool pg_exists = false;
1266 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1267 pg_exists = osdmap.pg_exists(pgid);
1268 });
1269 if (!pg_exists) {
1270 ss << "pg " << pgid << " does not exist";
1271 cmdctx->reply(-ENOENT, ss);
1272 return true;
1273 }
1274 int acting_primary = -1;
1275 epoch_t epoch;
1276 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1277 epoch = osdmap.get_epoch();
1278 osdmap.get_primary_shard(pgid, &acting_primary, &spgid);
1279 });
1280 if (acting_primary == -1) {
1281 ss << "pg " << pgid << " has no primary osd";
1282 cmdctx->reply(-EAGAIN, ss);
1283 return true;
1284 }
1285 auto p = osd_cons.find(acting_primary);
1286 if (p == osd_cons.end()) {
1287 ss << "pg " << pgid << " primary osd." << acting_primary
1288 << " is not currently connected";
1289 cmdctx->reply(-EAGAIN, ss);
1290 return true;
1291 }
1292 for (auto& con : p->second) {
1293 assert(HAVE_FEATURE(con->get_features(), SERVER_OCTOPUS));
1294 vector<spg_t> pgs = { spgid };
1295 con->send_message(new MOSDScrub2(monc->get_fsid(),
1296 epoch,
1297 pgs,
1298 scrubop == "repair",
1299 scrubop == "deep-scrub"));
1300 }
1301 ss << "instructing pg " << spgid << " on osd." << acting_primary
1302 << " to " << scrubop;
1303 cmdctx->reply(0, ss);
1304 return true;
1305 } else if (prefix == "osd scrub" ||
1306 prefix == "osd deep-scrub" ||
1307 prefix == "osd repair") {
1308 string whostr;
1309 cmd_getval(cmdctx->cmdmap, "who", whostr);
1310 vector<string> pvec;
1311 get_str_vec(prefix, pvec);
1312
1313 set<int> osds;
1314 if (whostr == "*" || whostr == "all" || whostr == "any") {
1315 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1316 for (int i = 0; i < osdmap.get_max_osd(); i++)
1317 if (osdmap.is_up(i)) {
1318 osds.insert(i);
1319 }
1320 });
1321 } else {
1322 long osd = parse_osd_id(whostr.c_str(), &ss);
1323 if (osd < 0) {
1324 ss << "invalid osd '" << whostr << "'";
1325 cmdctx->reply(-EINVAL, ss);
1326 return true;
1327 }
1328 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1329 if (osdmap.is_up(osd)) {
1330 osds.insert(osd);
1331 }
1332 });
1333 if (osds.empty()) {
1334 ss << "osd." << osd << " is not up";
1335 cmdctx->reply(-EAGAIN, ss);
1336 return true;
1337 }
1338 }
1339 set<int> sent_osds, failed_osds;
1340 for (auto osd : osds) {
1341 vector<spg_t> spgs;
1342 epoch_t epoch;
1343 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1344 epoch = osdmap.get_epoch();
1345 auto p = pgmap.pg_by_osd.find(osd);
1346 if (p != pgmap.pg_by_osd.end()) {
1347 for (auto pgid : p->second) {
1348 int primary;
1349 spg_t spg;
1350 osdmap.get_primary_shard(pgid, &primary, &spg);
1351 if (primary == osd) {
1352 spgs.push_back(spg);
1353 }
1354 }
1355 }
1356 });
1357 auto p = osd_cons.find(osd);
1358 if (p == osd_cons.end()) {
1359 failed_osds.insert(osd);
1360 } else {
1361 sent_osds.insert(osd);
1362 for (auto& con : p->second) {
1363 con->send_message(new MOSDScrub2(monc->get_fsid(),
1364 epoch,
1365 spgs,
1366 pvec.back() == "repair",
1367 pvec.back() == "deep-scrub"));
1368 }
1369 }
1370 }
1371 if (failed_osds.size() == osds.size()) {
1372 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
1373 << " (not connected)";
1374 r = -EAGAIN;
1375 } else {
1376 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
1377 if (!failed_osds.empty()) {
1378 ss << "; osd(s) " << failed_osds << " were not connected";
1379 }
1380 r = 0;
1381 }
1382 cmdctx->reply(0, ss);
1383 return true;
1384 } else if (prefix == "osd pool scrub" ||
1385 prefix == "osd pool deep-scrub" ||
1386 prefix == "osd pool repair") {
1387 vector<string> pool_names;
1388 cmd_getval(cmdctx->cmdmap, "who", pool_names);
1389 if (pool_names.empty()) {
1390 ss << "must specify one or more pool names";
1391 cmdctx->reply(-EINVAL, ss);
1392 return true;
1393 }
1394 epoch_t epoch;
1395 map<int32_t, vector<pg_t>> pgs_by_primary; // legacy
1396 map<int32_t, vector<spg_t>> spgs_by_primary;
1397 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1398 epoch = osdmap.get_epoch();
1399 for (auto& pool_name : pool_names) {
1400 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1401 if (pool_id < 0) {
1402 ss << "unrecognized pool '" << pool_name << "'";
1403 r = -ENOENT;
1404 return;
1405 }
1406 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1407 for (int i = 0; i < pool_pg_num; i++) {
1408 pg_t pg(i, pool_id);
1409 int primary;
1410 spg_t spg;
1411 auto got = osdmap.get_primary_shard(pg, &primary, &spg);
1412 if (!got)
1413 continue;
1414 pgs_by_primary[primary].push_back(pg);
1415 spgs_by_primary[primary].push_back(spg);
1416 }
1417 }
1418 });
1419 if (r < 0) {
1420 cmdctx->reply(r, ss);
1421 return true;
1422 }
1423 for (auto& it : spgs_by_primary) {
1424 auto primary = it.first;
1425 auto p = osd_cons.find(primary);
1426 if (p == osd_cons.end()) {
1427 ss << "osd." << primary << " is not currently connected";
1428 cmdctx->reply(-EAGAIN, ss);
1429 return true;
1430 }
1431 for (auto& con : p->second) {
1432 con->send_message(new MOSDScrub2(monc->get_fsid(),
1433 epoch,
1434 it.second,
1435 prefix == "osd pool repair",
1436 prefix == "osd pool deep-scrub"));
1437 }
1438 }
1439 cmdctx->reply(0, "");
1440 return true;
1441 } else if (prefix == "osd reweight-by-pg" ||
1442 prefix == "osd reweight-by-utilization" ||
1443 prefix == "osd test-reweight-by-pg" ||
1444 prefix == "osd test-reweight-by-utilization") {
1445 bool by_pg =
1446 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
1447 bool dry_run =
1448 prefix == "osd test-reweight-by-pg" ||
1449 prefix == "osd test-reweight-by-utilization";
1450 int64_t oload = cmd_getval_or<int64_t>(cmdctx->cmdmap, "oload", 120);
1451 set<int64_t> pools;
1452 vector<string> poolnames;
1453 cmd_getval(cmdctx->cmdmap, "pools", poolnames);
1454 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1455 for (const auto& poolname : poolnames) {
1456 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
1457 if (pool < 0) {
1458 ss << "pool '" << poolname << "' does not exist";
1459 r = -ENOENT;
1460 }
1461 pools.insert(pool);
1462 }
1463 });
1464 if (r) {
1465 cmdctx->reply(r, ss);
1466 return true;
1467 }
1468
1469 double max_change = g_conf().get_val<double>("mon_reweight_max_change");
1470 cmd_getval(cmdctx->cmdmap, "max_change", max_change);
1471 if (max_change <= 0.0) {
1472 ss << "max_change " << max_change << " must be positive";
1473 cmdctx->reply(-EINVAL, ss);
1474 return true;
1475 }
1476 int64_t max_osds = g_conf().get_val<int64_t>("mon_reweight_max_osds");
1477 cmd_getval(cmdctx->cmdmap, "max_osds", max_osds);
1478 if (max_osds <= 0) {
1479 ss << "max_osds " << max_osds << " must be positive";
1480 cmdctx->reply(-EINVAL, ss);
1481 return true;
1482 }
1483 bool no_increasing = false;
1484 cmd_getval_compat_cephbool(cmdctx->cmdmap, "no_increasing", no_increasing);
1485 string out_str;
1486 mempool::osdmap::map<int32_t, uint32_t> new_weights;
1487 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap &osdmap, const PGMap& pgmap) {
1488 return reweight::by_utilization(osdmap, pgmap,
1489 oload,
1490 max_change,
1491 max_osds,
1492 by_pg,
1493 pools.empty() ? NULL : &pools,
1494 no_increasing,
1495 &new_weights,
1496 &ss, &out_str, f.get());
1497 });
1498 if (r >= 0) {
1499 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
1500 }
1501 if (f) {
1502 f->flush(cmdctx->odata);
1503 } else {
1504 cmdctx->odata.append(out_str);
1505 }
1506 if (r < 0) {
1507 ss << "FAILED reweight-by-pg";
1508 cmdctx->reply(r, ss);
1509 return true;
1510 } else if (r == 0 || dry_run) {
1511 ss << "no change";
1512 cmdctx->reply(r, ss);
1513 return true;
1514 } else {
1515 json_spirit::Object json_object;
1516 for (const auto& osd_weight : new_weights) {
1517 json_spirit::Config::add(json_object,
1518 std::to_string(osd_weight.first),
1519 std::to_string(osd_weight.second));
1520 }
1521 string s = json_spirit::write(json_object);
1522 std::replace(begin(s), end(s), '\"', '\'');
1523 const string cmd =
1524 "{"
1525 "\"prefix\": \"osd reweightn\", "
1526 "\"weights\": \"" + s + "\""
1527 "}";
1528 auto on_finish = new ReplyOnFinish(cmdctx);
1529 monc->start_mon_command({cmd}, {},
1530 &on_finish->from_mon, &on_finish->outs, on_finish);
1531 return true;
1532 }
1533 } else if (prefix == "osd df") {
1534 string method, filter;
1535 cmd_getval(cmdctx->cmdmap, "output_method", method);
1536 cmd_getval(cmdctx->cmdmap, "filter", filter);
1537 stringstream rs;
1538 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1539 // sanity check filter(s)
1540 if (!filter.empty() &&
1541 osdmap.lookup_pg_pool_name(filter) < 0 &&
1542 !osdmap.crush->class_exists(filter) &&
1543 !osdmap.crush->name_exists(filter)) {
1544 rs << "'" << filter << "' not a pool, crush node or device class name";
1545 return -EINVAL;
1546 }
1547 print_osd_utilization(osdmap, pgmap, ss,
1548 f.get(), method == "tree", filter);
1549 cmdctx->odata.append(ss);
1550 return 0;
1551 });
1552 cmdctx->reply(r, rs);
1553 return true;
1554 } else if (prefix == "osd pool stats") {
1555 string pool_name;
1556 cmd_getval(cmdctx->cmdmap, "pool_name", pool_name);
1557 int64_t poolid = -ENOENT;
1558 bool one_pool = false;
1559 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1560 if (!pool_name.empty()) {
1561 poolid = osdmap.lookup_pg_pool_name(pool_name);
1562 if (poolid < 0) {
1563 ceph_assert(poolid == -ENOENT);
1564 ss << "unrecognized pool '" << pool_name << "'";
1565 return -ENOENT;
1566 }
1567 one_pool = true;
1568 }
1569 stringstream rs;
1570 if (f)
1571 f->open_array_section("pool_stats");
1572 else {
1573 if (osdmap.get_pools().empty()) {
1574 ss << "there are no pools!";
1575 goto stats_out;
1576 }
1577 }
1578 for (auto &p : osdmap.get_pools()) {
1579 if (!one_pool) {
1580 poolid = p.first;
1581 }
1582 pg_map.dump_pool_stats_and_io_rate(poolid, osdmap, f.get(), &rs);
1583 if (one_pool) {
1584 break;
1585 }
1586 }
1587 stats_out:
1588 if (f) {
1589 f->close_section();
1590 f->flush(cmdctx->odata);
1591 } else {
1592 cmdctx->odata.append(rs.str());
1593 }
1594 return 0;
1595 });
1596 if (r != -EOPNOTSUPP) {
1597 cmdctx->reply(r, ss);
1598 return true;
1599 }
1600 } else if (prefix == "osd safe-to-destroy" ||
1601 prefix == "osd destroy" ||
1602 prefix == "osd purge") {
1603 set<int> osds;
1604 int r = 0;
1605 if (prefix == "osd safe-to-destroy") {
1606 vector<string> ids;
1607 cmd_getval(cmdctx->cmdmap, "ids", ids);
1608 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1609 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1610 });
1611 if (!r && osds.empty()) {
1612 ss << "must specify one or more OSDs";
1613 r = -EINVAL;
1614 }
1615 } else {
1616 int64_t id;
1617 if (!cmd_getval(cmdctx->cmdmap, "id", id)) {
1618 r = -EINVAL;
1619 ss << "must specify OSD id";
1620 } else {
1621 osds.insert(id);
1622 }
1623 }
1624 if (r < 0) {
1625 cmdctx->reply(r, ss);
1626 return true;
1627 }
1628 set<int> active_osds, missing_stats, stored_pgs, safe_to_destroy;
1629 int affected_pgs = 0;
1630 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1631 if (pg_map.num_pg_unknown > 0) {
1632 ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
1633 << " any conclusions";
1634 r = -EAGAIN;
1635 return;
1636 }
1637 int num_active_clean = 0;
1638 for (auto& p : pg_map.num_pg_by_state) {
1639 unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
1640 if ((p.first & want) == want) {
1641 num_active_clean += p.second;
1642 }
1643 }
1644 for (auto osd : osds) {
1645 if (!osdmap.exists(osd)) {
1646 safe_to_destroy.insert(osd);
1647 continue; // clearly safe to destroy
1648 }
1649 auto q = pg_map.num_pg_by_osd.find(osd);
1650 if (q != pg_map.num_pg_by_osd.end()) {
1651 if (q->second.acting > 0 || q->second.up_not_acting > 0) {
1652 active_osds.insert(osd);
1653 // XXX: For overlapping PGs, this counts them again
1654 affected_pgs += q->second.acting + q->second.up_not_acting;
1655 continue;
1656 }
1657 }
1658 if (num_active_clean < pg_map.num_pg) {
1659 // all pgs aren't active+clean; we need to be careful.
1660 auto p = pg_map.osd_stat.find(osd);
1661 if (p == pg_map.osd_stat.end() || !osdmap.is_up(osd)) {
1662 missing_stats.insert(osd);
1663 continue;
1664 } else if (p->second.num_pgs > 0) {
1665 stored_pgs.insert(osd);
1666 continue;
1667 }
1668 }
1669 safe_to_destroy.insert(osd);
1670 }
1671 });
1672 if (r && prefix == "osd safe-to-destroy") {
1673 cmdctx->reply(r, ss); // regardless of formatter
1674 return true;
1675 }
1676 if (!r && (!active_osds.empty() ||
1677 !missing_stats.empty() || !stored_pgs.empty())) {
1678 if (!safe_to_destroy.empty()) {
1679 ss << "OSD(s) " << safe_to_destroy
1680 << " are safe to destroy without reducing data durability. ";
1681 }
1682 if (!active_osds.empty()) {
1683 ss << "OSD(s) " << active_osds << " have " << affected_pgs
1684 << " pgs currently mapped to them. ";
1685 }
1686 if (!missing_stats.empty()) {
1687 ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
1688 << " PGs are active+clean; we cannot draw any conclusions. ";
1689 }
1690 if (!stored_pgs.empty()) {
1691 ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
1692 << " data, and not all PGs are active+clean; we cannot be sure they"
1693 << " aren't still needed.";
1694 }
1695 if (!active_osds.empty() || !stored_pgs.empty()) {
1696 r = -EBUSY;
1697 } else {
1698 r = -EAGAIN;
1699 }
1700 }
1701
1702 if (prefix == "osd safe-to-destroy") {
1703 if (!r) {
1704 ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1705 << " durability.";
1706 }
1707 if (f) {
1708 f->open_object_section("osd_status");
1709 f->open_array_section("safe_to_destroy");
1710 for (auto i : safe_to_destroy)
1711 f->dump_int("osd", i);
1712 f->close_section();
1713 f->open_array_section("active");
1714 for (auto i : active_osds)
1715 f->dump_int("osd", i);
1716 f->close_section();
1717 f->open_array_section("missing_stats");
1718 for (auto i : missing_stats)
1719 f->dump_int("osd", i);
1720 f->close_section();
1721 f->open_array_section("stored_pgs");
1722 for (auto i : stored_pgs)
1723 f->dump_int("osd", i);
1724 f->close_section();
1725 f->close_section(); // osd_status
1726 f->flush(cmdctx->odata);
1727 r = 0;
1728 std::stringstream().swap(ss);
1729 }
1730 cmdctx->reply(r, ss);
1731 return true;
1732 }
1733
1734 if (r) {
1735 bool force = false;
1736 cmd_getval(cmdctx->cmdmap, "force", force);
1737 if (!force) {
1738 // Backward compat
1739 cmd_getval(cmdctx->cmdmap, "yes_i_really_mean_it", force);
1740 }
1741 if (!force) {
1742 ss << "\nYou can proceed by passing --force, but be warned that"
1743 " this will likely mean real, permanent data loss.";
1744 } else {
1745 r = 0;
1746 }
1747 }
1748 if (r) {
1749 cmdctx->reply(r, ss);
1750 return true;
1751 }
1752 const string cmd =
1753 "{"
1754 "\"prefix\": \"" + prefix + "-actual\", "
1755 "\"id\": " + stringify(osds) + ", "
1756 "\"yes_i_really_mean_it\": true"
1757 "}";
1758 auto on_finish = new ReplyOnFinish(cmdctx);
1759 monc->start_mon_command({cmd}, {}, nullptr, &on_finish->outs, on_finish);
1760 return true;
1761 } else if (prefix == "osd ok-to-stop") {
1762 vector<string> ids;
1763 cmd_getval(cmdctx->cmdmap, "ids", ids);
1764 set<int> osds;
1765 int64_t max = 1;
1766 cmd_getval(cmdctx->cmdmap, "max", max);
1767 int r;
1768 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1769 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1770 });
1771 if (!r && osds.empty()) {
1772 ss << "must specify one or more OSDs";
1773 r = -EINVAL;
1774 }
1775 if (max < (int)osds.size()) {
1776 max = osds.size();
1777 }
1778 if (r < 0) {
1779 cmdctx->reply(r, ss);
1780 return true;
1781 }
1782 offline_pg_report out_report;
1783 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1784 _maximize_ok_to_stop_set(
1785 osds, max, osdmap, pg_map,
1786 &out_report);
1787 });
1788 if (!f) {
1789 f.reset(Formatter::create("json"));
1790 }
1791 f->dump_object("ok_to_stop", out_report);
1792 f->flush(cmdctx->odata);
1793 cmdctx->odata.append("\n");
1794 if (!out_report.unknown.empty()) {
1795 ss << out_report.unknown.size() << " pgs have unknown state; "
1796 << "cannot draw any conclusions";
1797 cmdctx->reply(-EAGAIN, ss);
1798 }
1799 if (!out_report.ok_to_stop()) {
1800 ss << "unsafe to stop osd(s) at this time (" << out_report.not_ok.size() << " PGs are or would become offline)";
1801 cmdctx->reply(-EBUSY, ss);
1802 } else {
1803 cmdctx->reply(0, ss);
1804 }
1805 return true;
1806 } else if (prefix == "pg force-recovery" ||
1807 prefix == "pg force-backfill" ||
1808 prefix == "pg cancel-force-recovery" ||
1809 prefix == "pg cancel-force-backfill" ||
1810 prefix == "osd pool force-recovery" ||
1811 prefix == "osd pool force-backfill" ||
1812 prefix == "osd pool cancel-force-recovery" ||
1813 prefix == "osd pool cancel-force-backfill") {
1814 vector<string> vs;
1815 get_str_vec(prefix, vs);
1816 auto& granularity = vs.front();
1817 auto& forceop = vs.back();
1818 vector<pg_t> pgs;
1819
1820 // figure out actual op just once
1821 int actual_op = 0;
1822 if (forceop == "force-recovery") {
1823 actual_op = OFR_RECOVERY;
1824 } else if (forceop == "force-backfill") {
1825 actual_op = OFR_BACKFILL;
1826 } else if (forceop == "cancel-force-backfill") {
1827 actual_op = OFR_BACKFILL | OFR_CANCEL;
1828 } else if (forceop == "cancel-force-recovery") {
1829 actual_op = OFR_RECOVERY | OFR_CANCEL;
1830 }
1831
1832 set<pg_t> candidates; // deduped
1833 if (granularity == "pg") {
    // convert pg names to pgs, discard any invalid ones while at it
1835 vector<string> pgids;
1836 cmd_getval(cmdctx->cmdmap, "pgid", pgids);
1837 for (auto& i : pgids) {
1838 pg_t pgid;
1839 if (!pgid.parse(i.c_str())) {
1840 ss << "invlaid pgid '" << i << "'; ";
1841 r = -EINVAL;
1842 continue;
1843 }
1844 candidates.insert(pgid);
1845 }
1846 } else {
1847 // per pool
1848 vector<string> pool_names;
1849 cmd_getval(cmdctx->cmdmap, "who", pool_names);
1850 if (pool_names.empty()) {
1851 ss << "must specify one or more pool names";
1852 cmdctx->reply(-EINVAL, ss);
1853 return true;
1854 }
1855 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1856 for (auto& pool_name : pool_names) {
1857 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1858 if (pool_id < 0) {
1859 ss << "unrecognized pool '" << pool_name << "'";
1860 r = -ENOENT;
1861 return;
1862 }
1863 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1864 for (int i = 0; i < pool_pg_num; i++)
1865 candidates.insert({(unsigned int)i, (uint64_t)pool_id});
1866 }
1867 });
1868 if (r < 0) {
1869 cmdctx->reply(r, ss);
1870 return true;
1871 }
1872 }
1873
1874 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1875 for (auto& i : candidates) {
1876 auto it = pg_map.pg_stat.find(i);
1877 if (it == pg_map.pg_stat.end()) {
1878 ss << "pg " << i << " does not exist; ";
1879 r = -ENOENT;
1880 continue;
1881 }
1882 auto state = it->second.state;
1883 // discard pgs for which user requests are pointless
1884 switch (actual_op) {
1885 case OFR_RECOVERY:
1886 if ((state & (PG_STATE_DEGRADED |
1887 PG_STATE_RECOVERY_WAIT |
1888 PG_STATE_RECOVERING)) == 0) {
1889 // don't return error, user script may be racing with cluster.
1890 // not fatal.
1891 ss << "pg " << i << " doesn't require recovery; ";
1892 continue;
1893 } else if (state & PG_STATE_FORCED_RECOVERY) {
1894 ss << "pg " << i << " recovery already forced; ";
1895 // return error, as it may be a bug in user script
1896 r = -EINVAL;
1897 continue;
1898 }
1899 break;
1900 case OFR_BACKFILL:
1901 if ((state & (PG_STATE_DEGRADED |
1902 PG_STATE_BACKFILL_WAIT |
1903 PG_STATE_BACKFILLING)) == 0) {
1904 ss << "pg " << i << " doesn't require backfilling; ";
1905 continue;
1906 } else if (state & PG_STATE_FORCED_BACKFILL) {
1907 ss << "pg " << i << " backfill already forced; ";
1908 r = -EINVAL;
1909 continue;
1910 }
1911 break;
1912 case OFR_BACKFILL | OFR_CANCEL:
1913 if ((state & PG_STATE_FORCED_BACKFILL) == 0) {
1914 ss << "pg " << i << " backfill not forced; ";
1915 continue;
1916 }
1917 break;
1918 case OFR_RECOVERY | OFR_CANCEL:
1919 if ((state & PG_STATE_FORCED_RECOVERY) == 0) {
1920 ss << "pg " << i << " recovery not forced; ";
1921 continue;
1922 }
1923 break;
1924 default:
1925 ceph_abort_msg("actual_op value is not supported");
1926 }
1927 pgs.push_back(i);
1928 } // for
1929 });
1930
1931 // respond with error only when no pgs are correct
1932 // yes, in case of mixed errors, only the last one will be emitted,
1933 // but the message presented will be fine
1934 if (pgs.size() != 0) {
1935 // clear error to not confuse users/scripts
1936 r = 0;
1937 }
1938
1939 // optimize the command -> messages conversion, use only one
1940 // message per distinct OSD
1941 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1942 // group pgs to process by osd
1943 map<int, vector<spg_t>> osdpgs;
1944 for (auto& pgid : pgs) {
1945 int primary;
1946 spg_t spg;
1947 if (osdmap.get_primary_shard(pgid, &primary, &spg)) {
1948 osdpgs[primary].push_back(spg);
1949 }
1950 }
1951 for (auto& i : osdpgs) {
1952 if (osdmap.is_up(i.first)) {
1953 auto p = osd_cons.find(i.first);
1954 if (p == osd_cons.end()) {
1955 ss << "osd." << i.first << " is not currently connected";
1956 r = -EAGAIN;
1957 continue;
1958 }
1959 for (auto& con : p->second) {
1960 con->send_message(
1961 new MOSDForceRecovery(monc->get_fsid(), i.second, actual_op));
1962 }
1963 ss << "instructing pg(s) " << i.second << " on osd." << i.first
1964 << " to " << forceop << "; ";
1965 }
1966 }
1967 });
1968 ss << std::endl;
1969 cmdctx->reply(r, ss);
1970 return true;
1971 } else if (prefix == "config show" ||
1972 prefix == "config show-with-defaults") {
1973 string who;
1974 cmd_getval(cmdctx->cmdmap, "who", who);
1975 auto [key, valid] = DaemonKey::parse(who);
1976 if (!valid) {
1977 ss << "invalid daemon name: use <type>.<id>";
1978 cmdctx->reply(-EINVAL, ss);
1979 return true;
1980 }
1981 DaemonStatePtr daemon = daemon_state.get(key);
1982 if (!daemon) {
1983 ss << "no config state for daemon " << who;
1984 cmdctx->reply(-ENOENT, ss);
1985 return true;
1986 }
1987
1988 std::lock_guard l(daemon->lock);
1989
1990 int r = 0;
1991 string name;
1992 if (cmd_getval(cmdctx->cmdmap, "key", name)) {
1993 // handle special options
1994 if (name == "fsid") {
1995 cmdctx->odata.append(stringify(monc->get_fsid()) + "\n");
1996 cmdctx->reply(r, ss);
1997 return true;
1998 }
1999 auto p = daemon->config.find(name);
2000 if (p != daemon->config.end() &&
2001 !p->second.empty()) {
2002 cmdctx->odata.append(p->second.rbegin()->second + "\n");
2003 } else {
2004 auto& defaults = daemon->_get_config_defaults();
2005 auto q = defaults.find(name);
2006 if (q != defaults.end()) {
2007 cmdctx->odata.append(q->second + "\n");
2008 } else {
2009 r = -ENOENT;
2010 }
2011 }
2012 } else if (daemon->config_defaults_bl.length() > 0) {
2013 TextTable tbl;
2014 if (f) {
2015 f->open_array_section("config");
2016 } else {
2017 tbl.define_column("NAME", TextTable::LEFT, TextTable::LEFT);
2018 tbl.define_column("VALUE", TextTable::LEFT, TextTable::LEFT);
2019 tbl.define_column("SOURCE", TextTable::LEFT, TextTable::LEFT);
2020 tbl.define_column("OVERRIDES", TextTable::LEFT, TextTable::LEFT);
2021 tbl.define_column("IGNORES", TextTable::LEFT, TextTable::LEFT);
2022 }
2023 if (prefix == "config show") {
2024 // show
2025 for (auto& i : daemon->config) {
2026 dout(20) << " " << i.first << " -> " << i.second << dendl;
2027 if (i.second.empty()) {
2028 continue;
2029 }
2030 if (f) {
2031 f->open_object_section("value");
2032 f->dump_string("name", i.first);
2033 f->dump_string("value", i.second.rbegin()->second);
2034 f->dump_string("source", ceph_conf_level_name(
2035 i.second.rbegin()->first));
2036 if (i.second.size() > 1) {
2037 f->open_array_section("overrides");
2038 auto j = i.second.rend();
2039 for (--j; j != i.second.rbegin(); --j) {
2040 f->open_object_section("value");
2041 f->dump_string("source", ceph_conf_level_name(j->first));
2042 f->dump_string("value", j->second);
2043 f->close_section();
2044 }
2045 f->close_section();
2046 }
2047 if (daemon->ignored_mon_config.count(i.first)) {
2048 f->dump_string("ignores", "mon");
2049 }
2050 f->close_section();
2051 } else {
2052 tbl << i.first;
2053 tbl << i.second.rbegin()->second;
2054 tbl << ceph_conf_level_name(i.second.rbegin()->first);
2055 if (i.second.size() > 1) {
2056 list<string> ov;
2057 auto j = i.second.rend();
2058 for (--j; j != i.second.rbegin(); --j) {
2059 if (j->second == i.second.rbegin()->second) {
2060 ov.push_front(string("(") + ceph_conf_level_name(j->first) +
2061 string("[") + j->second + string("]") +
2062 string(")"));
2063 } else {
2064 ov.push_front(ceph_conf_level_name(j->first) +
2065 string("[") + j->second + string("]"));
2066
2067 }
2068 }
2069 tbl << ov;
2070 } else {
2071 tbl << "";
2072 }
2073 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
2074 tbl << TextTable::endrow;
2075 }
2076 }
2077 } else {
2078 // show-with-defaults
2079 auto& defaults = daemon->_get_config_defaults();
2080 for (auto& i : defaults) {
2081 if (f) {
2082 f->open_object_section("value");
2083 f->dump_string("name", i.first);
2084 } else {
2085 tbl << i.first;
2086 }
2087 auto j = daemon->config.find(i.first);
2088 if (j != daemon->config.end() && !j->second.empty()) {
2089 // have config
2090 if (f) {
2091 f->dump_string("value", j->second.rbegin()->second);
2092 f->dump_string("source", ceph_conf_level_name(
2093 j->second.rbegin()->first));
2094 if (j->second.size() > 1) {
2095 f->open_array_section("overrides");
2096 auto k = j->second.rend();
2097 for (--k; k != j->second.rbegin(); --k) {
2098 f->open_object_section("value");
2099 f->dump_string("source", ceph_conf_level_name(k->first));
2100 f->dump_string("value", k->second);
2101 f->close_section();
2102 }
2103 f->close_section();
2104 }
2105 if (daemon->ignored_mon_config.count(i.first)) {
2106 f->dump_string("ignores", "mon");
2107 }
2108 f->close_section();
2109 } else {
2110 tbl << j->second.rbegin()->second;
2111 tbl << ceph_conf_level_name(j->second.rbegin()->first);
2112 if (j->second.size() > 1) {
2113 list<string> ov;
2114 auto k = j->second.rend();
2115 for (--k; k != j->second.rbegin(); --k) {
2116 if (k->second == j->second.rbegin()->second) {
2117 ov.push_front(string("(") + ceph_conf_level_name(k->first) +
2118 string("[") + k->second + string("]") +
2119 string(")"));
2120 } else {
2121 ov.push_front(ceph_conf_level_name(k->first) +
2122 string("[") + k->second + string("]"));
2123 }
2124 }
2125 tbl << ov;
2126 } else {
2127 tbl << "";
2128 }
2129 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
2130 tbl << TextTable::endrow;
2131 }
2132 } else {
2133 // only have default
2134 if (f) {
2135 f->dump_string("value", i.second);
2136 f->dump_string("source", ceph_conf_level_name(CONF_DEFAULT));
2137 f->close_section();
2138 } else {
2139 tbl << i.second;
2140 tbl << ceph_conf_level_name(CONF_DEFAULT);
2141 tbl << "";
2142 tbl << "";
2143 tbl << TextTable::endrow;
2144 }
2145 }
2146 }
2147 }
2148 if (f) {
2149 f->close_section();
2150 f->flush(cmdctx->odata);
2151 } else {
2152 cmdctx->odata.append(stringify(tbl));
2153 }
2154 }
2155 cmdctx->reply(r, ss);
2156 return true;
2157 } else if (prefix == "device ls") {
2158 set<string> devids;
2159 TextTable tbl;
2160 if (f) {
2161 f->open_array_section("devices");
2162 daemon_state.with_devices([&f](const DeviceState& dev) {
2163 f->dump_object("device", dev);
2164 });
2165 f->close_section();
2166 f->flush(cmdctx->odata);
2167 } else {
2168 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2169 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
2170 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
2171 tbl.define_column("WEAR", TextTable::RIGHT, TextTable::RIGHT);
2172 tbl.define_column("LIFE EXPECTANCY", TextTable::LEFT, TextTable::LEFT);
2173 auto now = ceph_clock_now();
2174 daemon_state.with_devices([&tbl, now](const DeviceState& dev) {
2175 string h;
2176 for (auto& i : dev.attachments) {
2177 if (h.size()) {
2178 h += " ";
2179 }
2180 h += std::get<0>(i) + ":" + std::get<1>(i);
2181 }
2182 string d;
2183 for (auto& i : dev.daemons) {
2184 if (d.size()) {
2185 d += " ";
2186 }
2187 d += to_string(i);
2188 }
2189 char wear_level_str[16] = {0};
2190 if (dev.wear_level >= 0) {
2191 snprintf(wear_level_str, sizeof(wear_level_str)-1, "%d%%",
2192 (int)(100.1 * dev.wear_level));
2193 }
2194 tbl << dev.devid
2195 << h
2196 << d
2197 << wear_level_str
2198 << dev.get_life_expectancy_str(now)
2199 << TextTable::endrow;
2200 });
2201 cmdctx->odata.append(stringify(tbl));
2202 }
2203 cmdctx->reply(0, ss);
2204 return true;
2205 } else if (prefix == "device ls-by-daemon") {
2206 string who;
2207 cmd_getval(cmdctx->cmdmap, "who", who);
2208 if (auto [k, valid] = DaemonKey::parse(who); !valid) {
2209 ss << who << " is not a valid daemon name";
2210 r = -EINVAL;
2211 } else {
2212 auto dm = daemon_state.get(k);
2213 if (dm) {
2214 if (f) {
2215 f->open_array_section("devices");
2216 for (auto& i : dm->devices) {
2217 daemon_state.with_device(i.first, [&f] (const DeviceState& dev) {
2218 f->dump_object("device", dev);
2219 });
2220 }
2221 f->close_section();
2222 f->flush(cmdctx->odata);
2223 } else {
2224 TextTable tbl;
2225 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2226 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
2227 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT,
2228 TextTable::LEFT);
2229 auto now = ceph_clock_now();
2230 for (auto& i : dm->devices) {
2231 daemon_state.with_device(
2232 i.first, [&tbl, now] (const DeviceState& dev) {
2233 string h;
2234 for (auto& i : dev.attachments) {
2235 if (h.size()) {
2236 h += " ";
2237 }
2238 h += std::get<0>(i) + ":" + std::get<1>(i);
2239 }
2240 tbl << dev.devid
2241 << h
2242 << dev.get_life_expectancy_str(now)
2243 << TextTable::endrow;
2244 });
2245 }
2246 cmdctx->odata.append(stringify(tbl));
2247 }
2248 } else {
2249 r = -ENOENT;
2250 ss << "daemon " << who << " not found";
2251 }
2252 cmdctx->reply(r, ss);
2253 }
2254 } else if (prefix == "device ls-by-host") {
2255 string host;
2256 cmd_getval(cmdctx->cmdmap, "host", host);
2257 set<string> devids;
2258 daemon_state.list_devids_by_server(host, &devids);
2259 if (f) {
2260 f->open_array_section("devices");
2261 for (auto& devid : devids) {
2262 daemon_state.with_device(
2263 devid, [&f] (const DeviceState& dev) {
2264 f->dump_object("device", dev);
2265 });
2266 }
2267 f->close_section();
2268 f->flush(cmdctx->odata);
2269 } else {
2270 TextTable tbl;
2271 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2272 tbl.define_column("DEV", TextTable::LEFT, TextTable::LEFT);
2273 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
2274 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT, TextTable::LEFT);
2275 auto now = ceph_clock_now();
2276 for (auto& devid : devids) {
2277 daemon_state.with_device(
2278 devid, [&tbl, &host, now] (const DeviceState& dev) {
2279 string n;
2280 for (auto& j : dev.attachments) {
2281 if (std::get<0>(j) == host) {
2282 if (n.size()) {
2283 n += " ";
2284 }
2285 n += std::get<1>(j);
2286 }
2287 }
2288 string d;
2289 for (auto& i : dev.daemons) {
2290 if (d.size()) {
2291 d += " ";
2292 }
2293 d += to_string(i);
2294 }
2295 tbl << dev.devid
2296 << n
2297 << d
2298 << dev.get_life_expectancy_str(now)
2299 << TextTable::endrow;
2300 });
2301 }
2302 cmdctx->odata.append(stringify(tbl));
2303 }
2304 cmdctx->reply(0, ss);
2305 return true;
2306 } else if (prefix == "device info") {
2307 string devid;
2308 cmd_getval(cmdctx->cmdmap, "devid", devid);
2309 int r = 0;
2310 ostringstream rs;
2311 if (!daemon_state.with_device(devid,
2312 [&f, &rs] (const DeviceState& dev) {
2313 if (f) {
2314 f->dump_object("device", dev);
2315 } else {
2316 dev.print(rs);
2317 }
2318 })) {
2319 ss << "device " << devid << " not found";
2320 r = -ENOENT;
2321 } else {
2322 if (f) {
2323 f->flush(cmdctx->odata);
2324 } else {
2325 cmdctx->odata.append(rs.str());
2326 }
2327 }
2328 cmdctx->reply(r, ss);
2329 return true;
2330 } else if (prefix == "device set-life-expectancy") {
2331 string devid;
2332 cmd_getval(cmdctx->cmdmap, "devid", devid);
2333 string from_str, to_str;
2334 cmd_getval(cmdctx->cmdmap, "from", from_str);
2335 cmd_getval(cmdctx->cmdmap, "to", to_str);
2336 utime_t from, to;
2337 if (!from.parse(from_str)) {
2338 ss << "unable to parse datetime '" << from_str << "'";
2339 r = -EINVAL;
2340 cmdctx->reply(r, ss);
2341 } else if (to_str.size() && !to.parse(to_str)) {
2342 ss << "unable to parse datetime '" << to_str << "'";
2343 r = -EINVAL;
2344 cmdctx->reply(r, ss);
2345 } else {
2346 map<string,string> meta;
2347 daemon_state.with_device_create(
2348 devid,
2349 [from, to, &meta] (DeviceState& dev) {
2350 dev.set_life_expectancy(from, to, ceph_clock_now());
2351 meta = dev.metadata;
2352 });
2353 json_spirit::Object json_object;
2354 for (auto& i : meta) {
2355 json_spirit::Config::add(json_object, i.first, i.second);
2356 }
2357 bufferlist json;
2358 json.append(json_spirit::write(json_object));
2359 const string cmd =
2360 "{"
2361 "\"prefix\": \"config-key set\", "
2362 "\"key\": \"device/" + devid + "\""
2363 "}";
2364 auto on_finish = new ReplyOnFinish(cmdctx);
2365 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2366 }
2367 return true;
2368 } else if (prefix == "device rm-life-expectancy") {
2369 string devid;
2370 cmd_getval(cmdctx->cmdmap, "devid", devid);
2371 map<string,string> meta;
2372 if (daemon_state.with_device_write(devid, [&meta] (DeviceState& dev) {
2373 dev.rm_life_expectancy();
2374 meta = dev.metadata;
2375 })) {
2376 string cmd;
2377 bufferlist json;
2378 if (meta.empty()) {
2379 cmd =
2380 "{"
2381 "\"prefix\": \"config-key rm\", "
2382 "\"key\": \"device/" + devid + "\""
2383 "}";
2384 } else {
2385 json_spirit::Object json_object;
2386 for (auto& i : meta) {
2387 json_spirit::Config::add(json_object, i.first, i.second);
2388 }
2389 json.append(json_spirit::write(json_object));
2390 cmd =
2391 "{"
2392 "\"prefix\": \"config-key set\", "
2393 "\"key\": \"device/" + devid + "\""
2394 "}";
2395 }
2396 auto on_finish = new ReplyOnFinish(cmdctx);
2397 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2398 } else {
2399 cmdctx->reply(0, ss);
2400 }
2401 return true;
2402 } else {
2403 if (!pgmap_ready) {
2404 ss << "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2405 }
2406 if (f) {
2407 f->open_object_section("pg_info");
2408 f->dump_bool("pg_ready", pgmap_ready);
2409 }
2410
2411 // fall back to feeding command to PGMap
2412 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
2413 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
2414 f.get(), &ss, &cmdctx->odata);
2415 });
2416
2417 if (f) {
2418 f->close_section();
2419 }
2420 if (r != -EOPNOTSUPP) {
2421 if (f) {
2422 f->flush(cmdctx->odata);
2423 }
2424 cmdctx->reply(r, ss);
2425 return true;
2426 }
2427 }
2428
2429 // Was the command unfound?
2430 if (py_command.cmdstring.empty()) {
2431 ss << "No handler found for '" << prefix << "'";
2432 dout(4) << "No handler found for '" << prefix << "'" << dendl;
2433 cmdctx->reply(-EINVAL, ss);
2434 return true;
2435 }
2436
2437 // Validate that the module is active
2438 auto& mod_name = py_command.module_name;
2439 if (!py_modules.is_module_active(mod_name)) {
2440 ss << "Module '" << mod_name << "' is not enabled/loaded (required by "
2441 "command '" << prefix << "'): use `ceph mgr module enable "
2442 << mod_name << "` to enable it";
2443 dout(4) << ss.str() << dendl;
2444 cmdctx->reply(-EOPNOTSUPP, ss);
2445 return true;
2446 }
2447
2448 dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
2449 Finisher& mod_finisher = py_modules.get_active_module_finisher(mod_name);
2450 mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]
2451 (int r_) mutable {
2452 std::stringstream ss;
2453
2454 dout(10) << "dispatching command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
2455
2456 // Validate that the module is enabled
2457 auto& py_handler_name = py_command.module_name;
2458 PyModuleRef module = py_modules.get_module(py_handler_name);
2459 ceph_assert(module);
2460 if (!module->is_enabled()) {
2461 ss << "Module '" << py_handler_name << "' is not enabled (required by "
2462 "command '" << prefix << "'): use `ceph mgr module enable "
2463 << py_handler_name << "` to enable it";
2464 dout(4) << ss.str() << dendl;
2465 cmdctx->reply(-EOPNOTSUPP, ss);
2466 return;
2467 }
2468
2469 // Hack: allow the self-test method to run on unhealthy modules.
2470 // Fix this in future by creating a special path for self test rather
2471 // than having the hook be a normal module command.
2472 std::string self_test_prefix = py_handler_name + " " + "self-test";
2473
2474 // Validate that the module is healthy
2475 bool accept_command;
2476 if (module->is_loaded()) {
2477 if (module->get_can_run() && !module->is_failed()) {
2478 // Healthy module
2479 accept_command = true;
2480 } else if (self_test_prefix == prefix) {
2481 // Unhealthy, but allow because it's a self test command
2482 accept_command = true;
2483 } else {
2484 accept_command = false;
2485 ss << "Module '" << py_handler_name << "' has experienced an error and "
2486 "cannot handle commands: " << module->get_error_string();
2487 }
2488 } else {
2489 // Module not loaded
2490 accept_command = false;
2491 ss << "Module '" << py_handler_name << "' failed to load and "
2492 "cannot handle commands: " << module->get_error_string();
2493 }
2494
2495 if (!accept_command) {
2496 dout(4) << ss.str() << dendl;
2497 cmdctx->reply(-EIO, ss);
2498 return;
2499 }
2500
2501 std::stringstream ds;
2502 bufferlist inbl = cmdctx->data;
2503 int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
2504 inbl, &ds, &ss);
2505 if (r == -EACCES) {
2506 log_access_denied(cmdctx, session, ss);
2507 }
2508
2509 cmdctx->odata.append(ds);
2510 cmdctx->reply(r, ss);
2511 dout(10) << " command returned " << r << dendl;
2512 }));
2513 return true;
2514 }
2515
2516 void DaemonServer::_prune_pending_service_map()
2517 {
2518 utime_t cutoff = ceph_clock_now();
2519 cutoff -= g_conf().get_val<double>("mgr_service_beacon_grace");
2520 auto p = pending_service_map.services.begin();
2521 while (p != pending_service_map.services.end()) {
2522 auto q = p->second.daemons.begin();
2523 while (q != p->second.daemons.end()) {
2524 DaemonKey key{p->first, q->first};
2525 if (!daemon_state.exists(key)) {
2526 if (ServiceMap::is_normal_ceph_entity(p->first)) {
2527 dout(10) << "daemon " << key << " in service map but not in daemon state "
2528 << "index -- force pruning" << dendl;
2529 q = p->second.daemons.erase(q);
2530 pending_service_map_dirty = pending_service_map.epoch;
2531 } else {
2532 derr << "missing key " << key << dendl;
2533 ++q;
2534 }
2535
2536 continue;
2537 }
2538
2539 auto daemon = daemon_state.get(key);
2540 std::lock_guard l(daemon->lock);
2541 if (daemon->last_service_beacon == utime_t()) {
2542 // we must have just restarted; assume they are alive now.
2543 daemon->last_service_beacon = ceph_clock_now();
2544 ++q;
2545 continue;
2546 }
2547 if (daemon->last_service_beacon < cutoff) {
2548 dout(10) << "pruning stale " << p->first << "." << q->first
2549 << " last_beacon " << daemon->last_service_beacon << dendl;
2550 q = p->second.daemons.erase(q);
2551 pending_service_map_dirty = pending_service_map.epoch;
2552 } else {
2553 ++q;
2554 }
2555 }
2556 if (p->second.daemons.empty()) {
2557 p = pending_service_map.services.erase(p);
2558 pending_service_map_dirty = pending_service_map.epoch;
2559 } else {
2560 ++p;
2561 }
2562 }
2563 }
2564
// Build and send an MMonMgrReport to the monitor: health checks (python
// modules + PGMap-derived + daemon-reported metrics), progress events,
// a PGMap digest, and (when dirty) the pending service map.
void DaemonServer::send_report()
{
  if (!pgmap_ready) {
    // Hold off until all OSDs have reported at least once, but only up to
    // 4x the stats period; after that, send what we have.
    if (ceph_clock_now() - started_at > g_conf().get_val<int64_t>("mgr_stats_period") * 4.0) {
      pgmap_ready = true;
      reported_osds.clear();
      dout(1) << "Giving up on OSDs that haven't reported yet, sending "
              << "potentially incomplete PG state to mon" << dendl;
    } else {
      dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
              << dendl;
      return;
    }
  }

  auto m = ceph::make_message<MMonMgrReport>();
  m->gid = monc->get_global_id();
  // Health checks and progress events contributed by python modules.
  py_modules.get_health_checks(&m->health_checks);
  py_modules.get_progress_events(&m->progress_events);

  cluster_state.with_mutable_pgmap([&](PGMap& pg_map) {
    cluster_state.update_delta_stats();

    // epoch == 0 means we have not yet received the initial service map
    // (see got_service_map()); don't publish until we have.
    if (pending_service_map.epoch) {
      _prune_pending_service_map();
      if (pending_service_map_dirty >= pending_service_map.epoch) {
        // Encode the dirty map into the report and advance the epoch so
        // further changes accumulate under a new epoch.
        pending_service_map.modified = ceph_clock_now();
        encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
        dout(10) << "sending service_map e" << pending_service_map.epoch
                 << dendl;
        pending_service_map.epoch++;
      }
    }

    cluster_state.with_osdmap([&](const OSDMap& osdmap) {
      // FIXME: no easy way to get mon features here. this will do for
      // now, though, as long as we don't make a backward-incompat change.
      pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
      dout(10) << pg_map << dendl;

      // Add PGMap-derived health checks (e.g. degraded/inactive PGs) to
      // whatever the python modules already contributed.
      pg_map.get_health_checks(g_ceph_context, osdmap,
                               &m->health_checks);

      dout(10) << m->health_checks.checks.size() << " health checks"
               << dendl;
      // Dump the full health check set at debug level 20; note the
      // stream is intentionally left open and flushed via *_dout below.
      dout(20) << "health checks:\n";
      JSONFormatter jf(true);
      jf.dump_object("health_checks", m->health_checks);
      jf.flush(*_dout);
      *_dout << dendl;
      if (osdmap.require_osd_release >= ceph_release_t::luminous) {
        clog->debug() << "pgmap v" << pg_map.version << ": " << pg_map;
      }
    });
  });

  // Fold per-daemon health metrics (from osd/mon beacons) into cluster-wide
  // health checks, one collector per metric type.
  map<daemon_metric, unique_ptr<DaemonHealthMetricCollector>> accumulated;
  for (auto service : {"osd", "mon"} ) {
    auto daemons = daemon_state.get_by_service(service);
    for (const auto& [key,state] : daemons) {
      std::lock_guard l{state->lock};
      for (const auto& metric : state->daemon_health_metrics) {
        auto acc = accumulated.find(metric.get_type());
        if (acc == accumulated.end()) {
          auto collector = DaemonHealthMetricCollector::create(metric.get_type());
          if (!collector) {
            // Unknown metric type (e.g. from a newer daemon); skip it.
            derr << __func__ << " " << key
                 << " sent me an unknown health metric: "
                 << std::hex << static_cast<uint8_t>(metric.get_type())
                 << std::dec << dendl;
            continue;
          }
          tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
              std::move(collector));
        }
        acc->second->update(key, metric);
      }
    }
  }
  for (const auto& acc : accumulated) {
    acc.second->summarize(m->health_checks);
  }
  // TODO? We currently do not notify the PyModules
  // TODO: respect needs_send, so we send the report only if we are asked to do
  // so, or the state is updated.
  monc->send_mon_message(std::move(m));
}
2652
2653 void DaemonServer::adjust_pgs()
2654 {
2655 dout(20) << dendl;
2656 unsigned max = std::max<int64_t>(1, g_conf()->mon_osd_max_creating_pgs);
2657 double max_misplaced = g_conf().get_val<double>("target_max_misplaced_ratio");
2658 bool aggro = g_conf().get_val<bool>("mgr_debug_aggressive_pg_num_changes");
2659
2660 map<string,unsigned> pg_num_to_set;
2661 map<string,unsigned> pgp_num_to_set;
2662 set<pg_t> upmaps_to_clear;
2663 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
2664 unsigned creating_or_unknown = 0;
2665 for (auto& i : pg_map.num_pg_by_state) {
2666 if ((i.first & (PG_STATE_CREATING)) ||
2667 i.first == 0) {
2668 creating_or_unknown += i.second;
2669 }
2670 }
2671 unsigned left = max;
2672 if (creating_or_unknown >= max) {
2673 return;
2674 }
2675 left -= creating_or_unknown;
2676 dout(10) << "creating_or_unknown " << creating_or_unknown
2677 << " max_creating " << max
2678 << " left " << left
2679 << dendl;
2680
2681 // FIXME: These checks are fundamentally racy given that adjust_pgs()
2682 // can run more frequently than we get updated pg stats from OSDs. We
2683 // may make multiple adjustments with stale informaiton.
2684 double misplaced_ratio, degraded_ratio;
2685 double inactive_pgs_ratio, unknown_pgs_ratio;
2686 pg_map.get_recovery_stats(&misplaced_ratio, &degraded_ratio,
2687 &inactive_pgs_ratio, &unknown_pgs_ratio);
2688 dout(20) << "misplaced_ratio " << misplaced_ratio
2689 << " degraded_ratio " << degraded_ratio
2690 << " inactive_pgs_ratio " << inactive_pgs_ratio
2691 << " unknown_pgs_ratio " << unknown_pgs_ratio
2692 << "; target_max_misplaced_ratio " << max_misplaced
2693 << dendl;
2694
2695 for (auto& i : osdmap.get_pools()) {
2696 const pg_pool_t& p = i.second;
2697
2698 // adjust pg_num?
2699 if (p.get_pg_num_target() != p.get_pg_num()) {
2700 dout(20) << "pool " << i.first
2701 << " pg_num " << p.get_pg_num()
2702 << " target " << p.get_pg_num_target()
2703 << dendl;
2704 if (p.has_flag(pg_pool_t::FLAG_CREATING)) {
2705 dout(10) << "pool " << i.first
2706 << " pg_num_target " << p.get_pg_num_target()
2707 << " pg_num " << p.get_pg_num()
2708 << " - still creating initial pgs"
2709 << dendl;
2710 } else if (p.get_pg_num_target() < p.get_pg_num()) {
2711 // pg_num decrease (merge)
2712 pg_t merge_source(p.get_pg_num() - 1, i.first);
2713 pg_t merge_target = merge_source.get_parent();
2714 bool ok = true;
2715
2716 if (p.get_pg_num() != p.get_pg_num_pending()) {
2717 dout(10) << "pool " << i.first
2718 << " pg_num_target " << p.get_pg_num_target()
2719 << " pg_num " << p.get_pg_num()
2720 << " - decrease and pg_num_pending != pg_num, waiting"
2721 << dendl;
2722 ok = false;
2723 } else if (p.get_pg_num() == p.get_pgp_num()) {
2724 dout(10) << "pool " << i.first
2725 << " pg_num_target " << p.get_pg_num_target()
2726 << " pg_num " << p.get_pg_num()
2727 << " - decrease blocked by pgp_num "
2728 << p.get_pgp_num()
2729 << dendl;
2730 ok = false;
2731 }
2732 vector<int32_t> source_acting;
2733 for (auto &merge_participant : {merge_source, merge_target}) {
2734 bool is_merge_source = merge_participant == merge_source;
2735 if (osdmap.have_pg_upmaps(merge_participant)) {
2736 dout(10) << "pool " << i.first
2737 << " pg_num_target " << p.get_pg_num_target()
2738 << " pg_num " << p.get_pg_num()
2739 << (is_merge_source ? " - merge source " : " - merge target ")
2740 << merge_participant
2741 << " has upmap" << dendl;
2742 upmaps_to_clear.insert(merge_participant);
2743 ok = false;
2744 }
2745 auto q = pg_map.pg_stat.find(merge_participant);
2746 if (q == pg_map.pg_stat.end()) {
2747 dout(10) << "pool " << i.first
2748 << " pg_num_target " << p.get_pg_num_target()
2749 << " pg_num " << p.get_pg_num()
2750 << " - no state for " << merge_participant
2751 << (is_merge_source ? " (merge source)" : " (merge target)")
2752 << dendl;
2753 ok = false;
2754 } else if ((q->second.state & (PG_STATE_ACTIVE | PG_STATE_CLEAN)) !=
2755 (PG_STATE_ACTIVE | PG_STATE_CLEAN)) {
2756 dout(10) << "pool " << i.first
2757 << " pg_num_target " << p.get_pg_num_target()
2758 << " pg_num " << p.get_pg_num()
2759 << (is_merge_source ? " - merge source " : " - merge target ")
2760 << merge_participant
2761 << " not clean (" << pg_state_string(q->second.state)
2762 << ")" << dendl;
2763 ok = false;
2764 }
2765 if (is_merge_source) {
2766 source_acting = q->second.acting;
2767 } else if (ok && q->second.acting != source_acting) {
2768 dout(10) << "pool " << i.first
2769 << " pg_num_target " << p.get_pg_num_target()
2770 << " pg_num " << p.get_pg_num()
2771 << (is_merge_source ? " - merge source " : " - merge target ")
2772 << merge_participant
2773 << " acting does not match (source " << source_acting
2774 << " != target " << q->second.acting
2775 << ")" << dendl;
2776 ok = false;
2777 }
2778 }
2779
2780 if (ok) {
2781 unsigned target = p.get_pg_num() - 1;
2782 dout(10) << "pool " << i.first
2783 << " pg_num_target " << p.get_pg_num_target()
2784 << " pg_num " << p.get_pg_num()
2785 << " -> " << target
2786 << " (merging " << merge_source
2787 << " and " << merge_target
2788 << ")" << dendl;
2789 pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
2790 continue;
2791 }
2792 } else if (p.get_pg_num_target() > p.get_pg_num()) {
2793 // pg_num increase (split)
2794 bool active = true;
2795 auto q = pg_map.num_pg_by_pool_state.find(i.first);
2796 if (q != pg_map.num_pg_by_pool_state.end()) {
2797 for (auto& j : q->second) {
2798 if ((j.first & (PG_STATE_ACTIVE|PG_STATE_PEERED)) == 0) {
2799 dout(20) << "pool " << i.first << " has " << j.second
2800 << " pgs in " << pg_state_string(j.first)
2801 << dendl;
2802 active = false;
2803 break;
2804 }
2805 }
2806 } else {
2807 active = false;
2808 }
2809 unsigned pg_gap = p.get_pg_num() - p.get_pgp_num();
2810 unsigned max_jump = cct->_conf->mgr_max_pg_num_change;
2811 if (!active) {
2812 dout(10) << "pool " << i.first
2813 << " pg_num_target " << p.get_pg_num_target()
2814 << " pg_num " << p.get_pg_num()
2815 << " - not all pgs active"
2816 << dendl;
2817 } else if (pg_gap >= max_jump) {
2818 dout(10) << "pool " << i.first
2819 << " pg_num " << p.get_pg_num()
2820 << " - pgp_num " << p.get_pgp_num()
2821 << " gap >= max_pg_num_change " << max_jump
2822 << " - must scale pgp_num first"
2823 << dendl;
2824 } else {
2825 unsigned add = std::min(
2826 std::min(left, max_jump - pg_gap),
2827 p.get_pg_num_target() - p.get_pg_num());
2828 unsigned target = p.get_pg_num() + add;
2829 left -= add;
2830 dout(10) << "pool " << i.first
2831 << " pg_num_target " << p.get_pg_num_target()
2832 << " pg_num " << p.get_pg_num()
2833 << " -> " << target << dendl;
2834 pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
2835 }
2836 }
2837 }
2838
2839 // adjust pgp_num?
2840 unsigned target = std::min(p.get_pg_num_pending(),
2841 p.get_pgp_num_target());
2842 if (target != p.get_pgp_num()) {
2843 dout(20) << "pool " << i.first
2844 << " pgp_num_target " << p.get_pgp_num_target()
2845 << " pgp_num " << p.get_pgp_num()
2846 << " -> " << target << dendl;
2847 if (target > p.get_pgp_num() &&
2848 p.get_pgp_num() == p.get_pg_num()) {
2849 dout(10) << "pool " << i.first
2850 << " pgp_num_target " << p.get_pgp_num_target()
2851 << " pgp_num " << p.get_pgp_num()
2852 << " - increase blocked by pg_num " << p.get_pg_num()
2853 << dendl;
2854 } else if (!aggro && (inactive_pgs_ratio > 0 ||
2855 degraded_ratio > 0 ||
2856 unknown_pgs_ratio > 0)) {
2857 dout(10) << "pool " << i.first
2858 << " pgp_num_target " << p.get_pgp_num_target()
2859 << " pgp_num " << p.get_pgp_num()
2860 << " - inactive|degraded|unknown pgs, deferring pgp_num"
2861 << " update" << dendl;
2862 } else if (!aggro && (misplaced_ratio > max_misplaced)) {
2863 dout(10) << "pool " << i.first
2864 << " pgp_num_target " << p.get_pgp_num_target()
2865 << " pgp_num " << p.get_pgp_num()
2866 << " - misplaced_ratio " << misplaced_ratio
2867 << " > max " << max_misplaced
2868 << ", deferring pgp_num update" << dendl;
2869 } else {
2870 // NOTE: this calculation assumes objects are
2871 // basically uniformly distributed across all PGs
2872 // (regardless of pool), which is probably not
2873 // perfectly correct, but it's a start. make no
2874 // single adjustment that's more than half of the
2875 // max_misplaced, to somewhat limit the magnitude of
2876 // our potential error here.
2877 unsigned next;
2878 static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP = 1;
2879 pool_stat_t s = pg_map.get_pg_pool_sum_stat(i.first);
2880 if (aggro ||
2881 // pool is (virtually) empty; just jump to final pgp_num?
2882 (p.get_pgp_num_target() > p.get_pgp_num() &&
2883 s.stats.sum.num_objects <= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP *
2884 p.get_pgp_num_target()))) {
2885 next = target;
2886 } else {
2887 double room =
2888 std::min<double>(max_misplaced - misplaced_ratio,
2889 max_misplaced / 2.0);
2890 unsigned estmax = std::max<unsigned>(
2891 (double)p.get_pg_num() * room, 1u);
2892 unsigned next_min = 0;
2893 if (p.get_pgp_num() > estmax) {
2894 next_min = p.get_pgp_num() - estmax;
2895 }
2896 next = std::clamp(target,
2897 next_min,
2898 p.get_pgp_num() + estmax);
2899 dout(20) << " room " << room << " estmax " << estmax
2900 << " delta " << (target-p.get_pgp_num())
2901 << " next " << next << dendl;
2902 if (p.get_pgp_num_target() == p.get_pg_num_target() &&
2903 p.get_pgp_num_target() < p.get_pg_num()) {
2904 // since pgp_num is tracking pg_num, ceph is handling
2905 // pgp_num. so, be responsible: don't let pgp_num get
2906 // too far out ahead of merges (if we are merging).
2907 // this avoids moving lots of unmerged pgs onto a
2908 // small number of OSDs where we might blow out the
2909 // per-osd pg max.
2910 unsigned max_outpace_merges =
2911 std::max<unsigned>(8, p.get_pg_num() * max_misplaced);
2912 if (next + max_outpace_merges < p.get_pg_num()) {
2913 next = p.get_pg_num() - max_outpace_merges;
2914 dout(10) << " using next " << next
2915 << " to avoid outpacing merges (max_outpace_merges "
2916 << max_outpace_merges << ")" << dendl;
2917 }
2918 }
2919 }
2920 if (next != p.get_pgp_num()) {
2921 dout(10) << "pool " << i.first
2922 << " pgp_num_target " << p.get_pgp_num_target()
2923 << " pgp_num " << p.get_pgp_num()
2924 << " -> " << next << dendl;
2925 pgp_num_to_set[osdmap.get_pool_name(i.first)] = next;
2926 }
2927 }
2928 }
2929 if (left == 0) {
2930 return;
2931 }
2932 }
2933 });
2934 for (auto i : pg_num_to_set) {
2935 const string cmd =
2936 "{"
2937 "\"prefix\": \"osd pool set\", "
2938 "\"pool\": \"" + i.first + "\", "
2939 "\"var\": \"pg_num_actual\", "
2940 "\"val\": \"" + stringify(i.second) + "\""
2941 "}";
2942 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2943 }
2944 for (auto i : pgp_num_to_set) {
2945 const string cmd =
2946 "{"
2947 "\"prefix\": \"osd pool set\", "
2948 "\"pool\": \"" + i.first + "\", "
2949 "\"var\": \"pgp_num_actual\", "
2950 "\"val\": \"" + stringify(i.second) + "\""
2951 "}";
2952 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2953 }
2954 for (auto pg : upmaps_to_clear) {
2955 const string cmd =
2956 "{"
2957 "\"prefix\": \"osd rm-pg-upmap\", "
2958 "\"pgid\": \"" + stringify(pg) + "\""
2959 "}";
2960 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2961 const string cmd2 =
2962 "{"
2963 "\"prefix\": \"osd rm-pg-upmap-items\", "
2964 "\"pgid\": \"" + stringify(pg) + "\"" +
2965 "}";
2966 monc->start_mon_command({cmd2}, {}, nullptr, nullptr, nullptr);
2967 }
2968 }
2969
2970 void DaemonServer::got_service_map()
2971 {
2972 std::lock_guard l(lock);
2973
2974 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
2975 if (pending_service_map.epoch == 0) {
2976 // we just started up
2977 dout(10) << "got initial map e" << service_map.epoch << dendl;
2978 ceph_assert(pending_service_map_dirty == 0);
2979 pending_service_map = service_map;
2980 pending_service_map.epoch = service_map.epoch + 1;
2981 } else if (pending_service_map.epoch <= service_map.epoch) {
2982 // we just started up but got one more not our own map
2983 dout(10) << "got newer initial map e" << service_map.epoch << dendl;
2984 ceph_assert(pending_service_map_dirty == 0);
2985 pending_service_map = service_map;
2986 pending_service_map.epoch = service_map.epoch + 1;
2987 } else {
2988 // we already active and therefore must have persisted it,
2989 // which means ours is the same or newer.
2990 dout(10) << "got updated map e" << service_map.epoch << dendl;
2991 }
2992 });
2993
2994 // cull missing daemons, populate new ones
2995 std::set<std::string> types;
2996 for (auto& [type, service] : pending_service_map.services) {
2997 if (ServiceMap::is_normal_ceph_entity(type)) {
2998 continue;
2999 }
3000
3001 types.insert(type);
3002
3003 std::set<std::string> names;
3004 for (auto& q : service.daemons) {
3005 names.insert(q.first);
3006 DaemonKey key{type, q.first};
3007 if (!daemon_state.exists(key)) {
3008 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
3009 daemon->key = key;
3010 daemon->set_metadata(q.second.metadata);
3011 daemon->service_daemon = true;
3012 daemon_state.insert(daemon);
3013 dout(10) << "added missing " << key << dendl;
3014 }
3015 }
3016 daemon_state.cull(type, names);
3017 }
3018 daemon_state.cull_services(types);
3019 }
3020
3021 void DaemonServer::got_mgr_map()
3022 {
3023 std::lock_guard l(lock);
3024 set<std::string> have;
3025 cluster_state.with_mgrmap([&](const MgrMap& mgrmap) {
3026 auto md_update = [&] (DaemonKey key) {
3027 std::ostringstream oss;
3028 auto c = new MetadataUpdate(daemon_state, key);
3029 // FIXME remove post-nautilus: include 'id' for luminous mons
3030 oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
3031 << key.name << "\", \"id\": \"" << key.name << "\"}";
3032 monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
3033 };
3034 if (mgrmap.active_name.size()) {
3035 DaemonKey key{"mgr", mgrmap.active_name};
3036 have.insert(mgrmap.active_name);
3037 if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
3038 md_update(key);
3039 dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
3040 }
3041 }
3042 for (auto& i : mgrmap.standbys) {
3043 DaemonKey key{"mgr", i.second.name};
3044 have.insert(i.second.name);
3045 if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
3046 md_update(key);
3047 dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
3048 }
3049 }
3050 });
3051 daemon_state.cull("mgr", have);
3052 }
3053
3054 const char** DaemonServer::get_tracked_conf_keys() const
3055 {
3056 static const char *KEYS[] = {
3057 "mgr_stats_threshold",
3058 "mgr_stats_period",
3059 nullptr
3060 };
3061
3062 return KEYS;
3063 }
3064
3065 void DaemonServer::handle_conf_change(const ConfigProxy& conf,
3066 const std::set <std::string> &changed)
3067 {
3068
3069 if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
3070 dout(4) << "Updating stats threshold/period on "
3071 << daemon_connections.size() << " clients" << dendl;
3072 // Send a fresh MMgrConfigure to all clients, so that they can follow
3073 // the new policy for transmitting stats
3074 finisher.queue(new LambdaContext([this](int r) {
3075 std::lock_guard l(lock);
3076 for (auto &c : daemon_connections) {
3077 _send_configure(c);
3078 }
3079 }));
3080 }
3081 }
3082
3083 void DaemonServer::_send_configure(ConnectionRef c)
3084 {
3085 ceph_assert(ceph_mutex_is_locked_by_me(lock));
3086
3087 auto configure = make_message<MMgrConfigure>();
3088 configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
3089 configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");
3090
3091 if (c->peer_is_osd()) {
3092 configure->osd_perf_metric_queries =
3093 osd_perf_metric_collector.get_queries();
3094 } else if (c->peer_is_mds()) {
3095 configure->metric_config_message =
3096 MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector.get_queries()));
3097 }
3098
3099 c->send_message2(configure);
3100 }
3101
3102 MetricQueryID DaemonServer::add_osd_perf_query(
3103 const OSDPerfMetricQuery &query,
3104 const std::optional<OSDPerfMetricLimit> &limit)
3105 {
3106 return osd_perf_metric_collector.add_query(query, limit);
3107 }
3108
3109 int DaemonServer::remove_osd_perf_query(MetricQueryID query_id)
3110 {
3111 return osd_perf_metric_collector.remove_query(query_id);
3112 }
3113
3114 int DaemonServer::get_osd_perf_counters(OSDPerfCollector *collector)
3115 {
3116 return osd_perf_metric_collector.get_counters(collector);
3117 }
3118
3119 MetricQueryID DaemonServer::add_mds_perf_query(
3120 const MDSPerfMetricQuery &query,
3121 const std::optional<MDSPerfMetricLimit> &limit)
3122 {
3123 return mds_perf_metric_collector.add_query(query, limit);
3124 }
3125
3126 int DaemonServer::remove_mds_perf_query(MetricQueryID query_id)
3127 {
3128 return mds_perf_metric_collector.remove_query(query_id);
3129 }
3130
3131 void DaemonServer::reregister_mds_perf_queries()
3132 {
3133 mds_perf_metric_collector.reregister_queries();
3134 }
3135
3136 int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
3137 {
3138 return mds_perf_metric_collector.get_counters(collector);
3139 }