*/
#include "DaemonServer.h"
+#include <boost/algorithm/string.hpp>
#include "mgr/Mgr.h"
#include "include/stringify.h"
#include "mgr/mgr_commands.h"
#include "mgr/DaemonHealthMetricCollector.h"
#include "mgr/OSDPerfMetricCollector.h"
+#include "mgr/MDSPerfMetricCollector.h"
#include "mon/MonCommand.h"
#include "messages/MMgrOpen.h"
#include "messages/MMonMgrReport.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
+#include "messages/MMgrCommand.h"
+#include "messages/MMgrCommandReply.h"
#include "messages/MPGStats.h"
#include "messages/MOSDScrub.h"
#include "messages/MOSDScrub2.h"
#define dout_subsys ceph_subsys_mgr
#undef dout_prefix
#define dout_prefix *_dout << "mgr.server " << __func__ << " "
-
-
+using namespace TOPNSPC::common;
+namespace {
+  // Returns true iff the two maps hold identical contents: same size and
+  // pairwise-equal (key, value) entries.  Comparison is in iteration order
+  // (std::equal), so both maps must share the same ordering.
+  template <typename Map>
+  bool map_compare(Map const &lhs, Map const &rhs) {
+    return lhs.size() == rhs.size()
+      && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
+                    [] (auto a, auto b) { return a.first == b.first && a.second == b.second; });
+  }
+}
DaemonServer::DaemonServer(MonClient *monc_,
Finisher &finisher_,
py_modules(py_modules_),
clog(clog_),
audit_clog(audit_clog_),
- lock("DaemonServer"),
pgmap_ready(false),
timer(g_ceph_context, lock),
shutting_down(false),
tick_event(nullptr),
osd_perf_metric_collector_listener(this),
- osd_perf_metric_collector(osd_perf_metric_collector_listener)
+ osd_perf_metric_collector(osd_perf_metric_collector_listener),
+ mds_perf_metric_collector_listener(this),
+ mds_perf_metric_collector(mds_perf_metric_collector_listener)
{
g_conf().add_observer(this);
}
msgr = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::MGR(gid),
"mgr",
- getpid(), 0);
+ Messenger::get_pid_nonce());
msgr->set_default_policy(Messenger::Policy::stateless_server(0));
msgr->set_auth_client(monc);
return msgr->get_myaddrs();
}
-KeyStore *DaemonServer::ms_get_auth1_authorizer_keystore()
-{
- return monc->rotating_secrets.get();
-}
-
int DaemonServer::ms_handle_authentication(Connection *con)
{
- int ret = 0;
- MgrSession *s = new MgrSession(cct);
+ auto s = ceph::make_ref<MgrSession>(cct);
con->set_priv(s);
s->inst.addr = con->get_peer_addr();
s->entity_name = con->peer_name;
dout(10) << " session " << s << " " << s->entity_name
<< " allow_all" << dendl;
s->caps.set_allow_all();
- }
-
- if (caps_info.caps.length() > 0) {
+ } else if (caps_info.caps.length() > 0) {
auto p = caps_info.caps.cbegin();
string str;
try {
decode(str, p);
}
catch (buffer::error& e) {
- ret = -EPERM;
- }
- bool success = s->caps.parse(str);
- if (success) {
dout(10) << " session " << s << " " << s->entity_name
- << " has caps " << s->caps << " '" << str << "'" << dendl;
- ret = 1;
- } else {
+ << " failed to decode caps" << dendl;
+ return -EACCES;
+ }
+ if (!s->caps.parse(str)) {
dout(10) << " session " << s << " " << s->entity_name
<< " failed to parse caps '" << str << "'" << dendl;
- ret = -EPERM;
+ return -EACCES;
}
+ dout(10) << " session " << s << " " << s->entity_name
+ << " has caps " << s->caps << " '" << str << "'" << dendl;
}
if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
osd_cons[s->osd_id].insert(con);
}
- return ret;
-}
-
-bool DaemonServer::ms_get_authorizer(
- int dest_type,
- AuthAuthorizer **authorizer)
-{
- dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
-
- if (dest_type == CEPH_ENTITY_TYPE_MON) {
- return true;
- }
-
- *authorizer = monc->build_authorizer(dest_type);
- dout(20) << "got authorizer " << *authorizer << dendl;
- return *authorizer != NULL;
+ return 1;
}
bool DaemonServer::ms_handle_reset(Connection *con)
return false;
}
-bool DaemonServer::ms_dispatch(Message *m)
+bool DaemonServer::ms_dispatch2(const ref_t<Message>& m)
{
+  // Message ownership is now carried by ref_t smart pointers; the explicit
+  // m->put() calls of the old ms_dispatch() are no longer needed.
  // Note that we do *not* take ::lock here, in order to avoid
  // serializing all message handling.  It's up to each handler
  // to take whatever locks it needs.
  switch (m->get_type()) {
    case MSG_PGSTATS:
-      cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
+      cluster_state.ingest_pgstats(ref_cast<MPGStats>(m));
      maybe_ready(m->get_source().num());
-      m->put();
      return true;
    case MSG_MGR_REPORT:
-      return handle_report(static_cast<MMgrReport*>(m));
+      return handle_report(ref_cast<MMgrReport>(m));
    case MSG_MGR_OPEN:
-      return handle_open(static_cast<MMgrOpen*>(m));
+      return handle_open(ref_cast<MMgrOpen>(m));
    case MSG_MGR_CLOSE:
-      return handle_close(static_cast<MMgrClose*>(m));
+      return handle_close(ref_cast<MMgrClose>(m));
    case MSG_COMMAND:
-      return handle_command(static_cast<MCommand*>(m));
+      return handle_command(ref_cast<MCommand>(m));
+    // new-style mgr command message, dispatched to the MMgrCommand overload
+    case MSG_MGR_COMMAND:
+      return handle_command(ref_cast<MMgrCommand>(m));
    default:
      dout(1) << "Unhandled message type " << m->get_type() << dendl;
      return false;
  };
}
+// Dump the pg_ready flag (atomic read, no lock needed) into the formatter;
+// reflects whether the mgr considers its PG map usable yet.
+void DaemonServer::dump_pg_ready(ceph::Formatter *f)
+{
+  f->dump_bool("pg_ready", pgmap_ready.load());
+}
+
void DaemonServer::maybe_ready(int32_t osd_id)
{
if (pgmap_ready.load()) {
// fire after all modules have had a chance to set their health checks.
void DaemonServer::schedule_tick_locked(double delay_sec)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
if (tick_event) {
timer.cancel_event(tick_event);
return;
tick_event = timer.add_event_after(delay_sec,
- new FunctionContext([this](int r) {
+ new LambdaContext([this](int r) {
tick();
}));
}
// Send a fresh MMgrConfigure to all clients, so that they can follow
// the new policy for transmitting stats
- finisher.queue(new FunctionContext([this](int r) {
+ finisher.queue(new LambdaContext([this](int r) {
std::lock_guard l(lock);
for (auto &c : daemon_connections) {
if (c->peer_is_osd()) {
}));
}
+// Listener callback: the set of active MDS perf metric queries changed.
+// Re-send MMgrConfigure to every connected MDS so it picks up the new
+// stats-transmission policy.  Queued on the finisher so the work happens
+// outside the caller's context (it takes DaemonServer::lock).
+void DaemonServer::handle_mds_perf_metric_query_updated()
+{
+  dout(10) << dendl;
+
+  // Send a fresh MMgrConfigure to all clients, so that they can follow
+  // the new policy for transmitting stats
+  finisher.queue(new LambdaContext([this](int r) {
+    std::lock_guard l(lock);
+    for (auto &c : daemon_connections) {
+      if (c->peer_is_mds()) {
+        _send_configure(c);
+      }
+    }
+  }));
+}
+
void DaemonServer::shutdown()
{
dout(10) << "begin" << dendl;
const std::string& daemon_name)
{
if (!service_name.empty()) {
- return DaemonKey(service_name, daemon_name);
+ return DaemonKey{service_name, daemon_name};
} else {
- return DaemonKey(ceph_entity_type_name(peer_type), daemon_name);
+ return DaemonKey{ceph_entity_type_name(peer_type), daemon_name};
}
}
-static bool key_from_string(
- const std::string& name,
- DaemonKey *out)
+// Issue a background mon command to fetch metadata for a daemon we do not
+// yet know.  Only osd/mds/mon metadata can be fetched this way; for any
+// other daemon type, or if an update for this key is already in flight,
+// this is a no-op.  For mds daemons 'addr' seeds a default "addr" field in
+// case the mon reply lacks one.  The MetadataUpdate completion 'c' ingests
+// the reply into daemon_state asynchronously.
+void DaemonServer::fetch_missing_metadata(const DaemonKey& key,
+                                          const entity_addr_t& addr)
{
-  auto p = name.find('.');
-  if (p == std::string::npos) {
-    return false;
+  if (!daemon_state.is_updating(key) &&
+      (key.type == "osd" || key.type == "mds" || key.type == "mon")) {
+    std::ostringstream oss;
+    auto c = new MetadataUpdate(daemon_state, key);
+    if (key.type == "osd") {
+      oss << "{\"prefix\": \"osd metadata\", \"id\": "
+	  << key.name<< "}";
+    } else if (key.type == "mds") {
+      c->set_default("addr", stringify(addr));
+      oss << "{\"prefix\": \"mds metadata\", \"who\": \""
+	  << key.name << "\"}";
+    } else if (key.type == "mon") {
+      oss << "{\"prefix\": \"mon metadata\", \"id\": \""
+	  << key.name << "\"}";
+    } else {
+      // unreachable: guarded by the osd/mds/mon check above
+      ceph_abort();
+    }
+    monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
  }
-  out->first = name.substr(0, p);
-  out->second = name.substr(p + 1);
-  return true;
}
-bool DaemonServer::handle_open(MMgrOpen *m)
+bool DaemonServer::handle_open(const ref_t<MMgrOpen>& m)
{
- std::lock_guard l(lock);
+ std::unique_lock l(lock);
DaemonKey key = key_from_service(m->service_name,
m->get_connection()->get_peer_type(),
m->daemon_name);
auto con = m->get_connection();
- dout(4) << "from " << key << " " << con << dendl;
+ dout(10) << "from " << key << " " << con->get_peer_addr() << dendl;
_send_configure(con);
dout(2) << "ignoring open from " << key << " " << con->get_peer_addr()
<< "; not ready for session (expect reconnect)" << dendl;
con->mark_down();
- m->put();
+ l.unlock();
+ fetch_missing_metadata(key, m->get_source_addr());
return true;
}
}
std::lock_guard l(daemon->lock);
daemon->perf_counters.clear();
+ daemon->service_daemon = m->service_daemon;
if (m->service_daemon) {
daemon->service_status = m->daemon_status;
utime_t now = ceph_clock_now();
- auto d = pending_service_map.get_daemon(m->service_name,
- m->daemon_name);
- if (d->gid != (uint64_t)m->get_source().num()) {
+ auto [d, added] = pending_service_map.get_daemon(m->service_name,
+ m->daemon_name);
+ if (added || d->gid != (uint64_t)m->get_source().num()) {
dout(10) << "registering " << key << " in pending_service_map" << dendl;
d->gid = m->get_source().num();
d->addr = m->get_source_addr();
daemon_connections.insert(con);
}
- m->put();
return true;
}
-bool DaemonServer::handle_close(MMgrClose *m)
+bool DaemonServer::handle_close(const ref_t<MMgrClose>& m)
{
std::lock_guard l(lock);
}
// send same message back as a reply
- m->get_connection()->send_message(m);
+ m->get_connection()->send_message2(m);
return true;
}
-bool DaemonServer::handle_report(MMgrReport *m)
+// Store the task status reported by daemon 'key' into the pending service
+// map entry for that daemon (creating the entry if needed).  The pending
+// service map is marked dirty only when the status actually changed, so
+// unchanged reports do not trigger a map republish.
+void DaemonServer::update_task_status(
+  DaemonKey key,
+  const std::map<std::string,std::string>& task_status)
+{
+  dout(10) << "got task status from " << key << dendl;
+
+  [[maybe_unused]] auto [daemon, added] =
+    pending_service_map.get_daemon(key.type, key.name);
+  if (daemon->task_status != task_status) {
+    daemon->task_status = task_status;
+    pending_service_map_dirty = pending_service_map.epoch;
+  }
+}
+
+bool DaemonServer::handle_report(const ref_t<MMgrReport>& m)
{
DaemonKey key;
if (!m->service_name.empty()) {
- key.first = m->service_name;
+ key.type = m->service_name;
} else {
- key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
+ key.type = ceph_entity_type_name(m->get_connection()->get_peer_type());
}
- key.second = m->daemon_name;
+ key.name = m->daemon_name;
- dout(4) << "from " << m->get_connection() << " " << key << dendl;
+ dout(10) << "from " << m->get_connection() << " " << key << dendl;
if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
m->service_name.empty()) {
// Clients should not be sending us stats unless they are declaring
// themselves to be a daemon for some service.
- dout(4) << "rejecting report from non-daemon client " << m->daemon_name
- << dendl;
+ dout(10) << "rejecting report from non-daemon client " << m->daemon_name
+ << dendl;
+ clog->warn() << "rejecting report from non-daemon client " << m->daemon_name
+ << " at " << m->get_connection()->get_peer_addrs();
m->get_connection()->mark_down();
- m->put();
return true;
}
- // Look up the DaemonState
- DaemonStatePtr daemon;
- if (daemon_state.exists(key)) {
- dout(20) << "updating existing DaemonState for " << key << dendl;
- daemon = daemon_state.get(key);
- } else {
- // we don't know the hostname at this stage, reject MMgrReport here.
- dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
- << dendl;
-
- // issue metadata request in background
- if (!daemon_state.is_updating(key) &&
- (key.first == "osd" || key.first == "mds" || key.first == "mon")) {
-
- std::ostringstream oss;
- auto c = new MetadataUpdate(daemon_state, key);
- if (key.first == "osd") {
- oss << "{\"prefix\": \"osd metadata\", \"id\": "
- << key.second<< "}";
-
- } else if (key.first == "mds") {
- c->set_default("addr", stringify(m->get_source_addr()));
- oss << "{\"prefix\": \"mds metadata\", \"who\": \""
- << key.second << "\"}";
-
- } else if (key.first == "mon") {
- oss << "{\"prefix\": \"mon metadata\", \"id\": \""
- << key.second << "\"}";
- } else {
- ceph_abort();
- }
- monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
- }
-
- {
- std::lock_guard l(lock);
+ {
+ std::unique_lock locker(lock);
+
+ DaemonStatePtr daemon;
+ // Look up the DaemonState
+ if (daemon_state.exists(key)) {
+ dout(20) << "updating existing DaemonState for " << key << dendl;
+ daemon = daemon_state.get(key);
+ } else {
+ locker.unlock();
+
+ // we don't know the hostname at this stage, reject MMgrReport here.
+ dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
+ << dendl;
+ // issue metadata request in background
+ fetch_missing_metadata(key, m->get_source_addr());
+
+ locker.lock();
+
// kill session
auto priv = m->get_connection()->get_priv();
auto session = static_cast<MgrSession*>(priv.get());
if (!session) {
- return false;
+ return false;
}
m->get_connection()->mark_down();
dout(10) << "unregistering osd." << session->osd_id
- << " session " << session << " con " << m->get_connection() << dendl;
+ << " session " << session << " con " << m->get_connection() << dendl;
if (osd_cons.find(session->osd_id) != osd_cons.end()) {
- osd_cons[session->osd_id].erase(m->get_connection());
- }
+ osd_cons[session->osd_id].erase(m->get_connection());
+ }
auto iter = daemon_connections.find(m->get_connection());
if (iter != daemon_connections.end()) {
- daemon_connections.erase(iter);
+ daemon_connections.erase(iter);
}
- }
-
- return false;
- }
- // Update the DaemonState
- ceph_assert(daemon != nullptr);
- {
- std::lock_guard l(daemon->lock);
- auto &daemon_counters = daemon->perf_counters;
- daemon_counters.update(m);
-
- auto p = m->config_bl.cbegin();
- if (p != m->config_bl.end()) {
- decode(daemon->config, p);
- decode(daemon->ignored_mon_config, p);
- dout(20) << " got config " << daemon->config
- << " ignored " << daemon->ignored_mon_config << dendl;
+ return false;
}
- if (daemon->service_daemon) {
+ // Update the DaemonState
+ ceph_assert(daemon != nullptr);
+ {
+ std::lock_guard l(daemon->lock);
+ auto &daemon_counters = daemon->perf_counters;
+ daemon_counters.update(*m.get());
+
+ auto p = m->config_bl.cbegin();
+ if (p != m->config_bl.end()) {
+ decode(daemon->config, p);
+ decode(daemon->ignored_mon_config, p);
+ dout(20) << " got config " << daemon->config
+ << " ignored " << daemon->ignored_mon_config << dendl;
+ }
+
utime_t now = ceph_clock_now();
- if (m->daemon_status) {
- daemon->service_status = *m->daemon_status;
- daemon->service_status_stamp = now;
+ if (daemon->service_daemon) {
+ if (m->daemon_status) {
+ daemon->service_status_stamp = now;
+ daemon->service_status = *m->daemon_status;
+ }
+ daemon->last_service_beacon = now;
+ } else if (m->daemon_status) {
+ derr << "got status from non-daemon " << key << dendl;
+ }
+ // update task status
+ if (m->task_status) {
+ update_task_status(key, *m->task_status);
+ daemon->last_service_beacon = now;
+ }
+ if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
+ // only OSD and MON send health_checks to me now
+ daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
+ dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
+ << dendl;
}
- daemon->last_service_beacon = now;
- } else if (m->daemon_status) {
- derr << "got status from non-daemon " << key << dendl;
- }
- if (m->get_connection()->peer_is_osd() || m->get_connection()->peer_is_mon()) {
- // only OSD and MON send health_checks to me now
- daemon->daemon_health_metrics = std::move(m->daemon_health_metrics);
- dout(10) << "daemon_health_metrics " << daemon->daemon_health_metrics
- << dendl;
}
}
// if there are any schema updates, notify the python modules
if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
- ostringstream oss;
- oss << key.first << '.' << key.second;
- py_modules.notify_all("perf_schema_update", oss.str());
+ py_modules.notify_all("perf_schema_update", ceph::to_string(key));
}
if (m->get_connection()->peer_is_osd()) {
osd_perf_metric_collector.process_reports(m->osd_perf_metric_reports);
}
- m->put();
+ if (m->metric_report_message) {
+ const MetricReportMessage &message = *m->metric_report_message;
+ boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
+ }
+
return true;
}
continue;
if (p->first == "caps") {
vector<string> cv;
- if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
+ if (cmd_getval(cmdmap, "caps", cv) &&
cv.size() % 2 == 0) {
for (unsigned i = 0; i < cv.size(); i += 2) {
string k = string("caps_") + cv[i];
*/
class CommandContext {
public:
- MCommand *m;
+ ceph::ref_t<MCommand> m_tell;
+ ceph::ref_t<MMgrCommand> m_mgr;
+ const std::vector<std::string>& cmd; ///< ref into m_tell or m_mgr
+ const bufferlist& data; ///< ref into m_tell or m_mgr
bufferlist odata;
cmdmap_t cmdmap;
- explicit CommandContext(MCommand *m_)
- : m(m_) {
+ explicit CommandContext(ceph::ref_t<MCommand> m)
+ : m_tell{std::move(m)},
+ cmd(m_tell->cmd),
+ data(m_tell->get_data()) {
}
-
- ~CommandContext() {
- m->put();
+ explicit CommandContext(ceph::ref_t<MMgrCommand> m)
+ : m_mgr{std::move(m)},
+ cmd(m_mgr->cmd),
+ data(m_mgr->get_data()) {
}
void reply(int r, const std::stringstream &ss) {
void reply(int r, const std::string &rs) {
// Let the connection drop as soon as we've sent our response
- ConnectionRef con = m->get_connection();
+ ConnectionRef con = m_tell ? m_tell->get_connection()
+ : m_mgr->get_connection();
if (con) {
con->mark_disposable();
}
if (r == 0) {
- dout(4) << __func__ << " success" << dendl;
+ dout(20) << "success" << dendl;
} else {
derr << __func__ << " " << cpp_strerror(r) << " " << rs << dendl;
}
if (con) {
- MCommandReply *reply = new MCommandReply(r, rs);
- reply->set_tid(m->get_tid());
- reply->set_data(odata);
- con->send_message(reply);
+ if (m_tell) {
+ MCommandReply *reply = new MCommandReply(r, rs);
+ reply->set_tid(m_tell->get_tid());
+ reply->set_data(odata);
+ con->send_message(reply);
+ } else {
+ MMgrCommandReply *reply = new MMgrCommandReply(r, rs);
+ reply->set_tid(m_mgr->get_tid());
+ reply->set_data(odata);
+ con->send_message(reply);
+ }
}
}
};
}
};
-bool DaemonServer::handle_command(MCommand *m)
+bool DaemonServer::handle_command(const ref_t<MCommand>& m)
{
std::lock_guard l(lock);
- std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
+ auto cmdctx = std::make_shared<CommandContext>(m);
try {
- return _handle_command(m, cmdctx);
+ return _handle_command(cmdctx);
+ } catch (const bad_cmd_get& e) {
+ cmdctx->reply(-EINVAL, e.what());
+ return true;
+ }
+}
+
+bool DaemonServer::handle_command(const ref_t<MMgrCommand>& m)
+{
+ std::lock_guard l(lock);
+ auto cmdctx = std::make_shared<CommandContext>(m);
+ try {
+ return _handle_command(cmdctx);
} catch (const bad_cmd_get& e) {
cmdctx->reply(-EINVAL, e.what());
return true;
dout(1) << " access denied" << dendl;
audit_clog->info() << "from='" << session->inst << "' "
<< "entity='" << session->entity_name << "' "
- << "cmd=" << cmdctx->cmdmap << ": access denied";
+ << "cmd=" << cmdctx->cmd << ": access denied";
ss << "access denied: does your client key have mgr caps? "
- "See http://docs.ceph.com/docs/master/mgr/administrator/"
+ "See http://docs.ceph.com/en/latest/mgr/administrator/"
"#client-authentication";
}
+// Classify every PG against a proposed set of offline OSDs.
+//
+// For each PG whose (net) acting set intersects 'osds', decide whether
+// stopping those OSDs is safe:
+//  - PGs with no reported state go into report->unknown;
+//  - PGs with a vanished pool, PGs already inactive, or PGs that would
+//    drop below the pool's min_size are recorded as not_ok (with the
+//    specific reason in bad_no_pool / bad_already_inactive /
+//    bad_become_inactive);
+//  - all other affected PGs are ok, subdivided into those that would
+//    become degraded vs. become *more* degraded.
+void DaemonServer::_check_offlines_pgs(
+  const set<int>& osds,
+  const OSDMap& osdmap,
+  const PGMap& pgmap,
+  offline_pg_report *report)
+{
+  // reset output
+  *report = offline_pg_report();
+  report->osds = osds;
+
+  for (const auto& q : pgmap.pg_stat) {
+    set<int32_t> pg_acting;  // net acting sets (with no missing if degraded)
+    bool found = false;
+    if (q.second.state == 0) {
+      report->unknown.insert(q.first);
+      continue;
+    }
+    if (q.second.state & PG_STATE_DEGRADED) {
+      // for degraded PGs, count only replicas that are available with no
+      // missing objects
+      for (auto& anm : q.second.avail_no_missing) {
+	if (osds.count(anm.osd)) {
+	  found = true;
+	  continue;
+	}
+	if (anm.osd != CRUSH_ITEM_NONE) {
+	  pg_acting.insert(anm.osd);
+	}
+      }
+    } else {
+      for (auto& a : q.second.acting) {
+	if (osds.count(a)) {
+	  found = true;
+	  continue;
+	}
+	if (a != CRUSH_ITEM_NONE) {
+	  pg_acting.insert(a);
+	}
+      }
+    }
+    if (!found) {
+      // PG is not touched by any of the candidate OSDs
+      continue;
+    }
+    const pg_pool_t *pi = osdmap.get_pg_pool(q.first.pool());
+    bool dangerous = false;
+    if (!pi) {
+      report->bad_no_pool.insert(q.first); // pool is creating or deleting
+      dangerous = true;
+    }
+    if (!(q.second.state & PG_STATE_ACTIVE)) {
+      report->bad_already_inactive.insert(q.first);
+      dangerous = true;
+    }
+    // NOTE: pi may be null (pool creating/deleting, flagged above); guard
+    // the dereference so we don't crash on pi->min_size.
+    if (pi && pg_acting.size() < pi->min_size) {
+      report->bad_become_inactive.insert(q.first);
+      dangerous = true;
+    }
+    if (dangerous) {
+      report->not_ok.insert(q.first);
+    } else {
+      report->ok.insert(q.first);
+      if (q.second.state & PG_STATE_DEGRADED) {
+	report->ok_become_more_degraded.insert(q.first);
+      } else {
+	report->ok_become_degraded.insert(q.first);
+      }
+    }
+  }
+  dout(20) << osds << " -> " << report->ok.size() << " ok, "
+	   << report->not_ok.size() << " not ok, "
+	   << report->unknown.size() << " unknown"
+	   << dendl;
+}
+
+// Starting from 'orig_osds' (checked first; if it is not safe to stop we
+// return immediately with that report), greedily grow the set of stoppable
+// OSDs up to 'max' entries.  The search walks up the CRUSH hierarchy from
+// the first OSD in the set, trying every up OSD beneath each successive
+// ancestor and re-running _check_offlines_pgs after each tentative
+// addition; unsafe additions are rolled back.  'out_report' always holds
+// the report for the largest safe set found so far.
+void DaemonServer::_maximize_ok_to_stop_set(
+  const set<int>& orig_osds,
+  unsigned max,
+  const OSDMap& osdmap,
+  const PGMap& pgmap,
+  offline_pg_report *out_report)
+{
+  dout(20) << "orig_osds " << orig_osds << " max " << max << dendl;
+  _check_offlines_pgs(orig_osds, osdmap, pgmap, out_report);
+  if (!out_report->ok_to_stop()) {
+    return;
+  }
+  if (orig_osds.size() >= max) {
+    // already at max
+    return;
+  }
+
+  // semi-arbitrarily start with the first osd in the set
+  offline_pg_report report;
+  set<int> osds = orig_osds;
+  int parent = *osds.begin();
+  set<int> children;
+
+  while (true) {
+    // identify the next parent
+    int r = osdmap.crush->get_immediate_parent_id(parent, &parent);
+    if (r < 0) {
+      return;  // just go with what we have so far!
+    }
+
+    // get candidate additions that are beneath this point in the tree
+    children.clear();
+    r = osdmap.crush->get_all_children(parent, &children);
+    if (r < 0) {
+      return;  // just go with what we have so far!
+    }
+    dout(20) << " parent " << parent << " children " << children << dendl;
+
+    // try adding in more osds
+    int failed = 0;  // how many children we failed to add to our set
+    for (auto o : children) {
+      // candidates must be real OSD ids (>= 0; negatives are CRUSH
+      // buckets), currently up, and not already in the set
+      if (o >= 0 && osdmap.is_up(o) && osds.count(o) == 0) {
+	osds.insert(o);
+	_check_offlines_pgs(osds, osdmap, pgmap, &report);
+	if (!report.ok_to_stop()) {
+	  osds.erase(o);
+	  ++failed;
+	  continue;
+	}
+	*out_report = report;
+	if (osds.size() == max) {
+	  dout(20) << " hit max" << dendl;
+	  return;  // yay, we hit the max
+	}
+      }
+    }
+
+    if (failed) {
+      // we hit some failures; go with what we have
+      dout(20) << " hit some peer failures" << dendl;
+      return;
+    }
+  }
+}
+
bool DaemonServer::_handle_command(
- MCommand *m,
std::shared_ptr<CommandContext>& cmdctx)
{
+ MessageRef m;
+ bool admin_socket_cmd = false;
+ if (cmdctx->m_tell) {
+ m = cmdctx->m_tell;
+ // a blank fsid in MCommand signals a legacy client sending a "mon-mgr" CLI
+ // command.
+ admin_socket_cmd = (cmdctx->m_tell->fsid != uuid_d());
+ } else {
+ m = cmdctx->m_mgr;
+ }
auto priv = m->get_connection()->get_priv();
auto session = static_cast<MgrSession*>(priv.get());
if (!session) {
return true;
}
- if (session->inst.name == entity_name_t())
+ if (session->inst.name == entity_name_t()) {
session->inst.name = m->get_source();
+ }
- std::string format;
- boost::scoped_ptr<Formatter> f;
map<string,string> param_str_map;
std::stringstream ss;
int r = 0;
- if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
+ if (!cmdmap_from_json(cmdctx->cmd, &(cmdctx->cmdmap), ss)) {
cmdctx->reply(-EINVAL, ss);
return true;
}
+ string prefix;
+ cmd_getval(cmdctx->cmdmap, "prefix", prefix);
+ dout(10) << "decoded-size=" << cmdctx->cmdmap.size() << " prefix=" << prefix << dendl;
+
+ boost::scoped_ptr<Formatter> f;
{
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
+ std::string format;
+ if (boost::algorithm::ends_with(prefix, "_json")) {
+ format = "json";
+ } else {
+ cmd_getval(cmdctx->cmdmap, "format", format, string("plain"));
+ }
f.reset(Formatter::create(format));
}
- string prefix;
- cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
-
- dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
- dout(4) << "prefix=" << prefix << dendl;
-
- if (prefix == "get_command_descriptions") {
+ // this is just for mgr commands - admin socket commands will fall
+ // through and use the admin socket version of
+ // get_command_descriptions
+ if (prefix == "get_command_descriptions" && !admin_socket_cmd) {
dout(10) << "reading commands from python modules" << dendl;
const auto py_commands = py_modules.get_commands();
bool is_allowed = false;
ModuleCommand py_command;
- if (!mgr_cmd) {
+ if (admin_socket_cmd) {
+ // admin socket commands require all capabilities
+ is_allowed = session->caps.is_allow_all();
+ } else if (!mgr_cmd) {
// Resolve the command to the name of the module that will
// handle it (if the command exists)
auto py_commands = py_modules.get_py_commands();
audit_clog->debug()
<< "from='" << session->inst << "' "
<< "entity='" << session->entity_name << "' "
- << "cmd=" << m->cmd << ": dispatch";
+ << "cmd=" << cmdctx->cmd << ": dispatch";
+
+ if (admin_socket_cmd) {
+ cct->get_admin_socket()->queue_tell_command(cmdctx->m_tell);
+ return true;
+ }
// ----------------
// service map commands
f.reset(Formatter::create("json-pretty"));
// only include state from services that are in the persisted service map
f->open_object_section("service_status");
- for (auto& p : pending_service_map.services) {
- f->open_object_section(p.first.c_str());
- for (auto& q : p.second.daemons) {
+ for (auto& [type, service] : pending_service_map.services) {
+ if (ServiceMap::is_normal_ceph_entity(type)) {
+ continue;
+ }
+
+ f->open_object_section(type.c_str());
+ for (auto& q : service.daemons) {
f->open_object_section(q.first.c_str());
- DaemonKey key(p.first, q.first);
+ DaemonKey key{type, q.first};
ceph_assert(daemon_state.exists(key));
auto daemon = daemon_state.get(key);
std::lock_guard l(daemon->lock);
if (prefix == "config set") {
std::string key;
std::string val;
- cmd_getval(cct, cmdctx->cmdmap, "key", key);
- cmd_getval(cct, cmdctx->cmdmap, "value", val);
+ cmd_getval(cmdctx->cmdmap, "key", key);
+ cmd_getval(cmdctx->cmdmap, "value", val);
r = cct->_conf.set_val(key, val, &ss);
if (r == 0) {
cct->_conf.apply_changes(nullptr);
pg_t pgid;
spg_t spgid;
string pgidstr;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
+ cmd_getval(cmdctx->cmdmap, "pgid", pgidstr);
if (!pgid.parse(pgidstr.c_str())) {
ss << "invalid pgid '" << pgidstr << "'";
cmdctx->reply(-EINVAL, ss);
prefix == "osd deep-scrub" ||
prefix == "osd repair") {
string whostr;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
+ cmd_getval(cmdctx->cmdmap, "who", whostr);
vector<string> pvec;
get_str_vec(prefix, pvec);
prefix == "osd pool deep-scrub" ||
prefix == "osd pool repair") {
vector<string> pool_names;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", pool_names);
+ cmd_getval(cmdctx->cmdmap, "who", pool_names);
if (pool_names.empty()) {
ss << "must specify one or more pool names";
cmdctx->reply(-EINVAL, ss);
prefix == "osd test-reweight-by-pg" ||
prefix == "osd test-reweight-by-utilization";
int64_t oload;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
+ cmd_getval(cmdctx->cmdmap, "oload", oload, int64_t(120));
set<int64_t> pools;
vector<string> poolnames;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
+ cmd_getval(cmdctx->cmdmap, "pools", poolnames);
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
for (const auto& poolname : poolnames) {
int64_t pool = osdmap.lookup_pg_pool_name(poolname);
}
double max_change = g_conf().get_val<double>("mon_reweight_max_change");
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
+ cmd_getval(cmdctx->cmdmap, "max_change", max_change);
if (max_change <= 0.0) {
ss << "max_change " << max_change << " must be positive";
cmdctx->reply(-EINVAL, ss);
return true;
}
int64_t max_osds = g_conf().get_val<int64_t>("mon_reweight_max_osds");
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
+ cmd_getval(cmdctx->cmdmap, "max_osds", max_osds);
if (max_osds <= 0) {
ss << "max_osds " << max_osds << " must be positive";
cmdctx->reply(-EINVAL, ss);
return true;
}
bool no_increasing = false;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
+ cmd_getval(cmdctx->cmdmap, "no_increasing", no_increasing);
string out_str;
mempool::osdmap::map<int32_t, uint32_t> new_weights;
r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap &osdmap, const PGMap& pgmap) {
return true;
}
} else if (prefix == "osd df") {
- string method;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
- string filter_by;
- string filter;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "filter_by", filter_by);
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "filter", filter);
- if (filter_by.empty() != filter.empty()) {
- cmdctx->reply(-EINVAL, "you must specify both 'filter_by' and 'filter'");
- return true;
- }
+ string method, filter;
+ cmd_getval(cmdctx->cmdmap, "output_method", method);
+ cmd_getval(cmdctx->cmdmap, "filter", filter);
stringstream rs;
r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pgmap) {
- string class_name;
- string item_name;
// sanity check filter(s)
- if (filter_by == "class") {
- if (!osdmap.crush->class_exists(filter)) {
- rs << "specified class '" << filter << "' does not exist";
- return -EINVAL;
- }
- class_name = filter;
- }
- if (filter_by == "name") {
- if (!osdmap.crush->name_exists(filter)) {
- rs << "specified name '" << filter << "' does not exist";
- return -EINVAL;
- }
- item_name = filter;
+ if (!filter.empty() &&
+ osdmap.lookup_pg_pool_name(filter) < 0 &&
+ !osdmap.crush->class_exists(filter) &&
+ !osdmap.crush->name_exists(filter)) {
+ rs << "'" << filter << "' not a pool, crush node or device class name";
+ return -EINVAL;
}
print_osd_utilization(osdmap, pgmap, ss,
- f.get(), method == "tree",
- class_name, item_name);
-
+ f.get(), method == "tree", filter);
cmdctx->odata.append(ss);
return 0;
});
return true;
} else if (prefix == "osd pool stats") {
string pool_name;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "pool_name", pool_name);
+ cmd_getval(cmdctx->cmdmap, "pool_name", pool_name);
int64_t poolid = -ENOENT;
bool one_pool = false;
r = cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
int r = 0;
if (prefix == "osd safe-to-destroy") {
vector<string> ids;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
+ cmd_getval(cmdctx->cmdmap, "ids", ids);
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
r = osdmap.parse_osd_id_list(ids, &osds, &ss);
});
}
} else {
int64_t id;
- if (!cmd_getval(g_ceph_context, cmdctx->cmdmap, "id", id)) {
+ if (!cmd_getval(cmdctx->cmdmap, "id", id)) {
r = -EINVAL;
ss << "must specify OSD id";
} else {
if (r) {
bool force = false;
- cmd_getval(cct, cmdctx->cmdmap, "force", force);
+ cmd_getval(cmdctx->cmdmap, "force", force);
if (!force) {
// Backward compat
- cmd_getval(cct, cmdctx->cmdmap, "yes_i_really_mean_it", force);
+ cmd_getval(cmdctx->cmdmap, "yes_i_really_mean_it", force);
}
if (!force) {
ss << "\nYou can proceed by passing --force, but be warned that"
return true;
} else if (prefix == "osd ok-to-stop") {
vector<string> ids;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
+ cmd_getval(cmdctx->cmdmap, "ids", ids);
set<int> osds;
+ int64_t max = 1;
+ cmd_getval(cmdctx->cmdmap, "max", max);
int r;
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
r = osdmap.parse_osd_id_list(ids, &osds, &ss);
ss << "must specify one or more OSDs";
r = -EINVAL;
}
+ if (max < (int)osds.size()) {
+ max = osds.size();
+ }
if (r < 0) {
cmdctx->reply(r, ss);
return true;
}
- int touched_pgs = 0;
- int dangerous_pgs = 0;
+ offline_pg_report out_report;
cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap, const PGMap& pg_map) {
- if (pg_map.num_pg_unknown > 0) {
- ss << pg_map.num_pg_unknown << " pgs have unknown state; "
- << "cannot draw any conclusions";
- r = -EAGAIN;
- return;
- }
- for (const auto& q : pg_map.pg_stat) {
- set<int32_t> pg_acting; // net acting sets (with no missing if degraded)
- bool found = false;
- if (q.second.state & PG_STATE_DEGRADED) {
- for (auto& anm : q.second.avail_no_missing) {
- if (osds.count(anm.osd)) {
- found = true;
- continue;
- }
- if (anm.osd != CRUSH_ITEM_NONE) {
- pg_acting.insert(anm.osd);
- }
- }
- } else {
- for (auto& a : q.second.acting) {
- if (osds.count(a)) {
- found = true;
- continue;
- }
- if (a != CRUSH_ITEM_NONE) {
- pg_acting.insert(a);
- }
- }
- }
- if (!found) {
- continue;
- }
- touched_pgs++;
- if (!(q.second.state & PG_STATE_ACTIVE) ||
- (q.second.state & PG_STATE_DEGRADED)) {
- ++dangerous_pgs;
- continue;
- }
- const pg_pool_t *pi = osdmap.get_pg_pool(q.first.pool());
- if (!pi) {
- ++dangerous_pgs; // pool is creating or deleting
- } else {
- if (pg_acting.size() < pi->min_size) {
- ++dangerous_pgs;
- }
- }
- }
+ _maximize_ok_to_stop_set(
+ osds, max, osdmap, pg_map,
+ &out_report);
});
- if (r) {
- cmdctx->reply(r, ss);
- return true;
+ if (!f) {
+ f.reset(Formatter::create("json"));
+ }
+ f->dump_object("ok_to_stop", out_report);
+ f->flush(cmdctx->odata);
+ cmdctx->odata.append("\n");
+  if (!out_report.unknown.empty()) {
+    ss << out_report.unknown.size() << " pgs have unknown state; "
+      << "cannot draw any conclusions";
+    cmdctx->reply(-EAGAIN, ss);
+    return true;
}
- if (dangerous_pgs) {
- ss << dangerous_pgs << " PGs are already too degraded, would become"
- << " too degraded or might become unavailable";
+ if (!out_report.ok_to_stop()) {
+ ss << "unsafe to stop osd(s) at this time (" << out_report.not_ok.size() << " PGs are or would become offline)";
cmdctx->reply(-EBUSY, ss);
- return true;
+ } else {
+ cmdctx->reply(0, ss);
}
- ss << "OSD(s) " << osds << " are ok to stop without reducing"
- << " availability or risking data, provided there are no other concurrent failures"
- << " or interventions." << std::endl;
- ss << touched_pgs << " PGs are likely to be"
- << " degraded (but remain available) as a result.";
- cmdctx->reply(0, ss);
return true;
} else if (prefix == "pg force-recovery" ||
prefix == "pg force-backfill" ||
if (granularity == "pg") {
// covnert pg names to pgs, discard any invalid ones while at it
vector<string> pgids;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgids);
+ cmd_getval(cmdctx->cmdmap, "pgid", pgids);
for (auto& i : pgids) {
pg_t pgid;
if (!pgid.parse(i.c_str())) {
} else {
// per pool
vector<string> pool_names;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", pool_names);
+ cmd_getval(cmdctx->cmdmap, "who", pool_names);
if (pool_names.empty()) {
ss << "must specify one or more pool names";
cmdctx->reply(-EINVAL, ss);
} else if (prefix == "config show" ||
prefix == "config show-with-defaults") {
string who;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", who);
- int r = 0;
- auto dot = who.find('.');
- DaemonKey key;
- key.first = who.substr(0, dot);
- key.second = who.substr(dot + 1);
+ cmd_getval(cmdctx->cmdmap, "who", who);
+ auto [key, valid] = DaemonKey::parse(who);
+ if (!valid) {
+ ss << "invalid daemon name: use <type>.<id>";
+ cmdctx->reply(-EINVAL, ss);
+ return true;
+ }
DaemonStatePtr daemon = daemon_state.get(key);
- string name;
if (!daemon) {
ss << "no config state for daemon " << who;
cmdctx->reply(-ENOENT, ss);
std::lock_guard l(daemon->lock);
- if (cmd_getval(g_ceph_context, cmdctx->cmdmap, "key", name)) {
+ int r = 0;
+ string name;
+ if (cmd_getval(cmdctx->cmdmap, "key", name)) {
+ // handle special options
+ if (name == "fsid") {
+ cmdctx->odata.append(stringify(monc->get_fsid()) + "\n");
+ cmdctx->reply(r, ss);
+ return true;
+ }
auto p = daemon->config.find(name);
if (p != daemon->config.end() &&
!p->second.empty()) {
tbl.define_column("DEVICE", TextTable::LEFT, TextTable::LEFT);
tbl.define_column("HOST:DEV", TextTable::LEFT, TextTable::LEFT);
tbl.define_column("DAEMONS", TextTable::LEFT, TextTable::LEFT);
+ tbl.define_column("WEAR", TextTable::RIGHT, TextTable::RIGHT);
tbl.define_column("LIFE EXPECTANCY", TextTable::LEFT, TextTable::LEFT);
auto now = ceph_clock_now();
daemon_state.with_devices([&tbl, now](const DeviceState& dev) {
string h;
- for (auto& i : dev.devnames) {
+ for (auto& i : dev.attachments) {
if (h.size()) {
h += " ";
}
- h += i.first + ":" + i.second;
+ h += std::get<0>(i) + ":" + std::get<1>(i);
}
string d;
for (auto& i : dev.daemons) {
}
d += to_string(i);
}
+ char wear_level_str[16] = {0};
+ if (dev.wear_level >= 0) {
+ snprintf(wear_level_str, sizeof(wear_level_str)-1, "%d%%",
+ (int)(100.1 * dev.wear_level));
+ }
tbl << dev.devid
<< h
<< d
+ << wear_level_str
<< dev.get_life_expectancy_str(now)
<< TextTable::endrow;
});
return true;
} else if (prefix == "device ls-by-daemon") {
string who;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", who);
- DaemonKey k;
- if (!key_from_string(who, &k)) {
+ cmd_getval(cmdctx->cmdmap, "who", who);
+ if (auto [k, valid] = DaemonKey::parse(who); !valid) {
ss << who << " is not a valid daemon name";
r = -EINVAL;
} else {
daemon_state.with_device(
i.first, [&tbl, now] (const DeviceState& dev) {
string h;
- for (auto& i : dev.devnames) {
+ for (auto& i : dev.attachments) {
if (h.size()) {
h += " ";
}
- h += i.first + ":" + i.second;
+ h += std::get<0>(i) + ":" + std::get<1>(i);
}
tbl << dev.devid
<< h
}
} else if (prefix == "device ls-by-host") {
string host;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "host", host);
+ cmd_getval(cmdctx->cmdmap, "host", host);
set<string> devids;
daemon_state.list_devids_by_server(host, &devids);
if (f) {
daemon_state.with_device(
devid, [&tbl, &host, now] (const DeviceState& dev) {
string n;
- for (auto& j : dev.devnames) {
- if (j.first == host) {
+ for (auto& j : dev.attachments) {
+ if (std::get<0>(j) == host) {
if (n.size()) {
n += " ";
}
- n += j.second;
+ n += std::get<1>(j);
}
}
string d;
return true;
} else if (prefix == "device info") {
string devid;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
+ cmd_getval(cmdctx->cmdmap, "devid", devid);
int r = 0;
ostringstream rs;
if (!daemon_state.with_device(devid,
return true;
} else if (prefix == "device set-life-expectancy") {
string devid;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
+ cmd_getval(cmdctx->cmdmap, "devid", devid);
string from_str, to_str;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "from", from_str);
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "to", to_str);
+ cmd_getval(cmdctx->cmdmap, "from", from_str);
+ cmd_getval(cmdctx->cmdmap, "to", to_str);
utime_t from, to;
if (!from.parse(from_str)) {
ss << "unable to parse datetime '" << from_str << "'";
return true;
} else if (prefix == "device rm-life-expectancy") {
string devid;
- cmd_getval(g_ceph_context, cmdctx->cmdmap, "devid", devid);
+ cmd_getval(cmdctx->cmdmap, "devid", devid);
map<string,string> meta;
if (daemon_state.with_device_write(devid, [&meta] (DeviceState& dev) {
dev.rm_life_expectancy();
return true;
}
- dout(10) << "passing through " << cmdctx->cmdmap.size() << dendl;
- finisher.queue(new FunctionContext([this, cmdctx, session, py_command, prefix]
- (int r_) mutable {
+ dout(10) << "passing through command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
+ finisher.queue(new LambdaContext([this, cmdctx, session, py_command, prefix]
+ (int r_) mutable {
std::stringstream ss;
+ dout(10) << "dispatching command '" << prefix << "' size " << cmdctx->cmdmap.size() << dendl;
+
// Validate that the module is enabled
auto& py_handler_name = py_command.module_name;
PyModuleRef module = py_modules.get_module(py_handler_name);
}
std::stringstream ds;
- bufferlist inbl = cmdctx->m->get_data();
+ bufferlist inbl = cmdctx->data;
int r = py_modules.handle_command(py_command, *session, cmdctx->cmdmap,
inbl, &ds, &ss);
if (r == -EACCES) {
cmdctx->odata.append(ds);
cmdctx->reply(r, ss);
+ dout(10) << " command returned " << r << dendl;
}));
return true;
}
while (p != pending_service_map.services.end()) {
auto q = p->second.daemons.begin();
while (q != p->second.daemons.end()) {
- DaemonKey key(p->first, q->first);
+ DaemonKey key{p->first, q->first};
if (!daemon_state.exists(key)) {
- derr << "missing key " << key << dendl;
- ++q;
- continue;
+ if (ServiceMap::is_normal_ceph_entity(p->first)) {
+ dout(10) << "daemon " << key << " in service map but not in daemon state "
+ << "index -- force pruning" << dendl;
+ q = p->second.daemons.erase(q);
+ pending_service_map_dirty = pending_service_map.epoch;
+ } else {
+ derr << "missing key " << key << dendl;
+ ++q;
+ }
+
+ continue;
}
+
auto daemon = daemon_state.get(key);
std::lock_guard l(daemon->lock);
if (daemon->last_service_beacon == utime_t()) {
}
}
- auto m = new MMonMgrReport();
+ auto m = ceph::make_message<MMonMgrReport>();
+ m->gid = monc->get_global_id();
py_modules.get_health_checks(&m->health_checks);
py_modules.get_progress_events(&m->progress_events);
jf.dump_object("health_checks", m->health_checks);
jf.flush(*_dout);
*_dout << dendl;
- if (osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+ if (osdmap.require_osd_release >= ceph_release_t::luminous) {
clog->debug() << "pgmap v" << pg_map.version << ": " << pg_map;
}
});
if (acc == accumulated.end()) {
auto collector = DaemonHealthMetricCollector::create(metric.get_type());
if (!collector) {
- derr << __func__ << " " << key.first << "." << key.second
+ derr << __func__ << " " << key
<< " sent me an unknown health metric: "
<< std::hex << static_cast<uint8_t>(metric.get_type())
<< std::dec << dendl;
// TODO? We currently do not notify the PyModules
// TODO: respect needs_send, so we send the report only if we are asked to do
// so, or the state is updated.
- monc->send_mon_message(m);
+ monc->send_mon_message(std::move(m));
}
void DaemonServer::adjust_pgs()
<< dendl;
ok = false;
}
+ vector<int32_t> source_acting;
for (auto &merge_participant : {merge_source, merge_target}) {
bool is_merge_source = merge_participant == merge_source;
if (osdmap.have_pg_upmaps(merge_participant)) {
<< ")" << dendl;
ok = false;
}
+ if (is_merge_source) {
+ source_acting = q->second.acting;
+ } else if (ok && q->second.acting != source_acting) {
+ dout(10) << "pool " << i.first
+ << " pg_num_target " << p.get_pg_num_target()
+ << " pg_num " << p.get_pg_num()
+ << (is_merge_source ? " - merge source " : " - merge target ")
+ << merge_participant
+ << " acting does not match (source " << source_acting
+ << " != target " << q->second.acting
+ << ")" << dendl;
+ ok = false;
+ }
}
if (ok) {
<< " and " << merge_target
<< ")" << dendl;
pg_num_to_set[osdmap.get_pool_name(i.first)] = target;
+ continue;
}
} else if (p.get_pg_num_target() > p.get_pg_num()) {
// pg_num increase (split)
// max_misplaced, to somewhat limit the magnitude of
// our potential error here.
int next;
-
+ static constexpr unsigned MAX_NUM_OBJECTS_PER_PG_FOR_LEAP = 1;
pool_stat_t s = pg_map.get_pg_pool_sum_stat(i.first);
if (aggro ||
// pool is (virtually) empty; just jump to final pgp_num?
(p.get_pgp_num_target() > p.get_pgp_num() &&
- s.stats.sum.num_objects <= p.get_pgp_num_target())) {
+ s.stats.sum.num_objects <= (MAX_NUM_OBJECTS_PER_PG_FOR_LEAP *
+ p.get_pgp_num_target()))) {
next = target;
} else {
double room =
max_misplaced / 2.0);
unsigned estmax = std::max<unsigned>(
(double)p.get_pg_num() * room, 1u);
- int delta = target - p.get_pgp_num();
- next = p.get_pgp_num();
- if (delta < 0) {
- next += std::max<int>(-estmax, delta);
- } else {
- next += std::min<int>(estmax, delta);
+ unsigned next_min = 0;
+ if (p.get_pgp_num() > estmax) {
+ next_min = p.get_pgp_num() - estmax;
}
+            next = std::clamp<unsigned>(target,
+                                        next_min,
+                                        p.get_pgp_num() + estmax);
dout(20) << " room " << room << " estmax " << estmax
- << " delta " << delta << " next " << next << dendl;
+ << " delta " << (target-p.get_pgp_num())
+ << " next " << next << dendl;
if (p.get_pgp_num_target() == p.get_pg_num_target() &&
p.get_pgp_num_target() < p.get_pg_num()) {
// since pgp_num is tracking pg_num, ceph is handling
// we just started up
dout(10) << "got initial map e" << service_map.epoch << dendl;
pending_service_map = service_map;
+ pending_service_map.epoch = service_map.epoch + 1;
} else {
// we we already active and therefore must have persisted it,
// which means ours is the same or newer.
dout(10) << "got updated map e" << service_map.epoch << dendl;
+ ceph_assert(pending_service_map.epoch > service_map.epoch);
}
- pending_service_map.epoch = service_map.epoch + 1;
});
// cull missing daemons, populate new ones
std::set<std::string> types;
- for (auto& p : pending_service_map.services) {
- types.insert(p.first);
+ for (auto& [type, service] : pending_service_map.services) {
+ if (ServiceMap::is_normal_ceph_entity(type)) {
+ continue;
+ }
+
+ types.insert(type);
std::set<std::string> names;
- for (auto& q : p.second.daemons) {
+ for (auto& q : service.daemons) {
names.insert(q.first);
- DaemonKey key(p.first, q.first);
+ DaemonKey key{type, q.first};
if (!daemon_state.exists(key)) {
auto daemon = std::make_shared<DaemonState>(daemon_state.types);
daemon->key = key;
dout(10) << "added missing " << key << dendl;
}
}
- daemon_state.cull(p.first, names);
+ daemon_state.cull(type, names);
}
daemon_state.cull_services(types);
}
auto c = new MetadataUpdate(daemon_state, key);
// FIXME remove post-nautilus: include 'id' for luminous mons
oss << "{\"prefix\": \"mgr metadata\", \"who\": \""
- << key.second << "\", \"id\": \"" << key.second << "\"}";
+ << key.name << "\", \"id\": \"" << key.name << "\"}";
monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
};
if (mgrmap.active_name.size()) {
- DaemonKey key("mgr", mgrmap.active_name);
+ DaemonKey key{"mgr", mgrmap.active_name};
have.insert(mgrmap.active_name);
if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
md_update(key);
}
}
for (auto& i : mgrmap.standbys) {
- DaemonKey key("mgr", i.second.name);
+ DaemonKey key{"mgr", i.second.name};
have.insert(i.second.name);
if (!daemon_state.exists(key) && !daemon_state.is_updating(key)) {
md_update(key);
<< daemon_connections.size() << " clients" << dendl;
// Send a fresh MMgrConfigure to all clients, so that they can follow
// the new policy for transmitting stats
- finisher.queue(new FunctionContext([this](int r) {
+ finisher.queue(new LambdaContext([this](int r) {
std::lock_guard l(lock);
for (auto &c : daemon_connections) {
_send_configure(c);
void DaemonServer::_send_configure(ConnectionRef c)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
- auto configure = new MMgrConfigure();
+ auto configure = make_message<MMgrConfigure>();
configure->stats_period = g_conf().get_val<int64_t>("mgr_stats_period");
configure->stats_threshold = g_conf().get_val<int64_t>("mgr_stats_threshold");
if (c->peer_is_osd()) {
configure->osd_perf_metric_queries =
osd_perf_metric_collector.get_queries();
+ } else if (c->peer_is_mds()) {
+ configure->metric_config_message =
+ MetricConfigMessage(MDSConfigPayload(mds_perf_metric_collector.get_queries()));
}
- c->send_message(configure);
+ c->send_message2(configure);
}
-OSDPerfMetricQueryID DaemonServer::add_osd_perf_query(
+MetricQueryID DaemonServer::add_osd_perf_query(
const OSDPerfMetricQuery &query,
const std::optional<OSDPerfMetricLimit> &limit)
{
return osd_perf_metric_collector.add_query(query, limit);
}
-int DaemonServer::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
+int DaemonServer::remove_osd_perf_query(MetricQueryID query_id)
{
return osd_perf_metric_collector.remove_query(query_id);
}
-int DaemonServer::get_osd_perf_counters(
- OSDPerfMetricQueryID query_id,
- std::map<OSDPerfMetricKey, PerformanceCounters> *counters)
+int DaemonServer::get_osd_perf_counters(OSDPerfCollector *collector)
+{
+ return osd_perf_metric_collector.get_counters(collector);
+}
+
+MetricQueryID DaemonServer::add_mds_perf_query(
+ const MDSPerfMetricQuery &query,
+ const std::optional<MDSPerfMetricLimit> &limit)
+{
+ return mds_perf_metric_collector.add_query(query, limit);
+}
+
+int DaemonServer::remove_mds_perf_query(MetricQueryID query_id)
+{
+ return mds_perf_metric_collector.remove_query(query_id);
+}
+
+int DaemonServer::get_mds_perf_counters(MDSPerfCollector *collector)
{
- return osd_perf_metric_collector.get_counters(query_id, counters);
+ return mds_perf_metric_collector.get_counters(collector);
}