]>
git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/MgrClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "MgrClient.h"
17 #include "common/perf_counters_key.h"
18 #include "mgr/MgrContext.h"
19 #include "mon/MonMap.h"
21 #include "msg/Messenger.h"
22 #include "messages/MMgrMap.h"
23 #include "messages/MMgrReport.h"
24 #include "messages/MMgrOpen.h"
25 #include "messages/MMgrUpdate.h"
26 #include "messages/MMgrClose.h"
27 #include "messages/MMgrConfigure.h"
28 #include "messages/MCommand.h"
29 #include "messages/MCommandReply.h"
30 #include "messages/MMgrCommand.h"
31 #include "messages/MMgrCommandReply.h"
32 #include "messages/MPGStats.h"
37 using ceph::bufferlist
;
38 using ceph::make_message
;
42 #define dout_subsys ceph_subsys_mgrc
44 #define dout_prefix *_dout << "mgrc " << __func__ << " "
46 MgrClient::MgrClient(CephContext
*cct_
, Messenger
*msgr_
, MonMap
*monmap_
)
53 ceph_assert(cct
!= nullptr);
56 void MgrClient::init()
58 std::lock_guard
l(lock
);
60 ceph_assert(msgr
!= nullptr);
66 void MgrClient::shutdown()
68 std::unique_lock
l(lock
);
69 ldout(cct
, 10) << dendl
;
71 if (connect_retry_callback
) {
72 timer
.cancel_event(connect_retry_callback
);
73 connect_retry_callback
= nullptr;
76 // forget about in-flight commands if we are prematurely shut down
77 // (e.g., by control-C)
78 command_table
.clear();
82 HAVE_FEATURE(session
->con
->get_features(), SERVER_MIMIC
)) {
83 ldout(cct
, 10) << "closing mgr session" << dendl
;
84 auto m
= make_message
<MMgrClose
>();
85 m
->daemon_name
= daemon_name
;
86 m
->service_name
= service_name
;
87 session
->con
->send_message2(m
);
88 auto timeout
= ceph::make_timespan(cct
->_conf
.get_val
<double>(
89 "mgr_client_service_daemon_unregister_timeout"));
90 shutdown_cond
.wait_for(l
, timeout
);
95 session
->con
->mark_down();
100 bool MgrClient::ms_dispatch2(const ref_t
<Message
>& m
)
102 std::lock_guard
l(lock
);
104 switch(m
->get_type()) {
106 return handle_mgr_map(ref_cast
<MMgrMap
>(m
));
107 case MSG_MGR_CONFIGURE
:
108 return handle_mgr_configure(ref_cast
<MMgrConfigure
>(m
));
110 return handle_mgr_close(ref_cast
<MMgrClose
>(m
));
111 case MSG_COMMAND_REPLY
:
112 if (m
->get_source().type() == CEPH_ENTITY_TYPE_MGR
) {
113 MCommandReply
*c
= static_cast<MCommandReply
*>(m
.get());
114 handle_command_reply(c
->get_tid(), c
->get_data(), c
->rs
, c
->r
);
119 case MSG_MGR_COMMAND_REPLY
:
120 if (m
->get_source().type() == CEPH_ENTITY_TYPE_MGR
) {
121 MMgrCommandReply
*c
= static_cast<MMgrCommandReply
*>(m
.get());
122 handle_command_reply(c
->get_tid(), c
->get_data(), c
->rs
, c
->r
);
128 ldout(cct
, 30) << "Not handling " << *m
<< dendl
;
133 void MgrClient::reconnect()
135 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
138 ldout(cct
, 4) << "Terminating session with "
139 << session
->con
->get_peer_addr() << dendl
;
140 session
->con
->mark_down();
143 if (report_callback
!= nullptr) {
144 timer
.cancel_event(report_callback
);
145 report_callback
= nullptr;
149 if (!map
.get_available()) {
150 ldout(cct
, 4) << "No active mgr available yet" << dendl
;
154 if (!clock_t::is_zero(last_connect_attempt
)) {
155 auto now
= clock_t::now();
156 auto when
= last_connect_attempt
+
158 cct
->_conf
.get_val
<double>("mgr_connect_retry_interval"));
160 if (!connect_retry_callback
) {
161 connect_retry_callback
= timer
.add_event_at(
163 new LambdaContext([this](int r
){
164 connect_retry_callback
= nullptr;
168 ldout(cct
, 4) << "waiting to retry connect until " << when
<< dendl
;
173 if (connect_retry_callback
) {
174 timer
.cancel_event(connect_retry_callback
);
175 connect_retry_callback
= nullptr;
178 ldout(cct
, 4) << "Starting new session with " << map
.get_active_addrs()
180 last_connect_attempt
= clock_t::now();
182 session
.reset(new MgrSessionState());
183 session
->con
= msgr
->connect_to(CEPH_ENTITY_TYPE_MGR
,
184 map
.get_active_addrs());
186 if (service_daemon
) {
187 daemon_dirty_status
= true;
189 task_dirty_status
= true;
191 // Don't send an open if we're just a client (i.e. doing
192 // command-sending, not stats etc)
193 if (msgr
->get_mytype() != CEPH_ENTITY_TYPE_CLIENT
|| service_daemon
) {
197 // resend any pending commands
198 auto p
= command_table
.get_commands().begin();
199 while (p
!= command_table
.get_commands().end()) {
201 auto& op
= p
->second
;
202 ldout(cct
,10) << "resending " << tid
<< (op
.tell
? " (tell)":" (cli)") << dendl
;
205 if (op
.name
.size() && op
.name
!= map
.active_name
) {
206 ldout(cct
, 10) << "active mgr " << map
.active_name
<< " != target "
209 op
.on_finish
->complete(-ENXIO
);
212 command_table
.erase(tid
);
215 // Set fsid argument to signal that this is really a tell message (and
216 // we are not a legacy client sending a non-tell command via MCommand).
217 m
= op
.get_message(monmap
->fsid
, false);
221 HAVE_FEATURE(map
.active_mgr_features
, SERVER_OCTOPUS
));
223 ceph_assert(session
);
224 ceph_assert(session
->con
);
225 session
->con
->send_message2(std::move(m
));
230 void MgrClient::_send_open()
232 if (session
&& session
->con
) {
233 auto open
= make_message
<MMgrOpen
>();
234 if (!service_name
.empty()) {
235 open
->service_name
= service_name
;
236 open
->daemon_name
= daemon_name
;
238 open
->daemon_name
= cct
->_conf
->name
.get_id();
240 if (service_daemon
) {
241 open
->service_daemon
= service_daemon
;
242 open
->daemon_metadata
= daemon_metadata
;
244 cct
->_conf
.get_config_bl(0, &open
->config_bl
, &last_config_bl_version
);
245 cct
->_conf
.get_defaults_bl(&open
->config_defaults_bl
);
246 session
->con
->send_message2(open
);
250 void MgrClient::_send_update()
252 if (session
&& session
->con
) {
253 auto update
= make_message
<MMgrUpdate
>();
254 if (!service_name
.empty()) {
255 update
->service_name
= service_name
;
256 update
->daemon_name
= daemon_name
;
258 update
->daemon_name
= cct
->_conf
->name
.get_id();
260 if (need_metadata_update
) {
261 update
->daemon_metadata
= daemon_metadata
;
263 update
->need_metadata_update
= need_metadata_update
;
264 session
->con
->send_message2(update
);
268 bool MgrClient::handle_mgr_map(ref_t
<MMgrMap
> m
)
270 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
272 ldout(cct
, 20) << *m
<< dendl
;
275 ldout(cct
, 4) << "Got map version " << map
.epoch
<< dendl
;
277 ldout(cct
, 4) << "Active mgr is now " << map
.get_active_addrs() << dendl
;
281 session
->con
->get_peer_addrs() != map
.get_active_addrs()) {
288 bool MgrClient::ms_handle_reset(Connection
*con
)
290 std::lock_guard
l(lock
);
291 if (session
&& con
== session
->con
) {
292 ldout(cct
, 4) << __func__
<< " con " << con
<< dendl
;
299 bool MgrClient::ms_handle_refused(Connection
*con
)
301 // do nothing for now
305 void MgrClient::_send_stats()
309 if (stats_period
!= 0) {
310 report_callback
= timer
.add_event_after(
312 new LambdaContext([this](int) {
318 void MgrClient::_send_report()
320 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
321 ceph_assert(session
);
322 report_callback
= nullptr;
324 auto report
= make_message
<MMgrReport
>();
325 auto pcc
= cct
->get_perfcounters_collection();
327 pcc
->with_counters([this, report
](
328 const PerfCountersCollectionImpl::CounterMap
&by_path
)
330 // Helper for checking whether a counter should be included
331 auto include_counter
= [this](
332 const PerfCounters::perf_counter_data_any_d
&ctr
,
333 const PerfCounters
&perf_counters
)
335 // FIXME: We don't send labeled perf counters to the mgr currently.
336 auto labels
= ceph::perf_counters::key_labels(perf_counters
.get_name());
337 if (labels
.begin() != labels
.end()) {
341 return perf_counters
.get_adjusted_priority(ctr
.prio
) >= (int)stats_threshold
;
344 // Helper for cases where we want to forget a counter
345 auto undeclare
= [report
, this](const std::string
&path
)
347 report
->undeclare_types
.push_back(path
);
348 ldout(cct
,20) << " undeclare " << path
<< dendl
;
349 session
->declared
.erase(path
);
352 ENCODE_START(1, 1, report
->packed
);
354 // Find counters that no longer exist, and undeclare them
355 for (auto p
= session
->declared
.begin(); p
!= session
->declared
.end(); ) {
356 const auto &path
= *(p
++);
357 if (by_path
.count(path
) == 0) {
362 for (const auto &i
: by_path
) {
363 auto& path
= i
.first
;
364 auto& data
= *(i
.second
.data
);
365 auto& perf_counters
= *(i
.second
.perf_counters
);
367 // Find counters that still exist, but are no longer permitted by
369 if (!include_counter(data
, perf_counters
)) {
370 if (session
->declared
.count(path
)) {
376 if (session
->declared
.count(path
) == 0) {
377 ldout(cct
, 20) << " declare " << path
<< dendl
;
378 PerfCounterType type
;
380 if (data
.description
) {
381 type
.description
= data
.description
;
384 type
.nick
= data
.nick
;
386 type
.type
= data
.type
;
387 type
.priority
= perf_counters
.get_adjusted_priority(data
.prio
);
388 type
.unit
= data
.unit
;
389 report
->declare_types
.push_back(std::move(type
));
390 session
->declared
.insert(path
);
393 encode(static_cast<uint64_t>(data
.u64
), report
->packed
);
394 if (data
.type
& PERFCOUNTER_LONGRUNAVG
) {
395 encode(static_cast<uint64_t>(data
.avgcount
), report
->packed
);
396 encode(static_cast<uint64_t>(data
.avgcount2
), report
->packed
);
399 ENCODE_FINISH(report
->packed
);
401 ldout(cct
, 20) << "sending " << session
->declared
.size() << " counters ("
402 "of possible " << by_path
.size() << "), "
403 << report
->declare_types
.size() << " new, "
404 << report
->undeclare_types
.size() << " removed"
408 ldout(cct
, 20) << "encoded " << report
->packed
.length() << " bytes" << dendl
;
410 if (daemon_name
.size()) {
411 report
->daemon_name
= daemon_name
;
413 report
->daemon_name
= cct
->_conf
->name
.get_id();
415 report
->service_name
= service_name
;
417 if (daemon_dirty_status
) {
418 report
->daemon_status
= daemon_status
;
419 daemon_dirty_status
= false;
422 if (task_dirty_status
) {
423 report
->task_status
= task_status
;
424 task_dirty_status
= false;
427 report
->daemon_health_metrics
= std::move(daemon_health_metrics
);
429 cct
->_conf
.get_config_bl(last_config_bl_version
, &report
->config_bl
,
430 &last_config_bl_version
);
432 if (get_perf_report_cb
) {
433 report
->metric_report_message
= MetricReportMessage(get_perf_report_cb());
436 session
->con
->send_message2(report
);
439 void MgrClient::send_pgstats()
441 std::lock_guard
l(lock
);
445 void MgrClient::_send_pgstats()
447 if (pgstats_cb
&& session
) {
448 session
->con
->send_message(pgstats_cb());
452 bool MgrClient::handle_mgr_configure(ref_t
<MMgrConfigure
> m
)
454 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
456 ldout(cct
, 20) << *m
<< dendl
;
459 lderr(cct
) << "dropping unexpected configure message" << dendl
;
463 ldout(cct
, 4) << "stats_period=" << m
->stats_period
<< dendl
;
465 if (stats_threshold
!= m
->stats_threshold
) {
466 ldout(cct
, 4) << "updated stats threshold: " << m
->stats_threshold
<< dendl
;
467 stats_threshold
= m
->stats_threshold
;
470 if (!m
->osd_perf_metric_queries
.empty()) {
471 handle_config_payload(m
->osd_perf_metric_queries
);
472 } else if (m
->metric_config_message
) {
473 const MetricConfigMessage
&message
= *m
->metric_config_message
;
474 boost::apply_visitor(HandlePayloadVisitor(this), message
.payload
);
477 bool starting
= (stats_period
== 0) && (m
->stats_period
!= 0);
478 stats_period
= m
->stats_period
;
486 bool MgrClient::handle_mgr_close(ref_t
<MMgrClose
> m
)
488 service_daemon
= false;
489 shutdown_cond
.notify_all();
493 int MgrClient::start_command(const vector
<string
>& cmd
, const bufferlist
& inbl
,
494 bufferlist
*outbl
, string
*outs
,
497 std::lock_guard
l(lock
);
499 ldout(cct
, 20) << "cmd: " << cmd
<< dendl
;
501 if (map
.epoch
== 0 && mgr_optional
) {
502 ldout(cct
,20) << " no MgrMap, assuming EACCES" << dendl
;
506 auto &op
= command_table
.start_command();
511 op
.on_finish
= onfinish
;
513 if (session
&& session
->con
) {
514 // Leaving fsid argument null because it isn't used historically, and
515 // we can use it as a signal that we are sending a non-tell command.
516 auto m
= op
.get_message(
518 HAVE_FEATURE(map
.active_mgr_features
, SERVER_OCTOPUS
));
519 session
->con
->send_message2(std::move(m
));
521 ldout(cct
, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl
;
526 int MgrClient::start_tell_command(
528 const vector
<string
>& cmd
, const bufferlist
& inbl
,
529 bufferlist
*outbl
, string
*outs
,
532 std::lock_guard
l(lock
);
534 ldout(cct
, 20) << "target: " << name
<< " cmd: " << cmd
<< dendl
;
536 if (map
.epoch
== 0 && mgr_optional
) {
537 ldout(cct
,20) << " no MgrMap, assuming EACCES" << dendl
;
541 auto &op
= command_table
.start_command();
548 op
.on_finish
= onfinish
;
550 if (session
&& session
->con
&& (name
.size() == 0 || map
.active_name
== name
)) {
551 // Set fsid argument to signal that this is really a tell message (and
552 // we are not a legacy client sending a non-tell command via MCommand).
553 auto m
= op
.get_message(monmap
->fsid
, false);
554 session
->con
->send_message2(std::move(m
));
556 ldout(cct
, 5) << "no mgr session (no running mgr daemon?), or "
557 << name
<< " not active mgr, waiting" << dendl
;
562 bool MgrClient::handle_command_reply(
565 const std::string
& rs
,
568 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
570 ldout(cct
, 20) << "tid " << tid
<< " r " << r
<< dendl
;
572 if (!command_table
.exists(tid
)) {
573 ldout(cct
, 4) << "handle_command_reply tid " << tid
574 << " not found" << dendl
;
578 auto &op
= command_table
.get_command(tid
);
580 *op
.outbl
= std::move(data
);
588 op
.on_finish
->complete(r
);
591 command_table
.erase(tid
);
595 int MgrClient::update_daemon_metadata(
596 const std::string
& service
,
597 const std::string
& name
,
598 const std::map
<std::string
,std::string
>& metadata
)
600 std::lock_guard
l(lock
);
601 if (service_daemon
) {
604 ldout(cct
,1) << service
<< "." << name
<< " metadata " << metadata
<< dendl
;
605 service_name
= service
;
607 daemon_metadata
= metadata
;
608 daemon_dirty_status
= true;
610 if (need_metadata_update
&&
611 !daemon_metadata
.empty()) {
613 need_metadata_update
= false;
619 int MgrClient::service_daemon_register(
620 const std::string
& service
,
621 const std::string
& name
,
622 const std::map
<std::string
,std::string
>& metadata
)
624 std::lock_guard
l(lock
);
625 if (service_daemon
) {
628 ldout(cct
,1) << service
<< "." << name
<< " metadata " << metadata
<< dendl
;
629 service_daemon
= true;
630 service_name
= service
;
632 daemon_metadata
= metadata
;
633 daemon_dirty_status
= true;
636 if (msgr
->get_mytype() == CEPH_ENTITY_TYPE_CLIENT
&& session
&& session
->con
) {
643 int MgrClient::service_daemon_update_status(
644 std::map
<std::string
,std::string
>&& status
)
646 std::lock_guard
l(lock
);
647 ldout(cct
,10) << status
<< dendl
;
648 daemon_status
= std::move(status
);
649 daemon_dirty_status
= true;
653 int MgrClient::service_daemon_update_task_status(
654 std::map
<std::string
,std::string
> &&status
) {
655 std::lock_guard
l(lock
);
656 ldout(cct
,10) << status
<< dendl
;
657 task_status
= std::move(status
);
658 task_dirty_status
= true;
662 void MgrClient::update_daemon_health(std::vector
<DaemonHealthMetric
>&& metrics
)
664 std::lock_guard
l(lock
);
665 daemon_health_metrics
= std::move(metrics
);