1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "MgrStatMonitor.h"
5 #include "mon/OSDMonitor.h"
6 #include "mon/MgrMonitor.h"
8 #include "messages/MGetPoolStats.h"
9 #include "messages/MGetPoolStatsReply.h"
10 #include "messages/MMonMgrReport.h"
11 #include "messages/MStatfs.h"
12 #include "messages/MStatfsReply.h"
13 #include "messages/MServiceMap.h"
15 #include "include/ceph_assert.h" // re-clobber assert
17 #define dout_subsys ceph_subsys_mon
19 #define dout_prefix _prefix(_dout, mon)
27 using std::ostringstream
;
31 using std::stringstream
;
35 using ceph::bufferlist
;
38 using ceph::ErasureCodeInterfaceRef
;
39 using ceph::ErasureCodeProfile
;
40 using ceph::Formatter
;
41 using ceph::JSONFormatter
;
42 using ceph::make_message
;
43 using ceph::mono_clock
;
44 using ceph::mono_time
;
// Log-prefix helper wired in via the dout_prefix macro above: emits
// "mon.<name>@<rank>(<state>..." for every dout/derr line in this file.
// NOTE(review): this extraction is garbled — statements are hard-wrapped
// and prefixed with stray line-number tokens, and the tail of this stream
// expression (the closing of the prefix string and the function body) is
// missing from the visible text; restore from upstream before compiling.
46 static ostream
& _prefix(std::ostream
*_dout
, Monitor
&mon
) {
47 return *_dout
<< "mon." << mon
.name
<< "@" << mon
.rank
48 << "(" << mon
.get_state_name()
52 MgrStatMonitor::MgrStatMonitor(Monitor
&mn
, Paxos
&p
, const string
& service_name
)
53 : PaxosService(mn
, p
, service_name
)
57 MgrStatMonitor::~MgrStatMonitor() = default;
// Seed the very first committed state: an initial ServiceMap (epoch 1,
// stamped with the current time) encoded into pending_service_map_bl.
// NOTE(review): garbled extraction — stray line-number tokens, split
// statements, and at least one elided line between the dout and the
// service_map updates; confirm against upstream before compiling.
59 void MgrStatMonitor::create_initial()
61 dout(10) << __func__
<< dendl
;
// epoch 1 marks the freshly created service map
63 service_map
.epoch
= 1;
64 service_map
.modified
= ceph_clock_now();
65 pending_service_map_bl
.clear();
// no quorum exists yet, so encode with the full feature set
66 encode(service_map
, pending_service_map_bl
, CEPH_FEATURES_ALL
);
// Refresh in-memory state from the latest committed paxos version:
// fetch the committed blob, decode (at least) the service map and the
// progress events from it, then tell the OSDMonitor a new PG digest is
// available (see the notify_new_pg_digest() call below).
// NOTE(review): garbled extraction — the bufferlist/iterator
// declarations, the try-block opener around the decodes, and several
// statements/returns are elided from the visible text.
69 void MgrStatMonitor::update_from_paxos(bool *need_bootstrap
)
71 version
= get_last_committed();
72 dout(10) << " " << version
<< dendl
;
75 get_version(version
, bl
);
// the committed blob must exist and be non-empty
77 ceph_assert(bl
.length());
81 decode(service_map
, p
);
83 decode(progress_events
, p
);
85 dout(10) << __func__
<< " v" << version
86 << " service_map e" << service_map
.epoch
87 << " " << progress_events
.size() << " progress events"
// decode failure is tolerated rather than fatal: pre-release
// (luminous dev) encodings may not match — log and carry on
90 catch (ceph::buffer::error
& e
) {
91 derr
<< "failed to decode mgrstat state; luminous dev version? "
97 mon
.osdmon()->notify_new_pg_digest();
// Push the current digest into the cluster perf counters: raw OSD
// capacity/usage, pool and PG counts, per-state PG tallies, and the
// aggregated object/byte totals.
// NOTE(review): garbled extraction — loop bodies are partially elided
// (e.g. the accumulation into num_pg and into `active`); confirm
// against upstream before compiling.
100 void MgrStatMonitor::update_logger()
102 dout(20) << __func__
<< dendl
;
// raw OSD statfs totals: total / used / available bytes
104 mon
.cluster_logger
->set(l_cluster_osd_bytes
, digest
.osd_sum
.statfs
.total
);
105 mon
.cluster_logger
->set(l_cluster_osd_bytes_used
,
106 digest
.osd_sum
.statfs
.get_used_raw());
107 mon
.cluster_logger
->set(l_cluster_osd_bytes_avail
,
108 digest
.osd_sum
.statfs
.available
);
// pool count, then PG count accumulated per pool
110 mon
.cluster_logger
->set(l_cluster_num_pool
, digest
.pg_pool_sum
.size());
112 for (auto i
: digest
.num_pg_by_pool
) {
115 mon
.cluster_logger
->set(l_cluster_num_pg
, num_pg
);
// tally PGs by state bits: ACTIVE (and ACTIVE+CLEAN) and PEERING
117 unsigned active
= 0, active_clean
= 0, peering
= 0;
118 for (auto p
= digest
.num_pg_by_state
.begin();
119 p
!= digest
.num_pg_by_state
.end();
121 if (p
->first
& PG_STATE_ACTIVE
) {
123 if (p
->first
& PG_STATE_CLEAN
)
124 active_clean
+= p
->second
;
126 if (p
->first
& PG_STATE_PEERING
)
127 peering
+= p
->second
;
129 mon
.cluster_logger
->set(l_cluster_num_pg_active_clean
, active_clean
);
130 mon
.cluster_logger
->set(l_cluster_num_pg_active
, active
);
131 mon
.cluster_logger
->set(l_cluster_num_pg_peering
, peering
);
// object and byte totals from the aggregated pg stats sum
133 mon
.cluster_logger
->set(l_cluster_num_object
, digest
.pg_sum
.stats
.sum
.num_objects
);
134 mon
.cluster_logger
->set(l_cluster_num_object_degraded
, digest
.pg_sum
.stats
.sum
.num_objects_degraded
);
135 mon
.cluster_logger
->set(l_cluster_num_object_misplaced
, digest
.pg_sum
.stats
.sum
.num_objects_misplaced
);
136 mon
.cluster_logger
->set(l_cluster_num_object_unfound
, digest
.pg_sum
.stats
.sum
.num_objects_unfound
);
137 mon
.cluster_logger
->set(l_cluster_num_bytes
, digest
.pg_sum
.stats
.sum
.num_bytes
);
141 void MgrStatMonitor::create_pending()
143 dout(10) << " " << version
<< dendl
;
144 pending_digest
= digest
;
145 pending_health_checks
= get_health_checks();
146 pending_service_map_bl
.clear();
147 encode(service_map
, pending_service_map_bl
, mon
.get_quorum_con_features());
// Serialize the pending state into the given transaction: the digest
// (encoded with quorum features), the pre-encoded service map blob
// appended raw, and the progress events; then record the blob under
// `version`, advance last_committed, and persist the health checks.
// NOTE(review): garbled extraction — the version bump and the
// `bufferlist bl` declaration are elided from the visible text.
150 void MgrStatMonitor::encode_pending(MonitorDBStore::TransactionRef t
)
153 dout(10) << " " << version
<< dendl
;
155 encode(pending_digest
, bl
, mon
.get_quorum_con_features());
// the service map blob must already have been populated
156 ceph_assert(pending_service_map_bl
.length());
157 bl
.append(pending_service_map_bl
);
158 encode(pending_progress_events
, bl
);
159 put_version(t
, version
, bl
);
160 put_last_committed(t
, version
);
162 encode_health(pending_health_checks
, t
);
// How far back paxos may trim this service's old committed states.
// NOTE(review): the return expression is elided in this extraction;
// per the surviving comment, only a few recent states are retained.
165 version_t
MgrStatMonitor::get_trim_to() const
167 // we don't actually need *any* old states, but keep a few.
174 void MgrStatMonitor::on_active()
179 void MgrStatMonitor::tick()
// Read-side dispatch: route statfs, mgr-report and getpoolstats
// requests to their preprocess handlers; anything else is logged as
// unhandled via derr.
// NOTE(review): garbled extraction — the opening brace, the default
// label and the trailing returns are elided from the visible text.
183 bool MgrStatMonitor::preprocess_query(MonOpRequestRef op
)
185 auto m
= op
->get_req
<PaxosServiceMessage
>();
186 switch (m
->get_type()) {
187 case CEPH_MSG_STATFS
:
188 return preprocess_statfs(op
);
189 case MSG_MON_MGR_REPORT
:
190 return preprocess_report(op
);
191 case MSG_GETPOOLSTATS
:
192 return preprocess_getpoolstats(op
);
195 derr
<< "Unhandled message type " << m
->get_type() << dendl
;
// Write-side dispatch: of the messages this service sees, only
// MSG_MON_MGR_REPORT mutates state (via prepare_report); anything else
// is logged as unhandled via derr.
// NOTE(review): garbled extraction — the opening brace, the default
// label and the trailing returns are elided from the visible text.
200 bool MgrStatMonitor::prepare_update(MonOpRequestRef op
)
202 auto m
= op
->get_req
<PaxosServiceMessage
>();
203 switch (m
->get_type()) {
204 case MSG_MON_MGR_REPORT
:
205 return prepare_report(op
);
208 derr
<< "Unhandled message type " << m
->get_type() << dendl
;
// Read-side screening of an incoming MMonMgrReport: a report whose gid
// does not match the currently active mgr's gid is ignored (logged at
// debug level 10).
// NOTE(review): garbled extraction — the first half of the visible
// condition and the function's returns are elided.
213 bool MgrStatMonitor::preprocess_report(MonOpRequestRef op
)
215 auto m
= op
->get_req
<MMonMgrReport
>();
218 m
->gid
!= mon
.mgrmon()->get_map().get_active_gid()) {
219 dout(10) << "ignoring report from non-active mgr " << m
->gid
// Apply an active mgr's report to the pending state: decode the new
// digest from the message payload, take over the report's health
// checks, its service-map blob (only if non-empty) and its progress
// events, then dump each piece as pretty JSON at debug level 20.
// NOTE(review): garbled extraction — scope braces and the close/flush
// of each JSONFormatter section are elided from the visible text.
226 bool MgrStatMonitor::prepare_report(MonOpRequestRef op
)
228 auto m
= op
->get_req
<MMonMgrReport
>();
229 bufferlist bl
= m
->get_data();
230 auto p
= bl
.cbegin();
231 decode(pending_digest
, p
);
// swap() steals the message's containers instead of copying them
232 pending_health_checks
.swap(m
->health_checks
);
233 if (m
->service_map_bl
.length()) {
234 pending_service_map_bl
.swap(m
->service_map_bl
);
236 pending_progress_events
.swap(m
->progress_events
);
237 dout(10) << __func__
<< " " << pending_digest
<< ", "
238 << pending_health_checks
.checks
.size() << " health checks, "
239 << progress_events
.size() << " progress events" << dendl
;
// verbose (level 20) JSON dumps of what was just accepted
240 dout(20) << "pending_digest:\n";
241 JSONFormatter
jf(true);
242 jf
.open_object_section("pending_digest");
243 pending_digest
.dump(&jf
);
247 dout(20) << "health checks:\n";
248 JSONFormatter
jf(true);
249 jf
.open_object_section("health_checks");
250 pending_health_checks
.dump(&jf
);
254 dout(20) << "progress events:\n";
255 JSONFormatter
jf(true);
256 jf
.open_object_section("progress_events");
257 for (auto& i
: pending_progress_events
) {
258 jf
.dump_object(i
.first
.c_str(), i
.second
);
// Answer an MGetPoolStats query entirely from committed state (no
// proposal needed): require pg/read capability and a matching fsid,
// then build an MGetPoolStatsReply with stats for each requested pool
// that still exists in the osdmap.
// NOTE(review): garbled extraction — the session null-check, the early
// returns, the continue on unknown pools and closing braces are elided.
266 bool MgrStatMonitor::preprocess_getpoolstats(MonOpRequestRef op
)
268 op
->mark_pgmon_event(__func__
);
269 auto m
= op
->get_req
<MGetPoolStats
>();
270 auto session
= op
->get_session();
// capability gate: requester needs read on the "pg" service
273 if (!session
->is_capable("pg", MON_CAP_R
)) {
274 dout(0) << "MGetPoolStats received from entity with insufficient caps "
275 << session
->caps
<< dendl
;
// reject queries addressed to a different cluster fsid
278 if (m
->fsid
!= mon
.monmap
->fsid
) {
279 dout(0) << __func__
<< " on fsid "
280 << m
->fsid
<< " != " << mon
.monmap
->fsid
<< dendl
;
283 epoch_t ver
= get_last_committed();
284 auto reply
= new MGetPoolStatsReply(m
->fsid
, m
->get_tid(), ver
);
285 reply
->per_pool
= digest
.use_per_pool_stats();
// resolve each requested pool name; -ENOENT means it no longer exists
286 for (const auto& pool_name
: m
->pools
) {
287 const auto pool_id
= mon
.osdmon()->osdmap
.lookup_pg_pool_name(pool_name
);
288 if (pool_id
== -ENOENT
)
290 auto pool_stat
= get_pool_stat(pool_id
);
293 reply
->pool_stats
[pool_name
] = *pool_stat
;
295 mon
.send_reply(op
, reply
);
// Answer an MStatfs from committed state: require pg/read capability
// and a matching fsid; if a data pool was named but has since been
// removed, silently drop the request (the reply format has no error
// field — see the surviving comment below); otherwise reply with
// get_statfs() over the current osdmap.
// NOTE(review): garbled extraction — the session null-check, early
// returns and closing braces are elided from the visible text.
299 bool MgrStatMonitor::preprocess_statfs(MonOpRequestRef op
)
301 op
->mark_pgmon_event(__func__
);
302 auto statfs
= op
->get_req
<MStatfs
>();
303 auto session
= op
->get_session();
// capability gate: requester needs read on the "pg" service
307 if (!session
->is_capable("pg", MON_CAP_R
)) {
308 dout(0) << "MStatfs received from entity with insufficient privileges "
309 << session
->caps
<< dendl
;
// reject requests addressed to a different cluster fsid
312 if (statfs
->fsid
!= mon
.monmap
->fsid
) {
313 dout(0) << __func__
<< " on fsid " << statfs
->fsid
314 << " != " << mon
.monmap
->fsid
<< dendl
;
// optional pool scoping: a named-but-removed pool cannot be reported
317 const auto& pool
= statfs
->data_pool
;
318 if (pool
&& !mon
.osdmon()->osdmap
.have_pg_pool(*pool
)) {
319 // There's no error field for MStatfsReply so just ignore the request.
320 // This is known to happen when a client is still accessing a removed fs.
321 dout(1) << __func__
<< " on removed pool " << *pool
<< dendl
;
324 dout(10) << __func__
<< " " << *statfs
325 << " from " << statfs
->get_orig_source() << dendl
;
326 epoch_t ver
= get_last_committed();
327 auto reply
= new MStatfsReply(statfs
->fsid
, statfs
->get_tid(), ver
);
328 reply
->h
.st
= get_statfs(mon
.osdmon()->osdmap
, pool
);
329 mon
.send_reply(op
, reply
);
// Service-map subscription servicing: if the subscriber's next wanted
// epoch is <= the current service_map.epoch, send it an MServiceMap.
// Afterwards the sub is either removed from the session map (the
// with_session_map/remove_sub path below — presumably the one-shot
// case; the guarding condition is elided here — TODO confirm) or its
// `next` is advanced past the delivered epoch.
// NOTE(review): garbled extraction — the leading dout, the onetime
// branch condition and closing braces are elided from the visible text.
333 void MgrStatMonitor::check_sub(Subscription
*sub
)
336 << " next " << sub
->next
337 << " vs service_map.epoch " << service_map
.epoch
<< dendl
;
338 if (sub
->next
<= service_map
.epoch
) {
339 auto m
= new MServiceMap(service_map
);
340 sub
->session
->con
->send_message(m
);
342 mon
.with_session_map([sub
](MonSessionMap
& session_map
) {
343 session_map
.remove_sub(sub
);
346 sub
->next
= service_map
.epoch
+ 1;
// Walk the monitor's "servicemap" subscribers (if any) so each can be
// serviced; bails out early when the service map has no epoch yet or
// when there are no such subscribers.
// NOTE(review): garbled extraction, and this definition is truncated at
// the end of the visible chunk — the per-subscription loop body is not
// visible here.
351 void MgrStatMonitor::check_subs()
353 dout(10) << __func__
<< dendl
;
// epoch 0 means no service map has been committed yet
354 if (!service_map
.epoch
) {
357 auto subs
= mon
.session_map
.subs
.find("servicemap");
358 if (subs
== mon
.session_map
.subs
.end()) {
361 auto p
= subs
->second
->begin();