]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/ClusterState.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / mgr / ClusterState.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "messages/MMgrDigest.h"
15 #include "messages/MMonMgrReport.h"
16 #include "messages/MPGStats.h"
17
18 #include "mgr/ClusterState.h"
19 #include <time.h>
20 #include <boost/range/adaptor/reversed.hpp>
21
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_mgr
24 #undef dout_prefix
25 #define dout_prefix *_dout << "mgr " << __func__ << " "
26
27 using std::ostream;
28 using std::set;
29 using std::string;
30 using std::stringstream;
31
32 ClusterState::ClusterState(
33 MonClient *monc_,
34 Objecter *objecter_,
35 const MgrMap& mgrmap)
36 : monc(monc_),
37 objecter(objecter_),
38 mgr_map(mgrmap),
39 asok_hook(NULL)
40 {}
41
42 void ClusterState::set_objecter(Objecter *objecter_)
43 {
44 std::lock_guard l(lock);
45
46 objecter = objecter_;
47 }
48
49 void ClusterState::set_fsmap(FSMap const &new_fsmap)
50 {
51 std::lock_guard l(lock);
52
53 fsmap = new_fsmap;
54 }
55
56 void ClusterState::set_mgr_map(MgrMap const &new_mgrmap)
57 {
58 std::lock_guard l(lock);
59 mgr_map = new_mgrmap;
60 }
61
62 void ClusterState::set_service_map(ServiceMap const &new_service_map)
63 {
64 std::lock_guard l(lock);
65 servicemap = new_service_map;
66 }
67
68 void ClusterState::load_digest(MMgrDigest *m)
69 {
70 std::lock_guard l(lock);
71 health_json = std::move(m->health_json);
72 mon_status_json = std::move(m->mon_status_json);
73 }
74
75 void ClusterState::ingest_pgstats(ref_t<MPGStats> stats)
76 {
77 std::lock_guard l(lock);
78
79 const int from = stats->get_orig_source().num();
80 bool is_in = with_osdmap([from](const OSDMap& osdmap) {
81 return osdmap.is_in(from);
82 });
83
84 if (is_in) {
85 pending_inc.update_stat(from, std::move(stats->osd_stat));
86 } else {
87 osd_stat_t empty_stat;
88 empty_stat.seq = stats->osd_stat.seq;
89 pending_inc.update_stat(from, std::move(empty_stat));
90 }
91
92 for (auto p : stats->pg_stat) {
93 pg_t pgid = p.first;
94 const auto &pg_stats = p.second;
95
96 // In case we're hearing about a PG that according to last
97 // OSDMap update should not exist
98 auto r = existing_pools.find(pgid.pool());
99 if (r == existing_pools.end()) {
100 dout(15) << " got " << pgid
101 << " reported at " << pg_stats.reported_epoch << ":"
102 << pg_stats.reported_seq
103 << " state " << pg_state_string(pg_stats.state)
104 << " but pool not in " << existing_pools
105 << dendl;
106 continue;
107 }
108 if (pgid.ps() >= r->second) {
109 dout(15) << " got " << pgid
110 << " reported at " << pg_stats.reported_epoch << ":"
111 << pg_stats.reported_seq
112 << " state " << pg_state_string(pg_stats.state)
113 << " but > pg_num " << r->second
114 << dendl;
115 continue;
116 }
117 // In case we already heard about more recent stats from this PG
118 // from another OSD
119 const auto q = pg_map.pg_stat.find(pgid);
120 if (q != pg_map.pg_stat.end() &&
121 q->second.get_version_pair() > pg_stats.get_version_pair()) {
122 dout(15) << " had " << pgid << " from "
123 << q->second.reported_epoch << ":"
124 << q->second.reported_seq << dendl;
125 continue;
126 }
127
128 pending_inc.pg_stat_updates[pgid] = pg_stats;
129 }
130 for (auto p : stats->pool_stat) {
131 pending_inc.pool_statfs_updates[std::make_pair(p.first, from)] = p.second;
132 }
133 }
134
135 void ClusterState::update_delta_stats()
136 {
137 pending_inc.stamp = ceph_clock_now();
138 pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
139 dout(10) << " v" << pending_inc.version << dendl;
140
141 dout(30) << " pg_map before:\n";
142 JSONFormatter jf(true);
143 jf.dump_object("pg_map", pg_map);
144 jf.flush(*_dout);
145 *_dout << dendl;
146 dout(30) << " incremental:\n";
147 JSONFormatter jf(true);
148 jf.dump_object("pending_inc", pending_inc);
149 jf.flush(*_dout);
150 *_dout << dendl;
151 pg_map.apply_incremental(g_ceph_context, pending_inc);
152 pending_inc = PGMap::Incremental();
153 }
154
155 void ClusterState::notify_osdmap(const OSDMap &osd_map)
156 {
157 assert(ceph_mutex_is_locked(lock));
158
159 pending_inc.stamp = ceph_clock_now();
160 pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
161 dout(10) << " v" << pending_inc.version << dendl;
162
163 PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc);
164
165 // update our list of pools that exist, so that we can filter pg_map updates
166 // in synchrony with this OSDMap.
167 existing_pools.clear();
168 for (auto& p : osd_map.get_pools()) {
169 existing_pools[p.first] = p.second.get_pg_num();
170 }
171
172 // brute force this for now (don't bother being clever by only
173 // checking osds that went up/down)
174 set<int> need_check_down_pg_osds;
175 PGMapUpdater::check_down_pgs(osd_map, pg_map, true,
176 need_check_down_pg_osds, &pending_inc);
177
178 dout(30) << " pg_map before:\n";
179 JSONFormatter jf(true);
180 jf.dump_object("pg_map", pg_map);
181 jf.flush(*_dout);
182 *_dout << dendl;
183 dout(30) << " incremental:\n";
184 JSONFormatter jf(true);
185 jf.dump_object("pending_inc", pending_inc);
186 jf.flush(*_dout);
187 *_dout << dendl;
188
189 pg_map.apply_incremental(g_ceph_context, pending_inc);
190 pending_inc = PGMap::Incremental();
191 // TODO: Complete the separation of PG state handling so
192 // that a cut-down set of functionality remains in PGMonitor
193 // while the full-blown PGMap lives only here.
194 }
195
196 class ClusterSocketHook : public AdminSocketHook {
197 ClusterState *cluster_state;
198 public:
199 explicit ClusterSocketHook(ClusterState *o) : cluster_state(o) {}
200 int call(std::string_view admin_command, const cmdmap_t& cmdmap,
201 Formatter *f,
202 std::ostream& errss,
203 bufferlist& out) override {
204 stringstream outss;
205 int r = 0;
206 try {
207 r = cluster_state->asok_command(admin_command, cmdmap, f, outss);
208 out.append(outss);
209 } catch (const TOPNSPC::common::bad_cmd_get& e) {
210 errss << e.what();
211 r = -EINVAL;
212 }
213 return r;
214 }
215 };
216
217 void ClusterState::final_init()
218 {
219 AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
220 asok_hook = new ClusterSocketHook(this);
221 int r = admin_socket->register_command(
222 "dump_osd_network name=value,type=CephInt,req=false", asok_hook,
223 "Dump osd heartbeat network ping times");
224 ceph_assert(r == 0);
225 }
226
227 void ClusterState::shutdown()
228 {
229 // unregister commands
230 g_ceph_context->get_admin_socket()->unregister_commands(asok_hook);
231 delete asok_hook;
232 asok_hook = NULL;
233 }
234
235 bool ClusterState::asok_command(
236 std::string_view admin_command,
237 const cmdmap_t& cmdmap,
238 Formatter *f,
239 ostream& ss)
240 {
241 std::lock_guard l(lock);
242 if (admin_command == "dump_osd_network") {
243 int64_t value = 0;
244 // Default to health warning level if nothing specified
245 if (!(TOPNSPC::common::cmd_getval(cmdmap, "value", value))) {
246 // Convert milliseconds to microseconds
247 value = static_cast<int64_t>(g_ceph_context->_conf.get_val<double>("mon_warn_on_slow_ping_time")) * 1000;
248 if (value == 0) {
249 double ratio = g_conf().get_val<double>("mon_warn_on_slow_ping_ratio");
250 value = g_conf().get_val<int64_t>("osd_heartbeat_grace");
251 value *= 1000000 * ratio; // Seconds of grace to microseconds at ratio
252 }
253 } else {
254 // Convert user input to microseconds
255 value *= 1000;
256 }
257 if (value < 0)
258 value = 0;
259
260 struct mgr_ping_time_t {
261 uint32_t pingtime;
262 int from;
263 int to;
264 bool back;
265 std::array<uint32_t,3> times;
266 std::array<uint32_t,3> min;
267 std::array<uint32_t,3> max;
268 uint32_t last;
269 uint32_t last_update;
270
271 bool operator<(const mgr_ping_time_t& rhs) const {
272 if (pingtime < rhs.pingtime)
273 return true;
274 if (pingtime > rhs.pingtime)
275 return false;
276 if (from < rhs.from)
277 return true;
278 if (from > rhs.from)
279 return false;
280 if (to < rhs.to)
281 return true;
282 if (to > rhs.to)
283 return false;
284 return back;
285 }
286 };
287
288 set<mgr_ping_time_t> sorted;
289 utime_t now = ceph_clock_now();
290 for (auto i : pg_map.osd_stat) {
291 for (auto j : i.second.hb_pingtime) {
292
293 if (j.second.last_update == 0)
294 continue;
295 auto stale_time = g_ceph_context->_conf.get_val<int64_t>("osd_mon_heartbeat_stat_stale");
296 if (now.sec() - j.second.last_update > stale_time) {
297 dout(20) << __func__ << " time out heartbeat for osd " << i.first
298 << " last_update " << j.second.last_update << dendl;
299 continue;
300 }
301 mgr_ping_time_t item;
302 item.pingtime = std::max(j.second.back_pingtime[0], j.second.back_pingtime[1]);
303 item.pingtime = std::max(item.pingtime, j.second.back_pingtime[2]);
304 if (!value || item.pingtime >= value) {
305 item.from = i.first;
306 item.to = j.first;
307 item.times[0] = j.second.back_pingtime[0];
308 item.times[1] = j.second.back_pingtime[1];
309 item.times[2] = j.second.back_pingtime[2];
310 item.min[0] = j.second.back_min[0];
311 item.min[1] = j.second.back_min[1];
312 item.min[2] = j.second.back_min[2];
313 item.max[0] = j.second.back_max[0];
314 item.max[1] = j.second.back_max[1];
315 item.max[2] = j.second.back_max[2];
316 item.last = j.second.back_last;
317 item.back = true;
318 item.last_update = j.second.last_update;
319 sorted.emplace(item);
320 }
321
322 if (j.second.front_last == 0)
323 continue;
324 item.pingtime = std::max(j.second.front_pingtime[0], j.second.front_pingtime[1]);
325 item.pingtime = std::max(item.pingtime, j.second.front_pingtime[2]);
326 if (!value || item.pingtime >= value) {
327 item.from = i.first;
328 item.to = j.first;
329 item.times[0] = j.second.front_pingtime[0];
330 item.times[1] = j.second.front_pingtime[1];
331 item.times[2] = j.second.front_pingtime[2];
332 item.min[0] = j.second.front_min[0];
333 item.min[1] = j.second.front_min[1];
334 item.min[2] = j.second.front_min[2];
335 item.max[0] = j.second.front_max[0];
336 item.max[1] = j.second.front_max[1];
337 item.max[2] = j.second.front_max[2];
338 item.last = j.second.front_last;
339 item.back = false;
340 item.last_update = j.second.last_update;
341 sorted.emplace(item);
342 }
343 }
344 }
345
346 // Network ping times (1min 5min 15min)
347 f->open_object_section("network_ping_times");
348 f->dump_int("threshold", value / 1000);
349 f->open_array_section("entries");
350 for (auto &sitem : boost::adaptors::reverse(sorted)) {
351 ceph_assert(!value || sitem.pingtime >= value);
352
353 f->open_object_section("entry");
354
355 const time_t lu(sitem.last_update);
356 char buffer[26];
357 string lustr(ctime_r(&lu, buffer));
358 lustr.pop_back(); // Remove trailing \n
359 auto stale = g_ceph_context->_conf.get_val<int64_t>("osd_heartbeat_stale");
360 f->dump_string("last update", lustr);
361 f->dump_bool("stale", ceph_clock_now().sec() - sitem.last_update > stale);
362 f->dump_int("from osd", sitem.from);
363 f->dump_int("to osd", sitem.to);
364 f->dump_string("interface", (sitem.back ? "back" : "front"));
365 f->open_object_section("average");
366 f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.times[0],3).c_str());
367 f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.times[1],3).c_str());
368 f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.times[2],3).c_str());
369 f->close_section(); // average
370 f->open_object_section("min");
371 f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.min[0],3).c_str());
372 f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.min[1],3).c_str());
373 f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.min[2],3).c_str());
374 f->close_section(); // min
375 f->open_object_section("max");
376 f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.max[0],3).c_str());
377 f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.max[1],3).c_str());
378 f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.max[2],3).c_str());
379 f->close_section(); // max
380 f->dump_format_unquoted("last", "%s", fixed_u_to_string(sitem.last,3).c_str());
381 f->close_section(); // entry
382 }
383 f->close_section(); // entries
384 f->close_section(); // network_ping_times
385 } else {
386 ceph_abort_msg("broken asok registration");
387 }
388 return true;
389 }