]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/DaemonServer.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mgr / DaemonServer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "DaemonServer.h"
15
16 #include "include/str_list.h"
17 #include "auth/RotatingKeyRing.h"
18 #include "json_spirit/json_spirit_writer.h"
19
20 #include "messages/MMgrOpen.h"
21 #include "messages/MMgrConfigure.h"
22 #include "messages/MMonMgrReport.h"
23 #include "messages/MCommand.h"
24 #include "messages/MCommandReply.h"
25 #include "messages/MPGStats.h"
26 #include "messages/MOSDScrub.h"
27 #include "common/errno.h"
28
29 #define dout_context g_ceph_context
30 #define dout_subsys ceph_subsys_mgr
31 #undef dout_prefix
32 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
33
/**
 * Construct the mgr's daemon-facing server.
 *
 * This only stores references to collaborators and builds the per-peer-type
 * byte/message throttles. The messenger itself stays null here (msgr is
 * initialized to nullptr) and is created in init().
 */
DaemonServer::DaemonServer(MonClient *monc_,
                           Finisher &finisher_,
                           DaemonStateIndex &daemon_state_,
                           ClusterState &cluster_state_,
                           PyModules &py_modules_,
                           LogChannelRef clog_,
                           LogChannelRef audit_clog_)
    : Dispatcher(g_ceph_context),
      // One byte throttle and one message throttle per peer type, sized from
      // the mgr_{client,osd,mds,mon}_{bytes,messages} options. init() attaches
      // them to the messenger with set_policy_throttlers().
      client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
                                         g_conf->mgr_client_bytes)),
      client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
                                        g_conf->mgr_client_messages)),
      osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
                                      g_conf->mgr_osd_bytes)),
      // NOTE(review): the next three throttle names are spelled "messsages"
      // (three 's'). The name is handed to Throttle, so it presumably shows
      // up externally (e.g. perf counter names). Renaming may break
      // consumers; confirm before fixing.
      osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
                                     g_conf->mgr_osd_messages)),
      mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
                                      g_conf->mgr_mds_bytes)),
      mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
                                     g_conf->mgr_mds_messages)),
      mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
                                      g_conf->mgr_mon_bytes)),
      mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
                                     g_conf->mgr_mon_messages)),
      msgr(nullptr),
      monc(monc_),
      finisher(finisher_),
      daemon_state(daemon_state_),
      cluster_state(cluster_state_),
      py_modules(py_modules_),
      clog(clog_),
      audit_clog(audit_clog_),
      // Auth methods: auth_supported if it is set, otherwise
      // auth_cluster_required.
      auth_registry(g_ceph_context,
                    g_conf->auth_supported.empty() ?
                      g_conf->auth_cluster_required :
                      g_conf->auth_supported),
      lock("DaemonServer")
{}
72
73 DaemonServer::~DaemonServer() {
74 delete msgr;
75 }
76
77 int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
78 {
79 // Initialize Messenger
80 std::string public_msgr_type = g_conf->ms_public_type.empty() ?
81 g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
82 msgr = Messenger::create(g_ceph_context, public_msgr_type,
83 entity_name_t::MGR(gid),
84 "mgr",
85 getpid(), 0);
86 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
87
88 // throttle clients
89 msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
90 client_byte_throttler.get(),
91 client_msg_throttler.get());
92
93 // servers
94 msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
95 osd_byte_throttler.get(),
96 osd_msg_throttler.get());
97 msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
98 mds_byte_throttler.get(),
99 mds_msg_throttler.get());
100 msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
101 mon_byte_throttler.get(),
102 mon_msg_throttler.get());
103
104 int r = msgr->bind(g_conf->public_addr);
105 if (r < 0) {
106 derr << "unable to bind mgr to " << g_conf->public_addr << dendl;
107 return r;
108 }
109
110 msgr->set_myname(entity_name_t::MGR(gid));
111 msgr->set_addr_unknowns(client_addr);
112
113 msgr->start();
114 msgr->add_dispatcher_tail(this);
115
116 started_at = ceph_clock_now();
117
118 return 0;
119 }
120
121 entity_addr_t DaemonServer::get_myaddr() const
122 {
123 return msgr->get_myaddr();
124 }
125
126
127 bool DaemonServer::ms_verify_authorizer(Connection *con,
128 int peer_type,
129 int protocol,
130 ceph::bufferlist& authorizer_data,
131 ceph::bufferlist& authorizer_reply,
132 bool& is_valid,
133 CryptoKey& session_key)
134 {
135 auto handler = auth_registry.get_handler(protocol);
136 if (!handler) {
137 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
138 is_valid = false;
139 return true;
140 }
141
142 MgrSessionRef s(new MgrSession(cct));
143 s->inst.addr = con->get_peer_addr();
144 AuthCapsInfo caps_info;
145
146 is_valid = handler->verify_authorizer(
147 cct, monc->rotating_secrets.get(),
148 authorizer_data,
149 authorizer_reply, s->entity_name,
150 s->global_id, caps_info,
151 session_key);
152
153 if (is_valid) {
154 if (caps_info.allow_all) {
155 dout(10) << " session " << s << " " << s->entity_name
156 << " allow_all" << dendl;
157 s->caps.set_allow_all();
158 }
159 if (caps_info.caps.length() > 0) {
160 bufferlist::iterator p = caps_info.caps.begin();
161 string str;
162 try {
163 ::decode(str, p);
164 }
165 catch (buffer::error& e) {
166 }
167 bool success = s->caps.parse(str);
168 if (success) {
169 dout(10) << " session " << s << " " << s->entity_name
170 << " has caps " << s->caps << " '" << str << "'" << dendl;
171 } else {
172 dout(10) << " session " << s << " " << s->entity_name
173 << " failed to parse caps '" << str << "'" << dendl;
174 is_valid = false;
175 }
176 }
177 con->set_priv(s->get());
178
179 if (peer_type == CEPH_ENTITY_TYPE_OSD) {
180 Mutex::Locker l(lock);
181 s->osd_id = atoi(s->entity_name.get_id().c_str());
182 dout(10) << "registering osd." << s->osd_id << " session "
183 << s << " con " << con << dendl;
184 osd_cons[s->osd_id].insert(con);
185 }
186 }
187
188 return true;
189 }
190
191
192 bool DaemonServer::ms_get_authorizer(int dest_type,
193 AuthAuthorizer **authorizer, bool force_new)
194 {
195 dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
196
197 if (dest_type == CEPH_ENTITY_TYPE_MON) {
198 return true;
199 }
200
201 if (force_new) {
202 if (monc->wait_auth_rotating(10) < 0)
203 return false;
204 }
205
206 *authorizer = monc->build_authorizer(dest_type);
207 dout(20) << "got authorizer " << *authorizer << dendl;
208 return *authorizer != NULL;
209 }
210
211 bool DaemonServer::ms_handle_reset(Connection *con)
212 {
213 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
214 MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
215 if (!session) {
216 return false;
217 }
218 session->put(); // SessionRef takes a ref
219 Mutex::Locker l(lock);
220 dout(10) << "unregistering osd." << session->osd_id
221 << " session " << session << " con " << con << dendl;
222 osd_cons[session->osd_id].erase(con);
223 }
224 return false;
225 }
226
227 bool DaemonServer::ms_handle_refused(Connection *con)
228 {
229 // do nothing for now
230 return false;
231 }
232
233 bool DaemonServer::ms_dispatch(Message *m)
234 {
235 Mutex::Locker l(lock);
236
237 switch (m->get_type()) {
238 case MSG_PGSTATS:
239 cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
240 maybe_ready(m->get_source().num());
241 m->put();
242 return true;
243 case MSG_MGR_REPORT:
244 return handle_report(static_cast<MMgrReport*>(m));
245 case MSG_MGR_OPEN:
246 return handle_open(static_cast<MMgrOpen*>(m));
247 case MSG_COMMAND:
248 return handle_command(static_cast<MCommand*>(m));
249 default:
250 dout(1) << "Unhandled message type " << m->get_type() << dendl;
251 return false;
252 };
253 }
254
255 void DaemonServer::maybe_ready(int32_t osd_id)
256 {
257 if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
258 dout(4) << "initial report from osd " << osd_id << dendl;
259 reported_osds.insert(osd_id);
260 std::set<int32_t> up_osds;
261
262 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
263 osdmap.get_up_osds(up_osds);
264 });
265
266 std::set<int32_t> unreported_osds;
267 std::set_difference(up_osds.begin(), up_osds.end(),
268 reported_osds.begin(), reported_osds.end(),
269 std::inserter(unreported_osds, unreported_osds.begin()));
270
271 if (unreported_osds.size() == 0) {
272 dout(4) << "all osds have reported, sending PG state to mon" << dendl;
273 pgmap_ready = true;
274 reported_osds.clear();
275 // Avoid waiting for next tick
276 send_report();
277 } else {
278 dout(4) << "still waiting for " << unreported_osds.size() << " osds"
279 " to report in before PGMap is ready" << dendl;
280 }
281 }
282 }
283
284 void DaemonServer::shutdown()
285 {
286 dout(10) << "begin" << dendl;
287 msgr->shutdown();
288 msgr->wait();
289 dout(10) << "done" << dendl;
290 }
291
292
293
294 bool DaemonServer::handle_open(MMgrOpen *m)
295 {
296 DaemonKey key;
297 if (!m->service_name.empty()) {
298 key.first = m->service_name;
299 } else {
300 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
301 }
302 key.second = m->daemon_name;
303
304 dout(4) << "from " << m->get_connection() << " " << key << dendl;
305
306 auto configure = new MMgrConfigure();
307 configure->stats_period = g_conf->mgr_stats_period;
308 m->get_connection()->send_message(configure);
309
310 if (daemon_state.exists(key)) {
311 dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
312 daemon_state.get(key)->perf_counters.clear();
313 }
314
315 if (m->service_daemon) {
316 DaemonStatePtr daemon;
317 if (daemon_state.exists(key)) {
318 daemon = daemon_state.get(key);
319 } else {
320 dout(4) << "constructing new DaemonState for " << key << dendl;
321 daemon = std::make_shared<DaemonState>(daemon_state.types);
322 daemon->key = key;
323 if (m->daemon_metadata.count("hostname")) {
324 daemon->hostname = m->daemon_metadata["hostname"];
325 }
326 daemon_state.insert(daemon);
327 }
328 daemon->service_daemon = true;
329 daemon->metadata = m->daemon_metadata;
330 daemon->service_status = m->daemon_status;
331
332 utime_t now = ceph_clock_now();
333 auto d = pending_service_map.get_daemon(m->service_name,
334 m->daemon_name);
335 if (d->gid != (uint64_t)m->get_source().num()) {
336 dout(10) << "registering " << key << " in pending_service_map" << dendl;
337 d->gid = m->get_source().num();
338 d->addr = m->get_source_addr();
339 d->start_epoch = pending_service_map.epoch;
340 d->start_stamp = now;
341 d->metadata = m->daemon_metadata;
342 pending_service_map_dirty = pending_service_map.epoch;
343 }
344 }
345
346 m->put();
347 return true;
348 }
349
350 bool DaemonServer::handle_report(MMgrReport *m)
351 {
352 DaemonKey key;
353 if (!m->service_name.empty()) {
354 key.first = m->service_name;
355 } else {
356 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
357 }
358 key.second = m->daemon_name;
359
360 dout(4) << "from " << m->get_connection() << " " << key << dendl;
361
362 if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
363 m->service_name.empty()) {
364 // Clients should not be sending us stats unless they are declaring
365 // themselves to be a daemon for some service.
366 dout(4) << "rejecting report from non-daemon client " << m->daemon_name
367 << dendl;
368 m->put();
369 return true;
370 }
371
372 DaemonStatePtr daemon;
373 if (daemon_state.exists(key)) {
374 dout(20) << "updating existing DaemonState for " << key << dendl;
375 daemon = daemon_state.get(key);
376 } else {
377 dout(4) << "constructing new DaemonState for " << key << dendl;
378 daemon = std::make_shared<DaemonState>(daemon_state.types);
379 // FIXME: crap, we don't know the hostname at this stage.
380 daemon->key = key;
381 daemon_state.insert(daemon);
382 // FIXME: we should avoid this case by rejecting MMgrReport from
383 // daemons without sessions, and ensuring that session open
384 // always contains metadata.
385 }
386 assert(daemon != nullptr);
387 auto &daemon_counters = daemon->perf_counters;
388 daemon_counters.update(m);
389
390 if (daemon->service_daemon) {
391 utime_t now = ceph_clock_now();
392 if (m->daemon_status) {
393 daemon->service_status = *m->daemon_status;
394 daemon->service_status_stamp = now;
395 }
396 daemon->last_service_beacon = now;
397 } else if (m->daemon_status) {
398 derr << "got status from non-daemon " << key << dendl;
399 }
400
401 m->put();
402 return true;
403 }
404
/**
 * Static description of one built-in mgr command.
 *
 * mgr_commands[] below is populated by defining COMMAND(...) to emit one
 * aggregate initializer per entry and then including MgrCommands.h.
 */
