]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/DaemonServer.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / mgr / DaemonServer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "DaemonServer.h"
15 #include <boost/algorithm/string.hpp>
16 #include "mgr/Mgr.h"
17
18 #include "include/stringify.h"
19 #include "include/str_list.h"
20 #include "auth/RotatingKeyRing.h"
21 #include "json_spirit/json_spirit_writer.h"
22
23 #include "mgr/mgr_commands.h"
24 #include "mgr/DaemonHealthMetricCollector.h"
25 #include "mgr/OSDPerfMetricCollector.h"
26 #include "mgr/MDSPerfMetricCollector.h"
27 #include "mon/MonCommand.h"
28
29 #include "messages/MMgrOpen.h"
30 #include "messages/MMgrUpdate.h"
31 #include "messages/MMgrClose.h"
32 #include "messages/MMgrConfigure.h"
33 #include "messages/MMonMgrReport.h"
34 #include "messages/MCommand.h"
35 #include "messages/MCommandReply.h"
36 #include "messages/MMgrCommand.h"
37 #include "messages/MMgrCommandReply.h"
38 #include "messages/MPGStats.h"
39 #include "messages/MOSDScrub2.h"
40 #include "messages/MOSDForceRecovery.h"
41 #include "common/errno.h"
42 #include "common/pick_address.h"
43
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_mgr
46 #undef dout_prefix
47 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
48
49 using namespace TOPNSPC::common;
50
51 using std::list;
52 using std::ostringstream;
53 using std::string;
54 using std::stringstream;
55 using std::vector;
56 using std::unique_ptr;
57
namespace {
// Equality check for two map-like containers: equal when they hold the
// same number of entries and each (key, value) pair matches positionally.
template <typename Map>
bool map_compare(Map const &lhs, Map const &rhs) {
  if (lhs.size() != rhs.size()) {
    return false;
  }
  auto same_entry = [](const auto& a, const auto& b) {
    return a.first == b.first && a.second == b.second;
  };
  return std::equal(lhs.begin(), lhs.end(), rhs.begin(), same_entry);
}
}
66
// DaemonServer is the mgr's server-side endpoint: daemons and clients
// connect here to deliver stats/reports and submit commands.  Per-peer-class
// byte/message throttlers are sized from the matching mgr_* config options,
// and the instance registers itself as a config observer.
// NOTE(review): the osd/mds/mon message throttle names are spelled
// "messsages" (sic).  Left as-is here because the throttle name is visible
// at runtime (e.g. perf counter output) -- confirm impact before renaming.
DaemonServer::DaemonServer(MonClient *monc_,
                           Finisher &finisher_,
                           DaemonStateIndex &daemon_state_,
                           ClusterState &cluster_state_,
                           PyModuleRegistry &py_modules_,
                           LogChannelRef clog_,
                           LogChannelRef audit_clog_)
    : Dispatcher(g_ceph_context),
      client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
                                         g_conf().get_val<Option::size_t>("mgr_client_bytes"))),
      client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
                                        g_conf().get_val<uint64_t>("mgr_client_messages"))),
      osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
                                      g_conf().get_val<Option::size_t>("mgr_osd_bytes"))),
      osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
                                     g_conf().get_val<uint64_t>("mgr_osd_messages"))),
      mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
                                      g_conf().get_val<Option::size_t>("mgr_mds_bytes"))),
      mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
                                     g_conf().get_val<uint64_t>("mgr_mds_messages"))),
      mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
                                      g_conf().get_val<Option::size_t>("mgr_mon_bytes"))),
      mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
                                     g_conf().get_val<uint64_t>("mgr_mon_messages"))),
      msgr(nullptr),          // created later, in init()
      monc(monc_),
      finisher(finisher_),
      daemon_state(daemon_state_),
      cluster_state(cluster_state_),
      py_modules(py_modules_),
      clog(clog_),
      audit_clog(audit_clog_),
      pgmap_ready(false),     // flips true once all up OSDs have reported
      timer(g_ceph_context, lock),
      shutting_down(false),
      tick_event(nullptr),
      osd_perf_metric_collector_listener(this),
      osd_perf_metric_collector(osd_perf_metric_collector_listener),
      mds_perf_metric_collector_listener(this),
      mds_perf_metric_collector(mds_perf_metric_collector_listener)
{
  g_conf().add_observer(this);
}
110
// Destructor: release the messenger (created in init()), then deregister
// from config-change notifications.
DaemonServer::~DaemonServer() {
  delete msgr;
  g_conf().remove_observer(this);
}
115
// Bring the server online: create the public messenger, attach per-peer
// throttlers, bind to the public address(es), start dispatching, wire up
// authentication via the mon client, and arm the first periodic tick.
// Returns 0 on success or a negative errno from address selection/bind.
int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
{
  // Initialize Messenger
  std::string public_msgr_type = g_conf()->ms_public_type.empty() ?
    g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
  msgr = Messenger::create(g_ceph_context, public_msgr_type,
			   entity_name_t::MGR(gid),
			   "mgr",
			   Messenger::get_pid_nonce());
  // inbound peers get no preserved session state across reconnects
  msgr->set_default_policy(Messenger::Policy::stateless_server(0));

  msgr->set_auth_client(monc);

  // throttle clients
  msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
			      client_byte_throttler.get(),
			      client_msg_throttler.get());

  // servers
  msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
			      osd_byte_throttler.get(),
			      osd_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
			      mds_byte_throttler.get(),
			      mds_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
			      mon_byte_throttler.get(),
			      mon_msg_throttler.get());

  entity_addrvec_t addrs;
  int r = pick_addresses(cct, CEPH_PICK_ADDRESS_PUBLIC, &addrs);
  if (r < 0) {
    return r;
  }
  dout(20) << __func__ << " will bind to " << addrs << dendl;
  r = msgr->bindv(addrs);
  if (r < 0) {
    derr << "unable to bind mgr to " << addrs << dendl;
    return r;
  }

  msgr->set_myname(entity_name_t::MGR(gid));
  // seed any still-unknown local addresses from the caller-supplied ones
  msgr->set_addr_unknowns(client_addrs);

  msgr->start();
  msgr->add_dispatcher_tail(this);

  msgr->set_auth_server(monc);
  monc->set_handle_authentication_dispatcher(this);

  started_at = ceph_clock_now();

  std::lock_guard l(lock);
  timer.init();

  // kick off the periodic send_report()/adjust_pgs() cycle
  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());

  return 0;
}
176
177 entity_addrvec_t DaemonServer::get_myaddrs() const
178 {
179 return msgr->get_myaddrs();
180 }
181
// Called when a new connection has authenticated.  Attach a fresh
// MgrSession to the connection and populate its caps from the peer's
// AuthCapsInfo.  Returns 1 to accept the session, or -EACCES when the
// caps blob cannot be decoded or parsed.
int DaemonServer::ms_handle_fast_authentication(Connection *con)
{
  auto s = ceph::make_ref<MgrSession>(cct);
  con->set_priv(s);
  s->inst.addr = con->get_peer_addr();
  s->entity_name = con->peer_name;
  dout(10) << __func__ << " new session " << s << " con " << con
	   << " entity " << con->peer_name
	   << " addr " << con->get_peer_addrs()
	   << dendl;

  AuthCapsInfo &caps_info = con->get_peer_caps_info();
  if (caps_info.allow_all) {
    dout(10) << " session " << s << " " << s->entity_name
	     << " allow_all" << dendl;
    s->caps.set_allow_all();
  } else if (caps_info.caps.length() > 0) {
    // caps arrive as an encoded string; decode, then parse into MgrCap form
    auto p = caps_info.caps.cbegin();
    string str;
    try {
      decode(str, p);
    }
    catch (buffer::error& e) {
      dout(10) << " session " << s << " " << s->entity_name
	       << " failed to decode caps" << dendl;
      return -EACCES;
    }
    if (!s->caps.parse(str)) {
      dout(10) << " session " << s << " " << s->entity_name
	       << " failed to parse caps '" << str << "'" << dendl;
      return -EACCES;
    }
    dout(10) << " session " << s << " " << s->entity_name
	     << " has caps " << s->caps << " '" << str << "'" << dendl;
  }
  // A peer with neither allow_all nor a caps payload keeps empty caps;
  // authorization is enforced later, per command.
  return 1;
}
219
220 void DaemonServer::ms_handle_accept(Connection* con)
221 {
222 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
223 auto s = ceph::ref_cast<MgrSession>(con->get_priv());
224 std::lock_guard l(lock);
225 s->osd_id = atoi(s->entity_name.get_id().c_str());
226 dout(10) << "registering osd." << s->osd_id << " session "
227 << s << " con " << con << dendl;
228 osd_cons[s->osd_id].insert(con);
229 }
230 }
231
// A connection dropped.  For OSD peers, unregister it from osd_cons and
// from daemon_connections so we stop pushing stats configuration at it.
// Always returns false (no reconnect is initiated from here).
bool DaemonServer::ms_handle_reset(Connection *con)
{
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    auto priv = con->get_priv();
    auto session = static_cast<MgrSession*>(priv.get());
    if (!session) {
      return false;
    }
    std::lock_guard l(lock);
    dout(10) << "unregistering osd." << session->osd_id
	     << " session " << session << " con " << con << dendl;
    osd_cons[session->osd_id].erase(con);

    auto iter = daemon_connections.find(con);
    if (iter != daemon_connections.end()) {
      daemon_connections.erase(iter);
    }
  }
  return false;
}
252
// An outgoing connection attempt was actively refused by the peer; there
// is no per-connection state to clean up.
bool DaemonServer::ms_handle_refused(Connection *con)
{
  // do nothing for now
  return false;
}
258
// Fast dispatch: route each incoming message type to its handler.
// Returns true when the message was consumed.
bool DaemonServer::ms_dispatch2(const ref_t<Message>& m)
{
  // Note that we do *not* take ::lock here, in order to avoid
  // serializing all message handling. It's up to each handler
  // to take whatever locks it needs.
  switch (m->get_type()) {
  case MSG_PGSTATS:
    cluster_state.ingest_pgstats(ref_cast<MPGStats>(m));
    // source num() is the reporting OSD's id; may flip pgmap_ready
    maybe_ready(m->get_source().num());
    return true;
  case MSG_MGR_REPORT:
    return handle_report(ref_cast<MMgrReport>(m));
  case MSG_MGR_OPEN:
    return handle_open(ref_cast<MMgrOpen>(m));
  case MSG_MGR_UPDATE:
    return handle_update(ref_cast<MMgrUpdate>(m));
  case MSG_MGR_CLOSE:
    return handle_close(ref_cast<MMgrClose>(m));
  case MSG_COMMAND:
    return handle_command(ref_cast<MCommand>(m));
  case MSG_MGR_COMMAND:
    return handle_command(ref_cast<MMgrCommand>(m));
  default:
    dout(1) << "Unhandled message type " << m->get_type() << dendl;
    return false;
  };
}
286
// Emit whether the PGMap has been primed by initial reports from all up
// OSDs (see maybe_ready()).
void DaemonServer::dump_pg_ready(ceph::Formatter *f)
{
  f->dump_bool("pg_ready", pgmap_ready.load());
}
291
// Track which OSDs have sent their first pg stats.  Once every up OSD has
// reported, mark pgmap_ready and send a report to the mon immediately
// instead of waiting for the next tick.
void DaemonServer::maybe_ready(int32_t osd_id)
{
  if (pgmap_ready.load()) {
    // Fast path: we don't need to take lock because pgmap_ready
    // is already set
  } else {
    std::lock_guard l(lock);

    if (reported_osds.find(osd_id) == reported_osds.end()) {
      dout(4) << "initial report from osd " << osd_id << dendl;
      reported_osds.insert(osd_id);
      std::set<int32_t> up_osds;

      cluster_state.with_osdmap([&](const OSDMap& osdmap) {
	  osdmap.get_up_osds(up_osds);
	});

      // up OSDs that have not yet delivered their first report
      std::set<int32_t> unreported_osds;
      std::set_difference(up_osds.begin(), up_osds.end(),
			  reported_osds.begin(), reported_osds.end(),
			  std::inserter(unreported_osds, unreported_osds.begin()));

      if (unreported_osds.size() == 0) {
	dout(4) << "all osds have reported, sending PG state to mon" << dendl;
	pgmap_ready = true;
	reported_osds.clear();
	// Avoid waiting for next tick
	send_report();
      } else {
	dout(4) << "still waiting for " << unreported_osds.size() << " osds"
	           " to report in before PGMap is ready" << dendl;
      }
    }
  }
}
327
// Periodic work: push a report to the mon, run pg adjustments, and re-arm
// the timer.  The timer was constructed with `lock`, so this runs with the
// server lock held (schedule_tick_locked asserts it).
void DaemonServer::tick()
{
  dout(10) << dendl;
  send_report();
  adjust_pgs();

  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());
}
337
// Currently modules do not set health checks in response to events delivered to
// all modules (e.g. notify) so we do not risk a thundering hurd situation here.
// if this pattern emerges in the future, this scheduler could be modified to
// fire after all modules have had a chance to set their health checks.
//
// (Re)arm the tick timer to fire after delay_sec seconds.  Caller must hold
// `lock`.  Any pending tick is cancelled first; once shutting_down is set
// this becomes a no-op so no further ticks fire.
void DaemonServer::schedule_tick_locked(double delay_sec)
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));

  if (tick_event) {
    timer.cancel_event(tick_event);
    tick_event = nullptr;
  }

  // on shutdown start rejecting explicit requests to send reports that may
  // originate from python land which may still be running.
  if (shutting_down)
    return;

  tick_event = timer.add_event_after(delay_sec,
      new LambdaContext([this](int r) {
	  tick();
  }));
}
361
362 void DaemonServer::schedule_tick(double delay_sec)
363 {
364 std::lock_guard l(lock);
365 schedule_tick_locked(delay_sec);
366 }
367
// An OSD perf-metric query was added or removed.  Queue work on the
// finisher (off the caller's thread) to re-send MMgrConfigure to every
// OSD daemon connection so they pick up the new collection policy.
void DaemonServer::handle_osd_perf_metric_query_updated()
{
  dout(10) << dendl;

  // Send a fresh MMgrConfigure to all clients, so that they can follow
  // the new policy for transmitting stats
  finisher.queue(new LambdaContext([this](int r) {
	std::lock_guard l(lock);
	for (auto &c : daemon_connections) {
	  if (c->peer_is_osd()) {
	    _send_configure(c);
	  }
	}
      }));
}
383
// An MDS perf-metric query was added or removed.  Queue work on the
// finisher (off the caller's thread) to re-send MMgrConfigure to every
// MDS daemon connection so they pick up the new collection policy.
void DaemonServer::handle_mds_perf_metric_query_updated()
{
  dout(10) << dendl;

  // Send a fresh MMgrConfigure to all clients, so that they can follow
  // the new policy for transmitting stats
  finisher.queue(new LambdaContext([this](int r) {
	std::lock_guard l(lock);
	for (auto &c : daemon_connections) {
	  if (c->peer_is_mds()) {
	    _send_configure(c);
	  }
	}
      }));
}
399
// Stop the server.  The messenger is shut down and drained first (so no
// new dispatches arrive), then cluster state; finally, under the lock,
// mark shutting_down (disabling schedule_tick_locked) and stop the timer.
void DaemonServer::shutdown()
{
  dout(10) << "begin" << dendl;
  msgr->shutdown();
  msgr->wait();
  cluster_state.shutdown();
  dout(10) << "done" << dendl;

  std::lock_guard l(lock);
  shutting_down = true;
  timer.shutdown();
}
412
413 static DaemonKey key_from_service(
414 const std::string& service_name,
415 int peer_type,
416 const std::string& daemon_name)
417 {
418 if (!service_name.empty()) {
419 return DaemonKey{service_name, daemon_name};
420 } else {
421 return DaemonKey{ceph_entity_type_name(peer_type), daemon_name};
422 }
423 }
424
// Ask the mon for metadata of a daemon we have no record for.  Only
// osd/mds/mon metadata can be fetched, and only when no fetch for `key`
// is already in flight; the MetadataUpdate completion stores the reply
// into daemon_state.
void DaemonServer::fetch_missing_metadata(const DaemonKey& key,
					  const entity_addr_t& addr)
{
  if (!daemon_state.is_updating(key) &&
      (key.type == "osd" || key.type == "mds" || key.type == "mon")) {
    std::ostringstream oss;
    auto c = new MetadataUpdate(daemon_state, key);
    if (key.type == "osd") {
      // osd id is numeric, hence unquoted in the JSON command
      oss << "{\"prefix\": \"osd metadata\", \"id\": "
	  << key.name<< "}";
    } else if (key.type == "mds") {
      // mds metadata may lack an addr field; seed it from the peer address
      c->set_default("addr", stringify(addr));
      oss << "{\"prefix\": \"mds metadata\", \"who\": \""
	  << key.name << "\"}";
    } else if (key.type == "mon") {
      oss << "{\"prefix\": \"mon metadata\", \"id\": \""
	  << key.name << "\"}";
    } else {
      ceph_abort();  // unreachable: guarded by the type check above
    }
    monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
  }
}
448
// Handle MMgrOpen: a daemon (or client acting as a service daemon) opens
// its reporting session.  Sends the stats configuration back, creates or
// updates the DaemonState entry, registers service daemons in the pending
// service map, and decodes the sender's config snapshot.  A non-service
// daemon we have no metadata for is disconnected (so it reconnects later)
// while the metadata is fetched in the background.
bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
{
  std::unique_lock l(lock);

  DaemonKey key = key_from_service(m->service_name,
				   m->get_connection()->get_peer_type(),
				   m->daemon_name);

  auto con = m->get_connection();
  dout(10) << "from " << key << " " << con->get_peer_addr() << dendl;

  _send_configure(con);

  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    dout(20) << "updating existing DaemonState for " << key << dendl;
    daemon = daemon_state.get(key);
  }
  if (!daemon) {
    if (m->service_daemon) {
      dout(4) << "constructing new DaemonState for " << key << dendl;
      daemon = std::make_shared<DaemonState>(daemon_state.types);
      daemon->key = key;
      daemon->service_daemon = true;
      daemon_state.insert(daemon);
    } else {
      /* A normal Ceph daemon has connected but we are or should be waiting on
       * metadata for it. Close the session so that it tries to reconnect.
       */
      dout(2) << "ignoring open from " << key << " " << con->get_peer_addr()
	      << "; not ready for session (expect reconnect)" << dendl;
      con->mark_down();
      // drop the server lock before issuing the mon command
      l.unlock();
      fetch_missing_metadata(key, m->get_source_addr());
      return true;
    }
  }
  if (daemon) {
    if (m->service_daemon) {
      // update the metadata through the daemon state index to
      // ensure it's kept up-to-date
      daemon_state.update_metadata(daemon, m->daemon_metadata);
    }

    std::lock_guard l(daemon->lock);
    // fresh session: previously accumulated perf counters are stale
    daemon->perf_counters.clear();

    daemon->service_daemon = m->service_daemon;
    if (m->service_daemon) {
      daemon->service_status = m->daemon_status;

      utime_t now = ceph_clock_now();
      auto [d, added] = pending_service_map.get_daemon(m->service_name,
						       m->daemon_name);
      // (re)register when new, or when the daemon restarted under a new gid
      if (added || d->gid != (uint64_t)m->get_source().num()) {
	dout(10) << "registering " << key << " in pending_service_map" << dendl;
	d->gid = m->get_source().num();
	d->addr = m->get_source_addr();
	d->start_epoch = pending_service_map.epoch;
	d->start_stamp = now;
	d->metadata = m->daemon_metadata;
	pending_service_map_dirty = pending_service_map.epoch;
      }
    }

    auto p = m->config_bl.cbegin();
    if (p != m->config_bl.end()) {
      // the daemon sent its effective config plus the mon options it ignored
      decode(daemon->config, p);
      decode(daemon->ignored_mon_config, p);
      dout(20) << " got config " << daemon->config
	       << " ignored " << daemon->ignored_mon_config << dendl;
    }
    // keep the raw defaults blob; reset any previously parsed form
    daemon->config_defaults_bl = m->config_defaults_bl;
    daemon->config_defaults.clear();
    dout(20) << " got config_defaults_bl " << daemon->config_defaults_bl.length()
	     << " bytes" << dendl;
  }

  if (con->get_peer_type() != entity_name_t::TYPE_CLIENT &&
      m->service_name.empty())
  {
    // Store in set of the daemon/service connections, i.e. those
    // connections that require an update in the event of stats
    // configuration changes.
    daemon_connections.insert(con);
  }

  return true;
}
538
539 bool DaemonServer::handle_update(const ref_t<MMgrUpdate>& m)
540 {
541 DaemonKey key;
542 if (!m->service_name.empty()) {
543 key.type = m->service_name;
544 } else {
545 key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
546 }
547 key.name = m->daemon_name;
548
549 dout(10) << "from " << m->get_connection() << " " << key << dendl;
550
551 if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
552 m->service_name.empty()) {
553 // Clients should not be sending us update request
554 dout(10) << "rejecting update request from non-daemon client " << m->daemon_name
555 << dendl;
556 clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
557 << " at " << m->get_connection()->get_peer_addrs();
558 m->get_connection()->mark_down();
559 return true;
560 }
561
562
563 {
564 std::unique_lock locker(lock);
565
566 DaemonStatePtr daemon;
567 // Look up the DaemonState
568 if (daemon_state.exists(key)) {
569 dout(20) << "updating existing DaemonState for " << key << dendl;
570
571 daemon = daemon_state.get(key);
572 if (m->need_metadata_update &&
573 !m->daemon_metadata.empty()) {
574 daemon_state.update_metadata(daemon, m->daemon_metadata);
575 }
576 }
577 }
578
579 return true;
580 }
581
// Handle MMgrClose: a daemon is going away.  Drop its DaemonState entry,
// remove service daemons from the pending service map, then echo the
// message back as an ack so the sender can finish shutting down.
bool DaemonServer::handle_close(const ref_t<MMgrClose>& m)
{
  std::lock_guard l(lock);

  DaemonKey key = key_from_service(m->service_name,
				   m->get_connection()->get_peer_type(),
				   m->daemon_name);
  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  if (daemon_state.exists(key)) {
    DaemonStatePtr daemon = daemon_state.get(key);
    daemon_state.rm(key);
    {
      std::lock_guard l(daemon->lock);
      if (daemon->service_daemon) {
	pending_service_map.rm_daemon(m->service_name, m->daemon_name);
	pending_service_map_dirty = pending_service_map.epoch;
      }
    }
  }

  // send same message back as a reply
  m->get_connection()->send_message2(m);
  return true;
}
607
// Merge a daemon's task status into the pending service map, marking the
// map dirty (so it gets published) only when the status actually changed.
void DaemonServer::update_task_status(
  DaemonKey key,
  const std::map<std::string,std::string>& task_status)
{
  dout(10) << "got task status from " << key << dendl;

  [[maybe_unused]] auto [daemon, added] =
    pending_service_map.get_daemon(key.type, key.name);
  if (daemon->task_status != task_status) {
    daemon->task_status = task_status;
    pending_service_map_dirty = pending_service_map.epoch;
  }
}
621
// Handle MMgrReport: periodic stats/status from a daemon (or a client
// declaring itself a service daemon).  Plain clients are rejected and
// disconnected.  A report from a daemon we have no DaemonState for is also
// rejected: a background metadata fetch is issued and the session is torn
// down so the sender reconnects once metadata is available.  Otherwise the
// report's perf counters, config, service/task status and health metrics
// are folded into the DaemonState, and perf-metric payloads are forwarded
// to the collectors.
bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
{
  DaemonKey key;
  if (!m->service_name.empty()) {
    key.type = m->service_name;
  } else {
    key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
  }
  key.name = m->daemon_name;

  dout(10) << "from " << m->get_connection() << " " << key << dendl;

  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
      m->service_name.empty()) {
    // Clients should not be sending us stats unless they are declaring
    // themselves to be a daemon for some service.
    dout(10) << "rejecting report from non-daemon client " << m->daemon_name
	     << dendl;
    clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
		 << " at " << m->get_connection()->get_peer_addrs();
    m->get_connection()->mark_down();
    return true;
  }


  {
    std::unique_lock locker(lock);

    DaemonStatePtr daemon;
    // Look up the DaemonState
    if (daemon = daemon_state.get(key); daemon != nullptr) {
      dout(20) << "updating existing DaemonState for " << key << dendl;
    } else {
      // drop the lock across the mon command, then re-take it for cleanup
      locker.unlock();

      // we don't know the hostname at this stage, reject MMgrReport here.
      dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
	      << dendl;
      // issue metadata request in background
      fetch_missing_metadata(key, m->get_source_addr());

      locker.lock();

      // kill session
      auto priv = m->get_connection()->get_priv();
      auto session = static_cast<MgrSession*>(priv.get());
      if (!session) {
	return false;
      }
      m->get_connection()->mark_down();

      dout(10) << "unregistering osd." << session->osd_id
	       << " session " << session << " con " << m->get_connection() << dendl;

      // osd_id is only meaningful for OSD peers; erase is a no-op otherwise
      if (osd_cons.find(session->osd_id) != osd_cons.end()) {
	osd_cons[session->osd_id].erase(m->get_connection());
      }

      auto iter = daemon_connections.find(m->get_connection());
      if (iter != daemon_connections.end()) {
	daemon_connections.erase(iter);
      }

      return false;
    }

    // Update the DaemonState
    ceph_assert(daemon != nullptr);
    {
      std::lock_guard l(daemon->lock);
      auto &daemon_counters = daemon->perf_counters;
      daemon_counters.update(*m.get());

      auto p = m->config_bl.cbegin();
      if (p != m->config_bl.end()) {
	// the daemon sent its effective config plus ignored mon options
	decode(daemon->config, p);
	decode(daemon->ignored_mon_config, p);
	dout(20) << " got config " << daemon->config
		 << " ignored " << daemon->ignored_mon_config << dendl;
      }

      utime_t now = ceph_clock_now();
      if (daemon->service_daemon) {
	if (m->daemon_status) {
	  daemon->service_status_stamp = now;
	  daemon->service_status = *m->daemon_status;
	}
	daemon->last_service_beacon = now;
      } else if (m->daemon_status) {
	derr << "got status from non-daemon " << key << dendl;
      }
      // update task status
      if (m->task_status) {
	update_task_status(key, *m->task_status);
	daemon->last_service_beacon = now;
      }
      if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
	// only OSD and MON send health_checks to me now
	daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
	dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
		 << dendl;
      }
    }
  }

  // if there are any schema updates, notify the python modules
  /* no users currently
  if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
    py_modules.notify_all("perf_schema_update", ceph::to_string(key));
  }
  */

  if (m->get_connection()->peer_is_osd()) {
    osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
  }

  if (m->metric_report_message) {
    const MetricReportMessage &message = *m->metric_report_message;
    boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
  }

  return true;
}
745
746
747 void DaemonServer::_generate_command_map(
748 cmdmap_t& cmdmap,
749 map<string,string> &param_str_map)
750 {
751 for (auto p = cmdmap.begin();
752 p != cmdmap.end(); ++p) {
753 if (p->first == "prefix")
754 continue;
755 if (p->first == "caps") {
756 vector<string> cv;
757 if (cmd_getval(cmdmap, "caps", cv) &&
758 cv.size() % 2 == 0) {
759 for (unsigned i = 0; i < cv.size(); i += 2) {
760 string k = string("caps_") + cv[i];
761 param_str_map[k] = cv[i + 1];
762 }
763 continue;
764 }
765 }
766 param_str_map[p->first] = cmd_vartype_stringify(p->second);
767 }
768 }
769
770 const MonCommand *DaemonServer::_get_mgrcommand(
771 const string &cmd_prefix,
772 const std::vector<MonCommand> &cmds)
773 {
774 const MonCommand *this_cmd = nullptr;
775 for (const auto &cmd : cmds) {
776 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
777 this_cmd = &cmd;
778 break;
779 }
780 }
781 return this_cmd;
782 }
783
// Decide whether the session holder may run this command.  Mons bypass
// the check entirely; everyone else is evaluated against their parsed mgr
// caps using the r/w/x requirements declared by the command itself.
// NOTE(review): this_cmd is dereferenced unconditionally -- callers must
// have resolved the command before calling; confirm no caller passes null.
bool DaemonServer::_allowed_command(
  MgrSession *s,
  const string &service,
  const string &module,
  const string &prefix,
  const cmdmap_t& cmdmap,
  const map<string,string>& param_str_map,
  const MonCommand *this_cmd) {

  if (s->entity_name.is_mon()) {
    // mon is all-powerful. even when it is forwarding commands on behalf of
    // old clients; we expect the mon is validating commands before proxying!
    return true;
  }

  bool cmd_r = this_cmd->requires_perm('r');
  bool cmd_w = this_cmd->requires_perm('w');
  bool cmd_x = this_cmd->requires_perm('x');

  bool capable = s->caps.is_capable(
    g_ceph_context,
    s->entity_name,
    service, module, prefix, param_str_map,
    cmd_r, cmd_w, cmd_x,
    s->get_peer_addr());

  dout(10) << " " << s->entity_name << " "
	   << (capable ? "" : "not ") << "capable" << dendl;
  return capable;
}
814
/**
 * The working data for processing an MCommand. This lives in
 * a class to enable passing it into other threads for processing
 * outside of the thread/locks that called handle_command.
 */
