]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mgr/Mgr.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / mgr / Mgr.cc
index b36320edb1377e9a7002e05df0e30b755b2c061d..4b2cdf7c53685b506356802a06f1a2dcc4b95e12 100644 (file)
@@ -29,6 +29,7 @@
 #include "messages/MCommandReply.h"
 #include "messages/MLog.h"
 #include "messages/MServiceMap.h"
+#include "messages/MKVData.h"
 #include "PyModule.h"
 #include "Mgr.h"
 
@@ -100,9 +101,8 @@ void MetadataUpdate::finish(int r)
         }
       }
 
-      DaemonStatePtr state;
       if (daemon_state.exists(key)) {
-        state = daemon_state.get(key);
+        DaemonStatePtr state = daemon_state.get(key);
         state->hostname = daemon_meta.at("hostname").get_str();
 
         if (key.type == "mds" || key.type == "mgr" || key.type == "mon") {
@@ -112,13 +112,12 @@ void MetadataUpdate::finish(int r)
         }
         daemon_meta.erase("hostname");
        map<string,string> m;
-        for (const auto &i : daemon_meta) {
-          m[i.first] = i.second.get_str();
+        for (const auto &[key, val] : daemon_meta) {
+          m.emplace(key, val.get_str());
        }
-
        daemon_state.update_metadata(state, m);
       } else {
-        state = std::make_shared<DaemonState>(daemon_state.types);
+        auto state = std::make_shared<DaemonState>(daemon_state.types);
         state->key = key;
         state->hostname = daemon_meta.at("hostname").get_str();
 
@@ -130,8 +129,8 @@ void MetadataUpdate::finish(int r)
         daemon_meta.erase("hostname");
 
        map<string,string> m;
-        for (const auto &i : daemon_meta) {
-          m[i.first] = i.second.get_str();
+        for (const auto &[key, val] : daemon_meta) {
+          m.emplace(key, val.get_str());
         }
        state->set_metadata(m);
 
@@ -180,11 +179,11 @@ std::map<std::string, std::string> Mgr::load_store()
     
     dout(20) << "saw key '" << key << "'" << dendl;
 
-    const std::string config_prefix = PyModule::config_prefix;
+    const std::string store_prefix = PyModule::mgr_store_prefix;
     const std::string device_prefix = "device/";
 
-    if (key.substr(0, config_prefix.size()) == config_prefix ||
-       key.substr(0, device_prefix.size()) == device_prefix) {
+    if (key.substr(0, device_prefix.size()) == device_prefix ||
+       key.substr(0, store_prefix.size()) == store_prefix) {
       dout(20) << "fetching '" << key << "'" << dendl;
       Command get_cmd;
       std::ostringstream cmd_json;
@@ -194,26 +193,7 @@ std::map<std::string, std::string> Mgr::load_store()
       get_cmd.wait();
       lock.lock();
       if (get_cmd.r == 0) { // tolerate racing config-key change
-       if (key.substr(0, device_prefix.size()) == device_prefix) {
-         // device/
-         string devid = key.substr(device_prefix.size());
-         map<string,string> meta;
-         ostringstream ss;
-         string val = get_cmd.outbl.to_str();
-         int r = get_json_str_map(val, ss, &meta, false);
-         if (r < 0) {
-           derr << __func__ << " failed to parse " << val << ": " << ss.str()
-                << dendl;
-         } else {
-           daemon_state.with_device_create(
-             devid, [&meta] (DeviceState& dev) {
-               dev.set_metadata(std::move(meta));
-             });
-         }
-       } else {
-         // config/
-         loaded[key] = get_cmd.outbl.to_str();
-       }
+       loaded[key] = get_cmd.outbl.to_str();
       }
     }
   }
@@ -232,7 +212,7 @@ static void handle_mgr_signal(int signum)
   derr << " *** Got signal " << sig_str(signum) << " ***" << dendl;
 
   // The python modules don't reliably shut down, so don't even
-  // try. The mon will blacklist us (and all of our rados/cephfs
+  // try. The mon will blocklist us (and all of our rados/cephfs
   // clients) anyway. Just exit!
 
   _exit(0);  // exit with 0 result code, as if we had done an orderly shutdown
@@ -248,11 +228,35 @@ void Mgr::init()
   register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
   register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
 
+  // Only pacific+ monitors support subscribe to kv updates
+  bool mon_allows_kv_sub = false;
+  monc->with_monmap(
+    [&](const MonMap &monmap) {
+      if (monmap.get_required_features().contains_all(
+           ceph::features::mon::FEATURE_PACIFIC)) {
+       mon_allows_kv_sub = true;
+      }
+    });
+  if (!mon_allows_kv_sub) {
+    // mons are still pre-pacific.  wait long enough to ensure our
+    // next beacon is processed so that our module options are
+    // propagated.  See https://tracker.ceph.com/issues/49778
+    lock.unlock();
+    dout(10) << "waiting a bit for the pre-pacific mon to process our beacon" << dendl;
+    sleep(g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count() * 3);
+    lock.lock();
+  }
+
   // subscribe to all the maps
   monc->sub_want("log-info", 0, 0);
   monc->sub_want("mgrdigest", 0, 0);
   monc->sub_want("fsmap", 0, 0);
   monc->sub_want("servicemap", 0, 0);
+  if (mon_allows_kv_sub) {
+    monc->sub_want("kv:config/", 0, 0);
+    monc->sub_want("kv:mgr/", 0, 0);
+    monc->sub_want("kv:device/", 0, 0);
+  }
 
   dout(4) << "waiting for OSDMap..." << dendl;
   // Subscribe to OSDMap update to pass on to ClusterState
@@ -269,9 +273,9 @@ void Mgr::init()
   cluster_state.with_mgrmap([&e](const MgrMap& m) {
     e = m.last_failure_osd_epoch;
   });
-  /* wait for any blacklists to be applied to previous mgr instance */
+  /* wait for any blocklists to be applied to previous mgr instance */
   dout(4) << "Waiting for new OSDMap (e=" << e
-          << ") that may blacklist prior active." << dendl;
+          << ") that may blocklist prior active." << dendl;
   objecter->wait_for_osd_map(e);
   lock.lock();
 
@@ -301,26 +305,45 @@ void Mgr::init()
   dout(4) << "waiting for FSMap..." << dendl;
   fs_map_cond.wait(l, [this] { return cluster_state.have_fsmap();});
 
-  dout(4) << "waiting for config-keys..." << dendl;
-
   // Wait for MgrDigest...
   dout(4) << "waiting for MgrDigest..." << dendl;
   digest_cond.wait(l, [this] { return digest_received; });
 
-  // Load module KV store
-  auto kv_store = load_store();
-
-  // Migrate config from KV store on luminous->mimic
-  // drop lock because we do blocking config sets to mon
-  lock.unlock();
-  py_module_registry->upgrade_config(monc, kv_store);
-  lock.lock();
+  if (!mon_allows_kv_sub) {
+    dout(4) << "loading config-key data from pre-pacific mon cluster..." << dendl;
+    pre_init_store = load_store();
+  }
 
+  dout(4) << "initializing device state..." << dendl;
+  // Note: we only have to do this during startup because once we are
+  // active the only changes to this state will originate from one of our
+  // own modules.
+  for (auto p = pre_init_store.lower_bound("device/");
+       p != pre_init_store.end() && p->first.find("device/") == 0;
+       ++p) {
+    string devid = p->first.substr(7);
+    dout(10) << "  updating " << devid << dendl;
+    map<string,string> meta;
+    ostringstream ss;
+    int r = get_json_str_map(p->second, ss, &meta, false);
+    if (r < 0) {
+      derr << __func__ << " failed to parse " << p->second << ": " << ss.str()
+          << dendl;
+    } else {
+      daemon_state.with_device_create(
+       devid, [&meta] (DeviceState& dev) {
+                dev.set_metadata(std::move(meta));
+              });
+    }
+  }
+  
   // assume finisher already initialized in background_init
   dout(4) << "starting python modules..." << dendl;
-  py_module_registry->active_start(daemon_state, cluster_state,
-      kv_store, *monc, clog, audit_clog, *objecter, *client,
-      finisher, server);
+  py_module_registry->active_start(
+    daemon_state, cluster_state,
+    pre_init_store, mon_allows_kv_sub,
+    *monc, clog, audit_clog, *objecter, *client,
+    finisher, server);
 
   cluster_state.final_init();
 
@@ -371,8 +394,8 @@ void Mgr::load_all_metadata()
     daemon_meta.erase("name");
     daemon_meta.erase("hostname");
 
-    for (const auto &i : daemon_meta) {
-      dm->metadata[i.first] = i.second.get_str();
+    for (const auto &[key, val] : daemon_meta) {
+      dm->metadata.emplace(key, val.get_str());
     }
 
     daemon_state.insert(dm);
@@ -394,8 +417,8 @@ void Mgr::load_all_metadata()
     daemon_meta.erase("hostname");
 
     map<string,string> m;
-    for (const auto &i : daemon_meta) {
-      m[i.first] = i.second.get_str();
+    for (const auto &[key, val] : daemon_meta) {
+      m.emplace(key, val.get_str());
     }
     dm->set_metadata(m);
 
@@ -515,6 +538,7 @@ void Mgr::handle_log(ref_t<MLog> m)
 void Mgr::handle_service_map(ref_t<MServiceMap> m)
 {
   dout(10) << "e" << m->service_map.epoch << dendl;
+  monc->sub_got("servicemap", m->service_map.epoch);
   cluster_state.set_service_map(m->service_map);
   server.got_service_map();
 }
@@ -529,6 +553,16 @@ void Mgr::handle_mon_map()
       names_exist.insert(monmap.get_name(i));
     }
   });
+  for (const auto& name : names_exist) {
+    const auto k = DaemonKey{"mon", name};
+    if (daemon_state.is_updating(k)) {
+      continue;
+    }
+    auto c = new MetadataUpdate(daemon_state, k);
+    const char* cmd = R"({{"prefix": "mon metadata", "id": "{}"}})";
+    monc->start_mon_command({fmt::format(cmd, name)}, {},
+                           &c->outbl, &c->outs, c);
+  }
   daemon_state.cull("mon", names_exist);
 }
 
@@ -549,7 +583,6 @@ bool Mgr::ms_dispatch2(const ref_t<Message>& m)
       py_module_registry->notify_all("fs_map", "");
       handle_fs_map(ref_cast<MFSMap>(m));
       return false; // I shall let this pass through for Client
-      break;
     case CEPH_MSG_OSD_MAP:
       handle_osd_map();
 
@@ -566,6 +599,43 @@ bool Mgr::ms_dispatch2(const ref_t<Message>& m)
     case MSG_LOG:
       handle_log(ref_cast<MLog>(m));
       break;
+    case MSG_KV_DATA:
+      {
+       auto msg = ref_cast<MKVData>(m);
+       monc->sub_got("kv:"s + msg->prefix, msg->version);
+       if (!msg->data.empty()) {
+         if (initialized) {
+           py_module_registry->update_kv_data(
+             msg->prefix,
+             msg->incremental,
+             msg->data
+             );
+         } else {
+           // before we have created the ActivePyModules, we need to
+           // track the store regions we're monitoring
+           if (!msg->incremental) {
+             dout(10) << "full update on " << msg->prefix << dendl;
+             auto p = pre_init_store.lower_bound(msg->prefix);
+             while (p != pre_init_store.end() && p->first.find(msg->prefix) == 0) {
+               dout(20) << " rm prior " << p->first << dendl;
+               p = pre_init_store.erase(p);
+             }
+           } else {
+             dout(10) << "incremental update on " << msg->prefix << dendl;
+           }
+           for (auto& i : msg->data) {
+             if (i.second) {
+               dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl;
+               pre_init_store[i.first] = i.second->to_str();
+             } else {
+               dout(20) << " rm " << i.first << dendl;
+               pre_init_store.erase(i.first);
+             }
+           }
+         }
+       }
+      }
+      break;
 
     default:
       return false;
@@ -579,9 +649,10 @@ void Mgr::handle_fs_map(ref_t<MFSMap> m)
   ceph_assert(ceph_mutex_is_locked_by_me(lock));
 
   std::set<std::string> names_exist;
-  
   const FSMap &new_fsmap = m->get_fsmap();
 
+  monc->sub_got("fsmap", m->epoch);
+
   fs_map_cond.notify_all();
 
   // TODO: callers (e.g. from python land) are potentially going to see