]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mgr/Mgr.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mgr / Mgr.cc
index 1d36225acbf1c9f639ba01075173600165939aa5..7cd6c91eb655d6e94ef6893ae112bcd6c3644886 100644 (file)
 #include "global/signal_handler.h"
 
 #include "mgr/MgrContext.h"
-#include "mgr/mgr_commands.h"
 
-//#include "MgrPyModule.h"
 #include "DaemonServer.h"
-#include "messages/MMgrBeacon.h"
 #include "messages/MMgrDigest.h"
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
 #include "messages/MLog.h"
 #include "messages/MServiceMap.h"
-
+#include "PyModule.h"
 #include "Mgr.h"
 
 #define dout_context g_ceph_context
@@ -50,7 +47,6 @@ Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
   client(client_),
   client_messenger(clientm_),
   lock("Mgr::lock"),
-  timer(g_ceph_context, lock),
   finisher(g_ceph_context, "Mgr", "mgr-fin"),
   digest_received(false),
   py_module_registry(py_module_registry_),
@@ -74,7 +70,8 @@ void MetadataUpdate::finish(int r)
 {
   daemon_state.clear_updating(key);
   if (r == 0) {
-    if (key.first == "mds" || key.first == "osd") {
+    if (key.first == "mds" || key.first == "osd" || 
+        key.first == "mgr" || key.first == "mon") {
       json_spirit::mValue json_result;
       bool read_ok = json_spirit::read(
           outbl.to_str(), json_result);
@@ -83,11 +80,24 @@ void MetadataUpdate::finish(int r)
                 << key.first << "." << key.second << dendl;
         return;
       }
+      if (json_result.type() != json_spirit::obj_type) {
+        dout(1) << "mon returned valid JSON "
+                << key.first << "." << key.second
+               << " but not an object: '" << outbl.to_str() << "'" << dendl;
+        return;
+      }
       dout(4) << "mon returned valid metadata JSON for "
               << key.first << "." << key.second << dendl;
 
       json_spirit::mObject daemon_meta = json_result.get_obj();
 
+      // Skip daemon who doesn't have hostname yet
+      if (daemon_meta.count("hostname") == 0) {
+        dout(1) << "Skipping incomplete metadata entry for "
+                << key.first << "." << key.second << dendl;
+        return;
+      }
+
       // Apply any defaults
       for (const auto &i : defaults) {
         if (daemon_meta.find(i.first) == daemon_meta.end()) {
@@ -98,32 +108,35 @@ void MetadataUpdate::finish(int r)
       DaemonStatePtr state;
       if (daemon_state.exists(key)) {
         state = daemon_state.get(key);
-       Mutex::Locker l(state->lock);
-        if (key.first == "mds") {
+        if (key.first == "mds" || key.first == "mgr" || key.first == "mon") {
           daemon_meta.erase("name");
         } else if (key.first == "osd") {
           daemon_meta.erase("id");
         }
         daemon_meta.erase("hostname");
-        state->metadata.clear();
+       map<string,string> m;
         for (const auto &i : daemon_meta) {
-          state->metadata[i.first] = i.second.get_str();
-        }
+          m[i.first] = i.second.get_str();
+       }
+
+       daemon_state.update_metadata(state, m);
       } else {
         state = std::make_shared<DaemonState>(daemon_state.types);
         state->key = key;
         state->hostname = daemon_meta.at("hostname").get_str();
 
-        if (key.first == "mds") {
+        if (key.first == "mds" || key.first == "mgr" || key.first == "mon") {
           daemon_meta.erase("name");
         } else if (key.first == "osd") {
           daemon_meta.erase("id");
         }
         daemon_meta.erase("hostname");
 
+       map<string,string> m;
         for (const auto &i : daemon_meta) {
-          state->metadata[i.first] = i.second.get_str();
+          m[i.first] = i.second.get_str();
         }
+       state->set_metadata(m);
 
         daemon_state.insert(state);
       }
@@ -139,9 +152,9 @@ void MetadataUpdate::finish(int r)
 
 void Mgr::background_init(Context *completion)
 {
-  Mutex::Locker l(lock);
-  assert(!initializing);
-  assert(!initialized);
+  std::lock_guard l(lock);
+  ceph_assert(!initializing);
+  ceph_assert(!initialized);
   initializing = true;
 
   finisher.start();
@@ -152,21 +165,81 @@ void Mgr::background_init(Context *completion)
   }));
 }
 
+std::map<std::string, std::string> Mgr::load_store()
+{
+  ceph_assert(lock.is_locked_by_me());
+
+  dout(10) << "listing keys" << dendl;
+  JSONCommand cmd;
+  cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
+  lock.Unlock();
+  cmd.wait();
+  lock.Lock();
+  ceph_assert(cmd.r == 0);
+
+  std::map<std::string, std::string> loaded;
+  
+  for (auto &key_str : cmd.json_result.get_array()) {
+    std::string const key = key_str.get_str();
+    
+    dout(20) << "saw key '" << key << "'" << dendl;
+
+    const std::string config_prefix = PyModule::config_prefix;
+    const std::string device_prefix = "device/";
+
+    if (key.substr(0, config_prefix.size()) == config_prefix ||
+       key.substr(0, device_prefix.size()) == device_prefix) {
+      dout(20) << "fetching '" << key << "'" << dendl;
+      Command get_cmd;
+      std::ostringstream cmd_json;
+      cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
+      get_cmd.run(monc, cmd_json.str());
+      lock.Unlock();
+      get_cmd.wait();
+      lock.Lock();
+      if (get_cmd.r == 0) { // tolerate racing config-key change
+       if (key.substr(0, device_prefix.size()) == device_prefix) {
+         // device/
+         string devid = key.substr(device_prefix.size());
+         map<string,string> meta;
+         ostringstream ss;
+         string val = get_cmd.outbl.to_str();
+         int r = get_json_str_map(val, ss, &meta, false);
+         if (r < 0) {
+           derr << __func__ << " failed to parse " << val << ": " << ss.str()
+                << dendl;
+         } else {
+           daemon_state.with_device_create(
+             devid, [&meta] (DeviceState& dev) {
+               dev.set_metadata(std::move(meta));
+             });
+         }
+       } else {
+         // config/
+         loaded[key] = get_cmd.outbl.to_str();
+       }
+      }
+    }
+  }
+
+  return loaded;
+}
+
 void Mgr::init()
 {
-  Mutex::Locker l(lock);
-  assert(initializing);
-  assert(!initialized);
+  std::lock_guard l(lock);
+  ceph_assert(initializing);
+  ceph_assert(!initialized);
 
   // Start communicating with daemons to learn statistics etc
-  int r = server.init(monc->get_global_id(), client_messenger->get_myaddr());
+  int r = server.init(monc->get_global_id(), client_messenger->get_myaddrs());
   if (r < 0) {
     derr << "Initialize server fail: " << cpp_strerror(r) << dendl;
     // This is typically due to a bind() failure, so let's let
     // systemd restart us.
     exit(1);
   }
-  dout(4) << "Initialized server at " << server.get_myaddr() << dendl;
+  dout(4) << "Initialized server at " << server.get_myaddrs() << dendl;
 
   // Preload all daemon metadata (will subsequently keep this
   // up to date by watching maps, so do the initial load before
@@ -195,7 +268,8 @@ void Mgr::init()
   lock.Lock();
 
   // Populate PGs in ClusterState
-  objecter->with_osdmap([this](const OSDMap &osd_map) {
+  cluster_state.with_osdmap_and_pgmap([this](const OSDMap &osd_map,
+                                            const PGMap& pg_map) {
     cluster_state.notify_osdmap(osd_map);
   });
 
@@ -207,21 +281,26 @@ void Mgr::init()
 
   dout(4) << "waiting for config-keys..." << dendl;
 
-  // Preload config keys (`get` for plugins is to be a fast local
-  // operation, we we don't have to synchronize these later because
-  // all sets will come via mgr)
-  auto loaded_config = load_config();
-
   // Wait for MgrDigest...
   dout(4) << "waiting for MgrDigest..." << dendl;
   while (!digest_received) {
     digest_cond.Wait(lock);
   }
 
+  // Load module KV store
+  auto kv_store = load_store();
+
+  // Migrate config from KV store on luminous->mimic
+  // drop lock because we do blocking config sets to mon
+  lock.Unlock();
+  py_module_registry->upgrade_config(monc, kv_store);
+  lock.Lock();
+
   // assume finisher already initialized in background_init
   dout(4) << "starting python modules..." << dendl;
-  py_module_registry->active_start(loaded_config, daemon_state, cluster_state, *monc,
-      clog, *objecter, *client, finisher);
+  py_module_registry->active_start(daemon_state, cluster_state,
+      kv_store, *monc, clog, audit_clog, *objecter, *client,
+      finisher, server);
 
   dout(4) << "Complete." << dendl;
   initializing = false;
@@ -230,7 +309,7 @@ void Mgr::init()
 
 void Mgr::load_all_metadata()
 {
-  assert(lock.is_locked_by_me());
+  ceph_assert(lock.is_locked_by_me());
 
   JSONCommand mds_cmd;
   mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
@@ -245,9 +324,9 @@ void Mgr::load_all_metadata()
   mon_cmd.wait();
   lock.Lock();
 
-  assert(mds_cmd.r == 0);
-  assert(mon_cmd.r == 0);
-  assert(osd_cmd.r == 0);
+  ceph_assert(mds_cmd.r == 0);
+  ceph_assert(mon_cmd.r == 0);
+  ceph_assert(osd_cmd.r == 0);
 
   for (auto &metadata_val : mds_cmd.json_result.get_array()) {
     json_spirit::mObject daemon_meta = metadata_val.get_obj();
@@ -286,9 +365,11 @@ void Mgr::load_all_metadata()
     daemon_meta.erase("name");
     daemon_meta.erase("hostname");
 
+    map<string,string> m;
     for (const auto &i : daemon_meta) {
-      dm->metadata[i.first] = i.second.get_str();
+      m[i.first] = i.second.get_str();
     }
+    dm->set_metadata(m);
 
     daemon_state.insert(dm);
   }
@@ -309,56 +390,22 @@ void Mgr::load_all_metadata()
     osd_metadata.erase("id");
     osd_metadata.erase("hostname");
 
+    map<string,string> m;
     for (const auto &i : osd_metadata) {
-      dm->metadata[i.first] = i.second.get_str();
+      m[i.first] = i.second.get_str();
     }
+    dm->set_metadata(m);
 
     daemon_state.insert(dm);
   }
 }
 
-std::map<std::string, std::string> Mgr::load_config()
-{
-  assert(lock.is_locked_by_me());
-
-  dout(10) << "listing keys" << dendl;
-  JSONCommand cmd;
-  cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
-  lock.Unlock();
-  cmd.wait();
-  lock.Lock();
-  assert(cmd.r == 0);
-
-  std::map<std::string, std::string> loaded;
-  
-  for (auto &key_str : cmd.json_result.get_array()) {
-    std::string const key = key_str.get_str();
-    dout(20) << "saw key '" << key << "'" << dendl;
-
-    const std::string config_prefix = PyModuleRegistry::config_prefix;
-
-    if (key.substr(0, config_prefix.size()) == config_prefix) {
-      dout(20) << "fetching '" << key << "'" << dendl;
-      Command get_cmd;
-      std::ostringstream cmd_json;
-      cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
-      get_cmd.run(monc, cmd_json.str());
-      lock.Unlock();
-      get_cmd.wait();
-      lock.Lock();
-      assert(get_cmd.r == 0);
-      loaded[key] = get_cmd.outbl.to_str();
-    }
-  }
-
-  return loaded;
-}
 
 void Mgr::shutdown()
 {
   finisher.queue(new FunctionContext([&](int) {
     {
-      Mutex::Locker l(lock);
+      std::lock_guard l(lock);
       monc->sub_unwant("log-info");
       monc->sub_unwant("mgrdigest");
       monc->sub_unwant("fsmap");
@@ -378,7 +425,7 @@ void Mgr::shutdown()
 
 void Mgr::handle_osd_map()
 {
-  assert(lock.is_locked_by_me());
+  ceph_assert(lock.is_locked_by_me());
 
   std::set<std::string> names_exist;
 
@@ -387,8 +434,9 @@ void Mgr::handle_osd_map()
    * see if they have changed (service restart), and if so
    * reload the metadata.
    */
-  objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
-    for (unsigned int osd_id = 0; osd_id < osd_map.get_max_osd(); ++osd_id) {
+  cluster_state.with_osdmap_and_pgmap([this, &names_exist](const OSDMap &osd_map,
+                                                          const PGMap &pg_map) {
+    for (int osd_id = 0; osd_id < osd_map.get_max_osd(); ++osd_id) {
       if (!osd_map.exists(osd_id)) {
         continue;
       }
@@ -405,15 +453,15 @@ void Mgr::handle_osd_map()
 
       if (daemon_state.exists(k)) {
         auto metadata = daemon_state.get(k);
-       Mutex::Locker l(metadata->lock);
+       std::lock_guard l(metadata->lock);
         auto addr_iter = metadata->metadata.find("front_addr");
         if (addr_iter != metadata->metadata.end()) {
           const std::string &metadata_addr = addr_iter->second;
-          const auto &map_addr = osd_map.get_addr(osd_id);
+          const auto &map_addrs = osd_map.get_addrs(osd_id);
 
-          if (metadata_addr != stringify(map_addr)) {
+          if (metadata_addr != stringify(map_addrs)) {
             dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
-                    << " != " << stringify(map_addr) << dendl;
+                    << " != " << stringify(map_addrs) << dendl;
             update_meta = true;
           } else {
             dout(20) << "OSD[" << osd_id << "] addr unchanged: "
@@ -429,7 +477,6 @@ void Mgr::handle_osd_map()
       }
 
       if (update_meta) {
-        daemon_state.notify_updating(k);
         auto c = new MetadataUpdate(daemon_state, k);
         std::ostringstream cmd;
         cmd << "{\"prefix\": \"osd metadata\", \"id\": "
@@ -463,10 +510,23 @@ void Mgr::handle_service_map(MServiceMap *m)
   server.got_service_map();
 }
 
+void Mgr::handle_mon_map()
+{
+  dout(20) << __func__ << dendl;
+  assert(lock.is_locked_by_me());
+  std::set<std::string> names_exist;
+  cluster_state.with_monmap([&] (auto &monmap) {
+    for (unsigned int i = 0; i < monmap.size(); i++) {
+      names_exist.insert(monmap.get_name(i));
+    }
+  });
+  daemon_state.cull("mon", names_exist);
+}
+
 bool Mgr::ms_dispatch(Message *m)
 {
   dout(4) << *m << dendl;
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
   switch (m->get_type()) {
     case MSG_MGR_DIGEST:
@@ -474,6 +534,7 @@ bool Mgr::ms_dispatch(Message *m)
       break;
     case CEPH_MSG_MON_MAP:
       py_module_registry->notify_all("mon_map", "");
+      handle_mon_map();
       m->put();
       break;
     case CEPH_MSG_FS_MAP:
@@ -492,7 +553,7 @@ bool Mgr::ms_dispatch(Message *m)
       m->put();
       break;
     case MSG_SERVICE_MAP:
-      handle_service_map((MServiceMap*)m);
+      handle_service_map(static_cast<MServiceMap*>(m));
       py_module_registry->notify_all("service_map", "");
       m->put();
       break;
@@ -509,7 +570,7 @@ bool Mgr::ms_dispatch(Message *m)
 
 void Mgr::handle_fs_map(MFSMap* m)
 {
-  assert(lock.is_locked_by_me());
+  ceph_assert(lock.is_locked_by_me());
 
   std::set<std::string> names_exist;
   
@@ -543,17 +604,17 @@ void Mgr::handle_fs_map(MFSMap* m)
     bool update = false;
     if (daemon_state.exists(k)) {
       auto metadata = daemon_state.get(k);
-      Mutex::Locker l(metadata->lock);
+      std::lock_guard l(metadata->lock);
       if (metadata->metadata.empty() ||
          metadata->metadata.count("addr") == 0) {
         update = true;
       } else {
-        auto metadata_addr = metadata->metadata.at("addr");
-        const auto map_addr = info.addr;
-        update = metadata_addr != stringify(map_addr);
+        auto metadata_addrs = metadata->metadata.at("addr");
+        const auto map_addrs = info.addrs;
+        update = metadata_addrs != stringify(map_addrs);
         if (update) {
-          dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
-                  << " != " << stringify(map_addr) << dendl;
+          dout(4) << "MDS[" << info.name << "] addr change " << metadata_addrs
+                  << " != " << stringify(map_addrs) << dendl;
         }
       }
     } else {
@@ -561,12 +622,11 @@ void Mgr::handle_fs_map(MFSMap* m)
     }
 
     if (update) {
-      daemon_state.notify_updating(k);
       auto c = new MetadataUpdate(daemon_state, k);
 
       // Older MDS daemons don't have addr in the metadata, so
       // fake it if the returned metadata doesn't have the field.
-      c->set_default("addr", stringify(info.addr));
+      c->set_default("addr", stringify(info.addrs));
 
       std::ostringstream cmd;
       cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
@@ -581,7 +641,7 @@ void Mgr::handle_fs_map(MFSMap* m)
 
 bool Mgr::got_mgr_map(const MgrMap& m)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
   dout(10) << m << dendl;
 
   set<string> old_modules;
@@ -595,6 +655,7 @@ bool Mgr::got_mgr_map(const MgrMap& m)
   }
 
   cluster_state.set_mgr_map(m);
+  server.got_mgr_map();
 
   return false;
 }
@@ -620,25 +681,9 @@ void Mgr::handle_mgr_digest(MMgrDigest* m)
   }
 }
 
-void Mgr::tick()
-{
-  dout(10) << dendl;
-  server.send_report();
-}
-
-std::vector<MonCommand> Mgr::get_command_set() const
-{
-  Mutex::Locker l(lock);
-
-  std::vector<MonCommand> commands = mgr_commands;
-  std::vector<MonCommand> py_commands = py_module_registry->get_commands();
-  commands.insert(commands.end(), py_commands.begin(), py_commands.end());
-  return commands;
-}
-
 std::map<std::string, std::string> Mgr::get_services() const
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
   return py_module_registry->get_services();
 }