1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
14 #include "messages/MMgrDigest.h"
15 #include "messages/MMonMgrReport.h"
16 #include "messages/MPGStats.h"
18 #include "mgr/ClusterState.h"
20 #include <boost/range/adaptor/reversed.hpp>
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_mgr
25 #define dout_prefix *_dout << "mgr " << __func__ << " "
27 ClusterState::ClusterState(
37 void ClusterState::set_objecter(Objecter
*objecter_
)
39 std::lock_guard
l(lock
);
44 void ClusterState::set_fsmap(FSMap
const &new_fsmap
)
46 std::lock_guard
l(lock
);
51 void ClusterState::set_mgr_map(MgrMap
const &new_mgrmap
)
53 std::lock_guard
l(lock
);
57 void ClusterState::set_service_map(ServiceMap
const &new_service_map
)
59 std::lock_guard
l(lock
);
60 servicemap
= new_service_map
;
63 void ClusterState::load_digest(MMgrDigest
*m
)
65 health_json
= std::move(m
->health_json
);
66 mon_status_json
= std::move(m
->mon_status_json
);
69 void ClusterState::ingest_pgstats(ref_t
<MPGStats
> stats
)
71 std::lock_guard
l(lock
);
73 const int from
= stats
->get_orig_source().num();
74 pending_inc
.update_stat(from
, std::move(stats
->osd_stat
));
76 for (auto p
: stats
->pg_stat
) {
78 const auto &pg_stats
= p
.second
;
80 // In case we're hearing about a PG that according to last
81 // OSDMap update should not exist
82 auto r
= existing_pools
.find(pgid
.pool());
83 if (r
== existing_pools
.end()) {
84 dout(15) << " got " << pgid
85 << " reported at " << pg_stats
.reported_epoch
<< ":"
86 << pg_stats
.reported_seq
87 << " state " << pg_state_string(pg_stats
.state
)
88 << " but pool not in " << existing_pools
92 if (pgid
.ps() >= r
->second
) {
93 dout(15) << " got " << pgid
94 << " reported at " << pg_stats
.reported_epoch
<< ":"
95 << pg_stats
.reported_seq
96 << " state " << pg_state_string(pg_stats
.state
)
97 << " but > pg_num " << r
->second
101 // In case we already heard about more recent stats from this PG
103 const auto q
= pg_map
.pg_stat
.find(pgid
);
104 if (q
!= pg_map
.pg_stat
.end() &&
105 q
->second
.get_version_pair() > pg_stats
.get_version_pair()) {
106 dout(15) << " had " << pgid
<< " from "
107 << q
->second
.reported_epoch
<< ":"
108 << q
->second
.reported_seq
<< dendl
;
112 pending_inc
.pg_stat_updates
[pgid
] = pg_stats
;
114 for (auto p
: stats
->pool_stat
) {
115 pending_inc
.pool_statfs_updates
[std::make_pair(p
.first
, from
)] = p
.second
;
119 void ClusterState::update_delta_stats()
121 pending_inc
.stamp
= ceph_clock_now();
122 pending_inc
.version
= pg_map
.version
+ 1; // to make apply_incremental happy
123 dout(10) << " v" << pending_inc
.version
<< dendl
;
125 dout(30) << " pg_map before:\n";
126 JSONFormatter
jf(true);
127 jf
.dump_object("pg_map", pg_map
);
130 dout(30) << " incremental:\n";
131 JSONFormatter
jf(true);
132 jf
.dump_object("pending_inc", pending_inc
);
135 pg_map
.apply_incremental(g_ceph_context
, pending_inc
);
136 pending_inc
= PGMap::Incremental();
139 void ClusterState::notify_osdmap(const OSDMap
&osd_map
)
141 assert(ceph_mutex_is_locked(lock
));
143 pending_inc
.stamp
= ceph_clock_now();
144 pending_inc
.version
= pg_map
.version
+ 1; // to make apply_incremental happy
145 dout(10) << " v" << pending_inc
.version
<< dendl
;
147 PGMapUpdater::check_osd_map(g_ceph_context
, osd_map
, pg_map
, &pending_inc
);
149 // update our list of pools that exist, so that we can filter pg_map updates
150 // in synchrony with this OSDMap.
151 existing_pools
.clear();
152 for (auto& p
: osd_map
.get_pools()) {
153 existing_pools
[p
.first
] = p
.second
.get_pg_num();
156 // brute force this for now (don't bother being clever by only
157 // checking osds that went up/down)
158 set
<int> need_check_down_pg_osds
;
159 PGMapUpdater::check_down_pgs(osd_map
, pg_map
, true,
160 need_check_down_pg_osds
, &pending_inc
);
162 dout(30) << " pg_map before:\n";
163 JSONFormatter
jf(true);
164 jf
.dump_object("pg_map", pg_map
);
167 dout(30) << " incremental:\n";
168 JSONFormatter
jf(true);
169 jf
.dump_object("pending_inc", pending_inc
);
173 pg_map
.apply_incremental(g_ceph_context
, pending_inc
);
174 pending_inc
= PGMap::Incremental();
175 // TODO: Complete the separation of PG state handling so
176 // that a cut-down set of functionality remains in PGMonitor
177 // while the full-blown PGMap lives only here.
180 class ClusterSocketHook
: public AdminSocketHook
{
181 ClusterState
*cluster_state
;
183 explicit ClusterSocketHook(ClusterState
*o
) : cluster_state(o
) {}
184 int call(std::string_view admin_command
, const cmdmap_t
& cmdmap
,
187 bufferlist
& out
) override
{
191 r
= cluster_state
->asok_command(admin_command
, cmdmap
, f
, outss
);
193 } catch (const TOPNSPC::common::bad_cmd_get
& e
) {
201 void ClusterState::final_init()
203 AdminSocket
*admin_socket
= g_ceph_context
->get_admin_socket();
204 asok_hook
= new ClusterSocketHook(this);
205 int r
= admin_socket
->register_command(
206 "dump_osd_network name=value,type=CephInt,req=false", asok_hook
,
207 "Dump osd heartbeat network ping times");
211 void ClusterState::shutdown()
213 // unregister commands
214 g_ceph_context
->get_admin_socket()->unregister_commands(asok_hook
);
219 bool ClusterState::asok_command(
220 std::string_view admin_command
,
221 const cmdmap_t
& cmdmap
,
225 std::lock_guard
l(lock
);
226 if (admin_command
== "dump_osd_network") {
228 // Default to health warning level if nothing specified
229 if (!(TOPNSPC::common::cmd_getval(cmdmap
, "value", value
))) {
230 // Convert milliseconds to microseconds
231 value
= static_cast<int64_t>(g_ceph_context
->_conf
.get_val
<double>("mon_warn_on_slow_ping_time")) * 1000;
233 double ratio
= g_conf().get_val
<double>("mon_warn_on_slow_ping_ratio");
234 value
= g_conf().get_val
<int64_t>("osd_heartbeat_grace");
235 value
*= 1000000 * ratio
; // Seconds of grace to microseconds at ratio
238 // Convert user input to microseconds
244 struct mgr_ping_time_t
{
249 std::array
<uint32_t,3> times
;
250 std::array
<uint32_t,3> min
;
251 std::array
<uint32_t,3> max
;
253 uint32_t last_update
;
255 bool operator<(const mgr_ping_time_t
& rhs
) const {
256 if (pingtime
< rhs
.pingtime
)
258 if (pingtime
> rhs
.pingtime
)
272 set
<mgr_ping_time_t
> sorted
;
273 utime_t now
= ceph_clock_now();
274 for (auto i
: pg_map
.osd_stat
) {
275 for (auto j
: i
.second
.hb_pingtime
) {
277 if (j
.second
.last_update
== 0)
279 auto stale_time
= g_ceph_context
->_conf
.get_val
<int64_t>("osd_mon_heartbeat_stat_stale");
280 if (now
.sec() - j
.second
.last_update
> stale_time
) {
281 dout(20) << __func__
<< " time out heartbeat for osd " << i
.first
282 << " last_update " << j
.second
.last_update
<< dendl
;
285 mgr_ping_time_t item
;
286 item
.pingtime
= std::max(j
.second
.back_pingtime
[0], j
.second
.back_pingtime
[1]);
287 item
.pingtime
= std::max(item
.pingtime
, j
.second
.back_pingtime
[2]);
288 if (!value
|| item
.pingtime
>= value
) {
291 item
.times
[0] = j
.second
.back_pingtime
[0];
292 item
.times
[1] = j
.second
.back_pingtime
[1];
293 item
.times
[2] = j
.second
.back_pingtime
[2];
294 item
.min
[0] = j
.second
.back_min
[0];
295 item
.min
[1] = j
.second
.back_min
[1];
296 item
.min
[2] = j
.second
.back_min
[2];
297 item
.max
[0] = j
.second
.back_max
[0];
298 item
.max
[1] = j
.second
.back_max
[1];
299 item
.max
[2] = j
.second
.back_max
[2];
300 item
.last
= j
.second
.back_last
;
302 item
.last_update
= j
.second
.last_update
;
303 sorted
.emplace(item
);
306 if (j
.second
.front_last
== 0)
308 item
.pingtime
= std::max(j
.second
.front_pingtime
[0], j
.second
.front_pingtime
[1]);
309 item
.pingtime
= std::max(item
.pingtime
, j
.second
.front_pingtime
[2]);
310 if (!value
|| item
.pingtime
>= value
) {
313 item
.times
[0] = j
.second
.front_pingtime
[0];
314 item
.times
[1] = j
.second
.front_pingtime
[1];
315 item
.times
[2] = j
.second
.front_pingtime
[2];
316 item
.min
[0] = j
.second
.front_min
[0];
317 item
.min
[1] = j
.second
.front_min
[1];
318 item
.min
[2] = j
.second
.front_min
[2];
319 item
.max
[0] = j
.second
.front_max
[0];
320 item
.max
[1] = j
.second
.front_max
[1];
321 item
.max
[2] = j
.second
.front_max
[2];
322 item
.last
= j
.second
.front_last
;
324 item
.last_update
= j
.second
.last_update
;
325 sorted
.emplace(item
);
330 // Network ping times (1min 5min 15min)
331 f
->open_object_section("network_ping_times");
332 f
->dump_int("threshold", value
/ 1000);
333 f
->open_array_section("entries");
334 for (auto &sitem
: boost::adaptors::reverse(sorted
)) {
335 ceph_assert(!value
|| sitem
.pingtime
>= value
);
337 f
->open_object_section("entry");
339 const time_t lu(sitem
.last_update
);
341 string
lustr(ctime_r(&lu
, buffer
));
342 lustr
.pop_back(); // Remove trailing \n
343 auto stale
= g_ceph_context
->_conf
.get_val
<int64_t>("osd_heartbeat_stale");
344 f
->dump_string("last update", lustr
);
345 f
->dump_bool("stale", ceph_clock_now().sec() - sitem
.last_update
> stale
);
346 f
->dump_int("from osd", sitem
.from
);
347 f
->dump_int("to osd", sitem
.to
);
348 f
->dump_string("interface", (sitem
.back
? "back" : "front"));
349 f
->open_object_section("average");
350 f
->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem
.times
[0],3).c_str());
351 f
->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem
.times
[1],3).c_str());
352 f
->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem
.times
[2],3).c_str());
353 f
->close_section(); // average
354 f
->open_object_section("min");
355 f
->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem
.min
[0],3).c_str());
356 f
->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem
.min
[1],3).c_str());
357 f
->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem
.min
[2],3).c_str());
358 f
->close_section(); // min
359 f
->open_object_section("max");
360 f
->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem
.max
[0],3).c_str());
361 f
->dump_format_unquoted("5min", "%s", fixed_u_to_string(sitem
.max
[1],3).c_str());
362 f
->dump_format_unquoted("15min", "%s", fixed_u_to_string(sitem
.max
[2],3).c_str());
363 f
->close_section(); // max
364 f
->dump_format_unquoted("last", "%s", fixed_u_to_string(sitem
.last
,3).c_str());
365 f
->close_section(); // entry
367 f
->close_section(); // entries
368 f
->close_section(); // network_ping_times
370 ceph_abort_msg("broken asok registration");