1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "DaemonServer.h"
15
16 #include "include/str_list.h"
17 #include "auth/RotatingKeyRing.h"
18 #include "json_spirit/json_spirit_writer.h"
19
20 #include "mgr/mgr_commands.h"
21 #include "mon/MonCommand.h"
22
23 #include "messages/MMgrOpen.h"
24 #include "messages/MMgrConfigure.h"
25 #include "messages/MMonMgrReport.h"
26 #include "messages/MCommand.h"
27 #include "messages/MCommandReply.h"
28 #include "messages/MPGStats.h"
29 #include "messages/MOSDScrub.h"
30 #include "messages/MOSDForceRecovery.h"
31 #include "common/errno.h"
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mgr
35 #undef dout_prefix
36 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
37
38
39
40 DaemonServer::DaemonServer(MonClient *monc_,
41 Finisher &finisher_,
42 DaemonStateIndex &daemon_state_,
43 ClusterState &cluster_state_,
44 PyModules &py_modules_,
45 LogChannelRef clog_,
46 LogChannelRef audit_clog_)
47 : Dispatcher(g_ceph_context),
48 client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
49 g_conf->mgr_client_bytes)),
50 client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
51 g_conf->mgr_client_messages)),
52 osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
53 g_conf->mgr_osd_bytes)),
54 osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messages",
55 g_conf->mgr_osd_messages)),
56 mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
57 g_conf->mgr_mds_bytes)),
58 mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messages",
59 g_conf->mgr_mds_messages)),
60 mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
61 g_conf->mgr_mon_bytes)),
62 mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messages",
63 g_conf->mgr_mon_messages)),
64 msgr(nullptr),
65 monc(monc_),
66 finisher(finisher_),
67 daemon_state(daemon_state_),
68 cluster_state(cluster_state_),
69 py_modules(py_modules_),
70 clog(clog_),
71 audit_clog(audit_clog_),
72 auth_registry(g_ceph_context,
73 g_conf->auth_supported.empty() ?
74 g_conf->auth_cluster_required :
75 g_conf->auth_supported),
76 lock("DaemonServer")
77 {}
78
79 DaemonServer::~DaemonServer() {
80 delete msgr;
81 }
82
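// Create the mgr's public messenger, attach per-peer-type byte/message
// throttlers, bind to the configured public address, and start dispatching
// incoming messages to this object.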
83 int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
84 {
85 // Initialize Messenger
86 std::string public_msgr_type = g_conf->ms_public_type.empty() ?
87 g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
88 msgr = Messenger::create(g_ceph_context, public_msgr_type,
89 entity_name_t::MGR(gid),
90 "mgr",
91 getpid(), 0);
92 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
93
94 // throttle clients
95 msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
96 client_byte_throttler.get(),
97 client_msg_throttler.get());
98
99 // servers
100 msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
101 osd_byte_throttler.get(),
102 osd_msg_throttler.get());
103 msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
104 mds_byte_throttler.get(),
105 mds_msg_throttler.get());
106 msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
107 mon_byte_throttler.get(),
108 mon_msg_throttler.get());
109
110 int r = msgr->bind(g_conf->public_addr);
111 if (r < 0) {
112 derr << "unable to bind mgr to " << g_conf->public_addr << dendl;
113 return r;
114 }
115
116 msgr->set_myname(entity_name_t::MGR(gid));
117 msgr->set_addr_unknowns(client_addr);
118
119 msgr->start();
120 msgr->add_dispatcher_tail(this);
121
122 started_at = ceph_clock_now();
123
124 return 0;
125 }
126
127 entity_addr_t DaemonServer::get_myaddr() const
128 {
129 return msgr->get_myaddr();
130 }
131
132
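// Verify the authorizer on an incoming connection: look up the auth handler
// for the negotiated protocol, validate against the rotating monitor keys,
// parse the peer's caps into a new MgrSession, and, if the peer is an OSD,
// remember its connection in osd_cons so we can send it commands later.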
133 bool DaemonServer::ms_verify_authorizer(Connection *con,
134 int peer_type,
135 int protocol,
136 ceph::bufferlist& authorizer_data,
137 ceph::bufferlist& authorizer_reply,
138 bool& is_valid,
139 CryptoKey& session_key)
140 {
141 auto handler = auth_registry.get_handler(protocol);
142 if (!handler) {
143 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
144 is_valid = false;
145 return true;
146 }
147
148 MgrSessionRef s(new MgrSession(cct));
149 s->inst.addr = con->get_peer_addr();
150 AuthCapsInfo caps_info;
151
152 RotatingKeyRing *keys = monc->rotating_secrets.get();
153 if (keys) {
154 is_valid = handler->verify_authorizer(
155 cct, keys,
156 authorizer_data,
157 authorizer_reply, s->entity_name,
158 s->global_id, caps_info,
159 session_key);
160 } else {
161 dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
162 is_valid = false;
163 }
164
165 if (is_valid) {
166 if (caps_info.allow_all) {
167 dout(10) << " session " << s << " " << s->entity_name
168 << " allow_all" << dendl;
169 s->caps.set_allow_all();
170 }
171 if (caps_info.caps.length() > 0) {
172 bufferlist::iterator p = caps_info.caps.begin();
173 string str;
174 try {
175 ::decode(str, p);
176 }
177 catch (buffer::error& e) {
178 }
179 bool success = s->caps.parse(str);
180 if (success) {
181 dout(10) << " session " << s << " " << s->entity_name
182 << " has caps " << s->caps << " '" << str << "'" << dendl;
183 } else {
184 dout(10) << " session " << s << " " << s->entity_name
185 << " failed to parse caps '" << str << "'" << dendl;
186 is_valid = false;
187 }
188 }
189 con->set_priv(s->get());
190
191 if (peer_type == CEPH_ENTITY_TYPE_OSD) {
192 Mutex::Locker l(lock);
193 s->osd_id = atoi(s->entity_name.get_id().c_str());
194 dout(10) << "registering osd." << s->osd_id << " session "
195 << s << " con " << con << dendl;
196 osd_cons[s->osd_id].insert(con);
197 }
198 }
199
200 return true;
201 }
202
203
204 bool DaemonServer::ms_get_authorizer(int dest_type,
205 AuthAuthorizer **authorizer, bool force_new)
206 {
207 dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
208
209 if (dest_type == CEPH_ENTITY_TYPE_MON) {
210 return true;
211 }
212
213 if (force_new) {
214 if (monc->wait_auth_rotating(10) < 0)
215 return false;
216 }
217
218 *authorizer = monc->build_authorizer(dest_type);
219 dout(20) << "got authorizer " << *authorizer << dendl;
220 return *authorizer != NULL;
221 }
222
223 bool DaemonServer::ms_handle_reset(Connection *con)
224 {
225 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
226 MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
227 if (!session) {
228 return false;
229 }
230 session->put(); // SessionRef takes a ref
231 Mutex::Locker l(lock);
232 dout(10) << "unregistering osd." << session->osd_id
233 << " session " << session << " con " << con << dendl;
234 osd_cons[session->osd_id].erase(con);
235 }
236 return false;
237 }
238
239 bool DaemonServer::ms_handle_refused(Connection *con)
240 {
241 // do nothing for now
242 return false;
243 }
244
245 bool DaemonServer::ms_dispatch(Message *m)
246 {
247 Mutex::Locker l(lock);
248
249 switch (m->get_type()) {
250 case MSG_PGSTATS:
251 cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
252 maybe_ready(m->get_source().num());
253 m->put();
254 return true;
255 case MSG_MGR_REPORT:
256 return handle_report(static_cast<MMgrReport*>(m));
257 case MSG_MGR_OPEN:
258 return handle_open(static_cast<MMgrOpen*>(m));
259 case MSG_COMMAND:
260 return handle_command(static_cast<MCommand*>(m));
261 default:
262 dout(1) << "Unhandled message type " << m->get_type() << dendl;
263 return false;
264 };
265 }
266
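// Track which up OSDs have sent their first PG stats report; once all of
// them have reported, mark the PGMap ready and push a report to the mon
// immediately rather than waiting for the next tick.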
267 void DaemonServer::maybe_ready(int32_t osd_id)
268 {
269 if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
270 dout(4) << "initial report from osd " << osd_id << dendl;
271 reported_osds.insert(osd_id);
272 std::set<int32_t> up_osds;
273
274 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
275 osdmap.get_up_osds(up_osds);
276 });
277
278 std::set<int32_t> unreported_osds;
279 std::set_difference(up_osds.begin(), up_osds.end(),
280 reported_osds.begin(), reported_osds.end(),
281 std::inserter(unreported_osds, unreported_osds.begin()));
282
283 if (unreported_osds.size() == 0) {
284 dout(4) << "all osds have reported, sending PG state to mon" << dendl;
285 pgmap_ready = true;
286 reported_osds.clear();
287 // Avoid waiting for next tick
288 send_report();
289 } else {
290 dout(4) << "still waiting for " << unreported_osds.size() << " osds"
291 " to report in before PGMap is ready" << dendl;
292 }
293 }
294 }
295
296 void DaemonServer::shutdown()
297 {
298 dout(10) << "begin" << dendl;
299 msgr->shutdown();
300 msgr->wait();
301 dout(10) << "done" << dendl;
302 }
303
304
305
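// Handle MMgrOpen from a daemon (or service client): answer with an
// MMgrConfigure carrying the stats reporting period, reset any previously
// collected perf counters for this daemon, and register self-declared
// service daemons in the pending service map.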
306 bool DaemonServer::handle_open(MMgrOpen *m)
307 {
308 DaemonKey key;
309 if (!m->service_name.empty()) {
310 key.first = m->service_name;
311 } else {
312 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
313 }
314 key.second = m->daemon_name;
315
316 dout(4) << "from " << m->get_connection() << " " << key << dendl;
317
318 auto configure = new MMgrConfigure();
319 configure->stats_period = g_conf->mgr_stats_period;
320 m->get_connection()->send_message(configure);
321
322 DaemonStatePtr daemon;
323 if (daemon_state.exists(key)) {
324 daemon = daemon_state.get(key);
325 }
326 if (daemon) {
327 dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
328 Mutex::Locker l(daemon->lock);
329 daemon_state.get(key)->perf_counters.clear();
330 }
331
332 if (m->service_daemon) {
333 if (!daemon) {
334 dout(4) << "constructing new DaemonState for " << key << dendl;
335 daemon = std::make_shared<DaemonState>(daemon_state.types);
336 daemon->key = key;
337 if (m->daemon_metadata.count("hostname")) {
338 daemon->hostname = m->daemon_metadata["hostname"];
339 }
340 daemon_state.insert(daemon);
341 }
342 Mutex::Locker l(daemon->lock);
343 daemon->service_daemon = true;
344 daemon->metadata = m->daemon_metadata;
345 daemon->service_status = m->daemon_status;
346
347 utime_t now = ceph_clock_now();
348 auto d = pending_service_map.get_daemon(m->service_name,
349 m->daemon_name);
350 if (d->gid != (uint64_t)m->get_source().num()) {
351 dout(10) << "registering " << key << " in pending_service_map" << dendl;
352 d->gid = m->get_source().num();
353 d->addr = m->get_source_addr();
354 d->start_epoch = pending_service_map.epoch;
355 d->start_stamp = now;
356 d->metadata = m->daemon_metadata;
357 pending_service_map_dirty = pending_service_map.epoch;
358 }
359 }
360
361 m->put();
362 return true;
363 }
364
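// Handle MMgrReport: reject stats from plain clients that do not declare a
// service, create or update the daemon's DaemonState and perf counters,
// notify the python modules when the perf counter schema changes, and
// record service-daemon status and beacon timestamps.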
365 bool DaemonServer::handle_report(MMgrReport *m)
366 {
367 DaemonKey key;
368 if (!m->service_name.empty()) {
369 key.first = m->service_name;
370 } else {
371 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
372 }
373 key.second = m->daemon_name;
374
375 dout(4) << "from " << m->get_connection() << " " << key << dendl;
376
377 if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
378 m->service_name.empty()) {
379 // Clients should not be sending us stats unless they are declaring
380 // themselves to be a daemon for some service.
381 dout(4) << "rejecting report from non-daemon client " << m->daemon_name
382 << dendl;
383 m->put();
384 return true;
385 }
386
387 DaemonStatePtr daemon;
388 if (daemon_state.exists(key)) {
389 dout(20) << "updating existing DaemonState for " << key << dendl;
390 daemon = daemon_state.get(key);
391 } else {
392 dout(4) << "constructing new DaemonState for " << key << dendl;
393 daemon = std::make_shared<DaemonState>(daemon_state.types);
394 // FIXME: crap, we don't know the hostname at this stage.
395 daemon->key = key;
396 daemon_state.insert(daemon);
397 // FIXME: we should avoid this case by rejecting MMgrReport from
398 // daemons without sessions, and ensuring that session open
399 // always contains metadata.
400 }
401 assert(daemon != nullptr);
402 auto &daemon_counters = daemon->perf_counters;
403 {
404 Mutex::Locker l(daemon->lock);
405 daemon_counters.update(m);
406 }
407 // if there are any schema updates, notify the python modules
408 if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
409 ostringstream oss;
410 oss << key.first << '.' << key.second;
411 py_modules.notify_all("perf_schema_update", oss.str());
412 }
413
414 if (daemon->service_daemon) {
415 utime_t now = ceph_clock_now();
416 if (m->daemon_status) {
417 daemon->service_status = *m->daemon_status;
418 daemon->service_status_stamp = now;
419 }
420 daemon->last_service_beacon = now;
421 } else if (m->daemon_status) {
422 derr << "got status from non-daemon " << key << dendl;
423 }
424
425 m->put();
426 return true;
427 }
428
429
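// Flatten the parsed cmdmap into plain strings for cap matching; "caps"
// key/value pairs are expanded into individual "caps_<entity>" entries.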
430 void DaemonServer::_generate_command_map(
431 map<string,cmd_vartype>& cmdmap,
432 map<string,string> &param_str_map)
433 {
434 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
435 p != cmdmap.end(); ++p) {
436 if (p->first == "prefix")
437 continue;
438 if (p->first == "caps") {
439 vector<string> cv;
440 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
441 cv.size() % 2 == 0) {
442 for (unsigned i = 0; i < cv.size(); i += 2) {
443 string k = string("caps_") + cv[i];
444 param_str_map[k] = cv[i + 1];
445 }
446 continue;
447 }
448 }
449 param_str_map[p->first] = cmd_vartype_stringify(p->second);
450 }
451 }
452
453 const MonCommand *DaemonServer::_get_mgrcommand(
454 const string &cmd_prefix,
455 const std::vector<MonCommand> &cmds)
456 {
457 const MonCommand *this_cmd = nullptr;
458 for (const auto &cmd : cmds) {
459 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
460 this_cmd = &cmd;
461 break;
462 }
463 }
464 return this_cmd;
465 }
466
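// Check whether the session's caps permit this command: mons are implicitly
// trusted, otherwise the command's r/w/x requirements are matched against
// the session's caps for the given module, prefix and arguments.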
467 bool DaemonServer::_allowed_command(
468 MgrSession *s,
469 const string &module,
470 const string &prefix,
471 const map<string,cmd_vartype>& cmdmap,
472 const map<string,string>& param_str_map,
473 const MonCommand *this_cmd) {
474
475 if (s->entity_name.is_mon()) {
476 // mon is all-powerful. even when it is forwarding commands on behalf of
477 // old clients; we expect the mon is validating commands before proxying!
478 return true;
479 }
480
481 bool cmd_r = this_cmd->requires_perm('r');
482 bool cmd_w = this_cmd->requires_perm('w');
483 bool cmd_x = this_cmd->requires_perm('x');
484
485 bool capable = s->caps.is_capable(
486 g_ceph_context,
487 CEPH_ENTITY_TYPE_MGR,
488 s->entity_name,
489 module, prefix, param_str_map,
490 cmd_r, cmd_w, cmd_x);
491
492 dout(10) << " " << s->entity_name << " "
493 << (capable ? "" : "not ") << "capable" << dendl;
494 return capable;
495 }
496
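// Entry point for MCommand: decode the command map, check the session's caps
// against the matching command descriptor, handle the built-in service, PG
// and OSD commands inline, and hand anything else to the python module that
// registered the prefix (queued on the finisher, not run in this thread).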
497 bool DaemonServer::handle_command(MCommand *m)
498 {
499 int r = 0;
500 std::stringstream ss;
501 std::string prefix;
502
503 assert(lock.is_locked_by_me());
504
505 /**
506 * The working data for processing an MCommand. This lives in
507 * a class to enable passing it into other threads for processing
508 * outside of the thread/locks that called handle_command.
509 */
510 class CommandContext
511 {
512 public:
513 MCommand *m;
514 bufferlist odata;
515 cmdmap_t cmdmap;
516
517 CommandContext(MCommand *m_)
518 : m(m_)
519 {
520 }
521
522 ~CommandContext()
523 {
524 m->put();
525 }
526
527 void reply(int r, const std::stringstream &ss)
528 {
529 reply(r, ss.str());
530 }
531
532 void reply(int r, const std::string &rs)
533 {
534 // Let the connection drop as soon as we've sent our response
535 ConnectionRef con = m->get_connection();
536 if (con) {
537 con->mark_disposable();
538 }
539
540 dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl;
541 if (con) {
542 MCommandReply *reply = new MCommandReply(r, rs);
543 reply->set_tid(m->get_tid());
544 reply->set_data(odata);
545 con->send_message(reply);
546 }
547 }
548 };
549
550 /**
551 * A context for receiving a bufferlist/error string from a background
552 * function and then calling back to a CommandContext when it's done
553 */
554 class ReplyOnFinish : public Context {
555 std::shared_ptr<CommandContext> cmdctx;
556
557 public:
558 bufferlist from_mon;
559 string outs;
560
561 ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
562 : cmdctx(cmdctx_)
563 {}
564 void finish(int r) override {
565 cmdctx->odata.claim_append(from_mon);
566 cmdctx->reply(r, outs);
567 }
568 };
569
570 std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
571
572 MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
573 if (!session) {
574 return true;
575 }
576 session->put(); // SessionRef takes a ref
577 if (session->inst.name == entity_name_t())
578 session->inst.name = m->get_source();
579
580 std::string format;
581 boost::scoped_ptr<Formatter> f;
582 map<string,string> param_str_map;
583
584 if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
585 cmdctx->reply(-EINVAL, ss);
586 return true;
587 }
588
589 {
590 cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
591 f.reset(Formatter::create(format));
592 }
593
594 cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
595
596 dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
597 dout(4) << "prefix=" << prefix << dendl;
598
599 if (prefix == "get_command_descriptions") {
600 dout(10) << "reading commands from python modules" << dendl;
601 const auto py_commands = py_modules.get_commands();
602
603 int cmdnum = 0;
604 JSONFormatter f;
605 f.open_object_section("command_descriptions");
606
607 auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){
608 ostringstream secname;
609 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
610 dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring,
611 mc.module, mc.req_perms, mc.availability, 0);
612 cmdnum++;
613 };
614
615 for (const auto &pyc : py_commands) {
616 dump_cmd(pyc);
617 }
618
619 for (const auto &mgr_cmd : mgr_commands) {
620 dump_cmd(mgr_cmd);
621 }
622
623 f.close_section(); // command_descriptions
624 f.flush(cmdctx->odata);
625 cmdctx->reply(0, ss);
626 return true;
627 }
628
629 // lookup command
630 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
631 _generate_command_map(cmdctx->cmdmap, param_str_map);
632 if (!mgr_cmd) {
633 MonCommand py_command = {"", "", "py", "rw", "cli"};
634 if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
635 param_str_map, &py_command)) {
636 dout(1) << " access denied" << dendl;
637 ss << "access denied; does your client key have mgr caps?"
638 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
639 cmdctx->reply(-EACCES, ss);
640 return true;
641 }
642 } else {
643 // validate user's permissions for requested command
644 if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
645 param_str_map, mgr_cmd)) {
646 dout(1) << " access denied" << dendl;
647 audit_clog->info() << "from='" << session->inst << "' "
648 << "entity='" << session->entity_name << "' "
649 << "cmd=" << m->cmd << ": access denied";
650 ss << "access denied' does your client key have mgr caps?"
651 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
652 cmdctx->reply(-EACCES, ss);
653 return true;
654 }
655 }
656
657 audit_clog->debug()
658 << "from='" << session->inst << "' "
659 << "entity='" << session->entity_name << "' "
660 << "cmd=" << m->cmd << ": dispatch";
661
662 // ----------------
663 // service map commands
664 if (prefix == "service dump") {
665 if (!f)
666 f.reset(Formatter::create("json-pretty"));
667 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
668 f->dump_object("service_map", service_map);
669 });
670 f->flush(cmdctx->odata);
671 cmdctx->reply(0, ss);
672 return true;
673 }
674 if (prefix == "service status") {
675 if (!f)
676 f.reset(Formatter::create("json-pretty"));
677 // only include state from services that are in the persisted service map
678 f->open_object_section("service_status");
679 ServiceMap s;
680 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
681 s = service_map;
682 });
683 for (auto& p : s.services) {
684 f->open_object_section(p.first.c_str());
685 for (auto& q : p.second.daemons) {
686 f->open_object_section(q.first.c_str());
687 DaemonKey key(p.first, q.first);
688 assert(daemon_state.exists(key));
689 auto daemon = daemon_state.get(key);
690 Mutex::Locker l(daemon->lock);
691 f->dump_stream("status_stamp") << daemon->service_status_stamp;
692 f->dump_stream("last_beacon") << daemon->last_service_beacon;
693 f->open_object_section("status");
694 for (auto& r : daemon->service_status) {
695 f->dump_string(r.first.c_str(), r.second);
696 }
697 f->close_section();
698 f->close_section();
699 }
700 f->close_section();
701 }
702 f->close_section();
703 f->flush(cmdctx->odata);
704 cmdctx->reply(0, ss);
705 return true;
706 }
707
708 // -----------
709 // PG commands
710
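// e.g. a "pg scrub 1.0" command arrives with prefix "pg scrub" and a "pgid"
// argument; we resolve the PG's acting primary from the OSDMap and forward
// an MOSDScrub over that OSD's connection (likewise for repair/deep-scrub).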
711 if (prefix == "pg scrub" ||
712 prefix == "pg repair" ||
713 prefix == "pg deep-scrub") {
714 string scrubop = prefix.substr(3, string::npos);
715 pg_t pgid;
716 string pgidstr;
717 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
718 if (!pgid.parse(pgidstr.c_str())) {
719 ss << "invalid pgid '" << pgidstr << "'";
720 cmdctx->reply(-EINVAL, ss);
721 return true;
722 }
723 bool pg_exists = false;
724 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
725 pg_exists = osdmap.pg_exists(pgid);
726 });
727 if (!pg_exists) {
728 ss << "pg " << pgid << " dne";
729 cmdctx->reply(-ENOENT, ss);
730 return true;
731 }
732 int acting_primary = -1;
733 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
734 acting_primary = osdmap.get_pg_acting_primary(pgid);
735 });
736 if (acting_primary == -1) {
737 ss << "pg " << pgid << " has no primary osd";
738 cmdctx->reply(-EAGAIN, ss);
739 return true;
740 }
741 auto p = osd_cons.find(acting_primary);
742 if (p == osd_cons.end()) {
743 ss << "pg " << pgid << " primary osd." << acting_primary
744 << " is not currently connected";
745 cmdctx->reply(-EAGAIN, ss);
    return true;
746 }
747 vector<pg_t> pgs = { pgid };
748 for (auto& con : p->second) {
749 con->send_message(new MOSDScrub(monc->get_fsid(),
750 pgs,
751 scrubop == "repair",
752 scrubop == "deep-scrub"));
753 }
754 ss << "instructing pg " << pgid << " on osd." << acting_primary
755 << " to " << scrubop;
756 cmdctx->reply(0, ss);
757 return true;
758 } else if (prefix == "osd scrub" ||
759 prefix == "osd deep-scrub" ||
760 prefix == "osd repair") {
761 string whostr;
762 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
763 vector<string> pvec;
764 get_str_vec(prefix, pvec);
765
766 set<int> osds;
767 if (whostr == "*" || whostr == "all" || whostr == "any") {
768 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
769 for (int i = 0; i < osdmap.get_max_osd(); i++)
770 if (osdmap.is_up(i)) {
771 osds.insert(i);
772 }
773 });
774 } else {
775 long osd = parse_osd_id(whostr.c_str(), &ss);
776 if (osd < 0) {
777 ss << "invalid osd '" << whostr << "'";
778 cmdctx->reply(-EINVAL, ss);
779 return true;
780 }
781 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
782 if (osdmap.is_up(osd)) {
783 osds.insert(osd);
784 }
785 });
786 if (osds.empty()) {
787 ss << "osd." << osd << " is not up";
788 cmdctx->reply(-EAGAIN, ss);
789 return true;
790 }
791 }
792 set<int> sent_osds, failed_osds;
793 for (auto osd : osds) {
794 auto p = osd_cons.find(osd);
795 if (p == osd_cons.end()) {
796 failed_osds.insert(osd);
797 } else {
798 sent_osds.insert(osd);
799 for (auto& con : p->second) {
800 con->send_message(new MOSDScrub(monc->get_fsid(),
801 pvec.back() == "repair",
802 pvec.back() == "deep-scrub"));
803 }
804 }
805 }
806 if (failed_osds.size() == osds.size()) {
807 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
808 << " (not connected)";
809 r = -EAGAIN;
810 } else {
811 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
812 if (!failed_osds.empty()) {
813 ss << "; osd(s) " << failed_osds << " were not connected";
814 }
815 r = 0;
816 }
817 cmdctx->reply(r, ss);
818 return true;
819 } else if (prefix == "osd reweight-by-pg" ||
820 prefix == "osd reweight-by-utilization" ||
821 prefix == "osd test-reweight-by-pg" ||
822 prefix == "osd test-reweight-by-utilization") {
823 bool by_pg =
824 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
825 bool dry_run =
826 prefix == "osd test-reweight-by-pg" ||
827 prefix == "osd test-reweight-by-utilization";
828 int64_t oload;
829 cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
830 set<int64_t> pools;
831 vector<string> poolnames;
832 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
833 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
834 for (const auto& poolname : poolnames) {
835 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
836 if (pool < 0) {
837 ss << "pool '" << poolname << "' does not exist";
838 r = -ENOENT;
839 }
840 pools.insert(pool);
841 }
842 });
843 if (r) {
844 cmdctx->reply(r, ss);
845 return true;
846 }
847 double max_change = g_conf->mon_reweight_max_change;
848 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
849 if (max_change <= 0.0) {
850 ss << "max_change " << max_change << " must be positive";
851 cmdctx->reply(-EINVAL, ss);
852 return true;
853 }
854 int64_t max_osds = g_conf->mon_reweight_max_osds;
855 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
856 if (max_osds <= 0) {
857 ss << "max_osds " << max_osds << " must be positive";
858 cmdctx->reply(-EINVAL, ss);
859 return true;
860 }
861 string no_increasing;
862 cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
863 string out_str;
864 mempool::osdmap::map<int32_t, uint32_t> new_weights;
865 r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
866 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
867 return reweight::by_utilization(osdmap, pgmap,
868 oload,
869 max_change,
870 max_osds,
871 by_pg,
872 pools.empty() ? NULL : &pools,
873 no_increasing == "--no-increasing",
874 &new_weights,
875 &ss, &out_str, f.get());
876 });
877 });
878 if (r >= 0) {
879 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
880 }
881 if (f) {
882 f->flush(cmdctx->odata);
883 } else {
884 cmdctx->odata.append(out_str);
885 }
886 if (r < 0) {
887 ss << "FAILED reweight-by-pg";
888 cmdctx->reply(r, ss);
889 return true;
890 } else if (r == 0 || dry_run) {
891 ss << "no change";
892 cmdctx->reply(r, ss);
893 return true;
894 } else {
895 json_spirit::Object json_object;
896 for (const auto& osd_weight : new_weights) {
897 json_spirit::Config::add(json_object,
898 std::to_string(osd_weight.first),
899 std::to_string(osd_weight.second));
900 }
901 string s = json_spirit::write(json_object);
902 std::replace(begin(s), end(s), '\"', '\'');
903 const string cmd =
904 "{"
905 "\"prefix\": \"osd reweightn\", "
906 "\"weights\": \"" + s + "\""
907 "}";
908 auto on_finish = new ReplyOnFinish(cmdctx);
909 monc->start_mon_command({cmd}, {},
910 &on_finish->from_mon, &on_finish->outs, on_finish);
911 return true;
912 }
913 } else if (prefix == "osd df") {
914 string method;
915 cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
916 r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) {
917 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
918 print_osd_utilization(osdmap, &pgservice, ss,
919 f.get(), method == "tree");
920
921 cmdctx->odata.append(ss);
922 return 0;
923 });
924 });
925 cmdctx->reply(r, "");
926 return true;
927 } else if (prefix == "pg force-recovery" ||
928 prefix == "pg force-backfill" ||
929 prefix == "pg cancel-force-recovery" ||
930 prefix == "pg cancel-force-backfill") {
931 string forceop = prefix.substr(3, string::npos);
932 list<pg_t> parsed_pgs;
933 map<int, list<pg_t> > osdpgs;
934
935 // figure out actual op just once
936 int actual_op = 0;
937 if (forceop == "force-recovery") {
938 actual_op = OFR_RECOVERY;
939 } else if (forceop == "force-backfill") {
940 actual_op = OFR_BACKFILL;
941 } else if (forceop == "cancel-force-backfill") {
942 actual_op = OFR_BACKFILL | OFR_CANCEL;
943 } else if (forceop == "cancel-force-recovery") {
944 actual_op = OFR_RECOVERY | OFR_CANCEL;
945 }
946
947 // convert pg names to pgs, discard any invalid ones while at it
948 {
949 // we don't want to keep pgidstr and pgidstr_nodup forever
950 vector<string> pgidstr;
951 // get pgids to process and prune duplicates
952 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
953 set<string> pgidstr_nodup(pgidstr.begin(), pgidstr.end());
954 if (pgidstr.size() != pgidstr_nodup.size()) {
955 // move elements only when there were duplicates, as this
956 // reorders them
957 pgidstr.resize(pgidstr_nodup.size());
958 auto it = pgidstr_nodup.begin();
959 for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) {
960 pgidstr[i] = std::move(*it++);
961 }
962 }
963
964 cluster_state.with_pgmap([&](const PGMap& pg_map) {
965 for (auto& pstr : pgidstr) {
966 pg_t parsed_pg;
967 if (!parsed_pg.parse(pstr.c_str())) {
968 ss << "invalid pgid '" << pstr << "'; ";
969 r = -EINVAL;
970 } else {
971 auto workit = pg_map.pg_stat.find(parsed_pg);
972 if (workit == pg_map.pg_stat.end()) {
973 ss << "pg " << pstr << " not exists; ";
974 r = -ENOENT;
975 } else {
976 pg_stat_t workpg = workit->second;
977
978 // discard pgs for which user requests are pointless
979 switch (actual_op)
980 {
981 case OFR_RECOVERY:
982 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) {
983 // don't return error, user script may be racing with cluster. not fatal.
984 ss << "pg " << pstr << " doesn't require recovery; ";
985 continue;
986 } else if (workpg.state & PG_STATE_FORCED_RECOVERY) {
987 ss << "pg " << pstr << " recovery already forced; ";
988 // return error, as it may be a bug in user script
989 r = -EINVAL;
990 continue;
991 }
992 break;
993 case OFR_BACKFILL:
994 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILL)) == 0) {
995 ss << "pg " << pstr << " doesn't require backfilling; ";
996 continue;
997 } else if (workpg.state & PG_STATE_FORCED_BACKFILL) {
998 ss << "pg " << pstr << " backfill already forced; ";
999 r = -EINVAL;
1000 continue;
1001 }
1002 break;
1003 case OFR_BACKFILL | OFR_CANCEL:
1004 if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) {
1005 ss << "pg " << pstr << " backfill not forced; ";
1006 continue;
1007 }
1008 break;
1009 case OFR_RECOVERY | OFR_CANCEL:
1010 if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) {
1011 ss << "pg " << pstr << " recovery not forced; ";
1012 continue;
1013 }
1014 break;
1015 default:
1016 assert(0 == "actual_op value is not supported");
1017 }
1018
1019 parsed_pgs.push_back(std::move(parsed_pg));
1020 }
1021 }
1022 }
1023
1024 // group pgs to process by osd
1025 for (auto& pgid : parsed_pgs) {
1026 auto workit = pg_map.pg_stat.find(pgid);
1027 if (workit != pg_map.pg_stat.end()) {
1028 pg_stat_t workpg = workit->second;
1029 set<int32_t> osds(workpg.up.begin(), workpg.up.end());
1030 osds.insert(workpg.acting.begin(), workpg.acting.end());
1031 for (auto i : osds) {
1032 osdpgs[i].push_back(pgid);
1033 }
1034 }
1035 }
1036
1037 });
1038 }
1039
1040 // respond with error only when no pgs are correct
1041 // yes, in case of mixed errors, only the last one will be emitted,
1042 // but the message presented will be fine
1043 if (parsed_pgs.size() != 0) {
1044 // clear error to not confuse users/scripts
1045 r = 0;
1046 }
1047
1048 // optimize the command -> messages conversion, use only one message per distinct OSD
1049 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1050 for (auto& i : osdpgs) {
1051 if (osdmap.is_up(i.first)) {
1052 vector<pg_t> pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end()));
1053 auto p = osd_cons.find(i.first);
1054 if (p == osd_cons.end()) {
1055 ss << "osd." << i.first << " is not currently connected";
1056 r = -EAGAIN;
1057 continue;
1058 }
1059 for (auto& con : p->second) {
1060 con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op));
1061 }
1062 ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; ";
1063 }
1064 }
1065 });
1066 ss << std::endl;
1067 cmdctx->reply(r, ss);
1068 return true;
1069 } else {
1070 r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
1071 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1072 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
1073 f.get(), &ss, &cmdctx->odata);
1074 });
1075 });
1076
1077 if (r != -EOPNOTSUPP) {
1078 cmdctx->reply(r, ss);
1079 return true;
1080 }
1081 }
1082
1083 // None of the special native commands matched; try the python module handlers.
1084 MgrPyModule *handler = nullptr;
1085 auto py_commands = py_modules.get_py_commands();
1086 for (const auto &pyc : py_commands) {
1087 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
1088 dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
1089 if (pyc_prefix == prefix) {
1090 handler = pyc.handler;
1091 break;
1092 }
1093 }
1094
1095 if (handler == nullptr) {
1096 ss << "No handler found for '" << prefix << "'";
1097 dout(4) << "No handler found for '" << prefix << "'" << dendl;
1098 cmdctx->reply(-EINVAL, ss);
1099 return true;
1100 } else {
1101 // Okay, now we have a handler to call, but we must not call it
1102 // in this thread, because the python handlers can do anything,
1103 // including blocking, and including calling back into mgr.
1104 dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
1105 finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
1106 std::stringstream ds;
1107 std::stringstream ss;
1108 int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
1109 cmdctx->odata.append(ds);
1110 cmdctx->reply(r, ss);
1111 }));
1112 return true;
1113 }
1114 }
1115
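// Remove pending service-map entries for daemons whose last beacon is older
// than mgr_service_beacon_grace; daemons with no recorded beacon (e.g. right
// after a mgr restart) are given a fresh timestamp instead of being pruned.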
1116 void DaemonServer::_prune_pending_service_map()
1117 {
1118 utime_t cutoff = ceph_clock_now();
1119 cutoff -= g_conf->mgr_service_beacon_grace;
1120 auto p = pending_service_map.services.begin();
1121 while (p != pending_service_map.services.end()) {
1122 auto q = p->second.daemons.begin();
1123 while (q != p->second.daemons.end()) {
1124 DaemonKey key(p->first, q->first);
1125 if (!daemon_state.exists(key)) {
1126 derr << "missing key " << key << dendl;
1127 ++q;
1128 continue;
1129 }
1130 auto daemon = daemon_state.get(key);
1131 Mutex::Locker l(daemon->lock);
1132 if (daemon->last_service_beacon == utime_t()) {
1133 // we must have just restarted; assume they are alive now.
1134 daemon->last_service_beacon = ceph_clock_now();
1135 ++q;
1136 continue;
1137 }
1138 if (daemon->last_service_beacon < cutoff) {
1139 dout(10) << "pruning stale " << p->first << "." << q->first
1140 << " last_beacon " << daemon->last_service_beacon << dendl;
1141 q = p->second.daemons.erase(q);
1142 pending_service_map_dirty = pending_service_map.epoch;
1143 } else {
1144 ++q;
1145 }
1146 }
1147 if (p->second.daemons.empty()) {
1148 p = pending_service_map.services.erase(p);
1149 pending_service_map_dirty = pending_service_map.epoch;
1150 } else {
1151 ++p;
1152 }
1153 }
1154 }
1155
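// Build and send an MMonMgrReport to the monitor: python-module health
// checks, any dirty pending service map, and a digest of the PGMap along
// with its health checks. If not all OSDs have reported yet, wait up to
// four stats periods before sending a potentially incomplete PG state.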
1156 void DaemonServer::send_report()
1157 {
1158 if (!pgmap_ready) {
1159 if (ceph_clock_now() - started_at > g_conf->mgr_stats_period * 4.0) {
1160 pgmap_ready = true;
1161 reported_osds.clear();
1162 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
1163 << "potentially incomplete PG state to mon" << dendl;
1164 } else {
1165 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
1166 << dendl;
1167 return;
1168 }
1169 }
1170
1171 auto m = new MMonMgrReport();
1172 py_modules.get_health_checks(&m->health_checks);
1173
1174 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1175 cluster_state.update_delta_stats();
1176
1177 if (pending_service_map.epoch) {
1178 _prune_pending_service_map();
1179 if (pending_service_map_dirty >= pending_service_map.epoch) {
1180 pending_service_map.modified = ceph_clock_now();
1181 ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
1182 dout(10) << "sending service_map e" << pending_service_map.epoch
1183 << dendl;
1184 pending_service_map.epoch++;
1185 }
1186 }
1187
1188 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1189 // FIXME: no easy way to get mon features here. this will do for
1190 // now, though, as long as we don't make a backward-incompat change.
1191 pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
1192 dout(10) << pg_map << dendl;
1193
1194 pg_map.get_health_checks(g_ceph_context, osdmap,
1195 &m->health_checks);
1196
1197 dout(10) << m->health_checks.checks.size() << " health checks"
1198 << dendl;
1199 dout(20) << "health checks:\n";
1200 JSONFormatter jf(true);
1201 jf.dump_object("health_checks", m->health_checks);
1202 jf.flush(*_dout);
1203 *_dout << dendl;
1204 });
1205 });
1206 // TODO? We currently do not notify the PyModules
1207 // TODO: respect needs_send, so we send the report only if we are asked to do
1208 // so, or the state is updated.
1209 monc->send_mon_message(m);
1210 }
1211
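// Invoked when the mon publishes a ServiceMap: seed our pending map from it
// on startup (epoch 0), bump the pending epoch past the published one, and
// reconcile DaemonState entries, adding daemons present in the map and
// culling ones that have disappeared.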
1212 void DaemonServer::got_service_map()
1213 {
1214 Mutex::Locker l(lock);
1215
1216 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
1217 if (pending_service_map.epoch == 0) {
1218 // we just started up
1219 dout(10) << "got initial map e" << service_map.epoch << dendl;
1220 pending_service_map = service_map;
1221 } else {
1222 // we were already active and therefore must have persisted it,
1223 // which means ours is the same or newer.
1224 dout(10) << "got updated map e" << service_map.epoch << dendl;
1225 }
1226 pending_service_map.epoch = service_map.epoch + 1;
1227 });
1228
1229 // cull missing daemons, populate new ones
1230 for (auto& p : pending_service_map.services) {
1231 std::set<std::string> names;
1232 for (auto& q : p.second.daemons) {
1233 names.insert(q.first);
1234 DaemonKey key(p.first, q.first);
1235 if (!daemon_state.exists(key)) {
1236 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
1237 daemon->key = key;
1238 daemon->metadata = q.second.metadata;
1239 if (q.second.metadata.count("hostname")) {
1240 daemon->hostname = q.second.metadata["hostname"];
1241 }
1242 daemon->service_daemon = true;
1243 daemon_state.insert(daemon);
1244 dout(10) << "added missing " << key << dendl;
1245 }
1246 }
1247 daemon_state.cull(p.first, names);
1248 }
1249 }