#include "MgrClient.h"
#include "mgr/MgrContext.h"
+#include "mon/MonMap.h"
#include "msg/Messenger.h"
#include "messages/MMgrMap.h"
#include "messages/MMgrConfigure.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
+#include "messages/MMgrCommand.h"
+#include "messages/MMgrCommandReply.h"
#include "messages/MPGStats.h"
+using std::string;
+using std::vector;
+
+using ceph::bufferlist;
+
#define dout_subsys ceph_subsys_mgrc
#undef dout_prefix
#define dout_prefix *_dout << "mgrc " << __func__ << " "
-MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_)
- : Dispatcher(cct_), cct(cct_), msgr(msgr_),
- timer(cct_, lock)
+MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_, MonMap *monmap_)
+ : Dispatcher(cct_),
+ cct(cct_),
+ msgr(msgr_),
+ monmap(monmap_),
+ timer(cct_, lock)
{
ceph_assert(cct != nullptr);
}
ceph_assert(msgr != nullptr);
timer.init();
+ initialized = true;
}
void MgrClient::shutdown()
{
- std::lock_guard l(lock);
+ std::unique_lock l(lock);
ldout(cct, 10) << dendl;
if (connect_retry_callback) {
session->con &&
HAVE_FEATURE(session->con->get_features(), SERVER_MIMIC)) {
ldout(cct, 10) << "closing mgr session" << dendl;
- MMgrClose *m = new MMgrClose();
+ auto m = make_message<MMgrClose>();
m->daemon_name = daemon_name;
m->service_name = service_name;
- session->con->send_message(m);
- utime_t timeout;
- timeout.set_from_double(cct->_conf.get_val<double>(
+ session->con->send_message2(m);
+ auto timeout = ceph::make_timespan(cct->_conf.get_val<double>(
"mgr_client_service_daemon_unregister_timeout"));
- shutdown_cond.WaitInterval(lock, timeout);
+ shutdown_cond.wait_for(l, timeout);
}
timer.shutdown();
}
}
-bool MgrClient::ms_dispatch(Message *m)
+bool MgrClient::ms_dispatch2(const ref_t<Message>& m)
{
std::lock_guard l(lock);
switch(m->get_type()) {
case MSG_MGR_MAP:
- return handle_mgr_map(static_cast<MMgrMap*>(m));
+ return handle_mgr_map(ref_cast<MMgrMap>(m));
case MSG_MGR_CONFIGURE:
- return handle_mgr_configure(static_cast<MMgrConfigure*>(m));
+ return handle_mgr_configure(ref_cast<MMgrConfigure>(m));
case MSG_MGR_CLOSE:
- return handle_mgr_close(static_cast<MMgrClose*>(m));
+ return handle_mgr_close(ref_cast<MMgrClose>(m));
case MSG_COMMAND_REPLY:
if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
- handle_command_reply(static_cast<MCommandReply*>(m));
+ MCommandReply *c = static_cast<MCommandReply*>(m.get());
+ handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
+ return true;
+ } else {
+ return false;
+ }
+ case MSG_MGR_COMMAND_REPLY:
+ if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
+ MMgrCommandReply *c = static_cast<MMgrCommandReply*>(m.get());
+ handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
return true;
} else {
return false;
void MgrClient::reconnect()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
if (session) {
ldout(cct, 4) << "Terminating session with "
return;
}
- if (last_connect_attempt != utime_t()) {
- utime_t now = ceph_clock_now();
- utime_t when = last_connect_attempt;
- when += cct->_conf.get_val<double>("mgr_connect_retry_interval");
+ if (!clock_t::is_zero(last_connect_attempt)) {
+ auto now = clock_t::now();
+ auto when = last_connect_attempt +
+ ceph::make_timespan(
+ cct->_conf.get_val<double>("mgr_connect_retry_interval"));
if (now < when) {
if (!connect_retry_callback) {
connect_retry_callback = timer.add_event_at(
when,
- new FunctionContext([this](int r){
+ new LambdaContext([this](int r){
connect_retry_callback = nullptr;
reconnect();
}));
ldout(cct, 4) << "Starting new session with " << map.get_active_addrs()
<< dendl;
- last_connect_attempt = ceph_clock_now();
+ last_connect_attempt = clock_t::now();
session.reset(new MgrSessionState());
session->con = msgr->connect_to(CEPH_ENTITY_TYPE_MGR,
}
// resend any pending commands
- for (const auto &p : command_table.get_commands()) {
- auto m = p.second.get_message({});
+ auto p = command_table.get_commands().begin();
+ while (p != command_table.get_commands().end()) {
+ auto tid = p->first;
+ auto& op = p->second;
+ ldout(cct,10) << "resending " << tid << (op.tell ? " (tell)":" (cli)") << dendl;
+ MessageRef m;
+ if (op.tell) {
+ if (op.name.size() && op.name != map.active_name) {
+ ldout(cct, 10) << "active mgr " << map.active_name << " != target "
+ << op.name << dendl;
+ if (op.on_finish) {
+ op.on_finish->complete(-ENXIO);
+ }
+ ++p;
+ command_table.erase(tid);
+ continue;
+ }
+ // Set fsid argument to signal that this is really a tell message (and
+ // we are not a legacy client sending a non-tell command via MCommand).
+ m = op.get_message(monmap->fsid, false);
+ } else {
+ m = op.get_message(
+ {},
+ HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
+ }
ceph_assert(session);
ceph_assert(session->con);
session->con->send_message2(std::move(m));
+ ++p;
}
}
void MgrClient::_send_open()
{
if (session && session->con) {
- auto open = new MMgrOpen();
+ auto open = make_message<MMgrOpen>();
if (!service_name.empty()) {
open->service_name = service_name;
open->daemon_name = daemon_name;
}
cct->_conf.get_config_bl(0, &open->config_bl, &last_config_bl_version);
cct->_conf.get_defaults_bl(&open->config_defaults_bl);
- session->con->send_message(open);
+ session->con->send_message2(open);
}
}
-bool MgrClient::handle_mgr_map(MMgrMap *m)
+bool MgrClient::handle_mgr_map(ref_t<MMgrMap> m)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
ldout(cct, 20) << *m << dendl;
map = m->get_map();
ldout(cct, 4) << "Got map version " << map.epoch << dendl;
- m->put();
ldout(cct, 4) << "Active mgr is now " << map.get_active_addrs() << dendl;
if (stats_period != 0) {
report_callback = timer.add_event_after(
stats_period,
- new FunctionContext([this](int) {
+ new LambdaContext([this](int) {
_send_stats();
}));
}
void MgrClient::_send_report()
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
ceph_assert(session);
report_callback = nullptr;
- auto report = new MMgrReport();
+ auto report = make_message<MMgrReport>();
auto pcc = cct->get_perfcounters_collection();
pcc->with_counters([this, report](
daemon_dirty_status = false;
}
+ if (task_dirty_status) {
+ report->task_status = task_status;
+ task_dirty_status = false;
+ }
+
report->daemon_health_metrics = std::move(daemon_health_metrics);
cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
&last_config_bl_version);
if (get_perf_report_cb) {
- get_perf_report_cb(&report->osd_perf_metric_reports);
+ MetricPayload payload = get_perf_report_cb();
+ MetricReportMessage message(payload);
+ report->metric_report_message = message;
}
- session->con->send_message(report);
+ session->con->send_message2(report);
}
void MgrClient::send_pgstats()
}
}
-bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
+bool MgrClient::handle_mgr_configure(ref_t<MMgrConfigure> m)
{
- ceph_assert(lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
ldout(cct, 20) << *m << dendl;
if (!session) {
lderr(cct) << "dropping unexpected configure message" << dendl;
- m->put();
return true;
}
stats_threshold = m->stats_threshold;
}
- if (set_perf_queries_cb) {
- set_perf_queries_cb(m->osd_perf_metric_queries);
+ if (!m->osd_perf_metric_queries.empty()) {
+ handle_config_payload(m->osd_perf_metric_queries);
+ } else if (m->metric_config_message) {
+ const MetricConfigMessage &message = *m->metric_config_message;
+ boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
}
bool starting = (stats_period == 0) && (m->stats_period != 0);
_send_stats();
}
- m->put();
return true;
}
-bool MgrClient::handle_mgr_close(MMgrClose *m)
+bool MgrClient::handle_mgr_close(ref_t<MMgrClose> m)
{
service_daemon = false;
- shutdown_cond.Signal();
- m->put();
+ shutdown_cond.notify_all();
return true;
}
int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
- bufferlist *outbl, string *outs,
- Context *onfinish)
+ bufferlist *outbl, string *outs,
+ Context *onfinish)
{
std::lock_guard l(lock);
op.on_finish = onfinish;
if (session && session->con) {
- // Leaving fsid argument null because it isn't used.
- auto m = op.get_message({});
+ // Leaving fsid argument null because it isn't used historically, and
+ // we can use it as a signal that we are sending a non-tell command.
+ auto m = op.get_message(
+ {},
+ HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
session->con->send_message2(std::move(m));
} else {
ldout(cct, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl;
return 0;
}
-bool MgrClient::handle_command_reply(MCommandReply *m)
+int MgrClient::start_tell_command(
+ const string& name,
+ const vector<string>& cmd, const bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish)
{
- ceph_assert(lock.is_locked_by_me());
+ std::lock_guard l(lock);
- ldout(cct, 20) << *m << dendl;
+ ldout(cct, 20) << "target: " << name << " cmd: " << cmd << dendl;
+
+ if (map.epoch == 0 && mgr_optional) {
+ ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
+ return -EACCES;
+ }
+
+ auto &op = command_table.start_command();
+ op.tell = true;
+ op.name = name;
+ op.cmd = cmd;
+ op.inbl = inbl;
+ op.outbl = outbl;
+ op.outs = outs;
+ op.on_finish = onfinish;
+
+ if (session && session->con && (name.size() == 0 || map.active_name == name)) {
+ // Set fsid argument to signal that this is really a tell message (and
+ // we are not a legacy client sending a non-tell command via MCommand).
+ auto m = op.get_message(monmap->fsid, false);
+ session->con->send_message2(std::move(m));
+ } else {
+ ldout(cct, 5) << "no mgr session (no running mgr daemon?), or "
+ << name << " not active mgr, waiting" << dendl;
+ }
+ return 0;
+}
+
+bool MgrClient::handle_command_reply(
+ uint64_t tid,
+ bufferlist& data,
+ const std::string& rs,
+ int r)
+{
+ ceph_assert(ceph_mutex_is_locked_by_me(lock));
+
+ ldout(cct, 20) << "tid " << tid << " r " << r << dendl;
- const auto tid = m->get_tid();
if (!command_table.exists(tid)) {
- ldout(cct, 4) << "handle_command_reply tid " << m->get_tid()
+ ldout(cct, 4) << "handle_command_reply tid " << tid
<< " not found" << dendl;
- m->put();
return true;
}
auto &op = command_table.get_command(tid);
if (op.outbl) {
- op.outbl->claim(m->get_data());
+ op.outbl->claim(data);
}
if (op.outs) {
- *(op.outs) = m->rs;
+ *(op.outs) = rs;
}
if (op.on_finish) {
- op.on_finish->complete(m->r);
+ op.on_finish->complete(r);
}
command_table.erase(tid);
-
- m->put();
return true;
}
const std::map<std::string,std::string>& metadata)
{
std::lock_guard l(lock);
- if (service == "osd" ||
- service == "mds" ||
- service == "client" ||
- service == "mon" ||
- service == "mgr") {
- // normal ceph entity types are not allowed!
- return -EINVAL;
- }
if (service_daemon) {
return -EEXIST;
}
return 0;
}
+int MgrClient::service_daemon_update_task_status(
+ std::map<std::string,std::string> &&status) {
+ std::lock_guard l(lock);
+ ldout(cct,10) << status << dendl;
+ task_status = std::move(status);
+ task_dirty_status = true;
+ return 0;
+}
+
void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
{
std::lock_guard l(lock);