]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/DaemonServer.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mgr / DaemonServer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "DaemonServer.h"
15 #include "mgr/Mgr.h"
16
17 #include "include/stringify.h"
18 #include "include/str_list.h"
19 #include "auth/RotatingKeyRing.h"
20 #include "json_spirit/json_spirit_writer.h"
21
22 #include "mgr/mgr_commands.h"
23 #include "mgr/DaemonHealthMetricCollector.h"
24 #include "mgr/OSDPerfMetricCollector.h"
25 #include "mon/MonCommand.h"
26
27 #include "messages/MMgrOpen.h"
28 #include "messages/MMgrClose.h"
29 #include "messages/MMgrConfigure.h"
30 #include "messages/MMonMgrReport.h"
31 #include "messages/MCommand.h"
32 #include "messages/MCommandReply.h"
33 #include "messages/MPGStats.h"
34 #include "messages/MOSDScrub.h"
35 #include "messages/MOSDScrub2.h"
36 #include "messages/MOSDForceRecovery.h"
37 #include "common/errno.h"
38 #include "common/pick_address.h"
39
40 #define dout_context g_ceph_context
41 #define dout_subsys ceph_subsys_mgr
42 #undef dout_prefix
43 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
44
45
46
47 DaemonServer::DaemonServer(MonClient *monc_,
48 Finisher &finisher_,
49 DaemonStateIndex &daemon_state_,
50 ClusterState &cluster_state_,
51 PyModuleRegistry &py_modules_,
52 LogChannelRef clog_,
53 LogChannelRef audit_clog_)
54 : Dispatcher(g_ceph_context),
55 client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
56 g_conf().get_val<Option::size_t>("mgr_client_bytes"))),
57 client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
58 g_conf().get_val<uint64_t>("mgr_client_messages"))),
59 osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
60 g_conf().get_val<Option::size_t>("mgr_osd_bytes"))),
61 osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
62 g_conf().get_val<uint64_t>("mgr_osd_messages"))),
63 mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
64 g_conf().get_val<Option::size_t>("mgr_mds_bytes"))),
65 mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
66 g_conf().get_val<uint64_t>("mgr_mds_messages"))),
67 mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
68 g_conf().get_val<Option::size_t>("mgr_mon_bytes"))),
69 mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
70 g_conf().get_val<uint64_t>("mgr_mon_messages"))),
71 msgr(nullptr),
72 monc(monc_),
73 finisher(finisher_),
74 daemon_state(daemon_state_),
75 cluster_state(cluster_state_),
76 py_modules(py_modules_),
77 clog(clog_),
78 audit_clog(audit_clog_),
79 lock("DaemonServer"),
80 pgmap_ready(false),
81 timer(g_ceph_context, lock),
82 shutting_down(false),
83 tick_event(nullptr),
84 osd_perf_metric_collector_listener(this),
85 osd_perf_metric_collector(osd_perf_metric_collector_listener)
86 {
87 g_conf().add_observer(this);
88 }
89
// Tear down: release the messenger and deregister ourselves as a config
// observer.  NOTE(review): msgr is deleted before the observer is removed;
// assumes no config-change callback touches msgr during destruction — confirm.
DaemonServer::~DaemonServer() {
  delete msgr;
  g_conf().remove_observer(this);
}
94
// Bring up the mgr's daemon-facing messenger: create it, attach throttlers,
// bind to the public address(es), start dispatch, and arm the periodic tick.
// Returns 0 on success or a negative errno from address pick/bind.
// The ordering here matters: policies and throttlers are set before bind,
// and the dispatcher is added only after start().
int DaemonServer::init(uint64_t gid, entity_addrvec_t client_addrs)
{
  // Initialize Messenger
  std::string public_msgr_type = g_conf()->ms_public_type.empty() ?
    g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
  msgr = Messenger::create(g_ceph_context, public_msgr_type,
                           entity_name_t::MGR(gid),
                           "mgr",
                           getpid(), 0);
  // Peers are stateless: no session resumption across reconnects.
  msgr->set_default_policy(Messenger::Policy::stateless_server(0));

  msgr->set_auth_client(monc);

  // throttle clients
  msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
                              client_byte_throttler.get(),
                              client_msg_throttler.get());

  // servers
  msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
                              osd_byte_throttler.get(),
                              osd_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
                              mds_byte_throttler.get(),
                              mds_msg_throttler.get());
  msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
                              mon_byte_throttler.get(),
                              mon_msg_throttler.get());

  entity_addrvec_t addrs;
  int r = pick_addresses(cct, CEPH_PICK_ADDRESS_PUBLIC, &addrs);
  if (r < 0) {
    return r;
  }
  dout(20) << __func__ << " will bind to " << addrs << dendl;
  r = msgr->bindv(addrs);
  if (r < 0) {
    derr << "unable to bind mgr to " << addrs << dendl;
    return r;
  }

  msgr->set_myname(entity_name_t::MGR(gid));
  // Fill in any still-unknown local addresses from the mon-client view.
  msgr->set_addr_unknowns(client_addrs);

  msgr->start();
  msgr->add_dispatcher_tail(this);

  // We authenticate inbound peers via the mon client's credentials.
  msgr->set_auth_server(monc);
  monc->set_handle_authentication_dispatcher(this);

  started_at = ceph_clock_now();

  std::lock_guard l(lock);
  timer.init();

  // Arm the first periodic tick (send_report/adjust_pgs cadence).
  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());

  return 0;
}
155
// Addresses this server's messenger is bound to (valid after init()).
entity_addrvec_t DaemonServer::get_myaddrs() const
{
  return msgr->get_myaddrs();
}
160
// Key store used to verify legacy (auth v1) authorizers: the mon client's
// rotating service keys.
KeyStore *DaemonServer::ms_get_auth1_authorizer_keystore()
{
  return monc->rotating_secrets.get();
}
165
166 int DaemonServer::ms_handle_authentication(Connection *con)
167 {
168 int ret = 0;
169 MgrSession *s = new MgrSession(cct);
170 con->set_priv(s);
171 s->inst.addr = con->get_peer_addr();
172 s->entity_name = con->peer_name;
173 dout(10) << __func__ << " new session " << s << " con " << con
174 << " entity " << con->peer_name
175 << " addr " << con->get_peer_addrs()
176 << dendl;
177
178 AuthCapsInfo &caps_info = con->get_peer_caps_info();
179 if (caps_info.allow_all) {
180 dout(10) << " session " << s << " " << s->entity_name
181 << " allow_all" << dendl;
182 s->caps.set_allow_all();
183 }
184
185 if (caps_info.caps.length() > 0) {
186 auto p = caps_info.caps.cbegin();
187 string str;
188 try {
189 decode(str, p);
190 }
191 catch (buffer::error& e) {
192 ret = -EPERM;
193 }
194 bool success = s->caps.parse(str);
195 if (success) {
196 dout(10) << " session " << s << " " << s->entity_name
197 << " has caps " << s->caps << " '" << str << "'" << dendl;
198 ret = 1;
199 } else {
200 dout(10) << " session " << s << " " << s->entity_name
201 << " failed to parse caps '" << str << "'" << dendl;
202 ret = -EPERM;
203 }
204 }
205
206 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
207 std::lock_guard l(lock);
208 s->osd_id = atoi(s->entity_name.get_id().c_str());
209 dout(10) << "registering osd." << s->osd_id << " session "
210 << s << " con " << con << dendl;
211 osd_cons[s->osd_id].insert(con);
212 }
213
214 return ret;
215 }
216
217 bool DaemonServer::ms_get_authorizer(
218 int dest_type,
219 AuthAuthorizer **authorizer)
220 {
221 dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
222
223 if (dest_type == CEPH_ENTITY_TYPE_MON) {
224 return true;
225 }
226
227 *authorizer = monc->build_authorizer(dest_type);
228 dout(20) << "got authorizer " << *authorizer << dendl;
229 return *authorizer != NULL;
230 }
231
232 bool DaemonServer::ms_handle_reset(Connection *con)
233 {
234 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
235 auto priv = con->get_priv();
236 auto session = static_cast<MgrSession*>(priv.get());
237 if (!session) {
238 return false;
239 }
240 std::lock_guard l(lock);
241 dout(10) << "unregistering osd." << session->osd_id
242 << " session " << session << " con " << con << dendl;
243 osd_cons[session->osd_id].erase(con);
244
245 auto iter = daemon_connections.find(con);
246 if (iter != daemon_connections.end()) {
247 daemon_connections.erase(iter);
248 }
249 }
250 return false;
251 }
252
// Connection-refused callback: no state to clean up on this side.
bool DaemonServer::ms_handle_refused(Connection *con)
{
  // do nothing for now
  return false;
}
258
259 bool DaemonServer::ms_dispatch(Message *m)
260 {
261 // Note that we do *not* take ::lock here, in order to avoid
262 // serializing all message handling. It's up to each handler
263 // to take whatever locks it needs.
264 switch (m->get_type()) {
265 case MSG_PGSTATS:
266 cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
267 maybe_ready(m->get_source().num());
268 m->put();
269 return true;
270 case MSG_MGR_REPORT:
271 return handle_report(static_cast<MMgrReport*>(m));
272 case MSG_MGR_OPEN:
273 return handle_open(static_cast<MMgrOpen*>(m));
274 case MSG_MGR_CLOSE:
275 return handle_close(static_cast<MMgrClose*>(m));
276 case MSG_COMMAND:
277 return handle_command(static_cast<MCommand*>(m));
278 default:
279 dout(1) << "Unhandled message type " << m->get_type() << dendl;
280 return false;
281 };
282 }
283
284 void DaemonServer::maybe_ready(int32_t osd_id)
285 {
286 if (pgmap_ready.load()) {
287 // Fast path: we don't need to take lock because pgmap_ready
288 // is already set
289 } else {
290 std::lock_guard l(lock);
291
292 if (reported_osds.find(osd_id) == reported_osds.end()) {
293 dout(4) << "initial report from osd " << osd_id << dendl;
294 reported_osds.insert(osd_id);
295 std::set<int32_t> up_osds;
296
297 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
298 osdmap.get_up_osds(up_osds);
299 });
300
301 std::set<int32_t> unreported_osds;
302 std::set_difference(up_osds.begin(), up_osds.end(),
303 reported_osds.begin(), reported_osds.end(),
304 std::inserter(unreported_osds, unreported_osds.begin()));
305
306 if (unreported_osds.size() == 0) {
307 dout(4) << "all osds have reported, sending PG state to mon" << dendl;
308 pgmap_ready = true;
309 reported_osds.clear();
310 // Avoid waiting for next tick
311 send_report();
312 } else {
313 dout(4) << "still waiting for " << unreported_osds.size() << " osds"
314 " to report in before PGMap is ready" << dendl;
315 }
316 }
317 }
318 }
319
// Periodic work: push a report to the mon, run pg-count adjustment, and
// re-arm the next tick.  Runs under ::lock via the SafeTimer.
void DaemonServer::tick()
{
  dout(10) << dendl;
  send_report();
  adjust_pgs();

  schedule_tick_locked(
    g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count());
}
329
// Currently modules do not set health checks in response to events delivered to
// all modules (e.g. notify), so we do not risk a thundering-herd situation here.
// If this pattern emerges in the future, this scheduler could be modified to
// fire after all modules have had a chance to set their health checks.
// (Re)arm the tick timer; caller must hold ::lock.  Any pending tick is
// cancelled first — deliberately even when shutting down, so no stale
// event can fire after shutdown begins.
void DaemonServer::schedule_tick_locked(double delay_sec)
{
  ceph_assert(lock.is_locked_by_me());

  if (tick_event) {
    timer.cancel_event(tick_event);
    tick_event = nullptr;
  }

  // on shutdown start rejecting explicit requests to send reports that may
  // originate from python land which may still be running.
  if (shutting_down)
    return;

  tick_event = timer.add_event_after(delay_sec,
      new FunctionContext([this](int r) {
          tick();
      }));
}
353
// Public wrapper: take ::lock and (re)arm the tick timer.
void DaemonServer::schedule_tick(double delay_sec)
{
  std::lock_guard l(lock);
  schedule_tick_locked(delay_sec);
}
359
// Called when the set of OSD perf-metric queries changes.  Pushes a fresh
// MMgrConfigure to every connected OSD (via the finisher, to avoid doing
// messenger work in the caller's context).
void DaemonServer::handle_osd_perf_metric_query_updated()
{
  dout(10) << dendl;

  // Send a fresh MMgrConfigure to all clients, so that they can follow
  // the new policy for transmitting stats
  finisher.queue(new FunctionContext([this](int r) {
        std::lock_guard l(lock);
        for (auto &c : daemon_connections) {
          if (c->peer_is_osd()) {
            _send_configure(c);
          }
        }
      }));
}
375
// Shut down: stop the messenger first (so no new dispatches arrive), then
// mark shutting_down under the lock and stop the timer.
void DaemonServer::shutdown()
{
  dout(10) << "begin" << dendl;
  msgr->shutdown();
  msgr->wait();
  dout(10) << "done" << dendl;

  std::lock_guard l(lock);
  // After this, schedule_tick_locked() refuses to arm new ticks.
  shutting_down = true;
  timer.shutdown();
}
387
388 static DaemonKey key_from_service(
389 const std::string& service_name,
390 int peer_type,
391 const std::string& daemon_name)
392 {
393 if (!service_name.empty()) {
394 return DaemonKey(service_name, daemon_name);
395 } else {
396 return DaemonKey(ceph_entity_type_name(peer_type), daemon_name);
397 }
398 }
399
400 static bool key_from_string(
401 const std::string& name,
402 DaemonKey *out)
403 {
404 auto p = name.find('.');
405 if (p == std::string::npos) {
406 return false;
407 }
408 out->first = name.substr(0, p);
409 out->second = name.substr(p + 1);
410 return true;
411 }
412
// Handle a daemon/client session-open message: send back our stats
// configuration, create or refresh the DaemonState entry, register
// service daemons in the pending service map, and remember connections
// that need reconfiguration broadcasts.  Consumes the message.
bool DaemonServer::handle_open(MMgrOpen *m)
{
  std::lock_guard l(lock);

  DaemonKey key = key_from_service(m->service_name,
                                   m->get_connection()->get_peer_type(),
                                   m->daemon_name);

  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  // Tell the peer how to report stats before touching any state.
  _send_configure(m->get_connection());

  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    daemon = daemon_state.get(key);
  }
  if (m->service_daemon && !daemon) {
    // Service daemons self-register here; other daemon types get their
    // state created elsewhere (after metadata is fetched).
    dout(4) << "constructing new DaemonState for " << key << dendl;
    daemon = std::make_shared<DaemonState>(daemon_state.types);
    daemon->key = key;
    daemon->service_daemon = true;
    if (m->daemon_metadata.count("hostname")) {
      daemon->hostname = m->daemon_metadata["hostname"];
    }
    daemon_state.insert(daemon);
  }
  if (daemon) {
    dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
    std::lock_guard l(daemon->lock);
    // A (re)open means the peer's perf counter schema starts fresh.
    daemon->perf_counters.clear();

    if (m->service_daemon) {
      daemon->set_metadata(m->daemon_metadata);
      daemon->service_status = m->daemon_status;

      utime_t now = ceph_clock_now();
      auto d = pending_service_map.get_daemon(m->service_name,
                                              m->daemon_name);
      // Only rewrite the service-map entry if the daemon instance (gid)
      // changed, i.e. this is a new incarnation.
      if (d->gid != (uint64_t)m->get_source().num()) {
        dout(10) << "registering " << key << " in pending_service_map" << dendl;
        d->gid = m->get_source().num();
        d->addr = m->get_source_addr();
        d->start_epoch = pending_service_map.epoch;
        d->start_stamp = now;
        d->metadata = m->daemon_metadata;
        pending_service_map_dirty = pending_service_map.epoch;
      }
    }

    // Optional piggy-backed config snapshot from the peer.
    auto p = m->config_bl.cbegin();
    if (p != m->config_bl.end()) {
      decode(daemon->config, p);
      decode(daemon->ignored_mon_config, p);
      dout(20) << " got config " << daemon->config
               << " ignored " << daemon->ignored_mon_config << dendl;
    }
    // Defaults are kept encoded; decoded lazily on demand.
    daemon->config_defaults_bl = m->config_defaults_bl;
    daemon->config_defaults.clear();
    dout(20) << " got config_defaults_bl " << daemon->config_defaults_bl.length()
             << " bytes" << dendl;
  }

  if (m->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT &&
      m->service_name.empty())
  {
    // Store in set of the daemon/service connections, i.e. those
    // connections that require an update in the event of stats
    // configuration changes.
    daemon_connections.insert(m->get_connection());
  }

  m->put();
  return true;
}
487
// Handle a session-close message: drop the daemon's state and, for
// service daemons, remove it from the pending service map.  The message
// itself is echoed back as the acknowledgement, which hands our message
// reference to the send path — hence no m->put() here.
bool DaemonServer::handle_close(MMgrClose *m)
{
  std::lock_guard l(lock);

  DaemonKey key = key_from_service(m->service_name,
                                   m->get_connection()->get_peer_type(),
                                   m->daemon_name);
  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  if (daemon_state.exists(key)) {
    DaemonStatePtr daemon = daemon_state.get(key);
    daemon_state.rm(key);
    {
      std::lock_guard l(daemon->lock);
      if (daemon->service_daemon) {
        pending_service_map.rm_daemon(m->service_name, m->daemon_name);
        pending_service_map_dirty = pending_service_map.epoch;
      }
    }
  }

  // send same message back as a reply
  m->get_connection()->send_message(m);
  return true;
}
513
// Ingest a perf/stats report from a daemon.  Returns true when the
// message was consumed (m->put() called).  The `return false` paths
// deliberately do NOT put() the message — false tells the messenger we
// did not consume it (NOTE(review): assumes another dispatcher or the
// messenger ultimately releases it — confirm against ms_dispatch chain).
bool DaemonServer::handle_report(MMgrReport *m)
{
  // Key the reporter either by its declared service name or by its
  // entity type (osd/mds/mon/...).
  DaemonKey key;
  if (!m->service_name.empty()) {
    key.first = m->service_name;
  } else {
    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
  }
  key.second = m->daemon_name;

  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
      m->service_name.empty()) {
    // Clients should not be sending us stats unless they are declaring
    // themselves to be a daemon for some service.
    dout(4) << "rejecting report from non-daemon client " << m->daemon_name
            << dendl;
    m->get_connection()->mark_down();
    m->put();
    return true;
  }

  // Look up the DaemonState
  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    dout(20) << "updating existing DaemonState for " << key << dendl;
    daemon = daemon_state.get(key);
  } else {
    // we don't know the hostname at this stage, reject MMgrReport here.
    dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
            << dendl;

    // issue metadata request in background
    if (!daemon_state.is_updating(key) &&
        (key.first == "osd" || key.first == "mds" || key.first == "mon")) {

      std::ostringstream oss;
      auto c = new MetadataUpdate(daemon_state, key);
      if (key.first == "osd") {
        oss << "{\"prefix\": \"osd metadata\", \"id\": "
            << key.second<< "}";

      } else if (key.first == "mds") {
        c->set_default("addr", stringify(m->get_source_addr()));
        oss << "{\"prefix\": \"mds metadata\", \"who\": \""
            << key.second << "\"}";

      } else if (key.first == "mon") {
        oss << "{\"prefix\": \"mon metadata\", \"id\": \""
            << key.second << "\"}";
      } else {
        // Unreachable: guarded by the key.first check above.
        ceph_abort();
      }

      monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
    }

    {
      std::lock_guard l(lock);
      // kill session
      auto priv = m->get_connection()->get_priv();
      auto session = static_cast<MgrSession*>(priv.get());
      if (!session) {
        return false;
      }
      m->get_connection()->mark_down();

      dout(10) << "unregistering osd." << session->osd_id
               << " session " << session << " con " << m->get_connection() << dendl;

      if (osd_cons.find(session->osd_id) != osd_cons.end()) {
        osd_cons[session->osd_id].erase(m->get_connection());
      }

      auto iter = daemon_connections.find(m->get_connection());
      if (iter != daemon_connections.end()) {
        daemon_connections.erase(iter);
      }
    }

    return false;
  }

  // Update the DaemonState
  ceph_assert(daemon != nullptr);
  {
    std::lock_guard l(daemon->lock);
    auto &daemon_counters = daemon->perf_counters;
    daemon_counters.update(m);

    // Optional piggy-backed config snapshot.
    auto p = m->config_bl.cbegin();
    if (p != m->config_bl.end()) {
      decode(daemon->config, p);
      decode(daemon->ignored_mon_config, p);
      dout(20) << " got config " << daemon->config
               << " ignored " << daemon->ignored_mon_config << dendl;
    }

    if (daemon->service_daemon) {
      utime_t now = ceph_clock_now();
      if (m->daemon_status) {
        daemon->service_status = *m->daemon_status;
        daemon->service_status_stamp = now;
      }
      daemon->last_service_beacon = now;
    } else if (m->daemon_status) {
      derr << "got status from non-daemon " << key << dendl;
    }
    if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
      // only OSD and MON send health_checks to me now
      daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
      dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
               << dendl;
    }
  }

  // if there are any schema updates, notify the python modules
  if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
    ostringstream oss;
    oss << key.first << '.' << key.second;
    py_modules.notify_all("perf_schema_update", oss.str());
  }

  if (m->get_connection()->peer_is_osd()) {
    osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
  }

  m->put();
  return true;
}
645
646
647 void DaemonServer::_generate_command_map(
648 cmdmap_t& cmdmap,
649 map<string,string> &param_str_map)
650 {
651 for (auto p = cmdmap.begin();
652 p != cmdmap.end(); ++p) {
653 if (p->first == "prefix")
654 continue;
655 if (p->first == "caps") {
656 vector<string> cv;
657 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
658 cv.size() % 2 == 0) {
659 for (unsigned i = 0; i < cv.size(); i += 2) {
660 string k = string("caps_") + cv[i];
661 param_str_map[k] = cv[i + 1];
662 }
663 continue;
664 }
665 }
666 param_str_map[p->first] = cmd_vartype_stringify(p->second);
667 }
668 }
669
670 const MonCommand *DaemonServer::_get_mgrcommand(
671 const string &cmd_prefix,
672 const std::vector<MonCommand> &cmds)
673 {
674 const MonCommand *this_cmd = nullptr;
675 for (const auto &cmd : cmds) {
676 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
677 this_cmd = &cmd;
678 break;
679 }
680 }
681 return this_cmd;
682 }
683
684 bool DaemonServer::_allowed_command(
685 MgrSession *s,
686 const string &module,
687 const string &prefix,
688 const cmdmap_t& cmdmap,
689 const map<string,string>& param_str_map,
690 const MonCommand *this_cmd) {
691
692 if (s->entity_name.is_mon()) {
693 // mon is all-powerful. even when it is forwarding commands on behalf of
694 // old clients; we expect the mon is validating commands before proxying!
695 return true;
696 }
697
698 bool cmd_r = this_cmd->requires_perm('r');
699 bool cmd_w = this_cmd->requires_perm('w');
700 bool cmd_x = this_cmd->requires_perm('x');
701
702 bool capable = s->caps.is_capable(
703 g_ceph_context,
704 CEPH_ENTITY_TYPE_MGR,
705 s->entity_name,
706 module, prefix, param_str_map,
707 cmd_r, cmd_w, cmd_x,
708 s->get_peer_addr());
709
710 dout(10) << " " << s->entity_name << " "
711 << (capable ? "" : "not ") << "capable" << dendl;
712 return capable;
713 }
714
/**
 * The working data for processing an MCommand. This lives in
 * a class to enable passing it into other threads for processing
 * outside of the thread/locks that called handle_command.
 *
 * Owns one reference on the request message for its lifetime
 * (released in the destructor).
 */
