]>
git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/MgrClient.cc
6253d267034335267af3530d227ffc0012968832
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "MgrClient.h"
17 #include "mgr/MgrContext.h"
18 #include "mon/MonMap.h"
20 #include "msg/Messenger.h"
21 #include "messages/MMgrMap.h"
22 #include "messages/MMgrReport.h"
23 #include "messages/MMgrOpen.h"
24 #include "messages/MMgrUpdate.h"
25 #include "messages/MMgrClose.h"
26 #include "messages/MMgrConfigure.h"
27 #include "messages/MCommand.h"
28 #include "messages/MCommandReply.h"
29 #include "messages/MMgrCommand.h"
30 #include "messages/MMgrCommandReply.h"
31 #include "messages/MPGStats.h"
36 using ceph::bufferlist
;
37 using ceph::make_message
;
41 #define dout_subsys ceph_subsys_mgrc
43 #define dout_prefix *_dout << "mgrc " << __func__ << " "
45 MgrClient::MgrClient(CephContext
*cct_
, Messenger
*msgr_
, MonMap
*monmap_
)
52 ceph_assert(cct
!= nullptr);
55 void MgrClient::init()
57 std::lock_guard
l(lock
);
59 ceph_assert(msgr
!= nullptr);
65 void MgrClient::shutdown()
67 std::unique_lock
l(lock
);
68 ldout(cct
, 10) << dendl
;
70 if (connect_retry_callback
) {
71 timer
.cancel_event(connect_retry_callback
);
72 connect_retry_callback
= nullptr;
75 // forget about in-flight commands if we are prematurely shut down
76 // (e.g., by control-C)
77 command_table
.clear();
81 HAVE_FEATURE(session
->con
->get_features(), SERVER_MIMIC
)) {
82 ldout(cct
, 10) << "closing mgr session" << dendl
;
83 auto m
= make_message
<MMgrClose
>();
84 m
->daemon_name
= daemon_name
;
85 m
->service_name
= service_name
;
86 session
->con
->send_message2(m
);
87 auto timeout
= ceph::make_timespan(cct
->_conf
.get_val
<double>(
88 "mgr_client_service_daemon_unregister_timeout"));
89 shutdown_cond
.wait_for(l
, timeout
);
94 session
->con
->mark_down();
99 bool MgrClient::ms_dispatch2(const ref_t
<Message
>& m
)
101 std::lock_guard
l(lock
);
103 switch(m
->get_type()) {
105 return handle_mgr_map(ref_cast
<MMgrMap
>(m
));
106 case MSG_MGR_CONFIGURE
:
107 return handle_mgr_configure(ref_cast
<MMgrConfigure
>(m
));
109 return handle_mgr_close(ref_cast
<MMgrClose
>(m
));
110 case MSG_COMMAND_REPLY
:
111 if (m
->get_source().type() == CEPH_ENTITY_TYPE_MGR
) {
112 MCommandReply
*c
= static_cast<MCommandReply
*>(m
.get());
113 handle_command_reply(c
->get_tid(), c
->get_data(), c
->rs
, c
->r
);
118 case MSG_MGR_COMMAND_REPLY
:
119 if (m
->get_source().type() == CEPH_ENTITY_TYPE_MGR
) {
120 MMgrCommandReply
*c
= static_cast<MMgrCommandReply
*>(m
.get());
121 handle_command_reply(c
->get_tid(), c
->get_data(), c
->rs
, c
->r
);
127 ldout(cct
, 30) << "Not handling " << *m
<< dendl
;
132 void MgrClient::reconnect()
134 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
137 ldout(cct
, 4) << "Terminating session with "
138 << session
->con
->get_peer_addr() << dendl
;
139 session
->con
->mark_down();
142 if (report_callback
!= nullptr) {
143 timer
.cancel_event(report_callback
);
144 report_callback
= nullptr;
148 if (!map
.get_available()) {
149 ldout(cct
, 4) << "No active mgr available yet" << dendl
;
153 if (!clock_t::is_zero(last_connect_attempt
)) {
154 auto now
= clock_t::now();
155 auto when
= last_connect_attempt
+
157 cct
->_conf
.get_val
<double>("mgr_connect_retry_interval"));
159 if (!connect_retry_callback
) {
160 connect_retry_callback
= timer
.add_event_at(
162 new LambdaContext([this](int r
){
163 connect_retry_callback
= nullptr;
167 ldout(cct
, 4) << "waiting to retry connect until " << when
<< dendl
;
172 if (connect_retry_callback
) {
173 timer
.cancel_event(connect_retry_callback
);
174 connect_retry_callback
= nullptr;
177 ldout(cct
, 4) << "Starting new session with " << map
.get_active_addrs()
179 last_connect_attempt
= clock_t::now();
181 session
.reset(new MgrSessionState());
182 session
->con
= msgr
->connect_to(CEPH_ENTITY_TYPE_MGR
,
183 map
.get_active_addrs());
185 if (service_daemon
) {
186 daemon_dirty_status
= true;
188 task_dirty_status
= true;
190 // Don't send an open if we're just a client (i.e. doing
191 // command-sending, not stats etc)
192 if (msgr
->get_mytype() != CEPH_ENTITY_TYPE_CLIENT
|| service_daemon
) {
196 // resend any pending commands
197 auto p
= command_table
.get_commands().begin();
198 while (p
!= command_table
.get_commands().end()) {
200 auto& op
= p
->second
;
201 ldout(cct
,10) << "resending " << tid
<< (op
.tell
? " (tell)":" (cli)") << dendl
;
204 if (op
.name
.size() && op
.name
!= map
.active_name
) {
205 ldout(cct
, 10) << "active mgr " << map
.active_name
<< " != target "
208 op
.on_finish
->complete(-ENXIO
);
211 command_table
.erase(tid
);
214 // Set fsid argument to signal that this is really a tell message (and
215 // we are not a legacy client sending a non-tell command via MCommand).
216 m
= op
.get_message(monmap
->fsid
, false);
220 HAVE_FEATURE(map
.active_mgr_features
, SERVER_OCTOPUS
));
222 ceph_assert(session
);
223 ceph_assert(session
->con
);
224 session
->con
->send_message2(std::move(m
));
229 void MgrClient::_send_open()
231 if (session
&& session
->con
) {
232 auto open
= make_message
<MMgrOpen
>();
233 if (!service_name
.empty()) {
234 open
->service_name
= service_name
;
235 open
->daemon_name
= daemon_name
;
237 open
->daemon_name
= cct
->_conf
->name
.get_id();
239 if (service_daemon
) {
240 open
->service_daemon
= service_daemon
;
241 open
->daemon_metadata
= daemon_metadata
;
243 cct
->_conf
.get_config_bl(0, &open
->config_bl
, &last_config_bl_version
);
244 cct
->_conf
.get_defaults_bl(&open
->config_defaults_bl
);
245 session
->con
->send_message2(open
);
249 void MgrClient::_send_update()
251 if (session
&& session
->con
) {
252 auto update
= make_message
<MMgrUpdate
>();
253 if (!service_name
.empty()) {
254 update
->service_name
= service_name
;
255 update
->daemon_name
= daemon_name
;
257 update
->daemon_name
= cct
->_conf
->name
.get_id();
259 if (need_metadata_update
) {
260 update
->daemon_metadata
= daemon_metadata
;
262 update
->need_metadata_update
= need_metadata_update
;
263 session
->con
->send_message2(update
);
267 bool MgrClient::handle_mgr_map(ref_t
<MMgrMap
> m
)
269 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
271 ldout(cct
, 20) << *m
<< dendl
;
274 ldout(cct
, 4) << "Got map version " << map
.epoch
<< dendl
;
276 ldout(cct
, 4) << "Active mgr is now " << map
.get_active_addrs() << dendl
;
280 session
->con
->get_peer_addrs() != map
.get_active_addrs()) {
287 bool MgrClient::ms_handle_reset(Connection
*con
)
289 std::lock_guard
l(lock
);
290 if (session
&& con
== session
->con
) {
291 ldout(cct
, 4) << __func__
<< " con " << con
<< dendl
;
298 bool MgrClient::ms_handle_refused(Connection
*con
)
300 // do nothing for now
304 void MgrClient::_send_stats()
308 if (stats_period
!= 0) {
309 report_callback
= timer
.add_event_after(
311 new LambdaContext([this](int) {
317 void MgrClient::_send_report()
319 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
320 ceph_assert(session
);
321 report_callback
= nullptr;
323 auto report
= make_message
<MMgrReport
>();
324 auto pcc
= cct
->get_perfcounters_collection();
326 pcc
->with_counters([this, report
](
327 const PerfCountersCollectionImpl::CounterMap
&by_path
)
329 // Helper for checking whether a counter should be included
330 auto include_counter
= [this](
331 const PerfCounters::perf_counter_data_any_d
&ctr
,
332 const PerfCounters
&perf_counters
)
334 return perf_counters
.get_adjusted_priority(ctr
.prio
) >= (int)stats_threshold
;
337 // Helper for cases where we want to forget a counter
338 auto undeclare
= [report
, this](const std::string
&path
)
340 report
->undeclare_types
.push_back(path
);
341 ldout(cct
,20) << " undeclare " << path
<< dendl
;
342 session
->declared
.erase(path
);
345 ENCODE_START(1, 1, report
->packed
);
347 // Find counters that no longer exist, and undeclare them
348 for (auto p
= session
->declared
.begin(); p
!= session
->declared
.end(); ) {
349 const auto &path
= *(p
++);
350 if (by_path
.count(path
) == 0) {
355 for (const auto &i
: by_path
) {
356 auto& path
= i
.first
;
357 auto& data
= *(i
.second
.data
);
358 auto& perf_counters
= *(i
.second
.perf_counters
);
360 // Find counters that still exist, but are no longer permitted by
362 if (!include_counter(data
, perf_counters
)) {
363 if (session
->declared
.count(path
)) {
369 if (session
->declared
.count(path
) == 0) {
370 ldout(cct
,20) << " declare " << path
<< dendl
;
371 PerfCounterType type
;
373 if (data
.description
) {
374 type
.description
= data
.description
;
377 type
.nick
= data
.nick
;
379 type
.type
= data
.type
;
380 type
.priority
= perf_counters
.get_adjusted_priority(data
.prio
);
381 type
.unit
= data
.unit
;
382 report
->declare_types
.push_back(std::move(type
));
383 session
->declared
.insert(path
);
386 encode(static_cast<uint64_t>(data
.u64
), report
->packed
);
387 if (data
.type
& PERFCOUNTER_LONGRUNAVG
) {
388 encode(static_cast<uint64_t>(data
.avgcount
), report
->packed
);
389 encode(static_cast<uint64_t>(data
.avgcount2
), report
->packed
);
392 ENCODE_FINISH(report
->packed
);
394 ldout(cct
, 20) << "sending " << session
->declared
.size() << " counters ("
395 "of possible " << by_path
.size() << "), "
396 << report
->declare_types
.size() << " new, "
397 << report
->undeclare_types
.size() << " removed"
401 ldout(cct
, 20) << "encoded " << report
->packed
.length() << " bytes" << dendl
;
403 if (daemon_name
.size()) {
404 report
->daemon_name
= daemon_name
;
406 report
->daemon_name
= cct
->_conf
->name
.get_id();
408 report
->service_name
= service_name
;
410 if (daemon_dirty_status
) {
411 report
->daemon_status
= daemon_status
;
412 daemon_dirty_status
= false;
415 if (task_dirty_status
) {
416 report
->task_status
= task_status
;
417 task_dirty_status
= false;
420 report
->daemon_health_metrics
= std::move(daemon_health_metrics
);
422 cct
->_conf
.get_config_bl(last_config_bl_version
, &report
->config_bl
,
423 &last_config_bl_version
);
425 if (get_perf_report_cb
) {
426 report
->metric_report_message
= MetricReportMessage(get_perf_report_cb());
429 session
->con
->send_message2(report
);
432 void MgrClient::send_pgstats()
434 std::lock_guard
l(lock
);
438 void MgrClient::_send_pgstats()
440 if (pgstats_cb
&& session
) {
441 session
->con
->send_message(pgstats_cb());
445 bool MgrClient::handle_mgr_configure(ref_t
<MMgrConfigure
> m
)
447 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
449 ldout(cct
, 20) << *m
<< dendl
;
452 lderr(cct
) << "dropping unexpected configure message" << dendl
;
456 ldout(cct
, 4) << "stats_period=" << m
->stats_period
<< dendl
;
458 if (stats_threshold
!= m
->stats_threshold
) {
459 ldout(cct
, 4) << "updated stats threshold: " << m
->stats_threshold
<< dendl
;
460 stats_threshold
= m
->stats_threshold
;
463 if (!m
->osd_perf_metric_queries
.empty()) {
464 handle_config_payload(m
->osd_perf_metric_queries
);
465 } else if (m
->metric_config_message
) {
466 const MetricConfigMessage
&message
= *m
->metric_config_message
;
467 boost::apply_visitor(HandlePayloadVisitor(this), message
.payload
);
470 bool starting
= (stats_period
== 0) && (m
->stats_period
!= 0);
471 stats_period
= m
->stats_period
;
479 bool MgrClient::handle_mgr_close(ref_t
<MMgrClose
> m
)
481 service_daemon
= false;
482 shutdown_cond
.notify_all();
486 int MgrClient::start_command(const vector
<string
>& cmd
, const bufferlist
& inbl
,
487 bufferlist
*outbl
, string
*outs
,
490 std::lock_guard
l(lock
);
492 ldout(cct
, 20) << "cmd: " << cmd
<< dendl
;
494 if (map
.epoch
== 0 && mgr_optional
) {
495 ldout(cct
,20) << " no MgrMap, assuming EACCES" << dendl
;
499 auto &op
= command_table
.start_command();
504 op
.on_finish
= onfinish
;
506 if (session
&& session
->con
) {
507 // Leaving fsid argument null because it isn't used historically, and
508 // we can use it as a signal that we are sending a non-tell command.
509 auto m
= op
.get_message(
511 HAVE_FEATURE(map
.active_mgr_features
, SERVER_OCTOPUS
));
512 session
->con
->send_message2(std::move(m
));
514 ldout(cct
, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl
;
519 int MgrClient::start_tell_command(
521 const vector
<string
>& cmd
, const bufferlist
& inbl
,
522 bufferlist
*outbl
, string
*outs
,
525 std::lock_guard
l(lock
);
527 ldout(cct
, 20) << "target: " << name
<< " cmd: " << cmd
<< dendl
;
529 if (map
.epoch
== 0 && mgr_optional
) {
530 ldout(cct
,20) << " no MgrMap, assuming EACCES" << dendl
;
534 auto &op
= command_table
.start_command();
541 op
.on_finish
= onfinish
;
543 if (session
&& session
->con
&& (name
.size() == 0 || map
.active_name
== name
)) {
544 // Set fsid argument to signal that this is really a tell message (and
545 // we are not a legacy client sending a non-tell command via MCommand).
546 auto m
= op
.get_message(monmap
->fsid
, false);
547 session
->con
->send_message2(std::move(m
));
549 ldout(cct
, 5) << "no mgr session (no running mgr daemon?), or "
550 << name
<< " not active mgr, waiting" << dendl
;
555 bool MgrClient::handle_command_reply(
558 const std::string
& rs
,
561 ceph_assert(ceph_mutex_is_locked_by_me(lock
));
563 ldout(cct
, 20) << "tid " << tid
<< " r " << r
<< dendl
;
565 if (!command_table
.exists(tid
)) {
566 ldout(cct
, 4) << "handle_command_reply tid " << tid
567 << " not found" << dendl
;
571 auto &op
= command_table
.get_command(tid
);
573 *op
.outbl
= std::move(data
);
581 op
.on_finish
->complete(r
);
584 command_table
.erase(tid
);
588 int MgrClient::update_daemon_metadata(
589 const std::string
& service
,
590 const std::string
& name
,
591 const std::map
<std::string
,std::string
>& metadata
)
593 std::lock_guard
l(lock
);
594 if (service_daemon
) {
597 ldout(cct
,1) << service
<< "." << name
<< " metadata " << metadata
<< dendl
;
598 service_name
= service
;
600 daemon_metadata
= metadata
;
601 daemon_dirty_status
= true;
603 if (need_metadata_update
&&
604 !daemon_metadata
.empty()) {
606 need_metadata_update
= false;
612 int MgrClient::service_daemon_register(
613 const std::string
& service
,
614 const std::string
& name
,
615 const std::map
<std::string
,std::string
>& metadata
)
617 std::lock_guard
l(lock
);
618 if (service_daemon
) {
621 ldout(cct
,1) << service
<< "." << name
<< " metadata " << metadata
<< dendl
;
622 service_daemon
= true;
623 service_name
= service
;
625 daemon_metadata
= metadata
;
626 daemon_dirty_status
= true;
629 if (msgr
->get_mytype() == CEPH_ENTITY_TYPE_CLIENT
&& session
&& session
->con
) {
636 int MgrClient::service_daemon_update_status(
637 std::map
<std::string
,std::string
>&& status
)
639 std::lock_guard
l(lock
);
640 ldout(cct
,10) << status
<< dendl
;
641 daemon_status
= std::move(status
);
642 daemon_dirty_status
= true;
646 int MgrClient::service_daemon_update_task_status(
647 std::map
<std::string
,std::string
> &&status
) {
648 std::lock_guard
l(lock
);
649 ldout(cct
,10) << status
<< dendl
;
650 task_status
= std::move(status
);
651 task_dirty_status
= true;
655 void MgrClient::update_daemon_health(std::vector
<DaemonHealthMetric
>&& metrics
)
657 std::lock_guard
l(lock
);
658 daemon_health_metrics
= std::move(metrics
);