#include "messages/MCommandReply.h"
#include "messages/MLog.h"
#include "messages/MServiceMap.h"
+#include "messages/MKVData.h"
#include "PyModule.h"
#include "Mgr.h"
}
}
- DaemonStatePtr state;
if (daemon_state.exists(key)) {
- state = daemon_state.get(key);
+ DaemonStatePtr state = daemon_state.get(key);
state->hostname = daemon_meta.at("hostname").get_str();
if (key.type == "mds" || key.type == "mgr" || key.type == "mon") {
}
daemon_meta.erase("hostname");
map<string,string> m;
- for (const auto &i : daemon_meta) {
- m[i.first] = i.second.get_str();
+ for (const auto &[key, val] : daemon_meta) {
+ m.emplace(key, val.get_str());
}
-
daemon_state.update_metadata(state, m);
} else {
- state = std::make_shared<DaemonState>(daemon_state.types);
+ auto state = std::make_shared<DaemonState>(daemon_state.types);
state->key = key;
state->hostname = daemon_meta.at("hostname").get_str();
daemon_meta.erase("hostname");
map<string,string> m;
- for (const auto &i : daemon_meta) {
- m[i.first] = i.second.get_str();
+ for (const auto &[key, val] : daemon_meta) {
+ m.emplace(key, val.get_str());
}
state->set_metadata(m);
dout(20) << "saw key '" << key << "'" << dendl;
- const std::string config_prefix = PyModule::config_prefix;
+ const std::string store_prefix = PyModule::mgr_store_prefix;
const std::string device_prefix = "device/";
- if (key.substr(0, config_prefix.size()) == config_prefix ||
- key.substr(0, device_prefix.size()) == device_prefix) {
+ if (key.substr(0, device_prefix.size()) == device_prefix ||
+ key.substr(0, store_prefix.size()) == store_prefix) {
dout(20) << "fetching '" << key << "'" << dendl;
Command get_cmd;
std::ostringstream cmd_json;
get_cmd.wait();
lock.lock();
if (get_cmd.r == 0) { // tolerate racing config-key change
- if (key.substr(0, device_prefix.size()) == device_prefix) {
- // device/
- string devid = key.substr(device_prefix.size());
- map<string,string> meta;
- ostringstream ss;
- string val = get_cmd.outbl.to_str();
- int r = get_json_str_map(val, ss, &meta, false);
- if (r < 0) {
- derr << __func__ << " failed to parse " << val << ": " << ss.str()
- << dendl;
- } else {
- daemon_state.with_device_create(
- devid, [&meta] (DeviceState& dev) {
- dev.set_metadata(std::move(meta));
- });
- }
- } else {
- // config/
- loaded[key] = get_cmd.outbl.to_str();
- }
+ loaded[key] = get_cmd.outbl.to_str();
}
}
}
derr << " *** Got signal " << sig_str(signum) << " ***" << dendl;
// The python modules don't reliably shut down, so don't even
- // try. The mon will blacklist us (and all of our rados/cephfs
+ // try. The mon will blocklist us (and all of our rados/cephfs
// clients) anyway. Just exit!
_exit(0); // exit with 0 result code, as if we had done an orderly shutdown
register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
+ // Only pacific+ monitors support subscribing to kv updates
+ bool mon_allows_kv_sub = false;
+ monc->with_monmap(
+ [&](const MonMap &monmap) {
+ if (monmap.get_required_features().contains_all(
+ ceph::features::mon::FEATURE_PACIFIC)) {
+ mon_allows_kv_sub = true;
+ }
+ });
+ if (!mon_allows_kv_sub) {
+ // mons are still pre-pacific. wait long enough to ensure our
+ // next beacon is processed so that our module options are
+ // propagated. See https://tracker.ceph.com/issues/49778
+ lock.unlock();
+ dout(10) << "waiting a bit for the pre-pacific mon to process our beacon" << dendl;
+ sleep(g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count() * 3);
+ lock.lock();
+ }
+
// subscribe to all the maps
monc->sub_want("log-info", 0, 0);
monc->sub_want("mgrdigest", 0, 0);
monc->sub_want("fsmap", 0, 0);
monc->sub_want("servicemap", 0, 0);
+ if (mon_allows_kv_sub) {
+ monc->sub_want("kv:config/", 0, 0);
+ monc->sub_want("kv:mgr/", 0, 0);
+ monc->sub_want("kv:device/", 0, 0);
+ }
dout(4) << "waiting for OSDMap..." << dendl;
// Subscribe to OSDMap update to pass on to ClusterState
cluster_state.with_mgrmap([&e](const MgrMap& m) {
e = m.last_failure_osd_epoch;
});
- /* wait for any blacklists to be applied to previous mgr instance */
+ /* wait for any blocklists to be applied to previous mgr instance */
dout(4) << "Waiting for new OSDMap (e=" << e
- << ") that may blacklist prior active." << dendl;
+ << ") that may blocklist prior active." << dendl;
objecter->wait_for_osd_map(e);
lock.lock();
dout(4) << "waiting for FSMap..." << dendl;
fs_map_cond.wait(l, [this] { return cluster_state.have_fsmap();});
- dout(4) << "waiting for config-keys..." << dendl;
-
// Wait for MgrDigest...
dout(4) << "waiting for MgrDigest..." << dendl;
digest_cond.wait(l, [this] { return digest_received; });
- // Load module KV store
- auto kv_store = load_store();
-
- // Migrate config from KV store on luminous->mimic
- // drop lock because we do blocking config sets to mon
- lock.unlock();
- py_module_registry->upgrade_config(monc, kv_store);
- lock.lock();
+ if (!mon_allows_kv_sub) {
+ dout(4) << "loading config-key data from pre-pacific mon cluster..." << dendl;
+ pre_init_store = load_store();
+ }
+ dout(4) << "initializing device state..." << dendl;
+ // Note: we only have to do this during startup because once we are
+ // active the only changes to this state will originate from one of our
+ // own modules.
+ for (auto p = pre_init_store.lower_bound("device/");
+ p != pre_init_store.end() && p->first.find("device/") == 0;
+ ++p) {
+ string devid = p->first.substr(7);
+ dout(10) << " updating " << devid << dendl;
+ map<string,string> meta;
+ ostringstream ss;
+ int r = get_json_str_map(p->second, ss, &meta, false);
+ if (r < 0) {
+ derr << __func__ << " failed to parse " << p->second << ": " << ss.str()
+ << dendl;
+ } else {
+ daemon_state.with_device_create(
+ devid, [&meta] (DeviceState& dev) {
+ dev.set_metadata(std::move(meta));
+ });
+ }
+ }
+
// assume finisher already initialized in background_init
dout(4) << "starting python modules..." << dendl;
- py_module_registry->active_start(daemon_state, cluster_state,
- kv_store, *monc, clog, audit_clog, *objecter, *client,
- finisher, server);
+ py_module_registry->active_start(
+ daemon_state, cluster_state,
+ pre_init_store, mon_allows_kv_sub,
+ *monc, clog, audit_clog, *objecter, *client,
+ finisher, server);
cluster_state.final_init();
daemon_meta.erase("name");
daemon_meta.erase("hostname");
- for (const auto &i : daemon_meta) {
- dm->metadata[i.first] = i.second.get_str();
+ for (const auto &[key, val] : daemon_meta) {
+ dm->metadata.emplace(key, val.get_str());
}
daemon_state.insert(dm);
daemon_meta.erase("hostname");
map<string,string> m;
- for (const auto &i : daemon_meta) {
- m[i.first] = i.second.get_str();
+ for (const auto &[key, val] : daemon_meta) {
+ m.emplace(key, val.get_str());
}
dm->set_metadata(m);
// Process an incoming MServiceMap message: log its epoch, acknowledge the
// "servicemap" subscription at that epoch, store the map in cluster_state,
// and tell the server component that a service map has arrived.
//
// m: reference-counted message carrying the service map.
void Mgr::handle_service_map(ref_t<MServiceMap> m)
{
// Log the epoch of the received map at debug level 10.
dout(10) << "e" << m->service_map.epoch << dendl;
// Report the received epoch for the "servicemap" subscription to the mon
// client. NOTE(review): presumably this keeps the subscription state in sync
// so the same epoch is not requested again -- confirm against MonClient.
+ monc->sub_got("servicemap", m->service_map.epoch);
// Hand the new map to the cluster state before notifying the server.
cluster_state.set_service_map(m->service_map);
server.got_service_map();
}
names_exist.insert(monmap.get_name(i));
}
});
+ for (const auto& name : names_exist) {
+ const auto k = DaemonKey{"mon", name};
+ if (daemon_state.is_updating(k)) {
+ continue;
+ }
+ auto c = new MetadataUpdate(daemon_state, k);
+ const char* cmd = R"({{"prefix": "mon metadata", "id": "{}"}})";
+ monc->start_mon_command({fmt::format(cmd, name)}, {},
+ &c->outbl, &c->outs, c);
+ }
daemon_state.cull("mon", names_exist);
}
py_module_registry->notify_all("fs_map", "");
handle_fs_map(ref_cast<MFSMap>(m));
return false; // I shall let this pass through for Client
- break;
case CEPH_MSG_OSD_MAP:
handle_osd_map();
case MSG_LOG:
handle_log(ref_cast<MLog>(m));
break;
+ case MSG_KV_DATA:
+ {
+ auto msg = ref_cast<MKVData>(m);
+ monc->sub_got("kv:"s + msg->prefix, msg->version);
+ if (!msg->data.empty()) {
+ if (initialized) {
+ py_module_registry->update_kv_data(
+ msg->prefix,
+ msg->incremental,
+ msg->data
+ );
+ } else {
+ // before we have created the ActivePyModules, we need to
+ // track the store regions we're monitoring
+ if (!msg->incremental) {
+ dout(10) << "full update on " << msg->prefix << dendl;
+ auto p = pre_init_store.lower_bound(msg->prefix);
+ while (p != pre_init_store.end() && p->first.find(msg->prefix) == 0) {
+ dout(20) << " rm prior " << p->first << dendl;
+ p = pre_init_store.erase(p);
+ }
+ } else {
+ dout(10) << "incremental update on " << msg->prefix << dendl;
+ }
+ for (auto& i : msg->data) {
+ if (i.second) {
+ dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl;
+ pre_init_store[i.first] = i.second->to_str();
+ } else {
+ dout(20) << " rm " << i.first << dendl;
+ pre_init_store.erase(i.first);
+ }
+ }
+ }
+ }
+ }
+ break;
default:
return false;
ceph_assert(ceph_mutex_is_locked_by_me(lock));
std::set<std::string> names_exist;
-
const FSMap &new_fsmap = m->get_fsmap();
+ monc->sub_got("fsmap", m->epoch);
+
fs_map_cond.notify_all();
// TODO: callers (e.g. from python land) are potentially going to see