struct MgrCommand {
  string cmdstring;     // command signature ("parsesig" argument of COMMAND)
  string helpstring;    // help text ("helptext" argument of COMMAND)
  string module;        // module name handed to the caps check in _allowed_command
  string perm;          // permission letters checked via requires_perm ('r', 'w', 'x')
  string availability;  // passed through to dump_cmddesc_to_json; presumably e.g. "cli" — TODO confirm

  // True if perm contains the permission letter p.
  bool requires_perm(char p) const {
    return (perm.find(p) != string::npos);
  }

} mgr_commands[] = {

#define COMMAND(parsesig, helptext, module, perm, availability) \
  {parsesig, helptext, module, perm, availability},
#include "MgrCommands.h"
#undef COMMAND
};
423
424 void DaemonServer::_generate_command_map(
425 map<string,cmd_vartype>& cmdmap,
426 map<string,string> &param_str_map)
427 {
428 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
429 p != cmdmap.end(); ++p) {
430 if (p->first == "prefix")
431 continue;
432 if (p->first == "caps") {
433 vector<string> cv;
434 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
435 cv.size() % 2 == 0) {
436 for (unsigned i = 0; i < cv.size(); i += 2) {
437 string k = string("caps_") + cv[i];
438 param_str_map[k] = cv[i + 1];
439 }
440 continue;
441 }
442 }
443 param_str_map[p->first] = cmd_vartype_stringify(p->second);
444 }
445 }
446
447 const MgrCommand *DaemonServer::_get_mgrcommand(
448 const string &cmd_prefix,
449 MgrCommand *cmds,
450 int cmds_size)
451 {
452 MgrCommand *this_cmd = NULL;
453 for (MgrCommand *cp = cmds;
454 cp < &cmds[cmds_size]; cp++) {
455 if (cp->cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
456 this_cmd = cp;
457 break;
458 }
459 }
460 return this_cmd;
461 }
462
463 bool DaemonServer::_allowed_command(
464 MgrSession *s,
465 const string &module,
466 const string &prefix,
467 const map<string,cmd_vartype>& cmdmap,
468 const map<string,string>& param_str_map,
469 const MgrCommand *this_cmd) {
470
471 if (s->entity_name.is_mon()) {
472 // mon is all-powerful. even when it is forwarding commands on behalf of
473 // old clients; we expect the mon is validating commands before proxying!
474 return true;
475 }
476
477 bool cmd_r = this_cmd->requires_perm('r');
478 bool cmd_w = this_cmd->requires_perm('w');
479 bool cmd_x = this_cmd->requires_perm('x');
480
481 bool capable = s->caps.is_capable(
482 g_ceph_context,
483 CEPH_ENTITY_TYPE_MGR,
484 s->entity_name,
485 module, prefix, param_str_map,
486 cmd_r, cmd_w, cmd_x);
487
488 dout(10) << " " << s->entity_name << " "
489 << (capable ? "" : "not ") << "capable" << dendl;
490 return capable;
491 }
492
493 bool DaemonServer::handle_command(MCommand *m)
494 {
495 int r = 0;
496 std::stringstream ss;
497 std::string prefix;
498
499 assert(lock.is_locked_by_me());
500
501 /**
502 * The working data for processing an MCommand. This lives in
503 * a class to enable passing it into other threads for processing
504 * outside of the thread/locks that called handle_command.
505 */
506 class CommandContext
507 {
508 public:
509 MCommand *m;
510 bufferlist odata;
511 cmdmap_t cmdmap;
512
513 CommandContext(MCommand *m_)
514 : m(m_)
515 {
516 }
517
518 ~CommandContext()
519 {
520 m->put();
521 }
522
523 void reply(int r, const std::stringstream &ss)
524 {
525 reply(r, ss.str());
526 }
527
528 void reply(int r, const std::string &rs)
529 {
530 // Let the connection drop as soon as we've sent our response
531 ConnectionRef con = m->get_connection();
532 if (con) {
533 con->mark_disposable();
534 }
535
536 dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl;
537 if (con) {
538 MCommandReply *reply = new MCommandReply(r, rs);
539 reply->set_tid(m->get_tid());
540 reply->set_data(odata);
541 con->send_message(reply);
542 }
543 }
544 };
545
546 /**
547 * A context for receiving a bufferlist/error string from a background
548 * function and then calling back to a CommandContext when it's done
549 */
550 class ReplyOnFinish : public Context {
551 std::shared_ptr<CommandContext> cmdctx;
552
553 public:
554 bufferlist from_mon;
555 string outs;
556
557 ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
558 : cmdctx(cmdctx_)
559 {}
560 void finish(int r) override {
561 cmdctx->odata.claim_append(from_mon);
562 cmdctx->reply(r, outs);
563 }
564 };
565
566 std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
567
568 MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
569 if (!session) {
570 return true;
571 }
572 session->put(); // SessionRef takes a ref
573 if (session->inst.name == entity_name_t())
574 session->inst.name = m->get_source();
575
576 std::string format;
577 boost::scoped_ptr<Formatter> f;
578 map<string,string> param_str_map;
579
580 if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
581 cmdctx->reply(-EINVAL, ss);
582 return true;
583 }
584
585 {
586 cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
587 f.reset(Formatter::create(format));
588 }
589
590 cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
591
592 dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
593 dout(4) << "prefix=" << prefix << dendl;
594
595 if (prefix == "get_command_descriptions") {
596 int cmdnum = 0;
597
598 dout(10) << "reading commands from python modules" << dendl;
599 auto py_commands = py_modules.get_commands();
600
601 JSONFormatter f;
602 f.open_object_section("command_descriptions");
603 for (const auto &pyc : py_commands) {
604 ostringstream secname;
605 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
606 dout(20) << "Dumping " << pyc.cmdstring << " (" << pyc.helpstring
607 << ")" << dendl;
608 dump_cmddesc_to_json(&f, secname.str(), pyc.cmdstring, pyc.helpstring,
609 "mgr", pyc.perm, "cli", 0);
610 cmdnum++;
611 }
612
613 for (const auto &cp : mgr_commands) {
614 ostringstream secname;
615 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
616 dump_cmddesc_to_json(&f, secname.str(), cp.cmdstring, cp.helpstring,
617 cp.module, cp.perm, cp.availability, 0);
618 cmdnum++;
619 }
620 f.close_section(); // command_descriptions
621 f.flush(cmdctx->odata);
622 cmdctx->reply(0, ss);
623 return true;
624 }
625
626 // lookup command
627 const MgrCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands,
628 ARRAY_SIZE(mgr_commands));
629 _generate_command_map(cmdctx->cmdmap, param_str_map);
630 if (!mgr_cmd) {
631 MgrCommand py_command = {"", "", "py", "rw", "cli"};
632 if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
633 param_str_map, &py_command)) {
634 dout(1) << " access denied" << dendl;
635 ss << "access denied; does your client key have mgr caps?"
636 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
637 cmdctx->reply(-EACCES, ss);
638 return true;
639 }
640 } else {
641 // validate user's permissions for requested command
642 if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
643 param_str_map, mgr_cmd)) {
644 dout(1) << " access denied" << dendl;
645 audit_clog->info() << "from='" << session->inst << "' "
646 << "entity='" << session->entity_name << "' "
647 << "cmd=" << m->cmd << ": access denied";
648 ss << "access denied' does your client key have mgr caps?"
649 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
650 cmdctx->reply(-EACCES, ss);
651 return true;
652 }
653 }
654
655 audit_clog->debug()
656 << "from='" << session->inst << "' "
657 << "entity='" << session->entity_name << "' "
658 << "cmd=" << m->cmd << ": dispatch";
659
660 // ----------------
661 // service map commands
662 if (prefix == "service dump") {
663 if (!f)
664 f.reset(Formatter::create("json-pretty"));
665 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
666 f->dump_object("service_map", service_map);
667 });
668 f->flush(cmdctx->odata);
669 cmdctx->reply(0, ss);
670 return true;
671 }
672 if (prefix == "service status") {
673 if (!f)
674 f.reset(Formatter::create("json-pretty"));
675 // only include state from services that are in the persisted service map
676 f->open_object_section("service_status");
677 ServiceMap s;
678 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
679 s = service_map;
680 });
681 for (auto& p : s.services) {
682 f->open_object_section(p.first.c_str());
683 for (auto& q : p.second.daemons) {
684 f->open_object_section(q.first.c_str());
685 DaemonKey key(p.first, q.first);
686 assert(daemon_state.exists(key));
687 auto daemon = daemon_state.get(key);
688 f->dump_stream("status_stamp") << daemon->service_status_stamp;
689 f->dump_stream("last_beacon") << daemon->last_service_beacon;
690 f->open_object_section("status");
691 for (auto& r : daemon->service_status) {
692 f->dump_string(r.first.c_str(), r.second);
693 }
694 f->close_section();
695 f->close_section();
696 }
697 f->close_section();
698 }
699 f->close_section();
700 f->flush(cmdctx->odata);
701 cmdctx->reply(0, ss);
702 return true;
703 }
704
705 // -----------
706 // PG commands
707
708 if (prefix == "pg scrub" ||
709 prefix == "pg repair" ||
710 prefix == "pg deep-scrub") {
711 string scrubop = prefix.substr(3, string::npos);
712 pg_t pgid;
713 string pgidstr;
714 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
715 if (!pgid.parse(pgidstr.c_str())) {
716 ss << "invalid pgid '" << pgidstr << "'";
717 cmdctx->reply(-EINVAL, ss);
718 return true;
719 }
720 bool pg_exists = false;
721 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
722 pg_exists = osdmap.pg_exists(pgid);
723 });
724 if (!pg_exists) {
725 ss << "pg " << pgid << " dne";
726 cmdctx->reply(-ENOENT, ss);
727 return true;
728 }
729 int acting_primary = -1;
730 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
731 acting_primary = osdmap.get_pg_acting_primary(pgid);
732 });
733 if (acting_primary == -1) {
734 ss << "pg " << pgid << " has no primary osd";
735 cmdctx->reply(-EAGAIN, ss);
736 return true;
737 }
738 auto p = osd_cons.find(acting_primary);
739 if (p == osd_cons.end()) {
740 ss << "pg " << pgid << " primary osd." << acting_primary
741 << " is not currently connected";
742 cmdctx->reply(-EAGAIN, ss);
743 }
744 vector<pg_t> pgs = { pgid };
745 for (auto& con : p->second) {
746 con->send_message(new MOSDScrub(monc->get_fsid(),
747 pgs,
748 scrubop == "repair",
749 scrubop == "deep-scrub"));
750 }
751 ss << "instructing pg " << pgid << " on osd." << acting_primary
752 << " to " << scrubop;
753 cmdctx->reply(0, ss);
754 return true;
755 } else if (prefix == "osd scrub" ||
756 prefix == "osd deep-scrub" ||
757 prefix == "osd repair") {
758 string whostr;
759 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
760 vector<string> pvec;
761 get_str_vec(prefix, pvec);
762
763 set<int> osds;
764 if (whostr == "*" || whostr == "all" || whostr == "any") {
765 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
766 for (int i = 0; i < osdmap.get_max_osd(); i++)
767 if (osdmap.is_up(i)) {
768 osds.insert(i);
769 }
770 });
771 } else {
772 long osd = parse_osd_id(whostr.c_str(), &ss);
773 if (osd < 0) {
774 ss << "invalid osd '" << whostr << "'";
775 cmdctx->reply(-EINVAL, ss);
776 return true;
777 }
778 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
779 if (osdmap.is_up(osd)) {
780 osds.insert(osd);
781 }
782 });
783 if (osds.empty()) {
784 ss << "osd." << osd << " is not up";
785 cmdctx->reply(-EAGAIN, ss);
786 return true;
787 }
788 }
789 set<int> sent_osds, failed_osds;
790 for (auto osd : osds) {
791 auto p = osd_cons.find(osd);
792 if (p == osd_cons.end()) {
793 failed_osds.insert(osd);
794 } else {
795 sent_osds.insert(osd);
796 for (auto& con : p->second) {
797 con->send_message(new MOSDScrub(monc->get_fsid(),
798 pvec.back() == "repair",
799 pvec.back() == "deep-scrub"));
800 }
801 }
802 }
803 if (failed_osds.size() == osds.size()) {
804 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
805 << " (not connected)";
806 r = -EAGAIN;
807 } else {
808 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
809 if (!failed_osds.empty()) {
810 ss << "; osd(s) " << failed_osds << " were not connected";
811 }
812 r = 0;
813 }
814 cmdctx->reply(0, ss);
815 return true;
816 } else if (prefix == "osd reweight-by-pg" ||
817 prefix == "osd reweight-by-utilization" ||
818 prefix == "osd test-reweight-by-pg" ||
819 prefix == "osd test-reweight-by-utilization") {
820 bool by_pg =
821 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
822 bool dry_run =
823 prefix == "osd test-reweight-by-pg" ||
824 prefix == "osd test-reweight-by-utilization";
825 int64_t oload;
826 cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
827 set<int64_t> pools;
828 vector<string> poolnames;
829 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
830 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
831 for (const auto& poolname : poolnames) {
832 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
833 if (pool < 0) {
834 ss << "pool '" << poolname << "' does not exist";
835 r = -ENOENT;
836 }
837 pools.insert(pool);
838 }
839 });
840 if (r) {
841 cmdctx->reply(r, ss);
842 return true;
843 }
844 double max_change = g_conf->mon_reweight_max_change;
845 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
846 if (max_change <= 0.0) {
847 ss << "max_change " << max_change << " must be positive";
848 cmdctx->reply(-EINVAL, ss);
849 return true;
850 }
851 int64_t max_osds = g_conf->mon_reweight_max_osds;
852 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
853 if (max_osds <= 0) {
854 ss << "max_osds " << max_osds << " must be positive";
855 cmdctx->reply(-EINVAL, ss);
856 return true;
857 }
858 string no_increasing;
859 cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
860 string out_str;
861 mempool::osdmap::map<int32_t, uint32_t> new_weights;
862 r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
863 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
864 return reweight::by_utilization(osdmap, pgmap,
865 oload,
866 max_change,
867 max_osds,
868 by_pg,
869 pools.empty() ? NULL : &pools,
870 no_increasing == "--no-increasing",
871 &new_weights,
872 &ss, &out_str, f.get());
873 });
874 });
875 if (r >= 0) {
876 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
877 }
878 if (f) {
879 f->flush(cmdctx->odata);
880 } else {
881 cmdctx->odata.append(out_str);
882 }
883 if (r < 0) {
884 ss << "FAILED reweight-by-pg";
885 cmdctx->reply(r, ss);
886 return true;
887 } else if (r == 0 || dry_run) {
888 ss << "no change";
889 cmdctx->reply(r, ss);
890 return true;
891 } else {
892 json_spirit::Object json_object;
893 for (const auto& osd_weight : new_weights) {
894 json_spirit::Config::add(json_object,
895 std::to_string(osd_weight.first),
896 std::to_string(osd_weight.second));
897 }
898 string s = json_spirit::write(json_object);
899 std::replace(begin(s), end(s), '\"', '\'');
900 const string cmd =
901 "{"
902 "\"prefix\": \"osd reweightn\", "
903 "\"weights\": \"" + s + "\""
904 "}";
905 auto on_finish = new ReplyOnFinish(cmdctx);
906 monc->start_mon_command({cmd}, {},
907 &on_finish->from_mon, &on_finish->outs, on_finish);
908 return true;
909 }
910 } else if (prefix == "osd df") {
911 string method;
912 cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
913 r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) {
914 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
915 print_osd_utilization(osdmap, &pgservice, ss,
916 f.get(), method == "tree");
917
918 cmdctx->odata.append(ss);
919 return 0;
920 });
921 });
922 cmdctx->reply(r, "");
923 return true;
924 } else {
925 r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
926 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
927 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
928 f.get(), &ss, &cmdctx->odata);
929 });
930 });
931
932 if (r != -EOPNOTSUPP) {
933 cmdctx->reply(r, ss);
934 return true;
935 }
936 }
937
938 // None of the special native commands,
939 MgrPyModule *handler = nullptr;
940 auto py_commands = py_modules.get_commands();
941 for (const auto &pyc : py_commands) {
942 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
943 dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
944 if (pyc_prefix == prefix) {
945 handler = pyc.handler;
946 break;
947 }
948 }
949
950 if (handler == nullptr) {
951 ss << "No handler found for '" << prefix << "'";
952 dout(4) << "No handler found for '" << prefix << "'" << dendl;
953 cmdctx->reply(-EINVAL, ss);
954 return true;
955 } else {
956 // Okay, now we have a handler to call, but we must not call it
957 // in this thread, because the python handlers can do anything,
958 // including blocking, and including calling back into mgr.
959 dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
960 finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
961 std::stringstream ds;
962 std::stringstream ss;
963 int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
964 cmdctx->odata.append(ds);
965 cmdctx->reply(r, ss);
966 }));
967 return true;
968 }
969 }
970
971 void DaemonServer::_prune_pending_service_map()
972 {
973 utime_t cutoff = ceph_clock_now();
974 cutoff -= g_conf->mgr_service_beacon_grace;
975 auto p = pending_service_map.services.begin();
976 while (p != pending_service_map.services.end()) {
977 auto q = p->second.daemons.begin();
978 while (q != p->second.daemons.end()) {
979 DaemonKey key(p->first, q->first);
980 if (!daemon_state.exists(key)) {
981 derr << "missing key " << key << dendl;
982 ++q;
983 continue;
984 }
985 auto daemon = daemon_state.get(key);
986 if (daemon->last_service_beacon == utime_t()) {
987 // we must have just restarted; assume they are alive now.
988 daemon->last_service_beacon = ceph_clock_now();
989 ++q;
990 continue;
991 }
992 if (daemon->last_service_beacon < cutoff) {
993 dout(10) << "pruning stale " << p->first << "." << q->first
994 << " last_beacon " << daemon->last_service_beacon << dendl;
995 q = p->second.daemons.erase(q);
996 pending_service_map_dirty = pending_service_map.epoch;
997 } else {
998 ++q;
999 }
1000 }
1001 if (p->second.daemons.empty()) {
1002 p = pending_service_map.services.erase(p);
1003 pending_service_map_dirty = pending_service_map.epoch;
1004 } else {
1005 ++p;
1006 }
1007 }
1008 }
1009
/**
 * Build an MMonMgrReport and send it to the monitors.
 *
 * Until pgmap_ready is set (see maybe_ready), the report is withheld. The
 * exception is when more than 4 * mgr_stats_period has passed since
 * started_at; then we stop waiting and send possibly incomplete PG state.
 *
 * The report contains:
 *  - the encoded PGMap digest,
 *  - the PGMap health checks,
 *  - the encoded pending service map, only when it is dirty
 *    (pending_service_map_dirty >= its epoch). Its epoch is then bumped.
 */
