]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mgr/Mgr.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mgr / Mgr.cc
index 9df32239bd0bc04b8e6f225558a45c8139755d87..68126e9430ed8b37de08cc91c80daec372b3b1e0 100644 (file)
@@ -30,6 +30,7 @@
 #include "messages/MCommand.h"
 #include "messages/MCommandReply.h"
 #include "messages/MLog.h"
+#include "messages/MServiceMap.h"
 
 #include "Mgr.h"
 
@@ -39,7 +40,8 @@
 #define dout_prefix *_dout << "mgr " << __func__ << " "
 
 
-Mgr::Mgr(MonClient *monc_, Messenger *clientm_, Objecter *objecter_,
+Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
+        Messenger *clientm_, Objecter *objecter_,
         Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
   monc(monc_),
   objecter(objecter_),
@@ -48,9 +50,10 @@ Mgr::Mgr(MonClient *monc_, Messenger *clientm_, Objecter *objecter_,
   lock("Mgr::lock"),
   timer(g_ceph_context, lock),
   finisher(g_ceph_context, "Mgr", "mgr-fin"),
-  py_modules(daemon_state, cluster_state, *monc, *objecter, *client,
+  digest_received(false),
+  py_modules(daemon_state, cluster_state, *monc, clog_, *objecter, *client,
              finisher),
-  cluster_state(monc, nullptr),
+  cluster_state(monc, nullptr, mgrmap),
   server(monc, finisher, daemon_state, cluster_state, py_modules,
          clog_, audit_clog_),
   initialized(false),
@@ -92,14 +95,13 @@ public:
   {
     daemon_state.clear_updating(key);
     if (r == 0) {
-      if (key.first == CEPH_ENTITY_TYPE_MDS) {
+      if (key.first == "mds") {
         json_spirit::mValue json_result;
         bool read_ok = json_spirit::read(
             outbl.to_str(), json_result);
         if (!read_ok) {
           dout(1) << "mon returned invalid JSON for "
-                  << ceph_entity_type_name(key.first)
-                  << "." << key.second << dendl;
+                  << key.first << "." << key.second << dendl;
           return;
         }
 
@@ -133,20 +135,20 @@ public:
 
           daemon_state.insert(state);
         }
-      } else if (key.first == CEPH_ENTITY_TYPE_OSD) {
+      } else if (key.first == "osd") {
       } else {
         ceph_abort();
       }
     } else {
       dout(1) << "mon failed to return metadata for "
-              << ceph_entity_type_name(key.first)
-              << "." << key.second << ": " << cpp_strerror(r) << dendl;
+              << key.first << "." << key.second << ": "
+             << cpp_strerror(r) << dendl;
     }
   }
 };
 
 
-void Mgr::background_init()
+void Mgr::background_init(Context *completion)
 {
   Mutex::Locker l(lock);
   assert(!initializing);
@@ -155,8 +157,9 @@ void Mgr::background_init()
 
   finisher.start();
 
-  finisher.queue(new FunctionContext([this](int r){
+  finisher.queue(new FunctionContext([this, completion](int r){
     init();
+    completion->complete(0);
   }));
 }
 
@@ -184,6 +187,7 @@ void Mgr::init()
   monc->sub_want("log-info", 0, 0);
   monc->sub_want("mgrdigest", 0, 0);
   monc->sub_want("fsmap", 0, 0);
+  monc->sub_want("servicemap", 0, 0);
 
   dout(4) << "waiting for OSDMap..." << dendl;
   // Subscribe to OSDMap update to pass on to ClusterState
@@ -217,11 +221,14 @@ void Mgr::init()
   // all sets will come via mgr)
   load_config();
 
-  // Wait for MgrDigest...?
-  // TODO
+  // Wait for MgrDigest...
+  dout(4) << "waiting for MgrDigest..." << dendl;
+  while (!digest_received) {
+    digest_cond.Wait(lock);
+  }
 
   // assume finisher already initialized in background_init
-
+  dout(4) << "starting PyModules..." << dendl;
   py_modules.init();
   py_modules.start();
 
@@ -259,7 +266,7 @@ void Mgr::load_all_metadata()
     }
 
     DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
-    dm->key = DaemonKey(CEPH_ENTITY_TYPE_MDS,
+    dm->key = DaemonKey("mds",
                         daemon_meta.at("name").get_str());
     dm->hostname = daemon_meta.at("hostname").get_str();
 
@@ -281,7 +288,7 @@ void Mgr::load_all_metadata()
     }
 
     DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
-    dm->key = DaemonKey(CEPH_ENTITY_TYPE_MON,
+    dm->key = DaemonKey("mon",
                         daemon_meta.at("name").get_str());
     dm->hostname = daemon_meta.at("hostname").get_str();
 
@@ -304,7 +311,7 @@ void Mgr::load_all_metadata()
     dout(4) << osd_metadata.at("hostname").get_str() << dendl;
 
     DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
-    dm->key = DaemonKey(CEPH_ENTITY_TYPE_OSD,
+    dm->key = DaemonKey("osd",
                         stringify(osd_metadata.at("id").get_int()));
     dm->hostname = osd_metadata.at("hostname").get_str();
 
@@ -345,9 +352,10 @@ void Mgr::load_config()
       std::ostringstream cmd_json;
       cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
       get_cmd.run(monc, cmd_json.str());
+      lock.Unlock();
       get_cmd.wait();
+      lock.Lock();
       assert(get_cmd.r == 0);
-
       loaded[key] = get_cmd.outbl.to_str();
     }
   }
@@ -399,7 +407,7 @@ void Mgr::handle_osd_map()
 
       // Consider whether to update the daemon metadata (new/restarted daemon)
       bool update_meta = false;
-      const auto k = DaemonKey(CEPH_ENTITY_TYPE_OSD, stringify(osd_id));
+      const auto k = DaemonKey("osd", stringify(osd_id));
       if (daemon_state.is_updating(k)) {
         continue;
       }
@@ -444,7 +452,7 @@ void Mgr::handle_osd_map()
   });
 
   // TODO: same culling for MonMap
-  daemon_state.cull(CEPH_ENTITY_TYPE_OSD, names_exist);
+  daemon_state.cull("osd", names_exist);
 }
 
 void Mgr::handle_log(MLog *m)
@@ -456,6 +464,13 @@ void Mgr::handle_log(MLog *m)
   m->put();
 }
 
