1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "DaemonState.h"
16 #include "MgrSession.h"
17 #include "include/stringify.h"
18 #include "common/Formatter.h"
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_mgr
23 #define dout_prefix *_dout << "mgr " << __func__ << " "
25 void DeviceState::set_metadata(map
<string
,string
>&& m
)
27 metadata
= std::move(m
);
28 auto p
= metadata
.find("life_expectancy_min");
29 if (p
!= metadata
.end()) {
30 life_expectancy
.first
.parse(p
->second
);
32 p
= metadata
.find("life_expectancy_max");
33 if (p
!= metadata
.end()) {
34 life_expectancy
.second
.parse(p
->second
);
36 p
= metadata
.find("life_expectancy_stamp");
37 if (p
!= metadata
.end()) {
38 life_expectancy_stamp
.parse(p
->second
);
42 void DeviceState::set_life_expectancy(utime_t from
, utime_t to
, utime_t now
)
44 life_expectancy
= make_pair(from
, to
);
45 life_expectancy_stamp
= now
;
46 if (from
!= utime_t()) {
47 metadata
["life_expectancy_min"] = from
;
49 metadata
["life_expectancy_min"] = "";
51 if (to
!= utime_t()) {
52 metadata
["life_expectancy_max"] = to
;
54 metadata
["life_expectancy_max"] = "";
56 if (now
!= utime_t()) {
57 metadata
["life_expectancy_stamp"] = stringify(now
);
59 metadata
["life_expectancy_stamp"] = "";
63 void DeviceState::rm_life_expectancy()
65 life_expectancy
= make_pair(utime_t(), utime_t());
66 life_expectancy_stamp
= utime_t();
67 metadata
.erase("life_expectancy_min");
68 metadata
.erase("life_expectancy_max");
69 metadata
.erase("life_expectancy_stamp");
72 string
DeviceState::get_life_expectancy_str(utime_t now
) const
74 if (life_expectancy
.first
== utime_t()) {
77 if (now
>= life_expectancy
.first
) {
80 utime_t min
= life_expectancy
.first
- now
;
81 utime_t max
= life_expectancy
.second
- now
;
82 if (life_expectancy
.second
== utime_t()) {
83 return string(">") + timespan_str(make_timespan(min
));
85 string a
= timespan_str(make_timespan(min
));
86 string b
= timespan_str(make_timespan(max
));
90 return a
+ " to " + b
;
93 void DeviceState::dump(Formatter
*f
) const
95 f
->dump_string("devid", devid
);
96 f
->open_array_section("location");
97 for (auto& i
: devnames
) {
98 f
->open_object_section("attachment");
99 f
->dump_string("host", i
.first
);
100 f
->dump_string("dev", i
.second
);
104 f
->open_array_section("daemons");
105 for (auto& i
: daemons
) {
106 f
->dump_string("daemon", to_string(i
));
109 if (life_expectancy
.first
!= utime_t()) {
110 f
->dump_stream("life_expectancy_min") << life_expectancy
.first
;
111 f
->dump_stream("life_expectancy_max") << life_expectancy
.second
;
112 f
->dump_stream("life_expectancy_stamp")
113 << life_expectancy_stamp
;
117 void DeviceState::print(ostream
& out
) const
119 out
<< "device " << devid
<< "\n";
120 for (auto& i
: devnames
) {
121 out
<< "attachment " << i
.first
<< ":" << i
.second
<< "\n";
124 for (auto& j
: daemons
) {
125 d
.insert(to_string(j
));
127 out
<< "daemons " << d
<< "\n";
128 if (life_expectancy
.first
!= utime_t()) {
129 out
<< "life_expectancy " << life_expectancy
.first
<< " to "
130 << life_expectancy
.second
131 << " (as of " << life_expectancy_stamp
<< ")\n";
135 void DaemonStateIndex::insert(DaemonStatePtr dm
)
137 RWLock::WLocker
l(lock
);
141 void DaemonStateIndex::_insert(DaemonStatePtr dm
)
143 if (all
.count(dm
->key
)) {
147 by_server
[dm
->hostname
][dm
->key
] = dm
;
150 for (auto& i
: dm
->devices
) {
151 auto d
= _get_or_create_device(i
.first
);
152 d
->daemons
.insert(dm
->key
);
153 d
->devnames
.insert(make_pair(dm
->hostname
, i
.second
));
157 void DaemonStateIndex::_erase(const DaemonKey
& dmk
)
159 ceph_assert(lock
.is_wlocked());
161 const auto to_erase
= all
.find(dmk
);
162 ceph_assert(to_erase
!= all
.end());
163 const auto dm
= to_erase
->second
;
165 for (auto& i
: dm
->devices
) {
166 auto d
= _get_or_create_device(i
.first
);
167 ceph_assert(d
->daemons
.count(dmk
));
168 d
->daemons
.erase(dmk
);
169 d
->devnames
.erase(make_pair(dm
->hostname
, i
.second
));
175 auto &server_collection
= by_server
[dm
->hostname
];
176 server_collection
.erase(dm
->key
);
177 if (server_collection
.empty()) {
178 by_server
.erase(dm
->hostname
);
184 DaemonStateCollection
DaemonStateIndex::get_by_service(
185 const std::string
& svc
) const
187 RWLock::RLocker
l(lock
);
189 DaemonStateCollection result
;
191 for (const auto &i
: all
) {
192 if (i
.first
.first
== svc
) {
193 result
[i
.first
] = i
.second
;
200 DaemonStateCollection
DaemonStateIndex::get_by_server(
201 const std::string
&hostname
) const
203 RWLock::RLocker
l(lock
);
205 if (by_server
.count(hostname
)) {
206 return by_server
.at(hostname
);
212 bool DaemonStateIndex::exists(const DaemonKey
&key
) const
214 RWLock::RLocker
l(lock
);
216 return all
.count(key
) > 0;
219 DaemonStatePtr
DaemonStateIndex::get(const DaemonKey
&key
)
221 RWLock::RLocker
l(lock
);
223 auto iter
= all
.find(key
);
224 if (iter
!= all
.end()) {
231 void DaemonStateIndex::rm(const DaemonKey
&key
)
233 RWLock::WLocker
l(lock
);
237 void DaemonStateIndex::_rm(const DaemonKey
&key
)
239 if (all
.count(key
)) {
244 void DaemonStateIndex::cull(const std::string
& svc_name
,
245 const std::set
<std::string
>& names_exist
)
247 std::vector
<string
> victims
;
249 RWLock::WLocker
l(lock
);
250 auto begin
= all
.lower_bound({svc_name
, ""});
251 auto end
= all
.end();
252 for (auto &i
= begin
; i
!= end
; ++i
) {
253 const auto& daemon_key
= i
->first
;
254 if (daemon_key
.first
!= svc_name
)
256 if (names_exist
.count(daemon_key
.second
) == 0) {
257 victims
.push_back(daemon_key
.second
);
261 for (auto &i
: victims
) {
262 dout(4) << "Removing data for " << i
<< dendl
;
263 _erase({svc_name
, i
});
267 void DaemonPerfCounters::update(MMgrReport
*report
)
269 dout(20) << "loading " << report
->declare_types
.size() << " new types, "
270 << report
->undeclare_types
.size() << " old types, had "
271 << types
.size() << " types, got "
272 << report
->packed
.length() << " bytes of data" << dendl
;
274 // Retrieve session state
275 auto priv
= report
->get_connection()->get_priv();
276 auto session
= static_cast<MgrSession
*>(priv
.get());
278 // Load any newly declared types
279 for (const auto &t
: report
->declare_types
) {
280 types
.insert(std::make_pair(t
.path
, t
));
281 session
->declared_types
.insert(t
.path
);
283 // Remove any old types
284 for (const auto &t
: report
->undeclare_types
) {
285 session
->declared_types
.erase(t
);
288 const auto now
= ceph_clock_now();
290 // Parse packed data according to declared set of types
291 auto p
= report
->packed
.cbegin();
293 for (const auto &t_path
: session
->declared_types
) {
294 const auto &t
= types
.at(t_path
);
295 auto instances_it
= instances
.find(t_path
);
296 // Always check the instance exists, as we don't prevent yet
297 // multiple sessions from daemons with the same name, and one
298 // session clearing stats created by another on open.
299 if (instances_it
== instances
.end()) {
300 instances_it
= instances
.insert({t_path
, t
.type
}).first
;
303 uint64_t avgcount
= 0;
304 uint64_t avgcount2
= 0;
307 if (t
.type
& PERFCOUNTER_LONGRUNAVG
) {
309 decode(avgcount2
, p
);
310 instances_it
->second
.push_avg(now
, val
, avgcount
);
312 instances_it
->second
.push(now
, val
);
318 void PerfCounterInstance::push(utime_t t
, uint64_t const &v
)
320 buffer
.push_back({t
, v
});
323 void PerfCounterInstance::push_avg(utime_t t
, uint64_t const &s
,
326 avg_buffer
.push_back({t
, s
, c
});