1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "DaemonState.h"
16 #include "MgrSession.h"
18 #define dout_context g_ceph_context
19 #define dout_subsys ceph_subsys_mgr
21 #define dout_prefix *_dout << "mgr " << __func__ << " "
23 void DaemonStateIndex::insert(DaemonStatePtr dm
)
25 RWLock::WLocker
l(lock
);
27 if (all
.count(dm
->key
)) {
31 by_server
[dm
->hostname
][dm
->key
] = dm
;
35 void DaemonStateIndex::_erase(const DaemonKey
& dmk
)
37 assert(lock
.is_wlocked());
39 const auto to_erase
= all
.find(dmk
);
40 assert(to_erase
!= all
.end());
41 const auto dm
= to_erase
->second
;
42 auto &server_collection
= by_server
[dm
->hostname
];
43 server_collection
.erase(dm
->key
);
44 if (server_collection
.empty()) {
45 by_server
.erase(dm
->hostname
);
51 DaemonStateCollection
DaemonStateIndex::get_by_service(
52 const std::string
& svc
) const
54 RWLock::RLocker
l(lock
);
56 DaemonStateCollection result
;
58 for (const auto &i
: all
) {
59 if (i
.first
.first
== svc
) {
60 result
[i
.first
] = i
.second
;
67 DaemonStateCollection
DaemonStateIndex::get_by_server(
68 const std::string
&hostname
) const
70 RWLock::RLocker
l(lock
);
72 if (by_server
.count(hostname
)) {
73 return by_server
.at(hostname
);
79 bool DaemonStateIndex::exists(const DaemonKey
&key
) const
81 RWLock::RLocker
l(lock
);
83 return all
.count(key
) > 0;
86 DaemonStatePtr
DaemonStateIndex::get(const DaemonKey
&key
)
88 RWLock::RLocker
l(lock
);
90 auto iter
= all
.find(key
);
91 if (iter
!= all
.end()) {
98 void DaemonStateIndex::cull(const std::string
& svc_name
,
99 const std::set
<std::string
>& names_exist
)
101 std::vector
<string
> victims
;
103 RWLock::WLocker
l(lock
);
104 auto begin
= all
.lower_bound({svc_name
, ""});
105 auto end
= all
.end();
106 for (auto &i
= begin
; i
!= end
; ++i
) {
107 const auto& daemon_key
= i
->first
;
108 if (daemon_key
.first
!= svc_name
)
110 if (names_exist
.count(daemon_key
.second
) == 0) {
111 victims
.push_back(daemon_key
.second
);
115 for (auto &i
: victims
) {
116 dout(4) << "Removing data for " << i
<< dendl
;
117 _erase({svc_name
, i
});
121 void DaemonPerfCounters::update(MMgrReport
*report
)
123 dout(20) << "loading " << report
->declare_types
.size() << " new types, "
124 << report
->undeclare_types
.size() << " old types, had "
125 << types
.size() << " types, got "
126 << report
->packed
.length() << " bytes of data" << dendl
;
128 // Retrieve session state
129 MgrSessionRef
session(static_cast<MgrSession
*>(
130 report
->get_connection()->get_priv()));
132 // Load any newly declared types
133 for (const auto &t
: report
->declare_types
) {
134 types
.insert(std::make_pair(t
.path
, t
));
135 session
->declared_types
.insert(t
.path
);
137 // Remove any old types
138 for (const auto &t
: report
->undeclare_types
) {
139 session
->declared_types
.erase(t
);
142 const auto now
= ceph_clock_now();
144 // Parse packed data according to declared set of types
145 bufferlist::iterator p
= report
->packed
.begin();
147 for (const auto &t_path
: session
->declared_types
) {
148 const auto &t
= types
.at(t_path
);
150 uint64_t avgcount
= 0;
151 uint64_t avgcount2
= 0;
154 if (t
.type
& PERFCOUNTER_LONGRUNAVG
) {
155 ::decode(avgcount
, p
);
156 ::decode(avgcount2
, p
);
158 // TODO: interface for insertion of avgs
159 instances
[t_path
].push(now
, val
);
164 uint64_t PerfCounterInstance::get_current() const
166 return buffer
.front().v
;
169 void PerfCounterInstance::push(utime_t t
, uint64_t const &v
)
171 buffer
.push_back({t
, v
});