]> git.proxmox.com Git - ceph.git/blame - ceph/src/mgr/MgrClient.cc
update sources to 12.2.10
[ceph.git] / ceph / src / mgr / MgrClient.cc
CommitLineData
7c673cae
FG
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
13
14
15#include "MgrClient.h"
16
17#include "mgr/MgrContext.h"
18
19#include "msg/Messenger.h"
20#include "messages/MMgrMap.h"
21#include "messages/MMgrReport.h"
22#include "messages/MMgrOpen.h"
23#include "messages/MMgrConfigure.h"
24#include "messages/MCommand.h"
25#include "messages/MCommandReply.h"
26#include "messages/MPGStats.h"
27
28#define dout_subsys ceph_subsys_mgrc
29#undef dout_prefix
30#define dout_prefix *_dout << "mgrc " << __func__ << " "
31
32MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_)
33 : Dispatcher(cct_), cct(cct_), msgr(msgr_),
34 timer(cct_, lock)
35{
36 assert(cct != nullptr);
37}
38
39void MgrClient::init()
40{
41 Mutex::Locker l(lock);
42
43 assert(msgr != nullptr);
44
45 timer.init();
46}
47
48void MgrClient::shutdown()
49{
50 Mutex::Locker l(lock);
51
52 if (connect_retry_callback) {
53 timer.cancel_event(connect_retry_callback);
54 connect_retry_callback = nullptr;
55 }
56
57 // forget about in-flight commands if we are prematurely shut down
58 // (e.g., by control-C)
59 command_table.clear();
60
61 timer.shutdown();
31f18b77
FG
62 if (session) {
63 session->con->mark_down();
64 session.reset();
65 }
7c673cae
FG
66}
67
68bool MgrClient::ms_dispatch(Message *m)
69{
70 Mutex::Locker l(lock);
71
72 switch(m->get_type()) {
73 case MSG_MGR_MAP:
74 return handle_mgr_map(static_cast<MMgrMap*>(m));
75 case MSG_MGR_CONFIGURE:
76 return handle_mgr_configure(static_cast<MMgrConfigure*>(m));
77 case MSG_COMMAND_REPLY:
78 if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
79 handle_command_reply(static_cast<MCommandReply*>(m));
80 return true;
81 } else {
82 return false;
83 }
84 default:
85 ldout(cct, 30) << "Not handling " << *m << dendl;
86 return false;
87 }
88}
89
90void MgrClient::reconnect()
91{
92 assert(lock.is_locked_by_me());
93
94 if (session) {
95 ldout(cct, 4) << "Terminating session with "
96 << session->con->get_peer_addr() << dendl;
97 session->con->mark_down();
98 session.reset();
99 stats_period = 0;
100 if (report_callback != nullptr) {
101 timer.cancel_event(report_callback);
102 report_callback = nullptr;
103 }
104 }
105
106 if (!map.get_available()) {
107 ldout(cct, 4) << "No active mgr available yet" << dendl;
108 return;
109 }
110
111 if (last_connect_attempt != utime_t()) {
112 utime_t now = ceph_clock_now();
113 utime_t when = last_connect_attempt;
3efd9988 114 when += cct->_conf->get_val<double>("mgr_connect_retry_interval");
7c673cae
FG
115 if (now < when) {
116 if (!connect_retry_callback) {
3efd9988
FG
117 connect_retry_callback = timer.add_event_at(
118 when,
119 new FunctionContext([this](int r){
120 connect_retry_callback = nullptr;
121 reconnect();
122 }));
7c673cae
FG
123 }
124 ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
125 return;
126 }
127 }
128
129 if (connect_retry_callback) {
130 timer.cancel_event(connect_retry_callback);
131 connect_retry_callback = nullptr;
132 }
133
134 ldout(cct, 4) << "Starting new session with " << map.get_active_addr()
135 << dendl;
136 entity_inst_t inst;
137 inst.addr = map.get_active_addr();
138 inst.name = entity_name_t::MGR(map.get_active_gid());
139 last_connect_attempt = ceph_clock_now();
140
141 session.reset(new MgrSessionState());
142 session->con = msgr->get_connection(inst);
143
224ce89b
WB
144 if (service_daemon) {
145 daemon_dirty_status = true;
146 }
147
7c673cae
FG
148 // Don't send an open if we're just a client (i.e. doing
149 // command-sending, not stats etc)
c07f9fc5 150 if (!cct->_conf->name.is_client() || service_daemon) {
224ce89b 151 _send_open();
7c673cae
FG
152 }
153
154 // resend any pending commands
155 for (const auto &p : command_table.get_commands()) {
156 MCommand *m = p.second.get_message({});
157 assert(session);
158 assert(session->con);
159 session->con->send_message(m);
160 }
161}
162
224ce89b
WB
163void MgrClient::_send_open()
164{
165 if (session && session->con) {
166 auto open = new MMgrOpen();
167 if (!service_name.empty()) {
168 open->service_name = service_name;
169 open->daemon_name = daemon_name;
170 } else {
c07f9fc5 171 open->daemon_name = cct->_conf->name.get_id();
224ce89b
WB
172 }
173 if (service_daemon) {
174 open->service_daemon = service_daemon;
175 open->daemon_metadata = daemon_metadata;
176 }
177 session->con->send_message(open);
178 }
179}
180
7c673cae
FG
181bool MgrClient::handle_mgr_map(MMgrMap *m)
182{
183 assert(lock.is_locked_by_me());
184
185 ldout(cct, 20) << *m << dendl;
186
187 map = m->get_map();
188 ldout(cct, 4) << "Got map version " << map.epoch << dendl;
189 m->put();
190
191 ldout(cct, 4) << "Active mgr is now " << map.get_active_addr() << dendl;
192
193 // Reset session?
194 if (!session ||
195 session->con->get_peer_addr() != map.get_active_addr()) {
196 reconnect();
197 }
198
199 return true;
200}
201
202bool MgrClient::ms_handle_reset(Connection *con)
203{
204 Mutex::Locker l(lock);
205 if (session && con == session->con) {
206 ldout(cct, 4) << __func__ << " con " << con << dendl;
207 reconnect();
208 return true;
209 }
210 return false;
211}
212
213bool MgrClient::ms_handle_refused(Connection *con)
214{
215 // do nothing for now
216 return false;
217}
218
91327a77
AA
219void MgrClient::_send_stats()
220{
221 _send_report();
222 _send_pgstats();
223 if (stats_period != 0) {
224 report_callback = timer.add_event_after(
225 stats_period,
226 new FunctionContext([this](int) {
227 _send_stats();
228 }));
229 }
230}
231
232void MgrClient::_send_report()
7c673cae
FG
233{
234 assert(lock.is_locked_by_me());
235 assert(session);
236 report_callback = nullptr;
237
238 auto report = new MMgrReport();
239 auto pcc = cct->get_perfcounters_collection();
240
241 pcc->with_counters([this, report](
242 const PerfCountersCollection::CounterMap &by_path)
243 {
3efd9988
FG
244 // Helper for checking whether a counter should be included
245 auto include_counter = [this](
246 const PerfCounters::perf_counter_data_any_d &ctr,
247 const PerfCounters &perf_counters)
248 {
249 return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
250 };
251
252 // Helper for cases where we want to forget a counter
253 auto undeclare = [report, this](const std::string &path)
254 {
255 report->undeclare_types.push_back(path);
256 ldout(cct,20) << " undeclare " << path << dendl;
257 session->declared.erase(path);
258 };
259
7c673cae 260 ENCODE_START(1, 1, report->packed);
3efd9988
FG
261
262 // Find counters that no longer exist, and undeclare them
7c673cae 263 for (auto p = session->declared.begin(); p != session->declared.end(); ) {
3efd9988
FG
264 const auto &path = *(p++);
265 if (by_path.count(path) == 0) {
266 undeclare(path);
7c673cae
FG
267 }
268 }
3efd9988 269
7c673cae
FG
270 for (const auto &i : by_path) {
271 auto& path = i.first;
3efd9988
FG
272 auto& data = *(i.second.data);
273 auto& perf_counters = *(i.second.perf_counters);
274
275 // Find counters that still exist, but are no longer permitted by
276 // stats_threshold
277 if (!include_counter(data, perf_counters)) {
278 if (session->declared.count(path)) {
279 undeclare(path);
280 }
281 continue;
282 }
7c673cae
FG
283
284 if (session->declared.count(path) == 0) {
3efd9988 285 ldout(cct,20) << " declare " << path << dendl;
7c673cae
FG
286 PerfCounterType type;
287 type.path = path;
288 if (data.description) {
289 type.description = data.description;
290 }
291 if (data.nick) {
292 type.nick = data.nick;
293 }
294 type.type = data.type;
1adf2230
AA
295 type.priority = perf_counters.get_adjusted_priority(data.prio);
296 type.unit = data.unit;
7c673cae
FG
297 report->declare_types.push_back(std::move(type));
298 session->declared.insert(path);
299 }
300
31f18b77 301 ::encode(static_cast<uint64_t>(data.u64), report->packed);
7c673cae 302 if (data.type & PERFCOUNTER_LONGRUNAVG) {
31f18b77
FG
303 ::encode(static_cast<uint64_t>(data.avgcount), report->packed);
304 ::encode(static_cast<uint64_t>(data.avgcount2), report->packed);
7c673cae
FG
305 }
306 }
307 ENCODE_FINISH(report->packed);
308
3efd9988
FG
309 ldout(cct, 20) << "sending " << session->declared.size() << " counters ("
310 "of possible " << by_path.size() << "), "
311 << report->declare_types.size() << " new, "
312 << report->undeclare_types.size() << " removed"
313 << dendl;
7c673cae
FG
314 });
315
316 ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
317
224ce89b
WB
318 if (daemon_name.size()) {
319 report->daemon_name = daemon_name;
320 } else {
c07f9fc5 321 report->daemon_name = cct->_conf->name.get_id();
224ce89b
WB
322 }
323 report->service_name = service_name;
324
325 if (daemon_dirty_status) {
326 report->daemon_status = daemon_status;
327 daemon_dirty_status = false;
328 }
7c673cae 329
b32b8144 330 report->osd_health_metrics = std::move(osd_health_metrics);
7c673cae 331 session->con->send_message(report);
31f18b77
FG
332}
333
334void MgrClient::send_pgstats()
91327a77
AA
335{
336 Mutex::Locker l(lock);
337 _send_pgstats();
338}
339
340void MgrClient::_send_pgstats()
31f18b77
FG
341{
342 if (pgstats_cb && session) {
343 session->con->send_message(pgstats_cb());
7c673cae
FG
344 }
345}
346
347bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
348{
349 assert(lock.is_locked_by_me());
350
351 ldout(cct, 20) << *m << dendl;
352
353 if (!session) {
354 lderr(cct) << "dropping unexpected configure message" << dendl;
355 m->put();
356 return true;
357 }
358
359 ldout(cct, 4) << "stats_period=" << m->stats_period << dendl;
360
3efd9988
FG
361 if (stats_threshold != m->stats_threshold) {
362 ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl;
363 stats_threshold = m->stats_threshold;
364 }
365
7c673cae
FG
366 bool starting = (stats_period == 0) && (m->stats_period != 0);
367 stats_period = m->stats_period;
368 if (starting) {
91327a77 369 _send_stats();
7c673cae
FG
370 }
371
372 m->put();
373 return true;
374}
375
376int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
377 bufferlist *outbl, string *outs,
378 Context *onfinish)
379{
380 Mutex::Locker l(lock);
381
382 ldout(cct, 20) << "cmd: " << cmd << dendl;
383
384 if (map.epoch == 0) {
385 ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
386 return -EACCES;
387 }
388
389 auto &op = command_table.start_command();
390 op.cmd = cmd;
391 op.inbl = inbl;
392 op.outbl = outbl;
393 op.outs = outs;
394 op.on_finish = onfinish;
395
396 if (session && session->con) {
397 // Leaving fsid argument null because it isn't used.
398 MCommand *m = op.get_message({});
399 session->con->send_message(m);
400 }
401 return 0;
402}
403
404bool MgrClient::handle_command_reply(MCommandReply *m)
405{
406 assert(lock.is_locked_by_me());
407
408 ldout(cct, 20) << *m << dendl;
409
410 const auto tid = m->get_tid();
411 if (!command_table.exists(tid)) {
412 ldout(cct, 4) << "handle_command_reply tid " << m->get_tid()
413 << " not found" << dendl;
414 m->put();
415 return true;
416 }
417
418 auto &op = command_table.get_command(tid);
419 if (op.outbl) {
420 op.outbl->claim(m->get_data());
421 }
422
423 if (op.outs) {
424 *(op.outs) = m->rs;
425 }
426
427 if (op.on_finish) {
428 op.on_finish->complete(m->r);
429 }
430
431 command_table.erase(tid);
432
433 m->put();
434 return true;
435}
436
224ce89b
WB
437int MgrClient::service_daemon_register(
438 const std::string& service,
439 const std::string& name,
440 const std::map<std::string,std::string>& metadata)
441{
442 Mutex::Locker l(lock);
443 if (name == "osd" ||
444 name == "mds" ||
445 name == "client" ||
446 name == "mon" ||
447 name == "mgr") {
448 // normal ceph entity types are not allowed!
449 return -EINVAL;
450 }
451 if (service_daemon) {
452 return -EEXIST;
453 }
454 ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
455 service_daemon = true;
456 service_name = service;
457 daemon_name = name;
458 daemon_metadata = metadata;
459 daemon_dirty_status = true;
460
461 // late register?
c07f9fc5 462 if (cct->_conf->name.is_client() && session && session->con) {
224ce89b
WB
463 _send_open();
464 }
465
466 return 0;
467}
468
469int MgrClient::service_daemon_update_status(
470 const std::map<std::string,std::string>& status)
471{
472 Mutex::Locker l(lock);
473 ldout(cct,10) << status << dendl;
474 daemon_status = status;
475 daemon_dirty_status = true;
476 return 0;
477}
b32b8144
FG
478
479void MgrClient::update_osd_health(std::vector<OSDHealthMetric>&& metrics)
480{
1adf2230 481 Mutex::Locker l(lock);
b32b8144
FG
482 osd_health_metrics = std::move(metrics);
483}