]>
git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/MgrClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "MgrClient.h"
17 #include "mgr/MgrContext.h"
19 #include "msg/Messenger.h"
20 #include "messages/MMgrMap.h"
21 #include "messages/MMgrReport.h"
22 #include "messages/MMgrOpen.h"
23 #include "messages/MMgrConfigure.h"
24 #include "messages/MCommand.h"
25 #include "messages/MCommandReply.h"
26 #include "messages/MPGStats.h"
28 #define dout_subsys ceph_subsys_mgrc
30 #define dout_prefix *_dout << "mgrc " << __func__ << " "
// Constructor: hands cct_ to the Dispatcher base and stores cct_ and msgr_
// in the cct / msgr members, then asserts that the context is non-null.
// NOTE(review): the initializer list ends with a trailing comma and the
// listing skips from source line 33 to 36, so at least one further member
// initializer is not visible here.
32 MgrClient::MgrClient(CephContext
*cct_
, Messenger
*msgr_
)
33 : Dispatcher(cct_
), cct(cct_
), msgr(msgr_
),
36 assert(cct
!= nullptr);
// Initialisation hook: takes `lock` for the scope of the call and asserts
// that a messenger was supplied.
// NOTE(review): source lines after 43 are absent from this listing; any
// further start-up work done here cannot be confirmed from what is shown.
39 void MgrClient::init()
41 Mutex::Locker
l(lock
);
43 assert(msgr
!= nullptr);
// Tear-down path, run with `lock` held for the whole call:
//   1. cancel a pending connect-retry timer event, if one is registered,
//   2. drop every entry in command_table,
//   3. mark the session's connection down.
// NOTE(review): source lines 60-62 and 64-66 are absent from this listing,
// so any guard around the mark_down() call (e.g. a null-session check) and
// any timer/session clean-up cannot be confirmed here.
48 void MgrClient::shutdown()
50 Mutex::Locker
l(lock
);
// Only cancel when an event is actually registered; the pointer is cleared
// afterwards so later code does not cancel it a second time.
52 if (connect_retry_callback
) {
53 timer
.cancel_event(connect_retry_callback
);
54 connect_retry_callback
= nullptr;
57 // forget about in-flight commands if we are prematurely shut down
58 // (e.g., by control-C)
59 command_table
.clear();
63 session
->con
->mark_down();
// Messenger dispatch entry point. Takes `lock`, then switches on the
// message type:
//   - an MMgrMap message is forwarded to handle_mgr_map(),
//   - MSG_MGR_CONFIGURE is forwarded to handle_mgr_configure(),
//   - MSG_COMMAND_REPLY is forwarded to handle_command_reply(), but only
//     when the sender's entity type is CEPH_ENTITY_TYPE_MGR.
// A message that is not handled is logged at debug level 30.
// NOTE(review): the case label preceding the handle_mgr_map() call, the
// return statements of the MSG_COMMAND_REPLY branch and the default branch
// are on source lines absent from this listing (73, 80-84, 86-88).
68 bool MgrClient::ms_dispatch(Message
*m
)
70 Mutex::Locker
l(lock
);
72 switch(m
->get_type()) {
74 return handle_mgr_map(static_cast<MMgrMap
*>(m
));
75 case MSG_MGR_CONFIGURE
:
76 return handle_mgr_configure(static_cast<MMgrConfigure
*>(m
));
77 case MSG_COMMAND_REPLY
:
// Command replies can also originate from non-mgr peers; only replies that
// come from a mgr are consumed here.
78 if (m
->get_source().type() == CEPH_ENTITY_TYPE_MGR
) {
79 handle_command_reply(static_cast<MCommandReply
*>(m
));
85 ldout(cct
, 30) << "Not handling " << *m
<< dendl
;
// (Re)establish the session with the active mgr named in `map`.
// Caller must hold `lock` (asserted below). Visible steps, in order:
//   1. log and mark down the existing session connection,
//   2. cancel a pending report timer event,
//   3. log when the map reports no available mgr,
//   4. when a previous attempt was recorded, compute the earliest retry
//      time (last attempt + "mgr_connect_retry_interval") and register a
//      timer event for it if none is pending,
//   5. cancel any pending retry event, build the peer identity from the
//      map, record the attempt time and open a fresh MgrSessionState with
//      a connection obtained from the messenger,
//   6. flag the daemon status as dirty for service daemons,
//   7. re-send every command still held in command_table.
// NOTE(review): many source lines are absent from this listing (guards,
// early returns, the body of the retry callback, the call made inside the
// "don't send an open" condition), so the exact control flow between the
// steps above cannot be confirmed from what is shown.
90 void MgrClient::reconnect()
92 assert(lock
.is_locked_by_me());
95 ldout(cct
, 4) << "Terminating session with "
96 << session
->con
->get_peer_addr() << dendl
;
97 session
->con
->mark_down();
// A pending periodic-report event is cancelled and its pointer cleared.
100 if (report_callback
!= nullptr) {
101 timer
.cancel_event(report_callback
);
102 report_callback
= nullptr;
106 if (!map
.get_available()) {
107 ldout(cct
, 4) << "No active mgr available yet" << dendl
;
// A default-constructed utime_t is used as the "no attempt made yet" marker.
111 if (last_connect_attempt
!= utime_t()) {
112 utime_t now
= ceph_clock_now();
113 utime_t when
= last_connect_attempt
;
114 when
+= cct
->_conf
->get_val
<double>("mgr_connect_retry_interval");
// Register at most one retry event at a time.
116 if (!connect_retry_callback
) {
117 connect_retry_callback
= timer
.add_event_at(
// The callback clears its own handle first; the remainder of its body is
// not visible in this listing.
119 new FunctionContext([this](int r
){
120 connect_retry_callback
= nullptr;
124 ldout(cct
, 4) << "waiting to retry connect until " << when
<< dendl
;
129 if (connect_retry_callback
) {
130 timer
.cancel_event(connect_retry_callback
);
131 connect_retry_callback
= nullptr;
134 ldout(cct
, 4) << "Starting new session with " << map
.get_active_addr()
// Peer identity is taken from the map: active address plus an MGR entity
// name built from the active gid.
137 inst
.addr
= map
.get_active_addr();
138 inst
.name
= entity_name_t::MGR(map
.get_active_gid());
139 last_connect_attempt
= ceph_clock_now();
141 session
.reset(new MgrSessionState());
142 session
->con
= msgr
->get_connection(inst
);
144 if (service_daemon
) {
145 daemon_dirty_status
= true;
148 // Don't send an open if we're just a client (i.e. doing
149 // command-sending, not stats etc)
150 if (!cct
->_conf
->name
.is_client() || service_daemon
) {
154 // resend any pending commands
155 for (const auto &p
: command_table
.get_commands()) {
156 MCommand
*m
= p
.second
.get_message({});
158 assert(session
->con
);
159 session
->con
->send_message(m
);
// Build an MMgrOpen message and send it on the current session connection.
// Nothing is done unless both `session` and `session->con` are set.
// Fields filled in from what is visible:
//   - service_name / daemon_name from the members when service_name is
//     non-empty,
//   - daemon_name from the configured entity id (cct->_conf->name.get_id());
//     presumably this is the else-branch of the check above -- the line
//     between them is absent from this listing,
//   - service_daemon flag and daemon_metadata when this client is
//     registered as a service daemon.
163 void MgrClient::_send_open()
165 if (session
&& session
->con
) {
166 auto open
= new MMgrOpen();
167 if (!service_name
.empty()) {
168 open
->service_name
= service_name
;
169 open
->daemon_name
= daemon_name
;
171 open
->daemon_name
= cct
->_conf
->name
.get_id();
173 if (service_daemon
) {
174 open
->service_daemon
= service_daemon
;
175 open
->daemon_metadata
= daemon_metadata
;
177 session
->con
->send_message(open
);
// Handler for an incoming MMgrMap. Caller must hold `lock` (asserted).
// Logs the message at level 20, then the map epoch and the active mgr
// address at level 4, and compares the current session's peer address
// with the active address in the map.
// NOTE(review): the lines that store the received map, the start of the
// condition that ends with the address comparison, and whatever is done
// when the addresses differ are absent from this listing.
181 bool MgrClient::handle_mgr_map(MMgrMap
*m
)
183 assert(lock
.is_locked_by_me());
185 ldout(cct
, 20) << *m
<< dendl
;
188 ldout(cct
, 4) << "Got map version " << map
.epoch
<< dendl
;
191 ldout(cct
, 4) << "Active mgr is now " << map
.get_active_addr() << dendl
;
195 session
->con
->get_peer_addr() != map
.get_active_addr()) {
// Messenger callback for a reset connection. Takes `lock` and reacts only
// when `con` is the connection of the current session, logging the event
// at level 4.
// NOTE(review): the action taken after the log line and the return values
// are on source lines absent from this listing.
202 bool MgrClient::ms_handle_reset(Connection
*con
)
204 Mutex::Locker
l(lock
);
205 if (session
&& con
== session
->con
) {
206 ldout(cct
, 4) << __func__
<< " con " << con
<< dendl
;
// Messenger callback for a refused connection. Per the existing comment it
// intentionally does nothing; the returned value is on a source line that
// is absent from this listing.
213 bool MgrClient::ms_handle_refused(Connection
*con
)
215 // do nothing for now
// Build and send one MMgrReport carrying perf counter values.
// Caller must hold `lock` (asserted). Visible steps:
//   1. clear report_callback (this function is also what the timer event
//      created at the bottom runs),
//   2. inside PerfCountersCollection::with_counters(), walk the counters:
//        - undeclare paths that were declared on this session but no
//          longer exist in the collection,
//        - handle counters whose adjusted priority is below
//          stats_threshold (see include_counter),
//        - declare counters not yet declared on this session, filling a
//          PerfCounterType for each,
//        - encode each value into report->packed,
//   3. fill in daemon_name / service_name and, when dirty, daemon_status,
//   4. send the report on the session connection,
//   5. when stats_period is non-zero, register a timer event that calls
//      send_report() again after stats_period.
// NOTE(review): a number of source lines are absent from this listing
// (braces, the calls made inside several `if` bodies, `continue`
// statements, the else between the two daemon_name assignments), so the
// exact branch structure cannot be confirmed from what is shown.
219 void MgrClient::send_report()
221 assert(lock
.is_locked_by_me());
223 report_callback
= nullptr;
225 auto report
= new MMgrReport();
226 auto pcc
= cct
->get_perfcounters_collection();
228 pcc
->with_counters([this, report
](
229 const PerfCountersCollection::CounterMap
&by_path
)
231 // Helper for checking whether a counter should be included
232 auto include_counter
= [this](
233 const PerfCounters::perf_counter_data_any_d
&ctr
,
234 const PerfCounters
&perf_counters
)
// A counter is included when its adjusted priority reaches stats_threshold.
236 return perf_counters
.get_adjusted_priority(ctr
.prio
) >= (int)stats_threshold
;
239 // Helper for cases where we want to forget a counter
240 auto undeclare
= [report
, this](const std::string
&path
)
// Records the path in the outgoing report and drops it from the set of
// paths declared on this session.
242 report
->undeclare_types
.push_back(path
);
243 ldout(cct
,20) << " undeclare " << path
<< dendl
;
244 session
->declared
.erase(path
);
247 ENCODE_START(1, 1, report
->packed
);
249 // Find counters that no longer exist, and undeclare them
// The iterator is advanced while the element is read (`*(p++)`), before
// the loop body runs -- presumably so that removing the current element
// from session->declared does not invalidate `p`; the call made in the
// `if` body is not visible in this listing.
250 for (auto p
= session
->declared
.begin(); p
!= session
->declared
.end(); ) {
251 const auto &path
= *(p
++);
252 if (by_path
.count(path
) == 0) {
257 for (const auto &i
: by_path
) {
258 auto& path
= i
.first
;
259 auto& data
= *(i
.second
.data
);
260 auto& perf_counters
= *(i
.second
.perf_counters
);
262 // Find counters that still exist, but are no longer permitted by
264 if (!include_counter(data
, perf_counters
)) {
265 if (session
->declared
.count(path
)) {
// First time this path is reported on the current session: describe its
// type to the peer and remember that it has been declared.
271 if (session
->declared
.count(path
) == 0) {
272 ldout(cct
,20) << " declare " << path
<< dendl
;
273 PerfCounterType type
;
275 if (data
.description
) {
276 type
.description
= data
.description
;
279 type
.nick
= data
.nick
;
281 type
.type
= data
.type
;
282 type
.priority
= perf_counters
.get_adjusted_priority(data
.prio
);
283 report
->declare_types
.push_back(std::move(type
));
284 session
->declared
.insert(path
);
// Values are appended to report->packed as uint64_t; counters flagged
// PERFCOUNTER_LONGRUNAVG additionally carry avgcount and avgcount2.
287 ::encode(static_cast<uint64_t>(data
.u64
), report
->packed
);
288 if (data
.type
& PERFCOUNTER_LONGRUNAVG
) {
289 ::encode(static_cast<uint64_t>(data
.avgcount
), report
->packed
);
290 ::encode(static_cast<uint64_t>(data
.avgcount2
), report
->packed
);
293 ENCODE_FINISH(report
->packed
);
295 ldout(cct
, 20) << "sending " << session
->declared
.size() << " counters ("
296 "of possible " << by_path
.size() << "), "
297 << report
->declare_types
.size() << " new, "
298 << report
->undeclare_types
.size() << " removed"
302 ldout(cct
, 20) << "encoded " << report
->packed
.length() << " bytes" << dendl
;
// daemon_name comes from the member when it is non-empty; the assignment
// from the configured entity id below is presumably the else-branch (the
// line between them is absent from this listing).
304 if (daemon_name
.size()) {
305 report
->daemon_name
= daemon_name
;
307 report
->daemon_name
= cct
->_conf
->name
.get_id();
309 report
->service_name
= service_name
;
// The status map is attached only when it changed since the last report.
311 if (daemon_dirty_status
) {
312 report
->daemon_status
= daemon_status
;
313 daemon_dirty_status
= false;
316 session
->con
->send_message(report
);
// A stats_period of zero means no further report is scheduled.
318 if (stats_period
!= 0) {
319 report_callback
= new FunctionContext([this](int r
){send_report();});
320 timer
.add_event_after(stats_period
, report_callback
);
// Send the message produced by the registered pgstats_cb callback on the
// session connection. Does nothing when no callback is registered or no
// session exists.
// NOTE(review): only `session` is checked before `session->con` is
// dereferenced, whereas _send_open() and start_command() also check
// `session->con` -- confirm that con is always set while a session exists.
326 void MgrClient::send_pgstats()
328 if (pgstats_cb
&& session
) {
329 session
->con
->send_message(pgstats_cb());
// Handler for an MMgrConfigure message. Caller must hold `lock` (asserted).
// Visible behaviour:
//   - logs the message at level 20,
//   - can log "dropping unexpected configure message" as an error (the
//     condition guarding this is not visible in this listing),
//   - logs the received stats_period,
//   - adopts the received stats_threshold when it differs from the
//     current one,
//   - computes `starting`: true when reporting was off (stats_period == 0)
//     and the message turns it on, then stores the new stats_period.
// NOTE(review): what is done with `starting`, and the return value, are on
// source lines absent from this listing.
333 bool MgrClient::handle_mgr_configure(MMgrConfigure
*m
)
335 assert(lock
.is_locked_by_me());
337 ldout(cct
, 20) << *m
<< dendl
;
340 lderr(cct
) << "dropping unexpected configure message" << dendl
;
345 ldout(cct
, 4) << "stats_period=" << m
->stats_period
<< dendl
;
347 if (stats_threshold
!= m
->stats_threshold
) {
348 ldout(cct
, 4) << "updated stats threshold: " << m
->stats_threshold
<< dendl
;
349 stats_threshold
= m
->stats_threshold
;
// Must be evaluated before stats_period is overwritten on the next line.
352 bool starting
= (stats_period
== 0) && (m
->stats_period
!= 0);
353 stats_period
= m
->stats_period
;
// Register a command in command_table and, when a session connection is
// available, send it to the mgr immediately. Takes `lock` for the call.
//   cmd   - command strings (logged at level 20)
//   inbl  - input payload
//   outbl - where the reply data is to be placed
//   outs  - where the reply status string is to be placed
// When no MgrMap has been received yet (map.epoch == 0) this is logged as
// " no MgrMap, assuming EACCES".
// NOTE(review): the remaining parameter(s) of the signature (including the
// `onfinish` used below), the assignments that populate the new op, and
// the return statements are on source lines absent from this listing.
362 int MgrClient::start_command(const vector
<string
>& cmd
, const bufferlist
& inbl
,
363 bufferlist
*outbl
, string
*outs
,
366 Mutex::Locker
l(lock
);
368 ldout(cct
, 20) << "cmd: " << cmd
<< dendl
;
370 if (map
.epoch
== 0) {
371 ldout(cct
,20) << " no MgrMap, assuming EACCES" << dendl
;
375 auto &op
= command_table
.start_command();
380 op
.on_finish
= onfinish
;
// Without a session connection the command just stays in command_table;
// reconnect() re-sends whatever command_table.get_commands() returns.
382 if (session
&& session
->con
) {
383 // Leaving fsid argument null because it isn't used.
384 MCommand
*m
= op
.get_message({});
385 session
->con
->send_message(m
);
// Handler for an MCommandReply from the mgr. Caller must hold `lock`
// (asserted). Looks the reply's tid up in command_table; an unknown tid is
// logged at level 4. For a known tid the reply data is moved into the op's
// output buffer, the op's completion is invoked with the reply's result
// code (m->r), and the entry is removed from command_table.
// NOTE(review): the lines around the outbl / on_finish uses (source lines
// 405, 407-413, 415-416) are absent from this listing, so whether those
// pointers are null-checked before being dereferenced cannot be confirmed.
390 bool MgrClient::handle_command_reply(MCommandReply
*m
)
392 assert(lock
.is_locked_by_me());
394 ldout(cct
, 20) << *m
<< dendl
;
396 const auto tid
= m
->get_tid();
397 if (!command_table
.exists(tid
)) {
398 ldout(cct
, 4) << "handle_command_reply tid " << m
->get_tid()
399 << " not found" << dendl
;
404 auto &op
= command_table
.get_command(tid
);
406 op
.outbl
->claim(m
->get_data());
414 op
.on_finish
->complete(m
->r
);
417 command_table
.erase(tid
);
// Register this client as a service daemon with the mgr.
//   service  - service name, stored in service_name
//   name     - daemon name (logged together with the service)
//   metadata - key/value metadata, stored in daemon_metadata
// Takes `lock`, records the registration in the members, and marks the
// daemon status dirty.
// NOTE(review): the check behind the "normal ceph entity types are not
// allowed!" comment, the body of the `if (service_daemon)` test, the
// assignment on source line 443, the action taken for client entities
// that already have a session connection, and the return values are all
// on source lines absent from this listing.
423 int MgrClient::service_daemon_register(
424 const std::string
& service
,
425 const std::string
& name
,
426 const std::map
<std::string
,std::string
>& metadata
)
428 Mutex::Locker
l(lock
);
434 // normal ceph entity types are not allowed!
437 if (service_daemon
) {
440 ldout(cct
,1) << service
<< "." << name
<< " metadata " << metadata
<< dendl
;
441 service_daemon
= true;
442 service_name
= service
;
444 daemon_metadata
= metadata
;
445 daemon_dirty_status
= true;
448 if (cct
->_conf
->name
.is_client() && session
&& session
->con
) {
// Replace the stored daemon status map and mark it dirty.
//   status - new key/value status map (logged at level 10)
// Takes `lock` for the call. send_report() attaches daemon_status to a
// report only while daemon_dirty_status is set.
// NOTE(review): the end of this function (return value) is not visible in
// this listing.
455 int MgrClient::service_daemon_update_status(
456 const std::map
<std::string
,std::string
>& status
)
458 Mutex::Locker
l(lock
);
459 ldout(cct
,10) << status
<< dendl
;
460 daemon_status
= status
;
461 daemon_dirty_status
= true;