]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mgr/DaemonServer.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mgr / DaemonServer.cc
index e8d8c8a212865f007054299787ced8db5d3a590a..6454c8da306a52b96bd631527f2b61c2dc09e5b9 100644 (file)
@@ -24,6 +24,7 @@
 #include "messages/MCommandReply.h"
 #include "messages/MPGStats.h"
 #include "messages/MOSDScrub.h"
+#include "common/errno.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mgr
@@ -112,6 +113,8 @@ int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
   msgr->start();
   msgr->add_dispatcher_tail(this);
 
+  started_at = ceph_clock_now();
+
   return 0;
 }
 
@@ -176,7 +179,7 @@ bool DaemonServer::ms_verify_authorizer(Connection *con,
     if (peer_type == CEPH_ENTITY_TYPE_OSD) {
       Mutex::Locker l(lock);
       s->osd_id = atoi(s->entity_name.get_id().c_str());
-      dout(10) << __func__ << " registering osd." << s->osd_id << " session "
+      dout(10) << "registering osd." << s->osd_id << " session "
               << s << " con " << con << dendl;
       osd_cons[s->osd_id].insert(con);
     }
@@ -214,7 +217,7 @@ bool DaemonServer::ms_handle_reset(Connection *con)
     }
     session->put(); // SessionRef takes a ref
     Mutex::Locker l(lock);
-    dout(10) << __func__ << " unregistering osd." << session->osd_id
+    dout(10) << "unregistering osd." << session->osd_id
             << "  session " << session << " con " << con << dendl;
     osd_cons[session->osd_id].erase(con);
   }
@@ -234,6 +237,7 @@ bool DaemonServer::ms_dispatch(Message *m)
   switch (m->get_type()) {
     case MSG_PGSTATS:
       cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
+      maybe_ready(m->get_source().num());
       m->put();
       return true;
     case MSG_MGR_REPORT:
@@ -248,31 +252,59 @@ bool DaemonServer::ms_dispatch(Message *m)
   };
 }
 
+void DaemonServer::maybe_ready(int32_t osd_id)
+{
+  if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
+    dout(4) << "initial report from osd " << osd_id << dendl;
+    reported_osds.insert(osd_id);
+    std::set<int32_t> up_osds;
+
+    cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+        osdmap.get_up_osds(up_osds);
+    });
+
+    std::set<int32_t> unreported_osds;
+    std::set_difference(up_osds.begin(), up_osds.end(),
+                        reported_osds.begin(), reported_osds.end(),
+                        std::inserter(unreported_osds, unreported_osds.begin()));
+
+    if (unreported_osds.size() == 0) {
+      dout(4) << "all osds have reported, sending PG state to mon" << dendl;
+      pgmap_ready = true;
+      reported_osds.clear();
+      // Avoid waiting for next tick
+      send_report();
+    } else {
+      dout(4) << "still waiting for " << unreported_osds.size() << " osds"
+                 " to report in before PGMap is ready" << dendl;
+    }
+  }
+}
+
 void DaemonServer::shutdown()
 {
-  dout(10) << __func__ << dendl;
+  dout(10) << "begin" << dendl;
   msgr->shutdown();
   msgr->wait();
-  dout(10) << __func__ << " done" << dendl;
+  dout(10) << "done" << dendl;
 }
 
 
 
 bool DaemonServer::handle_open(MMgrOpen *m)
 {
-  uint32_t type = m->get_connection()->get_peer_type();
-  DaemonKey key(type, m->daemon_name);
+  DaemonKey key;
+  if (!m->service_name.empty()) {
+    key.first = m->service_name;
+  } else {
+    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
+  }
+  key.second = m->daemon_name;
 
-  dout(4) << "from " << m->get_connection() << " name "
-          << ceph_entity_type_name(type) << "." << m->daemon_name << dendl;
+  dout(4) << "from " << m->get_connection() << "  " << key << dendl;
 
   auto configure = new MMgrConfigure();
-  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) {
-    // We don't want clients to send us stats
-    configure->stats_period = 0;
-  } else {
-    configure->stats_period = g_conf->mgr_stats_period;
-  }
+  configure->stats_period = g_conf->mgr_stats_period;
   m->get_connection()->send_message(configure);
 
   if (daemon_state.exists(key)) {
@@ -280,42 +312,92 @@ bool DaemonServer::handle_open(MMgrOpen *m)
     daemon_state.get(key)->perf_counters.clear();
   }
 
