]> git.proxmox.com Git - ceph.git/blame - ceph/src/mgr/ClusterState.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / mgr / ClusterState.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14#include "messages/MMgrDigest.h"
31f18b77 15#include "messages/MMonMgrReport.h"
7c673cae
FG
16#include "messages/MPGStats.h"
17
18#include "mgr/ClusterState.h"
eafe8130
TL
19#include <time.h>
20#include <boost/range/adaptor/reversed.hpp>
7c673cae
FG
21
22#define dout_context g_ceph_context
23#define dout_subsys ceph_subsys_mgr
24#undef dout_prefix
25#define dout_prefix *_dout << "mgr " << __func__ << " "
26
20effc67
TL
27using std::ostream;
28using std::set;
29using std::string;
30using std::stringstream;
31
224ce89b
WB
32ClusterState::ClusterState(
33 MonClient *monc_,
34 Objecter *objecter_,
35 const MgrMap& mgrmap)
36 : monc(monc_),
37 objecter(objecter_),
eafe8130
TL
38 mgr_map(mgrmap),
39 asok_hook(NULL)
7c673cae
FG
40{}
41
42void ClusterState::set_objecter(Objecter *objecter_)
43{
11fdf7f2 44 std::lock_guard l(lock);
7c673cae
FG
45
46 objecter = objecter_;
47}
48
49void ClusterState::set_fsmap(FSMap const &new_fsmap)
50{
11fdf7f2 51 std::lock_guard l(lock);
7c673cae
FG
52
53 fsmap = new_fsmap;
54}
55
224ce89b
WB
56void ClusterState::set_mgr_map(MgrMap const &new_mgrmap)
57{
11fdf7f2 58 std::lock_guard l(lock);
224ce89b
WB
59 mgr_map = new_mgrmap;
60}
61
62void ClusterState::set_service_map(ServiceMap const &new_service_map)
63{
11fdf7f2 64 std::lock_guard l(lock);
224ce89b
WB
65 servicemap = new_service_map;
66}
67
7c673cae
FG
68void ClusterState::load_digest(MMgrDigest *m)
69{
e306af50 70 std::lock_guard l(lock);
7c673cae
FG
71 health_json = std::move(m->health_json);
72 mon_status_json = std::move(m->mon_status_json);
73}
74
9f95a23c 75void ClusterState::ingest_pgstats(ref_t<MPGStats> stats)
7c673cae 76{
11fdf7f2 77 std::lock_guard l(lock);
7c673cae
FG
78
79 const int from = stats->get_orig_source().num();
adb31ebb
TL
80 bool is_in = with_osdmap([from](const OSDMap& osdmap) {
81 return osdmap.is_in(from);
82 });
83
84 if (is_in) {
85 pending_inc.update_stat(from, std::move(stats->osd_stat));
86 } else {
87 osd_stat_t empty_stat;
88 empty_stat.seq = stats->osd_stat.seq;
89 pending_inc.update_stat(from, std::move(empty_stat));
90 }
7c673cae
FG
91
92 for (auto p : stats->pg_stat) {
93 pg_t pgid = p.first;
94 const auto &pg_stats = p.second;
95
96 // In case we're hearing about a PG that according to last
97 // OSDMap update should not exist
11fdf7f2
TL
98 auto r = existing_pools.find(pgid.pool());
99 if (r == existing_pools.end()) {
31f18b77
FG
100 dout(15) << " got " << pgid
101 << " reported at " << pg_stats.reported_epoch << ":"
7c673cae
FG
102 << pg_stats.reported_seq
103 << " state " << pg_state_string(pg_stats.state)
31f18b77 104 << " but pool not in " << existing_pools
7c673cae
FG
105 << dendl;
106 continue;
31f18b77 107 }
11fdf7f2
TL
108 if (pgid.ps() >= r->second) {
109 dout(15) << " got " << pgid
110 << " reported at " << pg_stats.reported_epoch << ":"
111 << pg_stats.reported_seq
112 << " state " << pg_state_string(pg_stats.state)
113 << " but > pg_num " << r->second
114 << dendl;
115 continue;
116 }
31f18b77
FG
117 // In case we already heard about more recent stats from this PG
118 // from another OSD
224ce89b
WB
119 const auto q = pg_map.pg_stat.find(pgid);
120 if (q != pg_map.pg_stat.end() &&
121 q->second.get_version_pair() > pg_stats.get_version_pair()) {
31f18b77 122 dout(15) << " had " << pgid << " from "
224ce89b 123 << q->second.reported_epoch << ":"
11fdf7f2 124 << q->second.reported_seq << dendl;
7c673cae
FG
125 continue;
126 }
127
128 pending_inc.pg_stat_updates[pgid] = pg_stats;
129 }
11fdf7f2
TL
130 for (auto p : stats->pool_stat) {
131 pending_inc.pool_statfs_updates[std::make_pair(p.first, from)] = p.second;
132 }
31f18b77
FG
133}
134
135void ClusterState::update_delta_stats()
136{
137 pending_inc.stamp = ceph_clock_now();
138 pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
139 dout(10) << " v" << pending_inc.version << dendl;
140
141 dout(30) << " pg_map before:\n";
142 JSONFormatter jf(true);
143 jf.dump_object("pg_map", pg_map);
144 jf.flush(*_dout);
145 *_dout << dendl;
146 dout(30) << " incremental:\n";
147 JSONFormatter jf(true);
148 jf.dump_object("pending_inc", pending_inc);
149 jf.flush(*_dout);
150 *_dout << dendl;
7c673cae 151 pg_map.apply_incremental(g_ceph_context, pending_inc);
31f18b77 152 pending_inc = PGMap::Incremental();
7c673cae
FG
153}
154
155void ClusterState::notify_osdmap(const OSDMap &osd_map)
156{
11fdf7f2 157 assert(ceph_mutex_is_locked(lock));
7c673cae 158
31f18b77 159 pending_inc.stamp = ceph_clock_now();
7c673cae 160 pending_inc.version = pg_map.version + 1; // to make apply_incremental happy
31f18b77 161 dout(10) << " v" << pending_inc.version << dendl;
7c673cae 162
31f18b77
FG
163 PGMapUpdater::check_osd_map(g_ceph_context, osd_map, pg_map, &pending_inc);
164
165 // update our list of pools that exist, so that we can filter pg_map updates
166 // in synchrony with this OSDMap.
167 existing_pools.clear();
168 for (auto& p : osd_map.get_pools()) {
11fdf7f2 169 existing_pools[p.first] = p.second.get_pg_num();
31f18b77 170 }
7c673cae
FG
171
172 // brute force this for now (don't bother being clever by only
173 // checking osds that went up/down)
174 set<int> need_check_down_pg_osds;
175 PGMapUpdater::check_down_pgs(osd_map, pg_map, true,
176 need_check_down_pg_osds, &pending_inc);
177
31f18b77
FG
178 dout(30) << " pg_map before:\n";
179 JSONFormatter jf(true);
180 jf.dump_object("pg_map", pg_map);
181 jf.flush(*_dout);
182 *_dout << dendl;
183 dout(30) << " incremental:\n";
184 JSONFormatter jf(true);
185 jf.dump_object("pending_inc", pending_inc);
186 jf.flush(*_dout);
187 *_dout << dendl;
7c673cae 188
31f18b77
FG
189 pg_map.apply_incremental(g_ceph_context, pending_inc);
190 pending_inc = PGMap::Incremental();
7c673cae
FG
191 // TODO: Complete the separation of PG state handling so
192 // that a cut-down set of functionality remains in PGMonitor
193 // while the full-blown PGMap lives only here.
194}
eafe8130
TL
195
196class ClusterSocketHook : public AdminSocketHook {
197 ClusterState *cluster_state;
198public:
199 explicit ClusterSocketHook(ClusterState *o) : cluster_state(o) {}
9f95a23c 200 int call(std::string_view admin_command, const cmdmap_t& cmdmap,
39ae355f 201 const bufferlist&,
9f95a23c
TL
202 Formatter *f,
203 std::ostream& errss,
204 bufferlist& out) override {
205 stringstream outss;
206 int r = 0;
eafe8130 207 try {
9f95a23c
TL
208 r = cluster_state->asok_command(admin_command, cmdmap, f, outss);
209 out.append(outss);
210 } catch (const TOPNSPC::common::bad_cmd_get& e) {
211 errss << e.what();
212 r = -EINVAL;
eafe8130 213 }
eafe8130
TL
214 return r;
215 }
216};
217
218void ClusterState::final_init()
219{
220 AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
221 asok_hook = new ClusterSocketHook(this);
9f95a23c
TL
222 int r = admin_socket->register_command(
223 "dump_osd_network name=value,type=CephInt,req=false", asok_hook,
224 "Dump osd heartbeat network ping times");
eafe8130
TL
225 ceph_assert(r == 0);
226}
227
228void ClusterState::shutdown()
229{
230 // unregister commands
231 g_ceph_context->get_admin_socket()->unregister_commands(asok_hook);
232 delete asok_hook;
233 asok_hook = NULL;
234}
235
9f95a23c
TL
236bool ClusterState::asok_command(
237 std::string_view admin_command,
238 const cmdmap_t& cmdmap,
239 Formatter *f,
240 ostream& ss)
eafe8130
TL
241{
242 std::lock_guard l(lock);
39ae355f 243
eafe8130
TL
244 if (admin_command == "dump_osd_network") {
245 int64_t value = 0;
246 // Default to health warning level if nothing specified
9f95a23c 247 if (!(TOPNSPC::common::cmd_getval(cmdmap, "value", value))) {
eafe8130
TL
248 // Convert milliseconds to microseconds
249 value = static_cast<int64_t>(g_ceph_context->_conf.get_val<double>("mon_warn_on_slow_ping_time")) * 1000;
250 if (value == 0) {
251 double ratio = g_conf().get_val<double>("mon_warn_on_slow_ping_ratio");
252 value = g_conf().get_val<int64_t>("osd_heartbeat_grace");
253 value *= 1000000 * ratio; // Seconds of grace to microseconds at ratio
254 }
255 } else {
256 // Convert user input to microseconds
257 value *= 1000;
258 }
259 if (value < 0)
260 value = 0;
261
262 struct mgr_ping_time_t {
263 uint32_t pingtime;
264 int from;
265 int to;
266 bool back;
267 std::array<uint32_t,3> times;
268 std::array<uint32_t,3> min;
269 std::array<uint32_t,3> max;
270 uint32_t last;
271 uint32_t last_update;
272
273 bool operator<(const mgr_ping_time_t& rhs) const {
274 if (pingtime < rhs.pingtime)
275 return true;
276 if (pingtime > rhs.pingtime)
277 return false;
278 if (from < rhs.from)
279 return true;
280 if (from > rhs.from)
281 return false;
282 if (to < rhs.to)
283 return true;
284 if (to > rhs.to)
285 return false;
286 return back;
287 }
288 };
289
290 set<mgr_ping_time_t> sorted;
291 utime_t now = ceph_clock_now();
292 for (auto i : pg_map.osd_stat) {
293 for (auto j : i.second.hb_pingtime) {
294
295 if (j.second.last_update == 0)
296 continue;
297 auto stale_time = g_ceph_context->_conf.get_val<int64_t>("osd_mon_heartbeat_stat_stale");
298 if (now.sec() - j.second.last_update > stale_time) {
299 dout(20) << __func__ << " time out heartbeat for osd " << i.first
300 << " last_update " << j.second.last_update << dendl;
301 continue;
302 }
303 mgr_ping_time_t item;
304 item.pingtime = std::max(j.second.back_pingtime[0], j.second.back_pingtime[1]);
305 item.pingtime = std::max(item.pingtime, j.second.back_pingtime[2]);
306 if (!value || item.pingtime >= value) {
307 item.from = i.first;
308 item.to = j.first;
309 item.times[0] = j.second.back_pingtime[0];
310 item.times[1] = j.second.back_pingtime[1];
311 item.times[2] = j.second.back_pingtime[2];
312 item.min[0] = j.second.back_min[0];
313 item.min[1] = j.second.back_min[1];
314 item.min[2] = j.second.back_min[2];
315 item.max[0] = j.second.back_max[0];
316 item.max[1] = j.second.back_max[1];
317 item.max[2] = j.second.back_max[2];
318 item.last = j.second.back_last;
319 item.back = true;
320 item.last_update = j.second.last_update;
321 sorted.emplace(item);
322 }
323
324 if (j.second.front_last == 0)
325 continue;
326 item.pingtime = std::max(j.second.front_pingtime[0], j.second.front_pingtime[1]);
327 item.pingtime = std::max(item.pingtime, j.second.front_pingtime[2]);
328 if (!value || item.pingtime >= value) {
329 item.from = i.first;
330 item.to = j.first;
331 item.times[0] = j.second.front_pingtime[0];
332 item.times[1] = j.second.front_pingtime[1];
333 item.times[2] = j.second.front_pingtime[2];
334 item.min[0] = j.second.front_min[0];
335 item.min[1] = j.second.front_min[1];
336 item.min[2] = j.second.front_min[2];
337 item.max[0] = j.second.front_max[0];
338 item.max[1] = j.second.front_max[1];
339 item.max[2] = j.second.front_max[2];
340 item.last = j.second.front_last;
341 item.back = false;
342 item.last_update = j.second.last_update;
343 sorted.emplace(item);
344 }
345 }
346 }
347
348 // Network ping times (1min 5min 15min)
349 f->open_object_section("network_ping_times");
350 f->dump_int("threshold", value / 1000);
351 f->open_array_section("entries");
352 for (auto &sitem : boost::adaptors::reverse(sorted)) {
353 ceph_assert(!value || sitem.pingtime >= value);
354
355 f->open_object_section("entry");
356
357 const time_t lu(sitem.last_update);
358 char buffer[26];
359 string lustr(ctime_r(&lu, buffer));
360 lustr.pop_back(); // Remove trailing \n
361 auto stale = g_ceph_context->_conf.get_val<int64_t>("osd_heartbeat_stale");
362 f->dump_string("last update", lustr);
363 f->dump_bool("stale", ceph_clock_now().sec() - sitem.last_update > stale);
364 f->dump_int("from osd", sitem.from);
365 f->dump_int("to osd", sitem.to);
366 f->dump_string("interface", (sitem.back ? "back" : "front"));
367 f->open_object_section("average");
368 f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.times[0],3).c_str());
369 f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.times[1],3).c_str());
370 f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.times[2],3).c_str());
371 f->close_section(); // average
372 f->open_object_section("min");
373 f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.min[0],3).c_str());
374 f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.min[1],3).c_str());
375 f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.min[2],3).c_str());
376 f->close_section(); // min
377 f->open_object_section("max");
378 f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.max[0],3).c_str());
379 f->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem.max[1],3).c_str());
380 f->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem.max[2],3).c_str());
381 f->close_section(); // max
382 f->dump_format_unquoted("last", "%s", fixed_u_to_string(sitem.last,3).c_str());
383 f->close_section(); // entry
384 }
385 f->close_section(); // entries
386 f->close_section(); // network_ping_times
387 } else {
388 ceph_abort_msg("broken asok registration");
389 }
eafe8130
TL
390 return true;
391}