1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "MgrStatMonitor.h"
5 #include "mon/OSDMonitor.h"
6 #include "mon/MgrMonitor.h"
8 #include "messages/MGetPoolStats.h"
9 #include "messages/MGetPoolStatsReply.h"
10 #include "messages/MMonMgrReport.h"
11 #include "messages/MStatfs.h"
12 #include "messages/MStatfsReply.h"
13 #include "messages/MServiceMap.h"
15 #include "include/ceph_assert.h" // re-clobber assert
17 #define dout_subsys ceph_subsys_mon
19 #define dout_prefix _prefix(_dout, mon)
// Log-prefix helper referenced by the dout_prefix macro defined just above.
// Streams "mon.<name>@<rank>(<state name>" into *_dout and returns the
// resulting stream reference.
// NOTE(review): the tail of the expression and the closing brace are not
// visible in this excerpt.
20 static ostream
& _prefix(std::ostream
*_dout
, Monitor
*mon
) {
21 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
22 << "(" << mon
->get_state_name()
// Constructor: passes mn, p and service_name straight through to
// PaxosService in the initializer list.
// NOTE(review): the constructor body is not visible in this excerpt.
26 MgrStatMonitor::MgrStatMonitor(Monitor
*mn
, Paxos
*p
, const string
& service_name
)
27 : PaxosService(mn
, p
, service_name
)
// Destructor is explicitly defaulted; no hand-written cleanup here.
31 MgrStatMonitor::~MgrStatMonitor() = default;
// Sets up the initial service map: epoch 1, modified = current clock time,
// then encodes it into a freshly cleared pending_service_map_bl.
// NOTE(review): the embedded line numbers skip some original lines, so this
// excerpt may not show every statement of the function.
33 void MgrStatMonitor::create_initial()
35 dout(10) << __func__
<< dendl
;
37 service_map
.epoch
= 1;
38 service_map
.modified
= ceph_clock_now();
39 pending_service_map_bl
.clear();
// Encoded with CEPH_FEATURES_ALL rather than the quorum's features
// (compare create_pending(), which uses mon->get_quorum_con_features()).
40 encode(service_map
, pending_service_map_bl
, CEPH_FEATURES_ALL
);
// Refreshes in-memory state from the last committed version:
//   - reads `version` from get_last_committed(),
//   - fetches that version's blob with get_version(version, bl),
//   - asserts the blob is non-empty,
//   - decodes service_map and progress_events from it.
// A buffer::error raised while decoding is caught and reported via derr.
// NOTE(review): the declarations of `bl` and `p`, the `try` keyword, any
// decode step before service_map, the catch body and any use of
// need_bootstrap are not visible in this excerpt.
43 void MgrStatMonitor::update_from_paxos(bool *need_bootstrap
)
45 version
= get_last_committed();
46 dout(10) << " " << version
<< dendl
;
49 get_version(version
, bl
);
// An empty blob for a committed version is treated as a fatal error.
51 ceph_assert(bl
.length());
55 decode(service_map
, p
);
57 decode(progress_events
, p
);
59 dout(10) << __func__
<< " v" << version
60 << " service_map e" << service_map
.epoch
61 << " " << progress_events
.size() << " progress events"
64 catch (buffer::error
& e
) {
65 derr
<< "failed to decode mgrstat state; luminous dev version? "
// Publishes values from `digest` to mon->cluster_logger: OSD byte totals,
// pool count, PG counts (total / active / active+clean / peering) and the
// object and byte sums.
// NOTE(review): several original lines are missing from this excerpt; the
// declaration and accumulation of `num_pg`, the increment of `active`, and
// the loop increment of `p` are not visible.
73 void MgrStatMonitor::update_logger()
75 dout(20) << __func__
<< dendl
;
// OSD capacity: total, raw used, available.
77 mon
->cluster_logger
->set(l_cluster_osd_bytes
, digest
.osd_sum
.statfs
.total
);
78 mon
->cluster_logger
->set(l_cluster_osd_bytes_used
,
79 digest
.osd_sum
.statfs
.get_used_raw());
80 mon
->cluster_logger
->set(l_cluster_osd_bytes_avail
,
81 digest
.osd_sum
.statfs
.available
);
// Number of pools = number of entries in pg_pool_sum.
83 mon
->cluster_logger
->set(l_cluster_num_pool
, digest
.pg_pool_sum
.size());
// NOTE(review): `auto i` copies each element of num_pg_by_pool on every
// iteration; `const auto&` would avoid the copy. The loop body is not
// visible here.
85 for (auto i
: digest
.num_pg_by_pool
) {
88 mon
->cluster_logger
->set(l_cluster_num_pg
, num_pg
);
// Tally PGs by state. The keys of num_pg_by_state are tested as bitmasks
// against the PG_STATE_* flags; the mapped value is added as a count.
90 unsigned active
= 0, active_clean
= 0, peering
= 0;
91 for (auto p
= digest
.num_pg_by_state
.begin();
92 p
!= digest
.num_pg_by_state
.end();
94 if (p
->first
& PG_STATE_ACTIVE
) {
// The CLEAN test follows the opening brace of the ACTIVE branch, which is
// presumably why the counter is named active_clean (closing brace not
// visible in this excerpt).
96 if (p
->first
& PG_STATE_CLEAN
)
97 active_clean
+= p
->second
;
99 if (p
->first
& PG_STATE_PEERING
)
100 peering
+= p
->second
;
102 mon
->cluster_logger
->set(l_cluster_num_pg_active_clean
, active_clean
);
103 mon
->cluster_logger
->set(l_cluster_num_pg_active
, active
);
104 mon
->cluster_logger
->set(l_cluster_num_pg_peering
, peering
);
// Object and byte totals taken from digest.pg_sum.stats.sum.
106 mon
->cluster_logger
->set(l_cluster_num_object
, digest
.pg_sum
.stats
.sum
.num_objects
);
107 mon
->cluster_logger
->set(l_cluster_num_object_degraded
, digest
.pg_sum
.stats
.sum
.num_objects_degraded
);
108 mon
->cluster_logger
->set(l_cluster_num_object_misplaced
, digest
.pg_sum
.stats
.sum
.num_objects_misplaced
);
109 mon
->cluster_logger
->set(l_cluster_num_object_unfound
, digest
.pg_sum
.stats
.sum
.num_objects_unfound
);
110 mon
->cluster_logger
->set(l_cluster_num_bytes
, digest
.pg_sum
.stats
.sum
.num_bytes
);
// Resets the pending state to mirror the current state: copies digest
// into pending_digest, takes the current health checks, and re-encodes
// service_map into a cleared pending_service_map_bl using the quorum's
// connection features.
114 void MgrStatMonitor::create_pending()
116 dout(10) << " " << version
<< dendl
;
117 pending_digest
= digest
;
118 pending_health_checks
= get_health_checks();
119 pending_service_map_bl
.clear();
120 encode(service_map
, pending_service_map_bl
, mon
->get_quorum_con_features());
// Serializes the pending state into transaction `t`. The blob written
// under `version` is, in order: pending_digest (encoded with the quorum
// connection features), the pre-encoded pending_service_map_bl, then
// pending_progress_events. update_from_paxos() decodes service_map and
// then progress_events from the same blob.
// NOTE(review): the declaration of `bl` and any change to `version` before
// the first log line are not visible in this excerpt.
123 void MgrStatMonitor::encode_pending(MonitorDBStore::TransactionRef t
)
126 dout(10) << " " << version
<< dendl
;
128 encode(pending_digest
, bl
, mon
->get_quorum_con_features());
// The service map must already have been encoded (by create_initial(),
// create_pending() or prepare_report()); an empty buffer is a fatal error.
129 ceph_assert(pending_service_map_bl
.length());
130 bl
.append(pending_service_map_bl
);
131 encode(pending_progress_events
, bl
);
132 put_version(t
, version
, bl
);
133 put_last_committed(t
, version
);
// Health checks are written to the same transaction via encode_health().
135 encode_health(pending_health_checks
, t
);
// Const accessor returning a version_t; presumably the version up to which
// older states may be trimmed.
// NOTE(review): the body is not visible in this excerpt; confirm how many
// versions are actually retained.
138 version_t
MgrStatMonitor::get_trim_to() const
140 // we don't actually need *any* old states, but keep a few.
147 void MgrStatMonitor::on_active()
152 void MgrStatMonitor::tick()
// Dispatches an incoming request by message type:
//   CEPH_MSG_STATFS    -> preprocess_statfs()
//   MSG_MON_MGR_REPORT -> preprocess_report()
//   MSG_GETPOOLSTATS   -> preprocess_getpoolstats()
// The derr line after the cases logs "Unhandled message type" for anything
// else.
// NOTE(review): the label and the return value for the unhandled path are
// not visible in this excerpt.
156 bool MgrStatMonitor::preprocess_query(MonOpRequestRef op
)
158 auto m
= op
->get_req
<PaxosServiceMessage
>();
159 switch (m
->get_type()) {
160 case CEPH_MSG_STATFS
:
161 return preprocess_statfs(op
);
162 case MSG_MON_MGR_REPORT
:
163 return preprocess_report(op
);
164 case MSG_GETPOOLSTATS
:
165 return preprocess_getpoolstats(op
);
168 derr
<< "Unhandled message type " << m
->get_type() << dendl
;
// Update-phase dispatcher. MSG_MON_MGR_REPORT is the only message type
// with a visible case here and is handed to prepare_report(); the derr
// line logs "Unhandled message type" for anything else.
// NOTE(review): the label and the return value for the unhandled path are
// not visible in this excerpt.
173 bool MgrStatMonitor::prepare_update(MonOpRequestRef op
)
175 auto m
= op
->get_req
<PaxosServiceMessage
>();
176 switch (m
->get_type()) {
177 case MSG_MON_MGR_REPORT
:
178 return prepare_report(op
);
181 derr
<< "Unhandled message type " << m
->get_type() << dendl
;
// Pre-check for an MMonMgrReport: compares the sender's gid (m->gid) with
// the active mgr gid from mon->mgrmon()->get_map() and logs
// "ignoring report from non-active mgr" when they differ.
// NOTE(review): the start of the `if` condition, the rest of the log
// statement and the return values are not visible in this excerpt.
186 bool MgrStatMonitor::preprocess_report(MonOpRequestRef op
)
188 auto m
= op
->get_req
<MMonMgrReport
>();
191 m
->gid
!= mon
->mgrmon()->get_map().get_active_gid()) {
192 dout(10) << "ignoring report from non-active mgr " << m
->gid
// Stages the contents of an MMonMgrReport as the pending state:
//   - decodes pending_digest from the message data payload,
//   - swaps in the message's health checks,
//   - swaps in the message's encoded service map, only if it is non-empty,
//   - swaps in the message's progress events,
// then logs a summary at level 10 and JSON dumps of each part at level 20.
// Because swap() is used, the message's own fields are left holding the
// previous pending values.
// NOTE(review): the return statement and the ends of the level-20 dump
// statements are not visible in this excerpt.
199 bool MgrStatMonitor::prepare_report(MonOpRequestRef op
)
201 auto m
= op
->get_req
<MMonMgrReport
>();
202 bufferlist bl
= m
->get_data();
203 auto p
= bl
.cbegin();
204 decode(pending_digest
, p
);
205 pending_health_checks
.swap(m
->health_checks
);
// An empty service_map_bl leaves the existing pending service map in place.
206 if (m
->service_map_bl
.length()) {
207 pending_service_map_bl
.swap(m
->service_map_bl
);
209 pending_progress_events
.swap(m
->progress_events
);
// NOTE(review): this prints progress_events.size(), while the function has
// just filled pending_progress_events; the other two values printed are
// the pending ones. Looks like pending_progress_events was intended —
// confirm.
210 dout(10) << __func__
<< " " << pending_digest
<< ", "
211 << pending_health_checks
.checks
.size() << " health checks, "
212 << progress_events
.size() << " progress events" << dendl
;
// Level-20 diagnostics: one JSON dump each for the digest, the health
// checks and the progress events. `jf` is declared three times, presumably
// in separate scopes whose braces are not visible here.
213 dout(20) << "pending_digest:\n";
214 JSONFormatter
jf(true);
215 jf
.open_object_section("pending_digest");
216 pending_digest
.dump(&jf
);
220 dout(20) << "health checks:\n";
221 JSONFormatter
jf(true);
222 jf
.open_object_section("health_checks");
223 pending_health_checks
.dump(&jf
);
227 dout(20) << "progress events:\n";
228 JSONFormatter
jf(true);
229 jf
.open_object_section("progress_events");
230 for (auto& i
: pending_progress_events
) {
231 jf
.dump_object(i
.first
.c_str(), i
.second
);
// Handles an MGetPoolStats request from the committed digest.
// Visible checks: the session must have read capability on "pg", and the
// request fsid must match mon->monmap->fsid; each failure is logged at
// level 0. The reply carries the last committed version, the per_pool flag
// from digest.use_per_pool_stats(), and one pool_stats entry per requested
// pool name, and is sent with mon->send_reply().
// NOTE(review): the return statements, any null-session check, and the
// statements guarded by the ENOENT test and by any pool_stat check are not
// visible in this excerpt.
239 bool MgrStatMonitor::preprocess_getpoolstats(MonOpRequestRef op
)
241 op
->mark_pgmon_event(__func__
);
242 auto m
= op
->get_req
<MGetPoolStats
>();
243 auto session
= op
->get_session();
246 if (!session
->is_capable("pg", MON_CAP_R
)) {
247 dout(0) << "MGetPoolStats received from entity with insufficient caps "
248 << session
->caps
<< dendl
;
251 if (m
->fsid
!= mon
->monmap
->fsid
) {
252 dout(0) << __func__
<< " on fsid "
253 << m
->fsid
<< " != " << mon
->monmap
->fsid
<< dendl
;
256 epoch_t ver
= get_last_committed();
257 auto reply
= new MGetPoolStatsReply(m
->fsid
, m
->get_tid(), ver
);
258 reply
->per_pool
= digest
.use_per_pool_stats();
// Resolve each requested pool name to an id through the OSD map; the
// lookup result is compared against -ENOENT for unknown names.
259 for (const auto& pool_name
: m
->pools
) {
260 const auto pool_id
= mon
->osdmon()->osdmap
.lookup_pg_pool_name(pool_name
);
261 if (pool_id
== -ENOENT
)
263 auto pool_stat
= get_pool_stat(pool_id
);
// pool_stat is dereferenced here, so it must be non-null at this point.
266 reply
->pool_stats
[pool_name
] = *pool_stat
;
268 mon
->send_reply(op
, reply
);
// Handles an MStatfs request.
// Visible checks: the session must have read capability on "pg", the
// request fsid must match mon->monmap->fsid, and if a data pool is named
// it must still exist in the OSD map; each failure is logged.
// The reply carries the last committed version and the result of
// get_statfs(osdmap, pool) in h.st, and is sent with mon->send_reply().
// NOTE(review): the return statements and any null-session check are not
// visible in this excerpt.
272 bool MgrStatMonitor::preprocess_statfs(MonOpRequestRef op
)
274 op
->mark_pgmon_event(__func__
);
275 auto statfs
= op
->get_req
<MStatfs
>();
276 auto session
= op
->get_session();
280 if (!session
->is_capable("pg", MON_CAP_R
)) {
281 dout(0) << "MStatfs received from entity with insufficient privileges "
282 << session
->caps
<< dendl
;
285 if (statfs
->fsid
!= mon
->monmap
->fsid
) {
286 dout(0) << __func__
<< " on fsid " << statfs
->fsid
287 << " != " << mon
->monmap
->fsid
<< dendl
;
// `pool` is tested for truth and then dereferenced, so it is an
// optional-like value: a request may or may not name a data pool.
290 const auto& pool
= statfs
->data_pool
;
291 if (pool
&& !mon
->osdmon()->osdmap
.have_pg_pool(*pool
)) {
292 // There's no error field for MStatfsReply so just ignore the request.
293 // This is known to happen when a client is still accessing a removed fs.
294 dout(1) << __func__
<< " on removed pool " << *pool
<< dendl
;
297 dout(10) << __func__
<< " " << *statfs
298 << " from " << statfs
->get_orig_source() << dendl
;
299 epoch_t ver
= get_last_committed();
300 auto reply
= new MStatfsReply(statfs
->fsid
, statfs
->get_tid(), ver
);
301 reply
->h
.st
= get_statfs(mon
->osdmon()->osdmap
, pool
);
302 mon
->send_reply(op
, reply
);
// Services one subscription: when sub->next is at or below the current
// service_map.epoch, a new MServiceMap built from service_map is sent on
// the subscriber's connection.
// The code then contains two follow-ups: removing the subscription through
// mon->with_session_map(), and advancing sub->next to epoch + 1.
// NOTE(review): the start of the log statement and the condition choosing
// between the two follow-ups are not visible in this excerpt; presumably
// one-shot subscriptions are removed and the others advanced — confirm.
306 void MgrStatMonitor::check_sub(Subscription
*sub
)
309 << " next " << sub
->next
310 << " vs service_map.epoch " << service_map
.epoch
<< dendl
;
311 if (sub
->next
<= service_map
.epoch
) {
312 auto m
= new MServiceMap(service_map
);
313 sub
->session
->con
->send_message(m
);
// The lambda captures `sub` by value (a pointer) and removes it from the
// session map it is handed.
315 mon
->with_session_map([sub
](MonSessionMap
& session_map
) {
316 session_map
.remove_sub(sub
);
319 sub
->next
= service_map
.epoch
+ 1;
324 void MgrStatMonitor::check_subs()
326 dout(10) << __func__
<< dendl
;
327 if (!service_map
.epoch
) {
330 auto subs
= mon
->session_map
.subs
.find("servicemap");
331 if (subs
== mon
->session_map
.subs
.end()) {
334 auto p
= subs
->second
->begin();