#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
#include "messages/MLog.h"
+#include "messages/MServiceMap.h"
#include "Mgr.h"
#define dout_prefix *_dout << "mgr " << __func__ << " "
-Mgr::Mgr(MonClient *monc_, Messenger *clientm_, Objecter *objecter_,
+Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
+ Messenger *clientm_, Objecter *objecter_,
Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
monc(monc_),
objecter(objecter_),
lock("Mgr::lock"),
timer(g_ceph_context, lock),
finisher(g_ceph_context, "Mgr", "mgr-fin"),
- py_modules(daemon_state, cluster_state, *monc, *objecter, *client,
+ digest_received(false),
+ py_modules(daemon_state, cluster_state, *monc, clog_, *objecter, *client,
finisher),
- cluster_state(monc, nullptr),
+ cluster_state(monc, nullptr, mgrmap),
server(monc, finisher, daemon_state, cluster_state, py_modules,
clog_, audit_clog_),
initialized(false),
{
daemon_state.clear_updating(key);
if (r == 0) {
- if (key.first == CEPH_ENTITY_TYPE_MDS) {
+ if (key.first == "mds") {
json_spirit::mValue json_result;
bool read_ok = json_spirit::read(
outbl.to_str(), json_result);
if (!read_ok) {
dout(1) << "mon returned invalid JSON for "
- << ceph_entity_type_name(key.first)
- << "." << key.second << dendl;
+ << key.first << "." << key.second << dendl;
return;
}
daemon_state.insert(state);
}
- } else if (key.first == CEPH_ENTITY_TYPE_OSD) {
+ } else if (key.first == "osd") {
} else {
ceph_abort();
}
} else {
dout(1) << "mon failed to return metadata for "
- << ceph_entity_type_name(key.first)
- << "." << key.second << ": " << cpp_strerror(r) << dendl;
+ << key.first << "." << key.second << ": "
+ << cpp_strerror(r) << dendl;
}
}
};
-void Mgr::background_init()
+// Kick off asynchronous initialization of the Mgr on the finisher
+// thread.  `completion` is completed with 0 once init() has returned;
+// the caller must not assume the Mgr is usable before then.
+void Mgr::background_init(Context *completion)
{
  Mutex::Locker l(lock);
  assert(!initializing);
+  // Start the finisher first: init() (queued below) and later callbacks
+  // all run on it.
  finisher.start();
-  finisher.queue(new FunctionContext([this](int r){
+  // Capture `completion` so the caller is signalled after init() finishes.
+  finisher.queue(new FunctionContext([this, completion](int r){
    init();
+    completion->complete(0);
  }));
}
monc->sub_want("log-info", 0, 0);
monc->sub_want("mgrdigest", 0, 0);
monc->sub_want("fsmap", 0, 0);
+ monc->sub_want("servicemap", 0, 0);
dout(4) << "waiting for OSDMap..." << dendl;
// Subscribe to OSDMap update to pass on to ClusterState
// (all sets will come via mgr)
load_config();
- // Wait for MgrDigest...?
- // TODO
+ // Wait for MgrDigest...
+ dout(4) << "waiting for MgrDigest..." << dendl;
+ while (!digest_received) {
+ digest_cond.Wait(lock);
+ }
// assume finisher already initialized in background_init
-
+ dout(4) << "starting PyModules..." << dendl;
py_modules.init();
py_modules.start();
}
DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
- dm->key = DaemonKey(CEPH_ENTITY_TYPE_MDS,
+ dm->key = DaemonKey("mds",
daemon_meta.at("name").get_str());
dm->hostname = daemon_meta.at("hostname").get_str();
}
DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
- dm->key = DaemonKey(CEPH_ENTITY_TYPE_MON,
+ dm->key = DaemonKey("mon",
daemon_meta.at("name").get_str());
dm->hostname = daemon_meta.at("hostname").get_str();
dout(4) << osd_metadata.at("hostname").get_str() << dendl;
DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
- dm->key = DaemonKey(CEPH_ENTITY_TYPE_OSD,
+ dm->key = DaemonKey("osd",
stringify(osd_metadata.at("id").get_int()));
dm->hostname = osd_metadata.at("hostname").get_str();
std::ostringstream cmd_json;
cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
get_cmd.run(monc, cmd_json.str());
+ lock.Unlock();
get_cmd.wait();
+ lock.Lock();
assert(get_cmd.r == 0);
-
loaded[key] = get_cmd.outbl.to_str();
}
}
// Consider whether to update the daemon metadata (new/restarted daemon)
bool update_meta = false;
- const auto k = DaemonKey(CEPH_ENTITY_TYPE_OSD, stringify(osd_id));
+ const auto k = DaemonKey("osd", stringify(osd_id));
if (daemon_state.is_updating(k)) {
continue;
}
});
// TODO: same culling for MonMap
- daemon_state.cull(CEPH_ENTITY_TYPE_OSD, names_exist);
+ daemon_state.cull("osd", names_exist);
}
void Mgr::handle_log(MLog *m)
m->put();
}
+// Handle an incoming MServiceMap: record the new map in ClusterState and
+// notify the DaemonServer that a (new) service map is available.
+// NOTE: does not put() the message -- the caller (ms_dispatch) does that.
+void Mgr::handle_service_map(MServiceMap *m)
+{
+  dout(10) << "e" << m->service_map.epoch << dendl;
+  cluster_state.set_service_map(m->service_map);
+  server.got_service_map();
+}
+
bool Mgr::ms_dispatch(Message *m)
{
dout(4) << *m << dendl;
handle_mgr_digest(static_cast<MMgrDigest*>(m));
break;
case CEPH_MSG_MON_MAP:
- // FIXME: we probably never get called here because MonClient
- // has consumed the message. For consuming OSDMap we need
- // to be the tail dispatcher, but to see MonMap we would
- // need to be at the head.
- // Result is that ClusterState has access to monmap (it reads
- // from monclient anyway), but we don't see notifications. Hook
- // into MonClient to get notifications instead of messing
- // with message delivery to achieve it?
- ceph_abort();
-
py_modules.notify_all("mon_map", "");
m->put();
break;
objecter->maybe_request_map();
m->put();
break;
+ case MSG_SERVICE_MAP:
+ handle_service_map((MServiceMap*)m);
+ py_modules.notify_all("service_map", "");
+ m->put();
+ break;
case MSG_LOG:
handle_log(static_cast<MLog *>(m));
break;
// Remember which MDS exists so that we can cull any that don't
names_exist.insert(info.name);
- const auto k = DaemonKey(CEPH_ENTITY_TYPE_MDS, info.name);
+ const auto k = DaemonKey("mds", info.name);
if (daemon_state.is_updating(k)) {
continue;
}
{}, &c->outbl, &c->outs, c);
}
}
- daemon_state.cull(CEPH_ENTITY_TYPE_MDS, names_exist);
+ daemon_state.cull("mds", names_exist);
}
+// Process a newly received MgrMap.  Returns true when the configured
+// module list changed -- in that case the map is NOT stored and the
+// caller is expected to respawn the daemon (per the error message below,
+// presumably because loaded python modules can't be changed in place --
+// TODO confirm).  Returns false after storing the map in cluster_state.
+bool Mgr::got_mgr_map(const MgrMap& m)
+{
+  Mutex::Locker l(lock);
+  dout(10) << m << dendl;
+
+  set<string> old_modules;
+  // NOTE(review): the lambda parameter shadows the outer `m` (the new
+  // map); inside the lambda we read the *previously stored* map's
+  // module list.  Consider renaming one of them for clarity.
+  cluster_state.with_mgrmap([&](const MgrMap& m) {
+    old_modules = m.modules;
+  });
+  if (m.modules != old_modules) {
+    derr << "mgrmap module list changed to (" << m.modules << "), respawn"
+    << dendl;
+    return true;
+  }
+
+  cluster_state.set_mgr_map(m);
+
+  return false;
+}
void Mgr::handle_mgr_digest(MMgrDigest* m)
{
dout(10) << "done." << dendl;
m->put();
+
+ if (!digest_received) {
+ digest_received = true;
+ digest_cond.Signal();
+ }
}
+// Periodic timer callback: trigger DaemonServer::send_report().
+void Mgr::tick()
+{
+  dout(10) << dendl;
+  server.send_report();
+}