+void Mgr::handle_service_map(MServiceMap *m)
+{
+  dout(10) << "e" << m->service_map.epoch << dendl;
+  cluster_state.set_service_map(m->service_map);
+  server.got_service_map();
+}
+
 bool Mgr::ms_dispatch(Message *m)
 {
   dout(4) << *m << dendl;
@@ -466,16 +481,6 @@ bool Mgr::ms_dispatch(Message *m)
       handle_mgr_digest(static_cast<MMgrDigest*>(m));
       break;
     case CEPH_MSG_MON_MAP:
-      // FIXME: we probably never get called here because MonClient
-      // has consumed the message.  For consuming OSDMap we need
-      // to be the tail dispatcher, but to see MonMap we would
-      // need to be at the head.
-      // Result is that ClusterState has access to monmap (it reads
-      // from monclient anyway), but we don't see notifications.  Hook
-      // into MonClient to get notifications instead of messing
-      // with message delivery to achieve it?
-      ceph_abort();
-
       py_modules.notify_all("mon_map", "");
       m->put();
       break;
@@ -494,6 +499,11 @@ bool Mgr::ms_dispatch(Message *m)
       objecter->maybe_request_map();
       m->put();
       break;
+    case MSG_SERVICE_MAP:
+      handle_service_map((MServiceMap*)m);
+      py_modules.notify_all("service_map", "");
+      m->put();
+      break;
     case MSG_LOG:
       handle_log(static_cast<MLog *>(m));
       break;
@@ -533,7 +543,7 @@ void Mgr::handle_fs_map(MFSMap* m)
     // Remember which MDS exists so that we can cull any that don't
     names_exist.insert(info.name);
 
-    const auto k = DaemonKey(CEPH_ENTITY_TYPE_MDS, info.name);
+    const auto k = DaemonKey("mds", info.name);
     if (daemon_state.is_updating(k)) {
       continue;
     }
@@ -573,9 +583,28 @@ void Mgr::handle_fs_map(MFSMap* m)
           {}, &c->outbl, &c->outs, c);
     }
   }
-  daemon_state.cull(CEPH_ENTITY_TYPE_MDS, names_exist);
+  daemon_state.cull("mds", names_exist);
 }
 
+bool Mgr::got_mgr_map(const MgrMap& m)
+{
+  Mutex::Locker l(lock);
+  dout(10) << m << dendl;
+
+  set<string> old_modules;
+  cluster_state.with_mgrmap([&](const MgrMap& m) {
+      old_modules = m.modules;
+    });
+  if (m.modules != old_modules) {
+    derr << "mgrmap module list changed to (" << m.modules << "), respawn"
+        << dendl;
+    return true;
+  }
+
+  cluster_state.set_mgr_map(m);
+
+  return false;
+}
 
 void Mgr::handle_mgr_digest(MMgrDigest* m)
 {
@@ -591,5 +620,15 @@ void Mgr::handle_mgr_digest(MMgrDigest* m)
   dout(10) << "done." << dendl;
 
   m->put();
+
+  if (!digest_received) {
+    digest_received = true;
+    digest_cond.Signal();
+  }
 }
 
+void Mgr::tick()
+{
+  dout(10) << dendl;
+  server.send_report();
+}