void DaemonServer::send_report()
{
  if (!pgmap_ready) {
    if (ceph_clock_now() - started_at > g_conf->mgr_stats_period * 4.0) {
      // Waited long enough: stop gating on OSDs that never reported.
      pgmap_ready = true;
      reported_osds.clear();
      dout(1) << "Giving up on OSDs that haven't reported yet, sending "
              << "potentially incomplete PG state to mon" << dendl;
    } else {
      dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
              << dendl;
      return;
    }
  }

  auto m = new MMonMgrReport();
  cluster_state.with_pgmap([&](const PGMap& pg_map) {
    cluster_state.update_delta_stats();

    // A zero epoch means no service map has been received yet
    // (got_service_map sets it), so there is nothing to prune or send.
    if (pending_service_map.epoch) {
      _prune_pending_service_map();
      if (pending_service_map_dirty >= pending_service_map.epoch) {
        pending_service_map.modified = ceph_clock_now();
        ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
        dout(10) << "sending service_map e" << pending_service_map.epoch
                 << dendl;
        pending_service_map.epoch++;
      }
    }

    cluster_state.with_osdmap([&](const OSDMap& osdmap) {
      // FIXME: no easy way to get mon features here. this will do for
      // now, though, as long as we don't make a backward-incompat change.
      pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
      dout(10) << pg_map << dendl;

      pg_map.get_health_checks(g_ceph_context, osdmap,
                               &m->health_checks);
      dout(10) << m->health_checks.checks.size() << " health checks"
               << dendl;
      // NOTE: dout(20) through "*_dout << dendl" appear to form a single log
      // entry: _dout is only in scope because dout(20) opened it, and dendl
      // closes it. The JSON dump therefore only runs at debug level 20.
      // Do not reorder or split these statements.
      dout(20) << "health checks:\n";
      JSONFormatter jf(true);
      jf.dump_object("health_checks", m->health_checks);
      jf.flush(*_dout);
      *_dout << dendl;
    });
  });
  // TODO? We currently do not notify the PyModules
  // TODO: respect needs_send, so we send the report only if we are asked to do
  // so, or the state is updated.
  monc->send_mon_message(m);
}
1062
1063 void DaemonServer::got_service_map()
1064 {
1065 Mutex::Locker l(lock);
1066
1067 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
1068 if (pending_service_map.epoch == 0) {
1069 // we just started up
1070 dout(10) << "got initial map e" << service_map.epoch << dendl;
1071 pending_service_map = service_map;
1072 } else {
1073 // we we already active and therefore must have persisted it,
1074 // which means ours is the same or newer.
1075 dout(10) << "got updated map e" << service_map.epoch << dendl;
1076 }
1077 pending_service_map.epoch = service_map.epoch + 1;
1078 });
1079
1080 // cull missing daemons, populate new ones
1081 for (auto& p : pending_service_map.services) {
1082 std::set<std::string> names;
1083 for (auto& q : p.second.daemons) {
1084 names.insert(q.first);
1085 DaemonKey key(p.first, q.first);
1086 if (!daemon_state.exists(key)) {
1087 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
1088 daemon->key = key;
1089 daemon->metadata = q.second.metadata;
1090 if (q.second.metadata.count("hostname")) {
1091 daemon->hostname = q.second.metadata["hostname"];
1092 }
1093 daemon->service_daemon = true;
1094 daemon_state.insert(daemon);
1095 dout(10) << "added missing " << key << dendl;
1096 }
1097 }
1098 daemon_state.cull(p.first, names);
1099 }
1100 }