#include "messages/MMonMgrReport.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"
+#include "messages/MServiceMap.h"
class MgrPGStatService : public MonPGStatService {
PGMapDigest& digest;
void MgrStatMonitor::create_initial()
{
- dout(10) << dendl;
+ dout(10) << __func__ << dendl;
version = 0;
+ service_map.epoch = 1;
+ ::encode(service_map, pending_service_map_bl, CEPH_FEATURES_ALL);
}
void MgrStatMonitor::update_from_paxos(bool *need_bootstrap)
{
version = get_last_committed();
dout(10) << " " << version << dendl;
+ load_health();
bufferlist bl;
get_version(version, bl);
if (version) {
assert(bl.length());
- auto p = bl.begin();
- ::decode(digest, p);
- ::decode(health_summary, p);
- ::decode(health_detail, p);
+ try {
+ auto p = bl.begin();
+ ::decode(digest, p);
+ ::decode(service_map, p);
+ dout(10) << __func__ << " v" << version
+ << " service_map e" << service_map.epoch << dendl;
+ }
+ catch (buffer::error& e) {
+ derr << "failed to decode mgrstat state; luminous dev version?" << dendl;
+ }
+ }
+ check_subs();
+ update_logger();
+}
+
+void MgrStatMonitor::update_logger()
+{
+ dout(20) << __func__ << dendl;
+ if (mon->osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ dout(20) << "yielding cluster perfcounter updates to pgmon" << dendl;
+ return;
+ }
+
+ mon->cluster_logger->set(l_cluster_osd_bytes, digest.osd_sum.kb * 1024ull);
+ mon->cluster_logger->set(l_cluster_osd_bytes_used,
+ digest.osd_sum.kb_used * 1024ull);
+ mon->cluster_logger->set(l_cluster_osd_bytes_avail,
+ digest.osd_sum.kb_avail * 1024ull);
+
+ mon->cluster_logger->set(l_cluster_num_pool, digest.pg_pool_sum.size());
+ uint64_t num_pg = 0;
+ for (auto i : digest.num_pg_by_pool) {
+ num_pg += i.second;
}
+ mon->cluster_logger->set(l_cluster_num_pg, num_pg);
+
+ unsigned active = 0, active_clean = 0, peering = 0;
+ for (auto p = digest.num_pg_by_state.begin();
+ p != digest.num_pg_by_state.end();
+ ++p) {
+ if (p->first & PG_STATE_ACTIVE) {
+ active += p->second;
+ if (p->first & PG_STATE_CLEAN)
+ active_clean += p->second;
+ }
+ if (p->first & PG_STATE_PEERING)
+ peering += p->second;
+ }
+ mon->cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
+ mon->cluster_logger->set(l_cluster_num_pg_active, active);
+ mon->cluster_logger->set(l_cluster_num_pg_peering, peering);
+
+ mon->cluster_logger->set(l_cluster_num_object, digest.pg_sum.stats.sum.num_objects);
+ mon->cluster_logger->set(l_cluster_num_object_degraded, digest.pg_sum.stats.sum.num_objects_degraded);
+ mon->cluster_logger->set(l_cluster_num_object_misplaced, digest.pg_sum.stats.sum.num_objects_misplaced);
+ mon->cluster_logger->set(l_cluster_num_object_unfound, digest.pg_sum.stats.sum.num_objects_unfound);
+ mon->cluster_logger->set(l_cluster_num_bytes, digest.pg_sum.stats.sum.num_bytes);
+
}
void MgrStatMonitor::create_pending()
{
dout(10) << " " << version << dendl;
pending_digest = digest;
- pending_health_summary = health_summary;
- pending_health_detail = health_detail;
+ pending_health_checks = get_health_checks();
+ pending_service_map_bl.clear();
+ ::encode(service_map, pending_service_map_bl, mon->get_quorum_con_features());
}
void MgrStatMonitor::encode_pending(MonitorDBStore::TransactionRef t)
dout(10) << " " << version << dendl;
bufferlist bl;
::encode(pending_digest, bl, mon->get_quorum_con_features());
- ::encode(pending_health_summary, bl);
- ::encode(pending_health_detail, bl);
+ assert(pending_service_map_bl.length());
+ bl.append(pending_service_map_bl);
put_version(t, version, bl);
put_last_committed(t, version);
+
+ encode_health(pending_health_checks, t);
}
version_t MgrStatMonitor::get_trim_to()
void MgrStatMonitor::on_active()
{
+ update_logger();
}
void MgrStatMonitor::get_health(list<pair<health_status_t,string> >& summary,
list<pair<health_status_t,string> > *detail,
CephContext *cct) const
{
- summary.insert(summary.end(), health_summary.begin(), health_summary.end());
- if (detail) {
- detail->insert(detail->end(), health_detail.begin(), health_detail.end());
- }
}
void MgrStatMonitor::tick()
bufferlist bl = m->get_data();
auto p = bl.begin();
::decode(pending_digest, p);
- dout(10) << __func__ << " " << pending_digest << dendl;
- pending_health_summary.swap(m->health_summary);
- pending_health_detail.swap(m->health_detail);
+ pending_health_checks.swap(m->health_checks);
+ if (m->service_map_bl.length()) {
+ pending_service_map_bl.swap(m->service_map_bl);
+ }
+ dout(10) << __func__ << " " << pending_digest << ", "
+ << pending_health_checks.checks.size() << " health checks" << dendl;
return true;
}
mon->send_reply(op, reply);
return true;
}
+
+void MgrStatMonitor::check_sub(Subscription *sub)
+{
+ const auto epoch = mon->monmap->get_epoch();
+ dout(10) << __func__
+ << " next " << sub->next
+ << " have " << epoch << dendl;
+ if (sub->next <= service_map.epoch) {
+ auto m = new MServiceMap(service_map);
+ sub->session->con->send_message(m);
+ if (sub->onetime) {
+ mon->with_session_map([this, sub](MonSessionMap& session_map) {
+ session_map.remove_sub(sub);
+ });
+ } else {
+ sub->next = epoch + 1;
+ }
+ }
+}
+
+void MgrStatMonitor::check_subs()
+{
+ dout(10) << __func__ << dendl;
+ if (!service_map.epoch) {
+ return;
+ }
+ auto subs = mon->session_map.subs.find("servicemap");
+ if (subs == mon->session_map.subs.end()) {
+ return;
+ }
+ auto p = subs->second->begin();
+ while (!p.end()) {
+ auto sub = *p;
+ ++p;
+ check_sub(sub);
+ }
+}