]> git.proxmox.com Git - ceph.git/blame - ceph/src/mgr/DaemonServer.cc
update sources to v12.2.3
[ceph.git] / ceph / src / mgr / DaemonServer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14#include "DaemonServer.h"
b32b8144 15#include "mgr/Mgr.h"
7c673cae 16
b32b8144 17#include "include/stringify.h"
31f18b77 18#include "include/str_list.h"
7c673cae 19#include "auth/RotatingKeyRing.h"
7c673cae
FG
20#include "json_spirit/json_spirit_writer.h"
21
c07f9fc5 22#include "mgr/mgr_commands.h"
b32b8144 23#include "mgr/OSDHealthMetricCollector.h"
c07f9fc5
FG
24#include "mon/MonCommand.h"
25
7c673cae
FG
26#include "messages/MMgrOpen.h"
27#include "messages/MMgrConfigure.h"
31f18b77 28#include "messages/MMonMgrReport.h"
7c673cae
FG
29#include "messages/MCommand.h"
30#include "messages/MCommandReply.h"
31#include "messages/MPGStats.h"
32#include "messages/MOSDScrub.h"
c07f9fc5 33#include "messages/MOSDForceRecovery.h"
224ce89b 34#include "common/errno.h"
7c673cae
FG
35
36#define dout_context g_ceph_context
37#define dout_subsys ceph_subsys_mgr
38#undef dout_prefix
39#define dout_prefix *_dout << "mgr.server " << __func__ << " "
40
c07f9fc5
FG
41
42
7c673cae
FG
43DaemonServer::DaemonServer(MonClient *monc_,
44 Finisher &finisher_,
45 DaemonStateIndex &daemon_state_,
46 ClusterState &cluster_state_,
3efd9988 47 PyModuleRegistry &py_modules_,
7c673cae
FG
48 LogChannelRef clog_,
49 LogChannelRef audit_clog_)
50 : Dispatcher(g_ceph_context),
51 client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
3efd9988 52 g_conf->get_val<uint64_t>("mgr_client_bytes"))),
7c673cae 53 client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
3efd9988 54 g_conf->get_val<uint64_t>("mgr_client_messages"))),
7c673cae 55 osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
3efd9988 56 g_conf->get_val<uint64_t>("mgr_osd_bytes"))),
7c673cae 57 osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
3efd9988 58 g_conf->get_val<uint64_t>("mgr_osd_messages"))),
7c673cae 59 mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
3efd9988 60 g_conf->get_val<uint64_t>("mgr_mds_bytes"))),
7c673cae 61 mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
3efd9988 62 g_conf->get_val<uint64_t>("mgr_mds_messages"))),
7c673cae 63 mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
3efd9988 64 g_conf->get_val<uint64_t>("mgr_mon_bytes"))),
7c673cae 65 mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
3efd9988 66 g_conf->get_val<uint64_t>("mgr_mon_messages"))),
7c673cae
FG
67 msgr(nullptr),
68 monc(monc_),
69 finisher(finisher_),
70 daemon_state(daemon_state_),
71 cluster_state(cluster_state_),
72 py_modules(py_modules_),
73 clog(clog_),
74 audit_clog(audit_clog_),
b32b8144 75 auth_cluster_registry(g_ceph_context,
7c673cae
FG
76 g_conf->auth_supported.empty() ?
77 g_conf->auth_cluster_required :
78 g_conf->auth_supported),
b32b8144
FG
79 auth_service_registry(g_ceph_context,
80 g_conf->auth_supported.empty() ?
81 g_conf->auth_service_required :
82 g_conf->auth_supported),
3efd9988
FG
83 lock("DaemonServer"),
84 pgmap_ready(false)
85{
86 g_conf->add_observer(this);
87}
7c673cae
FG
88
89DaemonServer::~DaemonServer() {
90 delete msgr;
3efd9988 91 g_conf->remove_observer(this);
7c673cae
FG
92}
93
94int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
95{
96 // Initialize Messenger
97 std::string public_msgr_type = g_conf->ms_public_type.empty() ?
98 g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
99 msgr = Messenger::create(g_ceph_context, public_msgr_type,
100 entity_name_t::MGR(gid),
101 "mgr",
102 getpid(), 0);
103 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
104
105 // throttle clients
106 msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
107 client_byte_throttler.get(),
108 client_msg_throttler.get());
109
110 // servers
111 msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
112 osd_byte_throttler.get(),
113 osd_msg_throttler.get());
114 msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
115 mds_byte_throttler.get(),
116 mds_msg_throttler.get());
117 msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
118 mon_byte_throttler.get(),
119 mon_msg_throttler.get());
120
121 int r = msgr->bind(g_conf->public_addr);
122 if (r < 0) {
123 derr << "unable to bind mgr to " << g_conf->public_addr << dendl;
124 return r;
125 }
126
127 msgr->set_myname(entity_name_t::MGR(gid));
128 msgr->set_addr_unknowns(client_addr);
129
130 msgr->start();
131 msgr->add_dispatcher_tail(this);
132
224ce89b
WB
133 started_at = ceph_clock_now();
134
7c673cae
FG
135 return 0;
136}
137
138entity_addr_t DaemonServer::get_myaddr() const
139{
140 return msgr->get_myaddr();
141}
142
143
144bool DaemonServer::ms_verify_authorizer(Connection *con,
145 int peer_type,
146 int protocol,
147 ceph::bufferlist& authorizer_data,
148 ceph::bufferlist& authorizer_reply,
149 bool& is_valid,
150 CryptoKey& session_key)
151{
b32b8144
FG
152 AuthAuthorizeHandler *handler = nullptr;
153 if (peer_type == CEPH_ENTITY_TYPE_OSD ||
154 peer_type == CEPH_ENTITY_TYPE_MON ||
155 peer_type == CEPH_ENTITY_TYPE_MDS ||
156 peer_type == CEPH_ENTITY_TYPE_MGR) {
157 handler = auth_cluster_registry.get_handler(protocol);
158 } else {
159 handler = auth_service_registry.get_handler(protocol);
160 }
7c673cae
FG
161 if (!handler) {
162 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
163 is_valid = false;
164 return true;
165 }
166
167 MgrSessionRef s(new MgrSession(cct));
168 s->inst.addr = con->get_peer_addr();
169 AuthCapsInfo caps_info;
170
c07f9fc5
FG
171 RotatingKeyRing *keys = monc->rotating_secrets.get();
172 if (keys) {
173 is_valid = handler->verify_authorizer(
174 cct, keys,
175 authorizer_data,
176 authorizer_reply, s->entity_name,
177 s->global_id, caps_info,
178 session_key);
179 } else {
180 dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
181 is_valid = false;
182 }
7c673cae
FG
183
184 if (is_valid) {
185 if (caps_info.allow_all) {
186 dout(10) << " session " << s << " " << s->entity_name
187 << " allow_all" << dendl;
188 s->caps.set_allow_all();
189 }
190 if (caps_info.caps.length() > 0) {
191 bufferlist::iterator p = caps_info.caps.begin();
192 string str;
193 try {
194 ::decode(str, p);
195 }
196 catch (buffer::error& e) {
197 }
198 bool success = s->caps.parse(str);
199 if (success) {
200 dout(10) << " session " << s << " " << s->entity_name
201 << " has caps " << s->caps << " '" << str << "'" << dendl;
202 } else {
203 dout(10) << " session " << s << " " << s->entity_name
204 << " failed to parse caps '" << str << "'" << dendl;
205 is_valid = false;
206 }
207 }
208 con->set_priv(s->get());
31f18b77
FG
209
210 if (peer_type == CEPH_ENTITY_TYPE_OSD) {
211 Mutex::Locker l(lock);
212 s->osd_id = atoi(s->entity_name.get_id().c_str());
224ce89b 213 dout(10) << "registering osd." << s->osd_id << " session "
31f18b77
FG
214 << s << " con " << con << dendl;
215 osd_cons[s->osd_id].insert(con);
216 }
7c673cae
FG
217 }
218
219 return true;
220}
221
222
223bool DaemonServer::ms_get_authorizer(int dest_type,
224 AuthAuthorizer **authorizer, bool force_new)
225{
226 dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
227
228 if (dest_type == CEPH_ENTITY_TYPE_MON) {
229 return true;
230 }
231
232 if (force_new) {
233 if (monc->wait_auth_rotating(10) < 0)
234 return false;
235 }
236
237 *authorizer = monc->build_authorizer(dest_type);
238 dout(20) << "got authorizer " << *authorizer << dendl;
239 return *authorizer != NULL;
240}
241
31f18b77
FG
242bool DaemonServer::ms_handle_reset(Connection *con)
243{
244 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
245 MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
246 if (!session) {
247 return false;
248 }
249 session->put(); // SessionRef takes a ref
250 Mutex::Locker l(lock);
224ce89b 251 dout(10) << "unregistering osd." << session->osd_id
31f18b77
FG
252 << " session " << session << " con " << con << dendl;
253 osd_cons[session->osd_id].erase(con);
3efd9988
FG
254
255 auto iter = daemon_connections.find(con);
256 if (iter != daemon_connections.end()) {
257 daemon_connections.erase(iter);
258 }
31f18b77
FG
259 }
260 return false;
261}
262
7c673cae
FG
263bool DaemonServer::ms_handle_refused(Connection *con)
264{
265 // do nothing for now
266 return false;
267}
268
269bool DaemonServer::ms_dispatch(Message *m)
270{
3efd9988
FG
271 // Note that we do *not* take ::lock here, in order to avoid
272 // serializing all message handling. It's up to each handler
273 // to take whatever locks it needs.
7c673cae
FG
274 switch (m->get_type()) {
275 case MSG_PGSTATS:
276 cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
224ce89b 277 maybe_ready(m->get_source().num());
7c673cae
FG
278 m->put();
279 return true;
280 case MSG_MGR_REPORT:
281 return handle_report(static_cast<MMgrReport*>(m));
282 case MSG_MGR_OPEN:
283 return handle_open(static_cast<MMgrOpen*>(m));
284 case MSG_COMMAND:
285 return handle_command(static_cast<MCommand*>(m));
286 default:
287 dout(1) << "Unhandled message type " << m->get_type() << dendl;
288 return false;
289 };
290}
291
224ce89b
WB
292void DaemonServer::maybe_ready(int32_t osd_id)
293{
3efd9988
FG
294 if (pgmap_ready.load()) {
295 // Fast path: we don't need to take lock because pgmap_ready
296 // is already set
297 } else {
298 Mutex::Locker l(lock);
224ce89b 299
3efd9988
FG
300 if (reported_osds.find(osd_id) == reported_osds.end()) {
301 dout(4) << "initial report from osd " << osd_id << dendl;
302 reported_osds.insert(osd_id);
303 std::set<int32_t> up_osds;
224ce89b 304
3efd9988
FG
305 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
306 osdmap.get_up_osds(up_osds);
307 });
224ce89b 308
3efd9988
FG
309 std::set<int32_t> unreported_osds;
310 std::set_difference(up_osds.begin(), up_osds.end(),
311 reported_osds.begin(), reported_osds.end(),
312 std::inserter(unreported_osds, unreported_osds.begin()));
313
314 if (unreported_osds.size() == 0) {
315 dout(4) << "all osds have reported, sending PG state to mon" << dendl;
316 pgmap_ready = true;
317 reported_osds.clear();
318 // Avoid waiting for next tick
319 send_report();
320 } else {
321 dout(4) << "still waiting for " << unreported_osds.size() << " osds"
322 " to report in before PGMap is ready" << dendl;
323 }
224ce89b
WB
324 }
325 }
326}
327
7c673cae
FG
328void DaemonServer::shutdown()
329{
224ce89b 330 dout(10) << "begin" << dendl;
7c673cae
FG
331 msgr->shutdown();
332 msgr->wait();
224ce89b 333 dout(10) << "done" << dendl;
7c673cae
FG
334}
335
336
337
338bool DaemonServer::handle_open(MMgrOpen *m)
339{
3efd9988
FG
340 Mutex::Locker l(lock);
341
224ce89b
WB
342 DaemonKey key;
343 if (!m->service_name.empty()) {
344 key.first = m->service_name;
345 } else {
346 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
347 }
348 key.second = m->daemon_name;
7c673cae 349
224ce89b 350 dout(4) << "from " << m->get_connection() << " " << key << dendl;
7c673cae 351
3efd9988 352 _send_configure(m->get_connection());
7c673cae 353
c07f9fc5 354 DaemonStatePtr daemon;
7c673cae 355 if (daemon_state.exists(key)) {
c07f9fc5
FG
356 daemon = daemon_state.get(key);
357 }
358 if (daemon) {
7c673cae 359 dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
c07f9fc5 360 Mutex::Locker l(daemon->lock);
3efd9988 361 daemon->perf_counters.clear();
7c673cae
FG
362 }
363
224ce89b 364 if (m->service_daemon) {
c07f9fc5 365 if (!daemon) {
224ce89b
WB
366 dout(4) << "constructing new DaemonState for " << key << dendl;
367 daemon = std::make_shared<DaemonState>(daemon_state.types);
368 daemon->key = key;
369 if (m->daemon_metadata.count("hostname")) {
370 daemon->hostname = m->daemon_metadata["hostname"];
371 }
372 daemon_state.insert(daemon);
373 }
c07f9fc5 374 Mutex::Locker l(daemon->lock);
224ce89b
WB
375 daemon->service_daemon = true;
376 daemon->metadata = m->daemon_metadata;
377 daemon->service_status = m->daemon_status;
378
379 utime_t now = ceph_clock_now();
380 auto d = pending_service_map.get_daemon(m->service_name,
381 m->daemon_name);
382 if (d->gid != (uint64_t)m->get_source().num()) {
383 dout(10) << "registering " << key << " in pending_service_map" << dendl;
384 d->gid = m->get_source().num();
385 d->addr = m->get_source_addr();
386 d->start_epoch = pending_service_map.epoch;
387 d->start_stamp = now;
388 d->metadata = m->daemon_metadata;
389 pending_service_map_dirty = pending_service_map.epoch;
390 }
391 }
392
3efd9988
FG
393 if (m->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT &&
394 m->service_name.empty())
395 {
396 // Store in set of the daemon/service connections, i.e. those
397 // connections that require an update in the event of stats
398 // configuration changes.
399 daemon_connections.insert(m->get_connection());
400 }
401
7c673cae
FG
402 m->put();
403 return true;
404}
405
406bool DaemonServer::handle_report(MMgrReport *m)
407{
224ce89b
WB
408 DaemonKey key;
409 if (!m->service_name.empty()) {
410 key.first = m->service_name;
411 } else {
412 key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
413 }
414 key.second = m->daemon_name;
7c673cae 415
224ce89b 416 dout(4) << "from " << m->get_connection() << " " << key << dendl;
31f18b77 417
224ce89b
WB
418 if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
419 m->service_name.empty()) {
420 // Clients should not be sending us stats unless they are declaring
421 // themselves to be a daemon for some service.
422 dout(4) << "rejecting report from non-daemon client " << m->daemon_name
423 << dendl;
b32b8144 424 m->get_connection()->mark_down();
31f18b77
FG
425 m->put();
426 return true;
427 }
7c673cae 428
3efd9988 429 // Look up the DaemonState
7c673cae
FG
430 DaemonStatePtr daemon;
431 if (daemon_state.exists(key)) {
224ce89b 432 dout(20) << "updating existing DaemonState for " << key << dendl;
7c673cae
FG
433 daemon = daemon_state.get(key);
434 } else {
b32b8144
FG
435 // we don't know the hostname at this stage, reject MMgrReport here.
436 dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
437 << dendl;
438
439 // issue metadata request in background
440 if (!daemon_state.is_updating(key) &&
441 (key.first == "osd" || key.first == "mds")) {
442
443 std::ostringstream oss;
444 auto c = new MetadataUpdate(daemon_state, key);
445 if (key.first == "osd") {
446 oss << "{\"prefix\": \"osd metadata\", \"id\": "
447 << key.second<< "}";
448
449 } else if (key.first == "mds") {
450 c->set_default("addr", stringify(m->get_source_addr()));
451 oss << "{\"prefix\": \"mds metadata\", \"who\": \""
452 << key.second << "\"}";
453
454 } else {
455 ceph_abort();
456 }
457
458 monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
459 }
460
461 {
462 Mutex::Locker l(lock);
463 // kill session
464 MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
465 if (!session) {
466 return false;
467 }
468 m->get_connection()->mark_down();
469 session->put();
470
471 dout(10) << "unregistering osd." << session->osd_id
472 << " session " << session << " con " << m->get_connection() << dendl;
473
474 if (osd_cons.find(session->osd_id) != osd_cons.end()) {
475 osd_cons[session->osd_id].erase(m->get_connection());
476 }
477
478 auto iter = daemon_connections.find(m->get_connection());
479 if (iter != daemon_connections.end()) {
480 daemon_connections.erase(iter);
481 }
482 }
483
484 return false;
7c673cae 485 }
3efd9988
FG
486
487 // Update the DaemonState
7c673cae 488 assert(daemon != nullptr);
c07f9fc5
FG
489 {
490 Mutex::Locker l(daemon->lock);
3efd9988 491 auto &daemon_counters = daemon->perf_counters;
c07f9fc5 492 daemon_counters.update(m);
3efd9988
FG
493
494 if (daemon->service_daemon) {
495 utime_t now = ceph_clock_now();
496 if (m->daemon_status) {
497 daemon->service_status = *m->daemon_status;
498 daemon->service_status_stamp = now;
499 }
500 daemon->last_service_beacon = now;
501 } else if (m->daemon_status) {
502 derr << "got status from non-daemon " << key << dendl;
503 }
b32b8144
FG
504 if (m->get_connection()->peer_is_osd()) {
505 // only OSD sends health_checks to me now
506 daemon->osd_health_metrics = std::move(m->osd_health_metrics);
507 }
c07f9fc5 508 }
3efd9988 509
c07f9fc5
FG
510 // if there are any schema updates, notify the python modules
511 if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
512 ostringstream oss;
513 oss << key.first << '.' << key.second;
514 py_modules.notify_all("perf_schema_update", oss.str());
515 }
224ce89b 516
7c673cae
FG
517 m->put();
518 return true;
519}
520
7c673cae
FG
521
522void DaemonServer::_generate_command_map(
523 map<string,cmd_vartype>& cmdmap,
524 map<string,string> &param_str_map)
525{
526 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
527 p != cmdmap.end(); ++p) {
528 if (p->first == "prefix")
529 continue;
530 if (p->first == "caps") {
531 vector<string> cv;
532 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
533 cv.size() % 2 == 0) {
534 for (unsigned i = 0; i < cv.size(); i += 2) {
535 string k = string("caps_") + cv[i];
536 param_str_map[k] = cv[i + 1];
537 }
538 continue;
539 }
540 }
541 param_str_map[p->first] = cmd_vartype_stringify(p->second);
542 }
543}
544
c07f9fc5 545const MonCommand *DaemonServer::_get_mgrcommand(
7c673cae 546 const string &cmd_prefix,
c07f9fc5 547 const std::vector<MonCommand> &cmds)
7c673cae 548{
c07f9fc5
FG
549 const MonCommand *this_cmd = nullptr;
550 for (const auto &cmd : cmds) {
551 if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
552 this_cmd = &cmd;
7c673cae
FG
553 break;
554 }
555 }
556 return this_cmd;
557}
558
559bool DaemonServer::_allowed_command(
560 MgrSession *s,
561 const string &module,
562 const string &prefix,
563 const map<string,cmd_vartype>& cmdmap,
564 const map<string,string>& param_str_map,
c07f9fc5 565 const MonCommand *this_cmd) {
7c673cae
FG
566
567 if (s->entity_name.is_mon()) {
568 // mon is all-powerful. even when it is forwarding commands on behalf of
569 // old clients; we expect the mon is validating commands before proxying!
570 return true;
571 }
572
573 bool cmd_r = this_cmd->requires_perm('r');
574 bool cmd_w = this_cmd->requires_perm('w');
575 bool cmd_x = this_cmd->requires_perm('x');
576
577 bool capable = s->caps.is_capable(
578 g_ceph_context,
579 CEPH_ENTITY_TYPE_MGR,
580 s->entity_name,
581 module, prefix, param_str_map,
582 cmd_r, cmd_w, cmd_x);
583
584 dout(10) << " " << s->entity_name << " "
585 << (capable ? "" : "not ") << "capable" << dendl;
586 return capable;
587}
588
589bool DaemonServer::handle_command(MCommand *m)
590{
3efd9988 591 Mutex::Locker l(lock);
7c673cae
FG
592 int r = 0;
593 std::stringstream ss;
594 std::string prefix;
595
596 assert(lock.is_locked_by_me());
597
598 /**
599 * The working data for processing an MCommand. This lives in
600 * a class to enable passing it into other threads for processing
601 * outside of the thread/locks that called handle_command.
602 */
603 class CommandContext
604 {
605 public:
606 MCommand *m;
607 bufferlist odata;
608 cmdmap_t cmdmap;
609
610 CommandContext(MCommand *m_)
611 : m(m_)
612 {
613 }
614
615 ~CommandContext()
616 {
617 m->put();
618 }
619
620 void reply(int r, const std::stringstream &ss)
621 {
622 reply(r, ss.str());
623 }
624
625 void reply(int r, const std::string &rs)
626 {
627 // Let the connection drop as soon as we've sent our response
628 ConnectionRef con = m->get_connection();
629 if (con) {
630 con->mark_disposable();
631 }
632
224ce89b 633 dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl;
7c673cae
FG
634 if (con) {
635 MCommandReply *reply = new MCommandReply(r, rs);
636 reply->set_tid(m->get_tid());
637 reply->set_data(odata);
638 con->send_message(reply);
639 }
640 }
641 };
642
643 /**
644 * A context for receiving a bufferlist/error string from a background
645 * function and then calling back to a CommandContext when it's done
646 */
647 class ReplyOnFinish : public Context {
648 std::shared_ptr<CommandContext> cmdctx;
649
650 public:
651 bufferlist from_mon;
652 string outs;
653
654 ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
655 : cmdctx(cmdctx_)
656 {}
657 void finish(int r) override {
658 cmdctx->odata.claim_append(from_mon);
659 cmdctx->reply(r, outs);
660 }
661 };
662
663 std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
664
665 MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
666 if (!session) {
667 return true;
668 }
669 session->put(); // SessionRef takes a ref
670 if (session->inst.name == entity_name_t())
671 session->inst.name = m->get_source();
672
673 std::string format;
674 boost::scoped_ptr<Formatter> f;
675 map<string,string> param_str_map;
676
677 if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
678 cmdctx->reply(-EINVAL, ss);
679 return true;
680 }
681
682 {
683 cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
684 f.reset(Formatter::create(format));
685 }
686
687 cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
688
689 dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
690 dout(4) << "prefix=" << prefix << dendl;
691
692 if (prefix == "get_command_descriptions") {
7c673cae 693 dout(10) << "reading commands from python modules" << dendl;
c07f9fc5 694 const auto py_commands = py_modules.get_commands();
7c673cae 695
c07f9fc5 696 int cmdnum = 0;
7c673cae
FG
697 JSONFormatter f;
698 f.open_object_section("command_descriptions");
c07f9fc5
FG
699
700 auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){
7c673cae
FG
701 ostringstream secname;
702 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
c07f9fc5
FG
703 dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring,
704 mc.module, mc.req_perms, mc.availability, 0);
7c673cae 705 cmdnum++;
c07f9fc5
FG
706 };
707
708 for (const auto &pyc : py_commands) {
709 dump_cmd(pyc);
7c673cae 710 }
7c673cae 711
c07f9fc5
FG
712 for (const auto &mgr_cmd : mgr_commands) {
713 dump_cmd(mgr_cmd);
7c673cae 714 }
c07f9fc5 715
7c673cae
FG
716 f.close_section(); // command_descriptions
717 f.flush(cmdctx->odata);
718 cmdctx->reply(0, ss);
719 return true;
720 }
721
722 // lookup command
c07f9fc5 723 const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
7c673cae
FG
724 _generate_command_map(cmdctx->cmdmap, param_str_map);
725 if (!mgr_cmd) {
c07f9fc5 726 MonCommand py_command = {"", "", "py", "rw", "cli"};
7c673cae
FG
727 if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
728 param_str_map, &py_command)) {
729 dout(1) << " access denied" << dendl;
31f18b77
FG
730 ss << "access denied; does your client key have mgr caps?"
731 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
7c673cae
FG
732 cmdctx->reply(-EACCES, ss);
733 return true;
734 }
735 } else {
736 // validate user's permissions for requested command
737 if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
738 param_str_map, mgr_cmd)) {
739 dout(1) << " access denied" << dendl;
740 audit_clog->info() << "from='" << session->inst << "' "
741 << "entity='" << session->entity_name << "' "
742 << "cmd=" << m->cmd << ": access denied";
31f18b77
FG
743 ss << "access denied' does your client key have mgr caps?"
744 " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
7c673cae
FG
745 cmdctx->reply(-EACCES, ss);
746 return true;
747 }
748 }
749
750 audit_clog->debug()
751 << "from='" << session->inst << "' "
752 << "entity='" << session->entity_name << "' "
753 << "cmd=" << m->cmd << ": dispatch";
754
224ce89b
WB
755 // ----------------
756 // service map commands
757 if (prefix == "service dump") {
758 if (!f)
759 f.reset(Formatter::create("json-pretty"));
760 cluster_state.with_servicemap([&](const ServiceMap &service_map) {
761 f->dump_object("service_map", service_map);
762 });
763 f->flush(cmdctx->odata);
764 cmdctx->reply(0, ss);
765 return true;
766 }
767 if (prefix == "service status") {
768 if (!f)
769 f.reset(Formatter::create("json-pretty"));
770 // only include state from services that are in the persisted service map
771 f->open_object_section("service_status");
772 ServiceMap s;
773 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
774 s = service_map;
775 });
776 for (auto& p : s.services) {
777 f->open_object_section(p.first.c_str());
778 for (auto& q : p.second.daemons) {
779 f->open_object_section(q.first.c_str());
780 DaemonKey key(p.first, q.first);
781 assert(daemon_state.exists(key));
782 auto daemon = daemon_state.get(key);
c07f9fc5 783 Mutex::Locker l(daemon->lock);
224ce89b
WB
784 f->dump_stream("status_stamp") << daemon->service_status_stamp;
785 f->dump_stream("last_beacon") << daemon->last_service_beacon;
786 f->open_object_section("status");
787 for (auto& r : daemon->service_status) {
788 f->dump_string(r.first.c_str(), r.second);
789 }
790 f->close_section();
791 f->close_section();
792 }
793 f->close_section();
794 }
795 f->close_section();
796 f->flush(cmdctx->odata);
797 cmdctx->reply(0, ss);
798 return true;
799 }
800
3efd9988
FG
801 if (prefix == "config set") {
802 std::string key;
803 std::string val;
804 cmd_getval(cct, cmdctx->cmdmap, "key", key);
805 cmd_getval(cct, cmdctx->cmdmap, "value", val);
806 r = cct->_conf->set_val(key, val, true, &ss);
807 if (r == 0) {
808 cct->_conf->apply_changes(nullptr);
809 }
810 cmdctx->reply(0, ss);
811 return true;
812 }
813
7c673cae
FG
814 // -----------
815 // PG commands
816
817 if (prefix == "pg scrub" ||
818 prefix == "pg repair" ||
819 prefix == "pg deep-scrub") {
820 string scrubop = prefix.substr(3, string::npos);
821 pg_t pgid;
822 string pgidstr;
823 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
824 if (!pgid.parse(pgidstr.c_str())) {
825 ss << "invalid pgid '" << pgidstr << "'";
826 cmdctx->reply(-EINVAL, ss);
827 return true;
828 }
829 bool pg_exists = false;
830 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
831 pg_exists = osdmap.pg_exists(pgid);
832 });
833 if (!pg_exists) {
834 ss << "pg " << pgid << " dne";
835 cmdctx->reply(-ENOENT, ss);
836 return true;
837 }
838 int acting_primary = -1;
7c673cae
FG
839 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
840 acting_primary = osdmap.get_pg_acting_primary(pgid);
7c673cae
FG
841 });
842 if (acting_primary == -1) {
843 ss << "pg " << pgid << " has no primary osd";
844 cmdctx->reply(-EAGAIN, ss);
845 return true;
846 }
31f18b77
FG
847 auto p = osd_cons.find(acting_primary);
848 if (p == osd_cons.end()) {
849 ss << "pg " << pgid << " primary osd." << acting_primary
850 << " is not currently connected";
851 cmdctx->reply(-EAGAIN, ss);
852 }
7c673cae 853 vector<pg_t> pgs = { pgid };
31f18b77
FG
854 for (auto& con : p->second) {
855 con->send_message(new MOSDScrub(monc->get_fsid(),
856 pgs,
857 scrubop == "repair",
858 scrubop == "deep-scrub"));
859 }
7c673cae 860 ss << "instructing pg " << pgid << " on osd." << acting_primary
31f18b77
FG
861 << " to " << scrubop;
862 cmdctx->reply(0, ss);
863 return true;
864 } else if (prefix == "osd scrub" ||
865 prefix == "osd deep-scrub" ||
866 prefix == "osd repair") {
867 string whostr;
868 cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
869 vector<string> pvec;
870 get_str_vec(prefix, pvec);
871
872 set<int> osds;
224ce89b 873 if (whostr == "*" || whostr == "all" || whostr == "any") {
31f18b77
FG
874 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
875 for (int i = 0; i < osdmap.get_max_osd(); i++)
876 if (osdmap.is_up(i)) {
877 osds.insert(i);
878 }
879 });
880 } else {
881 long osd = parse_osd_id(whostr.c_str(), &ss);
882 if (osd < 0) {
883 ss << "invalid osd '" << whostr << "'";
884 cmdctx->reply(-EINVAL, ss);
885 return true;
886 }
887 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
888 if (osdmap.is_up(osd)) {
889 osds.insert(osd);
890 }
891 });
892 if (osds.empty()) {
893 ss << "osd." << osd << " is not up";
894 cmdctx->reply(-EAGAIN, ss);
895 return true;
896 }
897 }
898 set<int> sent_osds, failed_osds;
899 for (auto osd : osds) {
900 auto p = osd_cons.find(osd);
901 if (p == osd_cons.end()) {
902 failed_osds.insert(osd);
903 } else {
904 sent_osds.insert(osd);
905 for (auto& con : p->second) {
906 con->send_message(new MOSDScrub(monc->get_fsid(),
907 pvec.back() == "repair",
908 pvec.back() == "deep-scrub"));
909 }
910 }
911 }
912 if (failed_osds.size() == osds.size()) {
913 ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
914 << " (not connected)";
915 r = -EAGAIN;
916 } else {
917 ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
918 if (!failed_osds.empty()) {
919 ss << "; osd(s) " << failed_osds << " were not connected";
920 }
921 r = 0;
922 }
7c673cae
FG
923 cmdctx->reply(0, ss);
924 return true;
925 } else if (prefix == "osd reweight-by-pg" ||
926 prefix == "osd reweight-by-utilization" ||
927 prefix == "osd test-reweight-by-pg" ||
928 prefix == "osd test-reweight-by-utilization") {
929 bool by_pg =
930 prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
931 bool dry_run =
932 prefix == "osd test-reweight-by-pg" ||
933 prefix == "osd test-reweight-by-utilization";
934 int64_t oload;
935 cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
936 set<int64_t> pools;
937 vector<string> poolnames;
938 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
939 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
940 for (const auto& poolname : poolnames) {
941 int64_t pool = osdmap.lookup_pg_pool_name(poolname);
942 if (pool < 0) {
943 ss << "pool '" << poolname << "' does not exist";
944 r = -ENOENT;
945 }
946 pools.insert(pool);
947 }
948 });
949 if (r) {
950 cmdctx->reply(r, ss);
951 return true;
952 }
953 double max_change = g_conf->mon_reweight_max_change;
954 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
955 if (max_change <= 0.0) {
956 ss << "max_change " << max_change << " must be positive";
957 cmdctx->reply(-EINVAL, ss);
958 return true;
959 }
960 int64_t max_osds = g_conf->mon_reweight_max_osds;
961 cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
962 if (max_osds <= 0) {
963 ss << "max_osds " << max_osds << " must be positive";
964 cmdctx->reply(-EINVAL, ss);
965 return true;
966 }
967 string no_increasing;
968 cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
969 string out_str;
970 mempool::osdmap::map<int32_t, uint32_t> new_weights;
971 r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
972 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
973 return reweight::by_utilization(osdmap, pgmap,
974 oload,
975 max_change,
976 max_osds,
977 by_pg,
978 pools.empty() ? NULL : &pools,
979 no_increasing == "--no-increasing",
980 &new_weights,
981 &ss, &out_str, f.get());
982 });
983 });
984 if (r >= 0) {
985 dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
986 }
987 if (f) {
988 f->flush(cmdctx->odata);
989 } else {
990 cmdctx->odata.append(out_str);
991 }
992 if (r < 0) {
993 ss << "FAILED reweight-by-pg";
994 cmdctx->reply(r, ss);
995 return true;
996 } else if (r == 0 || dry_run) {
997 ss << "no change";
998 cmdctx->reply(r, ss);
999 return true;
1000 } else {
1001 json_spirit::Object json_object;
1002 for (const auto& osd_weight : new_weights) {
1003 json_spirit::Config::add(json_object,
1004 std::to_string(osd_weight.first),
1005 std::to_string(osd_weight.second));
1006 }
1007 string s = json_spirit::write(json_object);
1008 std::replace(begin(s), end(s), '\"', '\'');
1009 const string cmd =
1010 "{"
1011 "\"prefix\": \"osd reweightn\", "
1012 "\"weights\": \"" + s + "\""
1013 "}";
1014 auto on_finish = new ReplyOnFinish(cmdctx);
1015 monc->start_mon_command({cmd}, {},
1016 &on_finish->from_mon, &on_finish->outs, on_finish);
1017 return true;
1018 }
31f18b77
FG
1019 } else if (prefix == "osd df") {
1020 string method;
1021 cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
1022 r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) {
1023 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1024 print_osd_utilization(osdmap, &pgservice, ss,
1025 f.get(), method == "tree");
1026
1027 cmdctx->odata.append(ss);
1028 return 0;
1029 });
1030 });
1031 cmdctx->reply(r, "");
1032 return true;
35e4c445
FG
1033 } else if (prefix == "osd safe-to-destroy") {
1034 vector<string> ids;
1035 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1036 set<int> osds;
1037 int r;
1038 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1039 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1040 });
1041 if (!r && osds.empty()) {
1042 ss << "must specify one or more OSDs";
1043 r = -EINVAL;
1044 }
1045 if (r < 0) {
1046 cmdctx->reply(r, ss);
1047 return true;
1048 }
1049 set<int> active_osds, missing_stats, stored_pgs;
1050 int affected_pgs = 0;
1051 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1052 if (pg_map.num_pg_unknown > 0) {
1053 ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
1054 << " any conclusions";
1055 r = -EAGAIN;
1056 return;
1057 }
1058 int num_active_clean = 0;
1059 for (auto& p : pg_map.num_pg_by_state) {
1060 unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
1061 if ((p.first & want) == want) {
1062 num_active_clean += p.second;
1063 }
1064 }
1065 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1066 for (auto osd : osds) {
1067 if (!osdmap.exists(osd)) {
1068 continue; // clearly safe to destroy
1069 }
1070 auto q = pg_map.num_pg_by_osd.find(osd);
1071 if (q != pg_map.num_pg_by_osd.end()) {
1072 if (q->second.acting > 0 || q->second.up > 0) {
1073 active_osds.insert(osd);
1074 affected_pgs += q->second.acting + q->second.up;
1075 continue;
1076 }
1077 }
1078 if (num_active_clean < pg_map.num_pg) {
1079 // all pgs aren't active+clean; we need to be careful.
1080 auto p = pg_map.osd_stat.find(osd);
1081 if (p == pg_map.osd_stat.end()) {
1082 missing_stats.insert(osd);
1083 }
1084 if (p->second.num_pgs > 0) {
1085 stored_pgs.insert(osd);
1086 }
1087 }
1088 }
1089 });
1090 });
1091 if (!r && !active_osds.empty()) {
1092 ss << "OSD(s) " << active_osds << " have " << affected_pgs
1093 << " pgs currently mapped to them";
1094 r = -EBUSY;
1095 } else if (!missing_stats.empty()) {
1096 ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
1097 << " PGs are active+clean; we cannot draw any conclusions";
1098 r = -EAGAIN;
1099 } else if (!stored_pgs.empty()) {
1100 ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
1101 << " data, and not all PGs are active+clean; we cannot be sure they"
1102 << " aren't still needed.";
1103 r = -EBUSY;
1104 }
1105 if (r) {
1106 cmdctx->reply(r, ss);
1107 return true;
1108 }
1109 ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1110 << " durability.";
1111 cmdctx->reply(0, ss);
1112 return true;
1113 } else if (prefix == "osd ok-to-stop") {
1114 vector<string> ids;
1115 cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1116 set<int> osds;
1117 int r;
1118 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1119 r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1120 });
1121 if (!r && osds.empty()) {
1122 ss << "must specify one or more OSDs";
1123 r = -EINVAL;
1124 }
1125 if (r < 0) {
1126 cmdctx->reply(r, ss);
1127 return true;
1128 }
1129 map<pg_t,int> pg_delta; // pgid -> net acting set size change
1130 int dangerous_pgs = 0;
1131 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1132 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1133 if (pg_map.num_pg_unknown > 0) {
1134 ss << pg_map.num_pg_unknown << " pgs have unknown state; "
1135 << "cannot draw any conclusions";
1136 r = -EAGAIN;
1137 return;
1138 }
1139 for (auto osd : osds) {
1140 auto p = pg_map.pg_by_osd.find(osd);
1141 if (p != pg_map.pg_by_osd.end()) {
1142 for (auto& pgid : p->second) {
1143 --pg_delta[pgid];
1144 }
1145 }
1146 }
1147 for (auto& p : pg_delta) {
1148 auto q = pg_map.pg_stat.find(p.first);
1149 if (q == pg_map.pg_stat.end()) {
1150 ss << "missing information about " << p.first << "; cannot draw"
1151 << " any conclusions";
1152 r = -EAGAIN;
1153 return;
1154 }
1155 if (!(q->second.state & PG_STATE_ACTIVE) ||
1156 (q->second.state & PG_STATE_DEGRADED)) {
1157 // we don't currently have a good way to tell *how* degraded
1158 // a degraded PG is, so we have to assume we cannot remove
1159 // any more replicas/shards.
1160 ++dangerous_pgs;
1161 continue;
1162 }
1163 const pg_pool_t *pi = osdmap.get_pg_pool(p.first.pool());
1164 if (!pi) {
1165 ++dangerous_pgs; // pool is creating or deleting
1166 } else {
1167 if (q->second.acting.size() + p.second < pi->min_size) {
1168 ++dangerous_pgs;
1169 }
1170 }
1171 }
1172 });
1173 });
1174 if (r) {
1175 cmdctx->reply(r, ss);
1176 return true;
1177 }
1178 if (dangerous_pgs) {
1179 ss << dangerous_pgs << " PGs are already degraded or might become "
1180 << "unavailable";
1181 cmdctx->reply(-EBUSY, ss);
1182 return true;
1183 }
1184 ss << "OSD(s) " << osds << " are ok to stop without reducing"
1185 << " availability, provided there are no other concurrent failures"
1186 << " or interventions. " << pg_delta.size() << " PGs are likely to be"
1187 << " degraded (but remain available) as a result.";
1188 cmdctx->reply(0, ss);
1189 return true;
c07f9fc5
FG
1190 } else if (prefix == "pg force-recovery" ||
1191 prefix == "pg force-backfill" ||
1192 prefix == "pg cancel-force-recovery" ||
1193 prefix == "pg cancel-force-backfill") {
1194 string forceop = prefix.substr(3, string::npos);
1195 list<pg_t> parsed_pgs;
1196 map<int, list<pg_t> > osdpgs;
1197
1198 // figure out actual op just once
1199 int actual_op = 0;
1200 if (forceop == "force-recovery") {
1201 actual_op = OFR_RECOVERY;
1202 } else if (forceop == "force-backfill") {
1203 actual_op = OFR_BACKFILL;
1204 } else if (forceop == "cancel-force-backfill") {
1205 actual_op = OFR_BACKFILL | OFR_CANCEL;
1206 } else if (forceop == "cancel-force-recovery") {
1207 actual_op = OFR_RECOVERY | OFR_CANCEL;
1208 }
1209
1210 // convert pg names to pgs, discard any invalid ones while at it
1211 {
1212 // we don't want to keep pgidstr and pgidstr_nodup forever
1213 vector<string> pgidstr;
1214 // get pgids to process and prune duplicates
1215 cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
1216 set<string> pgidstr_nodup(pgidstr.begin(), pgidstr.end());
1217 if (pgidstr.size() != pgidstr_nodup.size()) {
1218 // move elements only when there were duplicates, as this
1219 // reorders them
1220 pgidstr.resize(pgidstr_nodup.size());
1221 auto it = pgidstr_nodup.begin();
1222 for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) {
1223 pgidstr[i] = std::move(*it++);
1224 }
1225 }
1226
1227 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1228 for (auto& pstr : pgidstr) {
1229 pg_t parsed_pg;
1230 if (!parsed_pg.parse(pstr.c_str())) {
1231 ss << "invalid pgid '" << pstr << "'; ";
1232 r = -EINVAL;
1233 } else {
1234 auto workit = pg_map.pg_stat.find(parsed_pg);
1235 if (workit == pg_map.pg_stat.end()) {
b32b8144 1236 ss << "pg " << pstr << " does not exist; ";
c07f9fc5
FG
1237 r = -ENOENT;
1238 } else {
1239 pg_stat_t workpg = workit->second;
1240
1241 // discard pgs for which user requests are pointless
1242 switch (actual_op)
1243 {
1244 case OFR_RECOVERY:
1245 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) {
1246 // don't return error, user script may be racing with cluster. not fatal.
1247 ss << "pg " << pstr << " doesn't require recovery; ";
1248 continue;
1249 } else if (workpg.state & PG_STATE_FORCED_RECOVERY) {
1250 ss << "pg " << pstr << " recovery already forced; ";
1251 // return error, as it may be a bug in user script
1252 r = -EINVAL;
1253 continue;
1254 }
1255 break;
1256 case OFR_BACKFILL:
3efd9988 1257 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING)) == 0) {
c07f9fc5
FG
1258 ss << "pg " << pstr << " doesn't require backfilling; ";
1259 continue;
1260 } else if (workpg.state & PG_STATE_FORCED_BACKFILL) {
1261 ss << "pg " << pstr << " backfill already forced; ";
1262 r = -EINVAL;
1263 continue;
1264 }
1265 break;
1266 case OFR_BACKFILL | OFR_CANCEL:
1267 if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) {
1268 ss << "pg " << pstr << " backfill not forced; ";
1269 continue;
1270 }
1271 break;
1272 case OFR_RECOVERY | OFR_CANCEL:
1273 if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) {
1274 ss << "pg " << pstr << " recovery not forced; ";
1275 continue;
1276 }
1277 break;
1278 default:
1279 assert(0 == "actual_op value is not supported");
1280 }
1281
1282 parsed_pgs.push_back(std::move(parsed_pg));
1283 }
1284 }
1285 }
1286
1287 // group pgs to process by osd
1288 for (auto& pgid : parsed_pgs) {
1289 auto workit = pg_map.pg_stat.find(pgid);
1290 if (workit != pg_map.pg_stat.end()) {
1291 pg_stat_t workpg = workit->second;
1292 set<int32_t> osds(workpg.up.begin(), workpg.up.end());
1293 osds.insert(workpg.acting.begin(), workpg.acting.end());
1294 for (auto i : osds) {
1295 osdpgs[i].push_back(pgid);
1296 }
1297 }
1298 }
1299
1300 });
1301 }
1302
1303 // respond with error only when no pgs are correct
1304 // yes, in case of mixed errors, only the last one will be emitted,
1305 // but the message presented will be fine
1306 if (parsed_pgs.size() != 0) {
1307 // clear error to not confuse users/scripts
1308 r = 0;
1309 }
1310
1311 // optimize the command -> messages conversion, use only one message per distinct OSD
1312 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1313 for (auto& i : osdpgs) {
1314 if (osdmap.is_up(i.first)) {
1315 vector<pg_t> pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end()));
1316 auto p = osd_cons.find(i.first);
1317 if (p == osd_cons.end()) {
1318 ss << "osd." << i.first << " is not currently connected";
1319 r = -EAGAIN;
1320 continue;
1321 }
1322 for (auto& con : p->second) {
1323 con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op));
1324 }
1325 ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; ";
1326 }
1327 }
1328 });
1329 ss << std::endl;
1330 cmdctx->reply(r, ss);
1331 return true;
7c673cae
FG
1332 } else {
1333 r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
1334 return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1335 return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
1336 f.get(), &ss, &cmdctx->odata);
1337 });
1338 });
1339
1340 if (r != -EOPNOTSUPP) {
1341 cmdctx->reply(r, ss);
1342 return true;
1343 }
1344 }
1345
1346 // None of the special native commands,
3efd9988 1347 ActivePyModule *handler = nullptr;
c07f9fc5 1348 auto py_commands = py_modules.get_py_commands();
7c673cae
FG
1349 for (const auto &pyc : py_commands) {
1350 auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
1351 dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
1352 if (pyc_prefix == prefix) {
1353 handler = pyc.handler;
1354 break;
1355 }
1356 }
1357
1358 if (handler == nullptr) {
1359 ss << "No handler found for '" << prefix << "'";
1360 dout(4) << "No handler found for '" << prefix << "'" << dendl;
1361 cmdctx->reply(-EINVAL, ss);
1362 return true;
1363 } else {
1364 // Okay, now we have a handler to call, but we must not call it
1365 // in this thread, because the python handlers can do anything,
1366 // including blocking, and including calling back into mgr.
1367 dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
1368 finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
1369 std::stringstream ds;
1370 std::stringstream ss;
1371 int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
1372 cmdctx->odata.append(ds);
1373 cmdctx->reply(r, ss);
1374 }));
1375 return true;
1376 }
1377}
31f18b77 1378
224ce89b
WB
1379void DaemonServer::_prune_pending_service_map()
1380{
1381 utime_t cutoff = ceph_clock_now();
3efd9988 1382 cutoff -= g_conf->get_val<double>("mgr_service_beacon_grace");
224ce89b
WB
1383 auto p = pending_service_map.services.begin();
1384 while (p != pending_service_map.services.end()) {
1385 auto q = p->second.daemons.begin();
1386 while (q != p->second.daemons.end()) {
1387 DaemonKey key(p->first, q->first);
1388 if (!daemon_state.exists(key)) {
1389 derr << "missing key " << key << dendl;
1390 ++q;
1391 continue;
1392 }
1393 auto daemon = daemon_state.get(key);
c07f9fc5 1394 Mutex::Locker l(daemon->lock);
224ce89b
WB
1395 if (daemon->last_service_beacon == utime_t()) {
1396 // we must have just restarted; assume they are alive now.
1397 daemon->last_service_beacon = ceph_clock_now();
1398 ++q;
1399 continue;
1400 }
1401 if (daemon->last_service_beacon < cutoff) {
1402 dout(10) << "pruning stale " << p->first << "." << q->first
1403 << " last_beacon " << daemon->last_service_beacon << dendl;
1404 q = p->second.daemons.erase(q);
1405 pending_service_map_dirty = pending_service_map.epoch;
1406 } else {
1407 ++q;
1408 }
1409 }
1410 if (p->second.daemons.empty()) {
1411 p = pending_service_map.services.erase(p);
1412 pending_service_map_dirty = pending_service_map.epoch;
1413 } else {
1414 ++p;
1415 }
1416 }
1417}
1418
31f18b77
FG
1419void DaemonServer::send_report()
1420{
224ce89b 1421 if (!pgmap_ready) {
3efd9988 1422 if (ceph_clock_now() - started_at > g_conf->get_val<int64_t>("mgr_stats_period") * 4.0) {
224ce89b
WB
1423 pgmap_ready = true;
1424 reported_osds.clear();
1425 dout(1) << "Giving up on OSDs that haven't reported yet, sending "
1426 << "potentially incomplete PG state to mon" << dendl;
1427 } else {
1428 dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
1429 << dendl;
1430 return;
1431 }
1432 }
1433
31f18b77 1434 auto m = new MMonMgrReport();
c07f9fc5
FG
1435 py_modules.get_health_checks(&m->health_checks);
1436
31f18b77
FG
1437 cluster_state.with_pgmap([&](const PGMap& pg_map) {
1438 cluster_state.update_delta_stats();
1439
224ce89b
WB
1440 if (pending_service_map.epoch) {
1441 _prune_pending_service_map();
1442 if (pending_service_map_dirty >= pending_service_map.epoch) {
1443 pending_service_map.modified = ceph_clock_now();
1444 ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
1445 dout(10) << "sending service_map e" << pending_service_map.epoch
1446 << dendl;
1447 pending_service_map.epoch++;
1448 }
1449 }
1450
31f18b77
FG
1451 cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1452 // FIXME: no easy way to get mon features here. this will do for
1453 // now, though, as long as we don't make a backward-incompat change.
1454 pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
1455 dout(10) << pg_map << dendl;
224ce89b
WB
1456
1457 pg_map.get_health_checks(g_ceph_context, osdmap,
1458 &m->health_checks);
c07f9fc5 1459
224ce89b
WB
1460 dout(10) << m->health_checks.checks.size() << " health checks"
1461 << dendl;
1462 dout(20) << "health checks:\n";
1463 JSONFormatter jf(true);
1464 jf.dump_object("health_checks", m->health_checks);
1465 jf.flush(*_dout);
1466 *_dout << dendl;
31f18b77
FG
1467 });
1468 });
b32b8144
FG
1469
1470 auto osds = daemon_state.get_by_service("osd");
1471 map<osd_metric, unique_ptr<OSDHealthMetricCollector>> accumulated;
1472 for (const auto& osd : osds) {
1473 Mutex::Locker l(osd.second->lock);
1474 for (const auto& metric : osd.second->osd_health_metrics) {
1475 auto acc = accumulated.find(metric.get_type());
1476 if (acc == accumulated.end()) {
1477 auto collector = OSDHealthMetricCollector::create(metric.get_type());
1478 if (!collector) {
1479 derr << __func__ << " " << osd.first << "." << osd.second
1480 << " sent me an unknown health metric: "
1481 << static_cast<uint8_t>(metric.get_type()) << dendl;
1482 continue;
1483 }
1484 tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
1485 std::move(collector));
1486 }
1487 acc->second->update(osd.first, metric);
1488 }
1489 }
1490 for (const auto& acc : accumulated) {
1491 acc.second->summarize(m->health_checks);
1492 }
31f18b77
FG
1493 // TODO? We currently do not notify the PyModules
1494 // TODO: respect needs_send, so we send the report only if we are asked to do
1495 // so, or the state is updated.
1496 monc->send_mon_message(m);
1497}
224ce89b
WB
1498
1499void DaemonServer::got_service_map()
1500{
1501 Mutex::Locker l(lock);
1502
1503 cluster_state.with_servicemap([&](const ServiceMap& service_map) {
1504 if (pending_service_map.epoch == 0) {
1505 // we just started up
1506 dout(10) << "got initial map e" << service_map.epoch << dendl;
1507 pending_service_map = service_map;
1508 } else {
1509 // we we already active and therefore must have persisted it,
1510 // which means ours is the same or newer.
1511 dout(10) << "got updated map e" << service_map.epoch << dendl;
1512 }
1513 pending_service_map.epoch = service_map.epoch + 1;
1514 });
1515
1516 // cull missing daemons, populate new ones
1517 for (auto& p : pending_service_map.services) {
1518 std::set<std::string> names;
1519 for (auto& q : p.second.daemons) {
1520 names.insert(q.first);
1521 DaemonKey key(p.first, q.first);
1522 if (!daemon_state.exists(key)) {
1523 auto daemon = std::make_shared<DaemonState>(daemon_state.types);
1524 daemon->key = key;
1525 daemon->metadata = q.second.metadata;
1526 if (q.second.metadata.count("hostname")) {
1527 daemon->hostname = q.second.metadata["hostname"];
1528 }
1529 daemon->service_daemon = true;
1530 daemon_state.insert(daemon);
1531 dout(10) << "added missing " << key << dendl;
1532 }
1533 }
1534 daemon_state.cull(p.first, names);
1535 }
1536}
3efd9988
FG
1537
1538
1539const char** DaemonServer::get_tracked_conf_keys() const
1540{
1541 static const char *KEYS[] = {
1542 "mgr_stats_threshold",
1543 "mgr_stats_period",
1544 nullptr
1545 };
1546
1547 return KEYS;
1548}
1549
1550void DaemonServer::handle_conf_change(const struct md_config_t *conf,
1551 const std::set <std::string> &changed)
1552{
1553 dout(4) << "ohai" << dendl;
1554 // We may be called within lock (via MCommand `config set`) or outwith the
1555 // lock (via admin socket `config set`), so handle either case.
1556 const bool initially_locked = lock.is_locked_by_me();
1557 if (!initially_locked) {
1558 lock.Lock();
1559 }
1560
1561 if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
1562 dout(4) << "Updating stats threshold/period on "
1563 << daemon_connections.size() << " clients" << dendl;
1564 // Send a fresh MMgrConfigure to all clients, so that they can follow
1565 // the new policy for transmitting stats
1566 for (auto &c : daemon_connections) {
1567 _send_configure(c);
1568 }
1569 }
1570}
1571
1572void DaemonServer::_send_configure(ConnectionRef c)
1573{
1574 assert(lock.is_locked_by_me());
1575
1576 auto configure = new MMgrConfigure();
1577 configure->stats_period = g_conf->get_val<int64_t>("mgr_stats_period");
1578 configure->stats_threshold = g_conf->get_val<int64_t>("mgr_stats_threshold");
1579 c->send_message(configure);
1580}
1581