]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/MgrClient.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / mgr / MgrClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14
15 #include "MgrClient.h"
16
17 #include "common/perf_counters_key.h"
18 #include "mgr/MgrContext.h"
19 #include "mon/MonMap.h"
20
21 #include "msg/Messenger.h"
22 #include "messages/MMgrMap.h"
23 #include "messages/MMgrReport.h"
24 #include "messages/MMgrOpen.h"
25 #include "messages/MMgrUpdate.h"
26 #include "messages/MMgrClose.h"
27 #include "messages/MMgrConfigure.h"
28 #include "messages/MCommand.h"
29 #include "messages/MCommandReply.h"
30 #include "messages/MMgrCommand.h"
31 #include "messages/MMgrCommandReply.h"
32 #include "messages/MPGStats.h"
33
34 using std::string;
35 using std::vector;
36
37 using ceph::bufferlist;
38 using ceph::make_message;
39 using ceph::ref_cast;
40 using ceph::ref_t;
41
42 #define dout_subsys ceph_subsys_mgrc
43 #undef dout_prefix
44 #define dout_prefix *_dout << "mgrc " << __func__ << " "
45
46 MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_, MonMap *monmap_)
47 : Dispatcher(cct_),
48 cct(cct_),
49 msgr(msgr_),
50 monmap(monmap_),
51 timer(cct_, lock)
52 {
53 ceph_assert(cct != nullptr);
54 }
55
56 void MgrClient::init()
57 {
58 std::lock_guard l(lock);
59
60 ceph_assert(msgr != nullptr);
61
62 timer.init();
63 initialized = true;
64 }
65
66 void MgrClient::shutdown()
67 {
68 std::unique_lock l(lock);
69 ldout(cct, 10) << dendl;
70
71 if (connect_retry_callback) {
72 timer.cancel_event(connect_retry_callback);
73 connect_retry_callback = nullptr;
74 }
75
76 // forget about in-flight commands if we are prematurely shut down
77 // (e.g., by control-C)
78 command_table.clear();
79 if (service_daemon &&
80 session &&
81 session->con &&
82 HAVE_FEATURE(session->con->get_features(), SERVER_MIMIC)) {
83 ldout(cct, 10) << "closing mgr session" << dendl;
84 auto m = make_message<MMgrClose>();
85 m->daemon_name = daemon_name;
86 m->service_name = service_name;
87 session->con->send_message2(m);
88 auto timeout = ceph::make_timespan(cct->_conf.get_val<double>(
89 "mgr_client_service_daemon_unregister_timeout"));
90 shutdown_cond.wait_for(l, timeout);
91 }
92
93 timer.shutdown();
94 if (session) {
95 session->con->mark_down();
96 session.reset();
97 }
98 }
99
100 bool MgrClient::ms_dispatch2(const ref_t<Message>& m)
101 {
102 std::lock_guard l(lock);
103
104 switch(m->get_type()) {
105 case MSG_MGR_MAP:
106 return handle_mgr_map(ref_cast<MMgrMap>(m));
107 case MSG_MGR_CONFIGURE:
108 return handle_mgr_configure(ref_cast<MMgrConfigure>(m));
109 case MSG_MGR_CLOSE:
110 return handle_mgr_close(ref_cast<MMgrClose>(m));
111 case MSG_COMMAND_REPLY:
112 if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
113 MCommandReply *c = static_cast<MCommandReply*>(m.get());
114 handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
115 return true;
116 } else {
117 return false;
118 }
119 case MSG_MGR_COMMAND_REPLY:
120 if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
121 MMgrCommandReply *c = static_cast<MMgrCommandReply*>(m.get());
122 handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
123 return true;
124 } else {
125 return false;
126 }
127 default:
128 ldout(cct, 30) << "Not handling " << *m << dendl;
129 return false;
130 }
131 }
132
133 void MgrClient::reconnect()
134 {
135 ceph_assert(ceph_mutex_is_locked_by_me(lock));
136
137 if (session) {
138 ldout(cct, 4) << "Terminating session with "
139 << session->con->get_peer_addr() << dendl;
140 session->con->mark_down();
141 session.reset();
142 stats_period = 0;
143 if (report_callback != nullptr) {
144 timer.cancel_event(report_callback);
145 report_callback = nullptr;
146 }
147 }
148
149 if (!map.get_available()) {
150 ldout(cct, 4) << "No active mgr available yet" << dendl;
151 return;
152 }
153
154 if (!clock_t::is_zero(last_connect_attempt)) {
155 auto now = clock_t::now();
156 auto when = last_connect_attempt +
157 ceph::make_timespan(
158 cct->_conf.get_val<double>("mgr_connect_retry_interval"));
159 if (now < when) {
160 if (!connect_retry_callback) {
161 connect_retry_callback = timer.add_event_at(
162 when,
163 new LambdaContext([this](int r){
164 connect_retry_callback = nullptr;
165 reconnect();
166 }));
167 }
168 ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
169 return;
170 }
171 }
172
173 if (connect_retry_callback) {
174 timer.cancel_event(connect_retry_callback);
175 connect_retry_callback = nullptr;
176 }
177
178 ldout(cct, 4) << "Starting new session with " << map.get_active_addrs()
179 << dendl;
180 last_connect_attempt = clock_t::now();
181
182 session.reset(new MgrSessionState());
183 session->con = msgr->connect_to(CEPH_ENTITY_TYPE_MGR,
184 map.get_active_addrs());
185
186 if (service_daemon) {
187 daemon_dirty_status = true;
188 }
189 task_dirty_status = true;
190
191 // Don't send an open if we're just a client (i.e. doing
192 // command-sending, not stats etc)
193 if (msgr->get_mytype() != CEPH_ENTITY_TYPE_CLIENT || service_daemon) {
194 _send_open();
195 }
196
197 // resend any pending commands
198 auto p = command_table.get_commands().begin();
199 while (p != command_table.get_commands().end()) {
200 auto tid = p->first;
201 auto& op = p->second;
202 ldout(cct,10) << "resending " << tid << (op.tell ? " (tell)":" (cli)") << dendl;
203 MessageRef m;
204 if (op.tell) {
205 if (op.name.size() && op.name != map.active_name) {
206 ldout(cct, 10) << "active mgr " << map.active_name << " != target "
207 << op.name << dendl;
208 if (op.on_finish) {
209 op.on_finish->complete(-ENXIO);
210 }
211 ++p;
212 command_table.erase(tid);
213 continue;
214 }
215 // Set fsid argument to signal that this is really a tell message (and
216 // we are not a legacy client sending a non-tell command via MCommand).
217 m = op.get_message(monmap->fsid, false);
218 } else {
219 m = op.get_message(
220 {},
221 HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
222 }
223 ceph_assert(session);
224 ceph_assert(session->con);
225 session->con->send_message2(std::move(m));
226 ++p;
227 }
228 }
229
230 void MgrClient::_send_open()
231 {
232 if (session && session->con) {
233 auto open = make_message<MMgrOpen>();
234 if (!service_name.empty()) {
235 open->service_name = service_name;
236 open->daemon_name = daemon_name;
237 } else {
238 open->daemon_name = cct->_conf->name.get_id();
239 }
240 if (service_daemon) {
241 open->service_daemon = service_daemon;
242 open->daemon_metadata = daemon_metadata;
243 }
244 cct->_conf.get_config_bl(0, &open->config_bl, &last_config_bl_version);
245 cct->_conf.get_defaults_bl(&open->config_defaults_bl);
246 session->con->send_message2(open);
247 }
248 }
249
250 void MgrClient::_send_update()
251 {
252 if (session && session->con) {
253 auto update = make_message<MMgrUpdate>();
254 if (!service_name.empty()) {
255 update->service_name = service_name;
256 update->daemon_name = daemon_name;
257 } else {
258 update->daemon_name = cct->_conf->name.get_id();
259 }
260 if (need_metadata_update) {
261 update->daemon_metadata = daemon_metadata;
262 }
263 update->need_metadata_update = need_metadata_update;
264 session->con->send_message2(update);
265 }
266 }
267
268 bool MgrClient::handle_mgr_map(ref_t<MMgrMap> m)
269 {
270 ceph_assert(ceph_mutex_is_locked_by_me(lock));
271
272 ldout(cct, 20) << *m << dendl;
273
274 map = m->get_map();
275 ldout(cct, 4) << "Got map version " << map.epoch << dendl;
276
277 ldout(cct, 4) << "Active mgr is now " << map.get_active_addrs() << dendl;
278
279 // Reset session?
280 if (!session ||
281 session->con->get_peer_addrs() != map.get_active_addrs()) {
282 reconnect();
283 }
284
285 return true;
286 }
287
288 bool MgrClient::ms_handle_reset(Connection *con)
289 {
290 std::lock_guard l(lock);
291 if (session && con == session->con) {
292 ldout(cct, 4) << __func__ << " con " << con << dendl;
293 reconnect();
294 return true;
295 }
296 return false;
297 }
298
299 bool MgrClient::ms_handle_refused(Connection *con)
300 {
301 // do nothing for now
302 return false;
303 }
304
305 void MgrClient::_send_stats()
306 {
307 _send_report();
308 _send_pgstats();
309 if (stats_period != 0) {
310 report_callback = timer.add_event_after(
311 stats_period,
312 new LambdaContext([this](int) {
313 _send_stats();
314 }));
315 }
316 }
317
318 void MgrClient::_send_report()
319 {
320 ceph_assert(ceph_mutex_is_locked_by_me(lock));
321 ceph_assert(session);
322 report_callback = nullptr;
323
324 auto report = make_message<MMgrReport>();
325 auto pcc = cct->get_perfcounters_collection();
326
327 pcc->with_counters([this, report](
328 const PerfCountersCollectionImpl::CounterMap &by_path)
329 {
330 // Helper for checking whether a counter should be included
331 auto include_counter = [this](
332 const PerfCounters::perf_counter_data_any_d &ctr,
333 const PerfCounters &perf_counters)
334 {
335 // FIXME: We don't send labeled perf counters to the mgr currently.
336 auto labels = ceph::perf_counters::key_labels(perf_counters.get_name());
337 if (labels.begin() != labels.end()) {
338 return false;
339 }
340
341 return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
342 };
343
344 // Helper for cases where we want to forget a counter
345 auto undeclare = [report, this](const std::string &path)
346 {
347 report->undeclare_types.push_back(path);
348 ldout(cct,20) << " undeclare " << path << dendl;
349 session->declared.erase(path);
350 };
351
352 ENCODE_START(1, 1, report->packed);
353
354 // Find counters that no longer exist, and undeclare them
355 for (auto p = session->declared.begin(); p != session->declared.end(); ) {
356 const auto &path = *(p++);
357 if (by_path.count(path) == 0) {
358 undeclare(path);
359 }
360 }
361
362 for (const auto &i : by_path) {
363 auto& path = i.first;
364 auto& data = *(i.second.data);
365 auto& perf_counters = *(i.second.perf_counters);
366
367 // Find counters that still exist, but are no longer permitted by
368 // stats_threshold
369 if (!include_counter(data, perf_counters)) {
370 if (session->declared.count(path)) {
371 undeclare(path);
372 }
373 continue;
374 }
375
376 if (session->declared.count(path) == 0) {
377 ldout(cct, 20) << " declare " << path << dendl;
378 PerfCounterType type;
379 type.path = path;
380 if (data.description) {
381 type.description = data.description;
382 }
383 if (data.nick) {
384 type.nick = data.nick;
385 }
386 type.type = data.type;
387 type.priority = perf_counters.get_adjusted_priority(data.prio);
388 type.unit = data.unit;
389 report->declare_types.push_back(std::move(type));
390 session->declared.insert(path);
391 }
392
393 encode(static_cast<uint64_t>(data.u64), report->packed);
394 if (data.type & PERFCOUNTER_LONGRUNAVG) {
395 encode(static_cast<uint64_t>(data.avgcount), report->packed);
396 encode(static_cast<uint64_t>(data.avgcount2), report->packed);
397 }
398 }
399 ENCODE_FINISH(report->packed);
400
401 ldout(cct, 20) << "sending " << session->declared.size() << " counters ("
402 "of possible " << by_path.size() << "), "
403 << report->declare_types.size() << " new, "
404 << report->undeclare_types.size() << " removed"
405 << dendl;
406 });
407
408 ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
409
410 if (daemon_name.size()) {
411 report->daemon_name = daemon_name;
412 } else {
413 report->daemon_name = cct->_conf->name.get_id();
414 }
415 report->service_name = service_name;
416
417 if (daemon_dirty_status) {
418 report->daemon_status = daemon_status;
419 daemon_dirty_status = false;
420 }
421
422 if (task_dirty_status) {
423 report->task_status = task_status;
424 task_dirty_status = false;
425 }
426
427 report->daemon_health_metrics = std::move(daemon_health_metrics);
428
429 cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
430 &last_config_bl_version);
431
432 if (get_perf_report_cb) {
433 report->metric_report_message = MetricReportMessage(get_perf_report_cb());
434 }
435
436 session->con->send_message2(report);
437 }
438
439 void MgrClient::send_pgstats()
440 {
441 std::lock_guard l(lock);
442 _send_pgstats();
443 }
444
445 void MgrClient::_send_pgstats()
446 {
447 if (pgstats_cb && session) {
448 session->con->send_message(pgstats_cb());
449 }
450 }
451
452 bool MgrClient::handle_mgr_configure(ref_t<MMgrConfigure> m)
453 {
454 ceph_assert(ceph_mutex_is_locked_by_me(lock));
455
456 ldout(cct, 20) << *m << dendl;
457
458 if (!session) {
459 lderr(cct) << "dropping unexpected configure message" << dendl;
460 return true;
461 }
462
463 ldout(cct, 4) << "stats_period=" << m->stats_period << dendl;
464
465 if (stats_threshold != m->stats_threshold) {
466 ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl;
467 stats_threshold = m->stats_threshold;
468 }
469
470 if (!m->osd_perf_metric_queries.empty()) {
471 handle_config_payload(m->osd_perf_metric_queries);
472 } else if (m->metric_config_message) {
473 const MetricConfigMessage &message = *m->metric_config_message;
474 boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
475 }
476
477 bool starting = (stats_period == 0) && (m->stats_period != 0);
478 stats_period = m->stats_period;
479 if (starting) {
480 _send_stats();
481 }
482
483 return true;
484 }
485
486 bool MgrClient::handle_mgr_close(ref_t<MMgrClose> m)
487 {
488 service_daemon = false;
489 shutdown_cond.notify_all();
490 return true;
491 }
492
493 int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
494 bufferlist *outbl, string *outs,
495 Context *onfinish)
496 {
497 std::lock_guard l(lock);
498
499 ldout(cct, 20) << "cmd: " << cmd << dendl;
500
501 if (map.epoch == 0 && mgr_optional) {
502 ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
503 return -EACCES;
504 }
505
506 auto &op = command_table.start_command();
507 op.cmd = cmd;
508 op.inbl = inbl;
509 op.outbl = outbl;
510 op.outs = outs;
511 op.on_finish = onfinish;
512
513 if (session && session->con) {
514 // Leaving fsid argument null because it isn't used historically, and
515 // we can use it as a signal that we are sending a non-tell command.
516 auto m = op.get_message(
517 {},
518 HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
519 session->con->send_message2(std::move(m));
520 } else {
521 ldout(cct, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl;
522 }
523 return 0;
524 }
525
526 int MgrClient::start_tell_command(
527 const string& name,
528 const vector<string>& cmd, const bufferlist& inbl,
529 bufferlist *outbl, string *outs,
530 Context *onfinish)
531 {
532 std::lock_guard l(lock);
533
534 ldout(cct, 20) << "target: " << name << " cmd: " << cmd << dendl;
535
536 if (map.epoch == 0 && mgr_optional) {
537 ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
538 return -EACCES;
539 }
540
541 auto &op = command_table.start_command();
542 op.tell = true;
543 op.name = name;
544 op.cmd = cmd;
545 op.inbl = inbl;
546 op.outbl = outbl;
547 op.outs = outs;
548 op.on_finish = onfinish;
549
550 if (session && session->con && (name.size() == 0 || map.active_name == name)) {
551 // Set fsid argument to signal that this is really a tell message (and
552 // we are not a legacy client sending a non-tell command via MCommand).
553 auto m = op.get_message(monmap->fsid, false);
554 session->con->send_message2(std::move(m));
555 } else {
556 ldout(cct, 5) << "no mgr session (no running mgr daemon?), or "
557 << name << " not active mgr, waiting" << dendl;
558 }
559 return 0;
560 }
561
562 bool MgrClient::handle_command_reply(
563 uint64_t tid,
564 bufferlist& data,
565 const std::string& rs,
566 int r)
567 {
568 ceph_assert(ceph_mutex_is_locked_by_me(lock));
569
570 ldout(cct, 20) << "tid " << tid << " r " << r << dendl;
571
572 if (!command_table.exists(tid)) {
573 ldout(cct, 4) << "handle_command_reply tid " << tid
574 << " not found" << dendl;
575 return true;
576 }
577
578 auto &op = command_table.get_command(tid);
579 if (op.outbl) {
580 *op.outbl = std::move(data);
581 }
582
583 if (op.outs) {
584 *(op.outs) = rs;
585 }
586
587 if (op.on_finish) {
588 op.on_finish->complete(r);
589 }
590
591 command_table.erase(tid);
592 return true;
593 }
594
595 int MgrClient::update_daemon_metadata(
596 const std::string& service,
597 const std::string& name,
598 const std::map<std::string,std::string>& metadata)
599 {
600 std::lock_guard l(lock);
601 if (service_daemon) {
602 return -EEXIST;
603 }
604 ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
605 service_name = service;
606 daemon_name = name;
607 daemon_metadata = metadata;
608 daemon_dirty_status = true;
609
610 if (need_metadata_update &&
611 !daemon_metadata.empty()) {
612 _send_update();
613 need_metadata_update = false;
614 }
615
616 return 0;
617 }
618
619 int MgrClient::service_daemon_register(
620 const std::string& service,
621 const std::string& name,
622 const std::map<std::string,std::string>& metadata)
623 {
624 std::lock_guard l(lock);
625 if (service_daemon) {
626 return -EEXIST;
627 }
628 ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
629 service_daemon = true;
630 service_name = service;
631 daemon_name = name;
632 daemon_metadata = metadata;
633 daemon_dirty_status = true;
634
635 // late register?
636 if (msgr->get_mytype() == CEPH_ENTITY_TYPE_CLIENT && session && session->con) {
637 _send_open();
638 }
639
640 return 0;
641 }
642
643 int MgrClient::service_daemon_update_status(
644 std::map<std::string,std::string>&& status)
645 {
646 std::lock_guard l(lock);
647 ldout(cct,10) << status << dendl;
648 daemon_status = std::move(status);
649 daemon_dirty_status = true;
650 return 0;
651 }
652
653 int MgrClient::service_daemon_update_task_status(
654 std::map<std::string,std::string> &&status) {
655 std::lock_guard l(lock);
656 ldout(cct,10) << status << dendl;
657 task_status = std::move(status);
658 task_dirty_status = true;
659 return 0;
660 }
661
662 void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
663 {
664 std::lock_guard l(lock);
665 daemon_health_metrics = std::move(metrics);
666 }
667