]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mgr/ClusterState.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mgr / ClusterState.cc
index 81bcdb728deb0eeb0704cb4649a37356161d9866..7e01a811ef158b67da452faad8f8e3ed55c6c5a3 100644 (file)
@@ -12,6 +12,7 @@
  */
 
 #include "messages/MMgrDigest.h"
+#include "messages/MMonMgrReport.h"
 #include "messages/MPGStats.h"
 
 #include "mgr/ClusterState.h"
 #undef dout_prefix
 #define dout_prefix *_dout << "mgr " << __func__ << " "
 
-ClusterState::ClusterState(MonClient *monc_, Objecter *objecter_)
-  : monc(monc_), objecter(objecter_), lock("ClusterState")
+ClusterState::ClusterState(
+  MonClient *monc_,
+  Objecter *objecter_,
+  const MgrMap& mgrmap)
+  : monc(monc_),
+    objecter(objecter_),
+    lock("ClusterState"),
+    mgr_map(mgrmap),
+    pgservice(pg_map)
 {}
 
 void ClusterState::set_objecter(Objecter *objecter_)
@@ -39,6 +47,18 @@ void ClusterState::set_fsmap(FSMap const &new_fsmap)
   fsmap = new_fsmap;
 }
 
+void ClusterState::set_mgr_map(MgrMap const &new_mgrmap)
+{
+  Mutex::Locker l(lock);
+  mgr_map = new_mgrmap;
+}
+
+void ClusterState::set_service_map(ServiceMap const &new_service_map)
+{
+  Mutex::Locker l(lock);
+  servicemap = new_service_map;
+}
+
 void ClusterState::load_digest(MMgrDigest *m)
 {
   health_json = std::move(m->health_json);
@@ -48,8 +68,6 @@ void ClusterState::load_digest(MMgrDigest *m)
 void ClusterState::ingest_pgstats(MPGStats *stats)
 {
   Mutex::Locker l(lock);
-  PGMap::Incremental pending_inc;
-  pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
 
   const int from = stats->get_orig_source().num();
   bool is_in = false;
@@ -58,7 +76,7 @@ void ClusterState::ingest_pgstats(MPGStats *stats)
   });
 
   if (is_in) {
-    pending_inc.update_stat(from, stats->epoch, stats->osd_stat);
+    pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat));
   } else {
     pending_inc.update_stat(from, stats->epoch, osd_stat_t());
   }
@@ -69,36 +87,67 @@ void ClusterState::ingest_pgstats(MPGStats *stats)
 
     // In case we're hearing about a PG that according to last
     // OSDMap update should not exist
-    if (pg_map.pg_stat.count(pgid) == 0) {
-      dout(15) << " got " << pgid << " reported at " << pg_stats.reported_epoch << ":"
+    if (existing_pools.count(pgid.pool()) == 0) {
+      dout(15) << " got " << pgid
+              << " reported at " << pg_stats.reported_epoch << ":"
                << pg_stats.reported_seq
                << " state " << pg_state_string(pg_stats.state)
-               << " but DNE in pg_map; pool was probably deleted."
+               << " but pool not in " << existing_pools
                << dendl;
       continue;
-     // In case we already heard about more recent stats from this PG
-     // from another OSD
-    } else if (pg_map.pg_stat[pgid].get_version_pair() > pg_stats.get_version_pair()) {
-      dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported_epoch << ":"
-               << pg_map.pg_stat[pgid].reported_seq << dendl;
+    }
+    // In case we already heard about more recent stats from this PG
+    // from another OSD
+    const auto q = pg_map.pg_stat.find(pgid);
+    if (q != pg_map.pg_stat.end() &&
+       q->second.get_version_pair() > pg_stats.get_version_pair()) {
+      dout(15) << " had " << pgid << " from "
+              << q->second.reported_epoch << ":"
+               << q->second.reported_seq << dendl;
       continue;
     }
 
     pending_inc.pg_stat_updates[pgid] = pg_stats;
   }
+}
+
+void ClusterState::update_delta_stats()
+{
+  pending_inc.stamp = ceph_clock_now();
+  pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
+  dout(10) << " v" << pending_inc.version << dendl;
+
+  dout(30) << " pg_map before:\n";
+  JSONFormatter jf(true);
+  jf.dump_object("pg_map", pg_map);
+  jf.flush(*_dout);
+  *_dout << dendl;
+  dout(30) << " incremental:\n";
+  JSONFormatter jf(true);
+  jf.dump_object("pending_inc", pending_inc);
+  jf.flush(*_dout);
+  *_dout << dendl;
 
   pg_map.apply_incremental(g_ceph_context, pending_inc);
+  pending_inc = PGMap::Incremental();
 }
 
 void ClusterState::notify_osdmap(const OSDMap &osd_map)
 {
   Mutex::Locker l(lock);
 
-  PGMap::Incremental pending_inc;
+  pending_inc.stamp = ceph_clock_now();
   pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
+  dout(10) << " v" << pending_inc.version << dendl;
 
-  PGMapUpdater::update_creating_pgs(osd_map, pg_map, &pending_inc);
-  PGMapUpdater::register_new_pgs(osd_map, pg_map, &pending_inc);
+  PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc);
+
+  // update our list of pools that exist, so that we can filter pg_map updates
+  // in synchrony with this OSDMap.
+  existing_pools.clear();
+  for (auto& p : osd_map.get_pools()) {
+    existing_pools.insert(p.first);
+  }
 
   // brute force this for now (don't bother being clever by only
   // checking osds that went up/down)
@@ -106,10 +155,20 @@ void ClusterState::notify_osdmap(const OSDMap &osd_map)
   PGMapUpdater::check_down_pgs(osd_map, pg_map, true,
                               need_check_down_pg_osds, &pending_inc);
 
-  pg_map.apply_incremental(g_ceph_context, pending_inc);
+  dout(30) << " pg_map before:\n";
+  JSONFormatter jf(true);
+  jf.dump_object("pg_map", pg_map);
+  jf.flush(*_dout);
+  *_dout << dendl;
+  dout(30) << " incremental:\n";
+  JSONFormatter jf(true);
+  jf.dump_object("pending_inc", pending_inc);
+  jf.flush(*_dout);
+  *_dout << dendl;
 
+  pg_map.apply_incremental(g_ceph_context, pending_inc);
+  pending_inc = PGMap::Incremental();
   // TODO: Complete the separation of PG state handling so
   // that a cut-down set of functionality remains in PGMonitor
   // while the full-blown PGMap lives only here.
 }
-