*/
#include "messages/MMgrDigest.h"
+#include "messages/MMonMgrReport.h"
#include "messages/MPGStats.h"
#include "mgr/ClusterState.h"
#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "
-ClusterState::ClusterState(MonClient *monc_, Objecter *objecter_)
- : monc(monc_), objecter(objecter_), lock("ClusterState")
+ClusterState::ClusterState(
+ MonClient *monc_,
+ Objecter *objecter_,
+ const MgrMap& mgrmap)
+ : monc(monc_),
+ objecter(objecter_),
+ lock("ClusterState"),
+ mgr_map(mgrmap),
+ pgservice(pg_map)
{}
void ClusterState::set_objecter(Objecter *objecter_)
fsmap = new_fsmap;
}
+void ClusterState::set_mgr_map(MgrMap const &new_mgrmap)
+{
+ Mutex::Locker l(lock);
+ mgr_map = new_mgrmap;
+}
+
+void ClusterState::set_service_map(ServiceMap const &new_service_map)
+{
+ Mutex::Locker l(lock);
+ servicemap = new_service_map;
+}
+
void ClusterState::load_digest(MMgrDigest *m)
{
health_json = std::move(m->health_json);
void ClusterState::ingest_pgstats(MPGStats *stats)
{
Mutex::Locker l(lock);
- PGMap::Incremental pending_inc;
- pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
const int from = stats->get_orig_source().num();
bool is_in = false;
});
if (is_in) {
- pending_inc.update_stat(from, stats->epoch, stats->osd_stat);
+ pending_inc.update_stat(from, stats->epoch, std::move(stats->osd_stat));
} else {
pending_inc.update_stat(from, stats->epoch, osd_stat_t());
}
// In case we're hearing about a PG that according to last
// OSDMap update should not exist
- if (pg_map.pg_stat.count(pgid) == 0) {
- dout(15) << " got " << pgid << " reported at " << pg_stats.reported_epoch << ":"
+ if (existing_pools.count(pgid.pool()) == 0) {
+ dout(15) << " got " << pgid
+ << " reported at " << pg_stats.reported_epoch << ":"
<< pg_stats.reported_seq
<< " state " << pg_state_string(pg_stats.state)
- << " but DNE in pg_map; pool was probably deleted."
+ << " but pool not in " << existing_pools
<< dendl;
continue;
- // In case we already heard about more recent stats from this PG
- // from another OSD
- } else if (pg_map.pg_stat[pgid].get_version_pair() > pg_stats.get_version_pair()) {
- dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported_epoch << ":"
- << pg_map.pg_stat[pgid].reported_seq << dendl;
+ }
+ // In case we already heard about more recent stats from this PG
+ // from another OSD
+ const auto q = pg_map.pg_stat.find(pgid);
+ if (q != pg_map.pg_stat.end() &&
+ q->second.get_version_pair() > pg_stats.get_version_pair()) {
+ dout(15) << " had " << pgid << " from "
+ << q->second.reported_epoch << ":"
+ << q->second.reported_seq << dendl;
continue;
}
pending_inc.pg_stat_updates[pgid] = pg_stats;
}
+}
+
+void ClusterState::update_delta_stats()
+{
+ pending_inc.stamp = ceph_clock_now();
+ pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
+ dout(10) << " v" << pending_inc.version << dendl;
+
+ dout(30) << " pg_map before:\n";
+ JSONFormatter jf(true);
+ jf.dump_object("pg_map", pg_map);
+ jf.flush(*_dout);
+ *_dout << dendl;
+ dout(30) << " incremental:\n";
+ JSONFormatter jf(true);
+ jf.dump_object("pending_inc", pending_inc);
+ jf.flush(*_dout);
+ *_dout << dendl;
pg_map.apply_incremental(g_ceph_context, pending_inc);
+ pending_inc = PGMap::Incremental();
}
void ClusterState::notify_osdmap(const OSDMap &osd_map)
{
Mutex::Locker l(lock);
- PGMap::Incremental pending_inc;
+ pending_inc.stamp = ceph_clock_now();
pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
+ dout(10) << " v" << pending_inc.version << dendl;
- PGMapUpdater::update_creating_pgs(osd_map, pg_map, &pending_inc);
- PGMapUpdater::register_new_pgs(osd_map, pg_map, &pending_inc);
+ PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc);
+
+ // update our list of pools that exist, so that we can filter pg_map updates
+ // in synchrony with this OSDMap.
+ existing_pools.clear();
+ for (auto& p : osd_map.get_pools()) {
+ existing_pools.insert(p.first);
+ }
// brute force this for now (don't bother being clever by only
// checking osds that went up/down)
PGMapUpdater::check_down_pgs(osd_map, pg_map, true,
need_check_down_pg_osds, &pending_inc);
- pg_map.apply_incremental(g_ceph_context, pending_inc);
+ dout(30) << " pg_map before:\n";
+ JSONFormatter jf(true);
+ jf.dump_object("pg_map", pg_map);
+ jf.flush(*_dout);
+ *_dout << dendl;
+ dout(30) << " incremental:\n";
+ JSONFormatter jf(true);
+ jf.dump_object("pending_inc", pending_inc);
+ jf.flush(*_dout);
+ *_dout << dendl;
+ pg_map.apply_incremental(g_ceph_context, pending_inc);
+ pending_inc = PGMap::Incremental();
// TODO: Complete the separation of PG state handling so
// that a cut-down set of functionality remains in PGMonitor
// while the full-blown PGMap lives only here.
}
-