]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/DaemonServer.cc
update sources to v12.1.4
[ceph.git] / ceph / src / mgr / DaemonServer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "DaemonServer.h"
15
16 #include "include/str_list.h"
17 #include "auth/RotatingKeyRing.h"
18 #include "json_spirit/json_spirit_writer.h"
19
20 #include "mgr/mgr_commands.h"
21 #include "mon/MonCommand.h"
22
23 #include "messages/MMgrOpen.h"
24 #include "messages/MMgrConfigure.h"
25 #include "messages/MMonMgrReport.h"
26 #include "messages/MCommand.h"
27 #include "messages/MCommandReply.h"
28 #include "messages/MPGStats.h"
29 #include "messages/MOSDScrub.h"
30 #include "messages/MOSDForceRecovery.h"
31 #include "common/errno.h"
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mgr
35 #undef dout_prefix
36 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
37
38
39
40 DaemonServer::DaemonServer(MonClient *monc_,
41 Finisher &finisher_,
42 DaemonStateIndex &daemon_state_,
43 ClusterState &cluster_state_,
44 PyModules &py_modules_,
45 LogChannelRef clog_,
46 LogChannelRef audit_clog_)
47 : Dispatcher(g_ceph_context),
48 client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
49 g_conf->mgr_client_bytes)),
50 client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
51 g_conf->mgr_client_messages)),
52 osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
53 g_conf->mgr_osd_bytes)),
54 osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
55 g_conf->mgr_osd_messages)),
56 mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
57 g_conf->mgr_mds_bytes)),
58 mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
59 g_conf->mgr_mds_messages)),
60 mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
61 g_conf->mgr_mon_bytes)),
62 mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
63 g_conf->mgr_mon_messages)),
64 msgr(nullptr),
65 monc(monc_),
66 finisher(finisher_),
67 daemon_state(daemon_state_),
68 cluster_state(cluster_state_),
69 py_modules(py_modules_),
70 clog(clog_),
71 audit_clog(audit_clog_),
72 auth_registry(g_ceph_context,
73 g_conf->auth_supported.empty() ?
74 g_conf->auth_cluster_required :
75 g_conf->auth_supported),
76 lock("DaemonServer")
77 {}
78
DaemonServer::~DaemonServer() {
  // Release the Messenger allocated in init(); deleting a null pointer is
  // a no-op, so this is safe even if init() was never called.
  delete msgr;
}
82
83 int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
84 {
85 // Initialize Messenger
86 std::string public_msgr_type = g_conf->ms_public_type.empty() ?
87 g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
88 msgr = Messenger::create(g_ceph_context, public_msgr_type,
89 entity_name_t::MGR(gid),
90 "mgr",
91 getpid(), 0);
92 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
93
94 // throttle clients
95 msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
96 client_byte_throttler.get(),
97 client_msg_throttler.get());
98
99 // servers
100 msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
101 osd_byte_throttler.get(),
102 osd_msg_throttler.get());
103 msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
104 mds_byte_throttler.get(),
105 mds_msg_throttler.get());
106 msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
107 mon_byte_throttler.get(),
108 mon_msg_throttler.get());
109
110 int r = msgr->bind(g_conf->public_addr);
111 if (r < 0) {
112 derr << "unable to bind mgr to " << g_conf->public_addr << dendl;
113 return r;
114 }
115
116 msgr->set_myname(entity_name_t::MGR(gid));
117 msgr->set_addr_unknowns(client_addr);
118
119 msgr->start();
120 msgr->add_dispatcher_tail(this);
121
122 started_at = ceph_clock_now();
123
124 return 0;
125 }
126
// Address the daemon-facing messenger is bound to; only meaningful after
// init() has succeeded (msgr is null before then).
entity_addr_t DaemonServer::get_myaddr() const
{
  return msgr->get_myaddr();
}
131
132
// Authenticate an incoming connection and attach an MgrSession to it.
//
// Note the messenger contract here: we always return true ("handled");
// acceptance vs. rejection is communicated through `is_valid`.
bool DaemonServer::ms_verify_authorizer(Connection *con,
    int peer_type,
    int protocol,
    ceph::bufferlist& authorizer_data,
    ceph::bufferlist& authorizer_reply,
    bool& is_valid,
    CryptoKey& session_key)
{
  auto handler = auth_registry.get_handler(protocol);
  if (!handler) {
    // Unknown auth protocol: reject but still report the event as handled.
    dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
    is_valid = false;
    return true;
  }

  MgrSessionRef s(new MgrSession(cct));
  s->inst.addr = con->get_peer_addr();
  AuthCapsInfo caps_info;

  // Rotating service keys come from the mon; until the first batch has
  // arrived we cannot verify anyone, so deny (the peer will retry).
  RotatingKeyRing *keys = monc->rotating_secrets.get();
  if (keys) {
    is_valid = handler->verify_authorizer(
      cct, keys,
      authorizer_data,
      authorizer_reply, s->entity_name,
      s->global_id, caps_info,
      session_key);
  } else {
    dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
    is_valid = false;
  }

  if (is_valid) {
    if (caps_info.allow_all) {
      dout(10) << " session " << s << " " << s->entity_name
	       << " allow_all" << dendl;
      s->caps.set_allow_all();
    }
    if (caps_info.caps.length() > 0) {
      // Caps arrive as an encoded string; a decode failure leaves `str`
      // empty and the parse below will then reject the session.
      bufferlist::iterator p = caps_info.caps.begin();
      string str;
      try {
	::decode(str, p);
      }
      catch (buffer::error& e) {
      }
      bool success = s->caps.parse(str);
      if (success) {
	dout(10) << " session " << s << " " << s->entity_name
		 << " has caps " << s->caps << " '" << str << "'" << dendl;
      } else {
	dout(10) << " session " << s << " " << s->entity_name
		 << " failed to parse caps '" << str << "'" << dendl;
	is_valid = false;
      }
    }
    // Hand the connection a ref to the session (get() bumps the refcount;
    // the connection's priv slot owns that ref).
    con->set_priv(s->get());

    if (peer_type == CEPH_ENTITY_TYPE_OSD) {
      // Track OSD connections so commands (scrub etc.) can be routed to
      // the right OSD later; protected by the DaemonServer lock.
      Mutex::Locker l(lock);
      s->osd_id = atoi(s->entity_name.get_id().c_str());
      dout(10) << "registering osd." << s->osd_id << " session "
	       << s << " con " << con << dendl;
      osd_cons[s->osd_id].insert(con);
    }
  }

  return true;
}
202
203
204 bool DaemonServer::ms_get_authorizer(int dest_type,
205 AuthAuthorizer **authorizer, bool force_new)
206 {
207 dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
208
209 if (dest_type == CEPH_ENTITY_TYPE_MON) {
210 return true;
211 }
212
213 if (force_new) {
214 if (monc->wait_auth_rotating(10) < 0)
215 return false;
216 }
217
218 *authorizer = monc->build_authorizer(dest_type);
219 dout(20) << "got authorizer " << *authorizer << dendl;
220 return *authorizer != NULL;
221 }
222
// Connection teardown: drop our osd_cons registration for OSD peers so we
// stop trying to route commands to a dead connection.  Always returns
// false (we never re-establish the session ourselves).
bool DaemonServer::ms_handle_reset(Connection *con)
{
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
    if (!session) {
      return false;
    }
    session->put(); // SessionRef takes a ref
    Mutex::Locker l(lock);
    dout(10) << "unregistering osd." << session->osd_id
	     << " session " << session << " con " << con << dendl;
    osd_cons[session->osd_id].erase(con);
  }
  return false;
}
238
// A peer refused our connection attempt.  Nothing to clean up here: we
// keep no per-connection state until a session is established.
bool DaemonServer::ms_handle_refused(Connection *con)
{
  // do nothing for now
  return false;
}
244
245 bool DaemonServer::ms_dispatch(Message *m)
246 {
247 Mutex::Locker l(lock);
248
249 switch (m->get_type()) {
250 case MSG_PGSTATS:
251 cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
252 maybe_ready(m->get_source().num());
253 m->put();
254 return true;
255 case MSG_MGR_REPORT:
256 return handle_report(static_cast<MMgrReport*>(m));
257 case MSG_MGR_OPEN:
258 return handle_open(static_cast<MMgrOpen*>(m));
259 case MSG_COMMAND:
260 return handle_command(static_cast<MCommand*>(m));
261 default:
262 dout(1) << "Unhandled message type " << m->get_type() << dendl;
263 return false;
264 };
265 }
266
267 void DaemonServer::maybe_ready(int32_t osd_id)
268 {
269 if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
270 dout(4) << "initial report from osd " << osd_id << dendl;
271 reported_osds.insert(osd_id);
272 std::set<int32_t> up_osds;
273
274 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
275 osdmap.get_up_osds(up_osds);
276 });
277
278 std::set<int32_t> unreported_osds;
279 std::set_difference(up_osds.begin(), up_osds.end(),
280 reported_osds.begin(), reported_osds.end(),
281 std::inserter(unreported_osds, unreported_osds.begin()));
282
283 if (unreported_osds.size() == 0) {
284 dout(4) << "all osds have reported, sending PG state to mon" << dendl;
285 pgmap_ready = true;
286 reported_osds.clear();
287 // Avoid waiting for next tick
288 send_report();
289 } else {
290 dout(4) << "still waiting for " << unreported_osds.size() << " osds"
291 " to report in before PGMap is ready" << dendl;
292 }
293 }
294 }
295
// Stop the messenger and block until its dispatch threads have drained.
// After this returns no further ms_dispatch callbacks will arrive.
void DaemonServer::shutdown()
{
  dout(10) << "begin" << dendl;
  msgr->shutdown();
  msgr->wait();
  dout(10) << "done" << dendl;
}
303
304
305
306 bool DaemonServer::handle_open(MMgrOpen *m)
307 {
308 DaemonKey key;
309 if (!m->service_name.empty()) {
310 key.first = m->service_name;
311 } else {
312 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
313 }
314 key.second = m->daemon_name;
315
316 dout(4) << "from " << m->get_connection() << " " << key << dendl;
317
318 auto configure = new MMgrConfigure();
319 configure->stats_period = g_conf->mgr_stats_period;
320 m->get_connection()->send_message(configure);
321
322 DaemonStatePtr daemon;
323 if (daemon_state.exists(key)) {
324 daemon = daemon_state.get(key);
325 }
326 if (daemon) {
327 dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
328 Mutex::Locker l(daemon->lock);
329 daemon_state.get(key)->perf_counters.clear();
330 }
331
332 if (m->service_daemon) {
333 if (!daemon) {
334 dout(4) << "constructing new DaemonState for " << key << dendl;
335 daemon = std::make_shared<DaemonState>(daemon_state.types);
336 daemon->key = key;
337 if (m->daemon_metadata.count("hostname")) {
338 daemon->hostname = m->daemon_metadata["hostname"];
339 }
340 daemon_state.insert(daemon);
341 }
342 Mutex::Locker l(daemon->lock);
343 daemon->service_daemon = true;
344 daemon->metadata = m->daemon_metadata;
345 daemon->service_status = m->daemon_status;
346
347 utime_t now = ceph_clock_now();
348 auto d = pending_service_map.get_daemon(m->service_name,
349 m->daemon_name);
350 if (d->gid != (uint64_t)m->get_source().num()) {
351 dout(10) << "registering " << key << " in pending_service_map" << dendl;
352 d->gid = m->get_source().num();
353 d->addr = m->get_source_addr();
354 d->start_epoch = pending_service_map.epoch;
355 d->start_stamp = now;
356 d->metadata = m->daemon_metadata;
357 pending_service_map_dirty = pending_service_map.epoch;
358 }
359 }
360
361 m->put();
362 return true;
363 }
364
// Ingest a periodic stats report from a daemon: update its perf counters,
// notify python modules about schema changes, and refresh service-daemon
// status/beacon timestamps.  Consumes the message.
bool DaemonServer::handle_report(MMgrReport *m)
{
  // Key = (service name | peer entity type, daemon name) -- must match
  // the keying used in handle_open().
  DaemonKey key;
  if (!m->service_name.empty()) {
    key.first = m->service_name;
  } else {
    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
  }
  key.second = m->daemon_name;

  dout(4) << "from " << m->get_connection() << " " << key << dendl;

  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
      m->service_name.empty()) {
    // Clients should not be sending us stats unless they are declaring
    // themselves to be a daemon for some service.
    dout(4) << "rejecting report from non-daemon client " << m->daemon_name
	    << dendl;
    m->put();
    return true;
  }

  DaemonStatePtr daemon;
  if (daemon_state.exists(key)) {
    dout(20) << "updating existing DaemonState for " << key << dendl;
    daemon = daemon_state.get(key);
  } else {
    // First report from an unknown daemon: create a placeholder entry.
    dout(4) << "constructing new DaemonState for " << key << dendl;
    daemon = std::make_shared<DaemonState>(daemon_state.types);
    // FIXME: crap, we don't know the hostname at this stage.
    daemon->key = key;
    daemon_state.insert(daemon);
    // FIXME: we should avoid this case by rejecting MMgrReport from
    // daemons without sessions, and ensuring that session open
    // always contains metadata.
  }
  assert(daemon != nullptr);
  auto &daemon_counters = daemon->perf_counters;
  {
    // Counter updates happen under the daemon's own lock, not ours.
    Mutex::Locker l(daemon->lock);
    daemon_counters.update(m);
  }
  // if there are any schema updates, notify the python modules
  if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
    ostringstream oss;
    oss << key.first << '.' << key.second;
    py_modules.notify_all("perf_schema_update", oss.str());
  }

  if (daemon->service_daemon) {
    // Every report doubles as a liveness beacon for service daemons.
    utime_t now = ceph_clock_now();
    if (m->daemon_status) {
      daemon->service_status = *m->daemon_status;
      daemon->service_status_stamp = now;
    }
    daemon->last_service_beacon = now;
  } else if (m->daemon_status) {
    derr << "got status from non-daemon " << key << dendl;
  }

  m->put();
  return true;
}
428
429
430 void DaemonServer::_generate_command_map(
431 map<string,cmd_vartype>& cmdmap,
432 map<string,string> &param_str_map)
433 {
434 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
435 p != cmdmap.end(); ++p) {
436 if (p->first == "prefix")
437 continue;
438 if (p->first == "caps") {
439 vector<string> cv;
440 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
441 cv.size() % 2 == 0) {
442 for (unsigned i = 0; i < cv.size(); i += 2) {
443 string k = string("caps_") + cv[i];
444 param_str_map[k] = cv[i + 1];
445 }
446 continue;
447 }
448 }
449 param_str_map[p->first] = cmd_vartype_stringify(p->second);
450 }
451 }
452
453 const MonCommand *DaemonServer::_get_mgrcommand(
454 const string &cmd_prefix,
455 const std::vector<MonCommand> &cmds)
456 {
457 const MonCommand *this_cmd = nullptr;
458 for (const auto &cmd : cmds) {
459 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
460 this_cmd = &cmd;
461 break;
462 }
463 }
464 return this_cmd;
465 }
466
467 bool DaemonServer::_allowed_command(
468 MgrSession *s,
469 const string &module,
470 const string &prefix,
471 const map<string,cmd_vartype>& cmdmap,
472 const map<string,string>& param_str_map,
473 const MonCommand *this_cmd) {
474
475 if (s->entity_name.is_mon()) {
476 // mon is all-powerful. even when it is forwarding commands on behalf of
477 // old clients; we expect the mon is validating commands before proxying!
478 return true;
479 }
480
481 bool cmd_r = this_cmd->requires_perm('r');
482 bool cmd_w = this_cmd->requires_perm('w');
483 bool cmd_x = this_cmd->requires_perm('x');
484
485 bool capable = s->caps.is_capable(
486 g_ceph_context,
487 CEPH_ENTITY_TYPE_MGR,
488 s->entity_name,
489 module, prefix, param_str_map,
490 cmd_r, cmd_w, cmd_x);
491
492 dout(10) << " " << s->entity_name << " "
493 << (capable ? "" : "not ") << "capable" << dendl;
494 return capable;
495 }
496
497 bool DaemonServer::handle_command(MCommand *m)
498 {
499 int r = 0;
500 std::stringstream ss;
501 std::string prefix;
502
503 assert(lock.is_locked_by_me());
504
505 /**
506 * The working data for processing an MCommand. This lives in
507 * a class to enable passing it into other threads for processing
508 * outside of the thread/locks that called handle_command.
509 */
510 class CommandContext
511 {
512 public:
513 MCommand *m;
514 bufferlist odata;
515 cmdmap_t cmdmap;
516
517 CommandContext(MCommand *m_)
518 : m(m_)
519 {
520 }
521
522 ~CommandContext()
523 {
524 m->put();
525 }
526
527 void reply(int r, const std::stringstream &ss)
528 {
529 reply(r, ss.str());
530 }
531
532 void reply(int r, const std::string &rs)
533 {
534 // Let the connection drop as soon as we've sent our response
535 ConnectionRef con = m->get_connection();
536 if (con) {
537 con->mark_disposable();
538 }
539
540 dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl;
541 if (con) {
542 MCommandReply *reply = new MCommandReply(r, rs);
543 reply->set_tid(m->get_tid());
544 reply->set_data(odata);
545 con->send_message(reply);
546 }
547 }
548 };
549
550 /**
551 * A context for receiving a bufferlist/error string from a background
552 * function and then calling back to a CommandContext when it's done
553 */
554 class ReplyOnFinish : public Context {
555 std::shared_ptr<CommandContext> cmdctx;
556
557 public:
558 bufferlist from_mon;
559 string outs;
560
561 ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
562 : cmdctx(cmdctx_)
563 {}
564 void finish(int r) override {
565 cmdctx->odata.claim_append(from_mon);
566 cmdctx->reply(r, outs);
567 }
568 };
569
570 std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
571
572 MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
573 if (!session) {
574 return true;
575 }
576 session->put(); // SessionRef takes a ref
577 if (session->inst.name == entity_name_t())
578 session->inst.name = m->get_source();
579
580 std::string format;
581 boost::scoped_ptr<Formatter> f;
582 map<string,string> param_str_map;
583
584 if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
585 cmdctx->reply(-EINVAL, ss);
586 return true;
587 }
588
589 {
590 cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
591 f.reset(Formatter::create(format));
592 }
593
594 cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
595
596 dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
597 dout(4) << "prefix=" << prefix << dendl;
598
599 if (prefix == "get_command_descriptions") {
600 dout(10) << "reading commands from python modules" << dendl;
601 const auto py_commands = py_modules.get_commands();
602
603 int cmdnum = 0;
604 JSONFormatter f;
605 f.open_object_section("command_descriptions");
606
607 auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){
608 ostringstream secname;
609 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
610 dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring,
611 mc.module, mc.req_perms, mc.availability, 0);
612 cmdnum++;
613 };
614
615 for (const auto &pyc : py_commands) {
616 dump_cmd(pyc);
617 }
618
619 for (const auto &mgr_cmd : mgr_commands) {
620 dump_cmd(mgr_cmd);
621 }
622
623 f.close_section(); // command_descriptions
624 f.flush(cmdctx->odata);
625 cmdctx->reply(0, ss);
626 return true;
627 }
628
629 // lookup command
630 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
631 _generate_command_map(cmdctx->cmdmap, param_str_map);
632 if (!mgr_cmd) {
633 MonCommand py_command = {"", "", "py", "rw", "cli"};
634 if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
635 param_str_map, &py_command)) {
636 dout(1) << " access denied" << dendl;
637 ss << "access denied; does your client key have mgr caps?"
638 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
639 cmdctx->reply(-EACCES, ss);
640 return true;
641 }
642 } else {
643 // validate user's permissions for requested command
644 if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
645 param_str_map, mgr_cmd)) {
646 dout(1) << " access denied" << dendl;
647 audit_clog->info() << "from='" << session->inst << "' "
648 << "entity='" << session->entity_name << "' "
649 << "cmd=" << m->cmd << ": access denied";
650 ss << "access denied' does your client key have mgr caps?"
651 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
652 cmdctx->reply(-EACCES, ss);
653 return true;
654 }
655 }
656
657 audit_clog->debug()
658 << "from='" << session->inst << "' "
659 << "entity='" << session->entity_name << "' "
660 << "cmd=" << m->cmd << ": dispatch";
661
662 // ----------------
663 // service map commands
664 if (prefix == "service dump") {
665 if (!f)
666 f.reset(Formatter::create("json-pretty"));
667 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
668 f->dump_object("service_map", service_map);
669 });
670 f->flush(cmdctx->odata);
671 cmdctx->reply(0, ss);
672 return true;
673 }
674 if (prefix == "service status") {
675 if (!f)
676 f.reset(Formatter::create("json-pretty"));
677 // only include state from services that are in the persisted service map
678 f->open_object_section("service_status");
679 ServiceMap s;
680 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
681 s = service_map;
682 });
683 for (auto& p : s.services) {
684 f->open_object_section(p.first.c_str());
685 for (auto& q : p.second.daemons) {
686 f->open_object_section(q.first.c_str());
687 DaemonKey key(p.first, q.first);
688 assert(daemon_state.exists(key));
689 auto daemon = daemon_state.get(key);
690 Mutex::Locker l(daemon->lock);
691 f->dump_stream("status_stamp") << daemon->service_status_stamp;
692 f->dump_stream("last_beacon") << daemon->last_service_beacon;
693 f->open_object_section("status");
694 for (auto& r : daemon->service_status) {
695 f->dump_string(r.first.c_str(), r.second);
696 }
697 f->close_section();
698 f->close_section();
699 }
700 f->close_section();
701 }
702 f->close_section();
703 f->flush(cmdctx->odata);
704 cmdctx->reply(0, ss);
705 return true;
706 }
707
708 // -----------
709 // PG commands
710
711 if (prefix == "pg scrub" ||
712 prefix == "pg repair" ||
713 prefix == "pg deep-scrub") {
714 string scrubop = prefix.substr(3, string::npos);
715 pg_t pgid;
716 string pgidstr;
717 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
718 if (!pgid.parse(pgidstr.c_str())) {
719 ss << "invalid pgid '" << pgidstr << "'";
720 cmdctx->reply(-EINVAL, ss);
721 return true;
722 }
723 bool pg_exists = false;
724 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
725 pg_exists = osdmap.pg_exists(pgid);
726 });
727 if (!pg_exists) {
728 ss << "pg " << pgid << " dne";
729 cmdctx->reply(-ENOENT, ss);
730 return true;
731 }
732 int acting_primary = -1;
733 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
734 acting_primary = osdmap.get_pg_acting_primary(pgid);
735 });
736 if (acting_primary == -1) {
737 ss << "pg " << pgid << " has no primary osd";
738 cmdctx->reply(-EAGAIN, ss);
739 return true;
740 }
741 auto p = osd_cons.find(acting_primary);
742 if (p == osd_cons.end()) {
743 ss << "pg " << pgid << " primary osd." << acting_primary
744 << " is not currently connected";
745 cmdctx->reply(-EAGAIN, ss);
746 }
747 vector<pg_t> pgs = { pgid };
748 for (auto& con : p->second) {
749 con->send_message(new MOSDScrub(monc->get_fsid(),
750 pgs,
751 scrubop == "repair",
752 scrubop == "deep-scrub"));
753 }
754 ss << "instructing pg " << pgid << " on osd." << acting_primary
755 << " to " << scrubop;
756 cmdctx->reply(0, ss);
757 return true;
758 } else if (prefix == "osd scrub" ||
759 prefix == "osd deep-scrub" ||
760 prefix == "osd repair") {
761 string whostr;
762 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
763 vector<string> pvec;
764 get_str_vec(prefix, pvec);
765
766 set<int> osds;
767 if (whostr == "*" || whostr == "all" || whostr == "any") {
768 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
769 for (int i = 0; i < osdmap.get_max_osd(); i++)
770 if (osdmap.is_up(i)) {
771 osds.insert(i);
772 }
773 });
774 } else {
775 long osd = parse_osd_id(whostr.c_str(), &ss);
776 if (osd < 0) {
777 ss << "invalid osd '" << whostr << "'";
778 cmdctx->reply(-EINVAL, ss);
779 return true;
780 }
781 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
782 if (osdmap.is_up(osd)) {
783 osds.insert(osd);
784 }
785 });
786 if (osds.empty()) {
787 ss << "osd." << osd << " is not up";
788 cmdctx->reply(-EAGAIN, ss);
789 return true;
790 }
791 }
792 set<int> sent_osds, failed_osds;
793 for (auto osd : osds) {
794 auto p = osd_cons.find(osd);
795 if (p == osd_cons.end()) {
796 failed_osds.insert(osd);
797 } else {
798 sent_osds.insert(osd);
799 for (auto& con : p->second) {
800 con->send_message(new MOSDScrub(monc->get_fsid(),
801 pvec.back() == "repair",
802 pvec.back() == "deep-scrub"));
803 }
804 }
805 }
806 if (failed_osds.size() == osds.size()) {
807 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
808 << " (not connected)";
809 r = -EAGAIN;
810 } else {
811 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
812 if (!failed_osds.empty()) {
813 ss << "; osd(s) " << failed_osds << " were not connected";
814 }
815 r = 0;
816 }
817 cmdctx->reply(0, ss);
818 return true;
819 } else if (prefix == "osd reweight-by-pg" ||
820 prefix == "osd reweight-by-utilization" ||
821 prefix == "osd test-reweight-by-pg" ||
822 prefix == "osd test-reweight-by-utilization") {
823 bool by_pg =
824 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
825 bool dry_run =
826 prefix == "osd test-reweight-by-pg" ||
827 prefix == "osd test-reweight-by-utilization";
828 int64_t oload;
829 cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
830 set<int64_t> pools;
831 vector<string> poolnames;
832 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
833 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
834 for (const auto& poolname : poolnames) {
835 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
836 if (pool < 0) {
837 ss << "pool '" << poolname << "' does not exist";
838 r = -ENOENT;
839 }
840 pools.insert(pool);
841 }
842 });
843 if (r) {
844 cmdctx->reply(r, ss);
845 return true;
846 }
847 double max_change = g_conf->mon_reweight_max_change;
848 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
849 if (max_change <= 0.0) {
850 ss << "max_change " << max_change << " must be positive";
851 cmdctx->reply(-EINVAL, ss);
852 return true;
853 }
854 int64_t max_osds = g_conf->mon_reweight_max_osds;
855 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
856 if (max_osds <= 0) {
857 ss << "max_osds " << max_osds << " must be positive";
858 cmdctx->reply(-EINVAL, ss);
859 return true;
860 }
861 string no_increasing;
862 cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
863 string out_str;
864 mempool::osdmap::map<int32_t, uint32_t> new_weights;
865 r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
866 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
867 return reweight::by_utilization(osdmap, pgmap,
868 oload,
869 max_change,
870 max_osds,
871 by_pg,
872 pools.empty() ? NULL : &pools,
873 no_increasing == "--no-increasing",
874 &new_weights,
875 &ss, &out_str, f.get());
876 });
877 });
878 if (r >= 0) {
879 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
880 }
881 if (f) {
882 f->flush(cmdctx->odata);
883 } else {
884 cmdctx->odata.append(out_str);
885 }
886 if (r < 0) {
887 ss << "FAILED reweight-by-pg";
888 cmdctx->reply(r, ss);
889 return true;
890 } else if (r == 0 || dry_run) {
891 ss << "no change";
892 cmdctx->reply(r, ss);
893 return true;
894 } else {
895 json_spirit::Object json_object;
896 for (const auto& osd_weight : new_weights) {
897 json_spirit::Config::add(json_object,
898 std::to_string(osd_weight.first),
899 std::to_string(osd_weight.second));
900 }
901 string s = json_spirit::write(json_object);
902 std::replace(begin(s), end(s), '\"', '\'');
903 const string cmd =
904 "{"
905 "\"prefix\": \"osd reweightn\", "
906 "\"weights\": \"" + s + "\""
907 "}";
908 auto on_finish = new ReplyOnFinish(cmdctx);
909 monc->start_mon_command({cmd}, {},
910 &on_finish->from_mon, &on_finish->outs, on_finish);
911 return true;
912 }
913 } else if (prefix == "osd df") {
914 string method;
915 cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
916 r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) {
917 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
918 print_osd_utilization(osdmap, &pgservice, ss,
919 f.get(), method == "tree");
920
921 cmdctx->odata.append(ss);
922 return 0;
923 });
924 });
925 cmdctx->reply(r, "");
926 return true;
927 } else if (prefix == "osd safe-to-destroy") {
928 vector<string> ids;
929 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
930 set<int> osds;
931 int r;
932 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
933 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
934 });
935 if (!r && osds.empty()) {
936 ss << "must specify one or more OSDs";
937 r = -EINVAL;
938 }
939 if (r < 0) {
940 cmdctx->reply(r, ss);
941 return true;
942 }
943 set<int> active_osds, missing_stats, stored_pgs;
944 int affected_pgs = 0;
945 cluster_state.with_pgmap([&](const PGMap& pg_map) {
946 if (pg_map.num_pg_unknown > 0) {
947 ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
948 << " any conclusions";
949 r = -EAGAIN;
950 return;
951 }
952 int num_active_clean = 0;
953 for (auto& p : pg_map.num_pg_by_state) {
954 unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
955 if ((p.first & want) == want) {
956 num_active_clean += p.second;
957 }
958 }
959 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
960 for (auto osd : osds) {
961 if (!osdmap.exists(osd)) {
962 continue; // clearly safe to destroy
963 }
964 auto q = pg_map.num_pg_by_osd.find(osd);
965 if (q != pg_map.num_pg_by_osd.end()) {
966 if (q->second.acting > 0 || q->second.up > 0) {
967 active_osds.insert(osd);
968 affected_pgs += q->second.acting + q->second.up;
969 continue;
970 }
971 }
972 if (num_active_clean < pg_map.num_pg) {
973 // all pgs aren't active+clean; we need to be careful.
974 auto p = pg_map.osd_stat.find(osd);
975 if (p == pg_map.osd_stat.end()) {
976 missing_stats.insert(osd);
977 }
978 if (p->second.num_pgs > 0) {
979 stored_pgs.insert(osd);
980 }
981 }
982 }
983 });
984 });
985 if (!r && !active_osds.empty()) {
986 ss << "OSD(s) " << active_osds << " have " << affected_pgs
987 << " pgs currently mapped to them";
988 r = -EBUSY;
989 } else if (!missing_stats.empty()) {
990 ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
991 << " PGs are active+clean; we cannot draw any conclusions";
992 r = -EAGAIN;
993 } else if (!stored_pgs.empty()) {
994 ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
995 << " data, and not all PGs are active+clean; we cannot be sure they"
996 << " aren't still needed.";
997 r = -EBUSY;
998 }
999 if (r) {
1000 cmdctx->reply(r, ss);
1001 return true;
1002 }
1003 ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1004 << " durability.";
1005 cmdctx->reply(0, ss);
1006 return true;
1007 } else if (prefix == "osd ok-to-stop") {
1008 vector<string> ids;
1009 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1010 set<int> osds;
1011 int r;
1012 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1013 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1014 });
1015 if (!r && osds.empty()) {
1016 ss << "must specify one or more OSDs";
1017 r = -EINVAL;
1018 }
1019 if (r < 0) {
1020 cmdctx->reply(r, ss);
1021 return true;
1022 }
1023 map<pg_t,int> pg_delta; // pgid -> net acting set size change
1024 int dangerous_pgs = 0;
1025 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1026 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1027 if (pg_map.num_pg_unknown > 0) {
1028 ss << pg_map.num_pg_unknown << " pgs have unknown state; "
1029 << "cannot draw any conclusions";
1030 r = -EAGAIN;
1031 return;
1032 }
1033 for (auto osd : osds) {
1034 auto p = pg_map.pg_by_osd.find(osd);
1035 if (p != pg_map.pg_by_osd.end()) {
1036 for (auto& pgid : p->second) {
1037 --pg_delta[pgid];
1038 }
1039 }
1040 }
1041 for (auto& p : pg_delta) {
1042 auto q = pg_map.pg_stat.find(p.first);
1043 if (q == pg_map.pg_stat.end()) {
1044 ss << "missing information about " << p.first << "; cannot draw"
1045 << " any conclusions";
1046 r = -EAGAIN;
1047 return;
1048 }
1049 if (!(q->second.state & PG_STATE_ACTIVE) ||
1050 (q->second.state & PG_STATE_DEGRADED)) {
1051 // we don't currently have a good way to tell *how* degraded
1052 // a degraded PG is, so we have to assume we cannot remove
1053 // any more replicas/shards.
1054 ++dangerous_pgs;
1055 continue;
1056 }
1057 const pg_pool_t *pi = osdmap.get_pg_pool(p.first.pool());
1058 if (!pi) {
1059 ++dangerous_pgs; // pool is creating or deleting
1060 } else {
1061 if (q->second.acting.size() + p.second < pi->min_size) {
1062 ++dangerous_pgs;
1063 }
1064 }
1065 }
1066 });
1067 });
1068 if (r) {
1069 cmdctx->reply(r, ss);
1070 return true;
1071 }
1072 if (dangerous_pgs) {
1073 ss << dangerous_pgs << " PGs are already degraded or might become "
1074 << "unavailable";
1075 cmdctx->reply(-EBUSY, ss);
1076 return true;
1077 }
1078 ss << "OSD(s) " << osds << " are ok to stop without reducing"
1079 << " availability, provided there are no other concurrent failures"
1080 << " or interventions. " << pg_delta.size() << " PGs are likely to be"
1081 << " degraded (but remain available) as a result.";
1082 cmdctx->reply(0, ss);
1083 return true;
1084 } else if (prefix == "pg force-recovery" ||
1085 prefix == "pg force-backfill" ||
1086 prefix == "pg cancel-force-recovery" ||
1087 prefix == "pg cancel-force-backfill") {
1088 string forceop = prefix.substr(3, string::npos);
1089 list<pg_t> parsed_pgs;
1090 map<int, list<pg_t> > osdpgs;
1091
1092 // figure out actual op just once
1093 int actual_op = 0;
1094 if (forceop == "force-recovery") {
1095 actual_op = OFR_RECOVERY;
1096 } else if (forceop == "force-backfill") {
1097 actual_op = OFR_BACKFILL;
1098 } else if (forceop == "cancel-force-backfill") {
1099 actual_op = OFR_BACKFILL | OFR_CANCEL;
1100 } else if (forceop == "cancel-force-recovery") {
1101 actual_op = OFR_RECOVERY | OFR_CANCEL;
1102 }
1103
// convert pg names to pgs, discard any invalid ones while at it
1105 {
1106 // we don't want to keep pgidstr and pgidstr_nodup forever
1107 vector<string> pgidstr;
1108 // get pgids to process and prune duplicates
1109 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
1110 set<string> pgidstr_nodup(pgidstr.begin(), pgidstr.end());
1111 if (pgidstr.size() != pgidstr_nodup.size()) {
1112 // move elements only when there were duplicates, as this
1113 // reorders them
1114 pgidstr.resize(pgidstr_nodup.size());
1115 auto it = pgidstr_nodup.begin();
1116 for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) {
1117 pgidstr[i] = std::move(*it++);
1118 }
1119 }
1120
1121 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1122 for (auto& pstr : pgidstr) {
1123 pg_t parsed_pg;
1124 if (!parsed_pg.parse(pstr.c_str())) {
1125 ss << "invalid pgid '" << pstr << "'; ";
1126 r = -EINVAL;
1127 } else {
1128 auto workit = pg_map.pg_stat.find(parsed_pg);
1129 if (workit == pg_map.pg_stat.end()) {
1130 ss << "pg " << pstr << " not exists; ";
1131 r = -ENOENT;
1132 } else {
1133 pg_stat_t workpg = workit->second;
1134
1135 // discard pgs for which user requests are pointless
1136 switch (actual_op)
1137 {
1138 case OFR_RECOVERY:
1139 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) {
1140 // don't return error, user script may be racing with cluster. not fatal.
1141 ss << "pg " << pstr << " doesn't require recovery; ";
1142 continue;
1143 } else if (workpg.state & PG_STATE_FORCED_RECOVERY) {
1144 ss << "pg " << pstr << " recovery already forced; ";
1145 // return error, as it may be a bug in user script
1146 r = -EINVAL;
1147 continue;
1148 }
1149 break;
1150 case OFR_BACKFILL:
1151 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILL)) == 0) {
1152 ss << "pg " << pstr << " doesn't require backfilling; ";
1153 continue;
1154 } else if (workpg.state & PG_STATE_FORCED_BACKFILL) {
1155 ss << "pg " << pstr << " backfill already forced; ";
1156 r = -EINVAL;
1157 continue;
1158 }
1159 break;
1160 case OFR_BACKFILL | OFR_CANCEL:
1161 if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) {
1162 ss << "pg " << pstr << " backfill not forced; ";
1163 continue;
1164 }
1165 break;
1166 case OFR_RECOVERY | OFR_CANCEL:
1167 if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) {
1168 ss << "pg " << pstr << " recovery not forced; ";
1169 continue;
1170 }
1171 break;
1172 default:
1173 assert(0 == "actual_op value is not supported");
1174 }
1175
1176 parsed_pgs.push_back(std::move(parsed_pg));
1177 }
1178 }
1179 }
1180
1181 // group pgs to process by osd
1182 for (auto& pgid : parsed_pgs) {
1183 auto workit = pg_map.pg_stat.find(pgid);
1184 if (workit != pg_map.pg_stat.end()) {
1185 pg_stat_t workpg = workit->second;
1186 set<int32_t> osds(workpg.up.begin(), workpg.up.end());
1187 osds.insert(workpg.acting.begin(), workpg.acting.end());
1188 for (auto i : osds) {
1189 osdpgs[i].push_back(pgid);
1190 }
1191 }
1192 }
1193
1194 });
1195 }
1196
1197 // respond with error only when no pgs are correct
1198 // yes, in case of mixed errors, only the last one will be emitted,
1199 // but the message presented will be fine
1200 if (parsed_pgs.size() != 0) {
1201 // clear error to not confuse users/scripts
1202 r = 0;
1203 }
1204
1205 // optimize the command -> messages conversion, use only one message per distinct OSD
1206 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1207 for (auto& i : osdpgs) {
1208 if (osdmap.is_up(i.first)) {
1209 vector<pg_t> pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end()));
1210 auto p = osd_cons.find(i.first);
1211 if (p == osd_cons.end()) {
1212 ss << "osd." << i.first << " is not currently connected";
1213 r = -EAGAIN;
1214 continue;
1215 }
1216 for (auto& con : p->second) {
1217 con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op));
1218 }
1219 ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; ";
1220 }
1221 }
1222 });
1223 ss << std::endl;
1224 cmdctx->reply(r, ss);
1225 return true;
1226 } else {
1227 r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
1228 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1229 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
1230 f.get(), &ss, &cmdctx->odata);
1231 });
1232 });
1233
1234 if (r != -EOPNOTSUPP) {
1235 cmdctx->reply(r, ss);
1236 return true;
1237 }
1238 }
1239
1240 // None of the special native commands,
1241 MgrPyModule *handler = nullptr;
1242 auto py_commands = py_modules.get_py_commands();
1243 for (const auto &pyc : py_commands) {
1244 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
1245 dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
1246 if (pyc_prefix == prefix) {
1247 handler = pyc.handler;
1248 break;
1249 }
1250 }
1251
1252 if (handler == nullptr) {
1253 ss << "No handler found for '" << prefix << "'";
1254 dout(4) << "No handler found for '" << prefix << "'" << dendl;
1255 cmdctx->reply(-EINVAL, ss);
1256 return true;
1257 } else {
1258 // Okay, now we have a handler to call, but we must not call it
1259 // in this thread, because the python handlers can do anything,
1260 // including blocking, and including calling back into mgr.
1261 dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
1262 finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
1263 std::stringstream ds;
1264 std::stringstream ss;
1265 int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
1266 cmdctx->odata.append(ds);
1267 cmdctx->reply(r, ss);
1268 }));
1269 return true;
1270 }
1271 }
1272
1273 void DaemonServer::_prune_pending_service_map()
1274 {
1275 utime_t cutoff = ceph_clock_now();
1276 cutoff -= g_conf->mgr_service_beacon_grace;
1277 auto p = pending_service_map.services.begin();
1278 while (p != pending_service_map.services.end()) {
1279 auto q = p->second.daemons.begin();
1280 while (q != p->second.daemons.end()) {
1281 DaemonKey key(p->first, q->first);
1282 if (!daemon_state.exists(key)) {
1283 derr << "missing key " << key << dendl;
1284 ++q;
1285 continue;
1286 }
1287 auto daemon = daemon_state.get(key);
1288 Mutex::Locker l(daemon->lock);
1289 if (daemon->last_service_beacon == utime_t()) {
1290 // we must have just restarted; assume they are alive now.
1291 daemon->last_service_beacon = ceph_clock_now();
1292 ++q;
1293 continue;
1294 }
1295 if (daemon->last_service_beacon < cutoff) {
1296 dout(10) << "pruning stale " << p->first << "." << q->first
1297 << " last_beacon " << daemon->last_service_beacon << dendl;
1298 q = p->second.daemons.erase(q);
1299 pending_service_map_dirty = pending_service_map.epoch;
1300 } else {
1301 ++q;
1302 }
1303 }
1304 if (p->second.daemons.empty()) {
1305 p = pending_service_map.services.erase(p);
1306 pending_service_map_dirty = pending_service_map.epoch;
1307 } else {
1308 ++p;
1309 }
1310 }
1311 }
1312
// Build and send a MMonMgrReport to the monitor: module + PGMap health
// checks, the PGMap digest, and (when dirty) the pending service map.
// Called periodically from the report tick.
void DaemonServer::send_report()
{
  if (!pgmap_ready) {
    // Not all OSDs have reported stats yet; wait for them, but only up to
    // four stats periods, after which we report whatever we have.
    if (ceph_clock_now() - started_at > g_conf->mgr_stats_period * 4.0) {
      pgmap_ready = true;
      reported_osds.clear();
      dout(1) << "Giving up on OSDs that haven't reported yet, sending "
              << "potentially incomplete PG state to mon" << dendl;
    } else {
      dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
              << dendl;
      return;
    }
  }

  // Ownership of m passes to send_mon_message() at the bottom.
  auto m = new MMonMgrReport();
  py_modules.get_health_checks(&m->health_checks);

  cluster_state.with_pgmap([&](const PGMap& pg_map) {
      cluster_state.update_delta_stats();

      if (pending_service_map.epoch) {
        // Drop stale beacons first; if anything changed since the epoch we
        // last published, encode the pending map into this report and bump
        // the epoch.
        _prune_pending_service_map();
        if (pending_service_map_dirty >= pending_service_map.epoch) {
          pending_service_map.modified = ceph_clock_now();
          ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
          dout(10) << "sending service_map e" << pending_service_map.epoch
                   << dendl;
          pending_service_map.epoch++;
        }
      }

      cluster_state.with_osdmap([&](const OSDMap& osdmap) {
          // FIXME: no easy way to get mon features here. this will do for
          // now, though, as long as we don't make a backward-incompat change.
          pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
          dout(10) << pg_map << dendl;

          pg_map.get_health_checks(g_ceph_context, osdmap,
                                   &m->health_checks);

          dout(10) << m->health_checks.checks.size() << " health checks"
                   << dendl;
          // NOTE: dout() opens a scope that is only closed by the dendl
          // several lines below, so the JSONFormatter work here runs only
          // when debug level 20 is actually being gathered.
          dout(20) << "health checks:\n";
          JSONFormatter jf(true);
          jf.dump_object("health_checks", m->health_checks);
          jf.flush(*_dout);
          *_dout << dendl;
        });
    });
  // TODO? We currently do not notify the PyModules
  // TODO: respect needs_send, so we send the report only if we are asked to do
  //       so, or the state is updated.
  monc->send_mon_message(m);
}
1368
1369 void DaemonServer::got_service_map()
1370 {
1371 Mutex::Locker l(lock);
1372
1373 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
1374 if (pending_service_map.epoch == 0) {
1375 // we just started up
1376 dout(10) << "got initial map e" << service_map.epoch << dendl;
1377 pending_service_map = service_map;
1378 } else {
1379 // we we already active and therefore must have persisted it,
1380 // which means ours is the same or newer.
1381 dout(10) << "got updated map e" << service_map.epoch << dendl;
1382 }
1383 pending_service_map.epoch = service_map.epoch + 1;
1384 });
1385
1386 // cull missing daemons, populate new ones
1387 for (auto& p : pending_service_map.services) {
1388 std::set<std::string> names;
1389 for (auto& q : p.second.daemons) {
1390 names.insert(q.first);
1391 DaemonKey key(p.first, q.first);
1392 if (!daemon_state.exists(key)) {
1393 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
1394 daemon->key = key;
1395 daemon->metadata = q.second.metadata;
1396 if (q.second.metadata.count("hostname")) {
1397 daemon->hostname = q.second.metadata["hostname"];
1398 }
1399 daemon->service_daemon = true;
1400 daemon_state.insert(daemon);
1401 dout(10) << "added missing " << key << dendl;
1402 }
1403 }
1404 daemon_state.cull(p.first, names);
1405 }
1406 }