]> git.proxmox.com Git - ceph.git/blame - ceph/src/mgr/DaemonServer.cc
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / mgr / DaemonServer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14#include "DaemonServer.h"
15
31f18b77 16#include "include/str_list.h"
7c673cae 17#include "auth/RotatingKeyRing.h"
7c673cae
FG
18#include "json_spirit/json_spirit_writer.h"
19
c07f9fc5
FG
20#include "mgr/mgr_commands.h"
21#include "mon/MonCommand.h"
22
7c673cae
FG
23#include "messages/MMgrOpen.h"
24#include "messages/MMgrConfigure.h"
31f18b77 25#include "messages/MMonMgrReport.h"
7c673cae
FG
26#include "messages/MCommand.h"
27#include "messages/MCommandReply.h"
28#include "messages/MPGStats.h"
29#include "messages/MOSDScrub.h"
c07f9fc5 30#include "messages/MOSDForceRecovery.h"
224ce89b 31#include "common/errno.h"
7c673cae
FG
32
33#define dout_context g_ceph_context
34#define dout_subsys ceph_subsys_mgr
35#undef dout_prefix
36#define dout_prefix *_dout << "mgr.server " << __func__ << " "
37
c07f9fc5
FG
38
39
7c673cae
FG
40DaemonServer::DaemonServer(MonClient *monc_,
41 Finisher &finisher_,
42 DaemonStateIndex &daemon_state_,
43 ClusterState &cluster_state_,
3efd9988 44 PyModuleRegistry &py_modules_,
7c673cae
FG
45 LogChannelRef clog_,
46 LogChannelRef audit_clog_)
47 : Dispatcher(g_ceph_context),
48 client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
3efd9988 49 g_conf->get_val<uint64_t>("mgr_client_bytes"))),
7c673cae 50 client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
3efd9988 51 g_conf->get_val<uint64_t>("mgr_client_messages"))),
7c673cae 52 osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
3efd9988 53 g_conf->get_val<uint64_t>("mgr_osd_bytes"))),
7c673cae 54 osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
3efd9988 55 g_conf->get_val<uint64_t>("mgr_osd_messages"))),
7c673cae 56 mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
3efd9988 57 g_conf->get_val<uint64_t>("mgr_mds_bytes"))),
7c673cae 58 mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
3efd9988 59 g_conf->get_val<uint64_t>("mgr_mds_messages"))),
7c673cae 60 mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
3efd9988 61 g_conf->get_val<uint64_t>("mgr_mon_bytes"))),
7c673cae 62 mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
3efd9988 63 g_conf->get_val<uint64_t>("mgr_mon_messages"))),
7c673cae
FG
64 msgr(nullptr),
65 monc(monc_),
66 finisher(finisher_),
67 daemon_state(daemon_state_),
68 cluster_state(cluster_state_),
69 py_modules(py_modules_),
70 clog(clog_),
71 audit_clog(audit_clog_),
72 auth_registry(g_ceph_context,
73 g_conf->auth_supported.empty() ?
74 g_conf->auth_cluster_required :
75 g_conf->auth_supported),
3efd9988
FG
76 lock("DaemonServer"),
77 pgmap_ready(false)
78{
79 g_conf->add_observer(this);
80}
7c673cae
FG
81
82DaemonServer::~DaemonServer() {
83 delete msgr;
3efd9988 84 g_conf->remove_observer(this);
7c673cae
FG
85}
86
87int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
88{
89 // Initialize Messenger
90 std::string public_msgr_type = g_conf->ms_public_type.empty() ?
91 g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
92 msgr = Messenger::create(g_ceph_context, public_msgr_type,
93 entity_name_t::MGR(gid),
94 "mgr",
95 getpid(), 0);
96 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
97
98 // throttle clients
99 msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
100 client_byte_throttler.get(),
101 client_msg_throttler.get());
102
103 // servers
104 msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
105 osd_byte_throttler.get(),
106 osd_msg_throttler.get());
107 msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
108 mds_byte_throttler.get(),
109 mds_msg_throttler.get());
110 msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
111 mon_byte_throttler.get(),
112 mon_msg_throttler.get());
113
114 int r = msgr->bind(g_conf->public_addr);
115 if (r < 0) {
116 derr << "unable to bind mgr to " << g_conf->public_addr << dendl;
117 return r;
118 }
119
120 msgr->set_myname(entity_name_t::MGR(gid));
121 msgr->set_addr_unknowns(client_addr);
122
123 msgr->start();
124 msgr->add_dispatcher_tail(this);
125
224ce89b
WB
126 started_at = ceph_clock_now();
127
7c673cae
FG
128 return 0;
129}
130
131entity_addr_t DaemonServer::get_myaddr() const
132{
133 return msgr->get_myaddr();
134}
135
136
137bool DaemonServer::ms_verify_authorizer(Connection *con,
138 int peer_type,
139 int protocol,
140 ceph::bufferlist& authorizer_data,
141 ceph::bufferlist& authorizer_reply,
142 bool& is_valid,
143 CryptoKey& session_key)
144{
145 auto handler = auth_registry.get_handler(protocol);
146 if (!handler) {
147 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
148 is_valid = false;
149 return true;
150 }
151
152 MgrSessionRef s(new MgrSession(cct));
153 s->inst.addr = con->get_peer_addr();
154 AuthCapsInfo caps_info;
155
c07f9fc5
FG
156 RotatingKeyRing *keys = monc->rotating_secrets.get();
157 if (keys) {
158 is_valid = handler->verify_authorizer(
159 cct, keys,
160 authorizer_data,
161 authorizer_reply, s->entity_name,
162 s->global_id, caps_info,
163 session_key);
164 } else {
165 dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
166 is_valid = false;
167 }
7c673cae
FG
168
169 if (is_valid) {
170 if (caps_info.allow_all) {
171 dout(10) << " session " << s << " " << s->entity_name
172 << " allow_all" << dendl;
173 s->caps.set_allow_all();
174 }
175 if (caps_info.caps.length() > 0) {
176 bufferlist::iterator p = caps_info.caps.begin();
177 string str;
178 try {
179 ::decode(str, p);
180 }
181 catch (buffer::error& e) {
182 }
183 bool success = s->caps.parse(str);
184 if (success) {
185 dout(10) << " session " << s << " " << s->entity_name
186 << " has caps " << s->caps << " '" << str << "'" << dendl;
187 } else {
188 dout(10) << " session " << s << " " << s->entity_name
189 << " failed to parse caps '" << str << "'" << dendl;
190 is_valid = false;
191 }
192 }
193 con->set_priv(s->get());
31f18b77
FG
194
195 if (peer_type == CEPH_ENTITY_TYPE_OSD) {
196 Mutex::Locker l(lock);
197 s->osd_id = atoi(s->entity_name.get_id().c_str());
224ce89b 198 dout(10) << "registering osd." << s->osd_id << " session "
31f18b77
FG
199 << s << " con " << con << dendl;
200 osd_cons[s->osd_id].insert(con);
201 }
7c673cae
FG
202 }
203
204 return true;
205}
206
207
208bool DaemonServer::ms_get_authorizer(int dest_type,
209 AuthAuthorizer **authorizer, bool force_new)
210{
211 dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
212
213 if (dest_type == CEPH_ENTITY_TYPE_MON) {
214 return true;
215 }
216
217 if (force_new) {
218 if (monc->wait_auth_rotating(10) < 0)
219 return false;
220 }
221
222 *authorizer = monc->build_authorizer(dest_type);
223 dout(20) << "got authorizer " << *authorizer << dendl;
224 return *authorizer != NULL;
225}
226
31f18b77
FG
227bool DaemonServer::ms_handle_reset(Connection *con)
228{
229 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
230 MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
231 if (!session) {
232 return false;
233 }
234 session->put(); // SessionRef takes a ref
235 Mutex::Locker l(lock);
224ce89b 236 dout(10) << "unregistering osd." << session->osd_id
31f18b77
FG
237 << " session " << session << " con " << con << dendl;
238 osd_cons[session->osd_id].erase(con);
3efd9988
FG
239
240 auto iter = daemon_connections.find(con);
241 if (iter != daemon_connections.end()) {
242 daemon_connections.erase(iter);
243 }
31f18b77
FG
244 }
245 return false;
246}
247
7c673cae
FG
248bool DaemonServer::ms_handle_refused(Connection *con)
249{
250 // do nothing for now
251 return false;
252}
253
254bool DaemonServer::ms_dispatch(Message *m)
255{
3efd9988
FG
256 // Note that we do *not* take ::lock here, in order to avoid
257 // serializing all message handling. It's up to each handler
258 // to take whatever locks it needs.
7c673cae
FG
259 switch (m->get_type()) {
260 case MSG_PGSTATS:
261 cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
224ce89b 262 maybe_ready(m->get_source().num());
7c673cae
FG
263 m->put();
264 return true;
265 case MSG_MGR_REPORT:
266 return handle_report(static_cast<MMgrReport*>(m));
267 case MSG_MGR_OPEN:
268 return handle_open(static_cast<MMgrOpen*>(m));
269 case MSG_COMMAND:
270 return handle_command(static_cast<MCommand*>(m));
271 default:
272 dout(1) << "Unhandled message type " << m->get_type() << dendl;
273 return false;
274 };
275}
276
224ce89b
WB
277void DaemonServer::maybe_ready(int32_t osd_id)
278{
3efd9988
FG
279 if (pgmap_ready.load()) {
280 // Fast path: we don't need to take lock because pgmap_ready
281 // is already set
282 } else {
283 Mutex::Locker l(lock);
224ce89b 284
3efd9988
FG
285 if (reported_osds.find(osd_id) == reported_osds.end()) {
286 dout(4) << "initial report from osd " << osd_id << dendl;
287 reported_osds.insert(osd_id);
288 std::set<int32_t> up_osds;
224ce89b 289
3efd9988
FG
290 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
291 osdmap.get_up_osds(up_osds);
292 });
224ce89b 293
3efd9988
FG
294 std::set<int32_t> unreported_osds;
295 std::set_difference(up_osds.begin(), up_osds.end(),
296 reported_osds.begin(), reported_osds.end(),
297 std::inserter(unreported_osds, unreported_osds.begin()));
298
299 if (unreported_osds.size() == 0) {
300 dout(4) << "all osds have reported, sending PG state to mon" << dendl;
301 pgmap_ready = true;
302 reported_osds.clear();
303 // Avoid waiting for next tick
304 send_report();
305 } else {
306 dout(4) << "still waiting for " << unreported_osds.size() << " osds"
307 " to report in before PGMap is ready" << dendl;
308 }
224ce89b
WB
309 }
310 }
311}
312
7c673cae
FG
313void DaemonServer::shutdown()
314{
224ce89b 315 dout(10) << "begin" << dendl;
7c673cae
FG
316 msgr->shutdown();
317 msgr->wait();
224ce89b 318 dout(10) << "done" << dendl;
7c673cae
FG
319}
320
321
322
323bool DaemonServer::handle_open(MMgrOpen *m)
324{
3efd9988
FG
325 Mutex::Locker l(lock);
326
224ce89b
WB
327 DaemonKey key;
328 if (!m->service_name.empty()) {
329 key.first = m->service_name;
330 } else {
331 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
332 }
333 key.second = m->daemon_name;
7c673cae 334
224ce89b 335 dout(4) << "from " << m->get_connection() << " " << key << dendl;
7c673cae 336
3efd9988 337 _send_configure(m->get_connection());
7c673cae 338
c07f9fc5 339 DaemonStatePtr daemon;
7c673cae 340 if (daemon_state.exists(key)) {
c07f9fc5
FG
341 daemon = daemon_state.get(key);
342 }
343 if (daemon) {
7c673cae 344 dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
c07f9fc5 345 Mutex::Locker l(daemon->lock);
3efd9988 346 daemon->perf_counters.clear();
7c673cae
FG
347 }
348
224ce89b 349 if (m->service_daemon) {
c07f9fc5 350 if (!daemon) {
224ce89b
WB
351 dout(4) << "constructing new DaemonState for " << key << dendl;
352 daemon = std::make_shared<DaemonState>(daemon_state.types);
353 daemon->key = key;
354 if (m->daemon_metadata.count("hostname")) {
355 daemon->hostname = m->daemon_metadata["hostname"];
356 }
357 daemon_state.insert(daemon);
358 }
c07f9fc5 359 Mutex::Locker l(daemon->lock);
224ce89b
WB
360 daemon->service_daemon = true;
361 daemon->metadata = m->daemon_metadata;
362 daemon->service_status = m->daemon_status;
363
364 utime_t now = ceph_clock_now();
365 auto d = pending_service_map.get_daemon(m->service_name,
366 m->daemon_name);
367 if (d->gid != (uint64_t)m->get_source().num()) {
368 dout(10) << "registering " << key << " in pending_service_map" << dendl;
369 d->gid = m->get_source().num();
370 d->addr = m->get_source_addr();
371 d->start_epoch = pending_service_map.epoch;
372 d->start_stamp = now;
373 d->metadata = m->daemon_metadata;
374 pending_service_map_dirty = pending_service_map.epoch;
375 }
376 }
377
3efd9988
FG
378 if (m->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT &&
379 m->service_name.empty())
380 {
381 // Store in set of the daemon/service connections, i.e. those
382 // connections that require an update in the event of stats
383 // configuration changes.
384 daemon_connections.insert(m->get_connection());
385 }
386
7c673cae
FG
387 m->put();
388 return true;
389}
390
391bool DaemonServer::handle_report(MMgrReport *m)
392{
224ce89b
WB
393 DaemonKey key;
394 if (!m->service_name.empty()) {
395 key.first = m->service_name;
396 } else {
397 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
398 }
399 key.second = m->daemon_name;
7c673cae 400
224ce89b 401 dout(4) << "from " << m->get_connection() << " " << key << dendl;
31f18b77 402
224ce89b
WB
403 if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
404 m->service_name.empty()) {
405 // Clients should not be sending us stats unless they are declaring
406 // themselves to be a daemon for some service.
407 dout(4) << "rejecting report from non-daemon client " << m->daemon_name
408 << dendl;
31f18b77
FG
409 m->put();
410 return true;
411 }
7c673cae 412
3efd9988 413 // Look up the DaemonState
7c673cae
FG
414 DaemonStatePtr daemon;
415 if (daemon_state.exists(key)) {
224ce89b 416 dout(20) << "updating existing DaemonState for " << key << dendl;
7c673cae
FG
417 daemon = daemon_state.get(key);
418 } else {
224ce89b 419 dout(4) << "constructing new DaemonState for " << key << dendl;
7c673cae
FG
420 daemon = std::make_shared<DaemonState>(daemon_state.types);
421 // FIXME: crap, we don't know the hostname at this stage.
422 daemon->key = key;
423 daemon_state.insert(daemon);
224ce89b
WB
424 // FIXME: we should avoid this case by rejecting MMgrReport from
425 // daemons without sessions, and ensuring that session open
426 // always contains metadata.
7c673cae 427 }
3efd9988
FG
428
429 // Update the DaemonState
7c673cae 430 assert(daemon != nullptr);
c07f9fc5
FG
431 {
432 Mutex::Locker l(daemon->lock);
3efd9988 433 auto &daemon_counters = daemon->perf_counters;
c07f9fc5 434 daemon_counters.update(m);
3efd9988
FG
435
436 if (daemon->service_daemon) {
437 utime_t now = ceph_clock_now();
438 if (m->daemon_status) {
439 daemon->service_status = *m->daemon_status;
440 daemon->service_status_stamp = now;
441 }
442 daemon->last_service_beacon = now;
443 } else if (m->daemon_status) {
444 derr << "got status from non-daemon " << key << dendl;
445 }
c07f9fc5 446 }
3efd9988 447
c07f9fc5
FG
448 // if there are any schema updates, notify the python modules
449 if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
450 ostringstream oss;
451 oss << key.first << '.' << key.second;
452 py_modules.notify_all("perf_schema_update", oss.str());
453 }
224ce89b 454
7c673cae
FG
455 m->put();
456 return true;
457}
458
7c673cae
FG
459
460void DaemonServer::_generate_command_map(
461 map<string,cmd_vartype>& cmdmap,
462 map<string,string> &param_str_map)
463{
464 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
465 p != cmdmap.end(); ++p) {
466 if (p->first == "prefix")
467 continue;
468 if (p->first == "caps") {
469 vector<string> cv;
470 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
471 cv.size() % 2 == 0) {
472 for (unsigned i = 0; i < cv.size(); i += 2) {
473 string k = string("caps_") + cv[i];
474 param_str_map[k] = cv[i + 1];
475 }
476 continue;
477 }
478 }
479 param_str_map[p->first] = cmd_vartype_stringify(p->second);
480 }
481}
482
c07f9fc5 483const MonCommand *DaemonServer::_get_mgrcommand(
7c673cae 484 const string &cmd_prefix,
c07f9fc5 485 const std::vector<MonCommand> &cmds)
7c673cae 486{
c07f9fc5
FG
487 const MonCommand *this_cmd = nullptr;
488 for (const auto &cmd : cmds) {
489 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
490 this_cmd = &cmd;
7c673cae
FG
491 break;
492 }
493 }
494 return this_cmd;
495}
496
497bool DaemonServer::_allowed_command(
498 MgrSession *s,
499 const string &module,
500 const string &prefix,
501 const map<string,cmd_vartype>& cmdmap,
502 const map<string,string>& param_str_map,
c07f9fc5 503 const MonCommand *this_cmd) {
7c673cae
FG
504
505 if (s->entity_name.is_mon()) {
506 // mon is all-powerful. even when it is forwarding commands on behalf of
507 // old clients; we expect the mon is validating commands before proxying!
508 return true;
509 }
510
511 bool cmd_r = this_cmd->requires_perm('r');
512 bool cmd_w = this_cmd->requires_perm('w');
513 bool cmd_x = this_cmd->requires_perm('x');
514
515 bool capable = s->caps.is_capable(
516 g_ceph_context,
517 CEPH_ENTITY_TYPE_MGR,
518 s->entity_name,
519 module, prefix, param_str_map,
520 cmd_r, cmd_w, cmd_x);
521
522 dout(10) << " " << s->entity_name << " "
523 << (capable ? "" : "not ") << "capable" << dendl;
524 return capable;
525}
526
527bool DaemonServer::handle_command(MCommand *m)
528{
3efd9988 529 Mutex::Locker l(lock);
7c673cae
FG
530 int r = 0;
531 std::stringstream ss;
532 std::string prefix;
533
534 assert(lock.is_locked_by_me());
535
536 /**
537 * The working data for processing an MCommand. This lives in
538 * a class to enable passing it into other threads for processing
539 * outside of the thread/locks that called handle_command.
540 */
541 class CommandContext
542 {
543 public:
544 MCommand *m;
545 bufferlist odata;
546 cmdmap_t cmdmap;
547
548 CommandContext(MCommand *m_)
549 : m(m_)
550 {
551 }
552
553 ~CommandContext()
554 {
555 m->put();
556 }
557
558 void reply(int r, const std::stringstream &ss)
559 {
560 reply(r, ss.str());
561 }
562
563 void reply(int r, const std::string &rs)
564 {
565 // Let the connection drop as soon as we've sent our response
566 ConnectionRef con = m->get_connection();
567 if (con) {
568 con->mark_disposable();
569 }
570
224ce89b 571 dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl;
7c673cae
FG
572 if (con) {
573 MCommandReply *reply = new MCommandReply(r, rs);
574 reply->set_tid(m->get_tid());
575 reply->set_data(odata);
576 con->send_message(reply);
577 }
578 }
579 };
580
581 /**
582 * A context for receiving a bufferlist/error string from a background
583 * function and then calling back to a CommandContext when it's done
584 */
585 class ReplyOnFinish : public Context {
586 std::shared_ptr<CommandContext> cmdctx;
587
588 public:
589 bufferlist from_mon;
590 string outs;
591
592 ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
593 : cmdctx(cmdctx_)
594 {}
595 void finish(int r) override {
596 cmdctx->odata.claim_append(from_mon);
597 cmdctx->reply(r, outs);
598 }
599 };
600
601 std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
602
603 MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
604 if (!session) {
605 return true;
606 }
607 session->put(); // SessionRef takes a ref
608 if (session->inst.name == entity_name_t())
609 session->inst.name = m->get_source();
610
611 std::string format;
612 boost::scoped_ptr<Formatter> f;
613 map<string,string> param_str_map;
614
615 if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
616 cmdctx->reply(-EINVAL, ss);
617 return true;
618 }
619
620 {
621 cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
622 f.reset(Formatter::create(format));
623 }
624
625 cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
626
627 dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
628 dout(4) << "prefix=" << prefix << dendl;
629
630 if (prefix == "get_command_descriptions") {
7c673cae 631 dout(10) << "reading commands from python modules" << dendl;
c07f9fc5 632 const auto py_commands = py_modules.get_commands();
7c673cae 633
c07f9fc5 634 int cmdnum = 0;
7c673cae
FG
635 JSONFormatter f;
636 f.open_object_section("command_descriptions");
c07f9fc5
FG
637
638 auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){
7c673cae
FG
639 ostringstream secname;
640 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
c07f9fc5
FG
641 dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring,
642 mc.module, mc.req_perms, mc.availability, 0);
7c673cae 643 cmdnum++;
c07f9fc5
FG
644 };
645
646 for (const auto &pyc : py_commands) {
647 dump_cmd(pyc);
7c673cae 648 }
7c673cae 649
c07f9fc5
FG
650 for (const auto &mgr_cmd : mgr_commands) {
651 dump_cmd(mgr_cmd);
7c673cae 652 }
c07f9fc5 653
7c673cae
FG
654 f.close_section(); // command_descriptions
655 f.flush(cmdctx->odata);
656 cmdctx->reply(0, ss);
657 return true;
658 }
659
660 // lookup command
c07f9fc5 661 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
7c673cae
FG
662 _generate_command_map(cmdctx->cmdmap, param_str_map);
663 if (!mgr_cmd) {
c07f9fc5 664 MonCommand py_command = {"", "", "py", "rw", "cli"};
7c673cae
FG
665 if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
666 param_str_map, &py_command)) {
667 dout(1) << " access denied" << dendl;
31f18b77
FG
668 ss << "access denied; does your client key have mgr caps?"
669 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
7c673cae
FG
670 cmdctx->reply(-EACCES, ss);
671 return true;
672 }
673 } else {
674 // validate user's permissions for requested command
675 if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
676 param_str_map, mgr_cmd)) {
677 dout(1) << " access denied" << dendl;
678 audit_clog->info() << "from='" << session->inst << "' "
679 << "entity='" << session->entity_name << "' "
680 << "cmd=" << m->cmd << ": access denied";
31f18b77
FG
681 ss << "access denied' does your client key have mgr caps?"
682 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
7c673cae
FG
683 cmdctx->reply(-EACCES, ss);
684 return true;
685 }
686 }
687
688 audit_clog->debug()
689 << "from='" << session->inst << "' "
690 << "entity='" << session->entity_name << "' "
691 << "cmd=" << m->cmd << ": dispatch";
692
224ce89b
WB
693 // ----------------
694 // service map commands
695 if (prefix == "service dump") {
696 if (!f)
697 f.reset(Formatter::create("json-pretty"));
698 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
699 f->dump_object("service_map", service_map);
700 });
701 f->flush(cmdctx->odata);
702 cmdctx->reply(0, ss);
703 return true;
704 }
705 if (prefix == "service status") {
706 if (!f)
707 f.reset(Formatter::create("json-pretty"));
708 // only include state from services that are in the persisted service map
709 f->open_object_section("service_status");
710 ServiceMap s;
711 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
712 s = service_map;
713 });
714 for (auto& p : s.services) {
715 f->open_object_section(p.first.c_str());
716 for (auto& q : p.second.daemons) {
717 f->open_object_section(q.first.c_str());
718 DaemonKey key(p.first, q.first);
719 assert(daemon_state.exists(key));
720 auto daemon = daemon_state.get(key);
c07f9fc5 721 Mutex::Locker l(daemon->lock);
224ce89b
WB
722 f->dump_stream("status_stamp") << daemon->service_status_stamp;
723 f->dump_stream("last_beacon") << daemon->last_service_beacon;
724 f->open_object_section("status");
725 for (auto& r : daemon->service_status) {
726 f->dump_string(r.first.c_str(), r.second);
727 }
728 f->close_section();
729 f->close_section();
730 }
731 f->close_section();
732 }
733 f->close_section();
734 f->flush(cmdctx->odata);
735 cmdctx->reply(0, ss);
736 return true;
737 }
738
3efd9988
FG
739 if (prefix == "config set") {
740 std::string key;
741 std::string val;
742 cmd_getval(cct, cmdctx->cmdmap, "key", key);
743 cmd_getval(cct, cmdctx->cmdmap, "value", val);
744 r = cct->_conf->set_val(key, val, true, &ss);
745 if (r == 0) {
746 cct->_conf->apply_changes(nullptr);
747 }
748 cmdctx->reply(0, ss);
749 return true;
750 }
751
7c673cae
FG
752 // -----------
753 // PG commands
754
755 if (prefix == "pg scrub" ||
756 prefix == "pg repair" ||
757 prefix == "pg deep-scrub") {
758 string scrubop = prefix.substr(3, string::npos);
759 pg_t pgid;
760 string pgidstr;
761 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
762 if (!pgid.parse(pgidstr.c_str())) {
763 ss << "invalid pgid '" << pgidstr << "'";
764 cmdctx->reply(-EINVAL, ss);
765 return true;
766 }
767 bool pg_exists = false;
768 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
769 pg_exists = osdmap.pg_exists(pgid);
770 });
771 if (!pg_exists) {
772 ss << "pg " << pgid << " dne";
773 cmdctx->reply(-ENOENT, ss);
774 return true;
775 }
776 int acting_primary = -1;
7c673cae
FG
777 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
778 acting_primary = osdmap.get_pg_acting_primary(pgid);
7c673cae
FG
779 });
780 if (acting_primary == -1) {
781 ss << "pg " << pgid << " has no primary osd";
782 cmdctx->reply(-EAGAIN, ss);
783 return true;
784 }
31f18b77
FG
785 auto p = osd_cons.find(acting_primary);
786 if (p == osd_cons.end()) {
787 ss << "pg " << pgid << " primary osd." << acting_primary
788 << " is not currently connected";
789 cmdctx->reply(-EAGAIN, ss);
790 }
7c673cae 791 vector<pg_t> pgs = { pgid };
31f18b77
FG
792 for (auto& con : p->second) {
793 con->send_message(new MOSDScrub(monc->get_fsid(),
794 pgs,
795 scrubop == "repair",
796 scrubop == "deep-scrub"));
797 }
7c673cae 798 ss << "instructing pg " << pgid << " on osd." << acting_primary
31f18b77
FG
799 << " to " << scrubop;
800 cmdctx->reply(0, ss);
801 return true;
802 } else if (prefix == "osd scrub" ||
803 prefix == "osd deep-scrub" ||
804 prefix == "osd repair") {
805 string whostr;
806 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
807 vector<string> pvec;
808 get_str_vec(prefix, pvec);
809
810 set<int> osds;
224ce89b 811 if (whostr == "*" || whostr == "all" || whostr == "any") {
31f18b77
FG
812 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
813 for (int i = 0; i < osdmap.get_max_osd(); i++)
814 if (osdmap.is_up(i)) {
815 osds.insert(i);
816 }
817 });
818 } else {
819 long osd = parse_osd_id(whostr.c_str(), &ss);
820 if (osd < 0) {
821 ss << "invalid osd '" << whostr << "'";
822 cmdctx->reply(-EINVAL, ss);
823 return true;
824 }
825 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
826 if (osdmap.is_up(osd)) {
827 osds.insert(osd);
828 }
829 });
830 if (osds.empty()) {
831 ss << "osd." << osd << " is not up";
832 cmdctx->reply(-EAGAIN, ss);
833 return true;
834 }
835 }
836 set<int> sent_osds, failed_osds;
837 for (auto osd : osds) {
838 auto p = osd_cons.find(osd);
839 if (p == osd_cons.end()) {
840 failed_osds.insert(osd);
841 } else {
842 sent_osds.insert(osd);
843 for (auto& con : p->second) {
844 con->send_message(new MOSDScrub(monc->get_fsid(),
845 pvec.back() == "repair",
846 pvec.back() == "deep-scrub"));
847 }
848 }
849 }
850 if (failed_osds.size() == osds.size()) {
851 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
852 << " (not connected)";
853 r = -EAGAIN;
854 } else {
855 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
856 if (!failed_osds.empty()) {
857 ss << "; osd(s) " << failed_osds << " were not connected";
858 }
859 r = 0;
860 }
7c673cae
FG
861 cmdctx->reply(0, ss);
862 return true;
863 } else if (prefix == "osd reweight-by-pg" ||
864 prefix == "osd reweight-by-utilization" ||
865 prefix == "osd test-reweight-by-pg" ||
866 prefix == "osd test-reweight-by-utilization") {
867 bool by_pg =
868 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
869 bool dry_run =
870 prefix == "osd test-reweight-by-pg" ||
871 prefix == "osd test-reweight-by-utilization";
872 int64_t oload;
873 cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
874 set<int64_t> pools;
875 vector<string> poolnames;
876 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
877 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
878 for (const auto& poolname : poolnames) {
879 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
880 if (pool < 0) {
881 ss << "pool '" << poolname << "' does not exist";
882 r = -ENOENT;
883 }
884 pools.insert(pool);
885 }
886 });
887 if (r) {
888 cmdctx->reply(r, ss);
889 return true;
890 }
891 double max_change = g_conf->mon_reweight_max_change;
892 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
893 if (max_change <= 0.0) {
894 ss << "max_change " << max_change << " must be positive";
895 cmdctx->reply(-EINVAL, ss);
896 return true;
897 }
898 int64_t max_osds = g_conf->mon_reweight_max_osds;
899 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
900 if (max_osds <= 0) {
901 ss << "max_osds " << max_osds << " must be positive";
902 cmdctx->reply(-EINVAL, ss);
903 return true;
904 }
905 string no_increasing;
906 cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
907 string out_str;
908 mempool::osdmap::map<int32_t, uint32_t> new_weights;
909 r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
910 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
911 return reweight::by_utilization(osdmap, pgmap,
912 oload,
913 max_change,
914 max_osds,
915 by_pg,
916 pools.empty() ? NULL : &pools,
917 no_increasing == "--no-increasing",
918 &new_weights,
919 &ss, &out_str, f.get());
920 });
921 });
922 if (r >= 0) {
923 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
924 }
925 if (f) {
926 f->flush(cmdctx->odata);
927 } else {
928 cmdctx->odata.append(out_str);
929 }
930 if (r < 0) {
931 ss << "FAILED reweight-by-pg";
932 cmdctx->reply(r, ss);
933 return true;
934 } else if (r == 0 || dry_run) {
935 ss << "no change";
936 cmdctx->reply(r, ss);
937 return true;
938 } else {
939 json_spirit::Object json_object;
940 for (const auto& osd_weight : new_weights) {
941 json_spirit::Config::add(json_object,
942 std::to_string(osd_weight.first),
943 std::to_string(osd_weight.second));
944 }
945 string s = json_spirit::write(json_object);
946 std::replace(begin(s), end(s), '\"', '\'');
947 const string cmd =
948 "{"
949 "\"prefix\": \"osd reweightn\", "
950 "\"weights\": \"" + s + "\""
951 "}";
952 auto on_finish = new ReplyOnFinish(cmdctx);
953 monc->start_mon_command({cmd}, {},
954 &on_finish->from_mon, &on_finish->outs, on_finish);
955 return true;
956 }
31f18b77
FG
957 } else if (prefix == "osd df") {
958 string method;
959 cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
960 r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) {
961 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
962 print_osd_utilization(osdmap, &pgservice, ss,
963 f.get(), method == "tree");
964
965 cmdctx->odata.append(ss);
966 return 0;
967 });
968 });
969 cmdctx->reply(r, "");
970 return true;
35e4c445
FG
971 } else if (prefix == "osd safe-to-destroy") {
972 vector<string> ids;
973 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
974 set<int> osds;
975 int r;
976 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
977 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
978 });
979 if (!r && osds.empty()) {
980 ss << "must specify one or more OSDs";
981 r = -EINVAL;
982 }
983 if (r < 0) {
984 cmdctx->reply(r, ss);
985 return true;
986 }
987 set<int> active_osds, missing_stats, stored_pgs;
988 int affected_pgs = 0;
989 cluster_state.with_pgmap([&](const PGMap& pg_map) {
990 if (pg_map.num_pg_unknown > 0) {
991 ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
992 << " any conclusions";
993 r = -EAGAIN;
994 return;
995 }
996 int num_active_clean = 0;
997 for (auto& p : pg_map.num_pg_by_state) {
998 unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
999 if ((p.first & want) == want) {
1000 num_active_clean += p.second;
1001 }
1002 }
1003 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1004 for (auto osd : osds) {
1005 if (!osdmap.exists(osd)) {
1006 continue; // clearly safe to destroy
1007 }
1008 auto q = pg_map.num_pg_by_osd.find(osd);
1009 if (q != pg_map.num_pg_by_osd.end()) {
1010 if (q->second.acting > 0 || q->second.up > 0) {
1011 active_osds.insert(osd);
1012 affected_pgs += q->second.acting + q->second.up;
1013 continue;
1014 }
1015 }
1016 if (num_active_clean < pg_map.num_pg) {
1017 // all pgs aren't active+clean; we need to be careful.
1018 auto p = pg_map.osd_stat.find(osd);
1019 if (p == pg_map.osd_stat.end()) {
1020 missing_stats.insert(osd);
1021 }
1022 if (p->second.num_pgs > 0) {
1023 stored_pgs.insert(osd);
1024 }
1025 }
1026 }
1027 });
1028 });
1029 if (!r && !active_osds.empty()) {
1030 ss << "OSD(s) " << active_osds << " have " << affected_pgs
1031 << " pgs currently mapped to them";
1032 r = -EBUSY;
1033 } else if (!missing_stats.empty()) {
1034 ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
1035 << " PGs are active+clean; we cannot draw any conclusions";
1036 r = -EAGAIN;
1037 } else if (!stored_pgs.empty()) {
1038 ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
1039 << " data, and not all PGs are active+clean; we cannot be sure they"
1040 << " aren't still needed.";
1041 r = -EBUSY;
1042 }
1043 if (r) {
1044 cmdctx->reply(r, ss);
1045 return true;
1046 }
1047 ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1048 << " durability.";
1049 cmdctx->reply(0, ss);
1050 return true;
1051 } else if (prefix == "osd ok-to-stop") {
1052 vector<string> ids;
1053 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1054 set<int> osds;
1055 int r;
1056 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1057 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1058 });
1059 if (!r && osds.empty()) {
1060 ss << "must specify one or more OSDs";
1061 r = -EINVAL;
1062 }
1063 if (r < 0) {
1064 cmdctx->reply(r, ss);
1065 return true;
1066 }
1067 map<pg_t,int> pg_delta; // pgid -> net acting set size change
1068 int dangerous_pgs = 0;
1069 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1070 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1071 if (pg_map.num_pg_unknown > 0) {
1072 ss << pg_map.num_pg_unknown << " pgs have unknown state; "
1073 << "cannot draw any conclusions";
1074 r = -EAGAIN;
1075 return;
1076 }
1077 for (auto osd : osds) {
1078 auto p = pg_map.pg_by_osd.find(osd);
1079 if (p != pg_map.pg_by_osd.end()) {
1080 for (auto& pgid : p->second) {
1081 --pg_delta[pgid];
1082 }
1083 }
1084 }
1085 for (auto& p : pg_delta) {
1086 auto q = pg_map.pg_stat.find(p.first);
1087 if (q == pg_map.pg_stat.end()) {
1088 ss << "missing information about " << p.first << "; cannot draw"
1089 << " any conclusions";
1090 r = -EAGAIN;
1091 return;
1092 }
1093 if (!(q->second.state & PG_STATE_ACTIVE) ||
1094 (q->second.state & PG_STATE_DEGRADED)) {
1095 // we don't currently have a good way to tell *how* degraded
1096 // a degraded PG is, so we have to assume we cannot remove
1097 // any more replicas/shards.
1098 ++dangerous_pgs;
1099 continue;
1100 }
1101 const pg_pool_t *pi = osdmap.get_pg_pool(p.first.pool());
1102 if (!pi) {
1103 ++dangerous_pgs; // pool is creating or deleting
1104 } else {
1105 if (q->second.acting.size() + p.second < pi->min_size) {
1106 ++dangerous_pgs;
1107 }
1108 }
1109 }
1110 });
1111 });
1112 if (r) {
1113 cmdctx->reply(r, ss);
1114 return true;
1115 }
1116 if (dangerous_pgs) {
1117 ss << dangerous_pgs << " PGs are already degraded or might become "
1118 << "unavailable";
1119 cmdctx->reply(-EBUSY, ss);
1120 return true;
1121 }
1122 ss << "OSD(s) " << osds << " are ok to stop without reducing"
1123 << " availability, provided there are no other concurrent failures"
1124 << " or interventions. " << pg_delta.size() << " PGs are likely to be"
1125 << " degraded (but remain available) as a result.";
1126 cmdctx->reply(0, ss);
1127 return true;
c07f9fc5
FG
1128 } else if (prefix == "pg force-recovery" ||
1129 prefix == "pg force-backfill" ||
1130 prefix == "pg cancel-force-recovery" ||
1131 prefix == "pg cancel-force-backfill") {
1132 string forceop = prefix.substr(3, string::npos);
1133 list<pg_t> parsed_pgs;
1134 map<int, list<pg_t> > osdpgs;
1135
1136 // figure out actual op just once
1137 int actual_op = 0;
1138 if (forceop == "force-recovery") {
1139 actual_op = OFR_RECOVERY;
1140 } else if (forceop == "force-backfill") {
1141 actual_op = OFR_BACKFILL;
1142 } else if (forceop == "cancel-force-backfill") {
1143 actual_op = OFR_BACKFILL | OFR_CANCEL;
1144 } else if (forceop == "cancel-force-recovery") {
1145 actual_op = OFR_RECOVERY | OFR_CANCEL;
1146 }
1147
1148 // covnert pg names to pgs, discard any invalid ones while at it
1149 {
1150 // we don't want to keep pgidstr and pgidstr_nodup forever
1151 vector<string> pgidstr;
1152 // get pgids to process and prune duplicates
1153 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
1154 set<string> pgidstr_nodup(pgidstr.begin(), pgidstr.end());
1155 if (pgidstr.size() != pgidstr_nodup.size()) {
1156 // move elements only when there were duplicates, as this
1157 // reorders them
1158 pgidstr.resize(pgidstr_nodup.size());
1159 auto it = pgidstr_nodup.begin();
1160 for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) {
1161 pgidstr[i] = std::move(*it++);
1162 }
1163 }
1164
1165 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1166 for (auto& pstr : pgidstr) {
1167 pg_t parsed_pg;
1168 if (!parsed_pg.parse(pstr.c_str())) {
1169 ss << "invalid pgid '" << pstr << "'; ";
1170 r = -EINVAL;
1171 } else {
1172 auto workit = pg_map.pg_stat.find(parsed_pg);
1173 if (workit == pg_map.pg_stat.end()) {
1174 ss << "pg " << pstr << " not exists; ";
1175 r = -ENOENT;
1176 } else {
1177 pg_stat_t workpg = workit->second;
1178
1179 // discard pgs for which user requests are pointless
1180 switch (actual_op)
1181 {
1182 case OFR_RECOVERY:
1183 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) {
1184 // don't return error, user script may be racing with cluster. not fatal.
1185 ss << "pg " << pstr << " doesn't require recovery; ";
1186 continue;
1187 } else if (workpg.state & PG_STATE_FORCED_RECOVERY) {
1188 ss << "pg " << pstr << " recovery already forced; ";
1189 // return error, as it may be a bug in user script
1190 r = -EINVAL;
1191 continue;
1192 }
1193 break;
1194 case OFR_BACKFILL:
3efd9988 1195 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING)) == 0) {
c07f9fc5
FG
1196 ss << "pg " << pstr << " doesn't require backfilling; ";
1197 continue;
1198 } else if (workpg.state & PG_STATE_FORCED_BACKFILL) {
1199 ss << "pg " << pstr << " backfill already forced; ";
1200 r = -EINVAL;
1201 continue;
1202 }
1203 break;
1204 case OFR_BACKFILL | OFR_CANCEL:
1205 if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) {
1206 ss << "pg " << pstr << " backfill not forced; ";
1207 continue;
1208 }
1209 break;
1210 case OFR_RECOVERY | OFR_CANCEL:
1211 if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) {
1212 ss << "pg " << pstr << " recovery not forced; ";
1213 continue;
1214 }
1215 break;
1216 default:
1217 assert(0 == "actual_op value is not supported");
1218 }
1219
1220 parsed_pgs.push_back(std::move(parsed_pg));
1221 }
1222 }
1223 }
1224
1225 // group pgs to process by osd
1226 for (auto& pgid : parsed_pgs) {
1227 auto workit = pg_map.pg_stat.find(pgid);
1228 if (workit != pg_map.pg_stat.end()) {
1229 pg_stat_t workpg = workit->second;
1230 set<int32_t> osds(workpg.up.begin(), workpg.up.end());
1231 osds.insert(workpg.acting.begin(), workpg.acting.end());
1232 for (auto i : osds) {
1233 osdpgs[i].push_back(pgid);
1234 }
1235 }
1236 }
1237
1238 });
1239 }
1240
1241 // respond with error only when no pgs are correct
1242 // yes, in case of mixed errors, only the last one will be emitted,
1243 // but the message presented will be fine
1244 if (parsed_pgs.size() != 0) {
1245 // clear error to not confuse users/scripts
1246 r = 0;
1247 }
1248
1249 // optimize the command -> messages conversion, use only one message per distinct OSD
1250 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1251 for (auto& i : osdpgs) {
1252 if (osdmap.is_up(i.first)) {
1253 vector<pg_t> pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end()));
1254 auto p = osd_cons.find(i.first);
1255 if (p == osd_cons.end()) {
1256 ss << "osd." << i.first << " is not currently connected";
1257 r = -EAGAIN;
1258 continue;
1259 }
1260 for (auto& con : p->second) {
1261 con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op));
1262 }
1263 ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; ";
1264 }
1265 }
1266 });
1267 ss << std::endl;
1268 cmdctx->reply(r, ss);
1269 return true;
7c673cae
FG
1270 } else {
1271 r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
1272 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1273 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
1274 f.get(), &ss, &cmdctx->odata);
1275 });
1276 });
1277
1278 if (r != -EOPNOTSUPP) {
1279 cmdctx->reply(r, ss);
1280 return true;
1281 }
1282 }
1283
1284 // None of the special native commands,
3efd9988 1285 ActivePyModule *handler = nullptr;
c07f9fc5 1286 auto py_commands = py_modules.get_py_commands();
7c673cae
FG
1287 for (const auto &pyc : py_commands) {
1288 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
1289 dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
1290 if (pyc_prefix == prefix) {
1291 handler = pyc.handler;
1292 break;
1293 }
1294 }
1295
1296 if (handler == nullptr) {
1297 ss << "No handler found for '" << prefix << "'";
1298 dout(4) << "No handler found for '" << prefix << "'" << dendl;
1299 cmdctx->reply(-EINVAL, ss);
1300 return true;
1301 } else {
1302 // Okay, now we have a handler to call, but we must not call it
1303 // in this thread, because the python handlers can do anything,
1304 // including blocking, and including calling back into mgr.
1305 dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
1306 finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
1307 std::stringstream ds;
1308 std::stringstream ss;
1309 int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
1310 cmdctx->odata.append(ds);
1311 cmdctx->reply(r, ss);
1312 }));
1313 return true;
1314 }
1315}
31f18b77 1316
224ce89b
WB
1317void DaemonServer::_prune_pending_service_map()
1318{
1319 utime_t cutoff = ceph_clock_now();
3efd9988 1320 cutoff -= g_conf->get_val<double>("mgr_service_beacon_grace");
224ce89b
WB
1321 auto p = pending_service_map.services.begin();
1322 while (p != pending_service_map.services.end()) {
1323 auto q = p->second.daemons.begin();
1324 while (q != p->second.daemons.end()) {
1325 DaemonKey key(p->first, q->first);
1326 if (!daemon_state.exists(key)) {
1327 derr << "missing key " << key << dendl;
1328 ++q;
1329 continue;
1330 }
1331 auto daemon = daemon_state.get(key);
c07f9fc5 1332 Mutex::Locker l(daemon->lock);
224ce89b
WB
1333 if (daemon->last_service_beacon == utime_t()) {
1334 // we must have just restarted; assume they are alive now.
1335 daemon->last_service_beacon = ceph_clock_now();
1336 ++q;
1337 continue;
1338 }
1339 if (daemon->last_service_beacon < cutoff) {
1340 dout(10) << "pruning stale " << p->first << "." << q->first
1341 << " last_beacon " << daemon->last_service_beacon << dendl;
1342 q = p->second.daemons.erase(q);
1343 pending_service_map_dirty = pending_service_map.epoch;
1344 } else {
1345 ++q;
1346 }
1347 }
1348 if (p->second.daemons.empty()) {
1349 p = pending_service_map.services.erase(p);
1350 pending_service_map_dirty = pending_service_map.epoch;
1351 } else {
1352 ++p;
1353 }
1354 }
1355}
1356
31f18b77
FG
1357void DaemonServer::send_report()
1358{
224ce89b 1359 if (!pgmap_ready) {
3efd9988 1360 if (ceph_clock_now() - started_at > g_conf->get_val<int64_t>("mgr_stats_period") * 4.0) {
224ce89b
WB
1361 pgmap_ready = true;
1362 reported_osds.clear();
1363 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
1364 << "potentially incomplete PG state to mon" << dendl;
1365 } else {
1366 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
1367 << dendl;
1368 return;
1369 }
1370 }
1371
31f18b77 1372 auto m = new MMonMgrReport();
c07f9fc5
FG
1373 py_modules.get_health_checks(&m->health_checks);
1374
31f18b77
FG
1375 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1376 cluster_state.update_delta_stats();
1377
224ce89b
WB
1378 if (pending_service_map.epoch) {
1379 _prune_pending_service_map();
1380 if (pending_service_map_dirty >= pending_service_map.epoch) {
1381 pending_service_map.modified = ceph_clock_now();
1382 ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
1383 dout(10) << "sending service_map e" << pending_service_map.epoch
1384 << dendl;
1385 pending_service_map.epoch++;
1386 }
1387 }
1388
31f18b77
FG
1389 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1390 // FIXME: no easy way to get mon features here. this will do for
1391 // now, though, as long as we don't make a backward-incompat change.
1392 pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
1393 dout(10) << pg_map << dendl;
224ce89b
WB
1394
1395 pg_map.get_health_checks(g_ceph_context, osdmap,
1396 &m->health_checks);
c07f9fc5 1397
224ce89b
WB
1398 dout(10) << m->health_checks.checks.size() << " health checks"
1399 << dendl;
1400 dout(20) << "health checks:\n";
1401 JSONFormatter jf(true);
1402 jf.dump_object("health_checks", m->health_checks);
1403 jf.flush(*_dout);
1404 *_dout << dendl;
31f18b77
FG
1405 });
1406 });
1407 // TODO? We currently do not notify the PyModules
1408 // TODO: respect needs_send, so we send the report only if we are asked to do
1409 // so, or the state is updated.
1410 monc->send_mon_message(m);
1411}
224ce89b
WB
1412
1413void DaemonServer::got_service_map()
1414{
1415 Mutex::Locker l(lock);
1416
1417 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
1418 if (pending_service_map.epoch == 0) {
1419 // we just started up
1420 dout(10) << "got initial map e" << service_map.epoch << dendl;
1421 pending_service_map = service_map;
1422 } else {
1423 // we we already active and therefore must have persisted it,
1424 // which means ours is the same or newer.
1425 dout(10) << "got updated map e" << service_map.epoch << dendl;
1426 }
1427 pending_service_map.epoch = service_map.epoch + 1;
1428 });
1429
1430 // cull missing daemons, populate new ones
1431 for (auto& p : pending_service_map.services) {
1432 std::set<std::string> names;
1433 for (auto& q : p.second.daemons) {
1434 names.insert(q.first);
1435 DaemonKey key(p.first, q.first);
1436 if (!daemon_state.exists(key)) {
1437 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
1438 daemon->key = key;
1439 daemon->metadata = q.second.metadata;
1440 if (q.second.metadata.count("hostname")) {
1441 daemon->hostname = q.second.metadata["hostname"];
1442 }
1443 daemon->service_daemon = true;
1444 daemon_state.insert(daemon);
1445 dout(10) << "added missing " << key << dendl;
1446 }
1447 }
1448 daemon_state.cull(p.first, names);
1449 }
1450}
3efd9988
FG
1451
1452
1453const char** DaemonServer::get_tracked_conf_keys() const
1454{
1455 static const char *KEYS[] = {
1456 "mgr_stats_threshold",
1457 "mgr_stats_period",
1458 nullptr
1459 };
1460
1461 return KEYS;
1462}
1463
1464void DaemonServer::handle_conf_change(const struct md_config_t *conf,
1465 const std::set <std::string> &changed)
1466{
1467 dout(4) << "ohai" << dendl;
1468 // We may be called within lock (via MCommand `config set`) or outwith the
1469 // lock (via admin socket `config set`), so handle either case.
1470 const bool initially_locked = lock.is_locked_by_me();
1471 if (!initially_locked) {
1472 lock.Lock();
1473 }
1474
1475 if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
1476 dout(4) << "Updating stats threshold/period on "
1477 << daemon_connections.size() << " clients" << dendl;
1478 // Send a fresh MMgrConfigure to all clients, so that they can follow
1479 // the new policy for transmitting stats
1480 for (auto &c : daemon_connections) {
1481 _send_configure(c);
1482 }
1483 }
1484}
1485
1486void DaemonServer::_send_configure(ConnectionRef c)
1487{
1488 assert(lock.is_locked_by_me());
1489
1490 auto configure = new MMgrConfigure();
1491 configure->stats_period = g_conf->get_val<int64_t>("mgr_stats_period");
1492 configure->stats_threshold = g_conf->get_val<int64_t>("mgr_stats_threshold");
1493 c->send_message(configure);
1494}
1495