+  if (m->service_daemon) {
+    DaemonStatePtr daemon;
+    if (daemon_state.exists(key)) {
+      daemon = daemon_state.get(key);
+    } else {
+      dout(4) << "constructing new DaemonState for " << key << dendl;
+      daemon = std::make_shared<DaemonState>(daemon_state.types);
+      daemon->key = key;
+      if (m->daemon_metadata.count("hostname")) {
+        daemon->hostname = m->daemon_metadata["hostname"];
+      }
+      daemon_state.insert(daemon);
+    }
+    daemon->service_daemon = true;
+    daemon->metadata = m->daemon_metadata;
+    daemon->service_status = m->daemon_status;
+
+    utime_t now = ceph_clock_now();
+    auto d = pending_service_map.get_daemon(m->service_name,
+                                           m->daemon_name);
+    if (d->gid != (uint64_t)m->get_source().num()) {
+      dout(10) << "registering " << key << " in pending_service_map" << dendl;
+      d->gid = m->get_source().num();
+      d->addr = m->get_source_addr();
+      d->start_epoch = pending_service_map.epoch;
+      d->start_stamp = now;
+      d->metadata = m->daemon_metadata;
+      pending_service_map_dirty = pending_service_map.epoch;
+    }
+  }
+
   m->put();
   return true;
 }
 
 bool DaemonServer::handle_report(MMgrReport *m)
 {
-  uint32_t type = m->get_connection()->get_peer_type();
-  DaemonKey key(type, m->daemon_name);
+  DaemonKey key;
+  if (!m->service_name.empty()) {
+    key.first = m->service_name;
+  } else {
+    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
+  }
+  key.second = m->daemon_name;
 
-  dout(4) << "from " << m->get_connection() << " name "
-          << ceph_entity_type_name(type) << "." << m->daemon_name << dendl;
+  dout(4) << "from " << m->get_connection() << " " << key << dendl;
 
-  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) {
-    // Clients should not be sending us stats
-    dout(4) << "rejecting report from client " << m->daemon_name << dendl;
+  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
+      m->service_name.empty()) {
+    // Clients should not be sending us stats unless they are declaring
+    // themselves to be a daemon for some service.
+    dout(4) << "rejecting report from non-daemon client " << m->daemon_name
+           << dendl;
     m->put();
     return true;
   }
 
   DaemonStatePtr daemon;
   if (daemon_state.exists(key)) {
-    dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
+    dout(20) << "updating existing DaemonState for " << key << dendl;
     daemon = daemon_state.get(key);
   } else {
-    dout(4) << "constructing new DaemonState for " << m->daemon_name << dendl;
+    dout(4) << "constructing new DaemonState for " << key << dendl;
     daemon = std::make_shared<DaemonState>(daemon_state.types);
     // FIXME: crap, we don't know the hostname at this stage.
     daemon->key = key;
     daemon_state.insert(daemon);
-    // FIXME: we should request metadata at this stage
+    // FIXME: we should avoid this case by rejecting MMgrReport from
+    // daemons without sessions, and ensuring that session open
+    // always contains metadata.
   }
-
   assert(daemon != nullptr);
   auto &daemon_counters = daemon->perf_counters;
   daemon_counters.update(m);
