]>
git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/MgrClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "MgrClient.h"
17 #include "mgr/MgrContext.h"
18 #include "mon/MonMap.h"
20 #include "msg/Messenger.h"
21 #include "messages/MMgrMap.h"
22 #include "messages/MMgrReport.h"
23 #include "messages/MMgrOpen.h"
24 #include "messages/MMgrClose.h"
25 #include "messages/MMgrConfigure.h"
26 #include "messages/MCommand.h"
27 #include "messages/MCommandReply.h"
28 #include "messages/MMgrCommand.h"
29 #include "messages/MMgrCommandReply.h"
30 #include "messages/MPGStats.h"
35 using ceph::bufferlist
;
36 using ceph::make_message
;
40 #define dout_subsys ceph_subsys_mgrc
42 #define dout_prefix *_dout << "mgrc " << __func__ << " "
44 MgrClient::MgrClient(CephContext
*cct_
, Messenger
*msgr_
, MonMap
*monmap_
)
51 ceph_assert(cct
!= nullptr);
54 void MgrClient::init()
56 std::lock_guard
l(lock
);
58 ceph_assert(msgr
!= nullptr);
64 void MgrClient::shutdown()
66 std::unique_lock
l(lock
);
67 ldout(cct
, 10) << dendl
;
69 if (connect_retry_callback
) {
70 timer
.cancel_event(connect_retry_callback
);
71 connect_retry_callback
= nullptr;
74 // forget about in-flight commands if we are prematurely shut down
75 // (e.g., by control-C)
76 command_table
.clear();
80 HAVE_FEATURE(session
->con
->get_features(), SERVER_MIMIC
)) {
81 ldout(cct
, 10) << "closing mgr session" << dendl
;
82 auto m
= make_message
<MMgrClose
>();
83 m
->daemon_name
= daemon_name
;
84 m
->service_name
= service_name
;
85 session
->con
->send_message2(m
);
86 auto timeout
= ceph::make_timespan(cct
->_conf
.get_val
<double>(
87 "mgr_client_service_daemon_unregister_timeout"));
88 shutdown_cond
.wait_for(l
, timeout
);
93 session
->con
->mark_down();
98 bool MgrClient::ms_dispatch2(const ref_t
<Message
>& m
)
100 std::lock_guard
l(lock
);
102 switch(m
->get_type()) {
104 return handle_mgr_map(ref_cast
<MMgrMap
>(m
));
105 case MSG_MGR_CONFIGURE
:
106 return handle_mgr_configure(ref_cast
<MMgrConfigure
>(m
));
108 return handle_mgr_close(ref_cast
<MMgrClose
>(m
));
109 case MSG_COMMAND_REPLY
:
110 if (m
->get_source().type() == CEPH_ENTITY_TYPE_MGR
) {
111 MCommandReply
*c
= static_cast<MCommandReply
*>(m
.get());
112 handle_command_reply(c
->get_tid(), c
->get_data(), c
->rs
, c
->r
);
117 case MSG_MGR_COMMAND_REPLY
:
118 if (m
->get_source().type() == CEPH_ENTITY_TYPE_MGR
) {
119 MMgrCommandReply
*c
= static_cast<MMgrCommandReply
*>(m
.get());
120 handle_command_reply(c
->get_tid(), c
->get_data(), c
->rs
, c
->r
);
126 ldout(cct
, 30) << "Not handling " << *m
<< dendl
;
131 void MgrClient::reconnect()
133 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
136 ldout(cct
, 4) << "Terminating session with "
137 << session
->con
->get_peer_addr() << dendl
;
138 session
->con
->mark_down();
141 if (report_callback
!= nullptr) {
142 timer
.cancel_event(report_callback
);
143 report_callback
= nullptr;
147 if (!map
.get_available()) {
148 ldout(cct
, 4) << "No active mgr available yet" << dendl
;
152 if (!clock_t::is_zero(last_connect_attempt
)) {
153 auto now
= clock_t::now();
154 auto when
= last_connect_attempt
+
156 cct
->_conf
.get_val
<double>("mgr_connect_retry_interval"));
158 if (!connect_retry_callback
) {
159 connect_retry_callback
= timer
.add_event_at(
161 new LambdaContext([this](int r
){
162 connect_retry_callback
= nullptr;
166 ldout(cct
, 4) << "waiting to retry connect until " << when
<< dendl
;
171 if (connect_retry_callback
) {
172 timer
.cancel_event(connect_retry_callback
);
173 connect_retry_callback
= nullptr;
176 ldout(cct
, 4) << "Starting new session with " << map
.get_active_addrs()
178 last_connect_attempt
= clock_t::now();
180 session
.reset(new MgrSessionState());
181 session
->con
= msgr
->connect_to(CEPH_ENTITY_TYPE_MGR
,
182 map
.get_active_addrs());
184 if (service_daemon
) {
185 daemon_dirty_status
= true;
187 task_dirty_status
= true;
189 // Don't send an open if we're just a client (i.e. doing
190 // command-sending, not stats etc)
191 if (msgr
->get_mytype() != CEPH_ENTITY_TYPE_CLIENT
|| service_daemon
) {
195 // resend any pending commands
196 auto p
= command_table
.get_commands().begin();
197 while (p
!= command_table
.get_commands().end()) {
199 auto& op
= p
->second
;
200 ldout(cct
,10) << "resending " << tid
<< (op
.tell
? " (tell)":" (cli)") << dendl
;
203 if (op
.name
.size() && op
.name
!= map
.active_name
) {
204 ldout(cct
, 10) << "active mgr " << map
.active_name
<< " != target "
207 op
.on_finish
->complete(-ENXIO
);
210 command_table
.erase(tid
);
213 // Set fsid argument to signal that this is really a tell message (and
214 // we are not a legacy client sending a non-tell command via MCommand).
215 m
= op
.get_message(monmap
->fsid
, false);
219 HAVE_FEATURE(map
.active_mgr_features
, SERVER_OCTOPUS
));
221 ceph_assert(session
);
222 ceph_assert(session
->con
);
223 session
->con
->send_message2(std::move(m
));
228 void MgrClient::_send_open()
230 if (session
&& session
->con
) {
231 auto open
= make_message
<MMgrOpen
>();
232 if (!service_name
.empty()) {
233 open
->service_name
= service_name
;
234 open
->daemon_name
= daemon_name
;
236 open
->daemon_name
= cct
->_conf
->name
.get_id();
238 if (service_daemon
) {
239 open
->service_daemon
= service_daemon
;
240 open
->daemon_metadata
= daemon_metadata
;
242 cct
->_conf
.get_config_bl(0, &open
->config_bl
, &last_config_bl_version
);
243 cct
->_conf
.get_defaults_bl(&open
->config_defaults_bl
);
244 session
->con
->send_message2(open
);
248 bool MgrClient::handle_mgr_map(ref_t
<MMgrMap
> m
)
250 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
252 ldout(cct
, 20) << *m
<< dendl
;
255 ldout(cct
, 4) << "Got map version " << map
.epoch
<< dendl
;
257 ldout(cct
, 4) << "Active mgr is now " << map
.get_active_addrs() << dendl
;
261 session
->con
->get_peer_addrs() != map
.get_active_addrs()) {
268 bool MgrClient::ms_handle_reset(Connection
*con
)
270 std::lock_guard
l(lock
);
271 if (session
&& con
== session
->con
) {
272 ldout(cct
, 4) << __func__
<< " con " << con
<< dendl
;
279 bool MgrClient::ms_handle_refused(Connection
*con
)
281 // do nothing for now
285 void MgrClient::_send_stats()
289 if (stats_period
!= 0) {
290 report_callback
= timer
.add_event_after(
292 new LambdaContext([this](int) {
298 void MgrClient::_send_report()
300 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
301 ceph_assert(session
);
302 report_callback
= nullptr;
304 auto report
= make_message
<MMgrReport
>();
305 auto pcc
= cct
->get_perfcounters_collection();
307 pcc
->with_counters([this, report
](
308 const PerfCountersCollectionImpl::CounterMap
&by_path
)
310 // Helper for checking whether a counter should be included
311 auto include_counter
= [this](
312 const PerfCounters::perf_counter_data_any_d
&ctr
,
313 const PerfCounters
&perf_counters
)
315 return perf_counters
.get_adjusted_priority(ctr
.prio
) >= (int)stats_threshold
;
318 // Helper for cases where we want to forget a counter
319 auto undeclare
= [report
, this](const std::string
&path
)
321 report
->undeclare_types
.push_back(path
);
322 ldout(cct
,20) << " undeclare " << path
<< dendl
;
323 session
->declared
.erase(path
);
326 ENCODE_START(1, 1, report
->packed
);
328 // Find counters that no longer exist, and undeclare them
329 for (auto p
= session
->declared
.begin(); p
!= session
->declared
.end(); ) {
330 const auto &path
= *(p
++);
331 if (by_path
.count(path
) == 0) {
336 for (const auto &i
: by_path
) {
337 auto& path
= i
.first
;
338 auto& data
= *(i
.second
.data
);
339 auto& perf_counters
= *(i
.second
.perf_counters
);
341 // Find counters that still exist, but are no longer permitted by
343 if (!include_counter(data
, perf_counters
)) {
344 if (session
->declared
.count(path
)) {
350 if (session
->declared
.count(path
) == 0) {
351 ldout(cct
,20) << " declare " << path
<< dendl
;
352 PerfCounterType type
;
354 if (data
.description
) {
355 type
.description
= data
.description
;
358 type
.nick
= data
.nick
;
360 type
.type
= data
.type
;
361 type
.priority
= perf_counters
.get_adjusted_priority(data
.prio
);
362 type
.unit
= data
.unit
;
363 report
->declare_types
.push_back(std::move(type
));
364 session
->declared
.insert(path
);
367 encode(static_cast<uint64_t>(data
.u64
), report
->packed
);
368 if (data
.type
& PERFCOUNTER_LONGRUNAVG
) {
369 encode(static_cast<uint64_t>(data
.avgcount
), report
->packed
);
370 encode(static_cast<uint64_t>(data
.avgcount2
), report
->packed
);
373 ENCODE_FINISH(report
->packed
);
375 ldout(cct
, 20) << "sending " << session
->declared
.size() << " counters ("
376 "of possible " << by_path
.size() << "), "
377 << report
->declare_types
.size() << " new, "
378 << report
->undeclare_types
.size() << " removed"
382 ldout(cct
, 20) << "encoded " << report
->packed
.length() << " bytes" << dendl
;
384 if (daemon_name
.size()) {
385 report
->daemon_name
= daemon_name
;
387 report
->daemon_name
= cct
->_conf
->name
.get_id();
389 report
->service_name
= service_name
;
391 if (daemon_dirty_status
) {
392 report
->daemon_status
= daemon_status
;
393 daemon_dirty_status
= false;
396 if (task_dirty_status
) {
397 report
->task_status
= task_status
;
398 task_dirty_status
= false;
401 report
->daemon_health_metrics
= std::move(daemon_health_metrics
);
403 cct
->_conf
.get_config_bl(last_config_bl_version
, &report
->config_bl
,
404 &last_config_bl_version
);
406 if (get_perf_report_cb
) {
407 report
->metric_report_message
= MetricReportMessage(get_perf_report_cb());
410 session
->con
->send_message2(report
);
413 void MgrClient::send_pgstats()
415 std::lock_guard
l(lock
);
419 void MgrClient::_send_pgstats()
421 if (pgstats_cb
&& session
) {
422 session
->con
->send_message(pgstats_cb());
426 bool MgrClient::handle_mgr_configure(ref_t
<MMgrConfigure
> m
)
428 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
430 ldout(cct
, 20) << *m
<< dendl
;
433 lderr(cct
) << "dropping unexpected configure message" << dendl
;
437 ldout(cct
, 4) << "stats_period=" << m
->stats_period
<< dendl
;
439 if (stats_threshold
!= m
->stats_threshold
) {
440 ldout(cct
, 4) << "updated stats threshold: " << m
->stats_threshold
<< dendl
;
441 stats_threshold
= m
->stats_threshold
;
444 if (!m
->osd_perf_metric_queries
.empty()) {
445 handle_config_payload(m
->osd_perf_metric_queries
);
446 } else if (m
->metric_config_message
) {
447 const MetricConfigMessage
&message
= *m
->metric_config_message
;
448 boost::apply_visitor(HandlePayloadVisitor(this), message
.payload
);
451 bool starting
= (stats_period
== 0) && (m
->stats_period
!= 0);
452 stats_period
= m
->stats_period
;
460 bool MgrClient::handle_mgr_close(ref_t
<MMgrClose
> m
)
462 service_daemon
= false;
463 shutdown_cond
.notify_all();
467 int MgrClient::start_command(const vector
<string
>& cmd
, const bufferlist
& inbl
,
468 bufferlist
*outbl
, string
*outs
,
471 std::lock_guard
l(lock
);
473 ldout(cct
, 20) << "cmd: " << cmd
<< dendl
;
475 if (map
.epoch
== 0 && mgr_optional
) {
476 ldout(cct
,20) << " no MgrMap, assuming EACCES" << dendl
;
480 auto &op
= command_table
.start_command();
485 op
.on_finish
= onfinish
;
487 if (session
&& session
->con
) {
488 // Leaving fsid argument null because it isn't used historically, and
489 // we can use it as a signal that we are sending a non-tell command.
490 auto m
= op
.get_message(
492 HAVE_FEATURE(map
.active_mgr_features
, SERVER_OCTOPUS
));
493 session
->con
->send_message2(std::move(m
));
495 ldout(cct
, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl
;
500 int MgrClient::start_tell_command(
502 const vector
<string
>& cmd
, const bufferlist
& inbl
,
503 bufferlist
*outbl
, string
*outs
,
506 std::lock_guard
l(lock
);
508 ldout(cct
, 20) << "target: " << name
<< " cmd: " << cmd
<< dendl
;
510 if (map
.epoch
== 0 && mgr_optional
) {
511 ldout(cct
,20) << " no MgrMap, assuming EACCES" << dendl
;
515 auto &op
= command_table
.start_command();
522 op
.on_finish
= onfinish
;
524 if (session
&& session
->con
&& (name
.size() == 0 || map
.active_name
== name
)) {
525 // Set fsid argument to signal that this is really a tell message (and
526 // we are not a legacy client sending a non-tell command via MCommand).
527 auto m
= op
.get_message(monmap
->fsid
, false);
528 session
->con
->send_message2(std::move(m
));
530 ldout(cct
, 5) << "no mgr session (no running mgr daemon?), or "
531 << name
<< " not active mgr, waiting" << dendl
;
536 bool MgrClient::handle_command_reply(
539 const std::string
& rs
,
542 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
544 ldout(cct
, 20) << "tid " << tid
<< " r " << r
<< dendl
;
546 if (!command_table
.exists(tid
)) {
547 ldout(cct
, 4) << "handle_command_reply tid " << tid
548 << " not found" << dendl
;
552 auto &op
= command_table
.get_command(tid
);
554 *op
.outbl
= std::move(data
);
562 op
.on_finish
->complete(r
);
565 command_table
.erase(tid
);
569 int MgrClient::service_daemon_register(
570 const std::string
& service
,
571 const std::string
& name
,
572 const std::map
<std::string
,std::string
>& metadata
)
574 std::lock_guard
l(lock
);
575 if (service_daemon
) {
578 ldout(cct
,1) << service
<< "." << name
<< " metadata " << metadata
<< dendl
;
579 service_daemon
= true;
580 service_name
= service
;
582 daemon_metadata
= metadata
;
583 daemon_dirty_status
= true;
586 if (msgr
->get_mytype() == CEPH_ENTITY_TYPE_CLIENT
&& session
&& session
->con
) {
593 int MgrClient::service_daemon_update_status(
594 std::map
<std::string
,std::string
>&& status
)
596 std::lock_guard
l(lock
);
597 ldout(cct
,10) << status
<< dendl
;
598 daemon_status
= std::move(status
);
599 daemon_dirty_status
= true;
603 int MgrClient::service_daemon_update_task_status(
604 std::map
<std::string
,std::string
> &&status
) {
605 std::lock_guard
l(lock
);
606 ldout(cct
,10) << status
<< dendl
;
607 task_status
= std::move(status
);
608 task_dirty_status
= true;
612 void MgrClient::update_daemon_health(std::vector
<DaemonHealthMetric
>&& metrics
)
614 std::lock_guard
l(lock
);
615 daemon_health_metrics
= std::move(metrics
);