]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/DaemonServer.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / mgr / DaemonServer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "DaemonServer.h"
15 #include <boost/algorithm/string.hpp>
16 #include "mgr/Mgr.h"
17
18 #include "include/stringify.h"
19 #include "include/str_list.h"
20 #include "auth/RotatingKeyRing.h"
21 #include "json_spirit/json_spirit_writer.h"
22
23 #include "mgr/mgr_commands.h"
24 #include "mgr/DaemonHealthMetricCollector.h"
25 #include "mgr/OSDPerfMetricCollector.h"
26 #include "mgr/MDSPerfMetricCollector.h"
27 #include "mon/MonCommand.h"
28
29 #include "messages/MMgrOpen.h"
30 #include "messages/MMgrUpdate.h"
31 #include "messages/MMgrClose.h"
32 #include "messages/MMgrConfigure.h"
33 #include "messages/MMonMgrReport.h"
34 #include "messages/MCommand.h"
35 #include "messages/MCommandReply.h"
36 #include "messages/MMgrCommand.h"
37 #include "messages/MMgrCommandReply.h"
38 #include "messages/MPGStats.h"
39 #include "messages/MOSDScrub.h"
40 #include "messages/MOSDScrub2.h"
41 #include "messages/MOSDForceRecovery.h"
42 #include "common/errno.h"
43 #include "common/pick_address.h"
44
45 #define dout_context g_ceph_context
46 #define dout_subsys ceph_subsys_mgr
47 #undef dout_prefix
48 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
49
50 using namespace TOPNSPC::common;
51
52 using std::list;
53 using std::ostringstream;
54 using std::string;
55 using std::stringstream;
56 using std::vector;
57 using std::unique_ptr;
58
namespace {
// Element-wise equality for two map-like containers: equal size and
// identical (key, value) pairs in iteration order.  Works across
// container types that share an element layout, unlike operator==.
template <typename Map>
bool map_compare(Map const &lhs, Map const &rhs) {
  return lhs.size() == rhs.size()
    && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                  // take elements by const reference: the original lambda
                  // took them by value, copying every key/value pair
                  [] (const auto& a, const auto& b) {
                    return a.first == b.first && a.second == b.second;
                  });
}
}
67
// Construct the daemon server.  Builds one byte throttler and one
// message throttler per peer type (client/osd/mds/mon) from the
// corresponding mgr_*_bytes / mgr_*_messages config options, wires the
// OSD and MDS perf-metric collectors to their listeners, and registers
// this object as a config observer.  The messenger itself is created
// later, in init().
//
// NOTE(review): the throttler labels "mgr_osd_messsages",
// "mgr_mds_messsages" and "mgr_mon_messsages" carry a triple-'s' typo
// (the config keys read on the next line are spelled correctly).
// Renaming the labels may be visible to external tooling — confirm
// before fixing.
DaemonServer::DaemonServer(MonClient *monc_,
                           Finisher &finisher_,
                           DaemonStateIndex &daemon_state_,
                           ClusterState &cluster_state_,
                           PyModuleRegistry &py_modules_,
                           LogChannelRef clog_,
                           LogChannelRef audit_clog_)
    : Dispatcher(g_ceph_context),
      client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
                                         g_conf().get_val<Option::size_t>("mgr_client_bytes"))),
      client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
                                        g_conf().get_val<uint64_t>("mgr_client_messages"))),
      osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
                                      g_conf().get_val<Option::size_t>("mgr_osd_bytes"))),
      osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
                                     g_conf().get_val<uint64_t>("mgr_osd_messages"))),
      mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
                                      g_conf().get_val<Option::size_t>("mgr_mds_bytes"))),
      mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
                                     g_conf().get_val<uint64_t>("mgr_mds_messages"))),
      mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
                                      g_conf().get_val<Option::size_t>("mgr_mon_bytes"))),
      mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
                                     g_conf().get_val<uint64_t>("mgr_mon_messages"))),
      msgr(nullptr),          // created in init()
      monc(monc_),
      finisher(finisher_),
      daemon_state(daemon_state_),
      cluster_state(cluster_state_),
      py_modules(py_modules_),
      clog(clog_),
      audit_clog(audit_clog_),
      pgmap_ready(false),     // flips true once all up OSDs have reported
      timer(g_ceph_context, lock),
      shutting_down(false),
      tick_event(nullptr),
      osd_perf_metric_collector_listener(this),
      osd_perf_metric_collector(osd_perf_metric_collector_listener),
      mds_perf_metric_collector_listener(this),
      mds_perf_metric_collector(mds_perf_metric_collector_listener)
{
  g_conf().add_observer(this);
}
111
// Destroy the messenger created in init() and deregister from config
// notifications.
// NOTE(review): msgr is deleted before the observer is removed;
// presumably shutdown() has already quiesced the messenger — confirm.
DaemonServer::~DaemonServer() {
  delete msgr;
  g_conf().remove_observer(this);
}
116
// Bring up the server-side messenger: create it with the configured
// public transport type, attach per-peer-type throttlers, bind to the
// public address(es), start dispatch, and schedule the first periodic
// tick.  Returns 0 on success or a negative errno from address picking
// or binding.
//
// @param gid           mgr global id used for the messenger entity name
// @param client_addrs  our client-side addrs, used to fill in unknowns
int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
{
  // Initialize Messenger
  std::string public_msgr_type = g_conf()->ms_public_type.empty() ?
    g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
  msgr = Messenger::create(g_ceph_context, public_msgr_type,
                           entity_name_t::MGR(gid),
                           "mgr",
                           Messenger::get_pid_nonce());
  // peers reconnect from scratch; we keep no session state for them
  msgr->set_default_policy(Messenger::Policy::stateless_server(0));

  msgr->set_auth_client(monc);

  // throttle clients
  msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
                              client_byte_throttler.get(),
                              client_msg_throttler.get());

  // servers
  msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
                              osd_byte_throttler.get(),
                              osd_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
                              mds_byte_throttler.get(),
                              mds_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
                              mon_byte_throttler.get(),
                              mon_msg_throttler.get());

  entity_addrvec_t addrs;
  int r = pick_addresses(cct, CEPH_PICK_ADDRESS_PUBLIC, &addrs);
  if (r < 0) {
    return r;
  }
  dout(20) << __func__ << " will bind to " << addrs << dendl;
  r = msgr->bindv(addrs);
  if (r < 0) {
    derr << "unable to bind mgr to " << addrs << dendl;
    return r;
  }

  msgr->set_myname(entity_name_t::MGR(gid));
  msgr->set_addr_unknowns(client_addrs);

  msgr->start();
  msgr->add_dispatcher_tail(this);

  // incoming connections authenticate through the mon client
  msgr->set_auth_server(monc);
  monc->set_handle_authentication_dispatcher(this);

  started_at = ceph_clock_now();

  std::lock_guard l(lock);
  timer.init();

  // kick off the periodic send_report/adjust_pgs loop
  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());

  return 0;
}
177
// Addresses the server messenger is bound to (valid after init()).
entity_addrvec_t DaemonServer::get_myaddrs() const
{
  return msgr->get_myaddrs();
}
182
// Authentication callback for a newly authenticated connection.
// Creates an MgrSession, attaches it as the connection's priv data,
// and populates its caps from the peer's AuthCapsInfo.  OSD peers are
// additionally registered in the osd_cons map (keyed by the numeric id
// parsed from the entity name) so commands can be routed to them.
// Returns 1 on success, -EACCES if the caps fail to decode or parse.
int DaemonServer::ms_handle_authentication(Connection *con)
{
  auto s = ceph::make_ref<MgrSession>(cct);
  con->set_priv(s);
  s->inst.addr = con->get_peer_addr();
  s->entity_name = con->peer_name;
  dout(10) << __func__ << " new session " << s << " con " << con
           << " entity " << con->peer_name
           << " addr " << con->get_peer_addrs()
           << dendl;

  AuthCapsInfo &caps_info = con->get_peer_caps_info();
  if (caps_info.allow_all) {
    dout(10) << " session " << s << " " << s->entity_name
             << " allow_all" << dendl;
    s->caps.set_allow_all();
  } else if (caps_info.caps.length() > 0) {
    // caps arrive as an encoded string; decode then parse into MgrCap
    auto p = caps_info.caps.cbegin();
    string str;
    try {
      decode(str, p);
    }
    catch (buffer::error& e) {
      dout(10) << " session " << s << " " << s->entity_name
               << " failed to decode caps" << dendl;
      return -EACCES;
    }
    if (!s->caps.parse(str)) {
      dout(10) << " session " << s << " " << s->entity_name
               << " failed to parse caps '" << str << "'" << dendl;
      return -EACCES;
    }
    dout(10) << " session " << s << " " << s->entity_name
             << " has caps " << s->caps << " '" << str << "'" << dendl;
  }

  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    // track OSD connections so we can send them commands later
    std::lock_guard l(lock);
    s->osd_id = atoi(s->entity_name.get_id().c_str());
    dout(10) << "registering osd." << s->osd_id << " session "
             << s << " con " << con << dendl;
    osd_cons[s->osd_id].insert(con);
  }

  return 1;
}
229
// Connection-reset callback.  For OSD peers, drop the connection from
// osd_cons and from the daemon_connections set so it no longer receives
// reconfiguration messages.  Always returns false (no session reuse).
bool DaemonServer::ms_handle_reset(Connection *con)
{
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    auto priv = con->get_priv();
    auto session = static_cast<MgrSession*>(priv.get());
    if (!session) {
      // never authenticated; nothing registered to clean up
      return false;
    }
    std::lock_guard l(lock);
    dout(10) << "unregistering osd." << session->osd_id
             << " session " << session << " con " << con << dendl;
    osd_cons[session->osd_id].erase(con);

    auto iter = daemon_connections.find(con);
    if (iter != daemon_connections.end()) {
      daemon_connections.erase(iter);
    }
  }
  return false;
}
250
// Connection-refused callback; intentionally a no-op.
bool DaemonServer::ms_handle_refused(Connection *con)
{
  // do nothing for now
  return false;
}
256
// Top-level message dispatch.  Routes each message type to its handler;
// returns true if the message was consumed, false otherwise.
bool DaemonServer::ms_dispatch2(const ref_t<Message>& m)
{
  // Note that we do *not* take ::lock here, in order to avoid
  // serializing all message handling.  It's up to each handler
  // to take whatever locks it needs.
  switch (m->get_type()) {
    case MSG_PGSTATS:
      cluster_state.ingest_pgstats(ref_cast<MPGStats>(m));
      // PG stats may complete the set of initial OSD reports
      maybe_ready(m->get_source().num());
      return true;
    case MSG_MGR_REPORT:
      return handle_report(ref_cast<MMgrReport>(m));
    case MSG_MGR_OPEN:
      return handle_open(ref_cast<MMgrOpen>(m));
    case MSG_MGR_UPDATE:
      return handle_update(ref_cast<MMgrUpdate>(m));
    case MSG_MGR_CLOSE:
      return handle_close(ref_cast<MMgrClose>(m));
    case MSG_COMMAND:
      return handle_command(ref_cast<MCommand>(m));
    case MSG_MGR_COMMAND:
      return handle_command(ref_cast<MMgrCommand>(m));
    default:
      dout(1) << "Unhandled message type " << m->get_type() << dendl;
      return false;
  };
}
284
// Emit the pg_ready flag (true once all up OSDs have reported stats).
void DaemonServer::dump_pg_ready(ceph::Formatter *f)
{
  f->dump_bool("pg_ready", pgmap_ready.load());
}
289
// Record an initial stats report from the given OSD.  Once every up
// OSD (per the current OSDMap) has reported, mark the PGMap ready,
// and push a report to the mon immediately instead of waiting for the
// next tick.
void DaemonServer::maybe_ready(int32_t osd_id)
{
  if (pgmap_ready.load()) {
    // Fast path: we don't need to take lock because pgmap_ready
    // is already set
  } else {
    std::lock_guard l(lock);

    if (reported_osds.find(osd_id) == reported_osds.end()) {
      dout(4) << "initial report from osd " << osd_id << dendl;
      reported_osds.insert(osd_id);
      std::set<int32_t> up_osds;

      cluster_state.with_osdmap([&](const OSDMap& osdmap) {
          osdmap.get_up_osds(up_osds);
        });

      // up OSDs that have not yet sent their first report
      std::set<int32_t> unreported_osds;
      std::set_difference(up_osds.begin(), up_osds.end(),
                          reported_osds.begin(), reported_osds.end(),
                          std::inserter(unreported_osds, unreported_osds.begin()));

      if (unreported_osds.size() == 0) {
        dout(4) << "all osds have reported, sending PG state to mon" << dendl;
        pgmap_ready = true;
        reported_osds.clear();
        // Avoid waiting for next tick
        send_report();
      } else {
        dout(4) << "still waiting for " << unreported_osds.size() << " osds"
                   " to report in before PGMap is ready" << dendl;
      }
    }
  }
}
325
// Periodic work: push a report to the mon, run the pg autoscaling
// adjustment, and re-arm the tick timer.  Runs under ::lock via the
// timer (see schedule_tick_locked's assertion on the same lock).
void DaemonServer::tick()
{
  dout(10) << dendl;
  send_report();
  adjust_pgs();

  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());
}
335
// Currently modules do not set health checks in response to events delivered to
// all modules (e.g. notify) so we do not risk a thundering hurd situation here.
// if this pattern emerges in the future, this scheduler could be modified to
// fire after all modules have had a chance to set their health checks.
//
// (Re)arm the tick timer to fire after delay_sec.  Caller must hold
// ::lock.  Any previously scheduled tick is cancelled first, so at most
// one tick event is ever pending.
void DaemonServer::schedule_tick_locked(double delay_sec)
{
  ceph_assert(ceph_mutex_is_locked_by_me(lock));

  if (tick_event) {
    timer.cancel_event(tick_event);
    tick_event = nullptr;
  }

  // on shutdown start rejecting explicit requests to send reports that may
  // originate from python land which may still be running.
  if (shutting_down)
    return;

  tick_event = timer.add_event_after(delay_sec,
      new LambdaContext([this](int r) {
          tick();
      }));
}
359
// Public wrapper: take ::lock and (re)schedule the tick.
void DaemonServer::schedule_tick(double delay_sec)
{
  std::lock_guard l(lock);
  schedule_tick_locked(delay_sec);
}
365
// Called when the set of OSD perf metric queries changes.  Queues work
// on the finisher (rather than acting inline) to re-send MMgrConfigure
// to every connected OSD so they pick up the new stats policy.
void DaemonServer::handle_osd_perf_metric_query_updated()
{
  dout(10) << dendl;

  // Send a fresh MMgrConfigure to all clients, so that they can follow
  // the new policy for transmitting stats
  finisher.queue(new LambdaContext([this](int r) {
        std::lock_guard l(lock);
        for (auto &c : daemon_connections) {
          if (c->peer_is_osd()) {
            _send_configure(c);
          }
        }
      }));
}
381
// MDS analogue of handle_osd_perf_metric_query_updated(): re-send
// MMgrConfigure to every connected MDS via the finisher.
void DaemonServer::handle_mds_perf_metric_query_updated()
{
  dout(10) << dendl;

  // Send a fresh MMgrConfigure to all clients, so that they can follow
  // the new policy for transmitting stats
  finisher.queue(new LambdaContext([this](int r) {
        std::lock_guard l(lock);
        for (auto &c : daemon_connections) {
          if (c->peer_is_mds()) {
            _send_configure(c);
          }
        }
      }));
}
397
// Orderly shutdown: stop and drain the messenger and cluster state
// first (outside ::lock, since dispatch threads may need it), then set
// shutting_down under the lock so no new tick is armed, and stop the
// timer.
void DaemonServer::shutdown()
{
  dout(10) << "begin" << dendl;
  msgr->shutdown();
  msgr->wait();
  cluster_state.shutdown();
  dout(10) << "done" << dendl;

  std::lock_guard l(lock);
  shutting_down = true;
  timer.shutdown();
}
410
411 static DaemonKey key_from_service(
412 const std::string& service_name,
413 int peer_type,
414 const std::string& daemon_name)
415 {
416 if (!service_name.empty()) {
417 return DaemonKey{service_name, daemon_name};
418 } else {
419 return DaemonKey{ceph_entity_type_name(peer_type), daemon_name};
420 }
421 }
422
// Kick off an asynchronous metadata fetch from the mon for an osd/mds/
// mon daemon we do not yet have state for.  A MetadataUpdate completion
// stores the result into daemon_state when the mon replies.  No-op if a
// fetch for this key is already in flight or the type is not supported.
void DaemonServer::fetch_missing_metadata(const DaemonKey& key,
                                          const entity_addr_t& addr)
{
  if (!daemon_state.is_updating(key) &&
      (key.type == "osd" || key.type == "mds" || key.type == "mon")) {
    std::ostringstream oss;
    auto c = new MetadataUpdate(daemon_state, key);
    if (key.type == "osd") {
      // osd id is numeric, hence unquoted in the JSON command
      oss << "{\"prefix\": \"osd metadata\", \"id\": "
          << key.name<< "}";
    } else if (key.type == "mds") {
      // mon metadata for MDS lacks the addr; seed it from the connection
      c->set_default("addr", stringify(addr));
      oss << "{\"prefix\": \"mds metadata\", \"who\": \""
          << key.name << "\"}";
    } else if (key.type == "mon") {
      oss << "{\"prefix\": \"mon metadata\", \"id\": \""
          << key.name << "\"}";
    } else {
      ceph_abort();
    }
    monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
  }
}
446
// Handle an MMgrOpen: a daemon or service-registering client starting a
// stats session.  Sends the initial MMgrConfigure, then either attaches
// to (or creates, for service daemons) the DaemonState for the peer.
// A regular daemon with no DaemonState yet is disconnected so it will
// retry after we fetch its metadata in the background.  Connections from
// real daemons are remembered in daemon_connections for later
// reconfiguration.  Always returns true (message consumed).
bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
{
  std::unique_lock l(lock);

  DaemonKey key = key_from_service(m->service_name,
                                   m->get_connection()->get_peer_type(),
                                   m->daemon_name);

  auto con = m->get_connection();
  dout(10) << "from " << key << " " << con->get_peer_addr() << dendl;

  _send_configure(con);

  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    dout(20) << "updating existing DaemonState for " << key << dendl;
    daemon = daemon_state.get(key);
  }
  if (!daemon) {
    if (m->service_daemon) {
      // service daemons self-register; build their state on the spot
      dout(4) << "constructing new DaemonState for " << key << dendl;
      daemon = std::make_shared<DaemonState>(daemon_state.types);
      daemon->key = key;
      daemon->service_daemon = true;
      daemon_state.insert(daemon);
    } else {
      /* A normal Ceph daemon has connected but we are or should be waiting on
       * metadata for it. Close the session so that it tries to reconnect.
       */
      dout(2) << "ignoring open from " << key << " " << con->get_peer_addr()
              << "; not ready for session (expect reconnect)" << dendl;
      con->mark_down();
      // drop ::lock before the mon round-trip in fetch_missing_metadata
      l.unlock();
      fetch_missing_metadata(key, m->get_source_addr());
      return true;
    }
  }
  if (daemon) {
    if (m->service_daemon) {
      // update the metadata through the daemon state index to
      // ensure it's kept up-to-date
      daemon_state.update_metadata(daemon, m->daemon_metadata);
    }

    std::lock_guard l(daemon->lock);
    // a (re)open resets the perf counter state for this daemon
    daemon->perf_counters.clear();

    daemon->service_daemon = m->service_daemon;
    if (m->service_daemon) {
      daemon->service_status = m->daemon_status;

      utime_t now = ceph_clock_now();
      auto [d, added] = pending_service_map.get_daemon(m->service_name,
                                                      m->daemon_name);
      // register (or re-register after a gid change) in the service map
      if (added || d->gid != (uint64_t)m->get_source().num()) {
        dout(10) << "registering " << key << " in pending_service_map" << dendl;
        d->gid = m->get_source().num();
        d->addr = m->get_source_addr();
        d->start_epoch = pending_service_map.epoch;
        d->start_stamp = now;
        d->metadata = m->daemon_metadata;
        pending_service_map_dirty = pending_service_map.epoch;
      }
    }

    // decode the daemon's running config and the mon options it ignored
    auto p = m->config_bl.cbegin();
    if (p != m->config_bl.end()) {
      decode(daemon->config, p);
      decode(daemon->ignored_mon_config, p);
      dout(20) << " got config " << daemon->config
               << " ignored " << daemon->ignored_mon_config << dendl;
    }
    // defaults are kept encoded and decoded lazily on demand
    daemon->config_defaults_bl = m->config_defaults_bl;
    daemon->config_defaults.clear();
    dout(20) << " got config_defaults_bl " << daemon->config_defaults_bl.length()
             << " bytes" << dendl;
  }

  if (con->get_peer_type() != entity_name_t::TYPE_CLIENT &&
      m->service_name.empty())
  {
    // Store in set of the daemon/service connections, i.e. those
    // connections that require an update in the event of stats
    // configuration changes.
    daemon_connections.insert(con);
  }

  return true;
}
536
// Handle an MMgrUpdate: a lightweight metadata refresh from a daemon.
// Plain clients (no service name) are rejected and disconnected; for a
// known daemon with need_metadata_update set, the supplied metadata is
// stored.  Unknown daemons are silently ignored.  Always returns true.
bool DaemonServer::handle_update(const ref_t<MMgrUpdate>& m)
{
  DaemonKey key;
  if (!m->service_name.empty()) {
    key.type = m->service_name;
  } else {
    key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
  }
  key.name = m->daemon_name;

  dout(10) << "from " << m->get_connection() << " " << key << dendl;

  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
      m->service_name.empty()) {
    // Clients should not be sending us update request
    dout(10) << "rejecting update request from non-daemon client " << m->daemon_name
             << dendl;
    clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
                 << " at " << m->get_connection()->get_peer_addrs();
    m->get_connection()->mark_down();
    return true;
  }


  {
    std::unique_lock locker(lock);

    DaemonStatePtr daemon;
    // Look up the DaemonState
    if (daemon_state.exists(key)) {
      dout(20) << "updating existing DaemonState for " << key << dendl;

      daemon = daemon_state.get(key);
      if (m->need_metadata_update &&
          !m->daemon_metadata.empty()) {
        daemon_state.update_metadata(daemon, m->daemon_metadata);
      }
    }
  }

  return true;
}
579
// Handle an MMgrClose: a daemon ending its stats session.  Removes its
// DaemonState and, for service daemons, its pending service map entry,
// then echoes the message back as an acknowledgement.
bool DaemonServer::handle_close(const ref_t<MMgrClose>& m)
{
  std::lock_guard l(lock);

  DaemonKey key = key_from_service(m->service_name,
                                   m->get_connection()->get_peer_type(),
                                   m->daemon_name);
  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  if (daemon_state.exists(key)) {
    DaemonStatePtr daemon = daemon_state.get(key);
    daemon_state.rm(key);
    {
      std::lock_guard l(daemon->lock);
      if (daemon->service_daemon) {
        pending_service_map.rm_daemon(m->service_name, m->daemon_name);
        pending_service_map_dirty = pending_service_map.epoch;
      }
    }
  }

  // send same message back as a reply
  m->get_connection()->send_message2(m);
  return true;
}
605
// Store a daemon's reported task status into the pending service map,
// marking the map dirty only when the status actually changed.
// get_daemon() creates the entry if it does not yet exist.
void DaemonServer::update_task_status(
  DaemonKey key,
  const std::map<std::string,std::string>& task_status)
{
  dout(10) << "got task status from " << key << dendl;

  [[maybe_unused]] auto [daemon, added] =
    pending_service_map.get_daemon(key.type, key.name);
  if (daemon->task_status != task_status) {
    daemon->task_status = task_status;
    pending_service_map_dirty = pending_service_map.epoch;
  }
}
619
// Handle an MMgrReport: periodic perf counters / status from a daemon.
// Rejects plain clients that claim no service.  If we have no
// DaemonState for the sender (e.g. the mgr restarted), the session is
// torn down and a metadata fetch is issued so the daemon reconnects
// once we know about it.  Otherwise the report's counters, config,
// status, task status and health metrics are folded into the daemon's
// state, and OSD/MDS perf metric payloads are forwarded to their
// collectors.  Returns true when the report was ingested, false when
// the session was rejected.
bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
{
  DaemonKey key;
  if (!m->service_name.empty()) {
    key.type = m->service_name;
  } else {
    key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
  }
  key.name = m->daemon_name;

  dout(10) << "from " << m->get_connection() << " " << key << dendl;

  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
      m->service_name.empty()) {
    // Clients should not be sending us stats unless they are declaring
    // themselves to be a daemon for some service.
    dout(10) << "rejecting report from non-daemon client " << m->daemon_name
             << dendl;
    clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
                 << " at " << m->get_connection()->get_peer_addrs();
    m->get_connection()->mark_down();
    return true;
  }


  {
    std::unique_lock locker(lock);

    DaemonStatePtr daemon;
    // Look up the DaemonState
    if (daemon_state.exists(key)) {
      dout(20) << "updating existing DaemonState for " << key << dendl;
      daemon = daemon_state.get(key);
    } else {
      // fetch_missing_metadata does a mon round-trip; drop ::lock first
      locker.unlock();

      // we don't know the hostname at this stage, reject MMgrReport here.
      dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
              << dendl;
      // issue metadata request in background
      fetch_missing_metadata(key, m->get_source_addr());

      locker.lock();

      // kill session
      auto priv = m->get_connection()->get_priv();
      auto session = static_cast<MgrSession*>(priv.get());
      if (!session) {
        return false;
      }
      m->get_connection()->mark_down();

      dout(10) << "unregistering osd." << session->osd_id
               << " session " << session << " con " << m->get_connection() << dendl;

      // same cleanup as ms_handle_reset: forget the connection
      if (osd_cons.find(session->osd_id) != osd_cons.end()) {
        osd_cons[session->osd_id].erase(m->get_connection());
      }

      auto iter = daemon_connections.find(m->get_connection());
      if (iter != daemon_connections.end()) {
        daemon_connections.erase(iter);
      }

      return false;
    }

    // Update the DaemonState
    ceph_assert(daemon != nullptr);
    {
      std::lock_guard l(daemon->lock);
      auto &daemon_counters = daemon->perf_counters;
      daemon_counters.update(*m.get());

      // piggy-backed config changes, same encoding as in handle_open
      auto p = m->config_bl.cbegin();
      if (p != m->config_bl.end()) {
        decode(daemon->config, p);
        decode(daemon->ignored_mon_config, p);
        dout(20) << " got config " << daemon->config
                 << " ignored " << daemon->ignored_mon_config << dendl;
      }

      utime_t now = ceph_clock_now();
      if (daemon->service_daemon) {
        if (m->daemon_status) {
          daemon->service_status_stamp = now;
          daemon->service_status = *m->daemon_status;
        }
        // any report counts as a liveness beacon for service daemons
        daemon->last_service_beacon = now;
      } else if (m->daemon_status) {
        derr << "got status from non-daemon " << key << dendl;
      }
      // update task status
      if (m->task_status) {
        update_task_status(key, *m->task_status);
        daemon->last_service_beacon = now;
      }
      if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
        // only OSD and MON send health_checks to me now
        daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
        dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
                 << dendl;
      }
    }
  }

  // if there are any schema updates, notify the python modules
  /* no users currently
  if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
    py_modules.notify_all("perf_schema_update", ceph::to_string(key));
  }
  */

  if (m->get_connection()->peer_is_osd()) {
    osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
  }

  if (m->metric_report_message) {
    const MetricReportMessage &message = *m->metric_report_message;
    boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
  }

  return true;
}
744
745
746 void DaemonServer::_generate_command_map(
747 cmdmap_t& cmdmap,
748 map<string,string> &param_str_map)
749 {
750 for (auto p = cmdmap.begin();
751 p != cmdmap.end(); ++p) {
752 if (p->first == "prefix")
753 continue;
754 if (p->first == "caps") {
755 vector<string> cv;
756 if (cmd_getval(cmdmap, "caps", cv) &&
757 cv.size() % 2 == 0) {
758 for (unsigned i = 0; i < cv.size(); i += 2) {
759 string k = string("caps_") + cv[i];
760 param_str_map[k] = cv[i + 1];
761 }
762 continue;
763 }
764 }
765 param_str_map[p->first] = cmd_vartype_stringify(p->second);
766 }
767 }
768
769 const MonCommand *DaemonServer::_get_mgrcommand(
770 const string &cmd_prefix,
771 const std::vector<MonCommand> &cmds)
772 {
773 const MonCommand *this_cmd = nullptr;
774 for (const auto &cmd : cmds) {
775 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
776 this_cmd = &cmd;
777 break;
778 }
779 }
780 return this_cmd;
781 }
782
// Check whether the session's capabilities permit running this command.
// Mon peers are trusted unconditionally (the mon validates before
// proxying).  Otherwise the command's declared r/w/x requirements are
// checked against the session caps for the given service/module/prefix
// and parameter map.
bool DaemonServer::_allowed_command(
  MgrSession *s,
  const string &service,
  const string &module,
  const string &prefix,
  const cmdmap_t& cmdmap,
  const map<string,string>& param_str_map,
  const MonCommand *this_cmd) {

  if (s->entity_name.is_mon()) {
    // mon is all-powerful. even when it is forwarding commands on behalf of
    // old clients; we expect the mon is validating commands before proxying!
    return true;
  }

  bool cmd_r = this_cmd->requires_perm('r');
  bool cmd_w = this_cmd->requires_perm('w');
  bool cmd_x = this_cmd->requires_perm('x');

  bool capable = s->caps.is_capable(
      g_ceph_context,
      s->entity_name,
      service, module, prefix, param_str_map,
      cmd_r, cmd_w, cmd_x,
      s->get_peer_addr());

  dout(10) << " " << s->entity_name << " "
           << (capable ? "" : "not ") << "capable" << dendl;
  return capable;
}
813
/**
 * The working data for processing an MCommand. This lives in
 * a class to enable passing it into other threads for processing
 * outside of the thread/locks that called handle_command.
 *
 * Exactly one of m_tell / m_mgr is set, depending on which message
 * type carried the command; cmd and data are references into it.
 */
