1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
14 #include "messages/MMgrDigest.h"
15 #include "messages/MMonMgrReport.h"
16 #include "messages/MPGStats.h"
18 #include "mgr/ClusterState.h"
20 #include <boost/range/adaptor/reversed.hpp>
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_mgr
25 #define dout_prefix *_dout << "mgr " << __func__ << " "
30 using std::stringstream
;
// ClusterState constructor.
// NOTE(review): this extraction is garbled — the parameter list and the
// entire constructor body (original lines ~33-41) are missing. Restore
// from the canonical source before relying on this text.
32 ClusterState::ClusterState(
// Install the Objecter used for OSD map access, serialized on the
// cluster-state lock.
// NOTE(review): the member assignment and closing brace are missing from
// this extraction — confirm against the canonical source.
42 void ClusterState::set_objecter(Objecter
*objecter_
)
// Guard all ClusterState mutation with the instance-wide mutex.
44 std::lock_guard
l(lock
);
// Replace the cached FSMap under the cluster-state lock.
// NOTE(review): the assignment body and closing brace are missing from
// this extraction — confirm against the canonical source.
49 void ClusterState::set_fsmap(FSMap
const &new_fsmap
)
51 std::lock_guard
l(lock
);
// Replace the cached MgrMap under the cluster-state lock.
// NOTE(review): the assignment body and closing brace are missing from
// this extraction — confirm against the canonical source.
56 void ClusterState::set_mgr_map(MgrMap
const &new_mgrmap
)
58 std::lock_guard
l(lock
);
// Replace the cached ServiceMap under the cluster-state lock.
// NOTE(review): the closing brace is missing from this extraction.
62 void ClusterState::set_service_map(ServiceMap
const &new_service_map
)
64 std::lock_guard
l(lock
);
// Copy-assign the new map while the lock is held so readers never see a
// partially updated servicemap.
65 servicemap
= new_service_map
;
// Absorb a mon digest message: take ownership of the health and mon-status
// JSON blobs carried by the MMgrDigest (moved out of the message), under
// the cluster-state lock.
// NOTE(review): the closing brace is missing from this extraction.
68 void ClusterState::load_digest(MMgrDigest
*m
)
70 std::lock_guard
l(lock
);
// std::move avoids copying the (potentially large) JSON payloads; the
// message fields are left in a moved-from state, which is fine since the
// message is consumed here.
71 health_json
= std::move(m
->health_json
)
72 mon_status_json
= std::move(m
->mon_status_json
);
// Ingest a per-OSD stats report (MPGStats) into the pending PGMap
// incremental, filtering out PGs that the latest OSDMap says should not
// exist (deleted pools, shrunk pg_num) and stale reports that are older
// than what we already have.
// NOTE(review): this extraction dropped several interior lines (braces,
// `continue` statements, the pgid declaration, parts of the dout chains,
// and the is-in/out branch around lines 82-88) — restore from the
// canonical source; comments below describe only what is visible.
75 void ClusterState::ingest_pgstats(ref_t
<MPGStats
> stats
)
77 std::lock_guard
l(lock
);
// Identify the reporting OSD by the message's origin entity number.
79 const int from
= stats
->get_orig_source().num();
// Check against the current OSDMap whether this OSD is "in"; presumably
// used to decide between recording real stats vs. an empty placeholder —
// the branching code is missing here, TODO confirm.
80 bool is_in
= with_osdmap([from
](const OSDMap
& osdmap
) {
81 return osdmap
.is_in(from
);
// Record the OSD's own stats in the pending incremental (stats moved in).
85 pending_inc
.update_stat(from
, std::move(stats
->osd_stat
));
// Alternative path: record an empty stat that still carries the reported
// sequence number (visible in the 87-89 fragment below), presumably for
// out OSDs — confirm against the canonical source.
87 osd_stat_t empty_stat
;
88 empty_stat
.seq
= stats
->osd_stat
.seq
;
89 pending_inc
.update_stat(from
, std::move(empty_stat
));
// Walk every PG stat in the report.
// NOTE(review): `p` is taken by value here (copies each map element);
// the upstream code may differ — confirm before flagging as a perf issue.
92 for (auto p
: stats
->pg_stat
) {
94 const auto &pg_stats
= p
.second
;
96 // In case we're hearing about a PG that according to last
97 // OSDMap update should not exist
// Look up the PG's pool in our pool->pg_num cache (rebuilt on each
// OSDMap in notify_osdmap). NOTE(review): the declaration of `pgid`
// (original line ~93) is missing from this extraction.
98 auto r
= existing_pools
.find(pgid
.pool());
// Pool no longer exists: log at debug level and (presumably) skip —
// the `continue` is among the missing lines.
99 if (r
== existing_pools
.end()) {
100 dout(15) << " got " << pgid
101 << " reported at " << pg_stats
.reported_epoch
<< ":"
102 << pg_stats
.reported_seq
103 << " state " << pg_state_string(pg_stats
.state
)
104 << " but pool not in " << existing_pools
// PG id beyond the pool's current pg_num (pool was shrunk): log and
// (presumably) skip.
108 if (pgid
.ps() >= r
->second
) {
109 dout(15) << " got " << pgid
110 << " reported at " << pg_stats
.reported_epoch
<< ":"
111 << pg_stats
.reported_seq
112 << " state " << pg_state_string(pg_stats
.state
)
113 << " but > pg_num " << r
->second
117 // In case we already heard about more recent stats from this PG
// Compare (reported_epoch, reported_seq) pairs; drop the incoming stats
// if what we already hold is newer.
119 const auto q
= pg_map
.pg_stat
.find(pgid
);
120 if (q
!= pg_map
.pg_stat
.end() &&
121 q
->second
.get_version_pair() > pg_stats
.get_version_pair()) {
122 dout(15) << " had " << pgid
<< " from "
123 << q
->second
.reported_epoch
<< ":"
124 << q
->second
.reported_seq
<< dendl
;
// Accepted: stage the PG stats into the pending incremental.
128 pending_inc
.pg_stat_updates
[pgid
] = pg_stats
;
// Stage per-pool statfs updates, keyed by (pool id, reporting OSD).
130 for (auto p
: stats
->pool_stat
) {
131 pending_inc
.pool_statfs_updates
[std::make_pair(p
.first
, from
)] = p
.second
;
// Fold the accumulated pending incremental into pg_map and start a fresh
// incremental. Stamps the incremental with the current time and bumps its
// version past pg_map's so apply_incremental accepts it.
// NOTE(review): this extraction is missing the opening/closing braces and
// the scoping braces around the two JSONFormatter debug dumps (both
// formatters are named `jf`, so upstream must scope them separately) —
// restore from the canonical source.
135 void ClusterState::update_delta_stats()
137 pending_inc
.stamp
= ceph_clock_now();
138 pending_inc
.version
= pg_map
.version
+ 1; // to make apply_incremental happy
139 dout(10) << " v" << pending_inc
.version
<< dendl
;
// Debug-level (30) JSON dump of the pre-apply pg_map.
141 dout(30) << " pg_map before:\n";
142 JSONFormatter
jf(true);
143 jf
.dump_object("pg_map", pg_map
);
// Debug-level (30) JSON dump of the incremental about to be applied.
146 dout(30) << " incremental:\n";
147 JSONFormatter
jf(true);
148 jf
.dump_object("pending_inc", pending_inc
);
// Apply and reset: pending_inc is consumed and replaced with an empty one.
151 pg_map
.apply_incremental(g_ceph_context
, pending_inc
);
152 pending_inc
= PGMap::Incremental();
// React to a new OSDMap: reconcile pg_map with the map (via PGMapUpdater),
// rebuild the existing_pools cache used to filter incoming pg stats, mark
// down PGs, then apply the resulting incremental. Caller must hold `lock`
// (asserted below).
// NOTE(review): this extraction is missing braces (function body, the
// pools loop, and the scoping around the two `jf` formatters) — restore
// from the canonical source.
155 void ClusterState::notify_osdmap(const OSDMap
&osd_map
)
157 assert(ceph_mutex_is_locked(lock
));
159 pending_inc
.stamp
= ceph_clock_now();
160 pending_inc
.version
= pg_map
.version
+ 1; // to make apply_incremental happy
161 dout(10) << " v" << pending_inc
.version
<< dendl
;
// Let PGMapUpdater reconcile pg_map with the new OSDMap into pending_inc.
163 PGMapUpdater::check_osd_map(g_ceph_context
, osd_map
, pg_map
, &pending_inc
);
165 // update our list of pools that exist, so that we can filter pg_map updates
166 // in synchrony with this OSDMap.
167 existing_pools
.clear();
// Cache pool id -> current pg_num for the stale-PG filter in
// ingest_pgstats.
168 for (auto& p
: osd_map
.get_pools()) {
169 existing_pools
[p
.first
] = p
.second
.get_pg_num();
172 // brute force this for now (don't bother being clever by only
173 // checking osds that went up/down)
174 set
<int> need_check_down_pg_osds
;
175 PGMapUpdater::check_down_pgs(osd_map
, pg_map
, true,
176 need_check_down_pg_osds
, &pending_inc
);
// Debug-level (30) JSON dumps of the map and incremental before applying.
178 dout(30) << " pg_map before:\n";
179 JSONFormatter
jf(true);
180 jf
.dump_object("pg_map", pg_map
);
183 dout(30) << " incremental:\n";
184 JSONFormatter
jf(true);
185 jf
.dump_object("pending_inc", pending_inc
);
// Apply and reset the incremental, same pattern as update_delta_stats.
189 pg_map
.apply_incremental(g_ceph_context
, pending_inc
);
190 pending_inc
= PGMap::Incremental();
191 // TODO: Complete the separation of PG state handling so
192 // that a cut-down set of functionality remains in PGMonitor
193 // while the full-blown PGMap lives only here.
// Admin-socket hook that forwards asok commands (e.g. "dump_osd_network")
// to ClusterState::asok_command. Holds a non-owning pointer back to the
// ClusterState that registered it.
// NOTE(review): this extraction dropped interior lines — the access
// specifiers, parts of call()'s parameter list (original lines 201-202),
// the try block setup, the error handling after the catch, and the class's
// closing brace. Restore from the canonical source.
196 class ClusterSocketHook
: public AdminSocketHook
{
197 ClusterState
*cluster_state
;
199 explicit ClusterSocketHook(ClusterState
*o
) : cluster_state(o
) {}
// AdminSocketHook entry point: dispatches to ClusterState::asok_command,
// translating bad_cmd_get (malformed command arguments) in the catch
// clause below (handler body missing from this extraction).
200 int call(std::string_view admin_command
, const cmdmap_t
& cmdmap
,
203 bufferlist
& out
) override
{
207 r
= cluster_state
->asok_command(admin_command
, cmdmap
, f
, outss
);
209 } catch (const TOPNSPC::common::bad_cmd_get
& e
) {
// Late initialization: create the admin-socket hook and register the
// "dump_osd_network" command with the daemon's AdminSocket.
// NOTE(review): the braces and the assert on the registration result
// (original lines ~218, 224-225) are missing from this extraction.
// `asok_hook` is a raw owning pointer created with `new`; it is released
// via unregister_commands in shutdown() — see that function.
217 void ClusterState::final_init()
219 AdminSocket
*admin_socket
= g_ceph_context
->get_admin_socket();
220 asok_hook
= new ClusterSocketHook(this);
221 int r
= admin_socket
->register_command(
222 "dump_osd_network name=value,type=CephInt,req=false", asok_hook
,
223 "Dump osd heartbeat network ping times");
// Teardown counterpart of final_init(): unregister the admin-socket
// commands associated with our hook.
// NOTE(review): braces and any `delete asok_hook` / null-out (if present
// upstream) are missing from this extraction — confirm there is no leak.
227 void ClusterState::shutdown()
229 // unregister commands
230 g_ceph_context
->get_admin_socket()->unregister_commands(asok_hook
);
// Handle admin-socket commands under the cluster-state lock. The only
// command visible here is "dump_osd_network": emit, via Formatter `f`, all
// OSD heartbeat ping times (back and front interfaces) whose 1/5/15-min
// maximum is at or above a threshold `value` (microseconds), sorted
// descending.
// NOTE(review): this extraction dropped many interior lines — parts of the
// parameter list (the Formatter* f and ostream parameters), the `value`
// declaration, several braces/`continue`s, the operator< tiebreakers, the
// mgr_ping_time_t from/to/back fields, and the final return paths. Restore
// from the canonical source; comments describe only what is visible.
235 bool ClusterState::asok_command(
236 std::string_view admin_command
,
237 const cmdmap_t
& cmdmap
,
241 std::lock_guard
l(lock
);
242 if (admin_command
== "dump_osd_network") {
244 // Default to health warning level if nothing specified
// If the user supplied no "value" argument, derive the threshold from
// config: either mon_warn_on_slow_ping_time (ms -> us), or — presumably
// when that is zero, branch missing here — osd_heartbeat_grace scaled by
// mon_warn_on_slow_ping_ratio (s -> us).
245 if (!(TOPNSPC::common::cmd_getval(cmdmap
, "value", value
))) {
246 // Convert milliseconds to microseconds
247 value
= static_cast<int64_t>(g_ceph_context
->_conf
.get_val
<double>("mon_warn_on_slow_ping_time")) * 1000;
249 double ratio
= g_conf().get_val
<double>("mon_warn_on_slow_ping_ratio");
250 value
= g_conf().get_val
<int64_t>("osd_heartbeat_grace");
251 value
*= 1000000 * ratio
; // Seconds of grace to microseconds at ratio
254 // Convert user input to microseconds
// Local aggregate describing one heartbeat edge's ping statistics;
// ordered by pingtime so a std::set keeps entries sorted.
// NOTE(review): fields pingtime/from/to/back/last (original lines
// 261-268) are missing from this extraction.
260 struct mgr_ping_time_t
{
// 1min / 5min / 15min averages, minima and maxima (microseconds per the
// threshold comparison below — TODO confirm units upstream).
265 std::array
<uint32_t,3> times
;
266 std::array
<uint32_t,3> min
;
267 std::array
<uint32_t,3> max
;
269 uint32_t last_update
;
// Strict-weak ordering on pingtime; the tiebreaker returns (original
// lines 273-277) are missing from this extraction.
271 bool operator<(const mgr_ping_time_t
& rhs
) const {
272 if (pingtime
< rhs
.pingtime
)
274 if (pingtime
> rhs
.pingtime
)
// Collect all qualifying entries sorted by pingtime.
288 set
<mgr_ping_time_t
> sorted
;
289 utime_t now
= ceph_clock_now();
// Iterate every OSD's heartbeat-peer ping map.
// NOTE(review): both loop variables are taken by value, copying each map
// element — verify against upstream before treating as intentional.
290 for (auto i
: pg_map
.osd_stat
) {
291 for (auto j
: i
.second
.hb_pingtime
) {
// Skip peers that have never reported.
293 if (j
.second
.last_update
== 0)
// Skip entries older than osd_mon_heartbeat_stat_stale seconds.
295 auto stale_time
= g_ceph_context
->_conf
.get_val
<int64_t>("osd_mon_heartbeat_stat_stale");
296 if (now
.sec() - j
.second
.last_update
> stale_time
) {
297 dout(20) << __func__
<< " time out heartbeat for osd " << i
.first
298 << " last_update " << j
.second
.last_update
<< dendl
;
// Build the back-interface entry: pingtime is the max of the three
// (1/5/15-min) back averages; include it only when it meets the
// threshold (or no threshold was set).
301 mgr_ping_time_t item
;
302 item
.pingtime
= std::max(j
.second
.back_pingtime
[0], j
.second
.back_pingtime
[1]);
303 item
.pingtime
= std::max(item
.pingtime
, j
.second
.back_pingtime
[2]);
304 if (!value
|| item
.pingtime
>= value
) {
307 item
.times
[0] = j
.second
.back_pingtime
[0];
308 item
.times
[1] = j
.second
.back_pingtime
[1];
309 item
.times
[2] = j
.second
.back_pingtime
[2];
310 item
.min
[0] = j
.second
.back_min
[0];
311 item
.min
[1] = j
.second
.back_min
[1];
312 item
.min
[2] = j
.second
.back_min
[2];
313 item
.max
[0] = j
.second
.back_max
[0];
314 item
.max
[1] = j
.second
.back_max
[1];
315 item
.max
[2] = j
.second
.back_max
[2];
316 item
.last
= j
.second
.back_last
;
318 item
.last_update
= j
.second
.last_update
;
319 sorted
.emplace(item
);
// Front interface: skip if it never reported, otherwise build the
// analogous entry from the front_* statistics.
322 if (j
.second
.front_last
== 0)
324 item
.pingtime
= std::max(j
.second
.front_pingtime
[0], j
.second
.front_pingtime
[1]);
325 item
.pingtime
= std::max(item
.pingtime
, j
.second
.front_pingtime
[2]);
326 if (!value
|| item
.pingtime
>= value
) {
329 item
.times
[0] = j
.second
.front_pingtime
[0];
330 item
.times
[1] = j
.second
.front_pingtime
[1];
331 item
.times
[2] = j
.second
.front_pingtime
[2];
332 item
.min
[0] = j
.second
.front_min
[0];
333 item
.min
[1] = j
.second
.front_min
[1];
334 item
.min
[2] = j
.second
.front_min
[2];
335 item
.max
[0] = j
.second
.front_max
[0];
336 item
.max
[1] = j
.second
.front_max
[1];
337 item
.max
[2] = j
.second
.front_max
[2];
338 item
.last
= j
.second
.front_last
;
340 item
.last_update
= j
.second
.last_update
;
341 sorted
.emplace(item
);
346 // Network ping times (1min 5min 15min)
// Emit JSON: threshold (reported back in milliseconds), then one entry
// per qualifying heartbeat edge, highest pingtime first (the set is
// ascending, hence boost reverse adaptor).
347 f
->open_object_section("network_ping_times");
348 f
->dump_int("threshold", value
/ 1000);
349 f
->open_array_section("entries");
350 for (auto &sitem
: boost::adaptors::reverse(sorted
)) {
351 ceph_assert(!value
|| sitem
.pingtime
>= value
);
353 f
->open_object_section("entry");
// Render last_update as a human-readable timestamp via reentrant
// ctime_r. NOTE(review): the `buffer` declaration (original line ~356)
// is missing from this extraction.
355 const time_t lu(sitem
.last_update
);
357 string
lustr(ctime_r(&lu
, buffer
));
358 lustr
.pop_back(); // Remove trailing \n
359 auto stale
= g_ceph_context
->_conf
.get_val
<int64_t>("osd_heartbeat_stale");
360 f
->dump_string("last update", lustr
);
361 f
->dump_bool("stale", ceph_clock_now().sec() - sitem
.last_update
> stale
);
362 f
->dump_int("from osd", sitem
.from
);
363 f
->dump_int("to osd", sitem
.to
);
364 f
->dump_string("interface", (sitem
.back
? "back" : "front"));
// fixed_u_to_string(x, 3) presumably renders the microsecond value with
// three decimal places (i.e. milliseconds) — TODO confirm.
365 f
->open_object_section("average");
366 f
->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem
.times
[0],3).c_str());
367 f
->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem
.times
[1],3).c_str());
368 f
->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem
.times
[2],3).c_str());
369 f
->close_section(); // average
370 f
->open_object_section("min");
371 f
->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem
.min
[0],3).c_str());
372 f
->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem
.min
[1],3).c_str());
373 f
->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem
.min
[2],3).c_str());
374 f
->close_section(); // min
375 f
->open_object_section("max");
376 f
->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem
.max
[0],3).c_str());
377 f
->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem
.max
[1],3).c_str());
378 f
->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem
.max
[2],3).c_str());
379 f
->close_section(); // max
380 f
->dump_format_unquoted("last", "%s", fixed_u_to_string(sitem
.last
,3).c_str());
381 f
->close_section(); // entry
383 f
->close_section(); // entries
384 f
->close_section(); // network_ping_times
// Unknown command: registration and dispatch are out of sync.
386 ceph_abort_msg("broken asok registration");