]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MgrStatMonitor.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / mon / MgrStatMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "MgrStatMonitor.h"
5 #include "mon/OSDMonitor.h"
6 #include "mon/MgrMonitor.h"
7 #include "mon/PGMap.h"
8 #include "messages/MGetPoolStats.h"
9 #include "messages/MGetPoolStatsReply.h"
10 #include "messages/MMonMgrReport.h"
11 #include "messages/MStatfs.h"
12 #include "messages/MStatfsReply.h"
13 #include "messages/MServiceMap.h"
14
15 #include "include/ceph_assert.h" // re-clobber assert
16
17 #define dout_subsys ceph_subsys_mon
18 #undef dout_prefix
19 #define dout_prefix _prefix(_dout, mon)
20
21 using std::dec;
22 using std::hex;
23 using std::list;
24 using std::map;
25 using std::make_pair;
26 using std::ostream;
27 using std::ostringstream;
28 using std::pair;
29 using std::set;
30 using std::string;
31 using std::stringstream;
32 using std::to_string;
33 using std::vector;
34
35 using ceph::bufferlist;
36 using ceph::decode;
37 using ceph::encode;
38 using ceph::ErasureCodeInterfaceRef;
39 using ceph::ErasureCodeProfile;
40 using ceph::Formatter;
41 using ceph::JSONFormatter;
42 using ceph::make_message;
43 using ceph::mono_clock;
44 using ceph::mono_time;
45
46 static ostream& _prefix(std::ostream *_dout, Monitor &mon) {
47 return *_dout << "mon." << mon.name << "@" << mon.rank
48 << "(" << mon.get_state_name()
49 << ").mgrstat ";
50 }
51
52 MgrStatMonitor::MgrStatMonitor(Monitor &mn, Paxos &p, const string& service_name)
53 : PaxosService(mn, p, service_name)
54 {
55 }
56
57 MgrStatMonitor::~MgrStatMonitor() = default;
58
59 void MgrStatMonitor::create_initial()
60 {
61 dout(10) << __func__ << dendl;
62 version = 0;
63 service_map.epoch = 1;
64 service_map.modified = ceph_clock_now();
65 pending_service_map_bl.clear();
66 encode(service_map, pending_service_map_bl, CEPH_FEATURES_ALL);
67 }
68
69 void MgrStatMonitor::update_from_paxos(bool *need_bootstrap)
70 {
71 version = get_last_committed();
72 dout(10) << " " << version << dendl;
73 load_health();
74 bufferlist bl;
75 get_version(version, bl);
76 if (version) {
77 ceph_assert(bl.length());
78 try {
79 auto p = bl.cbegin();
80 decode(digest, p);
81 decode(service_map, p);
82 if (!p.end()) {
83 decode(progress_events, p);
84 }
85 dout(10) << __func__ << " v" << version
86 << " service_map e" << service_map.epoch
87 << " " << progress_events.size() << " progress events"
88 << dendl;
89 }
90 catch (ceph::buffer::error& e) {
91 derr << "failed to decode mgrstat state; luminous dev version? "
92 << e.what() << dendl;
93 }
94 }
95 check_subs();
96 update_logger();
97 mon.osdmon()->notify_new_pg_digest();
98 }
99
100 void MgrStatMonitor::update_logger()
101 {
102 dout(20) << __func__ << dendl;
103
104 mon.cluster_logger->set(l_cluster_osd_bytes, digest.osd_sum.statfs.total);
105 mon.cluster_logger->set(l_cluster_osd_bytes_used,
106 digest.osd_sum.statfs.get_used_raw());
107 mon.cluster_logger->set(l_cluster_osd_bytes_avail,
108 digest.osd_sum.statfs.available);
109
110 mon.cluster_logger->set(l_cluster_num_pool, digest.pg_pool_sum.size());
111 uint64_t num_pg = 0;
112 for (auto i : digest.num_pg_by_pool) {
113 num_pg += i.second;
114 }
115 mon.cluster_logger->set(l_cluster_num_pg, num_pg);
116
117 unsigned active = 0, active_clean = 0, peering = 0;
118 for (auto p = digest.num_pg_by_state.begin();
119 p != digest.num_pg_by_state.end();
120 ++p) {
121 if (p->first & PG_STATE_ACTIVE) {
122 active += p->second;
123 if (p->first & PG_STATE_CLEAN)
124 active_clean += p->second;
125 }
126 if (p->first & PG_STATE_PEERING)
127 peering += p->second;
128 }
129 mon.cluster_logger->set(l_cluster_num_pg_active_clean, active_clean);
130 mon.cluster_logger->set(l_cluster_num_pg_active, active);
131 mon.cluster_logger->set(l_cluster_num_pg_peering, peering);
132
133 mon.cluster_logger->set(l_cluster_num_object, digest.pg_sum.stats.sum.num_objects);
134 mon.cluster_logger->set(l_cluster_num_object_degraded, digest.pg_sum.stats.sum.num_objects_degraded);
135 mon.cluster_logger->set(l_cluster_num_object_misplaced, digest.pg_sum.stats.sum.num_objects_misplaced);
136 mon.cluster_logger->set(l_cluster_num_object_unfound, digest.pg_sum.stats.sum.num_objects_unfound);
137 mon.cluster_logger->set(l_cluster_num_bytes, digest.pg_sum.stats.sum.num_bytes);
138
139 }
140
141 void MgrStatMonitor::create_pending()
142 {
143 dout(10) << " " << version << dendl;
144 pending_digest = digest;
145 pending_health_checks = get_health_checks();
146 pending_service_map_bl.clear();
147 encode(service_map, pending_service_map_bl, mon.get_quorum_con_features());
148 }
149
150 void MgrStatMonitor::encode_pending(MonitorDBStore::TransactionRef t)
151 {
152 ++version;
153 dout(10) << " " << version << dendl;
154 bufferlist bl;
155 encode(pending_digest, bl, mon.get_quorum_con_features());
156 ceph_assert(pending_service_map_bl.length());
157 bl.append(pending_service_map_bl);
158 encode(pending_progress_events, bl);
159 put_version(t, version, bl);
160 put_last_committed(t, version);
161
162 encode_health(pending_health_checks, t);
163 }
164
165 version_t MgrStatMonitor::get_trim_to() const
166 {
167 // we don't actually need *any* old states, but keep a few.
168 if (version > 5) {
169 return version - 5;
170 }
171 return 0;
172 }
173
174 void MgrStatMonitor::on_active()
175 {
176 update_logger();
177 }
178
179 void MgrStatMonitor::tick()
180 {
181 }
182
183 bool MgrStatMonitor::preprocess_query(MonOpRequestRef op)
184 {
185 auto m = op->get_req<PaxosServiceMessage>();
186 switch (m->get_type()) {
187 case CEPH_MSG_STATFS:
188 return preprocess_statfs(op);
189 case MSG_MON_MGR_REPORT:
190 return preprocess_report(op);
191 case MSG_GETPOOLSTATS:
192 return preprocess_getpoolstats(op);
193 default:
194 mon.no_reply(op);
195 derr << "Unhandled message type " << m->get_type() << dendl;
196 return true;
197 }
198 }
199
200 bool MgrStatMonitor::prepare_update(MonOpRequestRef op)
201 {
202 auto m = op->get_req<PaxosServiceMessage>();
203 switch (m->get_type()) {
204 case MSG_MON_MGR_REPORT:
205 return prepare_report(op);
206 default:
207 mon.no_reply(op);
208 derr << "Unhandled message type " << m->get_type() << dendl;
209 return true;
210 }
211 }
212
213 bool MgrStatMonitor::preprocess_report(MonOpRequestRef op)
214 {
215 auto m = op->get_req<MMonMgrReport>();
216 mon.no_reply(op);
217 if (m->gid &&
218 m->gid != mon.mgrmon()->get_map().get_active_gid()) {
219 dout(10) << "ignoring report from non-active mgr " << m->gid
220 << dendl;
221 return true;
222 }
223 return false;
224 }
225
226 bool MgrStatMonitor::prepare_report(MonOpRequestRef op)
227 {
228 auto m = op->get_req<MMonMgrReport>();
229 bufferlist bl = m->get_data();
230 auto p = bl.cbegin();
231 decode(pending_digest, p);
232 pending_health_checks.swap(m->health_checks);
233 if (m->service_map_bl.length()) {
234 pending_service_map_bl.swap(m->service_map_bl);
235 }
236 pending_progress_events.swap(m->progress_events);
237 dout(10) << __func__ << " " << pending_digest << ", "
238 << pending_health_checks.checks.size() << " health checks, "
239 << progress_events.size() << " progress events" << dendl;
240 dout(20) << "pending_digest:\n";
241 JSONFormatter jf(true);
242 jf.open_object_section("pending_digest");
243 pending_digest.dump(&jf);
244 jf.close_section();
245 jf.flush(*_dout);
246 *_dout << dendl;
247 dout(20) << "health checks:\n";
248 JSONFormatter jf(true);
249 jf.open_object_section("health_checks");
250 pending_health_checks.dump(&jf);
251 jf.close_section();
252 jf.flush(*_dout);
253 *_dout << dendl;
254 dout(20) << "progress events:\n";
255 JSONFormatter jf(true);
256 jf.open_object_section("progress_events");
257 for (auto& i : pending_progress_events) {
258 jf.dump_object(i.first.c_str(), i.second);
259 }
260 jf.close_section();
261 jf.flush(*_dout);
262 *_dout << dendl;
263 return true;
264 }
265
266 bool MgrStatMonitor::preprocess_getpoolstats(MonOpRequestRef op)
267 {
268 op->mark_pgmon_event(__func__);
269 auto m = op->get_req<MGetPoolStats>();
270 auto session = op->get_session();
271 if (!session)
272 return true;
273 if (!session->is_capable("pg", MON_CAP_R)) {
274 dout(0) << "MGetPoolStats received from entity with insufficient caps "
275 << session->caps << dendl;
276 return true;
277 }
278 if (m->fsid != mon.monmap->fsid) {
279 dout(0) << __func__ << " on fsid "
280 << m->fsid << " != " << mon.monmap->fsid << dendl;
281 return true;
282 }
283 epoch_t ver = get_last_committed();
284 auto reply = new MGetPoolStatsReply(m->fsid, m->get_tid(), ver);
285 reply->per_pool = digest.use_per_pool_stats();
286 for (const auto& pool_name : m->pools) {
287 const auto pool_id = mon.osdmon()->osdmap.lookup_pg_pool_name(pool_name);
288 if (pool_id == -ENOENT)
289 continue;
290 auto pool_stat = get_pool_stat(pool_id);
291 if (!pool_stat)
292 continue;
293 reply->pool_stats[pool_name] = *pool_stat;
294 }
295 mon.send_reply(op, reply);
296 return true;
297 }
298
299 bool MgrStatMonitor::preprocess_statfs(MonOpRequestRef op)
300 {
301 op->mark_pgmon_event(__func__);
302 auto statfs = op->get_req<MStatfs>();
303 auto session = op->get_session();
304
305 if (!session)
306 return true;
307 if (!session->is_capable("pg", MON_CAP_R)) {
308 dout(0) << "MStatfs received from entity with insufficient privileges "
309 << session->caps << dendl;
310 return true;
311 }
312 if (statfs->fsid != mon.monmap->fsid) {
313 dout(0) << __func__ << " on fsid " << statfs->fsid
314 << " != " << mon.monmap->fsid << dendl;
315 return true;
316 }
317 const auto& pool = statfs->data_pool;
318 if (pool && !mon.osdmon()->osdmap.have_pg_pool(*pool)) {
319 // There's no error field for MStatfsReply so just ignore the request.
320 // This is known to happen when a client is still accessing a removed fs.
321 dout(1) << __func__ << " on removed pool " << *pool << dendl;
322 return true;
323 }
324 dout(10) << __func__ << " " << *statfs
325 << " from " << statfs->get_orig_source() << dendl;
326 epoch_t ver = get_last_committed();
327 auto reply = new MStatfsReply(statfs->fsid, statfs->get_tid(), ver);
328 reply->h.st = get_statfs(mon.osdmon()->osdmap, pool);
329 mon.send_reply(op, reply);
330 return true;
331 }
332
333 void MgrStatMonitor::check_sub(Subscription *sub)
334 {
335 dout(10) << __func__
336 << " next " << sub->next
337 << " vs service_map.epoch " << service_map.epoch << dendl;
338 if (sub->next <= service_map.epoch) {
339 auto m = new MServiceMap(service_map);
340 sub->session->con->send_message(m);
341 if (sub->onetime) {
342 mon.with_session_map([sub](MonSessionMap& session_map) {
343 session_map.remove_sub(sub);
344 });
345 } else {
346 sub->next = service_map.epoch + 1;
347 }
348 }
349 }
350
351 void MgrStatMonitor::check_subs()
352 {
353 dout(10) << __func__ << dendl;
354 if (!service_map.epoch) {
355 return;
356 }
357 auto subs = mon.session_map.subs.find("servicemap");
358 if (subs == mon.session_map.subs.end()) {
359 return;
360 }
361 auto p = subs->second->begin();
362 while (!p.end()) {
363 auto sub = *p;
364 ++p;
365 check_sub(sub);
366 }
367 }