class CommandContext {
public:
  ceph::ref_t<MCommand> m_tell;
  ceph::ref_t<MMgrCommand> m_mgr;
  const std::vector<std::string>& cmd;  ///< ref into m_tell or m_mgr
  const bufferlist& data;               ///< ref into m_tell or m_mgr
  bufferlist odata;                     ///< output payload for the reply
  cmdmap_t cmdmap;                      ///< parsed form of cmd

  explicit CommandContext(ceph::ref_t<MCommand> m)
    : m_tell{std::move(m)},
      cmd(m_tell->cmd),
      data(m_tell->get_data()) {
  }
  explicit CommandContext(ceph::ref_t<MMgrCommand> m)
    : m_mgr{std::move(m)},
      cmd(m_mgr->cmd),
      data(m_mgr->get_data()) {
  }

  // Convenience overload taking the accumulated status stream.
  void reply(int r, const std::stringstream &ss) {
    reply(r, ss.str());
  }

  // Send the command result (and any odata) back on the originating
  // connection, using the reply type that matches the request type.
  void reply(int r, const std::string &rs) {
    // Let the connection drop as soon as we've sent our response
    ConnectionRef con = m_tell ? m_tell->get_connection()
                               : m_mgr->get_connection();
    if (con) {
      con->mark_disposable();
    }

    if (r == 0) {
      dout(20) << "success" << dendl;
    } else {
      derr << __func__ << " " << cpp_strerror(r) << " " << rs << dendl;
    }
    if (con) {
      if (m_tell) {
        MCommandReply *reply = new MCommandReply(r, rs);
        reply->set_tid(m_tell->get_tid());
        reply->set_data(odata);
        con->send_message(reply);
      } else {
        MMgrCommandReply *reply = new MMgrCommandReply(r, rs);
        reply->set_tid(m_mgr->get_tid());
        reply->set_data(odata);
        con->send_message(reply);
      }
    }
  }
};
871
/**
 * A context for receiving a bufferlist/error string from a background
 * function and then calling back to a CommandContext when it's done
 *
 * The background producer fills in from_mon / outs; finish() forwards
 * them as the command reply.
 */