class CommandContext {
public:
  ceph::ref_t<MCommand> m_tell;    ///< set when the command arrived as MCommand
  ceph::ref_t<MMgrCommand> m_mgr;  ///< set when it arrived as MMgrCommand
  const std::vector<std::string>& cmd; ///< ref into m_tell or m_mgr
  const bufferlist& data; ///< ref into m_tell or m_mgr
  bufferlist odata;                ///< output payload for the reply
  cmdmap_t cmdmap;                 ///< parsed form of cmd

  explicit CommandContext(ceph::ref_t<MCommand> m)
    : m_tell{std::move(m)},
      cmd(m_tell->cmd),
      data(m_tell->get_data()) {
  }
  explicit CommandContext(ceph::ref_t<MMgrCommand> m)
    : m_mgr{std::move(m)},
      cmd(m_mgr->cmd),
      data(m_mgr->get_data()) {
  }

  void reply(int r, const std::stringstream &ss) {
    reply(r, ss.str());
  }

  // Send the final reply (return code, status string, odata payload) back
  // over whichever connection the command arrived on, using the matching
  // reply message type.
  void reply(int r, const std::string &rs) {
    // Let the connection drop as soon as we've sent our response
    ConnectionRef con = m_tell ? m_tell->get_connection()
      : m_mgr->get_connection();
    if (con) {
      con->mark_disposable();
    }

    if (r == 0) {
      dout(20) << "success" << dendl;
    } else {
      derr << __func__ << " " << cpp_strerror(r) << " " << rs << dendl;
    }
    if (con) {
      if (m_tell) {
	MCommandReply *reply = new MCommandReply(r, rs);
	reply->set_tid(m_tell->get_tid());
	reply->set_data(odata);
	con->send_message(reply);
      } else {
	MMgrCommandReply *reply = new MMgrCommandReply(r, rs);
	reply->set_tid(m_mgr->get_tid());
	reply->set_data(odata);
	con->send_message(reply);
      }
    }
  }
};
872
/**
 * A context for receiving a bufferlist/error string from a background
 * function and then calling back to a CommandContext when it's done
 */
