#include "global/signal_handler.h"
#include "mgr/MgrContext.h"
-#include "mgr/mgr_commands.h"
-//#include "MgrPyModule.h"
#include "DaemonServer.h"
-#include "messages/MMgrBeacon.h"
#include "messages/MMgrDigest.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
#include "messages/MLog.h"
#include "messages/MServiceMap.h"
-
+#include "PyModule.h"
#include "Mgr.h"
#define dout_context g_ceph_context
client(client_),
client_messenger(clientm_),
lock("Mgr::lock"),
- timer(g_ceph_context, lock),
finisher(g_ceph_context, "Mgr", "mgr-fin"),
digest_received(false),
py_module_registry(py_module_registry_),
{
daemon_state.clear_updating(key);
if (r == 0) {
- if (key.first == "mds" || key.first == "osd") {
+ if (key.first == "mds" || key.first == "osd" ||
+ key.first == "mgr" || key.first == "mon") {
json_spirit::mValue json_result;
bool read_ok = json_spirit::read(
outbl.to_str(), json_result);
<< key.first << "." << key.second << dendl;
return;
}
+ if (json_result.type() != json_spirit::obj_type) {
+ dout(1) << "mon returned valid JSON "
+ << key.first << "." << key.second
+ << " but not an object: '" << outbl.to_str() << "'" << dendl;
+ return;
+ }
dout(4) << "mon returned valid metadata JSON for "
<< key.first << "." << key.second << dendl;
json_spirit::mObject daemon_meta = json_result.get_obj();
+ // Skip daemon who doesn't have hostname yet
+ if (daemon_meta.count("hostname") == 0) {
+ dout(1) << "Skipping incomplete metadata entry for "
+ << key.first << "." << key.second << dendl;
+ return;
+ }
+
// Apply any defaults
for (const auto &i : defaults) {
if (daemon_meta.find(i.first) == daemon_meta.end()) {
DaemonStatePtr state;
if (daemon_state.exists(key)) {
state = daemon_state.get(key);
- Mutex::Locker l(state->lock);
- if (key.first == "mds") {
+ if (key.first == "mds" || key.first == "mgr" || key.first == "mon") {
daemon_meta.erase("name");
} else if (key.first == "osd") {
daemon_meta.erase("id");
}
daemon_meta.erase("hostname");
- state->metadata.clear();
+ map<string,string> m;
for (const auto &i : daemon_meta) {
- state->metadata[i.first] = i.second.get_str();
- }
+ m[i.first] = i.second.get_str();
+ }
+
+ daemon_state.update_metadata(state, m);
} else {
state = std::make_shared<DaemonState>(daemon_state.types);
state->key = key;
state->hostname = daemon_meta.at("hostname").get_str();
- if (key.first == "mds") {
+ if (key.first == "mds" || key.first == "mgr" || key.first == "mon") {
daemon_meta.erase("name");
} else if (key.first == "osd") {
daemon_meta.erase("id");
}
daemon_meta.erase("hostname");
+ map<string,string> m;
for (const auto &i : daemon_meta) {
- state->metadata[i.first] = i.second.get_str();
+ m[i.first] = i.second.get_str();
}
+ state->set_metadata(m);
daemon_state.insert(state);
}
void Mgr::background_init(Context *completion)
{
- Mutex::Locker l(lock);
- assert(!initializing);
- assert(!initialized);
+ std::lock_guard l(lock);
+ ceph_assert(!initializing);
+ ceph_assert(!initialized);
initializing = true;
finisher.start();
}));
}
+std::map<std::string, std::string> Mgr::load_store()
+{
+ ceph_assert(lock.is_locked_by_me());
+
+ dout(10) << "listing keys" << dendl;
+ JSONCommand cmd;
+ cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
+ lock.Unlock();
+ cmd.wait();
+ lock.Lock();
+ ceph_assert(cmd.r == 0);
+
+ std::map<std::string, std::string> loaded;
+
+ for (auto &key_str : cmd.json_result.get_array()) {
+ std::string const key = key_str.get_str();
+
+ dout(20) << "saw key '" << key << "'" << dendl;
+
+ const std::string config_prefix = PyModule::config_prefix;
+ const std::string device_prefix = "device/";
+
+ if (key.substr(0, config_prefix.size()) == config_prefix ||
+ key.substr(0, device_prefix.size()) == device_prefix) {
+ dout(20) << "fetching '" << key << "'" << dendl;
+ Command get_cmd;
+ std::ostringstream cmd_json;
+ cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
+ get_cmd.run(monc, cmd_json.str());
+ lock.Unlock();
+ get_cmd.wait();
+ lock.Lock();
+ if (get_cmd.r == 0) { // tolerate racing config-key change
+ if (key.substr(0, device_prefix.size()) == device_prefix) {
+ // device/
+ string devid = key.substr(device_prefix.size());
+ map<string,string> meta;
+ ostringstream ss;
+ string val = get_cmd.outbl.to_str();
+ int r = get_json_str_map(val, ss, &meta, false);
+ if (r < 0) {
+ derr << __func__ << " failed to parse " << val << ": " << ss.str()
+ << dendl;
+ } else {
+ daemon_state.with_device_create(
+ devid, [&meta] (DeviceState& dev) {
+ dev.set_metadata(std::move(meta));
+ });
+ }
+ } else {
+ // config/
+ loaded[key] = get_cmd.outbl.to_str();
+ }
+ }
+ }
+ }
+
+ return loaded;
+}
+
void Mgr::init()
{
- Mutex::Locker l(lock);
- assert(initializing);
- assert(!initialized);
+ std::lock_guard l(lock);
+ ceph_assert(initializing);
+ ceph_assert(!initialized);
// Start communicating with daemons to learn statistics etc
- int r = server.init(monc->get_global_id(), client_messenger->get_myaddr());
+ int r = server.init(monc->get_global_id(), client_messenger->get_myaddrs());
if (r < 0) {
derr << "Initialize server fail: " << cpp_strerror(r) << dendl;
// This is typically due to a bind() failure, so let's let
// systemd restart us.
exit(1);
}
- dout(4) << "Initialized server at " << server.get_myaddr() << dendl;
+ dout(4) << "Initialized server at " << server.get_myaddrs() << dendl;
// Preload all daemon metadata (will subsequently keep this
// up to date by watching maps, so do the initial load before
lock.Lock();
// Populate PGs in ClusterState
- objecter->with_osdmap([this](const OSDMap &osd_map) {
+ cluster_state.with_osdmap_and_pgmap([this](const OSDMap &osd_map,
+ const PGMap& pg_map) {
cluster_state.notify_osdmap(osd_map);
});
dout(4) << "waiting for config-keys..." << dendl;
- // Preload config keys (`get` for plugins is to be a fast local
- // operation, we we don't have to synchronize these later because
- // all sets will come via mgr)
- auto loaded_config = load_config();
-
// Wait for MgrDigest...
dout(4) << "waiting for MgrDigest..." << dendl;
while (!digest_received) {
digest_cond.Wait(lock);
}
+ // Load module KV store
+ auto kv_store = load_store();
+
+ // Migrate config from KV store on luminous->mimic
+ // drop lock because we do blocking config sets to mon
+ lock.Unlock();
+ py_module_registry->upgrade_config(monc, kv_store);
+ lock.Lock();
+
// assume finisher already initialized in background_init
dout(4) << "starting python modules..." << dendl;
- py_module_registry->active_start(loaded_config, daemon_state, cluster_state, *monc,
- clog, *objecter, *client, finisher);
+ py_module_registry->active_start(daemon_state, cluster_state,
+ kv_store, *monc, clog, audit_clog, *objecter, *client,
+ finisher, server);
dout(4) << "Complete." << dendl;
initializing = false;
void Mgr::load_all_metadata()
{
- assert(lock.is_locked_by_me());
+ ceph_assert(lock.is_locked_by_me());
JSONCommand mds_cmd;
mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
mon_cmd.wait();
lock.Lock();
- assert(mds_cmd.r == 0);
- assert(mon_cmd.r == 0);
- assert(osd_cmd.r == 0);
+ ceph_assert(mds_cmd.r == 0);
+ ceph_assert(mon_cmd.r == 0);
+ ceph_assert(osd_cmd.r == 0);
for (auto &metadata_val : mds_cmd.json_result.get_array()) {
json_spirit::mObject daemon_meta = metadata_val.get_obj();
daemon_meta.erase("name");
daemon_meta.erase("hostname");
+ map<string,string> m;
for (const auto &i : daemon_meta) {
- dm->metadata[i.first] = i.second.get_str();
+ m[i.first] = i.second.get_str();
}
+ dm->set_metadata(m);
daemon_state.insert(dm);
}
osd_metadata.erase("id");
osd_metadata.erase("hostname");
+ map<string,string> m;
for (const auto &i : osd_metadata) {
- dm->metadata[i.first] = i.second.get_str();
+ m[i.first] = i.second.get_str();
}
+ dm->set_metadata(m);
daemon_state.insert(dm);
}
}
-std::map<std::string, std::string> Mgr::load_config()
-{
- assert(lock.is_locked_by_me());
-
- dout(10) << "listing keys" << dendl;
- JSONCommand cmd;
- cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
- lock.Unlock();
- cmd.wait();
- lock.Lock();
- assert(cmd.r == 0);
-
- std::map<std::string, std::string> loaded;
-
- for (auto &key_str : cmd.json_result.get_array()) {
- std::string const key = key_str.get_str();
- dout(20) << "saw key '" << key << "'" << dendl;
-
- const std::string config_prefix = PyModuleRegistry::config_prefix;
-
- if (key.substr(0, config_prefix.size()) == config_prefix) {
- dout(20) << "fetching '" << key << "'" << dendl;
- Command get_cmd;
- std::ostringstream cmd_json;
- cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
- get_cmd.run(monc, cmd_json.str());
- lock.Unlock();
- get_cmd.wait();
- lock.Lock();
- assert(get_cmd.r == 0);
- loaded[key] = get_cmd.outbl.to_str();
- }
- }
-
- return loaded;
-}
void Mgr::shutdown()
{
finisher.queue(new FunctionContext([&](int) {
{
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
monc->sub_unwant("log-info");
monc->sub_unwant("mgrdigest");
monc->sub_unwant("fsmap");
void Mgr::handle_osd_map()
{
- assert(lock.is_locked_by_me());
+ ceph_assert(lock.is_locked_by_me());
std::set<std::string> names_exist;
* see if they have changed (service restart), and if so
* reload the metadata.
*/
- objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
- for (unsigned int osd_id = 0; osd_id < osd_map.get_max_osd(); ++osd_id) {
+ cluster_state.with_osdmap_and_pgmap([this, &names_exist](const OSDMap &osd_map,
+ const PGMap &pg_map) {
+ for (int osd_id = 0; osd_id < osd_map.get_max_osd(); ++osd_id) {
if (!osd_map.exists(osd_id)) {
continue;
}
if (daemon_state.exists(k)) {
auto metadata = daemon_state.get(k);
- Mutex::Locker l(metadata->lock);
+ std::lock_guard l(metadata->lock);
auto addr_iter = metadata->metadata.find("front_addr");
if (addr_iter != metadata->metadata.end()) {
const std::string &metadata_addr = addr_iter->second;
- const auto &map_addr = osd_map.get_addr(osd_id);
+ const auto &map_addrs = osd_map.get_addrs(osd_id);
- if (metadata_addr != stringify(map_addr)) {
+ if (metadata_addr != stringify(map_addrs)) {
dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
- << " != " << stringify(map_addr) << dendl;
+ << " != " << stringify(map_addrs) << dendl;
update_meta = true;
} else {
dout(20) << "OSD[" << osd_id << "] addr unchanged: "
}
if (update_meta) {
- daemon_state.notify_updating(k);
auto c = new MetadataUpdate(daemon_state, k);
std::ostringstream cmd;
cmd << "{\"prefix\": \"osd metadata\", \"id\": "
server.got_service_map();
}
+void Mgr::handle_mon_map()
+{
+ dout(20) << __func__ << dendl;
+ assert(lock.is_locked_by_me());
+ std::set<std::string> names_exist;
+ cluster_state.with_monmap([&] (auto &monmap) {
+ for (unsigned int i = 0; i < monmap.size(); i++) {
+ names_exist.insert(monmap.get_name(i));
+ }
+ });
+ daemon_state.cull("mon", names_exist);
+}
+
bool Mgr::ms_dispatch(Message *m)
{
dout(4) << *m << dendl;
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
switch (m->get_type()) {
case MSG_MGR_DIGEST:
break;
case CEPH_MSG_MON_MAP:
py_module_registry->notify_all("mon_map", "");
+ handle_mon_map();
m->put();
break;
case CEPH_MSG_FS_MAP:
m->put();
break;
case MSG_SERVICE_MAP:
- handle_service_map((MServiceMap*)m);
+ handle_service_map(static_cast<MServiceMap*>(m));
py_module_registry->notify_all("service_map", "");
m->put();
break;
void Mgr::handle_fs_map(MFSMap* m)
{
- assert(lock.is_locked_by_me());
+ ceph_assert(lock.is_locked_by_me());
std::set<std::string> names_exist;
bool update = false;
if (daemon_state.exists(k)) {
auto metadata = daemon_state.get(k);
- Mutex::Locker l(metadata->lock);
+ std::lock_guard l(metadata->lock);
if (metadata->metadata.empty() ||
metadata->metadata.count("addr") == 0) {
update = true;
} else {
- auto metadata_addr = metadata->metadata.at("addr");
- const auto map_addr = info.addr;
- update = metadata_addr != stringify(map_addr);
+ auto metadata_addrs = metadata->metadata.at("addr");
+ const auto map_addrs = info.addrs;
+ update = metadata_addrs != stringify(map_addrs);
if (update) {
- dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
- << " != " << stringify(map_addr) << dendl;
+ dout(4) << "MDS[" << info.name << "] addr change " << metadata_addrs
+ << " != " << stringify(map_addrs) << dendl;
}
}
} else {
}
if (update) {
- daemon_state.notify_updating(k);
auto c = new MetadataUpdate(daemon_state, k);
// Older MDS daemons don't have addr in the metadata, so
// fake it if the returned metadata doesn't have the field.
- c->set_default("addr", stringify(info.addr));
+ c->set_default("addr", stringify(info.addrs));
std::ostringstream cmd;
cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
bool Mgr::got_mgr_map(const MgrMap& m)
{
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
dout(10) << m << dendl;
set<string> old_modules;
}
cluster_state.set_mgr_map(m);
+ server.got_mgr_map();
return false;
}
}
}
-void Mgr::tick()
-{
- dout(10) << dendl;
- server.send_report();
-}
-
-std::vector<MonCommand> Mgr::get_command_set() const
-{
- Mutex::Locker l(lock);
-
- std::vector<MonCommand> commands = mgr_commands;
- std::vector<MonCommand> py_commands = py_module_registry->get_commands();
- commands.insert(commands.end(), py_commands.begin(), py_commands.end());
- return commands;
-}
-
std::map<std::string, std::string> Mgr::get_services() const
{
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
return py_module_registry->get_services();
}