class ReplyOnFinish : public Context {
  std::shared_ptr<CommandContext> cmdctx;

public:
  bufferlist from_mon;  ///< payload produced by the background call
  string outs;          ///< human-readable status string

  explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_)
    : cmdctx(cmdctx_)
    {}
  void finish(int r) override {
    cmdctx->odata.claim_append(from_mon);
    cmdctx->reply(r, outs);
  }
};
891
// Entry point for legacy MCommand-carried commands: wrap in a
// CommandContext and dispatch, converting cmdmap parse errors into an
// -EINVAL reply rather than letting the exception escape.
bool DaemonServer::handle_command(const ref_t<MCommand>& m)
{
  std::lock_guard l(lock);
  auto cmdctx = std::make_shared<CommandContext>(m);
  try {
    return _handle_command(cmdctx);
  } catch (const bad_cmd_get& e) {
    cmdctx->reply(-EINVAL, e.what());
    return true;
  }
}
903
// Entry point for MMgrCommand-carried commands; mirrors the MCommand
// overload above, differing only in the message type wrapped.
bool DaemonServer::handle_command(const ref_t<MMgrCommand>& m)
{
  std::lock_guard l(lock);
  auto cmdctx = std::make_shared<CommandContext>(m);
  try {
    return _handle_command(cmdctx);
  } catch (const bad_cmd_get& e) {
    cmdctx->reply(-EINVAL, e.what());
    return true;
  }
}
915
// Record a capability failure: audit-log who attempted what, and append
// a user-facing explanation (with docs pointer) to the reply stream.
void DaemonServer::log_access_denied(
    std::shared_ptr<CommandContext>& cmdctx,
    MgrSession* session, std::stringstream& ss) {
  dout(1) << " access denied" << dendl;
  audit_clog->info() << "from='" << session->inst << "' "
                     << "entity='" << session->entity_name << "' "
                     << "cmd=" << cmdctx->cmd << ":  access denied";
  ss << "access denied: does your client key have mgr caps? "
        "See http://docs.ceph.com/en/latest/mgr/administrator/"
        "#client-authentication";
}
927
928 void DaemonServer::_check_offlines_pgs(
929 const set<int>& osds,
930 const OSDMap& osdmap,
931 const PGMap& pgmap,
932 offline_pg_report *report)
933 {
934 // reset output
935 *report = offline_pg_report();
936 report->osds = osds;
937
938 for (const auto& q : pgmap.pg_stat) {
939 set<int32_t> pg_acting; // net acting sets (with no missing if degraded)
940 bool found = false;
941 if (q.second.state == 0) {
942 report->unknown.insert(q.first);
943 continue;
944 }
945 if (q.second.state & PG_STATE_DEGRADED) {
946 for (auto& anm : q.second.avail_no_missing) {
947 if (osds.count(anm.osd)) {
948 found = true;
949 continue;
950 }
951 if (anm.osd != CRUSH_ITEM_NONE) {
952 pg_acting.insert(anm.osd);
953 }
954 }
955 } else {
956 for (auto& a : q.second.acting) {
957 if (osds.count(a)) {
958 found = true;
959 continue;
960 }
961 if (a != CRUSH_ITEM_NONE) {
962 pg_acting.insert(a);
963 }
964 }
965 }
966 if (!found) {
967 continue;
968 }
969 const pg_pool_t *pi = osdmap.get_pg_pool(q.first.pool());
970 bool dangerous = false;
971 if (!pi) {
972 report->bad_no_pool.insert(q.first); // pool is creating or deleting
973 dangerous = true;
974 }
975 if (!(q.second.state & PG_STATE_ACTIVE)) {
976 report->bad_already_inactive.insert(q.first);
977 dangerous = true;
978 }
979 if (pg_acting.size() < pi->min_size) {
980 report->bad_become_inactive.insert(q.first);
981 dangerous = true;
982 }
983 if (dangerous) {
984 report->not_ok.insert(q.first);
985 } else {
986 report->ok.insert(q.first);
987 if (q.second.state & PG_STATE_DEGRADED) {
988 report->ok_become_more_degraded.insert(q.first);
989 } else {
990 report->ok_become_degraded.insert(q.first);
991 }
992 }
993 }
994 dout(20) << osds << " -> " << report->ok.size() << " ok, "
995 << report->not_ok.size() << " not ok, "
996 << report->unknown.size() << " unknown"
997 << dendl;
998 }
999
// Greedily grow the set of OSDs that can be stopped together, starting
// from orig_osds and walking up the CRUSH tree.  At each level, try
// adding each up OSD beneath the current parent; keep it only if the
// combined set is still ok-to-stop.  Stops at `max` OSDs, on a CRUSH
// lookup failure, or once a level contributes any failed candidate
// (further levels would be wider still).  out_report always reflects
// the largest set found so far.
void DaemonServer::_maximize_ok_to_stop_set(
  const set<int>& orig_osds,
  unsigned max,
  const OSDMap& osdmap,
  const PGMap& pgmap,
  offline_pg_report *out_report)
{
  dout(20) << "orig_osds " << orig_osds << " max " << max << dendl;
  _check_offlines_pgs(orig_osds, osdmap, pgmap, out_report);
  if (!out_report->ok_to_stop()) {
    // the starting set already fails; nothing to maximize
    return;
  }
  if (orig_osds.size() >= max) {
    // already at max
    return;
  }

  // semi-arbitrarily start with the first osd in the set
  offline_pg_report report;
  set<int> osds = orig_osds;
  int parent = *osds.begin();
  set<int> children;

  while (true) {
    // identify the next parent
    int r = osdmap.crush->get_immediate_parent_id(parent, &parent);
    if (r < 0) {
      return;  // just go with what we have so far!
    }

    // get candidate additions that are beneath this point in the tree
    children.clear();
    r = osdmap.crush->get_all_children(parent, &children);
    if (r < 0) {
      return;  // just go with what we have so far!
    }
    dout(20) << " parent " << parent << " children " << children << dendl;

    // try adding in more osds
    int failed = 0;  // how many children we failed to add to our set
    for (auto o : children) {
      if (o >= 0 && osdmap.is_up(o) && osds.count(o) == 0) {
	osds.insert(o);
	_check_offlines_pgs(osds, osdmap, pgmap, &report);
	if (!report.ok_to_stop()) {
	  // candidate breaks the set; back it out and remember the failure
	  osds.erase(o);
	  ++failed;
	  continue;
	}
	*out_report = report;
	if (osds.size() == max) {
	  dout(20) << " hit max" << dendl;
	  return;  // yay, we hit the max
	}
      }
    }

    if (failed) {
      // we hit some failures; go with what we have
      dout(20) << " hit some peer failures" << dendl;
      return;
    }
  }
}
1064
1065 bool DaemonServer::_handle_command(
1066 std::shared_ptr<CommandContext>& cmdctx)
1067 {
1068 MessageRef m;
1069 bool admin_socket_cmd = false;
1070 if (cmdctx->m_tell) {
1071 m = cmdctx->m_tell;
1072 // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
1073 // command.
1074 admin_socket_cmd = (cmdctx->m_tell->fsid != uuid_d());
1075 } else {
1076 m = cmdctx->m_mgr;
1077 }
1078 auto priv = m->get_connection()->get_priv();
1079 auto session = static_cast<MgrSession*>(priv.get());
1080 if (!session) {
1081 return true;
1082 }
1083 if (session->inst.name == entity_name_t()) {
1084 session->inst.name = m->get_source();
1085 }
1086
1087 map<string,string> param_str_map;
1088 std::stringstream ss;
1089 int r = 0;
1090
1091 if (!cmdmap_from_json(cmdctx->cmd, &(cmdctx->cmdmap), ss)) {
1092 cmdctx->reply(-EINVAL, ss);
1093 return true;
1094 }
1095
1096 string prefix;
1097 cmd_getval(cmdctx->cmdmap, "prefix", prefix);
1098 dout(10) << "decoded-size=" << cmdctx->cmdmap.size() << " prefix=" << prefix << dendl;
1099
1100 boost::scoped_ptr<Formatter> f;
1101 {
1102 std::string format;
1103 if (boost::algorithm::ends_with(prefix, "_json")) {
1104 format = "json";
1105 } else {
1106 format = cmd_getval_or<string>(cmdctx->cmdmap, "format", "plain");
1107 }
1108 f.reset(Formatter::create(format));
1109 }
1110
1111 // this is just for mgr commands - admin socket commands will fall
1112 // through and use the admin socket version of
1113 // get_command_descriptions
1114 if (prefix == "get_command_descriptions" && !admin_socket_cmd) {
1115 dout(10) << "reading commands from python modules" << dendl;
1116 const auto py_commands = py_modules.get_commands();
1117
1118 int cmdnum = 0;
1119 JSONFormatter f;
1120 f.open_object_section("command_descriptions");
1121
1122 auto dump_cmd = [&cmdnum, &f, m](const MonCommand &mc){
1123 ostringstream secname;
1124 secname << "cmd" << std::setfill('0') << std::setw(3) << cmdnum;
1125 dump_cmddesc_to_json(&f, m->get_connection()->get_features(),
1126 secname.str(), mc.cmdstring, mc.helpstring,
1127 mc.module, mc.req_perms, 0);
1128 cmdnum++;
1129 };
1130
1131 for (const auto &pyc : py_commands) {
1132 dump_cmd(pyc);
1133 }
1134
1135 for (const auto &mgr_cmd : mgr_commands) {
1136 dump_cmd(mgr_cmd);
1137 }
1138
1139 f.close_section(); // command_descriptions
1140 f.flush(cmdctx->odata);
1141 cmdctx->reply(0, ss);
1142 return true;
1143 }
1144
1145 // lookup command
1146 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
1147 _generate_command_map(cmdctx->cmdmap, param_str_map);
1148
1149 bool is_allowed = false;
1150 ModuleCommand py_command;
1151 if (admin_socket_cmd) {
1152 // admin socket commands require all capabilities
1153 is_allowed = session->caps.is_allow_all();
1154 } else if (!mgr_cmd) {
1155 // Resolve the command to the name of the module that will
1156 // handle it (if the command exists)
1157 auto py_commands = py_modules.get_py_commands();
1158 for (const auto &pyc : py_commands) {
1159 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
1160 if (pyc_prefix == prefix) {
1161 py_command = pyc;
1162 break;
1163 }
1164 }
1165
1166 MonCommand pyc = {"", "", "py", py_command.perm};
1167 is_allowed = _allowed_command(session, "py", py_command.module_name,
1168 prefix, cmdctx->cmdmap, param_str_map,
1169 &pyc);
1170 } else {
1171 // validate user's permissions for requested command
1172 is_allowed = _allowed_command(session, mgr_cmd->module, "",
1173 prefix, cmdctx->cmdmap, param_str_map, mgr_cmd);
1174 }
1175
1176 if (!is_allowed) {
1177 log_access_denied(cmdctx, session, ss);
1178 cmdctx->reply(-EACCES, ss);
1179 return true;
1180 }
1181
1182 audit_clog->debug()
1183 << "from='" << session->inst << "' "
1184 << "entity='" << session->entity_name << "' "
1185 << "cmd=" << cmdctx->cmd << ": dispatch";
1186
1187 if (admin_socket_cmd) {
1188 cct->get_admin_socket()->queue_tell_command(cmdctx->m_tell);
1189 return true;
1190 }
1191
1192 // ----------------
1193 // service map commands
1194 if (prefix == "service dump") {
1195 if (!f)
1196 f.reset(Formatter::create("json-pretty"));
1197 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
1198 f->dump_object("service_map", service_map);
1199 });
1200 f->flush(cmdctx->odata);
1201 cmdctx->reply(0, ss);
1202 return true;
1203 }
1204 if (prefix == "service status") {
1205 if (!f)
1206 f.reset(Formatter::create("json-pretty"));
1207 // only include state from services that are in the persisted service map
1208 f->open_object_section("service_status");
1209 for (auto& [type, service] : pending_service_map.services) {
1210 if (ServiceMap::is_normal_ceph_entity(type)) {
1211 continue;
1212 }
1213
1214 f->open_object_section(type.c_str());
1215 for (auto& q : service.daemons) {
1216 f->open_object_section(q.first.c_str());
1217 DaemonKey key{type, q.first};
1218 ceph_assert(daemon_state.exists(key));
1219 auto daemon = daemon_state.get(key);
1220 std::lock_guard l(daemon->lock);
1221 f->dump_stream("status_stamp") << daemon->service_status_stamp;
1222 f->dump_stream("last_beacon") << daemon->last_service_beacon;
1223 f->open_object_section("status");
1224 for (auto& r : daemon->service_status) {
1225 f->dump_string(r.first.c_str(), r.second);
1226 }
1227 f->close_section();
1228 f->close_section();
1229 }
1230 f->close_section();
1231 }
1232 f->close_section();
1233 f->flush(cmdctx->odata);
1234 cmdctx->reply(0, ss);
1235 return true;
1236 }
1237
1238 if (prefix == "config set") {
1239 std::string key;
1240 std::string val;
1241 cmd_getval(cmdctx->cmdmap, "key", key);
1242 cmd_getval(cmdctx->cmdmap, "value", val);
1243 r = cct->_conf.set_val(key, val, &ss);
1244 if (r == 0) {
1245 cct->_conf.apply_changes(nullptr);
1246 }
1247 cmdctx->reply(0, ss);
1248 return true;
1249 }
1250
1251 // -----------
1252 // PG commands
1253
1254 if (prefix == "pg scrub" ||
1255 prefix == "pg repair" ||
1256 prefix == "pg deep-scrub") {
1257 string scrubop = prefix.substr(3, string::npos);
1258 pg_t pgid;
1259 spg_t spgid;
1260 string pgidstr;
1261 cmd_getval(cmdctx->cmdmap, "pgid", pgidstr);
1262 if (!pgid.parse(pgidstr.c_str())) {
1263 ss << "invalid pgid '" << pgidstr << "'";
1264 cmdctx->reply(-EINVAL, ss);
1265 return true;
1266 }
1267 bool pg_exists = false;
1268 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1269 pg_exists = osdmap.pg_exists(pgid);
1270 });
1271 if (!pg_exists) {
1272 ss << "pg " << pgid << " does not exist";
1273 cmdctx->reply(-ENOENT, ss);
1274 return true;
1275 }
1276 int acting_primary = -1;
1277 epoch_t epoch;
1278 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1279 epoch = osdmap.get_epoch();
1280 osdmap.get_primary_shard(pgid, &acting_primary, &spgid);
1281 });
1282 if (acting_primary == -1) {
1283 ss << "pg " << pgid << " has no primary osd";
1284 cmdctx->reply(-EAGAIN, ss);
1285 return true;
1286 }
1287 auto p = osd_cons.find(acting_primary);
1288 if (p == osd_cons.end()) {
1289 ss << "pg " << pgid << " primary osd." << acting_primary
1290 << " is not currently connected";
1291 cmdctx->reply(-EAGAIN, ss);
1292 return true;
1293 }
1294 for (auto& con : p->second) {
1295 assert(HAVE_FEATURE(con->get_features(), SERVER_OCTOPUS));
1296 vector<spg_t> pgs = { spgid };
1297 con->send_message(new MOSDScrub2(monc->get_fsid(),
1298 epoch,
1299 pgs,
1300 scrubop == "repair",
1301 scrubop == "deep-scrub"));
1302 }
1303 ss << "instructing pg " << spgid << " on osd." << acting_primary
1304 << " to " << scrubop;
1305 cmdctx->reply(0, ss);
1306 return true;
1307 } else if (prefix == "osd scrub" ||
1308 prefix == "osd deep-scrub" ||
1309 prefix == "osd repair") {
1310 string whostr;
1311 cmd_getval(cmdctx->cmdmap, "who", whostr);
1312 vector<string> pvec;
1313 get_str_vec(prefix, pvec);
1314
1315 set<int> osds;
1316 if (whostr == "*" || whostr == "all" || whostr == "any") {
1317 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1318 for (int i = 0; i < osdmap.get_max_osd(); i++)
1319 if (osdmap.is_up(i)) {
1320 osds.insert(i);
1321 }
1322 });
1323 } else {
1324 long osd = parse_osd_id(whostr.c_str(), &ss);
1325 if (osd < 0) {
1326 ss << "invalid osd '" << whostr << "'";
1327 cmdctx->reply(-EINVAL, ss);
1328 return true;
1329 }
1330 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1331 if (osdmap.is_up(osd)) {
1332 osds.insert(osd);
1333 }
1334 });
1335 if (osds.empty()) {
1336 ss << "osd." << osd << " is not up";
1337 cmdctx->reply(-EAGAIN, ss);
1338 return true;
1339 }
1340 }
1341 set<int> sent_osds, failed_osds;
1342 for (auto osd : osds) {
1343 vector<spg_t> spgs;
1344 epoch_t epoch;
1345 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1346 epoch = osdmap.get_epoch();
1347 auto p = pgmap.pg_by_osd.find(osd);
1348 if (p != pgmap.pg_by_osd.end()) {
1349 for (auto pgid : p->second) {
1350 int primary;
1351 spg_t spg;
1352 osdmap.get_primary_shard(pgid, &primary, &spg);
1353 if (primary == osd) {
1354 spgs.push_back(spg);
1355 }
1356 }
1357 }
1358 });
1359 auto p = osd_cons.find(osd);
1360 if (p == osd_cons.end()) {
1361 failed_osds.insert(osd);
1362 } else {
1363 sent_osds.insert(osd);
1364 for (auto& con : p->second) {
1365 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
1366 con->send_message(new MOSDScrub2(monc->get_fsid(),
1367 epoch,
1368 spgs,
1369 pvec.back() == "repair",
1370 pvec.back() == "deep-scrub"));
1371 } else {
1372 con->send_message(new MOSDScrub(monc->get_fsid(),
1373 pvec.back() == "repair",
1374 pvec.back() == "deep-scrub"));
1375 }
1376 }
1377 }
1378 }
1379 if (failed_osds.size() == osds.size()) {
1380 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
1381 << " (not connected)";
1382 r = -EAGAIN;
1383 } else {
1384 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
1385 if (!failed_osds.empty()) {
1386 ss << "; osd(s) " << failed_osds << " were not connected";
1387 }
1388 r = 0;
1389 }
1390 cmdctx->reply(0, ss);
1391 return true;
1392 } else if (prefix == "osd pool scrub" ||
1393 prefix == "osd pool deep-scrub" ||
1394 prefix == "osd pool repair") {
1395 vector<string> pool_names;
1396 cmd_getval(cmdctx->cmdmap, "who", pool_names);
1397 if (pool_names.empty()) {
1398 ss << "must specify one or more pool names";
1399 cmdctx->reply(-EINVAL, ss);
1400 return true;
1401 }
1402 epoch_t epoch;
1403 map<int32_t, vector<pg_t>> pgs_by_primary; // legacy
1404 map<int32_t, vector<spg_t>> spgs_by_primary;
1405 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1406 epoch = osdmap.get_epoch();
1407 for (auto& pool_name : pool_names) {
1408 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1409 if (pool_id < 0) {
1410 ss << "unrecognized pool '" << pool_name << "'";
1411 r = -ENOENT;
1412 return;
1413 }
1414 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1415 for (int i = 0; i < pool_pg_num; i++) {
1416 pg_t pg(i, pool_id);
1417 int primary;
1418 spg_t spg;
1419 auto got = osdmap.get_primary_shard(pg, &primary, &spg);
1420 if (!got)
1421 continue;
1422 pgs_by_primary[primary].push_back(pg);
1423 spgs_by_primary[primary].push_back(spg);
1424 }
1425 }
1426 });
1427 if (r < 0) {
1428 cmdctx->reply(r, ss);
1429 return true;
1430 }
1431 for (auto& it : spgs_by_primary) {
1432 auto primary = it.first;
1433 auto p = osd_cons.find(primary);
1434 if (p == osd_cons.end()) {
1435 ss << "osd." << primary << " is not currently connected";
1436 cmdctx->reply(-EAGAIN, ss);
1437 return true;
1438 }
1439 for (auto& con : p->second) {
1440 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
1441 con->send_message(new MOSDScrub2(monc->get_fsid(),
1442 epoch,
1443 it.second,
1444 prefix == "osd pool repair",
1445 prefix == "osd pool deep-scrub"));
1446 } else {
1447 // legacy
1448 auto q = pgs_by_primary.find(primary);
1449 ceph_assert(q != pgs_by_primary.end());
1450 con->send_message(new MOSDScrub(monc->get_fsid(),
1451 q->second,
1452 prefix == "osd pool repair",
1453 prefix == "osd pool deep-scrub"));
1454 }
1455 }
1456 }
1457 cmdctx->reply(0, "");
1458 return true;
1459 } else if (prefix == "osd reweight-by-pg" ||
1460 prefix == "osd reweight-by-utilization" ||
1461 prefix == "osd test-reweight-by-pg" ||
1462 prefix == "osd test-reweight-by-utilization") {
1463 bool by_pg =
1464 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
1465 bool dry_run =
1466 prefix == "osd test-reweight-by-pg" ||
1467 prefix == "osd test-reweight-by-utilization";
1468 int64_t oload = cmd_getval_or<int64_t>(cmdctx->cmdmap, "oload", 120);
1469 set<int64_t> pools;
1470 vector<string> poolnames;
1471 cmd_getval(cmdctx->cmdmap, "pools", poolnames);
1472 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1473 for (const auto& poolname : poolnames) {
1474 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
1475 if (pool < 0) {
1476 ss << "pool '" << poolname << "' does not exist";
1477 r = -ENOENT;
1478 }
1479 pools.insert(pool);
1480 }
1481 });
1482 if (r) {
1483 cmdctx->reply(r, ss);
1484 return true;
1485 }
1486
1487 double max_change = g_conf().get_val<double>("mon_reweight_max_change");
1488 cmd_getval(cmdctx->cmdmap, "max_change", max_change);
1489 if (max_change <= 0.0) {
1490 ss << "max_change " << max_change << " must be positive";
1491 cmdctx->reply(-EINVAL, ss);
1492 return true;
1493 }
1494 int64_t max_osds = g_conf().get_val<int64_t>("mon_reweight_max_osds");
1495 cmd_getval(cmdctx->cmdmap, "max_osds", max_osds);
1496 if (max_osds <= 0) {
1497 ss << "max_osds " << max_osds << " must be positive";
1498 cmdctx->reply(-EINVAL, ss);
1499 return true;
1500 }
1501 bool no_increasing = false;
1502 cmd_getval_compat_cephbool(cmdctx->cmdmap, "no_increasing", no_increasing);
1503 string out_str;
1504 mempool::osdmap::map<int32_t, uint32_t> new_weights;
1505 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap &osdmap, const PGMap& pgmap) {
1506 return reweight::by_utilization(osdmap, pgmap,
1507 oload,
1508 max_change,
1509 max_osds,
1510 by_pg,
1511 pools.empty() ? NULL : &pools,
1512 no_increasing,
1513 &new_weights,
1514 &ss, &out_str, f.get());
1515 });
1516 if (r >= 0) {
1517 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
1518 }
1519 if (f) {
1520 f->flush(cmdctx->odata);
1521 } else {
1522 cmdctx->odata.append(out_str);
1523 }
1524 if (r < 0) {
1525 ss << "FAILED reweight-by-pg";
1526 cmdctx->reply(r, ss);
1527 return true;
1528 } else if (r == 0 || dry_run) {
1529 ss << "no change";
1530 cmdctx->reply(r, ss);
1531 return true;
1532 } else {
1533 json_spirit::Object json_object;
1534 for (const auto& osd_weight : new_weights) {
1535 json_spirit::Config::add(json_object,
1536 std::to_string(osd_weight.first),
1537 std::to_string(osd_weight.second));
1538 }
1539 string s = json_spirit::write(json_object);
1540 std::replace(begin(s), end(s), '\"', '\'');
1541 const string cmd =
1542 "{"
1543 "\"prefix\": \"osd reweightn\", "
1544 "\"weights\": \"" + s + "\""
1545 "}";
1546 auto on_finish = new ReplyOnFinish(cmdctx);
1547 monc->start_mon_command({cmd}, {},
1548 &on_finish->from_mon, &on_finish->outs, on_finish);
1549 return true;
1550 }
1551 } else if (prefix == "osd df") {
1552 string method, filter;
1553 cmd_getval(cmdctx->cmdmap, "output_method", method);
1554 cmd_getval(cmdctx->cmdmap, "filter", filter);
1555 stringstream rs;
1556 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1557 // sanity check filter(s)
1558 if (!filter.empty() &&
1559 osdmap.lookup_pg_pool_name(filter) < 0 &&
1560 !osdmap.crush->class_exists(filter) &&
1561 !osdmap.crush->name_exists(filter)) {
1562 rs << "'" << filter << "' not a pool, crush node or device class name";
1563 return -EINVAL;
1564 }
1565 print_osd_utilization(osdmap, pgmap, ss,
1566 f.get(), method == "tree", filter);
1567 cmdctx->odata.append(ss);
1568 return 0;
1569 });
1570 cmdctx->reply(r, rs);
1571 return true;
1572 } else if (prefix == "osd pool stats") {
1573 string pool_name;
1574 cmd_getval(cmdctx->cmdmap, "pool_name", pool_name);
1575 int64_t poolid = -ENOENT;
1576 bool one_pool = false;
1577 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1578 if (!pool_name.empty()) {
1579 poolid = osdmap.lookup_pg_pool_name(pool_name);
1580 if (poolid < 0) {
1581 ceph_assert(poolid == -ENOENT);
1582 ss << "unrecognized pool '" << pool_name << "'";
1583 return -ENOENT;
1584 }
1585 one_pool = true;
1586 }
1587 stringstream rs;
1588 if (f)
1589 f->open_array_section("pool_stats");
1590 else {
1591 if (osdmap.get_pools().empty()) {
1592 ss << "there are no pools!";
1593 goto stats_out;
1594 }
1595 }
1596 for (auto &p : osdmap.get_pools()) {
1597 if (!one_pool) {
1598 poolid = p.first;
1599 }
1600 pg_map.dump_pool_stats_and_io_rate(poolid, osdmap, f.get(), &rs);
1601 if (one_pool) {
1602 break;
1603 }
1604 }
1605 stats_out:
1606 if (f) {
1607 f->close_section();
1608 f->flush(cmdctx->odata);
1609 } else {
1610 cmdctx->odata.append(rs.str());
1611 }
1612 return 0;
1613 });
1614 if (r != -EOPNOTSUPP) {
1615 cmdctx->reply(r, ss);
1616 return true;
1617 }
1618 } else if (prefix == "osd safe-to-destroy" ||
1619 prefix == "osd destroy" ||
1620 prefix == "osd purge") {
1621 set<int> osds;
1622 int r = 0;
1623 if (prefix == "osd safe-to-destroy") {
1624 vector<string> ids;
1625 cmd_getval(cmdctx->cmdmap, "ids", ids);
1626 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1627 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1628 });
1629 if (!r && osds.empty()) {
1630 ss << "must specify one or more OSDs";
1631 r = -EINVAL;
1632 }
1633 } else {
1634 int64_t id;
1635 if (!cmd_getval(cmdctx->cmdmap, "id", id)) {
1636 r = -EINVAL;
1637 ss << "must specify OSD id";
1638 } else {
1639 osds.insert(id);
1640 }
1641 }
1642 if (r < 0) {
1643 cmdctx->reply(r, ss);
1644 return true;
1645 }
1646 set<int> active_osds, missing_stats, stored_pgs, safe_to_destroy;
1647 int affected_pgs = 0;
1648 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1649 if (pg_map.num_pg_unknown > 0) {
1650 ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
1651 << " any conclusions";
1652 r = -EAGAIN;
1653 return;
1654 }
1655 int num_active_clean = 0;
1656 for (auto& p : pg_map.num_pg_by_state) {
1657 unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
1658 if ((p.first & want) == want) {
1659 num_active_clean += p.second;
1660 }
1661 }
1662 for (auto osd : osds) {
1663 if (!osdmap.exists(osd)) {
1664 safe_to_destroy.insert(osd);
1665 continue; // clearly safe to destroy
1666 }
1667 auto q = pg_map.num_pg_by_osd.find(osd);
1668 if (q != pg_map.num_pg_by_osd.end()) {
1669 if (q->second.acting > 0 || q->second.up_not_acting > 0) {
1670 active_osds.insert(osd);
1671 // XXX: For overlapping PGs, this counts them again
1672 affected_pgs += q->second.acting + q->second.up_not_acting;
1673 continue;
1674 }
1675 }
1676 if (num_active_clean < pg_map.num_pg) {
1677 // all pgs aren't active+clean; we need to be careful.
1678 auto p = pg_map.osd_stat.find(osd);
1679 if (p == pg_map.osd_stat.end() || !osdmap.is_up(osd)) {
1680 missing_stats.insert(osd);
1681 continue;
1682 } else if (p->second.num_pgs > 0) {
1683 stored_pgs.insert(osd);
1684 continue;
1685 }
1686 }
1687 safe_to_destroy.insert(osd);
1688 }
1689 });
1690 if (r && prefix == "osd safe-to-destroy") {
1691 cmdctx->reply(r, ss); // regardless of formatter
1692 return true;
1693 }
1694 if (!r && (!active_osds.empty() ||
1695 !missing_stats.empty() || !stored_pgs.empty())) {
1696 if (!safe_to_destroy.empty()) {
1697 ss << "OSD(s) " << safe_to_destroy
1698 << " are safe to destroy without reducing data durability. ";
1699 }
1700 if (!active_osds.empty()) {
1701 ss << "OSD(s) " << active_osds << " have " << affected_pgs
1702 << " pgs currently mapped to them. ";
1703 }
1704 if (!missing_stats.empty()) {
1705 ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
1706 << " PGs are active+clean; we cannot draw any conclusions. ";
1707 }
1708 if (!stored_pgs.empty()) {
1709 ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
1710 << " data, and not all PGs are active+clean; we cannot be sure they"
1711 << " aren't still needed.";
1712 }
1713 if (!active_osds.empty() || !stored_pgs.empty()) {
1714 r = -EBUSY;
1715 } else {
1716 r = -EAGAIN;
1717 }
1718 }
1719
1720 if (prefix == "osd safe-to-destroy") {
1721 if (!r) {
1722 ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1723 << " durability.";
1724 }
1725 if (f) {
1726 f->open_object_section("osd_status");
1727 f->open_array_section("safe_to_destroy");
1728 for (auto i : safe_to_destroy)
1729 f->dump_int("osd", i);
1730 f->close_section();
1731 f->open_array_section("active");
1732 for (auto i : active_osds)
1733 f->dump_int("osd", i);
1734 f->close_section();
1735 f->open_array_section("missing_stats");
1736 for (auto i : missing_stats)
1737 f->dump_int("osd", i);
1738 f->close_section();
1739 f->open_array_section("stored_pgs");
1740 for (auto i : stored_pgs)
1741 f->dump_int("osd", i);
1742 f->close_section();
1743 f->close_section(); // osd_status
1744 f->flush(cmdctx->odata);
1745 r = 0;
1746 std::stringstream().swap(ss);
1747 }
1748 cmdctx->reply(r, ss);
1749 return true;
1750 }
1751
1752 if (r) {
1753 bool force = false;
1754 cmd_getval(cmdctx->cmdmap, "force", force);
1755 if (!force) {
1756 // Backward compat
1757 cmd_getval(cmdctx->cmdmap, "yes_i_really_mean_it", force);
1758 }
1759 if (!force) {
1760 ss << "\nYou can proceed by passing --force, but be warned that"
1761 " this will likely mean real, permanent data loss.";
1762 } else {
1763 r = 0;
1764 }
1765 }
1766 if (r) {
1767 cmdctx->reply(r, ss);
1768 return true;
1769 }
1770 const string cmd =
1771 "{"
1772 "\"prefix\": \"" + prefix + "-actual\", "
1773 "\"id\": " + stringify(osds) + ", "
1774 "\"yes_i_really_mean_it\": true"
1775 "}";
1776 auto on_finish = new ReplyOnFinish(cmdctx);
1777 monc->start_mon_command({cmd}, {}, nullptr, &on_finish->outs, on_finish);
1778 return true;
1779 } else if (prefix == "osd ok-to-stop") {
1780 vector<string> ids;
1781 cmd_getval(cmdctx->cmdmap, "ids", ids);
1782 set<int> osds;
1783 int64_t max = 1;
1784 cmd_getval(cmdctx->cmdmap, "max", max);
1785 int r;
1786 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1787 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1788 });
1789 if (!r && osds.empty()) {
1790 ss << "must specify one or more OSDs";
1791 r = -EINVAL;
1792 }
1793 if (max < (int)osds.size()) {
1794 max = osds.size();
1795 }
1796 if (r < 0) {
1797 cmdctx->reply(r, ss);
1798 return true;
1799 }
1800 offline_pg_report out_report;
1801 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1802 _maximize_ok_to_stop_set(
1803 osds, max, osdmap, pg_map,
1804 &out_report);
1805 });
1806 if (!f) {
1807 f.reset(Formatter::create("json"));
1808 }
1809 f->dump_object("ok_to_stop", out_report);
1810 f->flush(cmdctx->odata);
1811 cmdctx->odata.append("\n");
1812 if (!out_report.unknown.empty()) {
1813 ss << out_report.unknown.size() << " pgs have unknown state; "
1814 << "cannot draw any conclusions";
1815 cmdctx->reply(-EAGAIN, ss);
1816 }
1817 if (!out_report.ok_to_stop()) {
1818 ss << "unsafe to stop osd(s) at this time (" << out_report.not_ok.size() << " PGs are or would become offline)";
1819 cmdctx->reply(-EBUSY, ss);
1820 } else {
1821 cmdctx->reply(0, ss);
1822 }
1823 return true;
1824 } else if (prefix == "pg force-recovery" ||
1825 prefix == "pg force-backfill" ||
1826 prefix == "pg cancel-force-recovery" ||
1827 prefix == "pg cancel-force-backfill" ||
1828 prefix == "osd pool force-recovery" ||
1829 prefix == "osd pool force-backfill" ||
1830 prefix == "osd pool cancel-force-recovery" ||
1831 prefix == "osd pool cancel-force-backfill") {
1832 vector<string> vs;
1833 get_str_vec(prefix, vs);
1834 auto& granularity = vs.front();
1835 auto& forceop = vs.back();
1836 vector<pg_t> pgs;
1837
1838 // figure out actual op just once
1839 int actual_op = 0;
1840 if (forceop == "force-recovery") {
1841 actual_op = OFR_RECOVERY;
1842 } else if (forceop == "force-backfill") {
1843 actual_op = OFR_BACKFILL;
1844 } else if (forceop == "cancel-force-backfill") {
1845 actual_op = OFR_BACKFILL | OFR_CANCEL;
1846 } else if (forceop == "cancel-force-recovery") {
1847 actual_op = OFR_RECOVERY | OFR_CANCEL;
1848 }
1849
1850 set<pg_t> candidates; // deduped
1851 if (granularity == "pg") {
1852 // covnert pg names to pgs, discard any invalid ones while at it
1853 vector<string> pgids;
1854 cmd_getval(cmdctx->cmdmap, "pgid", pgids);
1855 for (auto& i : pgids) {
1856 pg_t pgid;
1857 if (!pgid.parse(i.c_str())) {
1858 ss << "invlaid pgid '" << i << "'; ";
1859 r = -EINVAL;
1860 continue;
1861 }
1862 candidates.insert(pgid);
1863 }
1864 } else {
1865 // per pool
1866 vector<string> pool_names;
1867 cmd_getval(cmdctx->cmdmap, "who", pool_names);
1868 if (pool_names.empty()) {
1869 ss << "must specify one or more pool names";
1870 cmdctx->reply(-EINVAL, ss);
1871 return true;
1872 }
1873 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1874 for (auto& pool_name : pool_names) {
1875 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1876 if (pool_id < 0) {
1877 ss << "unrecognized pool '" << pool_name << "'";
1878 r = -ENOENT;
1879 return;
1880 }
1881 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1882 for (int i = 0; i < pool_pg_num; i++)
1883 candidates.insert({(unsigned int)i, (uint64_t)pool_id});
1884 }
1885 });
1886 if (r < 0) {
1887 cmdctx->reply(r, ss);
1888 return true;
1889 }
1890 }
1891
1892 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1893 for (auto& i : candidates) {
1894 auto it = pg_map.pg_stat.find(i);
1895 if (it == pg_map.pg_stat.end()) {
1896 ss << "pg " << i << " does not exist; ";
1897 r = -ENOENT;
1898 continue;
1899 }
1900 auto state = it->second.state;
1901 // discard pgs for which user requests are pointless
1902 switch (actual_op) {
1903 case OFR_RECOVERY:
1904 if ((state & (PG_STATE_DEGRADED |
1905 PG_STATE_RECOVERY_WAIT |
1906 PG_STATE_RECOVERING)) == 0) {
1907 // don't return error, user script may be racing with cluster.
1908 // not fatal.
1909 ss << "pg " << i << " doesn't require recovery; ";
1910 continue;
1911 } else if (state & PG_STATE_FORCED_RECOVERY) {
1912 ss << "pg " << i << " recovery already forced; ";
1913 // return error, as it may be a bug in user script
1914 r = -EINVAL;
1915 continue;
1916 }
1917 break;
1918 case OFR_BACKFILL:
1919 if ((state & (PG_STATE_DEGRADED |
1920 PG_STATE_BACKFILL_WAIT |
1921 PG_STATE_BACKFILLING)) == 0) {
1922 ss << "pg " << i << " doesn't require backfilling; ";
1923 continue;
1924 } else if (state & PG_STATE_FORCED_BACKFILL) {
1925 ss << "pg " << i << " backfill already forced; ";
1926 r = -EINVAL;
1927 continue;
1928 }
1929 break;
1930 case OFR_BACKFILL | OFR_CANCEL:
1931 if ((state & PG_STATE_FORCED_BACKFILL) == 0) {
1932 ss << "pg " << i << " backfill not forced; ";
1933 continue;
1934 }
1935 break;
1936 case OFR_RECOVERY | OFR_CANCEL:
1937 if ((state & PG_STATE_FORCED_RECOVERY) == 0) {
1938 ss << "pg " << i << " recovery not forced; ";
1939 continue;
1940 }
1941 break;
1942 default:
1943 ceph_abort_msg("actual_op value is not supported");
1944 }
1945 pgs.push_back(i);
1946 } // for
1947 });
1948
1949 // respond with error only when no pgs are correct
1950 // yes, in case of mixed errors, only the last one will be emitted,
1951 // but the message presented will be fine
1952 if (pgs.size() != 0) {
1953 // clear error to not confuse users/scripts
1954 r = 0;
1955 }
1956
1957 // optimize the command -> messages conversion, use only one
1958 // message per distinct OSD
1959 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1960 // group pgs to process by osd
1961 map<int, vector<spg_t>> osdpgs;
1962 for (auto& pgid : pgs) {
1963 int primary;
1964 spg_t spg;
1965 if (osdmap.get_primary_shard(pgid, &primary, &spg)) {
1966 osdpgs[primary].push_back(spg);
1967 }
1968 }
1969 for (auto& i : osdpgs) {
1970 if (osdmap.is_up(i.first)) {
1971 auto p = osd_cons.find(i.first);
1972 if (p == osd_cons.end()) {
1973 ss << "osd." << i.first << " is not currently connected";
1974 r = -EAGAIN;
1975 continue;
1976 }
1977 for (auto& con : p->second) {
1978 con->send_message(
1979 new MOSDForceRecovery(monc->get_fsid(), i.second, actual_op));
1980 }
1981 ss << "instructing pg(s) " << i.second << " on osd." << i.first
1982 << " to " << forceop << "; ";
1983 }
1984 }
1985 });
1986 ss << std::endl;
1987 cmdctx->reply(r, ss);
1988 return true;
1989 } else if (prefix == "config show" ||
1990 prefix == "config show-with-defaults") {
1991 string who;
1992 cmd_getval(cmdctx->cmdmap, "who", who);
1993 auto [key, valid] = DaemonKey::parse(who);
1994 if (!valid) {
1995 ss << "invalid daemon name: use <type>.<id>";
1996 cmdctx->reply(-EINVAL, ss);
1997 return true;
1998 }
1999 DaemonStatePtr daemon = daemon_state.get(key);
2000 if (!daemon) {
2001 ss << "no config state for daemon " << who;
2002 cmdctx->reply(-ENOENT, ss);
2003 return true;
2004 }
2005
2006 std::lock_guard l(daemon->lock);
2007
2008 int r = 0;
2009 string name;
2010 if (cmd_getval(cmdctx->cmdmap, "key", name)) {
2011 // handle special options
2012 if (name == "fsid") {
2013 cmdctx->odata.append(stringify(monc->get_fsid()) + "\n");
2014 cmdctx->reply(r, ss);
2015 return true;
2016 }
2017 auto p = daemon->config.find(name);
2018 if (p != daemon->config.end() &&
2019 !p->second.empty()) {
2020 cmdctx->odata.append(p->second.rbegin()->second + "\n");
2021 } else {
2022 auto& defaults = daemon->_get_config_defaults();
2023 auto q = defaults.find(name);
2024 if (q != defaults.end()) {
2025 cmdctx->odata.append(q->second + "\n");
2026 } else {
2027 r = -ENOENT;
2028 }
2029 }
2030 } else if (daemon->config_defaults_bl.length() > 0) {
2031 TextTable tbl;
2032 if (f) {
2033 f->open_array_section("config");
2034 } else {
2035 tbl.define_column("NAME", TextTable::LEFT, TextTable::LEFT);
2036 tbl.define_column("VALUE", TextTable::LEFT, TextTable::LEFT);
2037 tbl.define_column("SOURCE", TextTable::LEFT, TextTable::LEFT);
2038 tbl.define_column("OVERRIDES", TextTable::LEFT, TextTable::LEFT);
2039 tbl.define_column("IGNORES", TextTable::LEFT, TextTable::LEFT);
2040 }
2041 if (prefix == "config show") {
2042 // show
2043 for (auto& i : daemon->config) {
2044 dout(20) << " " << i.first << " -> " << i.second << dendl;
2045 if (i.second.empty()) {
2046 continue;
2047 }
2048 if (f) {
2049 f->open_object_section("value");
2050 f->dump_string("name", i.first);
2051 f->dump_string("value", i.second.rbegin()->second);
2052 f->dump_string("source", ceph_conf_level_name(
2053 i.second.rbegin()->first));
2054 if (i.second.size() > 1) {
2055 f->open_array_section("overrides");
2056 auto j = i.second.rend();
2057 for (--j; j != i.second.rbegin(); --j) {
2058 f->open_object_section("value");
2059 f->dump_string("source", ceph_conf_level_name(j->first));
2060 f->dump_string("value", j->second);
2061 f->close_section();
2062 }
2063 f->close_section();
2064 }
2065 if (daemon->ignored_mon_config.count(i.first)) {
2066 f->dump_string("ignores", "mon");
2067 }
2068 f->close_section();
2069 } else {
2070 tbl << i.first;
2071 tbl << i.second.rbegin()->second;
2072 tbl << ceph_conf_level_name(i.second.rbegin()->first);
2073 if (i.second.size() > 1) {
2074 list<string> ov;
2075 auto j = i.second.rend();
2076 for (--j; j != i.second.rbegin(); --j) {
2077 if (j->second == i.second.rbegin()->second) {
2078 ov.push_front(string("(") + ceph_conf_level_name(j->first) +
2079 string("[") + j->second + string("]") +
2080 string(")"));
2081 } else {
2082 ov.push_front(ceph_conf_level_name(j->first) +
2083 string("[") + j->second + string("]"));
2084
2085 }
2086 }
2087 tbl << ov;
2088 } else {
2089 tbl << "";
2090 }
2091 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
2092 tbl << TextTable::endrow;
2093 }
2094 }
2095 } else {
2096 // show-with-defaults
2097 auto& defaults = daemon->_get_config_defaults();
2098 for (auto& i : defaults) {
2099 if (f) {
2100 f->open_object_section("value");
2101 f->dump_string("name", i.first);
2102 } else {
2103 tbl << i.first;
2104 }
2105 auto j = daemon->config.find(i.first);
2106 if (j != daemon->config.end() && !j->second.empty()) {
2107 // have config
2108 if (f) {
2109 f->dump_string("value", j->second.rbegin()->second);
2110 f->dump_string("source", ceph_conf_level_name(
2111 j->second.rbegin()->first));
2112 if (j->second.size() > 1) {
2113 f->open_array_section("overrides");
2114 auto k = j->second.rend();
2115 for (--k; k != j->second.rbegin(); --k) {
2116 f->open_object_section("value");
2117 f->dump_string("source", ceph_conf_level_name(k->first));
2118 f->dump_string("value", k->second);
2119 f->close_section();
2120 }
2121 f->close_section();
2122 }
2123 if (daemon->ignored_mon_config.count(i.first)) {
2124 f->dump_string("ignores", "mon");
2125 }
2126 f->close_section();
2127 } else {
2128 tbl << j->second.rbegin()->second;
2129 tbl << ceph_conf_level_name(j->second.rbegin()->first);
2130 if (j->second.size() > 1) {
2131 list<string> ov;
2132 auto k = j->second.rend();
2133 for (--k; k != j->second.rbegin(); --k) {
2134 if (k->second == j->second.rbegin()->second) {
2135 ov.push_front(string("(") + ceph_conf_level_name(k->first) +
2136 string("[") + k->second + string("]") +
2137 string(")"));
2138 } else {
2139 ov.push_front(ceph_conf_level_name(k->first) +
2140 string("[") + k->second + string("]"));
2141 }
2142 }
2143 tbl << ov;
2144 } else {
2145 tbl << "";
2146 }
2147 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
2148 tbl << TextTable::endrow;
2149 }
2150 } else {
2151 // only have default
2152 if (f) {
2153 f->dump_string("value", i.second);
2154 f->dump_string("source", ceph_conf_level_name(CONF_DEFAULT));
2155 f->close_section();
2156 } else {
2157 tbl << i.second;
2158 tbl << ceph_conf_level_name(CONF_DEFAULT);
2159 tbl << "";
2160 tbl << "";
2161 tbl << TextTable::endrow;
2162 }
2163 }
2164 }
2165 }
2166 if (f) {
2167 f->close_section();
2168 f->flush(cmdctx->odata);
2169 } else {
2170 cmdctx->odata.append(stringify(tbl));
2171 }
2172 }
2173 cmdctx->reply(r, ss);
2174 return true;
2175 } else if (prefix == "device ls") {
2176 set<string> devids;
2177 TextTable tbl;
2178 if (f) {
2179 f->open_array_section("devices");
2180 daemon_state.with_devices([&f](const DeviceState& dev) {
2181 f->dump_object("device", dev);
2182 });
2183 f->close_section();
2184 f->flush(cmdctx->odata);
2185 } else {
2186 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2187 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
2188 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
2189 tbl.define_column("WEAR", TextTable::RIGHT, TextTable::RIGHT);
2190 tbl.define_column("LIFE EXPECTANCY", TextTable::LEFT, TextTable::LEFT);
2191 auto now = ceph_clock_now();
2192 daemon_state.with_devices([&tbl, now](const DeviceState& dev) {
2193 string h;
2194 for (auto& i : dev.attachments) {
2195 if (h.size()) {
2196 h += " ";
2197 }
2198 h += std::get<0>(i) + ":" + std::get<1>(i);
2199 }
2200 string d;
2201 for (auto& i : dev.daemons) {
2202 if (d.size()) {
2203 d += " ";
2204 }
2205 d += to_string(i);
2206 }
2207 char wear_level_str[16] = {0};
2208 if (dev.wear_level >= 0) {
2209 snprintf(wear_level_str, sizeof(wear_level_str)-1, "%d%%",
2210 (int)(100.1 * dev.wear_level));
2211 }
2212 tbl << dev.devid
2213 << h
2214 << d
2215 << wear_level_str
2216 << dev.get_life_expectancy_str(now)
2217 << TextTable::endrow;
2218 });
2219 cmdctx->odata.append(stringify(tbl));
2220 }
2221 cmdctx->reply(0, ss);
2222 return true;
2223 } else if (prefix == "device ls-by-daemon") {
2224 string who;
2225 cmd_getval(cmdctx->cmdmap, "who", who);
2226 if (auto [k, valid] = DaemonKey::parse(who); !valid) {
2227 ss << who << " is not a valid daemon name";
2228 r = -EINVAL;
2229 } else {
2230 auto dm = daemon_state.get(k);
2231 if (dm) {
2232 if (f) {
2233 f->open_array_section("devices");
2234 for (auto& i : dm->devices) {
2235 daemon_state.with_device(i.first, [&f] (const DeviceState& dev) {
2236 f->dump_object("device", dev);
2237 });
2238 }
2239 f->close_section();
2240 f->flush(cmdctx->odata);
2241 } else {
2242 TextTable tbl;
2243 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2244 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
2245 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT,
2246 TextTable::LEFT);
2247 auto now = ceph_clock_now();
2248 for (auto& i : dm->devices) {
2249 daemon_state.with_device(
2250 i.first, [&tbl, now] (const DeviceState& dev) {
2251 string h;
2252 for (auto& i : dev.attachments) {
2253 if (h.size()) {
2254 h += " ";
2255 }
2256 h += std::get<0>(i) + ":" + std::get<1>(i);
2257 }
2258 tbl << dev.devid
2259 << h
2260 << dev.get_life_expectancy_str(now)
2261 << TextTable::endrow;
2262 });
2263 }
2264 cmdctx->odata.append(stringify(tbl));
2265 }
2266 } else {
2267 r = -ENOENT;
2268 ss << "daemon " << who << " not found";
2269 }
2270 cmdctx->reply(r, ss);
2271 }
2272 } else if (prefix == "device ls-by-host") {
2273 string host;
2274 cmd_getval(cmdctx->cmdmap, "host", host);
2275 set<string> devids;
2276 daemon_state.list_devids_by_server(host, &devids);
2277 if (f) {
2278 f->open_array_section("devices");
2279 for (auto& devid : devids) {
2280 daemon_state.with_device(
2281 devid, [&f] (const DeviceState& dev) {
2282 f->dump_object("device", dev);
2283 });
2284 }
2285 f->close_section();
2286 f->flush(cmdctx->odata);
2287 } else {
2288 TextTable tbl;
2289 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2290 tbl.define_column("DEV", TextTable::LEFT, TextTable::LEFT);
2291 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
2292 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT, TextTable::LEFT);
2293 auto now = ceph_clock_now();
2294 for (auto& devid : devids) {
2295 daemon_state.with_device(
2296 devid, [&tbl, &host, now] (const DeviceState& dev) {
2297 string n;
2298 for (auto& j : dev.attachments) {
2299 if (std::get<0>(j) == host) {
2300 if (n.size()) {
2301 n += " ";
2302 }
2303 n += std::get<1>(j);
2304 }
2305 }
2306 string d;
2307 for (auto& i : dev.daemons) {
2308 if (d.size()) {
2309 d += " ";
2310 }
2311 d += to_string(i);
2312 }
2313 tbl << dev.devid
2314 << n
2315 << d
2316 << dev.get_life_expectancy_str(now)
2317 << TextTable::endrow;
2318 });
2319 }
2320 cmdctx->odata.append(stringify(tbl));
2321 }
2322 cmdctx->reply(0, ss);
2323 return true;
2324 } else if (prefix == "device info") {
2325 string devid;
2326 cmd_getval(cmdctx->cmdmap, "devid", devid);
2327 int r = 0;
2328 ostringstream rs;
2329 if (!daemon_state.with_device(devid,
2330 [&f, &rs] (const DeviceState& dev) {
2331 if (f) {
2332 f->dump_object("device", dev);
2333 } else {
2334 dev.print(rs);
2335 }
2336 })) {
2337 ss << "device " << devid << " not found";
2338 r = -ENOENT;
2339 } else {
2340 if (f) {
2341 f->flush(cmdctx->odata);
2342 } else {
2343 cmdctx->odata.append(rs.str());
2344 }
2345 }
2346 cmdctx->reply(r, ss);
2347 return true;
2348 } else if (prefix == "device set-life-expectancy") {
2349 string devid;
2350 cmd_getval(cmdctx->cmdmap, "devid", devid);
2351 string from_str, to_str;
2352 cmd_getval(cmdctx->cmdmap, "from", from_str);
2353 cmd_getval(cmdctx->cmdmap, "to", to_str);
2354 utime_t from, to;
2355 if (!from.parse(from_str)) {
2356 ss << "unable to parse datetime '" << from_str << "'";
2357 r = -EINVAL;
2358 cmdctx->reply(r, ss);
2359 } else if (to_str.size() && !to.parse(to_str)) {
2360 ss << "unable to parse datetime '" << to_str << "'";
2361 r = -EINVAL;
2362 cmdctx->reply(r, ss);
2363 } else {
2364 map<string,string> meta;
2365 daemon_state.with_device_create(
2366 devid,
2367 [from, to, &meta] (DeviceState& dev) {
2368 dev.set_life_expectancy(from, to, ceph_clock_now());
2369 meta = dev.metadata;
2370 });
2371 json_spirit::Object json_object;
2372 for (auto& i : meta) {
2373 json_spirit::Config::add(json_object, i.first, i.second);
2374 }
2375 bufferlist json;
2376 json.append(json_spirit::write(json_object));
2377 const string cmd =
2378 "{"
2379 "\"prefix\": \"config-key set\", "
2380 "\"key\": \"device/" + devid + "\""
2381 "}";
2382 auto on_finish = new ReplyOnFinish(cmdctx);
2383 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2384 }
2385 return true;
2386 } else if (prefix == "device rm-life-expectancy") {
2387 string devid;
2388 cmd_getval(cmdctx->cmdmap, "devid", devid);
2389 map<string,string> meta;
2390 if (daemon_state.with_device_write(devid, [&meta] (DeviceState& dev) {
2391 dev.rm_life_expectancy();
2392 meta = dev.metadata;
2393 })) {
2394 string cmd;
2395 bufferlist json;
2396 if (meta.empty()) {
2397 cmd =
2398 "{"
2399 "\"prefix\": \"config-key rm\", "
2400 "\"key\": \"device/" + devid + "\""
2401 "}";
2402 } else {
2403 json_spirit::Object json_object;
2404 for (auto& i : meta) {
2405 json_spirit::Config::add(json_object, i.first, i.second);
2406 }
2407 json.append(json_spirit::write(json_object));
2408 cmd =
2409 "{"
2410 "\"prefix\": \"config-key set\", "
2411 "\"key\": \"device/" + devid + "\""
2412 "}";
2413 }
2414 auto on_finish = new ReplyOnFinish(cmdctx);
2415 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2416 } else {
2417 cmdctx->reply(0, ss);
2418 }
2419 return true;
2420 } else {
2421 if (!pgmap_ready) {
2422 ss << "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2423 }
2424 if (f) {
2425 f->open_object_section("pg_info");
2426 f->dump_bool("pg_ready", pgmap_ready);
2427 }
2428
2429 // fall back to feeding command to PGMap
2430 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
2431 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
2432 f.get(), &ss, &cmdctx->odata);
2433 });
2434
2435 if (f) {
2436 f->close_section();
2437 }
2438 if (r != -EOPNOTSUPP) {
2439 if (f) {
2440 f->flush(cmdctx->odata);
2441 }
2442 cmdctx->reply(r, ss);
2443 return true;
2444 }
2445 }
2446
2447 // Was the command unfound?
2448 if (py_command.cmdstring.empty()) {
2449 ss << "No handler found for '" << prefix << "'";
2450 dout(4) << "No handler found for '" << prefix << "'" << dendl;
2451 cmdctx->reply(-EINVAL, ss);
2452 return true;
2453 }
2454
2455 dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
2456 finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]
2457 (int r_) mutable {
2458 std::stringstream ss;
2459
2460 dout(10) << "dispatching command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
2461
2462 // Validate that the module is enabled
2463 auto& py_handler_name = py_command.module_name;
2464 PyModuleRef module = py_modules.get_module(py_handler_name);
2465 ceph_assert(module);
2466 if (!module->is_enabled()) {
2467 ss << "Module '" << py_handler_name << "' is not enabled (required by "
2468 "command '" << prefix << "'): use `ceph mgr module enable "
2469 << py_handler_name << "` to enable it";
2470 dout(4) << ss.str() << dendl;
2471 cmdctx->reply(-EOPNOTSUPP, ss);
2472 return;
2473 }
2474
2475 // Hack: allow the self-test method to run on unhealthy modules.
2476 // Fix this in future by creating a special path for self test rather
2477 // than having the hook be a normal module command.
2478 std::string self_test_prefix = py_handler_name + " " + "self-test";
2479
2480 // Validate that the module is healthy
2481 bool accept_command;
2482 if (module->is_loaded()) {
2483 if (module->get_can_run() && !module->is_failed()) {
2484 // Healthy module
2485 accept_command = true;
2486 } else if (self_test_prefix == prefix) {
2487 // Unhealthy, but allow because it's a self test command
2488 accept_command = true;
2489 } else {
2490 accept_command = false;
2491 ss << "Module '" << py_handler_name << "' has experienced an error and "
2492 "cannot handle commands: " << module->get_error_string();
2493 }
2494 } else {
2495 // Module not loaded
2496 accept_command = false;
2497 ss << "Module '" << py_handler_name << "' failed to load and "
2498 "cannot handle commands: " << module->get_error_string();
2499 }
2500
2501 if (!accept_command) {
2502 dout(4) << ss.str() << dendl;
2503 cmdctx->reply(-EIO, ss);
2504 return;
2505 }
2506
2507 std::stringstream ds;
2508 bufferlist inbl = cmdctx->data;
2509 int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
2510 inbl, &ds, &ss);
2511 if (r == -EACCES) {
2512 log_access_denied(cmdctx, session, ss);
2513 }
2514
2515 cmdctx->odata.append(ds);
2516 cmdctx->reply(r, ss);
2517 dout(10) << " command returned " << r << dendl;
2518 }));
2519 return true;
2520 }
2521
// Drop daemons from the pending ServiceMap that have either vanished from
// the daemon state index or stopped sending service beacons within the
// mgr_service_beacon_grace window.  Any removal bumps
// pending_service_map_dirty so the updated map is sent to the mon on the
// next send_report().
void DaemonServer::_prune_pending_service_map()
{
  // A daemon is stale if its last beacon predates this cutoff.
  utime_t cutoff = ceph_clock_now();
  cutoff -= g_conf().get_val<double>("mgr_service_beacon_grace");
  auto p = pending_service_map.services.begin();
  while (p != pending_service_map.services.end()) {
    auto q = p->second.daemons.begin();
    while (q != p->second.daemons.end()) {
      DaemonKey key{p->first, q->first};
      if (!daemon_state.exists(key)) {
        if (ServiceMap::is_normal_ceph_entity(p->first)) {
          // Normal ceph entities are tracked in daemon_state; if the entry
          // is gone from there, the service-map copy is garbage.
          dout(10) << "daemon " << key << " in service map but not in daemon state "
                   << "index -- force pruning" << dendl;
          q = p->second.daemons.erase(q);
          pending_service_map_dirty = pending_service_map.epoch;
        } else {
          // Non-ceph (registered) services should always have state; log it.
          derr << "missing key " << key << dendl;
          ++q;
        }

        continue;
      }

      auto daemon = daemon_state.get(key);
      std::lock_guard l(daemon->lock);
      if (daemon->last_service_beacon == utime_t()) {
        // we must have just restarted; assume they are alive now.
        daemon->last_service_beacon = ceph_clock_now();
        ++q;
        continue;
      }
      if (daemon->last_service_beacon < cutoff) {
        dout(10) << "pruning stale " << p->first << "." << q->first
                 << " last_beacon " << daemon->last_service_beacon << dendl;
        q = p->second.daemons.erase(q);
        pending_service_map_dirty = pending_service_map.epoch;
      } else {
        ++q;
      }
    }
    // Remove the whole service bucket once its last daemon is gone.
    if (p->second.daemons.empty()) {
      p = pending_service_map.services.erase(p);
      pending_service_map_dirty = pending_service_map.epoch;
    } else {
      ++p;
    }
  }
}
2570
// Assemble and send the periodic MMonMgrReport to the monitor: the PGMap
// digest, module/pgmap health checks, progress events, and (when dirty) the
// pending ServiceMap.  Skips sending until the PG map is ready, unless we
// have waited long enough (4x mgr_stats_period) to give up on laggard OSDs.
void DaemonServer::send_report()
{
  if (!pgmap_ready) {
    if (ceph_clock_now() - started_at > g_conf().get_val<int64_t>("mgr_stats_period") * 4.0) {
      // Waited long enough: report with whatever PG state we have.
      pgmap_ready = true;
      reported_osds.clear();
      dout(1) << "Giving up on OSDs that haven't reported yet, sending "
              << "potentially incomplete PG state to mon" << dendl;
    } else {
      dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
              << dendl;
      return;
    }
  }

  auto m = ceph::make_message<MMonMgrReport>();
  m->gid = monc->get_global_id();
  // Health checks and progress events contributed by python modules.
  py_modules.get_health_checks(&m->health_checks);
  py_modules.get_progress_events(&m->progress_events);

  cluster_state.with_mutable_pgmap([&](PGMap& pg_map) {
    cluster_state.update_delta_stats();

    if (pending_service_map.epoch) {
      _prune_pending_service_map();
      // Only ship the service map if something changed since the last send
      // (pruning above may have marked it dirty).
      if (pending_service_map_dirty >= pending_service_map.epoch) {
        pending_service_map.modified = ceph_clock_now();
        encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
        dout(10) << "sending service_map e" << pending_service_map.epoch
                 << dendl;
        pending_service_map.epoch++;
      }
    }

    cluster_state.with_osdmap([&](const OSDMap& osdmap) {
      // FIXME: no easy way to get mon features here.  this will do for
      // now, though, as long as we don't make a backward-incompat change.
      pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
      dout(10) << pg_map << dendl;

      pg_map.get_health_checks(g_ceph_context, osdmap,
                               &m->health_checks);

      dout(10) << m->health_checks.checks.size() << " health checks"
               << dendl;
      dout(20) << "health checks:\n";
      JSONFormatter jf(true);
      jf.dump_object("health_checks", m->health_checks);
      jf.flush(*_dout);
      *_dout << dendl;
      if (osdmap.require_osd_release >= ceph_release_t::luminous) {
        clog->debug() << "pgmap v" << pg_map.version << ": " << pg_map;
      }
    });
  });

  // Aggregate per-daemon health metrics (from osd and mon beacons) into one
  // collector per metric type, then fold their summaries into the report.
  map<daemon_metric, unique_ptr<DaemonHealthMetricCollector>> accumulated;
  for (auto service : {"osd", "mon"} ) {
    auto daemons = daemon_state.get_by_service(service);
    for (const auto& [key,state] : daemons) {
      std::lock_guard l{state->lock};
      for (const auto& metric : state->daemon_health_metrics) {
        auto acc = accumulated.find(metric.get_type());
        if (acc == accumulated.end()) {
          // First time we see this metric type: create its collector.
          auto collector = DaemonHealthMetricCollector::create(metric.get_type());
          if (!collector) {
            derr << __func__ << " " << key
                 << " sent me an unknown health metric: "
                 << std::hex << static_cast<uint8_t>(metric.get_type())
                 << std::dec << dendl;
            continue;
          }
          dout(20) << " + " << state->key << " "
                   << metric << dendl;
          tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
              std::move(collector));
        }
        acc->second->update(key, metric);
      }
    }
  }
  for (const auto& acc : accumulated) {
    acc.second->summarize(m->health_checks);
  }
  // TODO? We currently do not notify the PyModules
  // TODO: respect needs_send, so we send the report only if we are asked to do
  // so, or the state is updated.
  monc->send_mon_message(std::move(m));
}
2660
// Incrementally drive each pool's actual pg_num/pgp_num toward its target
// (pg autoscaling / manual resize).  Splits are throttled by the creating-pg
// budget and mgr_max_pg_num_change; merges require clean, upmap-free,
// acting-matched source/target PGs; pgp_num changes are bounded by
// target_max_misplaced_ratio.  Resulting adjustments are issued to the mon
// as "osd pool set" / "osd rm-pg-upmap*" commands.
void DaemonServer::adjust_pgs()
{
  dout(20) << dendl;
  unsigned max = std::max<int64_t>(1, g_conf()->mon_osd_max_creating_pgs);
  double max_misplaced = g_conf().get_val<double>("target_max_misplaced_ratio");
  // Debug option: ignore the misplaced/degraded safety brakes.
  bool aggro = g_conf().get_val<bool>("mgr_debug_aggressive_pg_num_changes");

  map<string,unsigned> pg_num_to_set;
  map<string,unsigned> pgp_num_to_set;
  set<pg_t> upmaps_to_clear;
  cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
    // Count PGs that are creating or in an unknown (state==0) state; these
    // consume the pg-creation budget before we allow any further splits.
    unsigned creating_or_unknown = 0;
    for (auto& i : pg_map.num_pg_by_state) {
      if ((i.first & (PG_STATE_CREATING)) ||
          i.first == 0) {
        creating_or_unknown += i.second;
      }
    }
    unsigned left = max;
    if (creating_or_unknown >= max) {
      return;
    }
    left -= creating_or_unknown;
    dout(10) << "creating_or_unknown " << creating_or_unknown
             << " max_creating " << max
             << " left " << left
             << dendl;

    // FIXME: These checks are fundamentally racy given that adjust_pgs()
    // can run more frequently than we get updated pg stats from OSDs.  We
    // may make multiple adjustments with stale informaiton.
    double misplaced_ratio, degraded_ratio;
    double inactive_pgs_ratio, unknown_pgs_ratio;
    pg_map.get_recovery_stats(&misplaced_ratio, &degraded_ratio,
                              &inactive_pgs_ratio, &unknown_pgs_ratio);
    dout(20) << "misplaced_ratio " << misplaced_ratio
             << " degraded_ratio " << degraded_ratio
             << " inactive_pgs_ratio " << inactive_pgs_ratio
             << " unknown_pgs_ratio " << unknown_pgs_ratio
             << "; target_max_misplaced_ratio " << max_misplaced
             << dendl;

    for (auto& i : osdmap.get_pools()) {
      const pg_pool_t& p = i.second;

      // adjust pg_num?
      if (p.get_pg_num_target() != p.get_pg_num()) {
        dout(20) << "pool " << i.first
                 << " pg_num " << p.get_pg_num()
                 << " target " << p.get_pg_num_target()
                 << dendl;
        if (p.has_flag(pg_pool_t::FLAG_CREATING)) {
          dout(10) << "pool " << i.first
                   << " pg_num_target " << p.get_pg_num_target()
                   << " pg_num " << p.get_pg_num()
                   << " - still creating initial pgs"
                   << dendl;
        } else if (p.get_pg_num_target() < p.get_pg_num()) {
          // pg_num decrease (merge): merge the last PG into its parent,
          // one step at a time.
          pg_t merge_source(p.get_pg_num() - 1, i.first);
          pg_t merge_target = merge_source.get_parent();
          bool ok = true;

          if (p.get_pg_num() != p.get_pg_num_pending()) {
            dout(10) << "pool " << i.first
                     << " pg_num_target " << p.get_pg_num_target()
                     << " pg_num " << p.get_pg_num()
                     << " - decrease and pg_num_pending != pg_num, waiting"
                     << dendl;
            ok = false;
          } else if (p.get_pg_num() == p.get_pgp_num()) {
            // pgp_num must drop below pg_num before a merge can start.
            dout(10) << "pool " << i.first
                     << " pg_num_target " << p.get_pg_num_target()
                     << " pg_num " << p.get_pg_num()
                     << " - decrease blocked by pgp_num "
                     << p.get_pgp_num()
                     << dendl;
            ok = false;
          }
          // Both merge participants must be upmap-free, active+clean, and
          // share the same acting set.
          vector<int32_t> source_acting;
          for (auto &merge_participant : {merge_source, merge_target}) {
            bool is_merge_source = merge_participant == merge_source;
            if (osdmap.have_pg_upmaps(merge_participant)) {
              dout(10) << "pool " << i.first
                       << " pg_num_target " << p.get_pg_num_target()
                       << " pg_num " << p.get_pg_num()
                       << (is_merge_source ? " - merge source " : " - merge target ")
                       << merge_participant
                       << " has upmap" << dendl;
              upmaps_to_clear.insert(merge_participant);
              ok = false;
            }
            auto q = pg_map.pg_stat.find(merge_participant);
            if (q == pg_map.pg_stat.end()) {
              dout(10) << "pool " << i.first
                       << " pg_num_target " << p.get_pg_num_target()
                       << " pg_num " << p.get_pg_num()
                       << " - no state for " << merge_participant
                       << (is_merge_source ? " (merge source)" : " (merge target)")
                       << dendl;
              ok = false;
            } else if ((q->second.state & (PG_STATE_ACTIVE | PG_STATE_CLEAN)) !=
                       (PG_STATE_ACTIVE | PG_STATE_CLEAN)) {
              dout(10) << "pool " << i.first
                       << " pg_num_target " << p.get_pg_num_target()
                       << " pg_num " << p.get_pg_num()
                       << (is_merge_source ? " - merge source " : " - merge target ")
                       << merge_participant
                       << " not clean (" << pg_state_string(q->second.state)
                       << ")" << dendl;
              ok = false;
            }
            if (is_merge_source) {
              source_acting = q->second.acting;
            } else if (ok && q->second.acting != source_acting) {
              dout(10) << "pool " << i.first
                       << " pg_num_target " << p.get_pg_num_target()
                       << " pg_num " << p.get_pg_num()
                       << (is_merge_source ? " - merge source " : " - merge target ")
                       << merge_participant
                       << " acting does not match (source " << source_acting
                       << " != target " << q->second.acting
                       << ")" << dendl;
              ok = false;
            }
          }

          if (ok) {
            unsigned target = p.get_pg_num() - 1;
            dout(10) << "pool " << i.first
                     << " pg_num_target " << p.get_pg_num_target()
                     << " pg_num " << p.get_pg_num()
                     << " -> " << target
                     << " (merging " << merge_source
                     << " and " << merge_target
                     << ")" << dendl;
            pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
            continue;
          }
        } else if (p.get_pg_num_target() > p.get_pg_num()) {
          // pg_num increase (split)
          // All of the pool's PGs must be active (or peered) first.
          bool active = true;
          auto q = pg_map.num_pg_by_pool_state.find(i.first);
          if (q != pg_map.num_pg_by_pool_state.end()) {
            for (auto& j : q->second) {
              if ((j.first & (PG_STATE_ACTIVE|PG_STATE_PEERED)) == 0) {
                dout(20) << "pool " << i.first << " has " << j.second
                         << " pgs in " << pg_state_string(j.first)
                         << dendl;
                active = false;
                break;
              }
            }
          } else {
            active = false;
          }
          // Don't let pg_num run more than max_jump ahead of pgp_num.
          unsigned pg_gap = p.get_pg_num() - p.get_pgp_num();
          unsigned max_jump = cct->_conf->mgr_max_pg_num_change;
          if (!active) {
            dout(10) << "pool " << i.first
                     << " pg_num_target " << p.get_pg_num_target()
                     << " pg_num " << p.get_pg_num()
                     << " - not all pgs active"
                     << dendl;
          } else if (pg_gap >= max_jump) {
            dout(10) << "pool " << i.first
                     << " pg_num " << p.get_pg_num()
                     << " - pgp_num " << p.get_pgp_num()
                     << " gap >= max_pg_num_change " << max_jump
                     << " - must scale pgp_num first"
                     << dendl;
          } else {
            // Split by as much as the creation budget and jump cap allow.
            unsigned add = std::min(
              std::min(left, max_jump - pg_gap),
              p.get_pg_num_target() - p.get_pg_num());
            unsigned target = p.get_pg_num() + add;
            left -= add;
            dout(10) << "pool " << i.first
                     << " pg_num_target " << p.get_pg_num_target()
                     << " pg_num " << p.get_pg_num()
                     << " -> " << target << dendl;
            pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
          }
        }
      }

      // adjust pgp_num?
      unsigned target = std::min(p.get_pg_num_pending(),
                                 p.get_pgp_num_target());
      if (target != p.get_pgp_num()) {
        dout(20) << "pool " << i.first
                 << " pgp_num_target " << p.get_pgp_num_target()
                 << " pgp_num " << p.get_pgp_num()
                 << " -> " << target << dendl;
        if (target > p.get_pgp_num() &&
            p.get_pgp_num() == p.get_pg_num()) {
          dout(10) << "pool " << i.first
                   << " pgp_num_target " << p.get_pgp_num_target()
                   << " pgp_num " << p.get_pgp_num()
                   << " - increase blocked by pg_num " << p.get_pg_num()
                   << dendl;
        } else if (!aggro && (inactive_pgs_ratio > 0 ||
                              degraded_ratio > 0 ||
                              unknown_pgs_ratio > 0)) {
          dout(10) << "pool " << i.first
                   << " pgp_num_target " << p.get_pgp_num_target()
                   << " pgp_num " << p.get_pgp_num()
                   << " - inactive|degraded|unknown pgs, deferring pgp_num"
                   << " update" << dendl;
        } else if (!aggro && (misplaced_ratio > max_misplaced)) {
          dout(10) << "pool " << i.first
                   << " pgp_num_target " << p.get_pgp_num_target()
                   << " pgp_num " << p.get_pgp_num()
                   << " - misplaced_ratio " << misplaced_ratio
                   << " > max " << max_misplaced
                   << ", deferring pgp_num update" << dendl;
        } else {
          // NOTE: this calculation assumes objects are
          // basically uniformly distributed across all PGs
          // (regardless of pool), which is probably not
          // perfectly correct, but it's a start.  make no
          // single adjustment that's more than half of the
          // max_misplaced, to somewhat limit the magnitude of
          // our potential error here.
          unsigned next;
          static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP = 1;
          pool_stat_t s = pg_map.get_pg_pool_sum_stat(i.first);
          if (aggro ||
              // pool is (virtually) empty; just jump to final pgp_num?
              (p.get_pgp_num_target() > p.get_pgp_num() &&
               s.stats.sum.num_objects <= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP *
                                           p.get_pgp_num_target()))) {
            next = target;
          } else {
            // Step pgp_num within the remaining misplacement headroom.
            double room =
              std::min<double>(max_misplaced - misplaced_ratio,
                               max_misplaced / 2.0);
            unsigned estmax = std::max<unsigned>(
              (double)p.get_pg_num() * room, 1u);
            unsigned next_min = 0;
            if (p.get_pgp_num() > estmax) {
              next_min = p.get_pgp_num() - estmax;
            }
            next = std::clamp(target,
                              next_min,
                              p.get_pgp_num() + estmax);
            dout(20) << " room " << room << " estmax " << estmax
                     << " delta " << (target-p.get_pgp_num())
                     << " next " << next << dendl;
            if (p.get_pgp_num_target() == p.get_pg_num_target() &&
                p.get_pgp_num_target() < p.get_pg_num()) {
              // since pgp_num is tracking pg_num, ceph is handling
              // pgp_num.  so, be responsible: don't let pgp_num get
              // too far out ahead of merges (if we are merging).
              // this avoids moving lots of unmerged pgs onto a
              // small number of OSDs where we might blow out the
              // per-osd pg max.
              unsigned max_outpace_merges =
                std::max<unsigned>(8, p.get_pg_num() * max_misplaced);
              if (next + max_outpace_merges < p.get_pg_num()) {
                next = p.get_pg_num() - max_outpace_merges;
                dout(10) << " using next " << next
                         << " to avoid outpacing merges (max_outpace_merges "
                         << max_outpace_merges << ")" << dendl;
              }
            }
          }
          if (next != p.get_pgp_num()) {
            dout(10) << "pool " << i.first
                     << " pgp_num_target " << p.get_pgp_num_target()
                     << " pgp_num " << p.get_pgp_num()
                     << " -> " << next << dendl;
            pgp_num_to_set[osdmap.get_pool_name(i.first)] = next;
          }
        }
      }
      // Creation budget exhausted; no further splits this round.
      if (left == 0) {
        return;
      }
    }
  });
  // Issue the accumulated adjustments as mon commands (fire and forget).
  for (auto i : pg_num_to_set) {
    const string cmd =
      "{"
      "\"prefix\": \"osd pool set\", "
      "\"pool\": \"" + i.first + "\", "
      "\"var\": \"pg_num_actual\", "
      "\"val\": \"" + stringify(i.second) + "\""
      "}";
    monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
  }
  for (auto i : pgp_num_to_set) {
    const string cmd =
      "{"
      "\"prefix\": \"osd pool set\", "
      "\"pool\": \"" + i.first + "\", "
      "\"var\": \"pgp_num_actual\", "
      "\"val\": \"" + stringify(i.second) + "\""
      "}";
    monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
  }
  // Clear both flavors of upmap on PGs that blocked a merge.
  for (auto pg : upmaps_to_clear) {
    const string cmd =
      "{"
      "\"prefix\": \"osd rm-pg-upmap\", "
      "\"pgid\": \"" + stringify(pg) + "\""
      "}";
    monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
    const string cmd2 =
      "{"
      "\"prefix\": \"osd rm-pg-upmap-items\", "
      "\"pgid\": \"" + stringify(pg) + "\"" +
      "}";
    monc->start_mon_command({cmd2}, {}, nullptr, nullptr, nullptr);
  }
}
2977
2978 void DaemonServer::got_service_map()
2979 {
2980 std::lock_guard l(lock);
2981
2982 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
2983 if (pending_service_map.epoch == 0) {
2984 // we just started up
2985 dout(10) << "got initial map e" << service_map.epoch << dendl;
2986 ceph_assert(pending_service_map_dirty == 0);
2987 pending_service_map = service_map;
2988 pending_service_map.epoch = service_map.epoch + 1;
2989 } else if (pending_service_map.epoch <= service_map.epoch) {
2990 // we just started up but got one more not our own map
2991 dout(10) << "got newer initial map e" << service_map.epoch << dendl;
2992 ceph_assert(pending_service_map_dirty == 0);
2993 pending_service_map = service_map;
2994 pending_service_map.epoch = service_map.epoch + 1;
2995 } else {
2996 // we already active and therefore must have persisted it,
2997 // which means ours is the same or newer.
2998 dout(10) << "got updated map e" << service_map.epoch << dendl;
2999 }
3000 });
3001
3002 // cull missing daemons, populate new ones
3003 std::set<std::string> types;
3004 for (auto& [type, service] : pending_service_map.services) {
3005 if (ServiceMap::is_normal_ceph_entity(type)) {
3006 continue;
3007 }
3008
3009 types.insert(type);
3010
3011 std::set<std::string> names;
3012 for (auto& q : service.daemons) {
3013 names.insert(q.first);
3014 DaemonKey key{type, q.first};
3015 if (!daemon_state.exists(key)) {
3016 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
3017 daemon->key = key;
3018 daemon->set_metadata(q.second.metadata);
3019 daemon->service_daemon = true;
3020 daemon_state.insert(daemon);
3021 dout(10) << "added missing " << key << dendl;
3022 }
3023 }
3024 daemon_state.cull(type, names);
3025 }
3026 daemon_state.cull_services(types);
3027 }
3028
3029 void DaemonServer::got_mgr_map()
3030 {
3031 std::lock_guard l(lock);
3032 set<std::string> have;
3033 cluster_state.with_mgrmap([&](const MgrMap& mgrmap) {
3034 auto md_update = [&] (DaemonKey key) {
3035 std::ostringstream oss;
3036 auto c = new MetadataUpdate(daemon_state, key);
3037 // FIXME remove post-nautilus: include 'id' for luminous mons
3038 oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
3039 << key.name << "\", \"id\": \"" << key.name << "\"}";
3040 monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
3041 };
3042 if (mgrmap.active_name.size()) {
3043 DaemonKey key{"mgr", mgrmap.active_name};
3044 have.insert(mgrmap.active_name);
3045 if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
3046 md_update(key);
3047 dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
3048 }
3049 }
3050 for (auto& i : mgrmap.standbys) {
3051 DaemonKey key{"mgr", i.second.name};
3052 have.insert(i.second.name);
3053 if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
3054 md_update(key);
3055 dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
3056 }
3057 }
3058 });
3059 daemon_state.cull("mgr", have);
3060 }
3061
3062 const char** DaemonServer::get_tracked_conf_keys() const
3063 {
3064 static const char *KEYS[] = {
3065 "mgr_stats_threshold",
3066 "mgr_stats_period",
3067 nullptr
3068 };
3069
3070 return KEYS;
3071 }
3072
3073 void DaemonServer::handle_conf_change(const ConfigProxy& conf,
3074 const std::set <std::string> &changed)
3075 {
3076
3077 if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
3078 dout(4) << "Updating stats threshold/period on "
3079 << daemon_connections.size() << " clients" << dendl;
3080 // Send a fresh MMgrConfigure to all clients, so that they can follow
3081 // the new policy for transmitting stats
3082 finisher.queue(new LambdaContext([this](int r) {
3083 std::lock_guard l(lock);
3084 for (auto &c : daemon_connections) {
3085 _send_configure(c);
3086 }
3087 }));
3088 }
3089 }
3090
3091 void DaemonServer::_send_configure(ConnectionRef c)
3092 {
3093 ceph_assert(ceph_mutex_is_locked_by_me(lock));
3094
3095 auto configure = make_message<MMgrConfigure>();
3096 configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
3097 configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");
3098
3099 if (c->peer_is_osd()) {
3100 configure->osd_perf_metric_queries =
3101 osd_perf_metric_collector.get_queries();
3102 } else if (c->peer_is_mds()) {
3103 configure->metric_config_message =
3104 MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector.get_queries()));
3105 }
3106
3107 c->send_message2(configure);
3108 }
3109
// Register a new OSD perf-metric query (with an optional result limit)
// with the OSD collector; returns the query's id for later removal.
MetricQueryID DaemonServer::add_osd_perf_query(
    const OSDPerfMetricQuery &query,
    const std::optional<OSDPerfMetricLimit> &limit)
{
  return osd_perf_metric_collector.add_query(query, limit);
}
3116
// Remove a previously added OSD perf-metric query by id; forwards the
// collector's return code.
int DaemonServer::remove_osd_perf_query(MetricQueryID query_id)
{
  return osd_perf_metric_collector.remove_query(query_id);
}
3121
// Fill *collector with the counters gathered for its query; forwards the
// OSD collector's return code.
int DaemonServer::get_osd_perf_counters(OSDPerfCollector *collector)
{
  return osd_perf_metric_collector.get_counters(collector);
}
3126
// Register a new MDS perf-metric query (with an optional result limit)
// with the MDS collector; returns the query's id for later removal.
MetricQueryID DaemonServer::add_mds_perf_query(
    const MDSPerfMetricQuery &query,
    const std::optional<MDSPerfMetricLimit> &limit)
{
  return mds_perf_metric_collector.add_query(query, limit);
}
3133
// Remove a previously added MDS perf-metric query by id; forwards the
// collector's return code.
int DaemonServer::remove_mds_perf_query(MetricQueryID query_id)
{
  return mds_perf_metric_collector.remove_query(query_id);
}
3138
// Ask the MDS collector to re-register its queries (e.g. so they are
// re-sent to daemons).
void DaemonServer::reregister_mds_perf_queries()
{
  mds_perf_metric_collector.reregister_queries();
}
3143
// Fill *collector with the counters gathered for its query; forwards the
// MDS collector's return code.
int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
{
  return mds_perf_metric_collector.get_counters(collector);
}