-  
+
+  if (daemon->service_daemon) {
+    utime_t now = ceph_clock_now();
+    if (m->daemon_status) {
+      daemon->service_status = *m->daemon_status;
+      daemon->service_status_stamp = now;
+    }
+    daemon->last_service_beacon = now;
+  } else if (m->daemon_status) {
+    derr << "got status from non-daemon " << key << dendl;
+  }
+
   m->put();
   return true;
 }
@@ -451,7 +533,7 @@ bool DaemonServer::handle_command(MCommand *m)
         con->mark_disposable();
       }
 
-      dout(1) << "do_command r=" << r << " " << rs << dendl;
+      dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl;
       if (con) {
         MCommandReply *reply = new MCommandReply(r, rs);
         reply->set_tid(m->get_tid());
@@ -575,6 +657,51 @@ bool DaemonServer::handle_command(MCommand *m)
     << "entity='" << session->entity_name << "' "
     << "cmd=" << m->cmd << ": dispatch";
 
+  // ----------------
+  // service map commands
+  if (prefix == "service dump") {
+    if (!f)
+      f.reset(Formatter::create("json-pretty"));
+    cluster_state.with_servicemap([&](const ServiceMap &service_map) {
+       f->dump_object("service_map", service_map);
+      });
+    f->flush(cmdctx->odata);
+    cmdctx->reply(0, ss);
+    return true;
+  }
+  if (prefix == "service status") {
+    if (!f)
+      f.reset(Formatter::create("json-pretty"));
+    // only include state from services that are in the persisted service map
+    f->open_object_section("service_status");
+    ServiceMap s;
+    cluster_state.with_servicemap([&](const ServiceMap& service_map) {
+       s = service_map;
+      });
+    for (auto& p : s.services) {
+      f->open_object_section(p.first.c_str());
+      for (auto& q : p.second.daemons) {
+       f->open_object_section(q.first.c_str());
+       DaemonKey key(p.first, q.first);
+       assert(daemon_state.exists(key));
+       auto daemon = daemon_state.get(key);
+       f->dump_stream("status_stamp") << daemon->service_status_stamp;
+       f->dump_stream("last_beacon") << daemon->last_service_beacon;
+       f->open_object_section("status");
+       for (auto& r : daemon->service_status) {
+         f->dump_string(r.first.c_str(), r.second);
+       }
+       f->close_section();
+       f->close_section();
+      }
+      f->close_section();
+    }
+    f->close_section();
+    f->flush(cmdctx->odata);
+    cmdctx->reply(0, ss);
+    return true;
+  }
+
   // -----------
   // PG commands
 
@@ -634,7 +761,7 @@ bool DaemonServer::handle_command(MCommand *m)
     get_str_vec(prefix, pvec);
 
     set<int> osds;
-    if (whostr == "*") {
+    if (whostr == "*" || whostr == "all" || whostr == "any") {
       cluster_state.with_osdmap([&](const OSDMap& osdmap) {
          for (int i = 0; i < osdmap.get_max_osd(); i++)
            if (osdmap.is_up(i)) {
@@ -841,21 +968,90 @@ bool DaemonServer::handle_command(MCommand *m)
   }
 }
 
+void DaemonServer::_prune_pending_service_map()
+{
+  utime_t cutoff = ceph_clock_now();
+  cutoff -= g_conf->mgr_service_beacon_grace;
+  auto p = pending_service_map.services.begin();
+  while (p != pending_service_map.services.end()) {
+    auto q = p->second.daemons.begin();
+    while (q != p->second.daemons.end()) {
+      DaemonKey key(p->first, q->first);
+      if (!daemon_state.exists(key)) {
+       derr << "missing key " << key << dendl;
+       ++q;
+       continue;
+      }
+      auto daemon = daemon_state.get(key);
+      if (daemon->last_service_beacon == utime_t()) {
+       // we must have just restarted; assume they are alive now.
+       daemon->last_service_beacon = ceph_clock_now();
+       ++q;
+       continue;
+      }
+      if (daemon->last_service_beacon < cutoff) {
+       dout(10) << "pruning stale " << p->first << "." << q->first
+                << " last_beacon " << daemon->last_service_beacon << dendl;
+       q = p->second.daemons.erase(q);
+       pending_service_map_dirty = pending_service_map.epoch;
+      } else {
+       ++q;
+      }
+    }
+    if (p->second.daemons.empty()) {
+      p = pending_service_map.services.erase(p);
+      pending_service_map_dirty = pending_service_map.epoch;
+    } else {
+      ++p;
+    }
+  }
+}
+
 void DaemonServer::send_report()
 {
+  if (!pgmap_ready) {
+    if (ceph_clock_now() - started_at > g_conf->mgr_stats_period * 4.0) {
+      pgmap_ready = true;
+      reported_osds.clear();
+      dout(1) << "Giving up on OSDs that haven't reported yet, sending "
+              << "potentially incomplete PG state to mon" << dendl;
+    } else {
+      dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
+              << dendl;
+      return;
+    }
+  }
+
   auto m = new MMonMgrReport();
   cluster_state.with_pgmap([&](const PGMap& pg_map) {
       cluster_state.update_delta_stats();
 
-      // FIXME: reporting health detail here might be a bad idea?
+      if (pending_service_map.epoch) {
+       _prune_pending_service_map();
+       if (pending_service_map_dirty >= pending_service_map.epoch) {
+         pending_service_map.modified = ceph_clock_now();
+         ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
+         dout(10) << "sending service_map e" << pending_service_map.epoch
+                  << dendl;
+         pending_service_map.epoch++;
+       }
+      }
+
       cluster_state.with_osdmap([&](const OSDMap& osdmap) {
          // FIXME: no easy way to get mon features here.  this will do for
          // now, though, as long as we don't make a backward-incompat change.
          pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
          dout(10) << pg_map << dendl;
-         pg_map.get_health(g_ceph_context, osdmap,
-                           m->health_summary,
-                           &m->health_detail);
+
+         pg_map.get_health_checks(g_ceph_context, osdmap,
+                                  &m->health_checks);
+         dout(10) << m->health_checks.checks.size() << " health checks"
+                  << dendl;
+         dout(20) << "health checks:\n";
+         JSONFormatter jf(true);
+         jf.dump_object("health_checks", m->health_checks);
+         jf.flush(*_dout);
+         *_dout << dendl;
        });
     });
   // TODO? We currently do not notify the PyModules
@@ -863,3 +1059,42 @@ void DaemonServer::send_report()
   //       so, or the state is updated.
   monc->send_mon_message(m);
 }
+
+void DaemonServer::got_service_map()
+{
+  Mutex::Locker l(lock);
+
+  cluster_state.with_servicemap([&](const ServiceMap& service_map) {
+      if (pending_service_map.epoch == 0) {
+       // we just started up
+       dout(10) << "got initial map e" << service_map.epoch << dendl;
+       pending_service_map = service_map;
+      } else {
+       // we we already active and therefore must have persisted it,
+       // which means ours is the same or newer.
+       dout(10) << "got updated map e" << service_map.epoch << dendl;
+      }
+      pending_service_map.epoch = service_map.epoch + 1;
+    });
+
+  // cull missing daemons, populate new ones
+  for (auto& p : pending_service_map.services) {
+    std::set<std::string> names;
+    for (auto& q : p.second.daemons) {
+      names.insert(q.first);
+      DaemonKey key(p.first, q.first);
+      if (!daemon_state.exists(key)) {
+       auto daemon = std::make_shared<DaemonState>(daemon_state.types);
+       daemon->key = key;
+       daemon->metadata = q.second.metadata;
+        if (q.second.metadata.count("hostname")) {
+          daemon->hostname = q.second.metadata["hostname"];
+        }
+       daemon->service_daemon = true;
+       daemon_state.insert(daemon);
+       dout(10) << "added missing " << key << dendl;
+      }
+    }
+    daemon_state.cull(p.first, names);
+  }
+}