]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/MgrClient.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / mgr / MgrClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14
15 #include "MgrClient.h"
16
17 #include "mgr/MgrContext.h"
18 #include "mon/MonMap.h"
19
20 #include "msg/Messenger.h"
21 #include "messages/MMgrMap.h"
22 #include "messages/MMgrReport.h"
23 #include "messages/MMgrOpen.h"
24 #include "messages/MMgrClose.h"
25 #include "messages/MMgrConfigure.h"
26 #include "messages/MCommand.h"
27 #include "messages/MCommandReply.h"
28 #include "messages/MMgrCommand.h"
29 #include "messages/MMgrCommandReply.h"
30 #include "messages/MPGStats.h"
31
32 using std::string;
33 using std::vector;
34
35 using ceph::bufferlist;
36 using ceph::make_message;
37 using ceph::ref_cast;
38 using ceph::ref_t;
39
40 #define dout_subsys ceph_subsys_mgrc
41 #undef dout_prefix
42 #define dout_prefix *_dout << "mgrc " << __func__ << " "
43
44 MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_, MonMap *monmap_)
45 : Dispatcher(cct_),
46 cct(cct_),
47 msgr(msgr_),
48 monmap(monmap_),
49 timer(cct_, lock)
50 {
51 ceph_assert(cct != nullptr);
52 }
53
54 void MgrClient::init()
55 {
56 std::lock_guard l(lock);
57
58 ceph_assert(msgr != nullptr);
59
60 timer.init();
61 initialized = true;
62 }
63
64 void MgrClient::shutdown()
65 {
66 std::unique_lock l(lock);
67 ldout(cct, 10) << dendl;
68
69 if (connect_retry_callback) {
70 timer.cancel_event(connect_retry_callback);
71 connect_retry_callback = nullptr;
72 }
73
74 // forget about in-flight commands if we are prematurely shut down
75 // (e.g., by control-C)
76 command_table.clear();
77 if (service_daemon &&
78 session &&
79 session->con &&
80 HAVE_FEATURE(session->con->get_features(), SERVER_MIMIC)) {
81 ldout(cct, 10) << "closing mgr session" << dendl;
82 auto m = make_message<MMgrClose>();
83 m->daemon_name = daemon_name;
84 m->service_name = service_name;
85 session->con->send_message2(m);
86 auto timeout = ceph::make_timespan(cct->_conf.get_val<double>(
87 "mgr_client_service_daemon_unregister_timeout"));
88 shutdown_cond.wait_for(l, timeout);
89 }
90
91 timer.shutdown();
92 if (session) {
93 session->con->mark_down();
94 session.reset();
95 }
96 }
97
98 bool MgrClient::ms_dispatch2(const ref_t<Message>& m)
99 {
100 std::lock_guard l(lock);
101
102 switch(m->get_type()) {
103 case MSG_MGR_MAP:
104 return handle_mgr_map(ref_cast<MMgrMap>(m));
105 case MSG_MGR_CONFIGURE:
106 return handle_mgr_configure(ref_cast<MMgrConfigure>(m));
107 case MSG_MGR_CLOSE:
108 return handle_mgr_close(ref_cast<MMgrClose>(m));
109 case MSG_COMMAND_REPLY:
110 if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
111 MCommandReply *c = static_cast<MCommandReply*>(m.get());
112 handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
113 return true;
114 } else {
115 return false;
116 }
117 case MSG_MGR_COMMAND_REPLY:
118 if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
119 MMgrCommandReply *c = static_cast<MMgrCommandReply*>(m.get());
120 handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
121 return true;
122 } else {
123 return false;
124 }
125 default:
126 ldout(cct, 30) << "Not handling " << *m << dendl;
127 return false;
128 }
129 }
130
131 void MgrClient::reconnect()
132 {
133 ceph_assert(ceph_mutex_is_locked_by_me(lock));
134
135 if (session) {
136 ldout(cct, 4) << "Terminating session with "
137 << session->con->get_peer_addr() << dendl;
138 session->con->mark_down();
139 session.reset();
140 stats_period = 0;
141 if (report_callback != nullptr) {
142 timer.cancel_event(report_callback);
143 report_callback = nullptr;
144 }
145 }
146
147 if (!map.get_available()) {
148 ldout(cct, 4) << "No active mgr available yet" << dendl;
149 return;
150 }
151
152 if (!clock_t::is_zero(last_connect_attempt)) {
153 auto now = clock_t::now();
154 auto when = last_connect_attempt +
155 ceph::make_timespan(
156 cct->_conf.get_val<double>("mgr_connect_retry_interval"));
157 if (now < when) {
158 if (!connect_retry_callback) {
159 connect_retry_callback = timer.add_event_at(
160 when,
161 new LambdaContext([this](int r){
162 connect_retry_callback = nullptr;
163 reconnect();
164 }));
165 }
166 ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
167 return;
168 }
169 }
170
171 if (connect_retry_callback) {
172 timer.cancel_event(connect_retry_callback);
173 connect_retry_callback = nullptr;
174 }
175
176 ldout(cct, 4) << "Starting new session with " << map.get_active_addrs()
177 << dendl;
178 last_connect_attempt = clock_t::now();
179
180 session.reset(new MgrSessionState());
181 session->con = msgr->connect_to(CEPH_ENTITY_TYPE_MGR,
182 map.get_active_addrs());
183
184 if (service_daemon) {
185 daemon_dirty_status = true;
186 }
187 task_dirty_status = true;
188
189 // Don't send an open if we're just a client (i.e. doing
190 // command-sending, not stats etc)
191 if (msgr->get_mytype() != CEPH_ENTITY_TYPE_CLIENT || service_daemon) {
192 _send_open();
193 }
194
195 // resend any pending commands
196 auto p = command_table.get_commands().begin();
197 while (p != command_table.get_commands().end()) {
198 auto tid = p->first;
199 auto& op = p->second;
200 ldout(cct,10) << "resending " << tid << (op.tell ? " (tell)":" (cli)") << dendl;
201 MessageRef m;
202 if (op.tell) {
203 if (op.name.size() && op.name != map.active_name) {
204 ldout(cct, 10) << "active mgr " << map.active_name << " != target "
205 << op.name << dendl;
206 if (op.on_finish) {
207 op.on_finish->complete(-ENXIO);
208 }
209 ++p;
210 command_table.erase(tid);
211 continue;
212 }
213 // Set fsid argument to signal that this is really a tell message (and
214 // we are not a legacy client sending a non-tell command via MCommand).
215 m = op.get_message(monmap->fsid, false);
216 } else {
217 m = op.get_message(
218 {},
219 HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
220 }
221 ceph_assert(session);
222 ceph_assert(session->con);
223 session->con->send_message2(std::move(m));
224 ++p;
225 }
226 }
227
228 void MgrClient::_send_open()
229 {
230 if (session && session->con) {
231 auto open = make_message<MMgrOpen>();
232 if (!service_name.empty()) {
233 open->service_name = service_name;
234 open->daemon_name = daemon_name;
235 } else {
236 open->daemon_name = cct->_conf->name.get_id();
237 }
238 if (service_daemon) {
239 open->service_daemon = service_daemon;
240 open->daemon_metadata = daemon_metadata;
241 }
242 cct->_conf.get_config_bl(0, &open->config_bl, &last_config_bl_version);
243 cct->_conf.get_defaults_bl(&open->config_defaults_bl);
244 session->con->send_message2(open);
245 }
246 }
247
248 bool MgrClient::handle_mgr_map(ref_t<MMgrMap> m)
249 {
250 ceph_assert(ceph_mutex_is_locked_by_me(lock));
251
252 ldout(cct, 20) << *m << dendl;
253
254 map = m->get_map();
255 ldout(cct, 4) << "Got map version " << map.epoch << dendl;
256
257 ldout(cct, 4) << "Active mgr is now " << map.get_active_addrs() << dendl;
258
259 // Reset session?
260 if (!session ||
261 session->con->get_peer_addrs() != map.get_active_addrs()) {
262 reconnect();
263 }
264
265 return true;
266 }
267
268 bool MgrClient::ms_handle_reset(Connection *con)
269 {
270 std::lock_guard l(lock);
271 if (session && con == session->con) {
272 ldout(cct, 4) << __func__ << " con " << con << dendl;
273 reconnect();
274 return true;
275 }
276 return false;
277 }
278
279 bool MgrClient::ms_handle_refused(Connection *con)
280 {
281 // do nothing for now
282 return false;
283 }
284
285 void MgrClient::_send_stats()
286 {
287 _send_report();
288 _send_pgstats();
289 if (stats_period != 0) {
290 report_callback = timer.add_event_after(
291 stats_period,
292 new LambdaContext([this](int) {
293 _send_stats();
294 }));
295 }
296 }
297
298 void MgrClient::_send_report()
299 {
300 ceph_assert(ceph_mutex_is_locked_by_me(lock));
301 ceph_assert(session);
302 report_callback = nullptr;
303
304 auto report = make_message<MMgrReport>();
305 auto pcc = cct->get_perfcounters_collection();
306
307 pcc->with_counters([this, report](
308 const PerfCountersCollectionImpl::CounterMap &by_path)
309 {
310 // Helper for checking whether a counter should be included
311 auto include_counter = [this](
312 const PerfCounters::perf_counter_data_any_d &ctr,
313 const PerfCounters &perf_counters)
314 {
315 return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
316 };
317
318 // Helper for cases where we want to forget a counter
319 auto undeclare = [report, this](const std::string &path)
320 {
321 report->undeclare_types.push_back(path);
322 ldout(cct,20) << " undeclare " << path << dendl;
323 session->declared.erase(path);
324 };
325
326 ENCODE_START(1, 1, report->packed);
327
328 // Find counters that no longer exist, and undeclare them
329 for (auto p = session->declared.begin(); p != session->declared.end(); ) {
330 const auto &path = *(p++);
331 if (by_path.count(path) == 0) {
332 undeclare(path);
333 }
334 }
335
336 for (const auto &i : by_path) {
337 auto& path = i.first;
338 auto& data = *(i.second.data);
339 auto& perf_counters = *(i.second.perf_counters);
340
341 // Find counters that still exist, but are no longer permitted by
342 // stats_threshold
343 if (!include_counter(data, perf_counters)) {
344 if (session->declared.count(path)) {
345 undeclare(path);
346 }
347 continue;
348 }
349
350 if (session->declared.count(path) == 0) {
351 ldout(cct,20) << " declare " << path << dendl;
352 PerfCounterType type;
353 type.path = path;
354 if (data.description) {
355 type.description = data.description;
356 }
357 if (data.nick) {
358 type.nick = data.nick;
359 }
360 type.type = data.type;
361 type.priority = perf_counters.get_adjusted_priority(data.prio);
362 type.unit = data.unit;
363 report->declare_types.push_back(std::move(type));
364 session->declared.insert(path);
365 }
366
367 encode(static_cast<uint64_t>(data.u64), report->packed);
368 if (data.type & PERFCOUNTER_LONGRUNAVG) {
369 encode(static_cast<uint64_t>(data.avgcount), report->packed);
370 encode(static_cast<uint64_t>(data.avgcount2), report->packed);
371 }
372 }
373 ENCODE_FINISH(report->packed);
374
375 ldout(cct, 20) << "sending " << session->declared.size() << " counters ("
376 "of possible " << by_path.size() << "), "
377 << report->declare_types.size() << " new, "
378 << report->undeclare_types.size() << " removed"
379 << dendl;
380 });
381
382 ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
383
384 if (daemon_name.size()) {
385 report->daemon_name = daemon_name;
386 } else {
387 report->daemon_name = cct->_conf->name.get_id();
388 }
389 report->service_name = service_name;
390
391 if (daemon_dirty_status) {
392 report->daemon_status = daemon_status;
393 daemon_dirty_status = false;
394 }
395
396 if (task_dirty_status) {
397 report->task_status = task_status;
398 task_dirty_status = false;
399 }
400
401 report->daemon_health_metrics = std::move(daemon_health_metrics);
402
403 cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
404 &last_config_bl_version);
405
406 if (get_perf_report_cb) {
407 report->metric_report_message = MetricReportMessage(get_perf_report_cb());
408 }
409
410 session->con->send_message2(report);
411 }
412
413 void MgrClient::send_pgstats()
414 {
415 std::lock_guard l(lock);
416 _send_pgstats();
417 }
418
419 void MgrClient::_send_pgstats()
420 {
421 if (pgstats_cb && session) {
422 session->con->send_message(pgstats_cb());
423 }
424 }
425
426 bool MgrClient::handle_mgr_configure(ref_t<MMgrConfigure> m)
427 {
428 ceph_assert(ceph_mutex_is_locked_by_me(lock));
429
430 ldout(cct, 20) << *m << dendl;
431
432 if (!session) {
433 lderr(cct) << "dropping unexpected configure message" << dendl;
434 return true;
435 }
436
437 ldout(cct, 4) << "stats_period=" << m->stats_period << dendl;
438
439 if (stats_threshold != m->stats_threshold) {
440 ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl;
441 stats_threshold = m->stats_threshold;
442 }
443
444 if (!m->osd_perf_metric_queries.empty()) {
445 handle_config_payload(m->osd_perf_metric_queries);
446 } else if (m->metric_config_message) {
447 const MetricConfigMessage &message = *m->metric_config_message;
448 boost::apply_visitor(HandlePayloadVisitor(this), message.payload);
449 }
450
451 bool starting = (stats_period == 0) && (m->stats_period != 0);
452 stats_period = m->stats_period;
453 if (starting) {
454 _send_stats();
455 }
456
457 return true;
458 }
459
460 bool MgrClient::handle_mgr_close(ref_t<MMgrClose> m)
461 {
462 service_daemon = false;
463 shutdown_cond.notify_all();
464 return true;
465 }
466
467 int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
468 bufferlist *outbl, string *outs,
469 Context *onfinish)
470 {
471 std::lock_guard l(lock);
472
473 ldout(cct, 20) << "cmd: " << cmd << dendl;
474
475 if (map.epoch == 0 && mgr_optional) {
476 ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
477 return -EACCES;
478 }
479
480 auto &op = command_table.start_command();
481 op.cmd = cmd;
482 op.inbl = inbl;
483 op.outbl = outbl;
484 op.outs = outs;
485 op.on_finish = onfinish;
486
487 if (session && session->con) {
488 // Leaving fsid argument null because it isn't used historically, and
489 // we can use it as a signal that we are sending a non-tell command.
490 auto m = op.get_message(
491 {},
492 HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
493 session->con->send_message2(std::move(m));
494 } else {
495 ldout(cct, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl;
496 }
497 return 0;
498 }
499
500 int MgrClient::start_tell_command(
501 const string& name,
502 const vector<string>& cmd, const bufferlist& inbl,
503 bufferlist *outbl, string *outs,
504 Context *onfinish)
505 {
506 std::lock_guard l(lock);
507
508 ldout(cct, 20) << "target: " << name << " cmd: " << cmd << dendl;
509
510 if (map.epoch == 0 && mgr_optional) {
511 ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
512 return -EACCES;
513 }
514
515 auto &op = command_table.start_command();
516 op.tell = true;
517 op.name = name;
518 op.cmd = cmd;
519 op.inbl = inbl;
520 op.outbl = outbl;
521 op.outs = outs;
522 op.on_finish = onfinish;
523
524 if (session && session->con && (name.size() == 0 || map.active_name == name)) {
525 // Set fsid argument to signal that this is really a tell message (and
526 // we are not a legacy client sending a non-tell command via MCommand).
527 auto m = op.get_message(monmap->fsid, false);
528 session->con->send_message2(std::move(m));
529 } else {
530 ldout(cct, 5) << "no mgr session (no running mgr daemon?), or "
531 << name << " not active mgr, waiting" << dendl;
532 }
533 return 0;
534 }
535
536 bool MgrClient::handle_command_reply(
537 uint64_t tid,
538 bufferlist& data,
539 const std::string& rs,
540 int r)
541 {
542 ceph_assert(ceph_mutex_is_locked_by_me(lock));
543
544 ldout(cct, 20) << "tid " << tid << " r " << r << dendl;
545
546 if (!command_table.exists(tid)) {
547 ldout(cct, 4) << "handle_command_reply tid " << tid
548 << " not found" << dendl;
549 return true;
550 }
551
552 auto &op = command_table.get_command(tid);
553 if (op.outbl) {
554 *op.outbl = std::move(data);
555 }
556
557 if (op.outs) {
558 *(op.outs) = rs;
559 }
560
561 if (op.on_finish) {
562 op.on_finish->complete(r);
563 }
564
565 command_table.erase(tid);
566 return true;
567 }
568
569 int MgrClient::service_daemon_register(
570 const std::string& service,
571 const std::string& name,
572 const std::map<std::string,std::string>& metadata)
573 {
574 std::lock_guard l(lock);
575 if (service_daemon) {
576 return -EEXIST;
577 }
578 ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
579 service_daemon = true;
580 service_name = service;
581 daemon_name = name;
582 daemon_metadata = metadata;
583 daemon_dirty_status = true;
584
585 // late register?
586 if (msgr->get_mytype() == CEPH_ENTITY_TYPE_CLIENT && session && session->con) {
587 _send_open();
588 }
589
590 return 0;
591 }
592
593 int MgrClient::service_daemon_update_status(
594 std::map<std::string,std::string>&& status)
595 {
596 std::lock_guard l(lock);
597 ldout(cct,10) << status << dendl;
598 daemon_status = std::move(status);
599 daemon_dirty_status = true;
600 return 0;
601 }
602
603 int MgrClient::service_daemon_update_task_status(
604 std::map<std::string,std::string> &&status) {
605 std::lock_guard l(lock);
606 ldout(cct,10) << status << dendl;
607 task_status = std::move(status);
608 task_dirty_status = true;
609 return 0;
610 }
611
612 void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics)
613 {
614 std::lock_guard l(lock);
615 daemon_health_metrics = std::move(metrics);
616 }
617