class ReplyOnFinish : public Context {
  std::shared_ptr<CommandContext> cmdctx;

public:
  bufferlist from_mon;  ///< filled in by the async (mon) command
  string outs;          ///< status/error string from the async command

  explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_)
    : cmdctx(cmdctx_)
    {}
  void finish(int r) override {
    // append the returned payload and complete the original command
    cmdctx->odata.claim_append(from_mon);
    cmdctx->reply(r, outs);
  }
};
892
// Entry point for MCommand ("tell"-style) commands: wrap the message in a
// CommandContext and funnel it into _handle_command, converting bad_cmd_get
// (malformed argument access) into an EINVAL reply to the sender.
bool DaemonServer::handle_command(const ref_t<MCommand>& m)
{
  std::lock_guard l(lock);
  auto cmdctx = std::make_shared<CommandContext>(m);
  try {
    return _handle_command(cmdctx);
  } catch (const bad_cmd_get& e) {
    cmdctx->reply(-EINVAL, e.what());
    return true;
  }
}
904
// Entry point for MMgrCommand (mgr-native) commands; identical flow to the
// MCommand overload: wrap in a CommandContext, run _handle_command, and
// turn bad_cmd_get into an EINVAL reply.
bool DaemonServer::handle_command(const ref_t<MMgrCommand>& m)
{
  std::lock_guard l(lock);
  auto cmdctx = std::make_shared<CommandContext>(m);
  try {
    return _handle_command(cmdctx);
  } catch (const bad_cmd_get& e) {
    cmdctx->reply(-EINVAL, e.what());
    return true;
  }
}
916
// Record a failed authorization: debug log, an audit-channel entry naming
// the requester and command, and an operator-facing explanation in ss.
void DaemonServer::log_access_denied(
    std::shared_ptr<CommandContext>& cmdctx,
    MgrSession* session, std::stringstream& ss) {
  dout(1) << " access denied" << dendl;
  audit_clog->info() << "from='" << session->inst << "' "
		     << "entity='" << session->entity_name << "' "
		     << "cmd=" << cmdctx->cmd << ": access denied";
  ss << "access denied: does your client key have mgr caps? "
        "See http://docs.ceph.com/en/latest/mgr/administrator/"
        "#client-authentication";
}
928
929 void DaemonServer::_check_offlines_pgs(
930 const set<int>& osds,
931 const OSDMap& osdmap,
932 const PGMap& pgmap,
933 offline_pg_report *report)
934 {
935 // reset output
936 *report = offline_pg_report();
937 report->osds = osds;
938
939 for (const auto& q : pgmap.pg_stat) {
940 set<int32_t> pg_acting; // net acting sets (with no missing if degraded)
941 bool found = false;
942 if (q.second.state == 0) {
943 report->unknown.insert(q.first);
944 continue;
945 }
946 if (q.second.state & PG_STATE_DEGRADED) {
947 for (auto& anm : q.second.avail_no_missing) {
948 if (osds.count(anm.osd)) {
949 found = true;
950 continue;
951 }
952 if (anm.osd != CRUSH_ITEM_NONE) {
953 pg_acting.insert(anm.osd);
954 }
955 }
956 } else {
957 for (auto& a : q.second.acting) {
958 if (osds.count(a)) {
959 found = true;
960 continue;
961 }
962 if (a != CRUSH_ITEM_NONE) {
963 pg_acting.insert(a);
964 }
965 }
966 }
967 if (!found) {
968 continue;
969 }
970 const pg_pool_t *pi = osdmap.get_pg_pool(q.first.pool());
971 bool dangerous = false;
972 if (!pi) {
973 report->bad_no_pool.insert(q.first); // pool is creating or deleting
974 dangerous = true;
975 }
976 if (!(q.second.state & PG_STATE_ACTIVE)) {
977 report->bad_already_inactive.insert(q.first);
978 dangerous = true;
979 }
980 if (pg_acting.size() < pi->min_size) {
981 report->bad_become_inactive.insert(q.first);
982 dangerous = true;
983 }
984 if (dangerous) {
985 report->not_ok.insert(q.first);
986 } else {
987 report->ok.insert(q.first);
988 if (q.second.state & PG_STATE_DEGRADED) {
989 report->ok_become_more_degraded.insert(q.first);
990 } else {
991 report->ok_become_degraded.insert(q.first);
992 }
993 }
994 }
995 dout(20) << osds << " -> " << report->ok.size() << " ok, "
996 << report->not_ok.size() << " not ok, "
997 << report->unknown.size() << " unknown"
998 << dendl;
999 }
1000
// Starting from orig_osds (which must itself be ok-to-stop, else we return
// its report unchanged), greedily grow the set of simultaneously stoppable
// OSDs by walking up the CRUSH hierarchy and testing each up OSD found
// beneath successive ancestors, capped at `max` osds.  out_report always
// holds the largest safe set found so far; on any CRUSH lookup failure or
// candidate rejection we stop and keep what we have.
void DaemonServer::_maximize_ok_to_stop_set(
  const set<int>& orig_osds,
  unsigned max,
  const OSDMap& osdmap,
  const PGMap& pgmap,
  offline_pg_report *out_report)
{
  dout(20) << "orig_osds " << orig_osds << " max " << max << dendl;
  _check_offlines_pgs(orig_osds, osdmap, pgmap, out_report);
  if (!out_report->ok_to_stop()) {
    return;
  }
  if (orig_osds.size() >= max) {
    // already at max
    return;
  }

  // semi-arbitrarily start with the first osd in the set
  offline_pg_report report;
  set<int> osds = orig_osds;
  int parent = *osds.begin();
  set<int> children;

  while (true) {
    // identify the next parent
    int r = osdmap.crush->get_immediate_parent_id(parent, &parent);
    if (r < 0) {
      return; // just go with what we have so far!
    }

    // get candidate additions that are beneath this point in the tree
    children.clear();
    r = osdmap.crush->get_all_children(parent, &children);
    if (r < 0) {
      return; // just go with what we have so far!
    }
    dout(20) << " parent " << parent << " children " << children << dendl;

    // try adding in more osds
    int failed = 0; // how many children we failed to add to our set
    for (auto o : children) {
      // only consider actual up OSDs (o >= 0) not already in the set
      if (o >= 0 && osdmap.is_up(o) && osds.count(o) == 0) {
	osds.insert(o);
	_check_offlines_pgs(osds, osdmap, pgmap, &report);
	if (!report.ok_to_stop()) {
	  osds.erase(o);
	  ++failed;
	  continue;
	}
	// candidate accepted; adopt the enlarged report
	*out_report = report;
	if (osds.size() == max) {
	  dout(20) << " hit max" << dendl;
	  return; // yay, we hit the max
	}
      }
    }

    if (failed) {
      // we hit some failures; go with what we have
      dout(20) << " hit some peer failures" << dendl;
      return;
    }
  }
}
1065
1066 bool DaemonServer::_handle_command(
1067 std::shared_ptr<CommandContext>& cmdctx)
1068 {
1069 MessageRef m;
1070 bool admin_socket_cmd = false;
1071 if (cmdctx->m_tell) {
1072 m = cmdctx->m_tell;
1073 // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
1074 // command.
1075 admin_socket_cmd = (cmdctx->m_tell->fsid != uuid_d());
1076 } else {
1077 m = cmdctx->m_mgr;
1078 }
1079 auto priv = m->get_connection()->get_priv();
1080 auto session = static_cast<MgrSession*>(priv.get());
1081 if (!session) {
1082 return true;
1083 }
1084 if (session->inst.name == entity_name_t()) {
1085 session->inst.name = m->get_source();
1086 }
1087
1088 map<string,string> param_str_map;
1089 std::stringstream ss;
1090 int r = 0;
1091
1092 if (!cmdmap_from_json(cmdctx->cmd, &(cmdctx->cmdmap), ss)) {
1093 cmdctx->reply(-EINVAL, ss);
1094 return true;
1095 }
1096
1097 string prefix;
1098 cmd_getval(cmdctx->cmdmap, "prefix", prefix);
1099 dout(10) << "decoded-size=" << cmdctx->cmdmap.size() << " prefix=" << prefix << dendl;
1100
1101 boost::scoped_ptr<Formatter> f;
1102 {
1103 std::string format;
1104 if (boost::algorithm::ends_with(prefix, "_json")) {
1105 format = "json";
1106 } else {
1107 format = cmd_getval_or<string>(cmdctx->cmdmap, "format", "plain");
1108 }
1109 f.reset(Formatter::create(format));
1110 }
1111
1112 // this is just for mgr commands - admin socket commands will fall
1113 // through and use the admin socket version of
1114 // get_command_descriptions
1115 if (prefix == "get_command_descriptions" && !admin_socket_cmd) {
1116 dout(10) << "reading commands from python modules" << dendl;
1117 const auto py_commands = py_modules.get_commands();
1118
1119 int cmdnum = 0;
1120 JSONFormatter f;
1121 f.open_object_section("command_descriptions");
1122
1123 auto dump_cmd = [&cmdnum, &f, m](const MonCommand &mc){
1124 ostringstream secname;
1125 secname << "cmd" << std::setfill('0') << std::setw(3) << cmdnum;
1126 dump_cmddesc_to_json(&f, m->get_connection()->get_features(),
1127 secname.str(), mc.cmdstring, mc.helpstring,
1128 mc.module, mc.req_perms, 0);
1129 cmdnum++;
1130 };
1131
1132 for (const auto &pyc : py_commands) {
1133 dump_cmd(pyc);
1134 }
1135
1136 for (const auto &mgr_cmd : mgr_commands) {
1137 dump_cmd(mgr_cmd);
1138 }
1139
1140 f.close_section(); // command_descriptions
1141 f.flush(cmdctx->odata);
1142 cmdctx->reply(0, ss);
1143 return true;
1144 }
1145
1146 // lookup command
1147 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
1148 _generate_command_map(cmdctx->cmdmap, param_str_map);
1149
1150 bool is_allowed = false;
1151 ModuleCommand py_command;
1152 if (admin_socket_cmd) {
1153 // admin socket commands require all capabilities
1154 is_allowed = session->caps.is_allow_all();
1155 } else if (!mgr_cmd) {
1156 // Resolve the command to the name of the module that will
1157 // handle it (if the command exists)
1158 auto py_commands = py_modules.get_py_commands();
1159 for (const auto &pyc : py_commands) {
1160 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
1161 if (pyc_prefix == prefix) {
1162 py_command = pyc;
1163 break;
1164 }
1165 }
1166
1167 MonCommand pyc = {"", "", "py", py_command.perm};
1168 is_allowed = _allowed_command(session, "py", py_command.module_name,
1169 prefix, cmdctx->cmdmap, param_str_map,
1170 &pyc);
1171 } else {
1172 // validate user's permissions for requested command
1173 is_allowed = _allowed_command(session, mgr_cmd->module, "",
1174 prefix, cmdctx->cmdmap, param_str_map, mgr_cmd);
1175 }
1176
1177 if (!is_allowed) {
1178 log_access_denied(cmdctx, session, ss);
1179 cmdctx->reply(-EACCES, ss);
1180 return true;
1181 }
1182
1183 audit_clog->debug()
1184 << "from='" << session->inst << "' "
1185 << "entity='" << session->entity_name << "' "
1186 << "cmd=" << cmdctx->cmd << ": dispatch";
1187
1188 if (admin_socket_cmd) {
1189 cct->get_admin_socket()->queue_tell_command(cmdctx->m_tell);
1190 return true;
1191 }
1192
1193 // ----------------
1194 // service map commands
1195 if (prefix == "service dump") {
1196 if (!f)
1197 f.reset(Formatter::create("json-pretty"));
1198 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
1199 f->dump_object("service_map", service_map);
1200 });
1201 f->flush(cmdctx->odata);
1202 cmdctx->reply(0, ss);
1203 return true;
1204 }
1205 if (prefix == "service status") {
1206 if (!f)
1207 f.reset(Formatter::create("json-pretty"));
1208 // only include state from services that are in the persisted service map
1209 f->open_object_section("service_status");
1210 for (auto& [type, service] : pending_service_map.services) {
1211 if (ServiceMap::is_normal_ceph_entity(type)) {
1212 continue;
1213 }
1214
1215 f->open_object_section(type.c_str());
1216 for (auto& q : service.daemons) {
1217 f->open_object_section(q.first.c_str());
1218 DaemonKey key{type, q.first};
1219 ceph_assert(daemon_state.exists(key));
1220 auto daemon = daemon_state.get(key);
1221 std::lock_guard l(daemon->lock);
1222 f->dump_stream("status_stamp") << daemon->service_status_stamp;
1223 f->dump_stream("last_beacon") << daemon->last_service_beacon;
1224 f->open_object_section("status");
1225 for (auto& r : daemon->service_status) {
1226 f->dump_string(r.first.c_str(), r.second);
1227 }
1228 f->close_section();
1229 f->close_section();
1230 }
1231 f->close_section();
1232 }
1233 f->close_section();
1234 f->flush(cmdctx->odata);
1235 cmdctx->reply(0, ss);
1236 return true;
1237 }
1238
1239 if (prefix == "config set") {
1240 std::string key;
1241 std::string val;
1242 cmd_getval(cmdctx->cmdmap, "key", key);
1243 cmd_getval(cmdctx->cmdmap, "value", val);
1244 r = cct->_conf.set_val(key, val, &ss);
1245 if (r == 0) {
1246 cct->_conf.apply_changes(nullptr);
1247 }
1248 cmdctx->reply(0, ss);
1249 return true;
1250 }
1251
1252 // -----------
1253 // PG commands
1254
1255 if (prefix == "pg scrub" ||
1256 prefix == "pg repair" ||
1257 prefix == "pg deep-scrub") {
1258 string scrubop = prefix.substr(3, string::npos);
1259 pg_t pgid;
1260 spg_t spgid;
1261 string pgidstr;
1262 cmd_getval(cmdctx->cmdmap, "pgid", pgidstr);
1263 if (!pgid.parse(pgidstr.c_str())) {
1264 ss << "invalid pgid '" << pgidstr << "'";
1265 cmdctx->reply(-EINVAL, ss);
1266 return true;
1267 }
1268 bool pg_exists = false;
1269 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1270 pg_exists = osdmap.pg_exists(pgid);
1271 });
1272 if (!pg_exists) {
1273 ss << "pg " << pgid << " does not exist";
1274 cmdctx->reply(-ENOENT, ss);
1275 return true;
1276 }
1277 int acting_primary = -1;
1278 epoch_t epoch;
1279 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1280 epoch = osdmap.get_epoch();
1281 osdmap.get_primary_shard(pgid, &acting_primary, &spgid);
1282 });
1283 if (acting_primary == -1) {
1284 ss << "pg " << pgid << " has no primary osd";
1285 cmdctx->reply(-EAGAIN, ss);
1286 return true;
1287 }
1288 auto p = osd_cons.find(acting_primary);
1289 if (p == osd_cons.end()) {
1290 ss << "pg " << pgid << " primary osd." << acting_primary
1291 << " is not currently connected";
1292 cmdctx->reply(-EAGAIN, ss);
1293 return true;
1294 }
1295 for (auto& con : p->second) {
1296 assert(HAVE_FEATURE(con->get_features(), SERVER_OCTOPUS));
1297 vector<spg_t> pgs = { spgid };
1298 con->send_message(new MOSDScrub2(monc->get_fsid(),
1299 epoch,
1300 pgs,
1301 scrubop == "repair",
1302 scrubop == "deep-scrub"));
1303 }
1304 ss << "instructing pg " << spgid << " on osd." << acting_primary
1305 << " to " << scrubop;
1306 cmdctx->reply(0, ss);
1307 return true;
1308 } else if (prefix == "osd scrub" ||
1309 prefix == "osd deep-scrub" ||
1310 prefix == "osd repair") {
1311 string whostr;
1312 cmd_getval(cmdctx->cmdmap, "who", whostr);
1313 vector<string> pvec;
1314 get_str_vec(prefix, pvec);
1315
1316 set<int> osds;
1317 if (whostr == "*" || whostr == "all" || whostr == "any") {
1318 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1319 for (int i = 0; i < osdmap.get_max_osd(); i++)
1320 if (osdmap.is_up(i)) {
1321 osds.insert(i);
1322 }
1323 });
1324 } else {
1325 long osd = parse_osd_id(whostr.c_str(), &ss);
1326 if (osd < 0) {
1327 ss << "invalid osd '" << whostr << "'";
1328 cmdctx->reply(-EINVAL, ss);
1329 return true;
1330 }
1331 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1332 if (osdmap.is_up(osd)) {
1333 osds.insert(osd);
1334 }
1335 });
1336 if (osds.empty()) {
1337 ss << "osd." << osd << " is not up";
1338 cmdctx->reply(-EAGAIN, ss);
1339 return true;
1340 }
1341 }
1342 set<int> sent_osds, failed_osds;
1343 for (auto osd : osds) {
1344 vector<spg_t> spgs;
1345 epoch_t epoch;
1346 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1347 epoch = osdmap.get_epoch();
1348 auto p = pgmap.pg_by_osd.find(osd);
1349 if (p != pgmap.pg_by_osd.end()) {
1350 for (auto pgid : p->second) {
1351 int primary;
1352 spg_t spg;
1353 osdmap.get_primary_shard(pgid, &primary, &spg);
1354 if (primary == osd) {
1355 spgs.push_back(spg);
1356 }
1357 }
1358 }
1359 });
1360 auto p = osd_cons.find(osd);
1361 if (p == osd_cons.end()) {
1362 failed_osds.insert(osd);
1363 } else {
1364 sent_osds.insert(osd);
1365 for (auto& con : p->second) {
1366 con->send_message(new MOSDScrub2(monc->get_fsid(),
1367 epoch,
1368 spgs,
1369 pvec.back() == "repair",
1370 pvec.back() == "deep-scrub"));
1371 }
1372 }
1373 }
1374 if (failed_osds.size() == osds.size()) {
1375 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
1376 << " (not connected)";
1377 r = -EAGAIN;
1378 } else {
1379 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
1380 if (!failed_osds.empty()) {
1381 ss << "; osd(s) " << failed_osds << " were not connected";
1382 }
1383 r = 0;
1384 }
1385 cmdctx->reply(0, ss);
1386 return true;
1387 } else if (prefix == "osd pool scrub" ||
1388 prefix == "osd pool deep-scrub" ||
1389 prefix == "osd pool repair") {
1390 vector<string> pool_names;
1391 cmd_getval(cmdctx->cmdmap, "who", pool_names);
1392 if (pool_names.empty()) {
1393 ss << "must specify one or more pool names";
1394 cmdctx->reply(-EINVAL, ss);
1395 return true;
1396 }
1397 epoch_t epoch;
1398 map<int32_t, vector<pg_t>> pgs_by_primary; // legacy
1399 map<int32_t, vector<spg_t>> spgs_by_primary;
1400 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1401 epoch = osdmap.get_epoch();
1402 for (auto& pool_name : pool_names) {
1403 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1404 if (pool_id < 0) {
1405 ss << "unrecognized pool '" << pool_name << "'";
1406 r = -ENOENT;
1407 return;
1408 }
1409 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1410 for (int i = 0; i < pool_pg_num; i++) {
1411 pg_t pg(i, pool_id);
1412 int primary;
1413 spg_t spg;
1414 auto got = osdmap.get_primary_shard(pg, &primary, &spg);
1415 if (!got)
1416 continue;
1417 pgs_by_primary[primary].push_back(pg);
1418 spgs_by_primary[primary].push_back(spg);
1419 }
1420 }
1421 });
1422 if (r < 0) {
1423 cmdctx->reply(r, ss);
1424 return true;
1425 }
1426 for (auto& it : spgs_by_primary) {
1427 auto primary = it.first;
1428 auto p = osd_cons.find(primary);
1429 if (p == osd_cons.end()) {
1430 ss << "osd." << primary << " is not currently connected";
1431 cmdctx->reply(-EAGAIN, ss);
1432 return true;
1433 }
1434 for (auto& con : p->second) {
1435 con->send_message(new MOSDScrub2(monc->get_fsid(),
1436 epoch,
1437 it.second,
1438 prefix == "osd pool repair",
1439 prefix == "osd pool deep-scrub"));
1440 }
1441 }
1442 cmdctx->reply(0, "");
1443 return true;
1444 } else if (prefix == "osd reweight-by-pg" ||
1445 prefix == "osd reweight-by-utilization" ||
1446 prefix == "osd test-reweight-by-pg" ||
1447 prefix == "osd test-reweight-by-utilization") {
1448 bool by_pg =
1449 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
1450 bool dry_run =
1451 prefix == "osd test-reweight-by-pg" ||
1452 prefix == "osd test-reweight-by-utilization";
1453 int64_t oload = cmd_getval_or<int64_t>(cmdctx->cmdmap, "oload", 120);
1454 set<int64_t> pools;
1455 vector<string> poolnames;
1456 cmd_getval(cmdctx->cmdmap, "pools", poolnames);
1457 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1458 for (const auto& poolname : poolnames) {
1459 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
1460 if (pool < 0) {
1461 ss << "pool '" << poolname << "' does not exist";
1462 r = -ENOENT;
1463 }
1464 pools.insert(pool);
1465 }
1466 });
1467 if (r) {
1468 cmdctx->reply(r, ss);
1469 return true;
1470 }
1471
1472 double max_change = g_conf().get_val<double>("mon_reweight_max_change");
1473 cmd_getval(cmdctx->cmdmap, "max_change", max_change);
1474 if (max_change <= 0.0) {
1475 ss << "max_change " << max_change << " must be positive";
1476 cmdctx->reply(-EINVAL, ss);
1477 return true;
1478 }
1479 int64_t max_osds = g_conf().get_val<int64_t>("mon_reweight_max_osds");
1480 cmd_getval(cmdctx->cmdmap, "max_osds", max_osds);
1481 if (max_osds <= 0) {
1482 ss << "max_osds " << max_osds << " must be positive";
1483 cmdctx->reply(-EINVAL, ss);
1484 return true;
1485 }
1486 bool no_increasing = false;
1487 cmd_getval_compat_cephbool(cmdctx->cmdmap, "no_increasing", no_increasing);
1488 string out_str;
1489 mempool::osdmap::map<int32_t, uint32_t> new_weights;
1490 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap &osdmap, const PGMap& pgmap) {
1491 return reweight::by_utilization(osdmap, pgmap,
1492 oload,
1493 max_change,
1494 max_osds,
1495 by_pg,
1496 pools.empty() ? NULL : &pools,
1497 no_increasing,
1498 &new_weights,
1499 &ss, &out_str, f.get());
1500 });
1501 if (r >= 0) {
1502 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
1503 }
1504 if (f) {
1505 f->flush(cmdctx->odata);
1506 } else {
1507 cmdctx->odata.append(out_str);
1508 }
1509 if (r < 0) {
1510 ss << "FAILED reweight-by-pg";
1511 cmdctx->reply(r, ss);
1512 return true;
1513 } else if (r == 0 || dry_run) {
1514 ss << "no change";
1515 cmdctx->reply(r, ss);
1516 return true;
1517 } else {
1518 json_spirit::Object json_object;
1519 for (const auto& osd_weight : new_weights) {
1520 json_spirit::Config::add(json_object,
1521 std::to_string(osd_weight.first),
1522 std::to_string(osd_weight.second));
1523 }
1524 string s = json_spirit::write(json_object);
1525 std::replace(begin(s), end(s), '\"', '\'');
1526 const string cmd =
1527 "{"
1528 "\"prefix\": \"osd reweightn\", "
1529 "\"weights\": \"" + s + "\""
1530 "}";
1531 auto on_finish = new ReplyOnFinish(cmdctx);
1532 monc->start_mon_command({cmd}, {},
1533 &on_finish->from_mon, &on_finish->outs, on_finish);
1534 return true;
1535 }
1536 } else if (prefix == "osd df") {
1537 string method, filter;
1538 cmd_getval(cmdctx->cmdmap, "output_method", method);
1539 cmd_getval(cmdctx->cmdmap, "filter", filter);
1540 stringstream rs;
1541 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1542 // sanity check filter(s)
1543 if (!filter.empty() &&
1544 osdmap.lookup_pg_pool_name(filter) < 0 &&
1545 !osdmap.crush->class_exists(filter) &&
1546 !osdmap.crush->name_exists(filter)) {
1547 rs << "'" << filter << "' not a pool, crush node or device class name";
1548 return -EINVAL;
1549 }
1550 print_osd_utilization(osdmap, pgmap, ss,
1551 f.get(), method == "tree", filter);
1552 cmdctx->odata.append(ss);
1553 return 0;
1554 });
1555 cmdctx->reply(r, rs);
1556 return true;
1557 } else if (prefix == "osd pool stats") {
1558 string pool_name;
1559 cmd_getval(cmdctx->cmdmap, "pool_name", pool_name);
1560 int64_t poolid = -ENOENT;
1561 bool one_pool = false;
1562 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1563 if (!pool_name.empty()) {
1564 poolid = osdmap.lookup_pg_pool_name(pool_name);
1565 if (poolid < 0) {
1566 ceph_assert(poolid == -ENOENT);
1567 ss << "unrecognized pool '" << pool_name << "'";
1568 return -ENOENT;
1569 }
1570 one_pool = true;
1571 }
1572 stringstream rs;
1573 if (f)
1574 f->open_array_section("pool_stats");
1575 else {
1576 if (osdmap.get_pools().empty()) {
1577 ss << "there are no pools!";
1578 goto stats_out;
1579 }
1580 }
1581 for (auto &p : osdmap.get_pools()) {
1582 if (!one_pool) {
1583 poolid = p.first;
1584 }
1585 pg_map.dump_pool_stats_and_io_rate(poolid, osdmap, f.get(), &rs);
1586 if (one_pool) {
1587 break;
1588 }
1589 }
1590 stats_out:
1591 if (f) {
1592 f->close_section();
1593 f->flush(cmdctx->odata);
1594 } else {
1595 cmdctx->odata.append(rs.str());
1596 }
1597 return 0;
1598 });
1599 if (r != -EOPNOTSUPP) {
1600 cmdctx->reply(r, ss);
1601 return true;
1602 }
1603 } else if (prefix == "osd safe-to-destroy" ||
1604 prefix == "osd destroy" ||
1605 prefix == "osd purge") {
1606 set<int> osds;
1607 int r = 0;
1608 if (prefix == "osd safe-to-destroy") {
1609 vector<string> ids;
1610 cmd_getval(cmdctx->cmdmap, "ids", ids);
1611 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1612 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1613 });
1614 if (!r && osds.empty()) {
1615 ss << "must specify one or more OSDs";
1616 r = -EINVAL;
1617 }
1618 } else {
1619 int64_t id;
1620 if (!cmd_getval(cmdctx->cmdmap, "id", id)) {
1621 r = -EINVAL;
1622 ss << "must specify OSD id";
1623 } else {
1624 osds.insert(id);
1625 }
1626 }
1627 if (r < 0) {
1628 cmdctx->reply(r, ss);
1629 return true;
1630 }
1631 set<int> active_osds, missing_stats, stored_pgs, safe_to_destroy;
1632 int affected_pgs = 0;
1633 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1634 if (pg_map.num_pg_unknown > 0) {
1635 ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
1636 << " any conclusions";
1637 r = -EAGAIN;
1638 return;
1639 }
1640 int num_active_clean = 0;
1641 for (auto& p : pg_map.num_pg_by_state) {
1642 unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
1643 if ((p.first & want) == want) {
1644 num_active_clean += p.second;
1645 }
1646 }
1647 for (auto osd : osds) {
1648 if (!osdmap.exists(osd)) {
1649 safe_to_destroy.insert(osd);
1650 continue; // clearly safe to destroy
1651 }
1652 auto q = pg_map.num_pg_by_osd.find(osd);
1653 if (q != pg_map.num_pg_by_osd.end()) {
1654 if (q->second.acting > 0 || q->second.up_not_acting > 0) {
1655 active_osds.insert(osd);
1656 // XXX: For overlapping PGs, this counts them again
1657 affected_pgs += q->second.acting + q->second.up_not_acting;
1658 continue;
1659 }
1660 }
1661 if (num_active_clean < pg_map.num_pg) {
1662 // all pgs aren't active+clean; we need to be careful.
1663 auto p = pg_map.osd_stat.find(osd);
1664 if (p == pg_map.osd_stat.end() || !osdmap.is_up(osd)) {
1665 missing_stats.insert(osd);
1666 continue;
1667 } else if (p->second.num_pgs > 0) {
1668 stored_pgs.insert(osd);
1669 continue;
1670 }
1671 }
1672 safe_to_destroy.insert(osd);
1673 }
1674 });
1675 if (r && prefix == "osd safe-to-destroy") {
1676 cmdctx->reply(r, ss); // regardless of formatter
1677 return true;
1678 }
1679 if (!r && (!active_osds.empty() ||
1680 !missing_stats.empty() || !stored_pgs.empty())) {
1681 if (!safe_to_destroy.empty()) {
1682 ss << "OSD(s) " << safe_to_destroy
1683 << " are safe to destroy without reducing data durability. ";
1684 }
1685 if (!active_osds.empty()) {
1686 ss << "OSD(s) " << active_osds << " have " << affected_pgs
1687 << " pgs currently mapped to them. ";
1688 }
1689 if (!missing_stats.empty()) {
1690 ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
1691 << " PGs are active+clean; we cannot draw any conclusions. ";
1692 }
1693 if (!stored_pgs.empty()) {
1694 ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
1695 << " data, and not all PGs are active+clean; we cannot be sure they"
1696 << " aren't still needed.";
1697 }
1698 if (!active_osds.empty() || !stored_pgs.empty()) {
1699 r = -EBUSY;
1700 } else {
1701 r = -EAGAIN;
1702 }
1703 }
1704
1705 if (prefix == "osd safe-to-destroy") {
1706 if (!r) {
1707 ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1708 << " durability.";
1709 }
1710 if (f) {
1711 f->open_object_section("osd_status");
1712 f->open_array_section("safe_to_destroy");
1713 for (auto i : safe_to_destroy)
1714 f->dump_int("osd", i);
1715 f->close_section();
1716 f->open_array_section("active");
1717 for (auto i : active_osds)
1718 f->dump_int("osd", i);
1719 f->close_section();
1720 f->open_array_section("missing_stats");
1721 for (auto i : missing_stats)
1722 f->dump_int("osd", i);
1723 f->close_section();
1724 f->open_array_section("stored_pgs");
1725 for (auto i : stored_pgs)
1726 f->dump_int("osd", i);
1727 f->close_section();
1728 f->close_section(); // osd_status
1729 f->flush(cmdctx->odata);
1730 r = 0;
1731 std::stringstream().swap(ss);
1732 }
1733 cmdctx->reply(r, ss);
1734 return true;
1735 }
1736
1737 if (r) {
1738 bool force = false;
1739 cmd_getval(cmdctx->cmdmap, "force", force);
1740 if (!force) {
1741 // Backward compat
1742 cmd_getval(cmdctx->cmdmap, "yes_i_really_mean_it", force);
1743 }
1744 if (!force) {
1745 ss << "\nYou can proceed by passing --force, but be warned that"
1746 " this will likely mean real, permanent data loss.";
1747 } else {
1748 r = 0;
1749 }
1750 }
1751 if (r) {
1752 cmdctx->reply(r, ss);
1753 return true;
1754 }
1755 const string cmd =
1756 "{"
1757 "\"prefix\": \"" + prefix + "-actual\", "
1758 "\"id\": " + stringify(osds) + ", "
1759 "\"yes_i_really_mean_it\": true"
1760 "}";
1761 auto on_finish = new ReplyOnFinish(cmdctx);
1762 monc->start_mon_command({cmd}, {}, nullptr, &on_finish->outs, on_finish);
1763 return true;
1764 } else if (prefix == "osd ok-to-stop") {
1765 vector<string> ids;
1766 cmd_getval(cmdctx->cmdmap, "ids", ids);
1767 set<int> osds;
1768 int64_t max = 1;
1769 cmd_getval(cmdctx->cmdmap, "max", max);
1770 int r;
1771 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1772 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1773 });
1774 if (!r && osds.empty()) {
1775 ss << "must specify one or more OSDs";
1776 r = -EINVAL;
1777 }
1778 if (max < (int)osds.size()) {
1779 max = osds.size();
1780 }
1781 if (r < 0) {
1782 cmdctx->reply(r, ss);
1783 return true;
1784 }
1785 offline_pg_report out_report;
1786 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1787 _maximize_ok_to_stop_set(
1788 osds, max, osdmap, pg_map,
1789 &out_report);
1790 });
1791 if (!f) {
1792 f.reset(Formatter::create("json"));
1793 }
1794 f->dump_object("ok_to_stop", out_report);
1795 f->flush(cmdctx->odata);
1796 cmdctx->odata.append("\n");
1797 if (!out_report.unknown.empty()) {
1798 ss << out_report.unknown.size() << " pgs have unknown state; "
1799 << "cannot draw any conclusions";
1800 cmdctx->reply(-EAGAIN, ss);
1801 }
1802 if (!out_report.ok_to_stop()) {
1803 ss << "unsafe to stop osd(s) at this time (" << out_report.not_ok.size() << " PGs are or would become offline)";
1804 cmdctx->reply(-EBUSY, ss);
1805 } else {
1806 cmdctx->reply(0, ss);
1807 }
1808 return true;
1809 } else if (prefix == "pg force-recovery" ||
1810 prefix == "pg force-backfill" ||
1811 prefix == "pg cancel-force-recovery" ||
1812 prefix == "pg cancel-force-backfill" ||
1813 prefix == "osd pool force-recovery" ||
1814 prefix == "osd pool force-backfill" ||
1815 prefix == "osd pool cancel-force-recovery" ||
1816 prefix == "osd pool cancel-force-backfill") {
1817 vector<string> vs;
1818 get_str_vec(prefix, vs);
1819 auto& granularity = vs.front();
1820 auto& forceop = vs.back();
1821 vector<pg_t> pgs;
1822
1823 // figure out actual op just once
1824 int actual_op = 0;
1825 if (forceop == "force-recovery") {
1826 actual_op = OFR_RECOVERY;
1827 } else if (forceop == "force-backfill") {
1828 actual_op = OFR_BACKFILL;
1829 } else if (forceop == "cancel-force-backfill") {
1830 actual_op = OFR_BACKFILL | OFR_CANCEL;
1831 } else if (forceop == "cancel-force-recovery") {
1832 actual_op = OFR_RECOVERY | OFR_CANCEL;
1833 }
1834
1835 set<pg_t> candidates; // deduped
1836 if (granularity == "pg") {
1837 // covnert pg names to pgs, discard any invalid ones while at it
1838 vector<string> pgids;
1839 cmd_getval(cmdctx->cmdmap, "pgid", pgids);
1840 for (auto& i : pgids) {
1841 pg_t pgid;
1842 if (!pgid.parse(i.c_str())) {
1843 ss << "invlaid pgid '" << i << "'; ";
1844 r = -EINVAL;
1845 continue;
1846 }
1847 candidates.insert(pgid);
1848 }
1849 } else {
1850 // per pool
1851 vector<string> pool_names;
1852 cmd_getval(cmdctx->cmdmap, "who", pool_names);
1853 if (pool_names.empty()) {
1854 ss << "must specify one or more pool names";
1855 cmdctx->reply(-EINVAL, ss);
1856 return true;
1857 }
1858 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1859 for (auto& pool_name : pool_names) {
1860 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1861 if (pool_id < 0) {
1862 ss << "unrecognized pool '" << pool_name << "'";
1863 r = -ENOENT;
1864 return;
1865 }
1866 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1867 for (int i = 0; i < pool_pg_num; i++)
1868 candidates.insert({(unsigned int)i, (uint64_t)pool_id});
1869 }
1870 });
1871 if (r < 0) {
1872 cmdctx->reply(r, ss);
1873 return true;
1874 }
1875 }
1876
1877 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1878 for (auto& i : candidates) {
1879 auto it = pg_map.pg_stat.find(i);
1880 if (it == pg_map.pg_stat.end()) {
1881 ss << "pg " << i << " does not exist; ";
1882 r = -ENOENT;
1883 continue;
1884 }
1885 auto state = it->second.state;
1886 // discard pgs for which user requests are pointless
1887 switch (actual_op) {
1888 case OFR_RECOVERY:
1889 if ((state & (PG_STATE_DEGRADED |
1890 PG_STATE_RECOVERY_WAIT |
1891 PG_STATE_RECOVERING)) == 0) {
1892 // don't return error, user script may be racing with cluster.
1893 // not fatal.
1894 ss << "pg " << i << " doesn't require recovery; ";
1895 continue;
1896 } else if (state & PG_STATE_FORCED_RECOVERY) {
1897 ss << "pg " << i << " recovery already forced; ";
1898 // return error, as it may be a bug in user script
1899 r = -EINVAL;
1900 continue;
1901 }
1902 break;
1903 case OFR_BACKFILL:
1904 if ((state & (PG_STATE_DEGRADED |
1905 PG_STATE_BACKFILL_WAIT |
1906 PG_STATE_BACKFILLING)) == 0) {
1907 ss << "pg " << i << " doesn't require backfilling; ";
1908 continue;
1909 } else if (state & PG_STATE_FORCED_BACKFILL) {
1910 ss << "pg " << i << " backfill already forced; ";
1911 r = -EINVAL;
1912 continue;
1913 }
1914 break;
1915 case OFR_BACKFILL | OFR_CANCEL:
1916 if ((state & PG_STATE_FORCED_BACKFILL) == 0) {
1917 ss << "pg " << i << " backfill not forced; ";
1918 continue;
1919 }
1920 break;
1921 case OFR_RECOVERY | OFR_CANCEL:
1922 if ((state & PG_STATE_FORCED_RECOVERY) == 0) {
1923 ss << "pg " << i << " recovery not forced; ";
1924 continue;
1925 }
1926 break;
1927 default:
1928 ceph_abort_msg("actual_op value is not supported");
1929 }
1930 pgs.push_back(i);
1931 } // for
1932 });
1933
1934 // respond with error only when no pgs are correct
1935 // yes, in case of mixed errors, only the last one will be emitted,
1936 // but the message presented will be fine
1937 if (pgs.size() != 0) {
1938 // clear error to not confuse users/scripts
1939 r = 0;
1940 }
1941
1942 // optimize the command -> messages conversion, use only one
1943 // message per distinct OSD
1944 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1945 // group pgs to process by osd
1946 map<int, vector<spg_t>> osdpgs;
1947 for (auto& pgid : pgs) {
1948 int primary;
1949 spg_t spg;
1950 if (osdmap.get_primary_shard(pgid, &primary, &spg)) {
1951 osdpgs[primary].push_back(spg);
1952 }
1953 }
1954 for (auto& i : osdpgs) {
1955 if (osdmap.is_up(i.first)) {
1956 auto p = osd_cons.find(i.first);
1957 if (p == osd_cons.end()) {
1958 ss << "osd." << i.first << " is not currently connected";
1959 r = -EAGAIN;
1960 continue;
1961 }
1962 for (auto& con : p->second) {
1963 con->send_message(
1964 new MOSDForceRecovery(monc->get_fsid(), i.second, actual_op));
1965 }
1966 ss << "instructing pg(s) " << i.second << " on osd." << i.first
1967 << " to " << forceop << "; ";
1968 }
1969 }
1970 });
1971 ss << std::endl;
1972 cmdctx->reply(r, ss);
1973 return true;
1974 } else if (prefix == "config show" ||
1975 prefix == "config show-with-defaults") {
1976 string who;
1977 cmd_getval(cmdctx->cmdmap, "who", who);
1978 auto [key, valid] = DaemonKey::parse(who);
1979 if (!valid) {
1980 ss << "invalid daemon name: use <type>.<id>";
1981 cmdctx->reply(-EINVAL, ss);
1982 return true;
1983 }
1984 DaemonStatePtr daemon = daemon_state.get(key);
1985 if (!daemon) {
1986 ss << "no config state for daemon " << who;
1987 cmdctx->reply(-ENOENT, ss);
1988 return true;
1989 }
1990
1991 std::lock_guard l(daemon->lock);
1992
1993 int r = 0;
1994 string name;
1995 if (cmd_getval(cmdctx->cmdmap, "key", name)) {
1996 // handle special options
1997 if (name == "fsid") {
1998 cmdctx->odata.append(stringify(monc->get_fsid()) + "\n");
1999 cmdctx->reply(r, ss);
2000 return true;
2001 }
2002 auto p = daemon->config.find(name);
2003 if (p != daemon->config.end() &&
2004 !p->second.empty()) {
2005 cmdctx->odata.append(p->second.rbegin()->second + "\n");
2006 } else {
2007 auto& defaults = daemon->_get_config_defaults();
2008 auto q = defaults.find(name);
2009 if (q != defaults.end()) {
2010 cmdctx->odata.append(q->second + "\n");
2011 } else {
2012 r = -ENOENT;
2013 }
2014 }
2015 } else if (daemon->config_defaults_bl.length() > 0) {
2016 TextTable tbl;
2017 if (f) {
2018 f->open_array_section("config");
2019 } else {
2020 tbl.define_column("NAME", TextTable::LEFT, TextTable::LEFT);
2021 tbl.define_column("VALUE", TextTable::LEFT, TextTable::LEFT);
2022 tbl.define_column("SOURCE", TextTable::LEFT, TextTable::LEFT);
2023 tbl.define_column("OVERRIDES", TextTable::LEFT, TextTable::LEFT);
2024 tbl.define_column("IGNORES", TextTable::LEFT, TextTable::LEFT);
2025 }
2026 if (prefix == "config show") {
2027 // show
2028 for (auto& i : daemon->config) {
2029 dout(20) << " " << i.first << " -> " << i.second << dendl;
2030 if (i.second.empty()) {
2031 continue;
2032 }
2033 if (f) {
2034 f->open_object_section("value");
2035 f->dump_string("name", i.first);
2036 f->dump_string("value", i.second.rbegin()->second);
2037 f->dump_string("source", ceph_conf_level_name(
2038 i.second.rbegin()->first));
2039 if (i.second.size() > 1) {
2040 f->open_array_section("overrides");
2041 auto j = i.second.rend();
2042 for (--j; j != i.second.rbegin(); --j) {
2043 f->open_object_section("value");
2044 f->dump_string("source", ceph_conf_level_name(j->first));
2045 f->dump_string("value", j->second);
2046 f->close_section();
2047 }
2048 f->close_section();
2049 }
2050 if (daemon->ignored_mon_config.count(i.first)) {
2051 f->dump_string("ignores", "mon");
2052 }
2053 f->close_section();
2054 } else {
2055 tbl << i.first;
2056 tbl << i.second.rbegin()->second;
2057 tbl << ceph_conf_level_name(i.second.rbegin()->first);
2058 if (i.second.size() > 1) {
2059 list<string> ov;
2060 auto j = i.second.rend();
2061 for (--j; j != i.second.rbegin(); --j) {
2062 if (j->second == i.second.rbegin()->second) {
2063 ov.push_front(string("(") + ceph_conf_level_name(j->first) +
2064 string("[") + j->second + string("]") +
2065 string(")"));
2066 } else {
2067 ov.push_front(ceph_conf_level_name(j->first) +
2068 string("[") + j->second + string("]"));
2069
2070 }
2071 }
2072 tbl << ov;
2073 } else {
2074 tbl << "";
2075 }
2076 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
2077 tbl << TextTable::endrow;
2078 }
2079 }
2080 } else {
2081 // show-with-defaults
2082 auto& defaults = daemon->_get_config_defaults();
2083 for (auto& i : defaults) {
2084 if (f) {
2085 f->open_object_section("value");
2086 f->dump_string("name", i.first);
2087 } else {
2088 tbl << i.first;
2089 }
2090 auto j = daemon->config.find(i.first);
2091 if (j != daemon->config.end() && !j->second.empty()) {
2092 // have config
2093 if (f) {
2094 f->dump_string("value", j->second.rbegin()->second);
2095 f->dump_string("source", ceph_conf_level_name(
2096 j->second.rbegin()->first));
2097 if (j->second.size() > 1) {
2098 f->open_array_section("overrides");
2099 auto k = j->second.rend();
2100 for (--k; k != j->second.rbegin(); --k) {
2101 f->open_object_section("value");
2102 f->dump_string("source", ceph_conf_level_name(k->first));
2103 f->dump_string("value", k->second);
2104 f->close_section();
2105 }
2106 f->close_section();
2107 }
2108 if (daemon->ignored_mon_config.count(i.first)) {
2109 f->dump_string("ignores", "mon");
2110 }
2111 f->close_section();
2112 } else {
2113 tbl << j->second.rbegin()->second;
2114 tbl << ceph_conf_level_name(j->second.rbegin()->first);
2115 if (j->second.size() > 1) {
2116 list<string> ov;
2117 auto k = j->second.rend();
2118 for (--k; k != j->second.rbegin(); --k) {
2119 if (k->second == j->second.rbegin()->second) {
2120 ov.push_front(string("(") + ceph_conf_level_name(k->first) +
2121 string("[") + k->second + string("]") +
2122 string(")"));
2123 } else {
2124 ov.push_front(ceph_conf_level_name(k->first) +
2125 string("[") + k->second + string("]"));
2126 }
2127 }
2128 tbl << ov;
2129 } else {
2130 tbl << "";
2131 }
2132 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
2133 tbl << TextTable::endrow;
2134 }
2135 } else {
2136 // only have default
2137 if (f) {
2138 f->dump_string("value", i.second);
2139 f->dump_string("source", ceph_conf_level_name(CONF_DEFAULT));
2140 f->close_section();
2141 } else {
2142 tbl << i.second;
2143 tbl << ceph_conf_level_name(CONF_DEFAULT);
2144 tbl << "";
2145 tbl << "";
2146 tbl << TextTable::endrow;
2147 }
2148 }
2149 }
2150 }
2151 if (f) {
2152 f->close_section();
2153 f->flush(cmdctx->odata);
2154 } else {
2155 cmdctx->odata.append(stringify(tbl));
2156 }
2157 }
2158 cmdctx->reply(r, ss);
2159 return true;
2160 } else if (prefix == "device ls") {
2161 set<string> devids;
2162 TextTable tbl;
2163 if (f) {
2164 f->open_array_section("devices");
2165 daemon_state.with_devices([&f](const DeviceState& dev) {
2166 f->dump_object("device", dev);
2167 });
2168 f->close_section();
2169 f->flush(cmdctx->odata);
2170 } else {
2171 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2172 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
2173 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
2174 tbl.define_column("WEAR", TextTable::RIGHT, TextTable::RIGHT);
2175 tbl.define_column("LIFE EXPECTANCY", TextTable::LEFT, TextTable::LEFT);
2176 auto now = ceph_clock_now();
2177 daemon_state.with_devices([&tbl, now](const DeviceState& dev) {
2178 string h;
2179 for (auto& i : dev.attachments) {
2180 if (h.size()) {
2181 h += " ";
2182 }
2183 h += std::get<0>(i) + ":" + std::get<1>(i);
2184 }
2185 string d;
2186 for (auto& i : dev.daemons) {
2187 if (d.size()) {
2188 d += " ";
2189 }
2190 d += to_string(i);
2191 }
2192 char wear_level_str[16] = {0};
2193 if (dev.wear_level >= 0) {
2194 snprintf(wear_level_str, sizeof(wear_level_str)-1, "%d%%",
2195 (int)(100.1 * dev.wear_level));
2196 }
2197 tbl << dev.devid
2198 << h
2199 << d
2200 << wear_level_str
2201 << dev.get_life_expectancy_str(now)
2202 << TextTable::endrow;
2203 });
2204 cmdctx->odata.append(stringify(tbl));
2205 }
2206 cmdctx->reply(0, ss);
2207 return true;
2208 } else if (prefix == "device ls-by-daemon") {
2209 string who;
2210 cmd_getval(cmdctx->cmdmap, "who", who);
2211 if (auto [k, valid] = DaemonKey::parse(who); !valid) {
2212 ss << who << " is not a valid daemon name";
2213 r = -EINVAL;
2214 } else {
2215 auto dm = daemon_state.get(k);
2216 if (dm) {
2217 if (f) {
2218 f->open_array_section("devices");
2219 for (auto& i : dm->devices) {
2220 daemon_state.with_device(i.first, [&f] (const DeviceState& dev) {
2221 f->dump_object("device", dev);
2222 });
2223 }
2224 f->close_section();
2225 f->flush(cmdctx->odata);
2226 } else {
2227 TextTable tbl;
2228 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2229 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
2230 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT,
2231 TextTable::LEFT);
2232 auto now = ceph_clock_now();
2233 for (auto& i : dm->devices) {
2234 daemon_state.with_device(
2235 i.first, [&tbl, now] (const DeviceState& dev) {
2236 string h;
2237 for (auto& i : dev.attachments) {
2238 if (h.size()) {
2239 h += " ";
2240 }
2241 h += std::get<0>(i) + ":" + std::get<1>(i);
2242 }
2243 tbl << dev.devid
2244 << h
2245 << dev.get_life_expectancy_str(now)
2246 << TextTable::endrow;
2247 });
2248 }
2249 cmdctx->odata.append(stringify(tbl));
2250 }
2251 } else {
2252 r = -ENOENT;
2253 ss << "daemon " << who << " not found";
2254 }
2255 cmdctx->reply(r, ss);
2256 }
2257 } else if (prefix == "device ls-by-host") {
2258 string host;
2259 cmd_getval(cmdctx->cmdmap, "host", host);
2260 set<string> devids;
2261 daemon_state.list_devids_by_server(host, &devids);
2262 if (f) {
2263 f->open_array_section("devices");
2264 for (auto& devid : devids) {
2265 daemon_state.with_device(
2266 devid, [&f] (const DeviceState& dev) {
2267 f->dump_object("device", dev);
2268 });
2269 }
2270 f->close_section();
2271 f->flush(cmdctx->odata);
2272 } else {
2273 TextTable tbl;
2274 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2275 tbl.define_column("DEV", TextTable::LEFT, TextTable::LEFT);
2276 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
2277 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT, TextTable::LEFT);
2278 auto now = ceph_clock_now();
2279 for (auto& devid : devids) {
2280 daemon_state.with_device(
2281 devid, [&tbl, &host, now] (const DeviceState& dev) {
2282 string n;
2283 for (auto& j : dev.attachments) {
2284 if (std::get<0>(j) == host) {
2285 if (n.size()) {
2286 n += " ";
2287 }
2288 n += std::get<1>(j);
2289 }
2290 }
2291 string d;
2292 for (auto& i : dev.daemons) {
2293 if (d.size()) {
2294 d += " ";
2295 }
2296 d += to_string(i);
2297 }
2298 tbl << dev.devid
2299 << n
2300 << d
2301 << dev.get_life_expectancy_str(now)
2302 << TextTable::endrow;
2303 });
2304 }
2305 cmdctx->odata.append(stringify(tbl));
2306 }
2307 cmdctx->reply(0, ss);
2308 return true;
2309 } else if (prefix == "device info") {
2310 string devid;
2311 cmd_getval(cmdctx->cmdmap, "devid", devid);
2312 int r = 0;
2313 ostringstream rs;
2314 if (!daemon_state.with_device(devid,
2315 [&f, &rs] (const DeviceState& dev) {
2316 if (f) {
2317 f->dump_object("device", dev);
2318 } else {
2319 dev.print(rs);
2320 }
2321 })) {
2322 ss << "device " << devid << " not found";
2323 r = -ENOENT;
2324 } else {
2325 if (f) {
2326 f->flush(cmdctx->odata);
2327 } else {
2328 cmdctx->odata.append(rs.str());
2329 }
2330 }
2331 cmdctx->reply(r, ss);
2332 return true;
2333 } else if (prefix == "device set-life-expectancy") {
2334 string devid;
2335 cmd_getval(cmdctx->cmdmap, "devid", devid);
2336 string from_str, to_str;
2337 cmd_getval(cmdctx->cmdmap, "from", from_str);
2338 cmd_getval(cmdctx->cmdmap, "to", to_str);
2339 utime_t from, to;
2340 if (!from.parse(from_str)) {
2341 ss << "unable to parse datetime '" << from_str << "'";
2342 r = -EINVAL;
2343 cmdctx->reply(r, ss);
2344 } else if (to_str.size() && !to.parse(to_str)) {
2345 ss << "unable to parse datetime '" << to_str << "'";
2346 r = -EINVAL;
2347 cmdctx->reply(r, ss);
2348 } else {
2349 map<string,string> meta;
2350 daemon_state.with_device_create(
2351 devid,
2352 [from, to, &meta] (DeviceState& dev) {
2353 dev.set_life_expectancy(from, to, ceph_clock_now());
2354 meta = dev.metadata;
2355 });
2356 json_spirit::Object json_object;
2357 for (auto& i : meta) {
2358 json_spirit::Config::add(json_object, i.first, i.second);
2359 }
2360 bufferlist json;
2361 json.append(json_spirit::write(json_object));
2362 const string cmd =
2363 "{"
2364 "\"prefix\": \"config-key set\", "
2365 "\"key\": \"device/" + devid + "\""
2366 "}";
2367 auto on_finish = new ReplyOnFinish(cmdctx);
2368 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2369 }
2370 return true;
2371 } else if (prefix == "device rm-life-expectancy") {
2372 string devid;
2373 cmd_getval(cmdctx->cmdmap, "devid", devid);
2374 map<string,string> meta;
2375 if (daemon_state.with_device_write(devid, [&meta] (DeviceState& dev) {
2376 dev.rm_life_expectancy();
2377 meta = dev.metadata;
2378 })) {
2379 string cmd;
2380 bufferlist json;
2381 if (meta.empty()) {
2382 cmd =
2383 "{"
2384 "\"prefix\": \"config-key rm\", "
2385 "\"key\": \"device/" + devid + "\""
2386 "}";
2387 } else {
2388 json_spirit::Object json_object;
2389 for (auto& i : meta) {
2390 json_spirit::Config::add(json_object, i.first, i.second);
2391 }
2392 json.append(json_spirit::write(json_object));
2393 cmd =
2394 "{"
2395 "\"prefix\": \"config-key set\", "
2396 "\"key\": \"device/" + devid + "\""
2397 "}";
2398 }
2399 auto on_finish = new ReplyOnFinish(cmdctx);
2400 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2401 } else {
2402 cmdctx->reply(0, ss);
2403 }
2404 return true;
2405 } else {
2406 if (!pgmap_ready) {
2407 ss << "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2408 }
2409 if (f) {
2410 f->open_object_section("pg_info");
2411 f->dump_bool("pg_ready", pgmap_ready);
2412 }
2413
2414 // fall back to feeding command to PGMap
2415 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
2416 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
2417 f.get(), &ss, &cmdctx->odata);
2418 });
2419
2420 if (f) {
2421 f->close_section();
2422 }
2423 if (r != -EOPNOTSUPP) {
2424 if (f) {
2425 f->flush(cmdctx->odata);
2426 }
2427 cmdctx->reply(r, ss);
2428 return true;
2429 }
2430 }
2431
2432 // Was the command unfound?
2433 if (py_command.cmdstring.empty()) {
2434 ss << "No handler found for '" << prefix << "'";
2435 dout(4) << "No handler found for '" << prefix << "'" << dendl;
2436 cmdctx->reply(-EINVAL, ss);
2437 return true;
2438 }
2439
2440 // Validate that the module is active
2441 auto& mod_name = py_command.module_name;
2442 if (!py_modules.is_module_active(mod_name)) {
2443 ss << "Module '" << mod_name << "' is not enabled/loaded (required by "
2444 "command '" << prefix << "'): use `ceph mgr module enable "
2445 << mod_name << "` to enable it";
2446 dout(4) << ss.str() << dendl;
2447 cmdctx->reply(-EOPNOTSUPP, ss);
2448 return true;
2449 }
2450
2451 dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
2452 Finisher& mod_finisher = py_modules.get_active_module_finisher(mod_name);
2453 mod_finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]
2454 (int r_) mutable {
2455 std::stringstream ss;
2456
2457 dout(10) << "dispatching command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
2458
2459 // Validate that the module is enabled
2460 auto& py_handler_name = py_command.module_name;
2461 PyModuleRef module = py_modules.get_module(py_handler_name);
2462 ceph_assert(module);
2463 if (!module->is_enabled()) {
2464 ss << "Module '" << py_handler_name << "' is not enabled (required by "
2465 "command '" << prefix << "'): use `ceph mgr module enable "
2466 << py_handler_name << "` to enable it";
2467 dout(4) << ss.str() << dendl;
2468 cmdctx->reply(-EOPNOTSUPP, ss);
2469 return;
2470 }
2471
2472 // Hack: allow the self-test method to run on unhealthy modules.
2473 // Fix this in future by creating a special path for self test rather
2474 // than having the hook be a normal module command.
2475 std::string self_test_prefix = py_handler_name + " " + "self-test";
2476
2477 // Validate that the module is healthy
2478 bool accept_command;
2479 if (module->is_loaded()) {
2480 if (module->get_can_run() && !module->is_failed()) {
2481 // Healthy module
2482 accept_command = true;
2483 } else if (self_test_prefix == prefix) {
2484 // Unhealthy, but allow because it's a self test command
2485 accept_command = true;
2486 } else {
2487 accept_command = false;
2488 ss << "Module '" << py_handler_name << "' has experienced an error and "
2489 "cannot handle commands: " << module->get_error_string();
2490 }
2491 } else {
2492 // Module not loaded
2493 accept_command = false;
2494 ss << "Module '" << py_handler_name << "' failed to load and "
2495 "cannot handle commands: " << module->get_error_string();
2496 }
2497
2498 if (!accept_command) {
2499 dout(4) << ss.str() << dendl;
2500 cmdctx->reply(-EIO, ss);
2501 return;
2502 }
2503
2504 std::stringstream ds;
2505 bufferlist inbl = cmdctx->data;
2506 int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
2507 inbl, &ds, &ss);
2508 if (r == -EACCES) {
2509 log_access_denied(cmdctx, session, ss);
2510 }
2511
2512 cmdctx->odata.append(ds);
2513 cmdctx->reply(r, ss);
2514 dout(10) << " command returned " << r << dendl;
2515 }));
2516 return true;
2517 }
2518
// Drop daemons (and then whole services) from the pending service map when
// they have stopped sending service beacons for longer than the configured
// grace period, or when they no longer exist in the daemon state index.
// Any removal marks the pending map dirty so send_report() will publish it.
void DaemonServer::_prune_pending_service_map()
{
  // Beacons older than "now - mgr_service_beacon_grace" are considered stale.
  utime_t cutoff = ceph_clock_now();
  cutoff -= g_conf().get_val<double>("mgr_service_beacon_grace");
  auto p = pending_service_map.services.begin();
  while (p != pending_service_map.services.end()) {
    auto q = p->second.daemons.begin();
    while (q != p->second.daemons.end()) {
      DaemonKey key{p->first, q->first};
      if (!daemon_state.exists(key)) {
	if (ServiceMap::is_normal_ceph_entity(p->first)) {
	  // Normal ceph entities (osd/mon/mds/...) must be present in the
	  // daemon state index; if not, the service map entry is stale.
	  dout(10) << "daemon " << key << " in service map but not in daemon state "
		   << "index -- force pruning" << dendl;
	  q = p->second.daemons.erase(q);
	  pending_service_map_dirty = pending_service_map.epoch;
	} else {
	  // Non-ceph service daemons are expected in daemon_state; log the
	  // inconsistency but keep the entry rather than dropping data.
	  derr << "missing key " << key << dendl;
	  ++q;
	}

	continue;
      }

      auto daemon = daemon_state.get(key);
      std::lock_guard l(daemon->lock);
      if (daemon->last_service_beacon == utime_t()) {
	// we must have just restarted; assume they are alive now.
	daemon->last_service_beacon = ceph_clock_now();
	++q;
	continue;
      }
      if (daemon->last_service_beacon < cutoff) {
	// Beacon too old: prune this daemon from the pending map.
	dout(10) << "pruning stale " << p->first << "." << q->first
		 << " last_beacon " << daemon->last_service_beacon << dendl;
	q = p->second.daemons.erase(q);
	pending_service_map_dirty = pending_service_map.epoch;
      } else {
	++q;
      }
    }
    // A service with no remaining daemons is removed entirely.
    if (p->second.daemons.empty()) {
      p = pending_service_map.services.erase(p);
      pending_service_map_dirty = pending_service_map.epoch;
    } else {
      ++p;
    }
  }
}
2567
2568 void DaemonServer::send_report()
2569 {
2570 if (!pgmap_ready) {
2571 if (ceph_clock_now() - started_at > g_conf().get_val<int64_t>("mgr_stats_period") * 4.0) {
2572 pgmap_ready = true;
2573 reported_osds.clear();
2574 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
2575 << "potentially incomplete PG state to mon" << dendl;
2576 } else {
2577 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
2578 << dendl;
2579 return;
2580 }
2581 }
2582
2583 auto m = ceph::make_message<MMonMgrReport>();
2584 m->gid = monc->get_global_id();
2585 py_modules.get_health_checks(&m->health_checks);
2586 py_modules.get_progress_events(&m->progress_events);
2587
2588 cluster_state.with_mutable_pgmap([&](PGMap& pg_map) {
2589 cluster_state.update_delta_stats();
2590
2591 if (pending_service_map.epoch) {
2592 _prune_pending_service_map();
2593 if (pending_service_map_dirty >= pending_service_map.epoch) {
2594 pending_service_map.modified = ceph_clock_now();
2595 encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
2596 dout(10) << "sending service_map e" << pending_service_map.epoch
2597 << dendl;
2598 pending_service_map.epoch++;
2599 }
2600 }
2601
2602 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
2603 // FIXME: no easy way to get mon features here. this will do for
2604 // now, though, as long as we don't make a backward-incompat change.
2605 pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
2606 dout(10) << pg_map << dendl;
2607
2608 pg_map.get_health_checks(g_ceph_context, osdmap,
2609 &m->health_checks);
2610
2611 dout(10) << m->health_checks.checks.size() << " health checks"
2612 << dendl;
2613 dout(20) << "health checks:\n";
2614 JSONFormatter jf(true);
2615 jf.dump_object("health_checks", m->health_checks);
2616 jf.flush(*_dout);
2617 *_dout << dendl;
2618 if (osdmap.require_osd_release >= ceph_release_t::luminous) {
2619 clog->debug() << "pgmap v" << pg_map.version << ": " << pg_map;
2620 }
2621 });
2622 });
2623
2624 map<daemon_metric, unique_ptr<DaemonHealthMetricCollector>> accumulated;
2625 for (auto service : {"osd", "mon"} ) {
2626 auto daemons = daemon_state.get_by_service(service);
2627 for (const auto& [key,state] : daemons) {
2628 std::lock_guard l{state->lock};
2629 for (const auto& metric : state->daemon_health_metrics) {
2630 auto acc = accumulated.find(metric.get_type());
2631 if (acc == accumulated.end()) {
2632 auto collector = DaemonHealthMetricCollector::create(metric.get_type());
2633 if (!collector) {
2634 derr << __func__ << " " << key
2635 << " sent me an unknown health metric: "
2636 << std::hex << static_cast<uint8_t>(metric.get_type())
2637 << std::dec << dendl;
2638 continue;
2639 }
2640 tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
2641 std::move(collector));
2642 }
2643 acc->second->update(key, metric);
2644 }
2645 }
2646 }
2647 for (const auto& acc : accumulated) {
2648 acc.second->summarize(m->health_checks);
2649 }
2650 // TODO? We currently do not notify the PyModules
2651 // TODO: respect needs_send, so we send the report only if we are asked to do
2652 // so, or the state is updated.
2653 monc->send_mon_message(std::move(m));
2654 }
2655
2656 void DaemonServer::adjust_pgs()
2657 {
2658 dout(20) << dendl;
2659 unsigned max = std::max<int64_t>(1, g_conf()->mon_osd_max_creating_pgs);
2660 double max_misplaced = g_conf().get_val<double>("target_max_misplaced_ratio");
2661 bool aggro = g_conf().get_val<bool>("mgr_debug_aggressive_pg_num_changes");
2662
2663 map<string,unsigned> pg_num_to_set;
2664 map<string,unsigned> pgp_num_to_set;
2665 set<pg_t> upmaps_to_clear;
2666 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
2667 unsigned creating_or_unknown = 0;
2668 for (auto& i : pg_map.num_pg_by_state) {
2669 if ((i.first & (PG_STATE_CREATING)) ||
2670 i.first == 0) {
2671 creating_or_unknown += i.second;
2672 }
2673 }
2674 unsigned left = max;
2675 if (creating_or_unknown >= max) {
2676 return;
2677 }
2678 left -= creating_or_unknown;
2679 dout(10) << "creating_or_unknown " << creating_or_unknown
2680 << " max_creating " << max
2681 << " left " << left
2682 << dendl;
2683
2684 // FIXME: These checks are fundamentally racy given that adjust_pgs()
2685 // can run more frequently than we get updated pg stats from OSDs. We
2686 // may make multiple adjustments with stale informaiton.
2687 double misplaced_ratio, degraded_ratio;
2688 double inactive_pgs_ratio, unknown_pgs_ratio;
2689 pg_map.get_recovery_stats(&misplaced_ratio, &degraded_ratio,
2690 &inactive_pgs_ratio, &unknown_pgs_ratio);
2691 dout(20) << "misplaced_ratio " << misplaced_ratio
2692 << " degraded_ratio " << degraded_ratio
2693 << " inactive_pgs_ratio " << inactive_pgs_ratio
2694 << " unknown_pgs_ratio " << unknown_pgs_ratio
2695 << "; target_max_misplaced_ratio " << max_misplaced
2696 << dendl;
2697
2698 for (auto& i : osdmap.get_pools()) {
2699 const pg_pool_t& p = i.second;
2700
2701 // adjust pg_num?
2702 if (p.get_pg_num_target() != p.get_pg_num()) {
2703 dout(20) << "pool " << i.first
2704 << " pg_num " << p.get_pg_num()
2705 << " target " << p.get_pg_num_target()
2706 << dendl;
2707 if (p.has_flag(pg_pool_t::FLAG_CREATING)) {
2708 dout(10) << "pool " << i.first
2709 << " pg_num_target " << p.get_pg_num_target()
2710 << " pg_num " << p.get_pg_num()
2711 << " - still creating initial pgs"
2712 << dendl;
2713 } else if (p.get_pg_num_target() < p.get_pg_num()) {
2714 // pg_num decrease (merge)
2715 pg_t merge_source(p.get_pg_num() - 1, i.first);
2716 pg_t merge_target = merge_source.get_parent();
2717 bool ok = true;
2718
2719 if (p.get_pg_num() != p.get_pg_num_pending()) {
2720 dout(10) << "pool " << i.first
2721 << " pg_num_target " << p.get_pg_num_target()
2722 << " pg_num " << p.get_pg_num()
2723 << " - decrease and pg_num_pending != pg_num, waiting"
2724 << dendl;
2725 ok = false;
2726 } else if (p.get_pg_num() == p.get_pgp_num()) {
2727 dout(10) << "pool " << i.first
2728 << " pg_num_target " << p.get_pg_num_target()
2729 << " pg_num " << p.get_pg_num()
2730 << " - decrease blocked by pgp_num "
2731 << p.get_pgp_num()
2732 << dendl;
2733 ok = false;
2734 }
2735 vector<int32_t> source_acting;
2736 for (auto &merge_participant : {merge_source, merge_target}) {
2737 bool is_merge_source = merge_participant == merge_source;
2738 if (osdmap.have_pg_upmaps(merge_participant)) {
2739 dout(10) << "pool " << i.first
2740 << " pg_num_target " << p.get_pg_num_target()
2741 << " pg_num " << p.get_pg_num()
2742 << (is_merge_source ? " - merge source " : " - merge target ")
2743 << merge_participant
2744 << " has upmap" << dendl;
2745 upmaps_to_clear.insert(merge_participant);
2746 ok = false;
2747 }
2748 auto q = pg_map.pg_stat.find(merge_participant);
2749 if (q == pg_map.pg_stat.end()) {
2750 dout(10) << "pool " << i.first
2751 << " pg_num_target " << p.get_pg_num_target()
2752 << " pg_num " << p.get_pg_num()
2753 << " - no state for " << merge_participant
2754 << (is_merge_source ? " (merge source)" : " (merge target)")
2755 << dendl;
2756 ok = false;
2757 } else if ((q->second.state & (PG_STATE_ACTIVE | PG_STATE_CLEAN)) !=
2758 (PG_STATE_ACTIVE | PG_STATE_CLEAN)) {
2759 dout(10) << "pool " << i.first
2760 << " pg_num_target " << p.get_pg_num_target()
2761 << " pg_num " << p.get_pg_num()
2762 << (is_merge_source ? " - merge source " : " - merge target ")
2763 << merge_participant
2764 << " not clean (" << pg_state_string(q->second.state)
2765 << ")" << dendl;
2766 ok = false;
2767 }
2768 if (is_merge_source) {
2769 source_acting = q->second.acting;
2770 } else if (ok && q->second.acting != source_acting) {
2771 dout(10) << "pool " << i.first
2772 << " pg_num_target " << p.get_pg_num_target()
2773 << " pg_num " << p.get_pg_num()
2774 << (is_merge_source ? " - merge source " : " - merge target ")
2775 << merge_participant
2776 << " acting does not match (source " << source_acting
2777 << " != target " << q->second.acting
2778 << ")" << dendl;
2779 ok = false;
2780 }
2781 }
2782
2783 if (ok) {
2784 unsigned target = p.get_pg_num() - 1;
2785 dout(10) << "pool " << i.first
2786 << " pg_num_target " << p.get_pg_num_target()
2787 << " pg_num " << p.get_pg_num()
2788 << " -> " << target
2789 << " (merging " << merge_source
2790 << " and " << merge_target
2791 << ")" << dendl;
2792 pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
2793 continue;
2794 }
2795 } else if (p.get_pg_num_target() > p.get_pg_num()) {
2796 // pg_num increase (split)
2797 bool active = true;
2798 auto q = pg_map.num_pg_by_pool_state.find(i.first);
2799 if (q != pg_map.num_pg_by_pool_state.end()) {
2800 for (auto& j : q->second) {
2801 if ((j.first & (PG_STATE_ACTIVE|PG_STATE_PEERED)) == 0) {
2802 dout(20) << "pool " << i.first << " has " << j.second
2803 << " pgs in " << pg_state_string(j.first)
2804 << dendl;
2805 active = false;
2806 break;
2807 }
2808 }
2809 } else {
2810 active = false;
2811 }
2812 unsigned pg_gap = p.get_pg_num() - p.get_pgp_num();
2813 unsigned max_jump = cct->_conf->mgr_max_pg_num_change;
2814 if (!active) {
2815 dout(10) << "pool " << i.first
2816 << " pg_num_target " << p.get_pg_num_target()
2817 << " pg_num " << p.get_pg_num()
2818 << " - not all pgs active"
2819 << dendl;
2820 } else if (pg_gap >= max_jump) {
2821 dout(10) << "pool " << i.first
2822 << " pg_num " << p.get_pg_num()
2823 << " - pgp_num " << p.get_pgp_num()
2824 << " gap >= max_pg_num_change " << max_jump
2825 << " - must scale pgp_num first"
2826 << dendl;
2827 } else {
2828 unsigned add = std::min(
2829 std::min(left, max_jump - pg_gap),
2830 p.get_pg_num_target() - p.get_pg_num());
2831 unsigned target = p.get_pg_num() + add;
2832 left -= add;
2833 dout(10) << "pool " << i.first
2834 << " pg_num_target " << p.get_pg_num_target()
2835 << " pg_num " << p.get_pg_num()
2836 << " -> " << target << dendl;
2837 pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
2838 }
2839 }
2840 }
2841
2842 // adjust pgp_num?
2843 unsigned target = std::min(p.get_pg_num_pending(),
2844 p.get_pgp_num_target());
2845 if (target != p.get_pgp_num()) {
2846 dout(20) << "pool " << i.first
2847 << " pgp_num_target " << p.get_pgp_num_target()
2848 << " pgp_num " << p.get_pgp_num()
2849 << " -> " << target << dendl;
2850 if (target > p.get_pgp_num() &&
2851 p.get_pgp_num() == p.get_pg_num()) {
2852 dout(10) << "pool " << i.first
2853 << " pgp_num_target " << p.get_pgp_num_target()
2854 << " pgp_num " << p.get_pgp_num()
2855 << " - increase blocked by pg_num " << p.get_pg_num()
2856 << dendl;
2857 } else if (!aggro && (inactive_pgs_ratio > 0 ||
2858 degraded_ratio > 0 ||
2859 unknown_pgs_ratio > 0)) {
2860 dout(10) << "pool " << i.first
2861 << " pgp_num_target " << p.get_pgp_num_target()
2862 << " pgp_num " << p.get_pgp_num()
2863 << " - inactive|degraded|unknown pgs, deferring pgp_num"
2864 << " update" << dendl;
2865 } else if (!aggro && (misplaced_ratio > max_misplaced)) {
2866 dout(10) << "pool " << i.first
2867 << " pgp_num_target " << p.get_pgp_num_target()
2868 << " pgp_num " << p.get_pgp_num()
2869 << " - misplaced_ratio " << misplaced_ratio
2870 << " > max " << max_misplaced
2871 << ", deferring pgp_num update" << dendl;
2872 } else {
2873 // NOTE: this calculation assumes objects are
2874 // basically uniformly distributed across all PGs
2875 // (regardless of pool), which is probably not
2876 // perfectly correct, but it's a start. make no
2877 // single adjustment that's more than half of the
2878 // max_misplaced, to somewhat limit the magnitude of
2879 // our potential error here.
2880 unsigned next;
2881 static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP = 1;
2882 pool_stat_t s = pg_map.get_pg_pool_sum_stat(i.first);
2883 if (aggro ||
2884 // pool is (virtually) empty; just jump to final pgp_num?
2885 (p.get_pgp_num_target() > p.get_pgp_num() &&
2886 s.stats.sum.num_objects <= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP *
2887 p.get_pgp_num_target()))) {
2888 next = target;
2889 } else {
2890 double room =
2891 std::min<double>(max_misplaced - misplaced_ratio,
2892 max_misplaced / 2.0);
2893 unsigned estmax = std::max<unsigned>(
2894 (double)p.get_pg_num() * room, 1u);
2895 unsigned next_min = 0;
2896 if (p.get_pgp_num() > estmax) {
2897 next_min = p.get_pgp_num() - estmax;
2898 }
2899 next = std::clamp(target,
2900 next_min,
2901 p.get_pgp_num() + estmax);
2902 dout(20) << " room " << room << " estmax " << estmax
2903 << " delta " << (target-p.get_pgp_num())
2904 << " next " << next << dendl;
2905 if (p.get_pgp_num_target() == p.get_pg_num_target() &&
2906 p.get_pgp_num_target() < p.get_pg_num()) {
2907 // since pgp_num is tracking pg_num, ceph is handling
2908 // pgp_num. so, be responsible: don't let pgp_num get
2909 // too far out ahead of merges (if we are merging).
2910 // this avoids moving lots of unmerged pgs onto a
2911 // small number of OSDs where we might blow out the
2912 // per-osd pg max.
2913 unsigned max_outpace_merges =
2914 std::max<unsigned>(8, p.get_pg_num() * max_misplaced);
2915 if (next + max_outpace_merges < p.get_pg_num()) {
2916 next = p.get_pg_num() - max_outpace_merges;
2917 dout(10) << " using next " << next
2918 << " to avoid outpacing merges (max_outpace_merges "
2919 << max_outpace_merges << ")" << dendl;
2920 }
2921 }
2922 }
2923 if (next != p.get_pgp_num()) {
2924 dout(10) << "pool " << i.first
2925 << " pgp_num_target " << p.get_pgp_num_target()
2926 << " pgp_num " << p.get_pgp_num()
2927 << " -> " << next << dendl;
2928 pgp_num_to_set[osdmap.get_pool_name(i.first)] = next;
2929 }
2930 }
2931 }
2932 if (left == 0) {
2933 return;
2934 }
2935 }
2936 });
2937 for (auto i : pg_num_to_set) {
2938 const string cmd =
2939 "{"
2940 "\"prefix\": \"osd pool set\", "
2941 "\"pool\": \"" + i.first + "\", "
2942 "\"var\": \"pg_num_actual\", "
2943 "\"val\": \"" + stringify(i.second) + "\""
2944 "}";
2945 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2946 }
2947 for (auto i : pgp_num_to_set) {
2948 const string cmd =
2949 "{"
2950 "\"prefix\": \"osd pool set\", "
2951 "\"pool\": \"" + i.first + "\", "
2952 "\"var\": \"pgp_num_actual\", "
2953 "\"val\": \"" + stringify(i.second) + "\""
2954 "}";
2955 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2956 }
2957 for (auto pg : upmaps_to_clear) {
2958 const string cmd =
2959 "{"
2960 "\"prefix\": \"osd rm-pg-upmap\", "
2961 "\"pgid\": \"" + stringify(pg) + "\""
2962 "}";
2963 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2964 const string cmd2 =
2965 "{"
2966 "\"prefix\": \"osd rm-pg-upmap-items\", "
2967 "\"pgid\": \"" + stringify(pg) + "\"" +
2968 "}";
2969 monc->start_mon_command({cmd2}, {}, nullptr, nullptr, nullptr);
2970 }
2971 }
2972
2973 void DaemonServer::got_service_map()
2974 {
2975 std::lock_guard l(lock);
2976
2977 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
2978 if (pending_service_map.epoch == 0) {
2979 // we just started up
2980 dout(10) << "got initial map e" << service_map.epoch << dendl;
2981 ceph_assert(pending_service_map_dirty == 0);
2982 pending_service_map = service_map;
2983 pending_service_map.epoch = service_map.epoch + 1;
2984 } else if (pending_service_map.epoch <= service_map.epoch) {
2985 // we just started up but got one more not our own map
2986 dout(10) << "got newer initial map e" << service_map.epoch << dendl;
2987 ceph_assert(pending_service_map_dirty == 0);
2988 pending_service_map = service_map;
2989 pending_service_map.epoch = service_map.epoch + 1;
2990 } else {
2991 // we already active and therefore must have persisted it,
2992 // which means ours is the same or newer.
2993 dout(10) << "got updated map e" << service_map.epoch << dendl;
2994 }
2995 });
2996
2997 // cull missing daemons, populate new ones
2998 std::set<std::string> types;
2999 for (auto& [type, service] : pending_service_map.services) {
3000 if (ServiceMap::is_normal_ceph_entity(type)) {
3001 continue;
3002 }
3003
3004 types.insert(type);
3005
3006 std::set<std::string> names;
3007 for (auto& q : service.daemons) {
3008 names.insert(q.first);
3009 DaemonKey key{type, q.first};
3010 if (!daemon_state.exists(key)) {
3011 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
3012 daemon->key = key;
3013 daemon->set_metadata(q.second.metadata);
3014 daemon->service_daemon = true;
3015 daemon_state.insert(daemon);
3016 dout(10) << "added missing " << key << dendl;
3017 }
3018 }
3019 daemon_state.cull(type, names);
3020 }
3021 daemon_state.cull_services(types);
3022 }
3023
3024 void DaemonServer::got_mgr_map()
3025 {
3026 std::lock_guard l(lock);
3027 set<std::string> have;
3028 cluster_state.with_mgrmap([&](const MgrMap& mgrmap) {
3029 auto md_update = [&] (DaemonKey key) {
3030 std::ostringstream oss;
3031 auto c = new MetadataUpdate(daemon_state, key);
3032 // FIXME remove post-nautilus: include 'id' for luminous mons
3033 oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
3034 << key.name << "\", \"id\": \"" << key.name << "\"}";
3035 monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
3036 };
3037 if (mgrmap.active_name.size()) {
3038 DaemonKey key{"mgr", mgrmap.active_name};
3039 have.insert(mgrmap.active_name);
3040 if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
3041 md_update(key);
3042 dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
3043 }
3044 }
3045 for (auto& i : mgrmap.standbys) {
3046 DaemonKey key{"mgr", i.second.name};
3047 have.insert(i.second.name);
3048 if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
3049 md_update(key);
3050 dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
3051 }
3052 }
3053 });
3054 daemon_state.cull("mgr", have);
3055 }
3056
3057 const char** DaemonServer::get_tracked_conf_keys() const
3058 {
3059 static const char *KEYS[] = {
3060 "mgr_stats_threshold",
3061 "mgr_stats_period",
3062 nullptr
3063 };
3064
3065 return KEYS;
3066 }
3067
3068 void DaemonServer::handle_conf_change(const ConfigProxy& conf,
3069 const std::set <std::string> &changed)
3070 {
3071
3072 if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
3073 dout(4) << "Updating stats threshold/period on "
3074 << daemon_connections.size() << " clients" << dendl;
3075 // Send a fresh MMgrConfigure to all clients, so that they can follow
3076 // the new policy for transmitting stats
3077 finisher.queue(new LambdaContext([this](int r) {
3078 std::lock_guard l(lock);
3079 for (auto &c : daemon_connections) {
3080 _send_configure(c);
3081 }
3082 }));
3083 }
3084 }
3085
3086 void DaemonServer::_send_configure(ConnectionRef c)
3087 {
3088 ceph_assert(ceph_mutex_is_locked_by_me(lock));
3089
3090 auto configure = make_message<MMgrConfigure>();
3091 configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
3092 configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");
3093
3094 if (c->peer_is_osd()) {
3095 configure->osd_perf_metric_queries =
3096 osd_perf_metric_collector.get_queries();
3097 } else if (c->peer_is_mds()) {
3098 configure->metric_config_message =
3099 MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector.get_queries()));
3100 }
3101
3102 c->send_message2(configure);
3103 }
3104
3105 MetricQueryID DaemonServer::add_osd_perf_query(
3106 const OSDPerfMetricQuery &query,
3107 const std::optional<OSDPerfMetricLimit> &limit)
3108 {
3109 return osd_perf_metric_collector.add_query(query, limit);
3110 }
3111
3112 int DaemonServer::remove_osd_perf_query(MetricQueryID query_id)
3113 {
3114 return osd_perf_metric_collector.remove_query(query_id);
3115 }
3116
3117 int DaemonServer::get_osd_perf_counters(OSDPerfCollector *collector)
3118 {
3119 return osd_perf_metric_collector.get_counters(collector);
3120 }
3121
3122 MetricQueryID DaemonServer::add_mds_perf_query(
3123 const MDSPerfMetricQuery &query,
3124 const std::optional<MDSPerfMetricLimit> &limit)
3125 {
3126 return mds_perf_metric_collector.add_query(query, limit);
3127 }
3128
3129 int DaemonServer::remove_mds_perf_query(MetricQueryID query_id)
3130 {
3131 return mds_perf_metric_collector.remove_query(query_id);
3132 }
3133
3134 void DaemonServer::reregister_mds_perf_queries()
3135 {
3136 mds_perf_metric_collector.reregister_queries();
3137 }
3138
3139 int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
3140 {
3141 return mds_perf_metric_collector.get_counters(collector);
3142 }