// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#include "messages/MMgrDigest.h"
#include "messages/MMonMgrReport.h"
#include "messages/MPGStats.h"

#include "mgr/ClusterState.h"
#include <time.h>
#include <boost/range/adaptor/reversed.hpp>

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mgr
#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "

ClusterState::ClusterState(
  MonClient *monc_,
  Objecter *objecter_,
  const MgrMap& mgrmap)
  : monc(monc_),
    objecter(objecter_),
    lock("ClusterState"),
    mgr_map(mgrmap),
    asok_hook(NULL)
{}

void ClusterState::set_objecter(Objecter *objecter_)
{
  std::lock_guard l(lock);

  objecter = objecter_;
}

void ClusterState::set_fsmap(FSMap const &new_fsmap)
{
  std::lock_guard l(lock);

  fsmap = new_fsmap;
}

void ClusterState::set_mgr_map(MgrMap const &new_mgrmap)
{
  std::lock_guard l(lock);
  mgr_map = new_mgrmap;
}

void ClusterState::set_service_map(ServiceMap const &new_service_map)
{
  std::lock_guard l(lock);
  servicemap = new_service_map;
}
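
// Each setter above copies a whole cluster map under the ClusterState lock:
// map updates arrive from monitor subscriptions on one thread while command
// handlers read on others. A minimal sketch of the same guarded-copy pattern
// (hypothetical types, not part of this file):
//
//   #include <mutex>
//
//   template <typename Map>
//   class GuardedMap {
//     mutable std::mutex mtx;
//     Map map;
//   public:
//     void set(const Map &m) { std::lock_guard<std::mutex> l(mtx); map = m; }
//     Map get() const { std::lock_guard<std::mutex> l(mtx); return map; }
//   };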

void ClusterState::load_digest(MMgrDigest *m)
{
  health_json = std::move(m->health_json);
  mon_status_json = std::move(m->mon_status_json);
}

void ClusterState::ingest_pgstats(MPGStats *stats)
{
  std::lock_guard l(lock);

  const int from = stats->get_orig_source().num();
  pending_inc.update_stat(from, std::move(stats->osd_stat));

  for (auto p : stats->pg_stat) {
    pg_t pgid = p.first;
    const auto &pg_stats = p.second;

    // In case we're hearing about a PG that according to last
    // OSDMap update should not exist
    auto r = existing_pools.find(pgid.pool());
    if (r == existing_pools.end()) {
      dout(15) << " got " << pgid
               << " reported at " << pg_stats.reported_epoch << ":"
               << pg_stats.reported_seq
               << " state " << pg_state_string(pg_stats.state)
               << " but pool not in " << existing_pools
               << dendl;
      continue;
    }
    if (pgid.ps() >= r->second) {
      dout(15) << " got " << pgid
               << " reported at " << pg_stats.reported_epoch << ":"
               << pg_stats.reported_seq
               << " state " << pg_state_string(pg_stats.state)
               << " but > pg_num " << r->second
               << dendl;
      continue;
    }
    // In case we already heard about more recent stats from this PG
    // from another OSD
    const auto q = pg_map.pg_stat.find(pgid);
    if (q != pg_map.pg_stat.end() &&
        q->second.get_version_pair() > pg_stats.get_version_pair()) {
      dout(15) << " had " << pgid << " from "
               << q->second.reported_epoch << ":"
               << q->second.reported_seq << dendl;
      continue;
    }

    pending_inc.pg_stat_updates[pgid] = pg_stats;
  }
  for (auto p : stats->pool_stat) {
    pending_inc.pool_statfs_updates[std::make_pair(p.first, from)] = p.second;
  }
}
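
// Several OSDs can report stats for the same PG; the comparison above keeps
// the newest report by ordering on (reported_epoch, reported_seq). A minimal
// sketch of that dedup rule using std::pair's lexicographic operator<
// (hypothetical names, not Ceph's types):
//
//   #include <cstdint>
//   #include <map>
//   #include <utility>
//
//   using version_pair = std::pair<uint32_t, uint64_t>; // (epoch, seq)
//
//   void maybe_apply(std::map<int, version_pair> &latest,
//                    int pgid, version_pair incoming) {
//     auto it = latest.find(pgid);
//     if (it != latest.end() && it->second > incoming)
//       return;               // already have a newer report; drop this one
//     latest[pgid] = incoming;
//   }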

void ClusterState::update_delta_stats()
{
  pending_inc.stamp = ceph_clock_now();
  pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
  dout(10) << " v" << pending_inc.version << dendl;

  // Note: the two identically named JSONFormatter locals below are not a
  // redeclaration.  dout() opens a block that dendl closes, so each formatter
  // lives in its own scope and is only constructed when level 30 is enabled.
  dout(30) << " pg_map before:\n";
  JSONFormatter jf(true);
  jf.dump_object("pg_map", pg_map);
  jf.flush(*_dout);
  *_dout << dendl;
  dout(30) << " incremental:\n";
  JSONFormatter jf(true);
  jf.dump_object("pending_inc", pending_inc);
  jf.flush(*_dout);
  *_dout << dendl;
  pg_map.apply_incremental(g_ceph_context, pending_inc);
  pending_inc = PGMap::Incremental();
}
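
// A minimal sketch of the scoping trick used by dout()/dendl above
// (hypothetical macros, much simpler than Ceph's real dout_impl):
//
//   #define LOG_BEGIN(v) do { if (log_enabled(v)) { \
//                           std::ostream *_out = &log_stream(); *_out
//   #define LOG_END      std::flush; } } while (0)
//
//   LOG_BEGIN(30) << "pg_map before:\n";
//   JSONFormatter jf(true);    // lives inside the macro's hidden block
//   jf.flush(*_out);
//   *_out << LOG_END;          // closes the block; jf goes out of scope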

void ClusterState::notify_osdmap(const OSDMap &osd_map)
{
  assert(ceph_mutex_is_locked(lock));

  pending_inc.stamp = ceph_clock_now();
  pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
  dout(10) << " v" << pending_inc.version << dendl;

  PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc);

  // update our list of pools that exist, so that we can filter pg_map updates
  // in synchrony with this OSDMap.
  existing_pools.clear();
  for (auto& p : osd_map.get_pools()) {
    existing_pools[p.first] = p.second.get_pg_num();
  }

  // brute force this for now (don't bother being clever by only
  // checking osds that went up/down)
  set<int> need_check_down_pg_osds;
  PGMapUpdater::check_down_pgs(osd_map, pg_map, true,
                               need_check_down_pg_osds, &pending_inc);

  dout(30) << " pg_map before:\n";
  JSONFormatter jf(true);
  jf.dump_object("pg_map", pg_map);
  jf.flush(*_dout);
  *_dout << dendl;
  dout(30) << " incremental:\n";
  JSONFormatter jf(true);
  jf.dump_object("pending_inc", pending_inc);
  jf.flush(*_dout);
  *_dout << dendl;

  pg_map.apply_incremental(g_ceph_context, pending_inc);
  pending_inc = PGMap::Incremental();
  // TODO: Complete the separation of PG state handling so
  // that a cut-down set of functionality remains in PGMonitor
  // while the full-blown PGMap lives only here.
}
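
// existing_pools maps pool id -> pg_num from the latest OSDMap; the checks in
// ingest_pgstats() drop reports whose pool is gone or whose placement seed is
// out of range after a pg_num change. A minimal sketch of that filter
// (hypothetical names, not Ceph's types):
//
//   #include <cstdint>
//   #include <map>
//
//   bool pg_exists(const std::map<int64_t, uint32_t> &pools,
//                  int64_t pool, uint32_t ps) {
//     auto it = pools.find(pool);
//     return it != pools.end() && ps < it->second;  // ps must be < pg_num
//   }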

class ClusterSocketHook : public AdminSocketHook {
  ClusterState *cluster_state;
public:
  explicit ClusterSocketHook(ClusterState *o) : cluster_state(o) {}
  bool call(std::string_view admin_command, const cmdmap_t& cmdmap,
            std::string_view format, bufferlist& out) override {
    stringstream ss;
    bool r = true;
    try {
      r = cluster_state->asok_command(admin_command, cmdmap, format, ss);
    } catch (const bad_cmd_get& e) {
      // report the parse error back through the socket instead of failing
      ss << e.what();
      r = true;
    }
    out.append(ss);
    return r;
  }
};

void ClusterState::final_init()
{
  AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
  asok_hook = new ClusterSocketHook(this);
  int r = admin_socket->register_command("dump_osd_network",
      "dump_osd_network name=value,type=CephInt,req=false", asok_hook,
      "Dump osd heartbeat network ping times");
  ceph_assert(r == 0);
}
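
// Once registered, the command is reachable through the mgr admin socket,
// e.g. (daemon id and values are illustrative):
//
//   ceph daemon mgr.x dump_osd_network        # threshold taken from config
//   ceph daemon mgr.x dump_osd_network 100    # only pings >= 100 ms
//
// The optional CephInt argument "value" is parsed out of cmdmap by
// asok_command() below.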

void ClusterState::shutdown()
{
  // unregister commands
  g_ceph_context->get_admin_socket()->unregister_commands(asok_hook);
  delete asok_hook;
  asok_hook = NULL;
}

bool ClusterState::asok_command(std::string_view admin_command, const cmdmap_t& cmdmap,
                                std::string_view format, ostream& ss)
{
  std::lock_guard l(lock);
  Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
  if (admin_command == "dump_osd_network") {
    int64_t value = 0;
    // Default to health warning level if nothing specified
    if (!(cmd_getval(g_ceph_context, cmdmap, "value", value))) {
      // Convert milliseconds to microseconds
      value = static_cast<int64_t>(g_ceph_context->_conf.get_val<double>(
        "mon_warn_on_slow_ping_time")) * 1000;
      if (value == 0) {
        double ratio = g_conf().get_val<double>("mon_warn_on_slow_ping_ratio");
        value = g_conf().get_val<int64_t>("osd_heartbeat_grace");
        value *= 1000000 * ratio; // Seconds of grace to microseconds at ratio
      }
    } else {
      // Convert user input to microseconds
      value *= 1000;
    }
    if (value < 0)
      value = 0;

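    // Worked example of the fallback above, assuming nautilus-era defaults
    // (mon_warn_on_slow_ping_time = 0, osd_heartbeat_grace = 20 s,
    // mon_warn_on_slow_ping_ratio = 0.05):
    //
    //   value = 20 * 1000000 * 0.05 = 1000000 us  (i.e. a 1 s threshold)
    //
    // A user-supplied value of 100 (ms) would instead give
    // 100 * 1000 = 100000 us.
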
    struct mgr_ping_time_t {
      uint32_t pingtime;
      int from;
      int to;
      bool back;
      std::array<uint32_t,3> times;
      std::array<uint32_t,3> min;
      std::array<uint32_t,3> max;
      uint32_t last;
      uint32_t last_update;

      bool operator<(const mgr_ping_time_t& rhs) const {
        if (pingtime < rhs.pingtime)
          return true;
        if (pingtime > rhs.pingtime)
          return false;
        if (from < rhs.from)
          return true;
        if (from > rhs.from)
          return false;
        if (to < rhs.to)
          return true;
        if (to > rhs.to)
          return false;
        // order back-interface entries before front ones; comparing the two
        // flags (rather than returning `back` unconditionally) keeps this a
        // strict weak ordering, as std::set requires
        return back && !rhs.back;
      }
    };
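
    // The cascade above is equivalent to a lexicographic compare on the key
    // (pingtime, from, to, !back). A minimal std::tuple-based sketch of the
    // same ordering (illustrative only, not used by this file):
    //
    //   #include <tuple>
    //
    //   bool less_than(const mgr_ping_time_t &a, const mgr_ping_time_t &b) {
    //     return std::make_tuple(a.pingtime, a.from, a.to, !a.back)
    //          < std::make_tuple(b.pingtime, b.from, b.to, !b.back);
    //   }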

    set<mgr_ping_time_t> sorted;
    utime_t now = ceph_clock_now();
    for (auto i : pg_map.osd_stat) {
      for (auto j : i.second.hb_pingtime) {

        if (j.second.last_update == 0)
          continue;
        auto stale_time = g_ceph_context->_conf.get_val<int64_t>(
          "osd_mon_heartbeat_stat_stale");
        if (now.sec() - j.second.last_update > stale_time) {
          dout(20) << __func__ << " time out heartbeat for osd " << i.first
                   << " last_update " << j.second.last_update << dendl;
          continue;
        }
        mgr_ping_time_t item;
        item.pingtime = std::max(j.second.back_pingtime[0], j.second.back_pingtime[1]);
        item.pingtime = std::max(item.pingtime, j.second.back_pingtime[2]);
        if (!value || item.pingtime >= value) {
          item.from = i.first;
          item.to = j.first;
          item.times[0] = j.second.back_pingtime[0];
          item.times[1] = j.second.back_pingtime[1];
          item.times[2] = j.second.back_pingtime[2];
          item.min[0] = j.second.back_min[0];
          item.min[1] = j.second.back_min[1];
          item.min[2] = j.second.back_min[2];
          item.max[0] = j.second.back_max[0];
          item.max[1] = j.second.back_max[1];
          item.max[2] = j.second.back_max[2];
          item.last = j.second.back_last;
          item.back = true;
          item.last_update = j.second.last_update;
          sorted.emplace(item);
        }

        if (j.second.front_last == 0)
          continue;
        item.pingtime = std::max(j.second.front_pingtime[0], j.second.front_pingtime[1]);
        item.pingtime = std::max(item.pingtime, j.second.front_pingtime[2]);
        if (!value || item.pingtime >= value) {
          item.from = i.first;
          item.to = j.first;
          item.times[0] = j.second.front_pingtime[0];
          item.times[1] = j.second.front_pingtime[1];
          item.times[2] = j.second.front_pingtime[2];
          item.min[0] = j.second.front_min[0];
          item.min[1] = j.second.front_min[1];
          item.min[2] = j.second.front_min[2];
          item.max[0] = j.second.front_max[0];
          item.max[1] = j.second.front_max[1];
          item.max[2] = j.second.front_max[2];
          item.last = j.second.front_last;
          item.back = false;
          item.last_update = j.second.last_update;
          sorted.emplace(item);
        }
      }
    }

    // Network ping times (1min 5min 15min)
    f->open_object_section("network_ping_times");
    f->dump_int("threshold", value / 1000);
    f->open_array_section("entries");
    for (auto &sitem : boost::adaptors::reverse(sorted)) {
      ceph_assert(!value || sitem.pingtime >= value);

      f->open_object_section("entry");

      const time_t lu(sitem.last_update);
      char buffer[26];
      string lustr(ctime_r(&lu, buffer));
      lustr.pop_back(); // Remove trailing \n
      auto stale = g_ceph_context->_conf.get_val<int64_t>("osd_heartbeat_stale");
      f->dump_string("last update", lustr);
      f->dump_bool("stale", ceph_clock_now().sec() - sitem.last_update > stale);
      f->dump_int("from osd", sitem.from);
      f->dump_int("to osd", sitem.to);
      f->dump_string("interface", (sitem.back ? "back" : "front"));
      f->open_object_section("average");
      f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.times[0],3).c_str());
      f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.times[1],3).c_str());
      f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.times[2],3).c_str());
      f->close_section(); // average
      f->open_object_section("min");
      f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.min[0],3).c_str());
      f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.min[1],3).c_str());
      f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.min[2],3).c_str());
      f->close_section(); // min
      f->open_object_section("max");
      f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.max[0],3).c_str());
      f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.max[1],3).c_str());
      f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.max[2],3).c_str());
      f->close_section(); // max
      f->dump_format_unquoted("last", "%s", fixed_u_to_string(sitem.last,3).c_str());
      f->close_section(); // entry
    }
    f->close_section(); // entries
    f->close_section(); // network_ping_times
  } else {
    ceph_abort_msg("broken asok registration");
  }
  f->flush(ss);
  delete f;
  return true;
}
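
// Sketch of the JSON shape dump_osd_network produces (values illustrative
// only; averages/min/max are the 1min/5min/15min windows dumped above):
//
//   {
//     "threshold": 1000,
//     "entries": [
//       { "last update": "Wed Sep  4 12:00:00 2019", "stale": false,
//         "from osd": 1, "to osd": 2, "interface": "back",
//         "average": { "1min": 1.234, "5min": 1.100, "15min": 0.987 },
//         "min": { ... }, "max": { ... }, "last": 1.234 }
//     ]
//   }
//
// Entries come out in descending ping time because the std::set is iterated
// through boost::adaptors::reverse.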