]> git.proxmox.com Git - ceph.git/blame - ceph/src/mgr/MgrClient.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / mgr / MgrClient.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14
15#include "MgrClient.h"
16
17#include "mgr/MgrContext.h"
18
19#include "msg/Messenger.h"
20#include "messages/MMgrMap.h"
21#include "messages/MMgrReport.h"
22#include "messages/MMgrOpen.h"
11fdf7f2 23#include "messages/MMgrClose.h"
7c673cae
FG
24#include "messages/MMgrConfigure.h"
25#include "messages/MCommand.h"
26#include "messages/MCommandReply.h"
27#include "messages/MPGStats.h"
28
29#define dout_subsys ceph_subsys_mgrc
30#undef dout_prefix
31#define dout_prefix *_dout << "mgrc " << __func__ << " "
32
33MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_)
34 : Dispatcher(cct_), cct(cct_), msgr(msgr_),
35 timer(cct_, lock)
36{
11fdf7f2 37 ceph_assert(cct != nullptr);
7c673cae
FG
38}
39
40void MgrClient::init()
41{
11fdf7f2 42 std::lock_guard l(lock);
7c673cae 43
11fdf7f2 44 ceph_assert(msgr != nullptr);
7c673cae
FG
45
46 timer.init();
47}
48
49void MgrClient::shutdown()
50{
11fdf7f2
TL
51 std::lock_guard l(lock);
52 ldout(cct, 10) << dendl;
7c673cae
FG
53
54 if (connect_retry_callback) {
55 timer.cancel_event(connect_retry_callback);
56 connect_retry_callback = nullptr;
57 }
58
59 // forget about in-flight commands if we are prematurely shut down
60 // (e.g., by control-C)
61 command_table.clear();
11fdf7f2
TL
62 if (service_daemon &&
63 session &&
64 session->con &&
65 HAVE_FEATURE(session->con->get_features(), SERVER_MIMIC)) {
66 ldout(cct, 10) << "closing mgr session" << dendl;
67 MMgrClose *m = new MMgrClose();
68 m->daemon_name = daemon_name;
69 m->service_name = service_name;
70 session->con->send_message(m);
71 utime_t timeout;
72 timeout.set_from_double(cct->_conf.get_val<double>(
73 "mgr_client_service_daemon_unregister_timeout"));
74 shutdown_cond.WaitInterval(lock, timeout);
75 }
7c673cae
FG
76
77 timer.shutdown();
31f18b77
FG
78 if (session) {
79 session->con->mark_down();
80 session.reset();
81 }
7c673cae
FG
82}
83
84bool MgrClient::ms_dispatch(Message *m)
85{
11fdf7f2 86 std::lock_guard l(lock);
7c673cae
FG
87
88 switch(m->get_type()) {
89 case MSG_MGR_MAP:
90 return handle_mgr_map(static_cast<MMgrMap*>(m));
91 case MSG_MGR_CONFIGURE:
92 return handle_mgr_configure(static_cast<MMgrConfigure*>(m));
11fdf7f2
TL
93 case MSG_MGR_CLOSE:
94 return handle_mgr_close(static_cast<MMgrClose*>(m));
7c673cae
FG
95 case MSG_COMMAND_REPLY:
96 if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
97 handle_command_reply(static_cast<MCommandReply*>(m));
98 return true;
99 } else {
100 return false;
101 }
102 default:
103 ldout(cct, 30) << "Not handling " << *m << dendl;
104 return false;
105 }
106}
107
108void MgrClient::reconnect()
109{
11fdf7f2 110 ceph_assert(lock.is_locked_by_me());
7c673cae
FG
111
112 if (session) {
113 ldout(cct, 4) << "Terminating session with "
114 << session->con->get_peer_addr() << dendl;
115 session->con->mark_down();
116 session.reset();
117 stats_period = 0;
118 if (report_callback != nullptr) {
119 timer.cancel_event(report_callback);
120 report_callback = nullptr;
121 }
122 }
123
124 if (!map.get_available()) {
125 ldout(cct, 4) << "No active mgr available yet" << dendl;
126 return;
127 }
128
129 if (last_connect_attempt != utime_t()) {
130 utime_t now = ceph_clock_now();
131 utime_t when = last_connect_attempt;
11fdf7f2 132 when += cct->_conf.get_val<double>("mgr_connect_retry_interval");
7c673cae
FG
133 if (now < when) {
134 if (!connect_retry_callback) {
3efd9988
FG
135 connect_retry_callback = timer.add_event_at(
136 when,
137 new FunctionContext([this](int r){
138 connect_retry_callback = nullptr;
139 reconnect();
140 }));
7c673cae
FG
141 }
142 ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
143 return;
144 }
145 }
146
147 if (connect_retry_callback) {
148 timer.cancel_event(connect_retry_callback);
149 connect_retry_callback = nullptr;
150 }
151
11fdf7f2 152 ldout(cct, 4) << "Starting new session with " << map.get_active_addrs()
7c673cae 153 << dendl;
7c673cae
FG
154 last_connect_attempt = ceph_clock_now();
155
156 session.reset(new MgrSessionState());
11fdf7f2
TL
157 session->con = msgr->connect_to(CEPH_ENTITY_TYPE_MGR,
158 map.get_active_addrs());
7c673cae 159
224ce89b
WB
160 if (service_daemon) {
161 daemon_dirty_status = true;
162 }
163
7c673cae
FG
164 // Don't send an open if we're just a client (i.e. doing
165 // command-sending, not stats etc)
92f5a8d4 166 if (msgr->get_mytype() != CEPH_ENTITY_TYPE_CLIENT || service_daemon) {
224ce89b 167 _send_open();
7c673cae
FG
168 }
169
170 // resend any pending commands
171 for (const auto &p : command_table.get_commands()) {
11fdf7f2
TL
172 auto m = p.second.get_message({});
173 ceph_assert(session);
174 ceph_assert(session->con);
175 session->con->send_message2(std::move(m));
7c673cae
FG
176 }
177}
178
224ce89b
WB
179void MgrClient::_send_open()
180{
181 if (session && session->con) {
182 auto open = new MMgrOpen();
183 if (!service_name.empty()) {
184 open->service_name = service_name;
185 open->daemon_name = daemon_name;
186 } else {
c07f9fc5 187 open->daemon_name = cct->_conf->name.get_id();
224ce89b
WB
188 }
189 if (service_daemon) {
190 open->service_daemon = service_daemon;
191 open->daemon_metadata = daemon_metadata;
192 }
11fdf7f2
TL
193 cct->_conf.get_config_bl(0, &open->config_bl, &last_config_bl_version);
194 cct->_conf.get_defaults_bl(&open->config_defaults_bl);
224ce89b
WB
195 session->con->send_message(open);
196 }
197}
198
7c673cae
FG
199bool MgrClient::handle_mgr_map(MMgrMap *m)
200{
11fdf7f2 201 ceph_assert(lock.is_locked_by_me());
7c673cae
FG
202
203 ldout(cct, 20) << *m << dendl;
204
205 map = m->get_map();
206 ldout(cct, 4) << "Got map version " << map.epoch << dendl;
207 m->put();
208
11fdf7f2 209 ldout(cct, 4) << "Active mgr is now " << map.get_active_addrs() << dendl;
7c673cae
FG
210
211 // Reset session?
212 if (!session ||
11fdf7f2 213 session->con->get_peer_addrs() != map.get_active_addrs()) {
7c673cae
FG
214 reconnect();
215 }
216
217 return true;
218}
219
220bool MgrClient::ms_handle_reset(Connection *con)
221{
11fdf7f2 222 std::lock_guard l(lock);
7c673cae
FG
223 if (session && con == session->con) {
224 ldout(cct, 4) << __func__ << " con " << con << dendl;
225 reconnect();
226 return true;
227 }
228 return false;
229}
230
231bool MgrClient::ms_handle_refused(Connection *con)
232{
233 // do nothing for now
234 return false;
235}
236
91327a77
AA
237void MgrClient::_send_stats()
238{
239 _send_report();
240 _send_pgstats();
241 if (stats_period != 0) {
242 report_callback = timer.add_event_after(
243 stats_period,
244 new FunctionContext([this](int) {
245 _send_stats();
246 }));
247 }
248}
249
250void MgrClient::_send_report()
7c673cae 251{
11fdf7f2
TL
252 ceph_assert(lock.is_locked_by_me());
253 ceph_assert(session);
7c673cae
FG
254 report_callback = nullptr;
255
256 auto report = new MMgrReport();
257 auto pcc = cct->get_perfcounters_collection();
258
259 pcc->with_counters([this, report](
11fdf7f2 260 const PerfCountersCollectionImpl::CounterMap &by_path)
7c673cae 261 {
3efd9988
FG
262 // Helper for checking whether a counter should be included
263 auto include_counter = [this](
264 const PerfCounters::perf_counter_data_any_d &ctr,
265 const PerfCounters &perf_counters)
266 {
267 return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
268 };
269
270 // Helper for cases where we want to forget a counter
271 auto undeclare = [report, this](const std::string &path)
272 {
273 report->undeclare_types.push_back(path);
274 ldout(cct,20) << " undeclare " << path << dendl;
275 session->declared.erase(path);
276 };
277
7c673cae 278 ENCODE_START(1, 1, report->packed);
3efd9988
FG
279
280 // Find counters that no longer exist, and undeclare them
7c673cae 281 for (auto p = session->declared.begin(); p != session->declared.end(); ) {
3efd9988
FG
282 const auto &path = *(p++);
283 if (by_path.count(path) == 0) {
284 undeclare(path);
7c673cae
FG
285 }
286 }
3efd9988 287
7c673cae
FG
288 for (const auto &i : by_path) {
289 auto& path = i.first;
3efd9988
FG
290 auto& data = *(i.second.data);
291 auto& perf_counters = *(i.second.perf_counters);
292
293 // Find counters that still exist, but are no longer permitted by
294 // stats_threshold
295 if (!include_counter(data, perf_counters)) {
296 if (session->declared.count(path)) {
297 undeclare(path);
298 }
299 continue;
300 }
7c673cae
FG
301
302 if (session->declared.count(path) == 0) {
3efd9988 303 ldout(cct,20) << " declare " << path << dendl;
7c673cae
FG
304 PerfCounterType type;
305 type.path = path;
306 if (data.description) {
307 type.description = data.description;
308 }
309 if (data.nick) {
310 type.nick = data.nick;
311 }
312 type.type = data.type;
1adf2230
AA
313 type.priority = perf_counters.get_adjusted_priority(data.prio);
314 type.unit = data.unit;
7c673cae
FG
315 report->declare_types.push_back(std::move(type));
316 session->declared.insert(path);
317 }
318
11fdf7f2 319 encode(static_cast<uint64_t>(data.u64), report->packed);
7c673cae 320 if (data.type & PERFCOUNTER_LONGRUNAVG) {
11fdf7f2
TL
321 encode(static_cast<uint64_t>(data.avgcount), report->packed);
322 encode(static_cast<uint64_t>(data.avgcount2), report->packed);
7c673cae
FG
323 }
324 }
325 ENCODE_FINISH(report->packed);
326
3efd9988
FG
327 ldout(cct, 20) << "sending " << session->declared.size() << " counters ("
328 "of possible " << by_path.size() << "), "
329 << report->declare_types.size() << " new, "
330 << report->undeclare_types.size() << " removed"
331 << dendl;
7c673cae
FG
332 });
333
334 ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
335
224ce89b
WB
336 if (daemon_name.size()) {
337 report->daemon_name = daemon_name;
338 } else {
c07f9fc5 339 report->daemon_name = cct->_conf->name.get_id();
224ce89b
WB
340 }
341 report->service_name = service_name;
342
343 if (daemon_dirty_status) {
344 report->daemon_status = daemon_status;
345 daemon_dirty_status = false;
346 }
7c673cae 347
11fdf7f2
TL
348 report->daemon_health_metrics = std::move(daemon_health_metrics);
349
350 cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
351 &last_config_bl_version);
352
353 if (get_perf_report_cb) {
354 get_perf_report_cb(&report->osd_perf_metric_reports);
355 }
356
7c673cae 357 session->con->send_message(report);
31f18b77
FG
358}
359
360void MgrClient::send_pgstats()
91327a77 361{
11fdf7f2 362 std::lock_guard l(lock);
91327a77
AA
363 _send_pgstats();
364}
365
366void MgrClient::_send_pgstats()
31f18b77
FG
367{
368 if (pgstats_cb && session) {
369 session->con->send_message(pgstats_cb());
7c673cae
FG
370 }
371}
372
373bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
374{
11fdf7f2 375 ceph_assert(lock.is_locked_by_me());
7c673cae
FG
376
377 ldout(cct, 20) << *m << dendl;
378
379 if (!session) {
380 lderr(cct) << "dropping unexpected configure message" << dendl;
381 m->put();
382 return true;
383 }
384
385 ldout(cct, 4) << "stats_period=" << m->stats_period << dendl;
386
3efd9988
FG
387 if (stats_threshold != m->stats_threshold) {
388 ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl;
389 stats_threshold = m->stats_threshold;
390 }
391
11fdf7f2
TL
392 if (set_perf_queries_cb) {
393 set_perf_queries_cb(m->osd_perf_metric_queries);
394 }
395
7c673cae
FG
396 bool starting = (stats_period == 0) && (m->stats_period != 0);
397 stats_period = m->stats_period;
398 if (starting) {
91327a77 399 _send_stats();
7c673cae
FG
400 }
401
402 m->put();
403 return true;
404}
405
11fdf7f2
TL
406bool MgrClient::handle_mgr_close(MMgrClose *m)
407{
408 service_daemon = false;
409 shutdown_cond.Signal();
410 m->put();
411 return true;
412}
413
7c673cae
FG
414int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
415 bufferlist *outbl, string *outs,
416 Context *onfinish)
417{
11fdf7f2 418 std::lock_guard l(lock);
7c673cae
FG
419
420 ldout(cct, 20) << "cmd: " << cmd << dendl;
421
11fdf7f2 422 if (map.epoch == 0 && mgr_optional) {
7c673cae
FG
423 ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
424 return -EACCES;
425 }
426
427 auto &op = command_table.start_command();
428 op.cmd = cmd;
429 op.inbl = inbl;
430 op.outbl = outbl;
431 op.outs = outs;
432 op.on_finish = onfinish;
433
434 if (session && session->con) {
435 // Leaving fsid argument null because it isn't used.
11fdf7f2
TL
436 auto m = op.get_message({});
437 session->con->send_message2(std::move(m));
438 } else {
439 ldout(cct, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl;
7c673cae
FG
440 }
441 return 0;
442}
443
444bool MgrClient::handle_command_reply(MCommandReply *m)
445{
11fdf7f2 446 ceph_assert(lock.is_locked_by_me());
7c673cae
FG
447
448 ldout(cct, 20) << *m << dendl;
449
450 const auto tid = m->get_tid();
451 if (!command_table.exists(tid)) {
452 ldout(cct, 4) << "handle_command_reply tid " << m->get_tid()
453 << " not found" << dendl;
454 m->put();
455 return true;
456 }
457
458 auto &op = command_table.get_command(tid);
459 if (op.outbl) {
460 op.outbl->claim(m->get_data());
461 }
462
463 if (op.outs) {
464 *(op.outs) = m->rs;
465 }
466
467 if (op.on_finish) {
468 op.on_finish->complete(m->r);
469 }
470
471 command_table.erase(tid);
472
473 m->put();
474 return true;
475}
476
224ce89b
WB
477int MgrClient::service_daemon_register(
478 const std::string& service,
479 const std::string& name,
480 const std::map<std::string,std::string>& metadata)
481{
11fdf7f2
TL
482 std::lock_guard l(lock);
483 if (service == "osd" ||
484 service == "mds" ||
485 service == "client" ||
486 service == "mon" ||
487 service == "mgr") {
224ce89b
WB
488 // normal ceph entity types are not allowed!
489 return -EINVAL;
490 }
491 if (service_daemon) {
492 return -EEXIST;
493 }
494 ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
495 service_daemon = true;
496 service_name = service;
497 daemon_name = name;
498 daemon_metadata = metadata;
499 daemon_dirty_status = true;
500
501 // late register?
92f5a8d4 502 if (msgr->get_mytype() == CEPH_ENTITY_TYPE_CLIENT && session && session->con) {
224ce89b
WB
503 _send_open();
504 }
505
506 return 0;
507}
508
509int MgrClient::service_daemon_update_status(
11fdf7f2 510 std::map<std::string,std::string>&& status)
224ce89b 511{
11fdf7f2 512 std::lock_guard l(lock);
224ce89b 513 ldout(cct,10) << status << dendl;
11fdf7f2 514 daemon_status = std::move(status);
224ce89b
WB
515 daemon_dirty_status = true;
516 return 0;
517}
b32b8144 518
11fdf7f2 519void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
b32b8144 520{
11fdf7f2
TL
521 std::lock_guard l(lock);
522 daemon_health_metrics = std::move(metrics);
b32b8144 523}
11fdf7f2 524