class CommandContext {
public:
  MCommand *m;        // the request; we hold one ref until destruction
  bufferlist odata;   // output payload accumulated by the command
  cmdmap_t cmdmap;    // parsed form of m->cmd

  explicit CommandContext(MCommand *m_)
    : m(m_) {
  }

  ~CommandContext() {
    // Release our reference on the request message.
    m->put();
  }

  // Convenience overload: reply with the contents of a stringstream.
  void reply(int r, const std::stringstream &ss) {
    reply(r, ss.str());
  }

  // Send an MCommandReply (code + status string + odata) back on the
  // request's connection, if it is still there.
  void reply(int r, const std::string &rs) {
    // Let the connection drop as soon as we've sent our response
    ConnectionRef con = m->get_connection();
    if (con) {
      con->mark_disposable();
    }

    if (r == 0) {
      dout(4) << __func__ << " success" << dendl;
    } else {
      derr << __func__ << " " << cpp_strerror(r) << " " << rs << dendl;
    }
    if (con) {
      MCommandReply *reply = new MCommandReply(r, rs);
      reply->set_tid(m->get_tid());
      reply->set_data(odata);
      con->send_message(reply);
    }
  }
};
758
/**
 * A context for receiving a bufferlist/error string from a background
 * function and then calling back to a CommandContext when it's done
 * (appends whatever the mon returned to the command's output data).
 */
