]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/DaemonServer.cc
e8d8c8a212865f007054299787ced8db5d3a590a
[ceph.git] / ceph / src / mgr / DaemonServer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "DaemonServer.h"
15
16 #include "include/str_list.h"
17 #include "auth/RotatingKeyRing.h"
18 #include "json_spirit/json_spirit_writer.h"
19
20 #include "messages/MMgrOpen.h"
21 #include "messages/MMgrConfigure.h"
22 #include "messages/MMonMgrReport.h"
23 #include "messages/MCommand.h"
24 #include "messages/MCommandReply.h"
25 #include "messages/MPGStats.h"
26 #include "messages/MOSDScrub.h"
27
28 #define dout_context g_ceph_context
29 #define dout_subsys ceph_subsys_mgr
30 #undef dout_prefix
31 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
32
33 DaemonServer::DaemonServer(MonClient *monc_,
34 Finisher &finisher_,
35 DaemonStateIndex &daemon_state_,
36 ClusterState &cluster_state_,
37 PyModules &py_modules_,
38 LogChannelRef clog_,
39 LogChannelRef audit_clog_)
40 : Dispatcher(g_ceph_context),
41 client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
42 g_conf->mgr_client_bytes)),
43 client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
44 g_conf->mgr_client_messages)),
45 osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
46 g_conf->mgr_osd_bytes)),
47 osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
48 g_conf->mgr_osd_messages)),
49 mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
50 g_conf->mgr_mds_bytes)),
51 mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
52 g_conf->mgr_mds_messages)),
53 mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
54 g_conf->mgr_mon_bytes)),
55 mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
56 g_conf->mgr_mon_messages)),
57 msgr(nullptr),
58 monc(monc_),
59 finisher(finisher_),
60 daemon_state(daemon_state_),
61 cluster_state(cluster_state_),
62 py_modules(py_modules_),
63 clog(clog_),
64 audit_clog(audit_clog_),
65 auth_registry(g_ceph_context,
66 g_conf->auth_supported.empty() ?
67 g_conf->auth_cluster_required :
68 g_conf->auth_supported),
69 lock("DaemonServer")
70 {}
71
72 DaemonServer::~DaemonServer() {
73 delete msgr;
74 }
75
76 int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
77 {
78 // Initialize Messenger
79 std::string public_msgr_type = g_conf->ms_public_type.empty() ?
80 g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
81 msgr = Messenger::create(g_ceph_context, public_msgr_type,
82 entity_name_t::MGR(gid),
83 "mgr",
84 getpid(), 0);
85 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
86
87 // throttle clients
88 msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
89 client_byte_throttler.get(),
90 client_msg_throttler.get());
91
92 // servers
93 msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
94 osd_byte_throttler.get(),
95 osd_msg_throttler.get());
96 msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
97 mds_byte_throttler.get(),
98 mds_msg_throttler.get());
99 msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
100 mon_byte_throttler.get(),
101 mon_msg_throttler.get());
102
103 int r = msgr->bind(g_conf->public_addr);
104 if (r < 0) {
105 derr << "unable to bind mgr to " << g_conf->public_addr << dendl;
106 return r;
107 }
108
109 msgr->set_myname(entity_name_t::MGR(gid));
110 msgr->set_addr_unknowns(client_addr);
111
112 msgr->start();
113 msgr->add_dispatcher_tail(this);
114
115 return 0;
116 }
117
118 entity_addr_t DaemonServer::get_myaddr() const
119 {
120 return msgr->get_myaddr();
121 }
122
123
124 bool DaemonServer::ms_verify_authorizer(Connection *con,
125 int peer_type,
126 int protocol,
127 ceph::bufferlist& authorizer_data,
128 ceph::bufferlist& authorizer_reply,
129 bool& is_valid,
130 CryptoKey& session_key)
131 {
132 auto handler = auth_registry.get_handler(protocol);
133 if (!handler) {
134 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
135 is_valid = false;
136 return true;
137 }
138
139 MgrSessionRef s(new MgrSession(cct));
140 s->inst.addr = con->get_peer_addr();
141 AuthCapsInfo caps_info;
142
143 is_valid = handler->verify_authorizer(
144 cct, monc->rotating_secrets.get(),
145 authorizer_data,
146 authorizer_reply, s->entity_name,
147 s->global_id, caps_info,
148 session_key);
149
150 if (is_valid) {
151 if (caps_info.allow_all) {
152 dout(10) << " session " << s << " " << s->entity_name
153 << " allow_all" << dendl;
154 s->caps.set_allow_all();
155 }
156 if (caps_info.caps.length() > 0) {
157 bufferlist::iterator p = caps_info.caps.begin();
158 string str;
159 try {
160 ::decode(str, p);
161 }
162 catch (buffer::error& e) {
163 }
164 bool success = s->caps.parse(str);
165 if (success) {
166 dout(10) << " session " << s << " " << s->entity_name
167 << " has caps " << s->caps << " '" << str << "'" << dendl;
168 } else {
169 dout(10) << " session " << s << " " << s->entity_name
170 << " failed to parse caps '" << str << "'" << dendl;
171 is_valid = false;
172 }
173 }
174 con->set_priv(s->get());
175
176 if (peer_type == CEPH_ENTITY_TYPE_OSD) {
177 Mutex::Locker l(lock);
178 s->osd_id = atoi(s->entity_name.get_id().c_str());
179 dout(10) << __func__ << " registering osd." << s->osd_id << " session "
180 << s << " con " << con << dendl;
181 osd_cons[s->osd_id].insert(con);
182 }
183 }
184
185 return true;
186 }
187
188
189 bool DaemonServer::ms_get_authorizer(int dest_type,
190 AuthAuthorizer **authorizer, bool force_new)
191 {
192 dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
193
194 if (dest_type == CEPH_ENTITY_TYPE_MON) {
195 return true;
196 }
197
198 if (force_new) {
199 if (monc->wait_auth_rotating(10) < 0)
200 return false;
201 }
202
203 *authorizer = monc->build_authorizer(dest_type);
204 dout(20) << "got authorizer " << *authorizer << dendl;
205 return *authorizer != NULL;
206 }
207
208 bool DaemonServer::ms_handle_reset(Connection *con)
209 {
210 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
211 MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
212 if (!session) {
213 return false;
214 }
215 session->put(); // SessionRef takes a ref
216 Mutex::Locker l(lock);
217 dout(10) << __func__ << " unregistering osd." << session->osd_id
218 << " session " << session << " con " << con << dendl;
219 osd_cons[session->osd_id].erase(con);
220 }
221 return false;
222 }
223
224 bool DaemonServer::ms_handle_refused(Connection *con)
225 {
226 // do nothing for now
227 return false;
228 }
229
230 bool DaemonServer::ms_dispatch(Message *m)
231 {
232 Mutex::Locker l(lock);
233
234 switch (m->get_type()) {
235 case MSG_PGSTATS:
236 cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
237 m->put();
238 return true;
239 case MSG_MGR_REPORT:
240 return handle_report(static_cast<MMgrReport*>(m));
241 case MSG_MGR_OPEN:
242 return handle_open(static_cast<MMgrOpen*>(m));
243 case MSG_COMMAND:
244 return handle_command(static_cast<MCommand*>(m));
245 default:
246 dout(1) << "Unhandled message type " << m->get_type() << dendl;
247 return false;
248 };
249 }
250
251 void DaemonServer::shutdown()
252 {
253 dout(10) << __func__ << dendl;
254 msgr->shutdown();
255 msgr->wait();
256 dout(10) << __func__ << " done" << dendl;
257 }
258
259
260
261 bool DaemonServer::handle_open(MMgrOpen *m)
262 {
263 uint32_t type = m->get_connection()->get_peer_type();
264 DaemonKey key(type, m->daemon_name);
265
266 dout(4) << "from " << m->get_connection() << " name "
267 << ceph_entity_type_name(type) << "." << m->daemon_name << dendl;
268
269 auto configure = new MMgrConfigure();
270 if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) {
271 // We don't want clients to send us stats
272 configure->stats_period = 0;
273 } else {
274 configure->stats_period = g_conf->mgr_stats_period;
275 }
276 m->get_connection()->send_message(configure);
277
278 if (daemon_state.exists(key)) {
279 dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
280 daemon_state.get(key)->perf_counters.clear();
281 }
282
283 m->put();
284 return true;
285 }
286
287 bool DaemonServer::handle_report(MMgrReport *m)
288 {
289 uint32_t type = m->get_connection()->get_peer_type();
290 DaemonKey key(type, m->daemon_name);
291
292 dout(4) << "from " << m->get_connection() << " name "
293 << ceph_entity_type_name(type) << "." << m->daemon_name << dendl;
294
295 if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) {
296 // Clients should not be sending us stats
297 dout(4) << "rejecting report from client " << m->daemon_name << dendl;
298 m->put();
299 return true;
300 }
301
302 DaemonStatePtr daemon;
303 if (daemon_state.exists(key)) {
304 dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
305 daemon = daemon_state.get(key);
306 } else {
307 dout(4) << "constructing new DaemonState for " << m->daemon_name << dendl;
308 daemon = std::make_shared<DaemonState>(daemon_state.types);
309 // FIXME: crap, we don't know the hostname at this stage.
310 daemon->key = key;
311 daemon_state.insert(daemon);
312 // FIXME: we should request metadata at this stage
313 }
314
315 assert(daemon != nullptr);
316 auto &daemon_counters = daemon->perf_counters;
317 daemon_counters.update(m);
318
319 m->put();
320 return true;
321 }
322
323 struct MgrCommand {
324 string cmdstring;
325 string helpstring;
326 string module;
327 string perm;
328 string availability;
329
330 bool requires_perm(char p) const {
331 return (perm.find(p) != string::npos);
332 }
333
334 } mgr_commands[] = {
335
336 #define COMMAND(parsesig, helptext, module, perm, availability) \
337 {parsesig, helptext, module, perm, availability},
338 #include "MgrCommands.h"
339 #undef COMMAND
340 };
341
342 void DaemonServer::_generate_command_map(
343 map<string,cmd_vartype>& cmdmap,
344 map<string,string> &param_str_map)
345 {
346 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
347 p != cmdmap.end(); ++p) {
348 if (p->first == "prefix")
349 continue;
350 if (p->first == "caps") {
351 vector<string> cv;
352 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
353 cv.size() % 2 == 0) {
354 for (unsigned i = 0; i < cv.size(); i += 2) {
355 string k = string("caps_") + cv[i];
356 param_str_map[k] = cv[i + 1];
357 }
358 continue;
359 }
360 }
361 param_str_map[p->first] = cmd_vartype_stringify(p->second);
362 }
363 }
364
365 const MgrCommand *DaemonServer::_get_mgrcommand(
366 const string &cmd_prefix,
367 MgrCommand *cmds,
368 int cmds_size)
369 {
370 MgrCommand *this_cmd = NULL;
371 for (MgrCommand *cp = cmds;
372 cp < &cmds[cmds_size]; cp++) {
373 if (cp->cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
374 this_cmd = cp;
375 break;
376 }
377 }
378 return this_cmd;
379 }
380
381 bool DaemonServer::_allowed_command(
382 MgrSession *s,
383 const string &module,
384 const string &prefix,
385 const map<string,cmd_vartype>& cmdmap,
386 const map<string,string>& param_str_map,
387 const MgrCommand *this_cmd) {
388
389 if (s->entity_name.is_mon()) {
390 // mon is all-powerful. even when it is forwarding commands on behalf of
391 // old clients; we expect the mon is validating commands before proxying!
392 return true;
393 }
394
395 bool cmd_r = this_cmd->requires_perm('r');
396 bool cmd_w = this_cmd->requires_perm('w');
397 bool cmd_x = this_cmd->requires_perm('x');
398
399 bool capable = s->caps.is_capable(
400 g_ceph_context,
401 CEPH_ENTITY_TYPE_MGR,
402 s->entity_name,
403 module, prefix, param_str_map,
404 cmd_r, cmd_w, cmd_x);
405
406 dout(10) << " " << s->entity_name << " "
407 << (capable ? "" : "not ") << "capable" << dendl;
408 return capable;
409 }
410
411 bool DaemonServer::handle_command(MCommand *m)
412 {
413 int r = 0;
414 std::stringstream ss;
415 std::string prefix;
416
417 assert(lock.is_locked_by_me());
418
419 /**
420 * The working data for processing an MCommand. This lives in
421 * a class to enable passing it into other threads for processing
422 * outside of the thread/locks that called handle_command.
423 */
424 class CommandContext
425 {
426 public:
427 MCommand *m;
428 bufferlist odata;
429 cmdmap_t cmdmap;
430
431 CommandContext(MCommand *m_)
432 : m(m_)
433 {
434 }
435
436 ~CommandContext()
437 {
438 m->put();
439 }
440
441 void reply(int r, const std::stringstream &ss)
442 {
443 reply(r, ss.str());
444 }
445
446 void reply(int r, const std::string &rs)
447 {
448 // Let the connection drop as soon as we've sent our response
449 ConnectionRef con = m->get_connection();
450 if (con) {
451 con->mark_disposable();
452 }
453
454 dout(1) << "do_command r=" << r << " " << rs << dendl;
455 if (con) {
456 MCommandReply *reply = new MCommandReply(r, rs);
457 reply->set_tid(m->get_tid());
458 reply->set_data(odata);
459 con->send_message(reply);
460 }
461 }
462 };
463
464 /**
465 * A context for receiving a bufferlist/error string from a background
466 * function and then calling back to a CommandContext when it's done
467 */
468 class ReplyOnFinish : public Context {
469 std::shared_ptr<CommandContext> cmdctx;
470
471 public:
472 bufferlist from_mon;
473 string outs;
474
475 ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
476 : cmdctx(cmdctx_)
477 {}
478 void finish(int r) override {
479 cmdctx->odata.claim_append(from_mon);
480 cmdctx->reply(r, outs);
481 }
482 };
483
484 std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
485
486 MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
487 if (!session) {
488 return true;
489 }
490 session->put(); // SessionRef takes a ref
491 if (session->inst.name == entity_name_t())
492 session->inst.name = m->get_source();
493
494 std::string format;
495 boost::scoped_ptr<Formatter> f;
496 map<string,string> param_str_map;
497
498 if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
499 cmdctx->reply(-EINVAL, ss);
500 return true;
501 }
502
503 {
504 cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
505 f.reset(Formatter::create(format));
506 }
507
508 cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
509
510 dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
511 dout(4) << "prefix=" << prefix << dendl;
512
513 if (prefix == "get_command_descriptions") {
514 int cmdnum = 0;
515
516 dout(10) << "reading commands from python modules" << dendl;
517 auto py_commands = py_modules.get_commands();
518
519 JSONFormatter f;
520 f.open_object_section("command_descriptions");
521 for (const auto &pyc : py_commands) {
522 ostringstream secname;
523 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
524 dout(20) << "Dumping " << pyc.cmdstring << " (" << pyc.helpstring
525 << ")" << dendl;
526 dump_cmddesc_to_json(&f, secname.str(), pyc.cmdstring, pyc.helpstring,
527 "mgr", pyc.perm, "cli", 0);
528 cmdnum++;
529 }
530
531 for (const auto &cp : mgr_commands) {
532 ostringstream secname;
533 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
534 dump_cmddesc_to_json(&f, secname.str(), cp.cmdstring, cp.helpstring,
535 cp.module, cp.perm, cp.availability, 0);
536 cmdnum++;
537 }
538 f.close_section(); // command_descriptions
539 f.flush(cmdctx->odata);
540 cmdctx->reply(0, ss);
541 return true;
542 }
543
544 // lookup command
545 const MgrCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands,
546 ARRAY_SIZE(mgr_commands));
547 _generate_command_map(cmdctx->cmdmap, param_str_map);
548 if (!mgr_cmd) {
549 MgrCommand py_command = {"", "", "py", "rw", "cli"};
550 if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
551 param_str_map, &py_command)) {
552 dout(1) << " access denied" << dendl;
553 ss << "access denied; does your client key have mgr caps?"
554 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
555 cmdctx->reply(-EACCES, ss);
556 return true;
557 }
558 } else {
559 // validate user's permissions for requested command
560 if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
561 param_str_map, mgr_cmd)) {
562 dout(1) << " access denied" << dendl;
563 audit_clog->info() << "from='" << session->inst << "' "
564 << "entity='" << session->entity_name << "' "
565 << "cmd=" << m->cmd << ": access denied";
566 ss << "access denied' does your client key have mgr caps?"
567 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
568 cmdctx->reply(-EACCES, ss);
569 return true;
570 }
571 }
572
573 audit_clog->debug()
574 << "from='" << session->inst << "' "
575 << "entity='" << session->entity_name << "' "
576 << "cmd=" << m->cmd << ": dispatch";
577
578 // -----------
579 // PG commands
580
581 if (prefix == "pg scrub" ||
582 prefix == "pg repair" ||
583 prefix == "pg deep-scrub") {
584 string scrubop = prefix.substr(3, string::npos);
585 pg_t pgid;
586 string pgidstr;
587 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
588 if (!pgid.parse(pgidstr.c_str())) {
589 ss << "invalid pgid '" << pgidstr << "'";
590 cmdctx->reply(-EINVAL, ss);
591 return true;
592 }
593 bool pg_exists = false;
594 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
595 pg_exists = osdmap.pg_exists(pgid);
596 });
597 if (!pg_exists) {
598 ss << "pg " << pgid << " dne";
599 cmdctx->reply(-ENOENT, ss);
600 return true;
601 }
602 int acting_primary = -1;
603 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
604 acting_primary = osdmap.get_pg_acting_primary(pgid);
605 });
606 if (acting_primary == -1) {
607 ss << "pg " << pgid << " has no primary osd";
608 cmdctx->reply(-EAGAIN, ss);
609 return true;
610 }
611 auto p = osd_cons.find(acting_primary);
612 if (p == osd_cons.end()) {
613 ss << "pg " << pgid << " primary osd." << acting_primary
614 << " is not currently connected";
615 cmdctx->reply(-EAGAIN, ss);
616 }
617 vector<pg_t> pgs = { pgid };
618 for (auto& con : p->second) {
619 con->send_message(new MOSDScrub(monc->get_fsid(),
620 pgs,
621 scrubop == "repair",
622 scrubop == "deep-scrub"));
623 }
624 ss << "instructing pg " << pgid << " on osd." << acting_primary
625 << " to " << scrubop;
626 cmdctx->reply(0, ss);
627 return true;
628 } else if (prefix == "osd scrub" ||
629 prefix == "osd deep-scrub" ||
630 prefix == "osd repair") {
631 string whostr;
632 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
633 vector<string> pvec;
634 get_str_vec(prefix, pvec);
635
636 set<int> osds;
637 if (whostr == "*") {
638 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
639 for (int i = 0; i < osdmap.get_max_osd(); i++)
640 if (osdmap.is_up(i)) {
641 osds.insert(i);
642 }
643 });
644 } else {
645 long osd = parse_osd_id(whostr.c_str(), &ss);
646 if (osd < 0) {
647 ss << "invalid osd '" << whostr << "'";
648 cmdctx->reply(-EINVAL, ss);
649 return true;
650 }
651 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
652 if (osdmap.is_up(osd)) {
653 osds.insert(osd);
654 }
655 });
656 if (osds.empty()) {
657 ss << "osd." << osd << " is not up";
658 cmdctx->reply(-EAGAIN, ss);
659 return true;
660 }
661 }
662 set<int> sent_osds, failed_osds;
663 for (auto osd : osds) {
664 auto p = osd_cons.find(osd);
665 if (p == osd_cons.end()) {
666 failed_osds.insert(osd);
667 } else {
668 sent_osds.insert(osd);
669 for (auto& con : p->second) {
670 con->send_message(new MOSDScrub(monc->get_fsid(),
671 pvec.back() == "repair",
672 pvec.back() == "deep-scrub"));
673 }
674 }
675 }
676 if (failed_osds.size() == osds.size()) {
677 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
678 << " (not connected)";
679 r = -EAGAIN;
680 } else {
681 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
682 if (!failed_osds.empty()) {
683 ss << "; osd(s) " << failed_osds << " were not connected";
684 }
685 r = 0;
686 }
687 cmdctx->reply(0, ss);
688 return true;
689 } else if (prefix == "osd reweight-by-pg" ||
690 prefix == "osd reweight-by-utilization" ||
691 prefix == "osd test-reweight-by-pg" ||
692 prefix == "osd test-reweight-by-utilization") {
693 bool by_pg =
694 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
695 bool dry_run =
696 prefix == "osd test-reweight-by-pg" ||
697 prefix == "osd test-reweight-by-utilization";
698 int64_t oload;
699 cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
700 set<int64_t> pools;
701 vector<string> poolnames;
702 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
703 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
704 for (const auto& poolname : poolnames) {
705 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
706 if (pool < 0) {
707 ss << "pool '" << poolname << "' does not exist";
708 r = -ENOENT;
709 }
710 pools.insert(pool);
711 }
712 });
713 if (r) {
714 cmdctx->reply(r, ss);
715 return true;
716 }
717 double max_change = g_conf->mon_reweight_max_change;
718 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
719 if (max_change <= 0.0) {
720 ss << "max_change " << max_change << " must be positive";
721 cmdctx->reply(-EINVAL, ss);
722 return true;
723 }
724 int64_t max_osds = g_conf->mon_reweight_max_osds;
725 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
726 if (max_osds <= 0) {
727 ss << "max_osds " << max_osds << " must be positive";
728 cmdctx->reply(-EINVAL, ss);
729 return true;
730 }
731 string no_increasing;
732 cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
733 string out_str;
734 mempool::osdmap::map<int32_t, uint32_t> new_weights;
735 r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
736 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
737 return reweight::by_utilization(osdmap, pgmap,
738 oload,
739 max_change,
740 max_osds,
741 by_pg,
742 pools.empty() ? NULL : &pools,
743 no_increasing == "--no-increasing",
744 &new_weights,
745 &ss, &out_str, f.get());
746 });
747 });
748 if (r >= 0) {
749 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
750 }
751 if (f) {
752 f->flush(cmdctx->odata);
753 } else {
754 cmdctx->odata.append(out_str);
755 }
756 if (r < 0) {
757 ss << "FAILED reweight-by-pg";
758 cmdctx->reply(r, ss);
759 return true;
760 } else if (r == 0 || dry_run) {
761 ss << "no change";
762 cmdctx->reply(r, ss);
763 return true;
764 } else {
765 json_spirit::Object json_object;
766 for (const auto& osd_weight : new_weights) {
767 json_spirit::Config::add(json_object,
768 std::to_string(osd_weight.first),
769 std::to_string(osd_weight.second));
770 }
771 string s = json_spirit::write(json_object);
772 std::replace(begin(s), end(s), '\"', '\'');
773 const string cmd =
774 "{"
775 "\"prefix\": \"osd reweightn\", "
776 "\"weights\": \"" + s + "\""
777 "}";
778 auto on_finish = new ReplyOnFinish(cmdctx);
779 monc->start_mon_command({cmd}, {},
780 &on_finish->from_mon, &on_finish->outs, on_finish);
781 return true;
782 }
783 } else if (prefix == "osd df") {
784 string method;
785 cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
786 r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) {
787 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
788 print_osd_utilization(osdmap, &pgservice, ss,
789 f.get(), method == "tree");
790
791 cmdctx->odata.append(ss);
792 return 0;
793 });
794 });
795 cmdctx->reply(r, "");
796 return true;
797 } else {
798 r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
799 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
800 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
801 f.get(), &ss, &cmdctx->odata);
802 });
803 });
804
805 if (r != -EOPNOTSUPP) {
806 cmdctx->reply(r, ss);
807 return true;
808 }
809 }
810
811 // None of the special native commands,
812 MgrPyModule *handler = nullptr;
813 auto py_commands = py_modules.get_commands();
814 for (const auto &pyc : py_commands) {
815 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
816 dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
817 if (pyc_prefix == prefix) {
818 handler = pyc.handler;
819 break;
820 }
821 }
822
823 if (handler == nullptr) {
824 ss << "No handler found for '" << prefix << "'";
825 dout(4) << "No handler found for '" << prefix << "'" << dendl;
826 cmdctx->reply(-EINVAL, ss);
827 return true;
828 } else {
829 // Okay, now we have a handler to call, but we must not call it
830 // in this thread, because the python handlers can do anything,
831 // including blocking, and including calling back into mgr.
832 dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
833 finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
834 std::stringstream ds;
835 std::stringstream ss;
836 int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
837 cmdctx->odata.append(ds);
838 cmdctx->reply(r, ss);
839 }));
840 return true;
841 }
842 }
843
844 void DaemonServer::send_report()
845 {
846 auto m = new MMonMgrReport();
847 cluster_state.with_pgmap([&](const PGMap& pg_map) {
848 cluster_state.update_delta_stats();
849
850 // FIXME: reporting health detail here might be a bad idea?
851 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
852 // FIXME: no easy way to get mon features here. this will do for
853 // now, though, as long as we don't make a backward-incompat change.
854 pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
855 dout(10) << pg_map << dendl;
856 pg_map.get_health(g_ceph_context, osdmap,
857 m->health_summary,
858 &m->health_detail);
859 });
860 });
861 // TODO? We currently do not notify the PyModules
862 // TODO: respect needs_send, so we send the report only if we are asked to do
863 // so, or the state is updated.
864 monc->send_mon_message(m);
865 }