]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/MgrClient.cc
6253d267034335267af3530d227ffc0012968832
[ceph.git] / ceph / src / mgr / MgrClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14
15 #include "MgrClient.h"
16
17 #include "mgr/MgrContext.h"
18 #include "mon/MonMap.h"
19
20 #include "msg/Messenger.h"
21 #include "messages/MMgrMap.h"
22 #include "messages/MMgrReport.h"
23 #include "messages/MMgrOpen.h"
24 #include "messages/MMgrUpdate.h"
25 #include "messages/MMgrClose.h"
26 #include "messages/MMgrConfigure.h"
27 #include "messages/MCommand.h"
28 #include "messages/MCommandReply.h"
29 #include "messages/MMgrCommand.h"
30 #include "messages/MMgrCommandReply.h"
31 #include "messages/MPGStats.h"
32
33 using std::string;
34 using std::vector;
35
36 using ceph::bufferlist;
37 using ceph::make_message;
38 using ceph::ref_cast;
39 using ceph::ref_t;
40
41 #define dout_subsys ceph_subsys_mgrc
42 #undef dout_prefix
43 #define dout_prefix *_dout << "mgrc " << __func__ << " "
44
45 MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_, MonMap *monmap_)
46 : Dispatcher(cct_),
47 cct(cct_),
48 msgr(msgr_),
49 monmap(monmap_),
50 timer(cct_, lock)
51 {
52 ceph_assert(cct != nullptr);
53 }
54
55 void MgrClient::init()
56 {
57 std::lock_guard l(lock);
58
59 ceph_assert(msgr != nullptr);
60
61 timer.init();
62 initialized = true;
63 }
64
65 void MgrClient::shutdown()
66 {
67 std::unique_lock l(lock);
68 ldout(cct, 10) << dendl;
69
70 if (connect_retry_callback) {
71 timer.cancel_event(connect_retry_callback);
72 connect_retry_callback = nullptr;
73 }
74
75 // forget about in-flight commands if we are prematurely shut down
76 // (e.g., by control-C)
77 command_table.clear();
78 if (service_daemon &&
79 session &&
80 session->con &&
81 HAVE_FEATURE(session->con->get_features(), SERVER_MIMIC)) {
82 ldout(cct, 10) << "closing mgr session" << dendl;
83 auto m = make_message<MMgrClose>();
84 m->daemon_name = daemon_name;
85 m->service_name = service_name;
86 session->con->send_message2(m);
87 auto timeout = ceph::make_timespan(cct->_conf.get_val<double>(
88 "mgr_client_service_daemon_unregister_timeout"));
89 shutdown_cond.wait_for(l, timeout);
90 }
91
92 timer.shutdown();
93 if (session) {
94 session->con->mark_down();
95 session.reset();
96 }
97 }
98
99 bool MgrClient::ms_dispatch2(const ref_t<Message>& m)
100 {
101 std::lock_guard l(lock);
102
103 switch(m->get_type()) {
104 case MSG_MGR_MAP:
105 return handle_mgr_map(ref_cast<MMgrMap>(m));
106 case MSG_MGR_CONFIGURE:
107 return handle_mgr_configure(ref_cast<MMgrConfigure>(m));
108 case MSG_MGR_CLOSE:
109 return handle_mgr_close(ref_cast<MMgrClose>(m));
110 case MSG_COMMAND_REPLY:
111 if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
112 MCommandReply *c = static_cast<MCommandReply*>(m.get());
113 handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
114 return true;
115 } else {
116 return false;
117 }
118 case MSG_MGR_COMMAND_REPLY:
119 if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
120 MMgrCommandReply *c = static_cast<MMgrCommandReply*>(m.get());
121 handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
122 return true;
123 } else {
124 return false;
125 }
126 default:
127 ldout(cct, 30) << "Not handling " << *m << dendl;
128 return false;
129 }
130 }
131
132 void MgrClient::reconnect()
133 {
134 ceph_assert(ceph_mutex_is_locked_by_me(lock));
135
136 if (session) {
137 ldout(cct, 4) << "Terminating session with "
138 << session->con->get_peer_addr() << dendl;
139 session->con->mark_down();
140 session.reset();
141 stats_period = 0;
142 if (report_callback != nullptr) {
143 timer.cancel_event(report_callback);
144 report_callback = nullptr;
145 }
146 }
147
148 if (!map.get_available()) {
149 ldout(cct, 4) << "No active mgr available yet" << dendl;
150 return;
151 }
152
153 if (!clock_t::is_zero(last_connect_attempt)) {
154 auto now = clock_t::now();
155 auto when = last_connect_attempt +
156 ceph::make_timespan(
157 cct->_conf.get_val<double>("mgr_connect_retry_interval"));
158 if (now < when) {
159 if (!connect_retry_callback) {
160 connect_retry_callback = timer.add_event_at(
161 when,
162 new LambdaContext([this](int r){
163 connect_retry_callback = nullptr;
164 reconnect();
165 }));
166 }
167 ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
168 return;
169 }
170 }
171
172 if (connect_retry_callback) {
173 timer.cancel_event(connect_retry_callback);
174 connect_retry_callback = nullptr;
175 }
176
177 ldout(cct, 4) << "Starting new session with " << map.get_active_addrs()
178 << dendl;
179 last_connect_attempt = clock_t::now();
180
181 session.reset(new MgrSessionState());
182 session->con = msgr->connect_to(CEPH_ENTITY_TYPE_MGR,
183 map.get_active_addrs());
184
185 if (service_daemon) {
186 daemon_dirty_status = true;
187 }
188 task_dirty_status = true;
189
190 // Don't send an open if we're just a client (i.e. doing
191 // command-sending, not stats etc)
192 if (msgr->get_mytype() != CEPH_ENTITY_TYPE_CLIENT || service_daemon) {
193 _send_open();
194 }
195
196 // resend any pending commands
197 auto p = command_table.get_commands().begin();
198 while (p != command_table.get_commands().end()) {
199 auto tid = p->first;
200 auto& op = p->second;
201 ldout(cct,10) << "resending " << tid << (op.tell ? " (tell)":" (cli)") << dendl;
202 MessageRef m;
203 if (op.tell) {
204 if (op.name.size() && op.name != map.active_name) {
205 ldout(cct, 10) << "active mgr " << map.active_name << " != target "
206 << op.name << dendl;
207 if (op.on_finish) {
208 op.on_finish->complete(-ENXIO);
209 }
210 ++p;
211 command_table.erase(tid);
212 continue;
213 }
214 // Set fsid argument to signal that this is really a tell message (and
215 // we are not a legacy client sending a non-tell command via MCommand).
216 m = op.get_message(monmap->fsid, false);
217 } else {
218 m = op.get_message(
219 {},
220 HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
221 }
222 ceph_assert(session);
223 ceph_assert(session->con);
224 session->con->send_message2(std::move(m));
225 ++p;
226 }
227 }
228
229 void MgrClient::_send_open()
230 {
231 if (session && session->con) {
232 auto open = make_message<MMgrOpen>();
233 if (!service_name.empty()) {
234 open->service_name = service_name;
235 open->daemon_name = daemon_name;
236 } else {
237 open->daemon_name = cct->_conf->name.get_id();
238 }
239 if (service_daemon) {
240 open->service_daemon = service_daemon;
241 open->daemon_metadata = daemon_metadata;
242 }
243 cct->_conf.get_config_bl(0, &open->config_bl, &last_config_bl_version);
244 cct->_conf.get_defaults_bl(&open->config_defaults_bl);
245 session->con->send_message2(open);
246 }
247 }
248
249 void MgrClient::_send_update()
250 {
251 if (session && session->con) {
252 auto update = make_message<MMgrUpdate>();
253 if (!service_name.empty()) {
254 update->service_name = service_name;
255 update->daemon_name = daemon_name;
256 } else {
257 update->daemon_name = cct->_conf->name.get_id();
258 }
259 if (need_metadata_update) {
260 update->daemon_metadata = daemon_metadata;
261 }
262 update->need_metadata_update = need_metadata_update;
263 session->con->send_message2(update);
264 }
265 }
266
267 bool MgrClient::handle_mgr_map(ref_t<MMgrMap> m)
268 {
269 ceph_assert(ceph_mutex_is_locked_by_me(lock));
270
271 ldout(cct, 20) << *m << dendl;
272
273 map = m->get_map();
274 ldout(cct, 4) << "Got map version " << map.epoch << dendl;
275
276 ldout(cct, 4) << "Active mgr is now " << map.get_active_addrs() << dendl;
277
278 // Reset session?
279 if (!session ||
280 session->con->get_peer_addrs() != map.get_active_addrs()) {
281 reconnect();
282 }
283
284 return true;
285 }
286
287 bool MgrClient::ms_handle_reset(Connection *con)
288 {
289 std::lock_guard l(lock);
290 if (session && con == session->con) {
291 ldout(cct, 4) << __func__ << " con " << con << dendl;
292 reconnect();
293 return true;
294 }
295 return false;
296 }
297
298 bool MgrClient::ms_handle_refused(Connection *con)
299 {
300 // do nothing for now
301 return false;
302 }
303
304 void MgrClient::_send_stats()
305 {
306 _send_report();
307 _send_pgstats();
308 if (stats_period != 0) {
309 report_callback = timer.add_event_after(
310 stats_period,
311 new LambdaContext([this](int) {
312 _send_stats();
313 }));
314 }
315 }
316
317 void MgrClient::_send_report()
318 {
319 ceph_assert(ceph_mutex_is_locked_by_me(lock));
320 ceph_assert(session);
321 report_callback = nullptr;
322
323 auto report = make_message<MMgrReport>();
324 auto pcc = cct->get_perfcounters_collection();
325
326 pcc->with_counters([this, report](
327 const PerfCountersCollectionImpl::CounterMap &by_path)
328 {
329 // Helper for checking whether a counter should be included
330 auto include_counter = [this](
331 const PerfCounters::perf_counter_data_any_d &ctr,
332 const PerfCounters &perf_counters)
333 {
334 return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
335 };
336
337 // Helper for cases where we want to forget a counter
338 auto undeclare = [report, this](const std::string &path)
339 {
340 report->undeclare_types.push_back(path);
341 ldout(cct,20) << " undeclare " << path << dendl;
342 session->declared.erase(path);
343 };
344
345 ENCODE_START(1, 1, report->packed);
346
347 // Find counters that no longer exist, and undeclare them
348 for (auto p = session->declared.begin(); p != session->declared.end(); ) {
349 const auto &path = *(p++);
350 if (by_path.count(path) == 0) {
351 undeclare(path);
352 }
353 }
354
355 for (const auto &i : by_path) {
356 auto& path = i.first;
357 auto& data = *(i.second.data);
358 auto& perf_counters = *(i.second.perf_counters);
359
360 // Find counters that still exist, but are no longer permitted by
361 // stats_threshold
362 if (!include_counter(data, perf_counters)) {
363 if (session->declared.count(path)) {
364 undeclare(path);
365 }
366 continue;
367 }
368
369 if (session->declared.count(path) == 0) {
370 ldout(cct,20) << " declare " << path << dendl;
371 PerfCounterType type;
372 type.path = path;
373 if (data.description) {
374 type.description = data.description;
375 }
376 if (data.nick) {
377 type.nick = data.nick;
378 }
379 type.type = data.type;
380 type.priority = perf_counters.get_adjusted_priority(data.prio);
381 type.unit = data.unit;
382 report->declare_types.push_back(std::move(type));
383 session->declared.insert(path);
384 }
385
386 encode(static_cast<uint64_t>(data.u64), report->packed);
387 if (data.type & PERFCOUNTER_LONGRUNAVG) {
388 encode(static_cast<uint64_t>(data.avgcount), report->packed);
389 encode(static_cast<uint64_t>(data.avgcount2), report->packed);
390 }
391 }
392 ENCODE_FINISH(report->packed);
393
394 ldout(cct, 20) << "sending " << session->declared.size() << " counters ("
395 "of possible " << by_path.size() << "), "
396 << report->declare_types.size() << " new, "
397 << report->undeclare_types.size() << " removed"
398 << dendl;
399 });
400
401 ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
402
403 if (daemon_name.size()) {
404 report->daemon_name = daemon_name;
405 } else {
406 report->daemon_name = cct->_conf->name.get_id();
407 }
408 report->service_name = service_name;
409
410 if (daemon_dirty_status) {
411 report->daemon_status = daemon_status;
412 daemon_dirty_status = false;
413 }
414
415 if (task_dirty_status) {
416 report->task_status = task_status;
417 task_dirty_status = false;
418 }
419
420 report->daemon_health_metrics = std::move(daemon_health_metrics);
421
422 cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
423 &last_config_bl_version);
424
425 if (get_perf_report_cb) {
426 report->metric_report_message = MetricReportMessage(get_perf_report_cb());
427 }
428
429 session->con->send_message2(report);
430 }
431
432 void MgrClient::send_pgstats()
433 {
434 std::lock_guard l(lock);
435 _send_pgstats();
436 }
437
438 void MgrClient::_send_pgstats()
439 {
440 if (pgstats_cb && session) {
441 session->con->send_message(pgstats_cb());
442 }
443 }
444
445 bool MgrClient::handle_mgr_configure(ref_t<MMgrConfigure> m)
446 {
447 ceph_assert(ceph_mutex_is_locked_by_me(lock));
448
449 ldout(cct, 20) << *m << dendl;
450
451 if (!session) {
452 lderr(cct) << "dropping unexpected configure message" << dendl;
453 return true;
454 }
455
456 ldout(cct, 4) << "stats_period=" << m->stats_period << dendl;
457
458 if (stats_threshold != m->stats_threshold) {
459 ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl;
460 stats_threshold = m->stats_threshold;
461 }
462
463 if (!m->osd_perf_metric_queries.empty()) {
464 handle_config_payload(m->osd_perf_metric_queries);
465 } else if (m->metric_config_message) {
466 const MetricConfigMessage &message = *m->metric_config_message;
467 boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
468 }
469
470 bool starting = (stats_period == 0) && (m->stats_period != 0);
471 stats_period = m->stats_period;
472 if (starting) {
473 _send_stats();
474 }
475
476 return true;
477 }
478
479 bool MgrClient::handle_mgr_close(ref_t<MMgrClose> m)
480 {
481 service_daemon = false;
482 shutdown_cond.notify_all();
483 return true;
484 }
485
486 int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
487 bufferlist *outbl, string *outs,
488 Context *onfinish)
489 {
490 std::lock_guard l(lock);
491
492 ldout(cct, 20) << "cmd: " << cmd << dendl;
493
494 if (map.epoch == 0 && mgr_optional) {
495 ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
496 return -EACCES;
497 }
498
499 auto &op = command_table.start_command();
500 op.cmd = cmd;
501 op.inbl = inbl;
502 op.outbl = outbl;
503 op.outs = outs;
504 op.on_finish = onfinish;
505
506 if (session && session->con) {
507 // Leaving fsid argument null because it isn't used historically, and
508 // we can use it as a signal that we are sending a non-tell command.
509 auto m = op.get_message(
510 {},
511 HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
512 session->con->send_message2(std::move(m));
513 } else {
514 ldout(cct, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl;
515 }
516 return 0;
517 }
518
519 int MgrClient::start_tell_command(
520 const string& name,
521 const vector<string>& cmd, const bufferlist& inbl,
522 bufferlist *outbl, string *outs,
523 Context *onfinish)
524 {
525 std::lock_guard l(lock);
526
527 ldout(cct, 20) << "target: " << name << " cmd: " << cmd << dendl;
528
529 if (map.epoch == 0 && mgr_optional) {
530 ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
531 return -EACCES;
532 }
533
534 auto &op = command_table.start_command();
535 op.tell = true;
536 op.name = name;
537 op.cmd = cmd;
538 op.inbl = inbl;
539 op.outbl = outbl;
540 op.outs = outs;
541 op.on_finish = onfinish;
542
543 if (session && session->con && (name.size() == 0 || map.active_name == name)) {
544 // Set fsid argument to signal that this is really a tell message (and
545 // we are not a legacy client sending a non-tell command via MCommand).
546 auto m = op.get_message(monmap->fsid, false);
547 session->con->send_message2(std::move(m));
548 } else {
549 ldout(cct, 5) << "no mgr session (no running mgr daemon?), or "
550 << name << " not active mgr, waiting" << dendl;
551 }
552 return 0;
553 }
554
555 bool MgrClient::handle_command_reply(
556 uint64_t tid,
557 bufferlist& data,
558 const std::string& rs,
559 int r)
560 {
561 ceph_assert(ceph_mutex_is_locked_by_me(lock));
562
563 ldout(cct, 20) << "tid " << tid << " r " << r << dendl;
564
565 if (!command_table.exists(tid)) {
566 ldout(cct, 4) << "handle_command_reply tid " << tid
567 << " not found" << dendl;
568 return true;
569 }
570
571 auto &op = command_table.get_command(tid);
572 if (op.outbl) {
573 *op.outbl = std::move(data);
574 }
575
576 if (op.outs) {
577 *(op.outs) = rs;
578 }
579
580 if (op.on_finish) {
581 op.on_finish->complete(r);
582 }
583
584 command_table.erase(tid);
585 return true;
586 }
587
588 int MgrClient::update_daemon_metadata(
589 const std::string& service,
590 const std::string& name,
591 const std::map<std::string,std::string>& metadata)
592 {
593 std::lock_guard l(lock);
594 if (service_daemon) {
595 return -EEXIST;
596 }
597 ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
598 service_name = service;
599 daemon_name = name;
600 daemon_metadata = metadata;
601 daemon_dirty_status = true;
602
603 if (need_metadata_update &&
604 !daemon_metadata.empty()) {
605 _send_update();
606 need_metadata_update = false;
607 }
608
609 return 0;
610 }
611
612 int MgrClient::service_daemon_register(
613 const std::string& service,
614 const std::string& name,
615 const std::map<std::string,std::string>& metadata)
616 {
617 std::lock_guard l(lock);
618 if (service_daemon) {
619 return -EEXIST;
620 }
621 ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
622 service_daemon = true;
623 service_name = service;
624 daemon_name = name;
625 daemon_metadata = metadata;
626 daemon_dirty_status = true;
627
628 // late register?
629 if (msgr->get_mytype() == CEPH_ENTITY_TYPE_CLIENT && session && session->con) {
630 _send_open();
631 }
632
633 return 0;
634 }
635
636 int MgrClient::service_daemon_update_status(
637 std::map<std::string,std::string>&& status)
638 {
639 std::lock_guard l(lock);
640 ldout(cct,10) << status << dendl;
641 daemon_status = std::move(status);
642 daemon_dirty_status = true;
643 return 0;
644 }
645
646 int MgrClient::service_daemon_update_task_status(
647 std::map<std::string,std::string> &&status) {
648 std::lock_guard l(lock);
649 ldout(cct,10) << status << dendl;
650 task_status = std::move(status);
651 task_dirty_status = true;
652 return 0;
653 }
654
655 void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
656 {
657 std::lock_guard l(lock);
658 daemon_health_metrics = std::move(metrics);
659 }
660