// Source: git.proxmox.com — ceph.git, ceph/src/mon/MgrStatMonitor.cc
// blob 854bc0e090a1326941f7d124b913a257100c64a3
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "MgrStatMonitor.h"
5 #include "mon/OSDMonitor.h"
6 #include "mon/PGMap.h"
7 #include "messages/MGetPoolStats.h"
8 #include "messages/MGetPoolStatsReply.h"
9 #include "messages/MMonMgrReport.h"
10 #include "messages/MStatfs.h"
11 #include "messages/MStatfsReply.h"
12 #include "messages/MServiceMap.h"
13
14 #include "include/ceph_assert.h" // re-clobber assert
15
16 #define dout_subsys ceph_subsys_mon
17 #undef dout_prefix
18 #define dout_prefix _prefix(_dout, mon)
19 static ostream& _prefix(std::ostream *_dout, Monitor *mon) {
20 return *_dout << "mon." << mon->name << "@" << mon->rank
21 << "(" << mon->get_state_name()
22 << ").mgrstat ";
23 }
24
25 MgrStatMonitor::MgrStatMonitor(Monitor *mn, Paxos *p, const string& service_name)
26 : PaxosService(mn, p, service_name)
27 {
28 }
29
30 MgrStatMonitor::~MgrStatMonitor() = default;
31
32 void MgrStatMonitor::create_initial()
33 {
34 dout(10) << __func__ << dendl;
35 version = 0;
36 service_map.epoch = 1;
37 service_map.modified = ceph_clock_now();
38 pending_service_map_bl.clear();
39 encode(service_map, pending_service_map_bl, CEPH_FEATURES_ALL);
40 }
41
42 void MgrStatMonitor::update_from_paxos(bool *need_bootstrap)
43 {
44 version = get_last_committed();
45 dout(10) << " " << version << dendl;
46 load_health();
47 bufferlist bl;
48 get_version(version, bl);
49 if (version) {
50 ceph_assert(bl.length());
51 try {
52 auto p = bl.cbegin();
53 decode(digest, p);
54 decode(service_map, p);
55 if (!p.end()) {
56 decode(progress_events, p);
57 }
58 dout(10) << __func__ << " v" << version
59 << " service_map e" << service_map.epoch
60 << " " << progress_events.size() << " progress events"
61 << dendl;
62 }
63 catch (buffer::error& e) {
64 derr << "failed to decode mgrstat state; luminous dev version? "
65 << e.what() << dendl;
66 }
67 }
68 check_subs();
69 update_logger();
70 }
71
72 void MgrStatMonitor::update_logger()
73 {
74 dout(20) << __func__ << dendl;
75
76 mon->cluster_logger->set(l_cluster_osd_bytes, digest.osd_sum.statfs.total);
77 mon->cluster_logger->set(l_cluster_osd_bytes_used,
78 digest.osd_sum.statfs.get_used_raw());
79 mon->cluster_logger->set(l_cluster_osd_bytes_avail,
80 digest.osd_sum.statfs.available);
81
82 mon->cluster_logger->set(l_cluster_num_pool, digest.pg_pool_sum.size());
83 uint64_t num_pg = 0;
84 for (auto i : digest.num_pg_by_pool) {
85 num_pg += i.second;
86 }
87 mon->cluster_logger->set(l_cluster_num_pg, num_pg);
88
89 unsigned active = 0, active_clean = 0, peering = 0;
90 for (auto p = digest.num_pg_by_state.begin();
91 p != digest.num_pg_by_state.end();
92 ++p) {
93 if (p->first & PG_STATE_ACTIVE) {
94 active += p->second;
95 if (p->first & PG_STATE_CLEAN)
96 active_clean += p->second;
97 }
98 if (p->first & PG_STATE_PEERING)
99 peering += p->second;
100 }
101 mon->cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
102 mon->cluster_logger->set(l_cluster_num_pg_active, active);
103 mon->cluster_logger->set(l_cluster_num_pg_peering, peering);
104
105 mon->cluster_logger->set(l_cluster_num_object, digest.pg_sum.stats.sum.num_objects);
106 mon->cluster_logger->set(l_cluster_num_object_degraded, digest.pg_sum.stats.sum.num_objects_degraded);
107 mon->cluster_logger->set(l_cluster_num_object_misplaced, digest.pg_sum.stats.sum.num_objects_misplaced);
108 mon->cluster_logger->set(l_cluster_num_object_unfound, digest.pg_sum.stats.sum.num_objects_unfound);
109 mon->cluster_logger->set(l_cluster_num_bytes, digest.pg_sum.stats.sum.num_bytes);
110
111 }
112
113 void MgrStatMonitor::create_pending()
114 {
115 dout(10) << " " << version << dendl;
116 pending_digest = digest;
117 pending_health_checks = get_health_checks();
118 pending_service_map_bl.clear();
119 encode(service_map, pending_service_map_bl, mon->get_quorum_con_features());
120 }
121
122 void MgrStatMonitor::encode_pending(MonitorDBStore::TransactionRef t)
123 {
124 ++version;
125 dout(10) << " " << version << dendl;
126 bufferlist bl;
127 encode(pending_digest, bl, mon->get_quorum_con_features());
128 ceph_assert(pending_service_map_bl.length());
129 bl.append(pending_service_map_bl);
130 encode(pending_progress_events, bl);
131 put_version(t, version, bl);
132 put_last_committed(t, version);
133
134 encode_health(pending_health_checks, t);
135 }
136
137 version_t MgrStatMonitor::get_trim_to() const
138 {
139 // we don't actually need *any* old states, but keep a few.
140 if (version > 5) {
141 return version - 5;
142 }
143 return 0;
144 }
145
146 void MgrStatMonitor::on_active()
147 {
148 update_logger();
149 }
150
151 void MgrStatMonitor::tick()
152 {
153 }
154
155 bool MgrStatMonitor::preprocess_query(MonOpRequestRef op)
156 {
157 auto m = op->get_req<PaxosServiceMessage>();
158 switch (m->get_type()) {
159 case CEPH_MSG_STATFS:
160 return preprocess_statfs(op);
161 case MSG_MON_MGR_REPORT:
162 return preprocess_report(op);
163 case MSG_GETPOOLSTATS:
164 return preprocess_getpoolstats(op);
165 default:
166 mon->no_reply(op);
167 derr << "Unhandled message type " << m->get_type() << dendl;
168 return true;
169 }
170 }
171
172 bool MgrStatMonitor::prepare_update(MonOpRequestRef op)
173 {
174 auto m = op->get_req<PaxosServiceMessage>();
175 switch (m->get_type()) {
176 case MSG_MON_MGR_REPORT:
177 return prepare_report(op);
178 default:
179 mon->no_reply(op);
180 derr << "Unhandled message type " << m->get_type() << dendl;
181 return true;
182 }
183 }
184
185 bool MgrStatMonitor::preprocess_report(MonOpRequestRef op)
186 {
187 mon->no_reply(op);
188 return false;
189 }
190
191 bool MgrStatMonitor::prepare_report(MonOpRequestRef op)
192 {
193 auto m = op->get_req<MMonMgrReport>();
194 bufferlist bl = m->get_data();
195 auto p = bl.cbegin();
196 decode(pending_digest, p);
197 pending_health_checks.swap(m->health_checks);
198 if (m->service_map_bl.length()) {
199 pending_service_map_bl.swap(m->service_map_bl);
200 }
201 pending_progress_events.swap(m->progress_events);
202 dout(10) << __func__ << " " << pending_digest << ", "
203 << pending_health_checks.checks.size() << " health checks, "
204 << progress_events.size() << " progress events" << dendl;
205 dout(20) << "pending_digest:\n";
206 JSONFormatter jf(true);
207 jf.open_object_section("pending_digest");
208 pending_digest.dump(&jf);
209 jf.close_section();
210 jf.flush(*_dout);
211 *_dout << dendl;
212 dout(20) << "health checks:\n";
213 JSONFormatter jf(true);
214 jf.open_object_section("health_checks");
215 pending_health_checks.dump(&jf);
216 jf.close_section();
217 jf.flush(*_dout);
218 *_dout << dendl;
219 dout(20) << "progress events:\n";
220 JSONFormatter jf(true);
221 jf.open_object_section("progress_events");
222 for (auto& i : pending_progress_events) {
223 jf.dump_object(i.first.c_str(), i.second);
224 }
225 jf.close_section();
226 jf.flush(*_dout);
227 *_dout << dendl;
228 return true;
229 }
230
231 bool MgrStatMonitor::preprocess_getpoolstats(MonOpRequestRef op)
232 {
233 op->mark_pgmon_event(__func__);
234 auto m = op->get_req<MGetPoolStats>();
235 auto session = op->get_session();
236 if (!session)
237 return true;
238 if (!session->is_capable("pg", MON_CAP_R)) {
239 dout(0) << "MGetPoolStats received from entity with insufficient caps "
240 << session->caps << dendl;
241 return true;
242 }
243 if (m->fsid != mon->monmap->fsid) {
244 dout(0) << __func__ << " on fsid "
245 << m->fsid << " != " << mon->monmap->fsid << dendl;
246 return true;
247 }
248 epoch_t ver = get_last_committed();
249 auto reply = new MGetPoolStatsReply(m->fsid, m->get_tid(), ver);
250 reply->per_pool = digest.use_per_pool_stats();
251 for (const auto& pool_name : m->pools) {
252 const auto pool_id = mon->osdmon()->osdmap.lookup_pg_pool_name(pool_name);
253 if (pool_id == -ENOENT)
254 continue;
255 auto pool_stat = get_pool_stat(pool_id);
256 if (!pool_stat)
257 continue;
258 reply->pool_stats[pool_name] = *pool_stat;
259 }
260 mon->send_reply(op, reply);
261 return true;
262 }
263
264 bool MgrStatMonitor::preprocess_statfs(MonOpRequestRef op)
265 {
266 op->mark_pgmon_event(__func__);
267 auto statfs = op->get_req<MStatfs>();
268 auto session = op->get_session();
269
270 if (!session)
271 return true;
272 if (!session->is_capable("pg", MON_CAP_R)) {
273 dout(0) << "MStatfs received from entity with insufficient privileges "
274 << session->caps << dendl;
275 return true;
276 }
277 if (statfs->fsid != mon->monmap->fsid) {
278 dout(0) << __func__ << " on fsid " << statfs->fsid
279 << " != " << mon->monmap->fsid << dendl;
280 return true;
281 }
282 const auto& pool = statfs->data_pool;
283 if (pool && !mon->osdmon()->osdmap.have_pg_pool(*pool)) {
284 // There's no error field for MStatfsReply so just ignore the request.
285 // This is known to happen when a client is still accessing a removed fs.
286 dout(1) << __func__ << " on removed pool " << *pool << dendl;
287 return true;
288 }
289 dout(10) << __func__ << " " << *statfs
290 << " from " << statfs->get_orig_source() << dendl;
291 epoch_t ver = get_last_committed();
292 auto reply = new MStatfsReply(statfs->fsid, statfs->get_tid(), ver);
293 reply->h.st = get_statfs(mon->osdmon()->osdmap, pool);
294 mon->send_reply(op, reply);
295 return true;
296 }
297
298 void MgrStatMonitor::check_sub(Subscription *sub)
299 {
300 const auto epoch = mon->monmap->get_epoch();
301 dout(10) << __func__
302 << " next " << sub->next
303 << " have " << epoch << dendl;
304 if (sub->next <= service_map.epoch) {
305 auto m = new MServiceMap(service_map);
306 sub->session->con->send_message(m);
307 if (sub->onetime) {
308 mon->with_session_map([sub](MonSessionMap& session_map) {
309 session_map.remove_sub(sub);
310 });
311 } else {
312 sub->next = epoch + 1;
313 }
314 }
315 }
316
317 void MgrStatMonitor::check_subs()
318 {
319 dout(10) << __func__ << dendl;
320 if (!service_map.epoch) {
321 return;
322 }
323 auto subs = mon->session_map.subs.find("servicemap");
324 if (subs == mon->session_map.subs.end()) {
325 return;
326 }
327 auto p = subs->second->begin();
328 while (!p.end()) {
329 auto sub = *p;
330 ++p;
331 check_sub(sub);
332 }
333 }