class ReplyOnFinish : public Context {
  std::shared_ptr<CommandContext> cmdctx;  // command to complete on finish

public:
  bufferlist from_mon;  // filled in by the background operation
  string outs;          // status string from the background operation

  explicit ReplyOnFinish(const std::shared_ptr<CommandContext> &cmdctx_)
    : cmdctx(cmdctx_)
    {}
  void finish(int r) override {
    // Move the mon's payload into the command output, then reply.
    cmdctx->odata.claim_append(from_mon);
    cmdctx->reply(r, outs);
  }
};
778
// Entry point for MCommand messages: wrap the message in a CommandContext
// (which takes over the message reference) and run _handle_command,
// converting command-argument parse errors into an -EINVAL reply.
bool DaemonServer::handle_command(MCommand *m)
{
  std::lock_guard l(lock);
  std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
  try {
    return _handle_command(m, cmdctx);
  } catch (const bad_cmd_get& e) {
    cmdctx->reply(-EINVAL, e.what());
    return true;
  }
}
790
791 bool DaemonServer::_handle_command(
792 MCommand *m,
793 std::shared_ptr<CommandContext>& cmdctx)
794 {
795 auto priv = m->get_connection()->get_priv();
796 auto session = static_cast<MgrSession*>(priv.get());
797 if (!session) {
798 return true;
799 }
800 if (session->inst.name == entity_name_t())
801 session->inst.name = m->get_source();
802
803 std::string format;
804 boost::scoped_ptr<Formatter> f;
805 map<string,string> param_str_map;
806 std::stringstream ss;
807 int r = 0;
808
809 if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
810 cmdctx->reply(-EINVAL, ss);
811 return true;
812 }
813
814 {
815 cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
816 f.reset(Formatter::create(format));
817 }
818
819 string prefix;
820 cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
821
822 dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
823 dout(4) << "prefix=" << prefix << dendl;
824
825 if (prefix == "get_command_descriptions") {
826 dout(10) << "reading commands from python modules" << dendl;
827 const auto py_commands = py_modules.get_commands();
828
829 int cmdnum = 0;
830 JSONFormatter f;
831 f.open_object_section("command_descriptions");
832
833 auto dump_cmd = [&cmdnum, &f, m](const MonCommand &mc){
834 ostringstream secname;
835 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
836 dump_cmddesc_to_json(&f, m->get_connection()->get_features(),
837 secname.str(), mc.cmdstring, mc.helpstring,
838 mc.module, mc.req_perms, 0);
839 cmdnum++;
840 };
841
842 for (const auto &pyc : py_commands) {
843 dump_cmd(pyc);
844 }
845
846 for (const auto &mgr_cmd : mgr_commands) {
847 dump_cmd(mgr_cmd);
848 }
849
850 f.close_section(); // command_descriptions
851 f.flush(cmdctx->odata);
852 cmdctx->reply(0, ss);
853 return true;
854 }
855
856 // lookup command
857 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
858 _generate_command_map(cmdctx->cmdmap, param_str_map);
859
860 bool is_allowed;
861 if (!mgr_cmd) {
862 MonCommand py_command = {"", "", "py", "rw"};
863 is_allowed = _allowed_command(session, py_command.module,
864 prefix, cmdctx->cmdmap, param_str_map, &py_command);
865 } else {
866 // validate user's permissions for requested command
867 is_allowed = _allowed_command(session, mgr_cmd->module,
868 prefix, cmdctx->cmdmap, param_str_map, mgr_cmd);
869 }
870 if (!is_allowed) {
871 dout(1) << " access denied" << dendl;
872 audit_clog->info() << "from='" << session->inst << "' "
873 << "entity='" << session->entity_name << "' "
874 << "cmd=" << m->cmd << ": access denied";
875 ss << "access denied: does your client key have mgr caps? "
876 "See http://docs.ceph.com/docs/master/mgr/administrator/"
877 "#client-authentication";
878 cmdctx->reply(-EACCES, ss);
879 return true;
880 }
881
882 audit_clog->debug()
883 << "from='" << session->inst << "' "
884 << "entity='" << session->entity_name << "' "
885 << "cmd=" << m->cmd << ": dispatch";
886
887 // ----------------
888 // service map commands
889 if (prefix == "service dump") {
890 if (!f)
891 f.reset(Formatter::create("json-pretty"));
892 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
893 f->dump_object("service_map", service_map);
894 });
895 f->flush(cmdctx->odata);
896 cmdctx->reply(0, ss);
897 return true;
898 }
899 if (prefix == "service status") {
900 if (!f)
901 f.reset(Formatter::create("json-pretty"));
902 // only include state from services that are in the persisted service map
903 f->open_object_section("service_status");
904 for (auto& p : pending_service_map.services) {
905 f->open_object_section(p.first.c_str());
906 for (auto& q : p.second.daemons) {
907 f->open_object_section(q.first.c_str());
908 DaemonKey key(p.first, q.first);
909 ceph_assert(daemon_state.exists(key));
910 auto daemon = daemon_state.get(key);
911 std::lock_guard l(daemon->lock);
912 f->dump_stream("status_stamp") << daemon->service_status_stamp;
913 f->dump_stream("last_beacon") << daemon->last_service_beacon;
914 f->open_object_section("status");
915 for (auto& r : daemon->service_status) {
916 f->dump_string(r.first.c_str(), r.second);
917 }
918 f->close_section();
919 f->close_section();
920 }
921 f->close_section();
922 }
923 f->close_section();
924 f->flush(cmdctx->odata);
925 cmdctx->reply(0, ss);
926 return true;
927 }
928
929 if (prefix == "config set") {
930 std::string key;
931 std::string val;
932 cmd_getval(cct, cmdctx->cmdmap, "key", key);
933 cmd_getval(cct, cmdctx->cmdmap, "value", val);
934 r = cct->_conf.set_val(key, val, &ss);
935 if (r == 0) {
936 cct->_conf.apply_changes(nullptr);
937 }
938 cmdctx->reply(0, ss);
939 return true;
940 }
941
942 // -----------
943 // PG commands
944
945 if (prefix == "pg scrub" ||
946 prefix == "pg repair" ||
947 prefix == "pg deep-scrub") {
948 string scrubop = prefix.substr(3, string::npos);
949 pg_t pgid;
950 spg_t spgid;
951 string pgidstr;
952 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
953 if (!pgid.parse(pgidstr.c_str())) {
954 ss << "invalid pgid '" << pgidstr << "'";
955 cmdctx->reply(-EINVAL, ss);
956 return true;
957 }
958 bool pg_exists = false;
959 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
960 pg_exists = osdmap.pg_exists(pgid);
961 });
962 if (!pg_exists) {
963 ss << "pg " << pgid << " does not exist";
964 cmdctx->reply(-ENOENT, ss);
965 return true;
966 }
967 int acting_primary = -1;
968 epoch_t epoch;
969 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
970 epoch = osdmap.get_epoch();
971 osdmap.get_primary_shard(pgid, &acting_primary, &spgid);
972 });
973 if (acting_primary == -1) {
974 ss << "pg " << pgid << " has no primary osd";
975 cmdctx->reply(-EAGAIN, ss);
976 return true;
977 }
978 auto p = osd_cons.find(acting_primary);
979 if (p == osd_cons.end()) {
980 ss << "pg " << pgid << " primary osd." << acting_primary
981 << " is not currently connected";
982 cmdctx->reply(-EAGAIN, ss);
983 return true;
984 }
985 for (auto& con : p->second) {
986 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
987 vector<spg_t> pgs = { spgid };
988 con->send_message(new MOSDScrub2(monc->get_fsid(),
989 epoch,
990 pgs,
991 scrubop == "repair",
992 scrubop == "deep-scrub"));
993 } else {
994 vector<pg_t> pgs = { pgid };
995 con->send_message(new MOSDScrub(monc->get_fsid(),
996 pgs,
997 scrubop == "repair",
998 scrubop == "deep-scrub"));
999 }
1000 }
1001 ss << "instructing pg " << spgid << " on osd." << acting_primary
1002 << " to " << scrubop;
1003 cmdctx->reply(0, ss);
1004 return true;
1005 } else if (prefix == "osd scrub" ||
1006 prefix == "osd deep-scrub" ||
1007 prefix == "osd repair") {
1008 string whostr;
1009 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
1010 vector<string> pvec;
1011 get_str_vec(prefix, pvec);
1012
1013 set<int> osds;
1014 if (whostr == "*" || whostr == "all" || whostr == "any") {
1015 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1016 for (int i = 0; i < osdmap.get_max_osd(); i++)
1017 if (osdmap.is_up(i)) {
1018 osds.insert(i);
1019 }
1020 });
1021 } else {
1022 long osd = parse_osd_id(whostr.c_str(), &ss);
1023 if (osd < 0) {
1024 ss << "invalid osd '" << whostr << "'";
1025 cmdctx->reply(-EINVAL, ss);
1026 return true;
1027 }
1028 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1029 if (osdmap.is_up(osd)) {
1030 osds.insert(osd);
1031 }
1032 });
1033 if (osds.empty()) {
1034 ss << "osd." << osd << " is not up";
1035 cmdctx->reply(-EAGAIN, ss);
1036 return true;
1037 }
1038 }
1039 set<int> sent_osds, failed_osds;
1040 for (auto osd : osds) {
1041 vector<spg_t> spgs;
1042 epoch_t epoch;
1043 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1044 epoch = osdmap.get_epoch();
1045 auto p = pgmap.pg_by_osd.find(osd);
1046 if (p != pgmap.pg_by_osd.end()) {
1047 for (auto pgid : p->second) {
1048 int primary;
1049 spg_t spg;
1050 osdmap.get_primary_shard(pgid, &primary, &spg);
1051 if (primary == osd) {
1052 spgs.push_back(spg);
1053 }
1054 }
1055 }
1056 });
1057 auto p = osd_cons.find(osd);
1058 if (p == osd_cons.end()) {
1059 failed_osds.insert(osd);
1060 } else {
1061 sent_osds.insert(osd);
1062 for (auto& con : p->second) {
1063 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
1064 con->send_message(new MOSDScrub2(monc->get_fsid(),
1065 epoch,
1066 spgs,
1067 pvec.back() == "repair",
1068 pvec.back() == "deep-scrub"));
1069 } else {
1070 con->send_message(new MOSDScrub(monc->get_fsid(),
1071 pvec.back() == "repair",
1072 pvec.back() == "deep-scrub"));
1073 }
1074 }
1075 }
1076 }
1077 if (failed_osds.size() == osds.size()) {
1078 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
1079 << " (not connected)";
1080 r = -EAGAIN;
1081 } else {
1082 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
1083 if (!failed_osds.empty()) {
1084 ss << "; osd(s) " << failed_osds << " were not connected";
1085 }
1086 r = 0;
1087 }
1088 cmdctx->reply(0, ss);
1089 return true;
1090 } else if (prefix == "osd pool scrub" ||
1091 prefix == "osd pool deep-scrub" ||
1092 prefix == "osd pool repair") {
1093 vector<string> pool_names;
1094 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", pool_names);
1095 if (pool_names.empty()) {
1096 ss << "must specify one or more pool names";
1097 cmdctx->reply(-EINVAL, ss);
1098 return true;
1099 }
1100 epoch_t epoch;
1101 map<int32_t, vector<pg_t>> pgs_by_primary; // legacy
1102 map<int32_t, vector<spg_t>> spgs_by_primary;
1103 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1104 epoch = osdmap.get_epoch();
1105 for (auto& pool_name : pool_names) {
1106 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1107 if (pool_id < 0) {
1108 ss << "unrecognized pool '" << pool_name << "'";
1109 r = -ENOENT;
1110 return;
1111 }
1112 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1113 for (int i = 0; i < pool_pg_num; i++) {
1114 pg_t pg(i, pool_id);
1115 int primary;
1116 spg_t spg;
1117 auto got = osdmap.get_primary_shard(pg, &primary, &spg);
1118 if (!got)
1119 continue;
1120 pgs_by_primary[primary].push_back(pg);
1121 spgs_by_primary[primary].push_back(spg);
1122 }
1123 }
1124 });
1125 if (r < 0) {
1126 cmdctx->reply(r, ss);
1127 return true;
1128 }
1129 for (auto& it : spgs_by_primary) {
1130 auto primary = it.first;
1131 auto p = osd_cons.find(primary);
1132 if (p == osd_cons.end()) {
1133 ss << "osd." << primary << " is not currently connected";
1134 cmdctx->reply(-EAGAIN, ss);
1135 return true;
1136 }
1137 for (auto& con : p->second) {
1138 if (HAVE_FEATURE(con->get_features(), SERVER_MIMIC)) {
1139 con->send_message(new MOSDScrub2(monc->get_fsid(),
1140 epoch,
1141 it.second,
1142 prefix == "osd pool repair",
1143 prefix == "osd pool deep-scrub"));
1144 } else {
1145 // legacy
1146 auto q = pgs_by_primary.find(primary);
1147 ceph_assert(q != pgs_by_primary.end());
1148 con->send_message(new MOSDScrub(monc->get_fsid(),
1149 q->second,
1150 prefix == "osd pool repair",
1151 prefix == "osd pool deep-scrub"));
1152 }
1153 }
1154 }
1155 cmdctx->reply(0, "");
1156 return true;
1157 } else if (prefix == "osd reweight-by-pg" ||
1158 prefix == "osd reweight-by-utilization" ||
1159 prefix == "osd test-reweight-by-pg" ||
1160 prefix == "osd test-reweight-by-utilization") {
1161 bool by_pg =
1162 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
1163 bool dry_run =
1164 prefix == "osd test-reweight-by-pg" ||
1165 prefix == "osd test-reweight-by-utilization";
1166 int64_t oload;
1167 cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
1168 set<int64_t> pools;
1169 vector<string> poolnames;
1170 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
1171 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1172 for (const auto& poolname : poolnames) {
1173 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
1174 if (pool < 0) {
1175 ss << "pool '" << poolname << "' does not exist";
1176 r = -ENOENT;
1177 }
1178 pools.insert(pool);
1179 }
1180 });
1181 if (r) {
1182 cmdctx->reply(r, ss);
1183 return true;
1184 }
1185
1186 double max_change = g_conf().get_val<double>("mon_reweight_max_change");
1187 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
1188 if (max_change <= 0.0) {
1189 ss << "max_change " << max_change << " must be positive";
1190 cmdctx->reply(-EINVAL, ss);
1191 return true;
1192 }
1193 int64_t max_osds = g_conf().get_val<int64_t>("mon_reweight_max_osds");
1194 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
1195 if (max_osds <= 0) {
1196 ss << "max_osds " << max_osds << " must be positive";
1197 cmdctx->reply(-EINVAL, ss);
1198 return true;
1199 }
1200 bool no_increasing = false;
1201 cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
1202 string out_str;
1203 mempool::osdmap::map<int32_t, uint32_t> new_weights;
1204 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap &osdmap, const PGMap& pgmap) {
1205 return reweight::by_utilization(osdmap, pgmap,
1206 oload,
1207 max_change,
1208 max_osds,
1209 by_pg,
1210 pools.empty() ? NULL : &pools,
1211 no_increasing,
1212 &new_weights,
1213 &ss, &out_str, f.get());
1214 });
1215 if (r >= 0) {
1216 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
1217 }
1218 if (f) {
1219 f->flush(cmdctx->odata);
1220 } else {
1221 cmdctx->odata.append(out_str);
1222 }
1223 if (r < 0) {
1224 ss << "FAILED reweight-by-pg";
1225 cmdctx->reply(r, ss);
1226 return true;
1227 } else if (r == 0 || dry_run) {
1228 ss << "no change";
1229 cmdctx->reply(r, ss);
1230 return true;
1231 } else {
1232 json_spirit::Object json_object;
1233 for (const auto& osd_weight : new_weights) {
1234 json_spirit::Config::add(json_object,
1235 std::to_string(osd_weight.first),
1236 std::to_string(osd_weight.second));
1237 }
1238 string s = json_spirit::write(json_object);
1239 std::replace(begin(s), end(s), '\"', '\'');
1240 const string cmd =
1241 "{"
1242 "\"prefix\": \"osd reweightn\", "
1243 "\"weights\": \"" + s + "\""
1244 "}";
1245 auto on_finish = new ReplyOnFinish(cmdctx);
1246 monc->start_mon_command({cmd}, {},
1247 &on_finish->from_mon, &on_finish->outs, on_finish);
1248 return true;
1249 }
1250 } else if (prefix == "osd df") {
1251 string method;
1252 cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
1253 string filter_by;
1254 string filter;
1255 cmd_getval(g_ceph_context, cmdctx->cmdmap, "filter_by", filter_by);
1256 cmd_getval(g_ceph_context, cmdctx->cmdmap, "filter", filter);
1257 if (filter_by.empty() != filter.empty()) {
1258 cmdctx->reply(-EINVAL, "you must specify both 'filter_by' and 'filter'");
1259 return true;
1260 }
1261 stringstream rs;
1262 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
1263 string class_name;
1264 string item_name;
1265 // sanity check filter(s)
1266 if (filter_by == "class") {
1267 if (!osdmap.crush->class_exists(filter)) {
1268 rs << "specified class '" << filter << "' does not exist";
1269 return -EINVAL;
1270 }
1271 class_name = filter;
1272 }
1273 if (filter_by == "name") {
1274 if (!osdmap.crush->name_exists(filter)) {
1275 rs << "specified name '" << filter << "' does not exist";
1276 return -EINVAL;
1277 }
1278 item_name = filter;
1279 }
1280 print_osd_utilization(osdmap, pgmap, ss,
1281 f.get(), method == "tree",
1282 class_name, item_name);
1283
1284 cmdctx->odata.append(ss);
1285 return 0;
1286 });
1287 cmdctx->reply(r, rs);
1288 return true;
1289 } else if (prefix == "osd pool stats") {
1290 string pool_name;
1291 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pool_name", pool_name);
1292 int64_t poolid = -ENOENT;
1293 bool one_pool = false;
1294 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1295 if (!pool_name.empty()) {
1296 poolid = osdmap.lookup_pg_pool_name(pool_name);
1297 if (poolid < 0) {
1298 ceph_assert(poolid == -ENOENT);
1299 ss << "unrecognized pool '" << pool_name << "'";
1300 return -ENOENT;
1301 }
1302 one_pool = true;
1303 }
1304 stringstream rs;
1305 if (f)
1306 f->open_array_section("pool_stats");
1307 else {
1308 if (osdmap.get_pools().empty()) {
1309 ss << "there are no pools!";
1310 goto stats_out;
1311 }
1312 }
1313 for (auto &p : osdmap.get_pools()) {
1314 if (!one_pool) {
1315 poolid = p.first;
1316 }
1317 pg_map.dump_pool_stats_and_io_rate(poolid, osdmap, f.get(), &rs);
1318 if (one_pool) {
1319 break;
1320 }
1321 }
1322 stats_out:
1323 if (f) {
1324 f->close_section();
1325 f->flush(cmdctx->odata);
1326 } else {
1327 cmdctx->odata.append(rs.str());
1328 }
1329 return 0;
1330 });
1331 if (r != -EOPNOTSUPP) {
1332 cmdctx->reply(r, ss);
1333 return true;
1334 }
1335 } else if (prefix == "osd safe-to-destroy" ||
1336 prefix == "osd destroy" ||
1337 prefix == "osd purge") {
1338 set<int> osds;
1339 int r = 0;
1340 if (prefix == "osd safe-to-destroy") {
1341 vector<string> ids;
1342 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1343 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1344 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1345 });
1346 if (!r && osds.empty()) {
1347 ss << "must specify one or more OSDs";
1348 r = -EINVAL;
1349 }
1350 } else {
1351 int64_t id;
1352 if (!cmd_getval(g_ceph_context, cmdctx->cmdmap, "id", id)) {
1353 r = -EINVAL;
1354 ss << "must specify OSD id";
1355 } else {
1356 osds.insert(id);
1357 }
1358 }
1359 if (r < 0) {
1360 cmdctx->reply(r, ss);
1361 return true;
1362 }
1363 set<int> active_osds, missing_stats, stored_pgs, safe_to_destroy;
1364 int affected_pgs = 0;
1365 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1366 if (pg_map.num_pg_unknown > 0) {
1367 ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
1368 << " any conclusions";
1369 r = -EAGAIN;
1370 return;
1371 }
1372 int num_active_clean = 0;
1373 for (auto& p : pg_map.num_pg_by_state) {
1374 unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
1375 if ((p.first & want) == want) {
1376 num_active_clean += p.second;
1377 }
1378 }
1379 for (auto osd : osds) {
1380 if (!osdmap.exists(osd)) {
1381 safe_to_destroy.insert(osd);
1382 continue; // clearly safe to destroy
1383 }
1384 auto q = pg_map.num_pg_by_osd.find(osd);
1385 if (q != pg_map.num_pg_by_osd.end()) {
1386 if (q->second.acting > 0 || q->second.up > 0) {
1387 active_osds.insert(osd);
1388 affected_pgs += q->second.acting + q->second.up;
1389 continue;
1390 }
1391 }
1392 if (num_active_clean < pg_map.num_pg) {
1393 // all pgs aren't active+clean; we need to be careful.
1394 auto p = pg_map.osd_stat.find(osd);
1395 if (p == pg_map.osd_stat.end()) {
1396 missing_stats.insert(osd);
1397 continue;
1398 } else if (p->second.num_pgs > 0) {
1399 stored_pgs.insert(osd);
1400 continue;
1401 }
1402 }
1403 safe_to_destroy.insert(osd);
1404 }
1405 });
1406 if (r && prefix == "osd safe-to-destroy") {
1407 cmdctx->reply(r, ss); // regardless of formatter
1408 return true;
1409 }
1410 if (!r && (!active_osds.empty() ||
1411 !missing_stats.empty() || !stored_pgs.empty())) {
1412 if (!safe_to_destroy.empty()) {
1413 ss << "OSD(s) " << safe_to_destroy
1414 << " are safe to destroy without reducing data durability. ";
1415 }
1416 if (!active_osds.empty()) {
1417 ss << "OSD(s) " << active_osds << " have " << affected_pgs
1418 << " pgs currently mapped to them. ";
1419 }
1420 if (!missing_stats.empty()) {
1421 ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
1422 << " PGs are active+clean; we cannot draw any conclusions. ";
1423 }
1424 if (!stored_pgs.empty()) {
1425 ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
1426 << " data, and not all PGs are active+clean; we cannot be sure they"
1427 << " aren't still needed.";
1428 }
1429 if (!active_osds.empty() || !stored_pgs.empty()) {
1430 r = -EBUSY;
1431 } else {
1432 r = -EAGAIN;
1433 }
1434 }
1435
1436 if (prefix == "osd safe-to-destroy") {
1437 if (!r) {
1438 ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1439 << " durability.";
1440 safe_to_destroy.swap(osds);
1441 }
1442 if (f) {
1443 f->open_object_section("osd_status");
1444 f->open_array_section("safe_to_destroy");
1445 for (auto i : safe_to_destroy)
1446 f->dump_int("osd", i);
1447 f->close_section();
1448 f->open_array_section("active");
1449 for (auto i : active_osds)
1450 f->dump_int("osd", i);
1451 f->close_section();
1452 f->open_array_section("missing_stats");
1453 for (auto i : missing_stats)
1454 f->dump_int("osd", i);
1455 f->close_section();
1456 f->open_array_section("stored_pgs");
1457 for (auto i : stored_pgs)
1458 f->dump_int("osd", i);
1459 f->close_section();
1460 f->close_section(); // osd_status
1461 f->flush(cmdctx->odata);
1462 r = 0;
1463 std::stringstream().swap(ss);
1464 }
1465 cmdctx->reply(r, ss);
1466 return true;
1467 }
1468
1469 if (r) {
1470 bool force = false;
1471 cmd_getval(cct, cmdctx->cmdmap, "force", force);
1472 if (!force) {
1473 // Backward compat
1474 cmd_getval(cct, cmdctx->cmdmap, "yes_i_really_mean_it", force);
1475 }
1476 if (!force) {
1477 ss << "\nYou can proceed by passing --force, but be warned that"
1478 " this will likely mean real, permanent data loss.";
1479 } else {
1480 r = 0;
1481 }
1482 }
1483 if (r) {
1484 cmdctx->reply(r, ss);
1485 return true;
1486 }
1487 const string cmd =
1488 "{"
1489 "\"prefix\": \"" + prefix + "-actual\", "
1490 "\"id\": " + stringify(osds) + ", "
1491 "\"yes_i_really_mean_it\": true"
1492 "}";
1493 auto on_finish = new ReplyOnFinish(cmdctx);
1494 monc->start_mon_command({cmd}, {}, nullptr, &on_finish->outs, on_finish);
1495 return true;
1496 } else if (prefix == "osd ok-to-stop") {
1497 vector<string> ids;
1498 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1499 set<int> osds;
1500 int r;
1501 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1502 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1503 });
1504 if (!r && osds.empty()) {
1505 ss << "must specify one or more OSDs";
1506 r = -EINVAL;
1507 }
1508 if (r < 0) {
1509 cmdctx->reply(r, ss);
1510 return true;
1511 }
1512 map<pg_t,int> pg_delta; // pgid -> net acting set size change
1513 int dangerous_pgs = 0;
1514 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
1515 if (pg_map.num_pg_unknown > 0) {
1516 ss << pg_map.num_pg_unknown << " pgs have unknown state; "
1517 << "cannot draw any conclusions";
1518 r = -EAGAIN;
1519 return;
1520 }
1521 for (auto osd : osds) {
1522 auto p = pg_map.pg_by_osd.find(osd);
1523 if (p != pg_map.pg_by_osd.end()) {
1524 for (auto& pgid : p->second) {
1525 --pg_delta[pgid];
1526 }
1527 }
1528 }
1529 for (auto& p : pg_delta) {
1530 auto q = pg_map.pg_stat.find(p.first);
1531 if (q == pg_map.pg_stat.end()) {
1532 ss << "missing information about " << p.first << "; cannot draw"
1533 << " any conclusions";
1534 r = -EAGAIN;
1535 return;
1536 }
1537 if (!(q->second.state & PG_STATE_ACTIVE) ||
1538 (q->second.state & PG_STATE_DEGRADED)) {
1539 // we don't currently have a good way to tell *how* degraded
1540 // a degraded PG is, so we have to assume we cannot remove
1541 // any more replicas/shards.
1542 ++dangerous_pgs;
1543 continue;
1544 }
1545 const pg_pool_t *pi = osdmap.get_pg_pool(p.first.pool());
1546 if (!pi) {
1547 ++dangerous_pgs; // pool is creating or deleting
1548 } else {
1549 if (q->second.acting.size() + p.second < pi->min_size) {
1550 ++dangerous_pgs;
1551 }
1552 }
1553 }
1554 });
1555 if (r) {
1556 cmdctx->reply(r, ss);
1557 return true;
1558 }
1559 if (dangerous_pgs) {
1560 ss << dangerous_pgs << " PGs are already degraded or might become "
1561 << "unavailable";
1562 cmdctx->reply(-EBUSY, ss);
1563 return true;
1564 }
1565 ss << "OSD(s) " << osds << " are ok to stop without reducing"
1566 << " availability, provided there are no other concurrent failures"
1567 << " or interventions. " << pg_delta.size() << " PGs are likely to be"
1568 << " degraded (but remain available) as a result.";
1569 cmdctx->reply(0, ss);
1570 return true;
1571 } else if (prefix == "pg force-recovery" ||
1572 prefix == "pg force-backfill" ||
1573 prefix == "pg cancel-force-recovery" ||
1574 prefix == "pg cancel-force-backfill" ||
1575 prefix == "osd pool force-recovery" ||
1576 prefix == "osd pool force-backfill" ||
1577 prefix == "osd pool cancel-force-recovery" ||
1578 prefix == "osd pool cancel-force-backfill") {
1579 vector<string> vs;
1580 get_str_vec(prefix, vs);
1581 auto& granularity = vs.front();
1582 auto& forceop = vs.back();
1583 vector<pg_t> pgs;
1584
1585 // figure out actual op just once
1586 int actual_op = 0;
1587 if (forceop == "force-recovery") {
1588 actual_op = OFR_RECOVERY;
1589 } else if (forceop == "force-backfill") {
1590 actual_op = OFR_BACKFILL;
1591 } else if (forceop == "cancel-force-backfill") {
1592 actual_op = OFR_BACKFILL | OFR_CANCEL;
1593 } else if (forceop == "cancel-force-recovery") {
1594 actual_op = OFR_RECOVERY | OFR_CANCEL;
1595 }
1596
1597 set<pg_t> candidates; // deduped
1598 if (granularity == "pg") {
1599 // covnert pg names to pgs, discard any invalid ones while at it
1600 vector<string> pgids;
1601 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgids);
1602 for (auto& i : pgids) {
1603 pg_t pgid;
1604 if (!pgid.parse(i.c_str())) {
1605 ss << "invlaid pgid '" << i << "'; ";
1606 r = -EINVAL;
1607 continue;
1608 }
1609 candidates.insert(pgid);
1610 }
1611 } else {
1612 // per pool
1613 vector<string> pool_names;
1614 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", pool_names);
1615 if (pool_names.empty()) {
1616 ss << "must specify one or more pool names";
1617 cmdctx->reply(-EINVAL, ss);
1618 return true;
1619 }
1620 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1621 for (auto& pool_name : pool_names) {
1622 auto pool_id = osdmap.lookup_pg_pool_name(pool_name);
1623 if (pool_id < 0) {
1624 ss << "unrecognized pool '" << pool_name << "'";
1625 r = -ENOENT;
1626 return;
1627 }
1628 auto pool_pg_num = osdmap.get_pg_num(pool_id);
1629 for (int i = 0; i < pool_pg_num; i++)
1630 candidates.insert({(unsigned int)i, (uint64_t)pool_id});
1631 }
1632 });
1633 if (r < 0) {
1634 cmdctx->reply(r, ss);
1635 return true;
1636 }
1637 }
1638
1639 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1640 for (auto& i : candidates) {
1641 auto it = pg_map.pg_stat.find(i);
1642 if (it == pg_map.pg_stat.end()) {
1643 ss << "pg " << i << " does not exist; ";
1644 r = -ENOENT;
1645 continue;
1646 }
1647 auto state = it->second.state;
1648 // discard pgs for which user requests are pointless
1649 switch (actual_op) {
1650 case OFR_RECOVERY:
1651 if ((state & (PG_STATE_DEGRADED |
1652 PG_STATE_RECOVERY_WAIT |
1653 PG_STATE_RECOVERING)) == 0) {
1654 // don't return error, user script may be racing with cluster.
1655 // not fatal.
1656 ss << "pg " << i << " doesn't require recovery; ";
1657 continue;
1658 } else if (state & PG_STATE_FORCED_RECOVERY) {
1659 ss << "pg " << i << " recovery already forced; ";
1660 // return error, as it may be a bug in user script
1661 r = -EINVAL;
1662 continue;
1663 }
1664 break;
1665 case OFR_BACKFILL:
1666 if ((state & (PG_STATE_DEGRADED |
1667 PG_STATE_BACKFILL_WAIT |
1668 PG_STATE_BACKFILLING)) == 0) {
1669 ss << "pg " << i << " doesn't require backfilling; ";
1670 continue;
1671 } else if (state & PG_STATE_FORCED_BACKFILL) {
1672 ss << "pg " << i << " backfill already forced; ";
1673 r = -EINVAL;
1674 continue;
1675 }
1676 break;
1677 case OFR_BACKFILL | OFR_CANCEL:
1678 if ((state & PG_STATE_FORCED_BACKFILL) == 0) {
1679 ss << "pg " << i << " backfill not forced; ";
1680 continue;
1681 }
1682 break;
1683 case OFR_RECOVERY | OFR_CANCEL:
1684 if ((state & PG_STATE_FORCED_RECOVERY) == 0) {
1685 ss << "pg " << i << " recovery not forced; ";
1686 continue;
1687 }
1688 break;
1689 default:
1690 ceph_abort_msg("actual_op value is not supported");
1691 }
1692 pgs.push_back(i);
1693 } // for
1694 });
1695
1696 // respond with error only when no pgs are correct
1697 // yes, in case of mixed errors, only the last one will be emitted,
1698 // but the message presented will be fine
1699 if (pgs.size() != 0) {
1700 // clear error to not confuse users/scripts
1701 r = 0;
1702 }
1703
1704 // optimize the command -> messages conversion, use only one
1705 // message per distinct OSD
1706 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1707 // group pgs to process by osd
1708 map<int, vector<spg_t>> osdpgs;
1709 for (auto& pgid : pgs) {
1710 int primary;
1711 spg_t spg;
1712 if (osdmap.get_primary_shard(pgid, &primary, &spg)) {
1713 osdpgs[primary].push_back(spg);
1714 }
1715 }
1716 for (auto& i : osdpgs) {
1717 if (osdmap.is_up(i.first)) {
1718 auto p = osd_cons.find(i.first);
1719 if (p == osd_cons.end()) {
1720 ss << "osd." << i.first << " is not currently connected";
1721 r = -EAGAIN;
1722 continue;
1723 }
1724 for (auto& con : p->second) {
1725 con->send_message(
1726 new MOSDForceRecovery(monc->get_fsid(), i.second, actual_op));
1727 }
1728 ss << "instructing pg(s) " << i.second << " on osd." << i.first
1729 << " to " << forceop << "; ";
1730 }
1731 }
1732 });
1733 ss << std::endl;
1734 cmdctx->reply(r, ss);
1735 return true;
1736 } else if (prefix == "config show" ||
1737 prefix == "config show-with-defaults") {
1738 string who;
1739 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", who);
1740 int r = 0;
1741 auto dot = who.find('.');
1742 DaemonKey key;
1743 key.first = who.substr(0, dot);
1744 key.second = who.substr(dot + 1);
1745 DaemonStatePtr daemon = daemon_state.get(key);
1746 string name;
1747 if (!daemon) {
1748 ss << "no config state for daemon " << who;
1749 cmdctx->reply(-ENOENT, ss);
1750 return true;
1751 }
1752
1753 std::lock_guard l(daemon->lock);
1754
1755 if (cmd_getval(g_ceph_context, cmdctx->cmdmap, "key", name)) {
1756 auto p = daemon->config.find(name);
1757 if (p != daemon->config.end() &&
1758 !p->second.empty()) {
1759 cmdctx->odata.append(p->second.rbegin()->second + "\n");
1760 } else {
1761 auto& defaults = daemon->_get_config_defaults();
1762 auto q = defaults.find(name);
1763 if (q != defaults.end()) {
1764 cmdctx->odata.append(q->second + "\n");
1765 } else {
1766 r = -ENOENT;
1767 }
1768 }
1769 } else if (daemon->config_defaults_bl.length() > 0) {
1770 TextTable tbl;
1771 if (f) {
1772 f->open_array_section("config");
1773 } else {
1774 tbl.define_column("NAME", TextTable::LEFT, TextTable::LEFT);
1775 tbl.define_column("VALUE", TextTable::LEFT, TextTable::LEFT);
1776 tbl.define_column("SOURCE", TextTable::LEFT, TextTable::LEFT);
1777 tbl.define_column("OVERRIDES", TextTable::LEFT, TextTable::LEFT);
1778 tbl.define_column("IGNORES", TextTable::LEFT, TextTable::LEFT);
1779 }
1780 if (prefix == "config show") {
1781 // show
1782 for (auto& i : daemon->config) {
1783 dout(20) << " " << i.first << " -> " << i.second << dendl;
1784 if (i.second.empty()) {
1785 continue;
1786 }
1787 if (f) {
1788 f->open_object_section("value");
1789 f->dump_string("name", i.first);
1790 f->dump_string("value", i.second.rbegin()->second);
1791 f->dump_string("source", ceph_conf_level_name(
1792 i.second.rbegin()->first));
1793 if (i.second.size() > 1) {
1794 f->open_array_section("overrides");
1795 auto j = i.second.rend();
1796 for (--j; j != i.second.rbegin(); --j) {
1797 f->open_object_section("value");
1798 f->dump_string("source", ceph_conf_level_name(j->first));
1799 f->dump_string("value", j->second);
1800 f->close_section();
1801 }
1802 f->close_section();
1803 }
1804 if (daemon->ignored_mon_config.count(i.first)) {
1805 f->dump_string("ignores", "mon");
1806 }
1807 f->close_section();
1808 } else {
1809 tbl << i.first;
1810 tbl << i.second.rbegin()->second;
1811 tbl << ceph_conf_level_name(i.second.rbegin()->first);
1812 if (i.second.size() > 1) {
1813 list<string> ov;
1814 auto j = i.second.rend();
1815 for (--j; j != i.second.rbegin(); --j) {
1816 if (j->second == i.second.rbegin()->second) {
1817 ov.push_front(string("(") + ceph_conf_level_name(j->first) +
1818 string("[") + j->second + string("]") +
1819 string(")"));
1820 } else {
1821 ov.push_front(ceph_conf_level_name(j->first) +
1822 string("[") + j->second + string("]"));
1823
1824 }
1825 }
1826 tbl << ov;
1827 } else {
1828 tbl << "";
1829 }
1830 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
1831 tbl << TextTable::endrow;
1832 }
1833 }
1834 } else {
1835 // show-with-defaults
1836 auto& defaults = daemon->_get_config_defaults();
1837 for (auto& i : defaults) {
1838 if (f) {
1839 f->open_object_section("value");
1840 f->dump_string("name", i.first);
1841 } else {
1842 tbl << i.first;
1843 }
1844 auto j = daemon->config.find(i.first);
1845 if (j != daemon->config.end() && !j->second.empty()) {
1846 // have config
1847 if (f) {
1848 f->dump_string("value", j->second.rbegin()->second);
1849 f->dump_string("source", ceph_conf_level_name(
1850 j->second.rbegin()->first));
1851 if (j->second.size() > 1) {
1852 f->open_array_section("overrides");
1853 auto k = j->second.rend();
1854 for (--k; k != j->second.rbegin(); --k) {
1855 f->open_object_section("value");
1856 f->dump_string("source", ceph_conf_level_name(k->first));
1857 f->dump_string("value", k->second);
1858 f->close_section();
1859 }
1860 f->close_section();
1861 }
1862 if (daemon->ignored_mon_config.count(i.first)) {
1863 f->dump_string("ignores", "mon");
1864 }
1865 f->close_section();
1866 } else {
1867 tbl << j->second.rbegin()->second;
1868 tbl << ceph_conf_level_name(j->second.rbegin()->first);
1869 if (j->second.size() > 1) {
1870 list<string> ov;
1871 auto k = j->second.rend();
1872 for (--k; k != j->second.rbegin(); --k) {
1873 if (k->second == j->second.rbegin()->second) {
1874 ov.push_front(string("(") + ceph_conf_level_name(k->first) +
1875 string("[") + k->second + string("]") +
1876 string(")"));
1877 } else {
1878 ov.push_front(ceph_conf_level_name(k->first) +
1879 string("[") + k->second + string("]"));
1880 }
1881 }
1882 tbl << ov;
1883 } else {
1884 tbl << "";
1885 }
1886 tbl << (daemon->ignored_mon_config.count(i.first) ? "mon" : "");
1887 tbl << TextTable::endrow;
1888 }
1889 } else {
1890 // only have default
1891 if (f) {
1892 f->dump_string("value", i.second);
1893 f->dump_string("source", ceph_conf_level_name(CONF_DEFAULT));
1894 f->close_section();
1895 } else {
1896 tbl << i.second;
1897 tbl << ceph_conf_level_name(CONF_DEFAULT);
1898 tbl << "";
1899 tbl << "";
1900 tbl << TextTable::endrow;
1901 }
1902 }
1903 }
1904 }
1905 if (f) {
1906 f->close_section();
1907 f->flush(cmdctx->odata);
1908 } else {
1909 cmdctx->odata.append(stringify(tbl));
1910 }
1911 }
1912 cmdctx->reply(r, ss);
1913 return true;
1914 } else if (prefix == "device ls") {
1915 set<string> devids;
1916 TextTable tbl;
1917 if (f) {
1918 f->open_array_section("devices");
1919 daemon_state.with_devices([&f](const DeviceState& dev) {
1920 f->dump_object("device", dev);
1921 });
1922 f->close_section();
1923 f->flush(cmdctx->odata);
1924 } else {
1925 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
1926 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
1927 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
1928 tbl.define_column("LIFE EXPECTANCY", TextTable::LEFT, TextTable::LEFT);
1929 auto now = ceph_clock_now();
1930 daemon_state.with_devices([&tbl, now](const DeviceState& dev) {
1931 string h;
1932 for (auto& i : dev.devnames) {
1933 if (h.size()) {
1934 h += " ";
1935 }
1936 h += i.first + ":" + i.second;
1937 }
1938 string d;
1939 for (auto& i : dev.daemons) {
1940 if (d.size()) {
1941 d += " ";
1942 }
1943 d += to_string(i);
1944 }
1945 tbl << dev.devid
1946 << h
1947 << d
1948 << dev.get_life_expectancy_str(now)
1949 << TextTable::endrow;
1950 });
1951 cmdctx->odata.append(stringify(tbl));
1952 }
1953 cmdctx->reply(0, ss);
1954 return true;
1955 } else if (prefix == "device ls-by-daemon") {
1956 string who;
1957 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", who);
1958 DaemonKey k;
1959 if (!key_from_string(who, &k)) {
1960 ss << who << " is not a valid daemon name";
1961 r = -EINVAL;
1962 } else {
1963 auto dm = daemon_state.get(k);
1964 if (dm) {
1965 if (f) {
1966 f->open_array_section("devices");
1967 for (auto& i : dm->devices) {
1968 daemon_state.with_device(i.first, [&f] (const DeviceState& dev) {
1969 f->dump_object("device", dev);
1970 });
1971 }
1972 f->close_section();
1973 f->flush(cmdctx->odata);
1974 } else {
1975 TextTable tbl;
1976 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
1977 tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
1978 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT,
1979 TextTable::LEFT);
1980 auto now = ceph_clock_now();
1981 for (auto& i : dm->devices) {
1982 daemon_state.with_device(
1983 i.first, [&tbl, now] (const DeviceState& dev) {
1984 string h;
1985 for (auto& i : dev.devnames) {
1986 if (h.size()) {
1987 h += " ";
1988 }
1989 h += i.first + ":" + i.second;
1990 }
1991 tbl << dev.devid
1992 << h
1993 << dev.get_life_expectancy_str(now)
1994 << TextTable::endrow;
1995 });
1996 }
1997 cmdctx->odata.append(stringify(tbl));
1998 }
1999 } else {
2000 r = -ENOENT;
2001 ss << "daemon " << who << " not found";
2002 }
2003 cmdctx->reply(r, ss);
2004 }
2005 } else if (prefix == "device ls-by-host") {
2006 string host;
2007 cmd_getval(g_ceph_context, cmdctx->cmdmap, "host", host);
2008 set<string> devids;
2009 daemon_state.list_devids_by_server(host, &devids);
2010 if (f) {
2011 f->open_array_section("devices");
2012 for (auto& devid : devids) {
2013 daemon_state.with_device(
2014 devid, [&f] (const DeviceState& dev) {
2015 f->dump_object("device", dev);
2016 });
2017 }
2018 f->close_section();
2019 f->flush(cmdctx->odata);
2020 } else {
2021 TextTable tbl;
2022 tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
2023 tbl.define_column("DEV", TextTable::LEFT, TextTable::LEFT);
2024 tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
2025 tbl.define_column("EXPECTED FAILURE", TextTable::LEFT, TextTable::LEFT);
2026 auto now = ceph_clock_now();
2027 for (auto& devid : devids) {
2028 daemon_state.with_device(
2029 devid, [&tbl, &host, now] (const DeviceState& dev) {
2030 string n;
2031 for (auto& j : dev.devnames) {
2032 if (j.first == host) {
2033 if (n.size()) {
2034 n += " ";
2035 }
2036 n += j.second;
2037 }
2038 }
2039 string d;
2040 for (auto& i : dev.daemons) {
2041 if (d.size()) {
2042 d += " ";
2043 }
2044 d += to_string(i);
2045 }
2046 tbl << dev.devid
2047 << n
2048 << d
2049 << dev.get_life_expectancy_str(now)
2050 << TextTable::endrow;
2051 });
2052 }
2053 cmdctx->odata.append(stringify(tbl));
2054 }
2055 cmdctx->reply(0, ss);
2056 return true;
2057 } else if (prefix == "device info") {
2058 string devid;
2059 cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
2060 int r = 0;
2061 ostringstream rs;
2062 if (!daemon_state.with_device(devid,
2063 [&f, &rs] (const DeviceState& dev) {
2064 if (f) {
2065 f->dump_object("device", dev);
2066 } else {
2067 dev.print(rs);
2068 }
2069 })) {
2070 ss << "device " << devid << " not found";
2071 r = -ENOENT;
2072 } else {
2073 if (f) {
2074 f->flush(cmdctx->odata);
2075 } else {
2076 cmdctx->odata.append(rs.str());
2077 }
2078 }
2079 cmdctx->reply(r, ss);
2080 return true;
2081 } else if (prefix == "device set-life-expectancy") {
2082 string devid;
2083 cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
2084 string from_str, to_str;
2085 cmd_getval(g_ceph_context, cmdctx->cmdmap, "from", from_str);
2086 cmd_getval(g_ceph_context, cmdctx->cmdmap, "to", to_str);
2087 utime_t from, to;
2088 if (!from.parse(from_str)) {
2089 ss << "unable to parse datetime '" << from_str << "'";
2090 r = -EINVAL;
2091 cmdctx->reply(r, ss);
2092 } else if (to_str.size() && !to.parse(to_str)) {
2093 ss << "unable to parse datetime '" << to_str << "'";
2094 r = -EINVAL;
2095 cmdctx->reply(r, ss);
2096 } else {
2097 map<string,string> meta;
2098 daemon_state.with_device_create(
2099 devid,
2100 [from, to, &meta] (DeviceState& dev) {
2101 dev.set_life_expectancy(from, to, ceph_clock_now());
2102 meta = dev.metadata;
2103 });
2104 json_spirit::Object json_object;
2105 for (auto& i : meta) {
2106 json_spirit::Config::add(json_object, i.first, i.second);
2107 }
2108 bufferlist json;
2109 json.append(json_spirit::write(json_object));
2110 const string cmd =
2111 "{"
2112 "\"prefix\": \"config-key set\", "
2113 "\"key\": \"device/" + devid + "\""
2114 "}";
2115 auto on_finish = new ReplyOnFinish(cmdctx);
2116 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2117 }
2118 return true;
2119 } else if (prefix == "device rm-life-expectancy") {
2120 string devid;
2121 cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
2122 map<string,string> meta;
2123 if (daemon_state.with_device_write(devid, [&meta] (DeviceState& dev) {
2124 dev.rm_life_expectancy();
2125 meta = dev.metadata;
2126 })) {
2127 string cmd;
2128 bufferlist json;
2129 if (meta.empty()) {
2130 cmd =
2131 "{"
2132 "\"prefix\": \"config-key rm\", "
2133 "\"key\": \"device/" + devid + "\""
2134 "}";
2135 } else {
2136 json_spirit::Object json_object;
2137 for (auto& i : meta) {
2138 json_spirit::Config::add(json_object, i.first, i.second);
2139 }
2140 json.append(json_spirit::write(json_object));
2141 cmd =
2142 "{"
2143 "\"prefix\": \"config-key set\", "
2144 "\"key\": \"device/" + devid + "\""
2145 "}";
2146 }
2147 auto on_finish = new ReplyOnFinish(cmdctx);
2148 monc->start_mon_command({cmd}, json, nullptr, nullptr, on_finish);
2149 } else {
2150 cmdctx->reply(0, ss);
2151 }
2152 return true;
2153 } else {
2154 if (!pgmap_ready) {
2155 ss << "Warning: due to ceph-mgr restart, some PG states may not be up to date\n";
2156 }
2157 if (f) {
2158 f->open_object_section("pg_info");
2159 f->dump_bool("pg_ready", pgmap_ready);
2160 }
2161
2162 // fall back to feeding command to PGMap
2163 r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
2164 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
2165 f.get(), &ss, &cmdctx->odata);
2166 });
2167
2168 if (f) {
2169 f->close_section();
2170 }
2171 if (r != -EOPNOTSUPP) {
2172 if (f) {
2173 f->flush(cmdctx->odata);
2174 }
2175 cmdctx->reply(r, ss);
2176 return true;
2177 }
2178 }
2179
2180 // Resolve the command to the name of the module that will
2181 // handle it (if the command exists)
2182 std::string handler_name;
2183 auto py_commands = py_modules.get_py_commands();
2184 for (const auto &pyc : py_commands) {
2185 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
2186 if (pyc_prefix == prefix) {
2187 handler_name = pyc.module_name;
2188 break;
2189 }
2190 }
2191
2192 // Was the command unfound?
2193 if (handler_name.empty()) {
2194 ss << "No handler found for '" << prefix << "'";
2195 dout(4) << "No handler found for '" << prefix << "'" << dendl;
2196 cmdctx->reply(-EINVAL, ss);
2197 return true;
2198 }
2199
2200 dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
2201 finisher.queue(new FunctionContext([this, cmdctx, handler_name, prefix](int r_) {
2202 std::stringstream ss;
2203
2204 // Validate that the module is enabled
2205 PyModuleRef module = py_modules.get_module(handler_name);
2206 ceph_assert(module);
2207 if (!module->is_enabled()) {
2208 ss << "Module '" << handler_name << "' is not enabled (required by "
2209 "command '" << prefix << "'): use `ceph mgr module enable "
2210 << handler_name << "` to enable it";
2211 dout(4) << ss.str() << dendl;
2212 cmdctx->reply(-EOPNOTSUPP, ss);
2213 return;
2214 }
2215
2216 // Hack: allow the self-test method to run on unhealthy modules.
2217 // Fix this in future by creating a special path for self test rather
2218 // than having the hook be a normal module command.
2219 std::string self_test_prefix = handler_name + " " + "self-test";
2220
2221 // Validate that the module is healthy
2222 bool accept_command;
2223 if (module->is_loaded()) {
2224 if (module->get_can_run() && !module->is_failed()) {
2225 // Healthy module
2226 accept_command = true;
2227 } else if (self_test_prefix == prefix) {
2228 // Unhealthy, but allow because it's a self test command
2229 accept_command = true;
2230 } else {
2231 accept_command = false;
2232 ss << "Module '" << handler_name << "' has experienced an error and "
2233 "cannot handle commands: " << module->get_error_string();
2234 }
2235 } else {
2236 // Module not loaded
2237 accept_command = false;
2238 ss << "Module '" << handler_name << "' failed to load and "
2239 "cannot handle commands: " << module->get_error_string();
2240 }
2241
2242 if (!accept_command) {
2243 dout(4) << ss.str() << dendl;
2244 cmdctx->reply(-EIO, ss);
2245 return;
2246 }
2247
2248 std::stringstream ds;
2249 bufferlist inbl = cmdctx->m->get_data();
2250 int r = py_modules.handle_command(handler_name, cmdctx->cmdmap, inbl, &ds, &ss);
2251 cmdctx->odata.append(ds);
2252 cmdctx->reply(r, ss);
2253 }));
2254 return true;
2255 }
2256
2257 void DaemonServer::_prune_pending_service_map()
2258 {
2259 utime_t cutoff = ceph_clock_now();
2260 cutoff -= g_conf().get_val<double>("mgr_service_beacon_grace");
2261 auto p = pending_service_map.services.begin();
2262 while (p != pending_service_map.services.end()) {
2263 auto q = p->second.daemons.begin();
2264 while (q != p->second.daemons.end()) {
2265 DaemonKey key(p->first, q->first);
2266 if (!daemon_state.exists(key)) {
2267 derr << "missing key " << key << dendl;
2268 ++q;
2269 continue;
2270 }
2271 auto daemon = daemon_state.get(key);
2272 std::lock_guard l(daemon->lock);
2273 if (daemon->last_service_beacon == utime_t()) {
2274 // we must have just restarted; assume they are alive now.
2275 daemon->last_service_beacon = ceph_clock_now();
2276 ++q;
2277 continue;
2278 }
2279 if (daemon->last_service_beacon < cutoff) {
2280 dout(10) << "pruning stale " << p->first << "." << q->first
2281 << " last_beacon " << daemon->last_service_beacon << dendl;
2282 q = p->second.daemons.erase(q);
2283 pending_service_map_dirty = pending_service_map.epoch;
2284 } else {
2285 ++q;
2286 }
2287 }
2288 if (p->second.daemons.empty()) {
2289 p = pending_service_map.services.erase(p);
2290 pending_service_map_dirty = pending_service_map.epoch;
2291 } else {
2292 ++p;
2293 }
2294 }
2295 }
2296
// Assemble and send an MMonMgrReport to the monitor: health checks and
// progress events from python modules, the (possibly updated) service
// map, the PGMap digest, and health checks aggregated from per-daemon
// health metrics.
void DaemonServer::send_report()
{
  if (!pgmap_ready) {
    // Hold off until the OSDs have reported in, but only for a bounded
    // time (4x the stats period); after that give up and send what we
    // have, which may be incomplete.
    if (ceph_clock_now() - started_at > g_conf().get_val<int64_t>("mgr_stats_period") * 4.0) {
      pgmap_ready = true;
      reported_osds.clear();
      dout(1) << "Giving up on OSDs that haven't reported yet, sending "
              << "potentially incomplete PG state to mon" << dendl;
    } else {
      dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
              << dendl;
      return;
    }
  }

  auto m = new MMonMgrReport();
  // health checks and progress events contributed by python modules
  py_modules.get_health_checks(&m->health_checks);
  py_modules.get_progress_events(&m->progress_events);

  cluster_state.with_mutable_pgmap([&](PGMap& pg_map) {
      cluster_state.update_delta_stats();

      if (pending_service_map.epoch) {
	// drop stale service daemons before publishing
	_prune_pending_service_map();
	// only encode (and bump the epoch) if something changed since
	// the epoch we last published
	if (pending_service_map_dirty >= pending_service_map.epoch) {
	  pending_service_map.modified = ceph_clock_now();
	  encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
	  dout(10) << "sending service_map e" << pending_service_map.epoch
		   << dendl;
	  pending_service_map.epoch++;
	}
      }

      cluster_state.with_osdmap([&](const OSDMap& osdmap) {
	  // FIXME: no easy way to get mon features here. this will do for
	  // now, though, as long as we don't make a backward-incompat change.
	  pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
	  dout(10) << pg_map << dendl;

	  pg_map.get_health_checks(g_ceph_context, osdmap,
				   &m->health_checks);

	  dout(10) << m->health_checks.checks.size() << " health checks"
		   << dendl;
	  // dump the health checks as JSON into the still-open dout(20)
	  // stream; the explicit *_dout << dendl below closes it
	  dout(20) << "health checks:\n";
	  JSONFormatter jf(true);
	  jf.dump_object("health_checks", m->health_checks);
	  jf.flush(*_dout);
	  *_dout << dendl;
	  if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
	    clog->debug() << "pgmap v" << pg_map.version << ": " << pg_map;
	  }
	});
    });

  // Aggregate per-daemon health metrics reported by osd and mon daemons
  // into cluster-wide health checks, one collector per metric type.
  map<daemon_metric, unique_ptr<DaemonHealthMetricCollector>> accumulated;
  for (auto service : {"osd", "mon"} ) {
    auto daemons = daemon_state.get_by_service(service);
    for (const auto& [key,state] : daemons) {
      std::lock_guard l{state->lock};
      for (const auto& metric : state->daemon_health_metrics) {
        auto acc = accumulated.find(metric.get_type());
        if (acc == accumulated.end()) {
          auto collector = DaemonHealthMetricCollector::create(metric.get_type());
          if (!collector) {
            // metric type we don't have a collector for; log and skip
            derr << __func__ << " " << key.first << "." << key.second
                 << " sent me an unknown health metric: "
                 << std::hex << static_cast<uint8_t>(metric.get_type())
                 << std::dec << dendl;
            continue;
          }
          dout(20) << " + " << state->key << " "
                   << metric << dendl;
          tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
                                                      std::move(collector));
        }
        acc->second->update(key, metric);
      }
    }
  }
  for (const auto& acc : accumulated) {
    acc.second->summarize(m->health_checks);
  }
  // TODO? We currently do not notify the PyModules
  // TODO: respect needs_send, so we send the report only if we are asked to do
  // so, or the state is updated.
  monc->send_mon_message(m);
}
2385
2386 void DaemonServer::adjust_pgs()
2387 {
2388 dout(20) << dendl;
2389 unsigned max = std::max<int64_t>(1, g_conf()->mon_osd_max_creating_pgs);
2390 double max_misplaced = g_conf().get_val<double>("target_max_misplaced_ratio");
2391 bool aggro = g_conf().get_val<bool>("mgr_debug_aggressive_pg_num_changes");
2392
2393 map<string,unsigned> pg_num_to_set;
2394 map<string,unsigned> pgp_num_to_set;
2395 set<pg_t> upmaps_to_clear;
2396 cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
2397 unsigned creating_or_unknown = 0;
2398 for (auto& i : pg_map.num_pg_by_state) {
2399 if ((i.first & (PG_STATE_CREATING)) ||
2400 i.first == 0) {
2401 creating_or_unknown += i.second;
2402 }
2403 }
2404 unsigned left = max;
2405 if (creating_or_unknown >= max) {
2406 return;
2407 }
2408 left -= creating_or_unknown;
2409 dout(10) << "creating_or_unknown " << creating_or_unknown
2410 << " max_creating " << max
2411 << " left " << left
2412 << dendl;
2413
2414 // FIXME: These checks are fundamentally racy given that adjust_pgs()
2415 // can run more frequently than we get updated pg stats from OSDs. We
2416 // may make multiple adjustments with stale informaiton.
2417 double misplaced_ratio, degraded_ratio;
2418 double inactive_pgs_ratio, unknown_pgs_ratio;
2419 pg_map.get_recovery_stats(&misplaced_ratio, &degraded_ratio,
2420 &inactive_pgs_ratio, &unknown_pgs_ratio);
2421 dout(20) << "misplaced_ratio " << misplaced_ratio
2422 << " degraded_ratio " << degraded_ratio
2423 << " inactive_pgs_ratio " << inactive_pgs_ratio
2424 << " unknown_pgs_ratio " << unknown_pgs_ratio
2425 << "; target_max_misplaced_ratio " << max_misplaced
2426 << dendl;
2427
2428 for (auto& i : osdmap.get_pools()) {
2429 const pg_pool_t& p = i.second;
2430
2431 // adjust pg_num?
2432 if (p.get_pg_num_target() != p.get_pg_num()) {
2433 dout(20) << "pool " << i.first
2434 << " pg_num " << p.get_pg_num()
2435 << " target " << p.get_pg_num_target()
2436 << dendl;
2437 if (p.has_flag(pg_pool_t::FLAG_CREATING)) {
2438 dout(10) << "pool " << i.first
2439 << " pg_num_target " << p.get_pg_num_target()
2440 << " pg_num " << p.get_pg_num()
2441 << " - still creating initial pgs"
2442 << dendl;
2443 } else if (p.get_pg_num_target() < p.get_pg_num()) {
2444 // pg_num decrease (merge)
2445 pg_t merge_source(p.get_pg_num() - 1, i.first);
2446 pg_t merge_target = merge_source.get_parent();
2447 bool ok = true;
2448
2449 if (p.get_pg_num() != p.get_pg_num_pending()) {
2450 dout(10) << "pool " << i.first
2451 << " pg_num_target " << p.get_pg_num_target()
2452 << " pg_num " << p.get_pg_num()
2453 << " - decrease and pg_num_pending != pg_num, waiting"
2454 << dendl;
2455 ok = false;
2456 } else if (p.get_pg_num() == p.get_pgp_num()) {
2457 dout(10) << "pool " << i.first
2458 << " pg_num_target " << p.get_pg_num_target()
2459 << " pg_num " << p.get_pg_num()
2460 << " - decrease blocked by pgp_num "
2461 << p.get_pgp_num()
2462 << dendl;
2463 ok = false;
2464 }
2465 for (auto &merge_participant : {merge_source, merge_target}) {
2466 bool is_merge_source = merge_participant == merge_source;
2467 if (osdmap.have_pg_upmaps(merge_participant)) {
2468 dout(10) << "pool " << i.first
2469 << " pg_num_target " << p.get_pg_num_target()
2470 << " pg_num " << p.get_pg_num()
2471 << (is_merge_source ? " - merge source " : " - merge target ")
2472 << merge_participant
2473 << " has upmap" << dendl;
2474 upmaps_to_clear.insert(merge_participant);
2475 ok = false;
2476 }
2477 auto q = pg_map.pg_stat.find(merge_participant);
2478 if (q == pg_map.pg_stat.end()) {
2479 dout(10) << "pool " << i.first
2480 << " pg_num_target " << p.get_pg_num_target()
2481 << " pg_num " << p.get_pg_num()
2482 << " - no state for " << merge_participant
2483 << (is_merge_source ? " (merge source)" : " (merge target)")
2484 << dendl;
2485 ok = false;
2486 } else if ((q->second.state & (PG_STATE_ACTIVE | PG_STATE_CLEAN)) !=
2487 (PG_STATE_ACTIVE | PG_STATE_CLEAN)) {
2488 dout(10) << "pool " << i.first
2489 << " pg_num_target " << p.get_pg_num_target()
2490 << " pg_num " << p.get_pg_num()
2491 << (is_merge_source ? " - merge source " : " - merge target ")
2492 << merge_participant
2493 << " not clean (" << pg_state_string(q->second.state)
2494 << ")" << dendl;
2495 ok = false;
2496 }
2497 }
2498
2499 if (ok) {
2500 unsigned target = p.get_pg_num() - 1;
2501 dout(10) << "pool " << i.first
2502 << " pg_num_target " << p.get_pg_num_target()
2503 << " pg_num " << p.get_pg_num()
2504 << " -> " << target
2505 << " (merging " << merge_source
2506 << " and " << merge_target
2507 << ")" << dendl;
2508 pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
2509 }
2510 } else if (p.get_pg_num_target() > p.get_pg_num()) {
2511 // pg_num increase (split)
2512 bool active = true;
2513 auto q = pg_map.num_pg_by_pool_state.find(i.first);
2514 if (q != pg_map.num_pg_by_pool_state.end()) {
2515 for (auto& j : q->second) {
2516 if ((j.first & (PG_STATE_ACTIVE|PG_STATE_PEERED)) == 0) {
2517 dout(20) << "pool " << i.first << " has " << j.second
2518 << " pgs in " << pg_state_string(j.first)
2519 << dendl;
2520 active = false;
2521 break;
2522 }
2523 }
2524 } else {
2525 active = false;
2526 }
2527 if (!active) {
2528 dout(10) << "pool " << i.first
2529 << " pg_num_target " << p.get_pg_num_target()
2530 << " pg_num " << p.get_pg_num()
2531 << " - not all pgs active"
2532 << dendl;
2533 } else {
2534 unsigned add = std::min(
2535 left,
2536 p.get_pg_num_target() - p.get_pg_num());
2537 unsigned target = p.get_pg_num() + add;
2538 left -= add;
2539 dout(10) << "pool " << i.first
2540 << " pg_num_target " << p.get_pg_num_target()
2541 << " pg_num " << p.get_pg_num()
2542 << " -> " << target << dendl;
2543 pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
2544 }
2545 }
2546 }
2547
2548 // adjust pgp_num?
2549 unsigned target = std::min(p.get_pg_num_pending(),
2550 p.get_pgp_num_target());
2551 if (target != p.get_pgp_num()) {
2552 dout(20) << "pool " << i.first
2553 << " pgp_num_target " << p.get_pgp_num_target()
2554 << " pgp_num " << p.get_pgp_num()
2555 << " -> " << target << dendl;
2556 if (target > p.get_pgp_num() &&
2557 p.get_pgp_num() == p.get_pg_num()) {
2558 dout(10) << "pool " << i.first
2559 << " pgp_num_target " << p.get_pgp_num_target()
2560 << " pgp_num " << p.get_pgp_num()
2561 << " - increase blocked by pg_num " << p.get_pg_num()
2562 << dendl;
2563 } else if (!aggro && (inactive_pgs_ratio > 0 ||
2564 degraded_ratio > 0 ||
2565 unknown_pgs_ratio > 0)) {
2566 dout(10) << "pool " << i.first
2567 << " pgp_num_target " << p.get_pgp_num_target()
2568 << " pgp_num " << p.get_pgp_num()
2569 << " - inactive|degraded|unknown pgs, deferring pgp_num"
2570 << " update" << dendl;
2571 } else if (!aggro && (misplaced_ratio > max_misplaced)) {
2572 dout(10) << "pool " << i.first
2573 << " pgp_num_target " << p.get_pgp_num_target()
2574 << " pgp_num " << p.get_pgp_num()
2575 << " - misplaced_ratio " << misplaced_ratio
2576 << " > max " << max_misplaced
2577 << ", deferring pgp_num update" << dendl;
2578 } else {
2579 // NOTE: this calculation assumes objects are
2580 // basically uniformly distributed across all PGs
2581 // (regardless of pool), which is probably not
2582 // perfectly correct, but it's a start. make no
2583 // single adjustment that's more than half of the
2584 // max_misplaced, to somewhat limit the magnitude of
2585 // our potential error here.
2586 int next;
2587 if (aggro) {
2588 next = target;
2589 } else {
2590 double room =
2591 std::min<double>(max_misplaced - misplaced_ratio,
2592 misplaced_ratio / 2.0);
2593 unsigned estmax = std::max<unsigned>(
2594 (double)p.get_pg_num() * room, 1u);
2595 int delta = target - p.get_pgp_num();
2596 next = p.get_pgp_num();
2597 if (delta < 0) {
2598 next += std::max<int>(-estmax, delta);
2599 } else {
2600 next += std::min<int>(estmax, delta);
2601 }
2602 dout(20) << " room " << room << " estmax " << estmax
2603 << " delta " << delta << " next " << next << dendl;
2604 if (p.get_pgp_num_target() == p.get_pg_num_target()) {
2605 // since pgp_num is tracking pg_num, ceph is handling
2606 // pgp_num. so, be responsible: don't let pgp_num get
2607 // too far out ahead of merges (if we are merging).
2608 // this avoids moving lots of unmerged pgs onto a
2609 // small number of OSDs where we might blow out the
2610 // per-osd pg max.
2611 unsigned max_outpace_merges =
2612 std::max<unsigned>(8, p.get_pg_num() * max_misplaced);
2613 if (next + max_outpace_merges < p.get_pg_num()) {
2614 next = p.get_pg_num() - max_outpace_merges;
2615 dout(10) << " using next " << next
2616 << " to avoid outpacing merges (max_outpace_merges "
2617 << max_outpace_merges << ")" << dendl;
2618 }
2619 }
2620 }
2621 dout(10) << "pool " << i.first
2622 << " pgp_num_target " << p.get_pgp_num_target()
2623 << " pgp_num " << p.get_pgp_num()
2624 << " -> " << next << dendl;
2625 pgp_num_to_set[osdmap.get_pool_name(i.first)] = next;
2626 }
2627 }
2628 if (left == 0) {
2629 return;
2630 }
2631 }
2632 });
2633 for (auto i : pg_num_to_set) {
2634 const string cmd =
2635 "{"
2636 "\"prefix\": \"osd pool set\", "
2637 "\"pool\": \"" + i.first + "\", "
2638 "\"var\": \"pg_num_actual\", "
2639 "\"val\": \"" + stringify(i.second) + "\""
2640 "}";
2641 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2642 }
2643 for (auto i : pgp_num_to_set) {
2644 const string cmd =
2645 "{"
2646 "\"prefix\": \"osd pool set\", "
2647 "\"pool\": \"" + i.first + "\", "
2648 "\"var\": \"pgp_num_actual\", "
2649 "\"val\": \"" + stringify(i.second) + "\""
2650 "}";
2651 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2652 }
2653 for (auto pg : upmaps_to_clear) {
2654 const string cmd =
2655 "{"
2656 "\"prefix\": \"osd rm-pg-upmap\", "
2657 "\"pgid\": \"" + stringify(pg) + "\""
2658 "}";
2659 monc->start_mon_command({cmd}, {}, nullptr, nullptr, nullptr);
2660 const string cmd2 =
2661 "{"
2662 "\"prefix\": \"osd rm-pg-upmap-items\", "
2663 "\"pgid\": \"" + stringify(pg) + "\"" +
2664 "}";
2665 monc->start_mon_command({cmd2}, {}, nullptr, nullptr, nullptr);
2666 }
2667 }
2668
2669 void DaemonServer::got_service_map()
2670 {
2671 std::lock_guard l(lock);
2672
2673 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
2674 if (pending_service_map.epoch == 0) {
2675 // we just started up
2676 dout(10) << "got initial map e" << service_map.epoch << dendl;
2677 pending_service_map = service_map;
2678 } else {
2679 // we we already active and therefore must have persisted it,
2680 // which means ours is the same or newer.
2681 dout(10) << "got updated map e" << service_map.epoch << dendl;
2682 }
2683 pending_service_map.epoch = service_map.epoch + 1;
2684 });
2685
2686 // cull missing daemons, populate new ones
2687 for (auto& p : pending_service_map.services) {
2688 std::set<std::string> names;
2689 for (auto& q : p.second.daemons) {
2690 names.insert(q.first);
2691 DaemonKey key(p.first, q.first);
2692 if (!daemon_state.exists(key)) {
2693 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
2694 daemon->key = key;
2695 daemon->set_metadata(q.second.metadata);
2696 if (q.second.metadata.count("hostname")) {
2697 daemon->hostname = q.second.metadata["hostname"];
2698 }
2699 daemon->service_daemon = true;
2700 daemon_state.insert(daemon);
2701 dout(10) << "added missing " << key << dendl;
2702 }
2703 }
2704 daemon_state.cull(p.first, names);
2705 }
2706 }
2707
2708 void DaemonServer::got_mgr_map()
2709 {
2710 std::lock_guard l(lock);
2711 set<std::string> have;
2712 cluster_state.with_mgrmap([&](const MgrMap& mgrmap) {
2713 auto md_update = [&] (DaemonKey key) {
2714 std::ostringstream oss;
2715 auto c = new MetadataUpdate(daemon_state, key);
2716 // FIXME remove post-nautilus: include 'id' for luminous mons
2717 oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
2718 << key.second << "\", \"id\": \"" << key.second << "\"}";
2719 monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
2720 };
2721 if (mgrmap.active_name.size()) {
2722 DaemonKey key("mgr", mgrmap.active_name);
2723 have.insert(mgrmap.active_name);
2724 if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
2725 md_update(key);
2726 dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
2727 }
2728 }
2729 for (auto& i : mgrmap.standbys) {
2730 DaemonKey key("mgr", i.second.name);
2731 have.insert(i.second.name);
2732 if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
2733 md_update(key);
2734 dout(10) << "triggered addition of " << key << " via metadata update" << dendl;
2735 }
2736 }
2737 });
2738 daemon_state.cull("mgr", have);
2739 }
2740
2741 const char** DaemonServer::get_tracked_conf_keys() const
2742 {
2743 static const char *KEYS[] = {
2744 "mgr_stats_threshold",
2745 "mgr_stats_period",
2746 nullptr
2747 };
2748
2749 return KEYS;
2750 }
2751
2752 void DaemonServer::handle_conf_change(const ConfigProxy& conf,
2753 const std::set <std::string> &changed)
2754 {
2755
2756 if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
2757 dout(4) << "Updating stats threshold/period on "
2758 << daemon_connections.size() << " clients" << dendl;
2759 // Send a fresh MMgrConfigure to all clients, so that they can follow
2760 // the new policy for transmitting stats
2761 finisher.queue(new FunctionContext([this](int r) {
2762 std::lock_guard l(lock);
2763 for (auto &c : daemon_connections) {
2764 _send_configure(c);
2765 }
2766 }));
2767 }
2768 }
2769
2770 void DaemonServer::_send_configure(ConnectionRef c)
2771 {
2772 ceph_assert(lock.is_locked_by_me());
2773
2774 auto configure = new MMgrConfigure();
2775 configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
2776 configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");
2777
2778 if (c->peer_is_osd()) {
2779 configure->osd_perf_metric_queries =
2780 osd_perf_metric_collector.get_queries();
2781 }
2782
2783 c->send_message(configure);
2784 }
2785
2786 OSDPerfMetricQueryID DaemonServer::add_osd_perf_query(
2787 const OSDPerfMetricQuery &query,
2788 const std::optional<OSDPerfMetricLimit> &limit)
2789 {
2790 return osd_perf_metric_collector.add_query(query, limit);
2791 }
2792
2793 int DaemonServer::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
2794 {
2795 return osd_perf_metric_collector.remove_query(query_id);
2796 }
2797
2798 int DaemonServer::get_osd_perf_counters(
2799 OSDPerfMetricQueryID query_id,
2800 std::map<OSDPerfMetricKey, PerformanceCounters> *counters)
2801 {
2802 return osd_perf_metric_collector.get_counters(query_id, counters);
2803 }