]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MgrStatMonitor.cc
4d3ecead280ae4d360dbff4b354c9d2a717b6572
[ceph.git] / ceph / src / mon / MgrStatMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "MgrStatMonitor.h"
5 #include "mon/OSDMonitor.h"
6 #include "mon/MgrMonitor.h"
7 #include "mon/PGMap.h"
8 #include "messages/MGetPoolStats.h"
9 #include "messages/MGetPoolStatsReply.h"
10 #include "messages/MMonMgrReport.h"
11 #include "messages/MStatfs.h"
12 #include "messages/MStatfsReply.h"
13 #include "messages/MServiceMap.h"
14
15 #include "include/ceph_assert.h" // re-clobber assert
16
17 #define dout_subsys ceph_subsys_mon
18 #undef dout_prefix
19 #define dout_prefix _prefix(_dout, mon)
20 static ostream& _prefix(std::ostream *_dout, Monitor *mon) {
21 return *_dout << "mon." << mon->name << "@" << mon->rank
22 << "(" << mon->get_state_name()
23 << ").mgrstat ";
24 }
25
26 MgrStatMonitor::MgrStatMonitor(Monitor *mn, Paxos *p, const string& service_name)
27 : PaxosService(mn, p, service_name)
28 {
29 }
30
31 MgrStatMonitor::~MgrStatMonitor() = default;
32
33 void MgrStatMonitor::create_initial()
34 {
35 dout(10) << __func__ << dendl;
36 version = 0;
37 service_map.epoch = 1;
38 service_map.modified = ceph_clock_now();
39 pending_service_map_bl.clear();
40 encode(service_map, pending_service_map_bl, CEPH_FEATURES_ALL);
41 }
42
43 void MgrStatMonitor::update_from_paxos(bool *need_bootstrap)
44 {
45 version = get_last_committed();
46 dout(10) << " " << version << dendl;
47 load_health();
48 bufferlist bl;
49 get_version(version, bl);
50 if (version) {
51 ceph_assert(bl.length());
52 try {
53 auto p = bl.cbegin();
54 decode(digest, p);
55 decode(service_map, p);
56 if (!p.end()) {
57 decode(progress_events, p);
58 }
59 dout(10) << __func__ << " v" << version
60 << " service_map e" << service_map.epoch
61 << " " << progress_events.size() << " progress events"
62 << dendl;
63 }
64 catch (buffer::error& e) {
65 derr << "failed to decode mgrstat state; luminous dev version? "
66 << e.what() << dendl;
67 }
68 }
69 check_subs();
70 update_logger();
71 }
72
73 void MgrStatMonitor::update_logger()
74 {
75 dout(20) << __func__ << dendl;
76
77 mon->cluster_logger->set(l_cluster_osd_bytes, digest.osd_sum.statfs.total);
78 mon->cluster_logger->set(l_cluster_osd_bytes_used,
79 digest.osd_sum.statfs.get_used_raw());
80 mon->cluster_logger->set(l_cluster_osd_bytes_avail,
81 digest.osd_sum.statfs.available);
82
83 mon->cluster_logger->set(l_cluster_num_pool, digest.pg_pool_sum.size());
84 uint64_t num_pg = 0;
85 for (auto i : digest.num_pg_by_pool) {
86 num_pg += i.second;
87 }
88 mon->cluster_logger->set(l_cluster_num_pg, num_pg);
89
90 unsigned active = 0, active_clean = 0, peering = 0;
91 for (auto p = digest.num_pg_by_state.begin();
92 p != digest.num_pg_by_state.end();
93 ++p) {
94 if (p->first & PG_STATE_ACTIVE) {
95 active += p->second;
96 if (p->first & PG_STATE_CLEAN)
97 active_clean += p->second;
98 }
99 if (p->first & PG_STATE_PEERING)
100 peering += p->second;
101 }
102 mon->cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
103 mon->cluster_logger->set(l_cluster_num_pg_active, active);
104 mon->cluster_logger->set(l_cluster_num_pg_peering, peering);
105
106 mon->cluster_logger->set(l_cluster_num_object, digest.pg_sum.stats.sum.num_objects);
107 mon->cluster_logger->set(l_cluster_num_object_degraded, digest.pg_sum.stats.sum.num_objects_degraded);
108 mon->cluster_logger->set(l_cluster_num_object_misplaced, digest.pg_sum.stats.sum.num_objects_misplaced);
109 mon->cluster_logger->set(l_cluster_num_object_unfound, digest.pg_sum.stats.sum.num_objects_unfound);
110 mon->cluster_logger->set(l_cluster_num_bytes, digest.pg_sum.stats.sum.num_bytes);
111
112 }
113
114 void MgrStatMonitor::create_pending()
115 {
116 dout(10) << " " << version << dendl;
117 pending_digest = digest;
118 pending_health_checks = get_health_checks();
119 pending_service_map_bl.clear();
120 encode(service_map, pending_service_map_bl, mon->get_quorum_con_features());
121 }
122
123 void MgrStatMonitor::encode_pending(MonitorDBStore::TransactionRef t)
124 {
125 ++version;
126 dout(10) << " " << version << dendl;
127 bufferlist bl;
128 encode(pending_digest, bl, mon->get_quorum_con_features());
129 ceph_assert(pending_service_map_bl.length());
130 bl.append(pending_service_map_bl);
131 encode(pending_progress_events, bl);
132 put_version(t, version, bl);
133 put_last_committed(t, version);
134
135 encode_health(pending_health_checks, t);
136 }
137
138 version_t MgrStatMonitor::get_trim_to() const
139 {
140 // we don't actually need *any* old states, but keep a few.
141 if (version > 5) {
142 return version - 5;
143 }
144 return 0;
145 }
146
147 void MgrStatMonitor::on_active()
148 {
149 update_logger();
150 }
151
152 void MgrStatMonitor::tick()
153 {
154 }
155
156 bool MgrStatMonitor::preprocess_query(MonOpRequestRef op)
157 {
158 auto m = op->get_req<PaxosServiceMessage>();
159 switch (m->get_type()) {
160 case CEPH_MSG_STATFS:
161 return preprocess_statfs(op);
162 case MSG_MON_MGR_REPORT:
163 return preprocess_report(op);
164 case MSG_GETPOOLSTATS:
165 return preprocess_getpoolstats(op);
166 default:
167 mon->no_reply(op);
168 derr << "Unhandled message type " << m->get_type() << dendl;
169 return true;
170 }
171 }
172
173 bool MgrStatMonitor::prepare_update(MonOpRequestRef op)
174 {
175 auto m = op->get_req<PaxosServiceMessage>();
176 switch (m->get_type()) {
177 case MSG_MON_MGR_REPORT:
178 return prepare_report(op);
179 default:
180 mon->no_reply(op);
181 derr << "Unhandled message type " << m->get_type() << dendl;
182 return true;
183 }
184 }
185
186 bool MgrStatMonitor::preprocess_report(MonOpRequestRef op)
187 {
188 auto m = op->get_req<MMonMgrReport>();
189 mon->no_reply(op);
190 if (m->gid &&
191 m->gid != mon->mgrmon()->get_map().get_active_gid()) {
192 dout(10) << "ignoring report from non-active mgr " << m->gid
193 << dendl;
194 return true;
195 }
196 return false;
197 }
198
199 bool MgrStatMonitor::prepare_report(MonOpRequestRef op)
200 {
201 auto m = op->get_req<MMonMgrReport>();
202 bufferlist bl = m->get_data();
203 auto p = bl.cbegin();
204 decode(pending_digest, p);
205 pending_health_checks.swap(m->health_checks);
206 if (m->service_map_bl.length()) {
207 pending_service_map_bl.swap(m->service_map_bl);
208 }
209 pending_progress_events.swap(m->progress_events);
210 dout(10) << __func__ << " " << pending_digest << ", "
211 << pending_health_checks.checks.size() << " health checks, "
212 << progress_events.size() << " progress events" << dendl;
213 dout(20) << "pending_digest:\n";
214 JSONFormatter jf(true);
215 jf.open_object_section("pending_digest");
216 pending_digest.dump(&jf);
217 jf.close_section();
218 jf.flush(*_dout);
219 *_dout << dendl;
220 dout(20) << "health checks:\n";
221 JSONFormatter jf(true);
222 jf.open_object_section("health_checks");
223 pending_health_checks.dump(&jf);
224 jf.close_section();
225 jf.flush(*_dout);
226 *_dout << dendl;
227 dout(20) << "progress events:\n";
228 JSONFormatter jf(true);
229 jf.open_object_section("progress_events");
230 for (auto& i : pending_progress_events) {
231 jf.dump_object(i.first.c_str(), i.second);
232 }
233 jf.close_section();
234 jf.flush(*_dout);
235 *_dout << dendl;
236 return true;
237 }
238
239 bool MgrStatMonitor::preprocess_getpoolstats(MonOpRequestRef op)
240 {
241 op->mark_pgmon_event(__func__);
242 auto m = op->get_req<MGetPoolStats>();
243 auto session = op->get_session();
244 if (!session)
245 return true;
246 if (!session->is_capable("pg", MON_CAP_R)) {
247 dout(0) << "MGetPoolStats received from entity with insufficient caps "
248 << session->caps << dendl;
249 return true;
250 }
251 if (m->fsid != mon->monmap->fsid) {
252 dout(0) << __func__ << " on fsid "
253 << m->fsid << " != " << mon->monmap->fsid << dendl;
254 return true;
255 }
256 epoch_t ver = get_last_committed();
257 auto reply = new MGetPoolStatsReply(m->fsid, m->get_tid(), ver);
258 reply->per_pool = digest.use_per_pool_stats();
259 for (const auto& pool_name : m->pools) {
260 const auto pool_id = mon->osdmon()->osdmap.lookup_pg_pool_name(pool_name);
261 if (pool_id == -ENOENT)
262 continue;
263 auto pool_stat = get_pool_stat(pool_id);
264 if (!pool_stat)
265 continue;
266 reply->pool_stats[pool_name] = *pool_stat;
267 }
268 mon->send_reply(op, reply);
269 return true;
270 }
271
272 bool MgrStatMonitor::preprocess_statfs(MonOpRequestRef op)
273 {
274 op->mark_pgmon_event(__func__);
275 auto statfs = op->get_req<MStatfs>();
276 auto session = op->get_session();
277
278 if (!session)
279 return true;
280 if (!session->is_capable("pg", MON_CAP_R)) {
281 dout(0) << "MStatfs received from entity with insufficient privileges "
282 << session->caps << dendl;
283 return true;
284 }
285 if (statfs->fsid != mon->monmap->fsid) {
286 dout(0) << __func__ << " on fsid " << statfs->fsid
287 << " != " << mon->monmap->fsid << dendl;
288 return true;
289 }
290 const auto& pool = statfs->data_pool;
291 if (pool && !mon->osdmon()->osdmap.have_pg_pool(*pool)) {
292 // There's no error field for MStatfsReply so just ignore the request.
293 // This is known to happen when a client is still accessing a removed fs.
294 dout(1) << __func__ << " on removed pool " << *pool << dendl;
295 return true;
296 }
297 dout(10) << __func__ << " " << *statfs
298 << " from " << statfs->get_orig_source() << dendl;
299 epoch_t ver = get_last_committed();
300 auto reply = new MStatfsReply(statfs->fsid, statfs->get_tid(), ver);
301 reply->h.st = get_statfs(mon->osdmon()->osdmap, pool);
302 mon->send_reply(op, reply);
303 return true;
304 }
305
306 void MgrStatMonitor::check_sub(Subscription *sub)
307 {
308 dout(10) << __func__
309 << " next " << sub->next
310 << " vs service_map.epoch " << service_map.epoch << dendl;
311 if (sub->next <= service_map.epoch) {
312 auto m = new MServiceMap(service_map);
313 sub->session->con->send_message(m);
314 if (sub->onetime) {
315 mon->with_session_map([sub](MonSessionMap& session_map) {
316 session_map.remove_sub(sub);
317 });
318 } else {
319 sub->next = service_map.epoch + 1;
320 }
321 }
322 }
323
324 void MgrStatMonitor::check_subs()
325 {
326 dout(10) << __func__ << dendl;
327 if (!service_map.epoch) {
328 return;
329 }
330 auto subs = mon->session_map.subs.find("servicemap");
331 if (subs == mon->session_map.subs.end()) {
332 return;
333 }
334 auto p = subs->second->begin();
335 while (!p.end()) {
336 auto sub = *p;
337 ++p;
338 check_sub(sub);
339 }
340 }