]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 John Spray <john.spray@redhat.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | */ | |
13 | ||
14 | ||
15 | #include "MgrClient.h" | |
16 | ||
17 | #include "mgr/MgrContext.h" | |
18 | ||
19 | #include "msg/Messenger.h" | |
20 | #include "messages/MMgrMap.h" | |
21 | #include "messages/MMgrReport.h" | |
22 | #include "messages/MMgrOpen.h" | |
11fdf7f2 | 23 | #include "messages/MMgrClose.h" |
7c673cae FG |
24 | #include "messages/MMgrConfigure.h" |
25 | #include "messages/MCommand.h" | |
26 | #include "messages/MCommandReply.h" | |
27 | #include "messages/MPGStats.h" | |
28 | ||
29 | #define dout_subsys ceph_subsys_mgrc | |
30 | #undef dout_prefix | |
31 | #define dout_prefix *_dout << "mgrc " << __func__ << " " | |
32 | ||
33 | MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_) | |
34 | : Dispatcher(cct_), cct(cct_), msgr(msgr_), | |
35 | timer(cct_, lock) | |
36 | { | |
11fdf7f2 | 37 | ceph_assert(cct != nullptr); |
7c673cae FG |
38 | } |
39 | ||
40 | void MgrClient::init() | |
41 | { | |
11fdf7f2 | 42 | std::lock_guard l(lock); |
7c673cae | 43 | |
11fdf7f2 | 44 | ceph_assert(msgr != nullptr); |
7c673cae FG |
45 | |
46 | timer.init(); | |
47 | } | |
48 | ||
49 | void MgrClient::shutdown() | |
50 | { | |
11fdf7f2 TL |
51 | std::lock_guard l(lock); |
52 | ldout(cct, 10) << dendl; | |
7c673cae FG |
53 | |
54 | if (connect_retry_callback) { | |
55 | timer.cancel_event(connect_retry_callback); | |
56 | connect_retry_callback = nullptr; | |
57 | } | |
58 | ||
59 | // forget about in-flight commands if we are prematurely shut down | |
60 | // (e.g., by control-C) | |
61 | command_table.clear(); | |
11fdf7f2 TL |
62 | if (service_daemon && |
63 | session && | |
64 | session->con && | |
65 | HAVE_FEATURE(session->con->get_features(), SERVER_MIMIC)) { | |
66 | ldout(cct, 10) << "closing mgr session" << dendl; | |
67 | MMgrClose *m = new MMgrClose(); | |
68 | m->daemon_name = daemon_name; | |
69 | m->service_name = service_name; | |
70 | session->con->send_message(m); | |
71 | utime_t timeout; | |
72 | timeout.set_from_double(cct->_conf.get_val<double>( | |
73 | "mgr_client_service_daemon_unregister_timeout")); | |
74 | shutdown_cond.WaitInterval(lock, timeout); | |
75 | } | |
7c673cae FG |
76 | |
77 | timer.shutdown(); | |
31f18b77 FG |
78 | if (session) { |
79 | session->con->mark_down(); | |
80 | session.reset(); | |
81 | } | |
7c673cae FG |
82 | } |
83 | ||
84 | bool MgrClient::ms_dispatch(Message *m) | |
85 | { | |
11fdf7f2 | 86 | std::lock_guard l(lock); |
7c673cae FG |
87 | |
88 | switch(m->get_type()) { | |
89 | case MSG_MGR_MAP: | |
90 | return handle_mgr_map(static_cast<MMgrMap*>(m)); | |
91 | case MSG_MGR_CONFIGURE: | |
92 | return handle_mgr_configure(static_cast<MMgrConfigure*>(m)); | |
11fdf7f2 TL |
93 | case MSG_MGR_CLOSE: |
94 | return handle_mgr_close(static_cast<MMgrClose*>(m)); | |
7c673cae FG |
95 | case MSG_COMMAND_REPLY: |
96 | if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) { | |
97 | handle_command_reply(static_cast<MCommandReply*>(m)); | |
98 | return true; | |
99 | } else { | |
100 | return false; | |
101 | } | |
102 | default: | |
103 | ldout(cct, 30) << "Not handling " << *m << dendl; | |
104 | return false; | |
105 | } | |
106 | } | |
107 | ||
/**
 * (Re)establish the session with the active mgr. Caller must hold `lock`
 * (asserted).
 *
 * Steps:
 *  1. Tear down any existing session. This resets stats_period to 0 and
 *     cancels the pending report timer.
 *  2. Bail out if the current MgrMap has no available mgr.
 *  3. Rate-limit attempts. If the previous attempt was less than
 *     mgr_connect_retry_interval ago, schedule a single retry (at most one
 *     pending) for when that interval expires, and return.
 *  4. Otherwise cancel any pending retry, connect to the active mgr's
 *     addresses, send MMgrOpen when appropriate, and resend every command
 *     still in command_table.
 */
void MgrClient::reconnect()
{
  ceph_assert(lock.is_locked_by_me());

  if (session) {
    ldout(cct, 4) << "Terminating session with "
		  << session->con->get_peer_addr() << dendl;
    session->con->mark_down();
    session.reset();
    // Periodic reporting stops until the new mgr sends MMgrConfigure.
    stats_period = 0;
    if (report_callback != nullptr) {
      timer.cancel_event(report_callback);
      report_callback = nullptr;
    }
  }

  if (!map.get_available()) {
    ldout(cct, 4) << "No active mgr available yet" << dendl;
    return;
  }

  // A default-constructed utime_t means no connect attempt has been made yet.
  if (last_connect_attempt != utime_t()) {
    utime_t now = ceph_clock_now();
    utime_t when = last_connect_attempt;
    when += cct->_conf.get_val<double>("mgr_connect_retry_interval");
    if (now < when) {
      // Too soon since the last attempt: schedule at most one retry. The
      // callback clears its own handle before calling back into reconnect().
      if (!connect_retry_callback) {
	connect_retry_callback = timer.add_event_at(
	  when,
	  new FunctionContext([this](int r){
	      connect_retry_callback = nullptr;
	      reconnect();
	    }));
      }
      ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
      return;
    }
  }

  // We are connecting now, so any scheduled retry is redundant.
  if (connect_retry_callback) {
    timer.cancel_event(connect_retry_callback);
    connect_retry_callback = nullptr;
  }

  ldout(cct, 4) << "Starting new session with " << map.get_active_addrs()
		<< dendl;
  last_connect_attempt = ceph_clock_now();

  session.reset(new MgrSessionState());
  session->con = msgr->connect_to(CEPH_ENTITY_TYPE_MGR,
				  map.get_active_addrs());

  // Service daemons resend their status to the new mgr with the next report.
  if (service_daemon) {
    daemon_dirty_status = true;
  }

  // Don't send an open if we're just a client (i.e. doing
  // command-sending, not stats etc)
  if (!cct->_conf->name.is_client() || service_daemon) {
    _send_open();
  }

  // resend any pending commands
  for (const auto &p : command_table.get_commands()) {
    auto m = p.second.get_message({});
    ceph_assert(session);
    ceph_assert(session->con);
    session->con->send_message2(std::move(m));
  }
}
178 | ||
224ce89b WB |
179 | void MgrClient::_send_open() |
180 | { | |
181 | if (session && session->con) { | |
182 | auto open = new MMgrOpen(); | |
183 | if (!service_name.empty()) { | |
184 | open->service_name = service_name; | |
185 | open->daemon_name = daemon_name; | |
186 | } else { | |
c07f9fc5 | 187 | open->daemon_name = cct->_conf->name.get_id(); |
224ce89b WB |
188 | } |
189 | if (service_daemon) { | |
190 | open->service_daemon = service_daemon; | |
191 | open->daemon_metadata = daemon_metadata; | |
192 | } | |
11fdf7f2 TL |
193 | cct->_conf.get_config_bl(0, &open->config_bl, &last_config_bl_version); |
194 | cct->_conf.get_defaults_bl(&open->config_defaults_bl); | |
224ce89b WB |
195 | session->con->send_message(open); |
196 | } | |
197 | } | |
198 | ||
7c673cae FG |
199 | bool MgrClient::handle_mgr_map(MMgrMap *m) |
200 | { | |
11fdf7f2 | 201 | ceph_assert(lock.is_locked_by_me()); |
7c673cae FG |
202 | |
203 | ldout(cct, 20) << *m << dendl; | |
204 | ||
205 | map = m->get_map(); | |
206 | ldout(cct, 4) << "Got map version " << map.epoch << dendl; | |
207 | m->put(); | |
208 | ||
11fdf7f2 | 209 | ldout(cct, 4) << "Active mgr is now " << map.get_active_addrs() << dendl; |
7c673cae FG |
210 | |
211 | // Reset session? | |
212 | if (!session || | |
11fdf7f2 | 213 | session->con->get_peer_addrs() != map.get_active_addrs()) { |
7c673cae FG |
214 | reconnect(); |
215 | } | |
216 | ||
217 | return true; | |
218 | } | |
219 | ||
220 | bool MgrClient::ms_handle_reset(Connection *con) | |
221 | { | |
11fdf7f2 | 222 | std::lock_guard l(lock); |
7c673cae FG |
223 | if (session && con == session->con) { |
224 | ldout(cct, 4) << __func__ << " con " << con << dendl; | |
225 | reconnect(); | |
226 | return true; | |
227 | } | |
228 | return false; | |
229 | } | |
230 | ||
231 | bool MgrClient::ms_handle_refused(Connection *con) | |
232 | { | |
233 | // do nothing for now | |
234 | return false; | |
235 | } | |
236 | ||
91327a77 AA |
237 | void MgrClient::_send_stats() |
238 | { | |
239 | _send_report(); | |
240 | _send_pgstats(); | |
241 | if (stats_period != 0) { | |
242 | report_callback = timer.add_event_after( | |
243 | stats_period, | |
244 | new FunctionContext([this](int) { | |
245 | _send_stats(); | |
246 | })); | |
247 | } | |
248 | } | |
249 | ||
/**
 * Build and send one MMgrReport on the current session. Caller must hold
 * `lock`, and a session must exist (both asserted).
 *
 * The report carries:
 *  - counter declarations/undeclarations relative to session->declared;
 *  - the packed values of every counter whose adjusted priority is at least
 *    stats_threshold, appended in by_path iteration order;
 *  - daemon/service identity, daemon_status (only if marked dirty), the
 *    pending daemon health metrics (moved out of this client), config
 *    changes tracked via last_config_bl_version, and OSD perf metric reports
 *    from get_perf_report_cb when one is installed.
 *
 * report_callback is cleared first because _send_stats(), which may be the
 * timer event that invoked us, re-arms it afterwards.
 */
void MgrClient::_send_report()
{
  ceph_assert(lock.is_locked_by_me());
  ceph_assert(session);
  report_callback = nullptr;

  auto report = new MMgrReport();
  auto pcc = cct->get_perfcounters_collection();

  pcc->with_counters([this, report](
      const PerfCountersCollectionImpl::CounterMap &by_path)
  {
    // Helper for checking whether a counter should be included
    auto include_counter = [this](
        const PerfCounters::perf_counter_data_any_d &ctr,
        const PerfCounters &perf_counters)
    {
      return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
    };

    // Helper for cases where we want to forget a counter: record the
    // undeclare in the report and drop it from the session's declared set.
    auto undeclare = [report, this](const std::string &path)
    {
      report->undeclare_types.push_back(path);
      ldout(cct,20) << " undeclare " << path << dendl;
      session->declared.erase(path);
    };

    ENCODE_START(1, 1, report->packed);

    // Find counters that no longer exist, and undeclare them.
    // `p` is advanced before undeclare() erases the element, so the loop
    // iterator stays valid. NOTE(review): `path` aliases that very element
    // while it is being erased.
    for (auto p = session->declared.begin(); p != session->declared.end(); ) {
      const auto &path = *(p++);
      if (by_path.count(path) == 0) {
        undeclare(path);
      }
    }

    for (const auto &i : by_path) {
      auto& path = i.first;
      auto& data = *(i.second.data);
      auto& perf_counters = *(i.second.perf_counters);

      // Find counters that still exist, but are no longer permitted by
      // stats_threshold
      if (!include_counter(data, perf_counters)) {
        if (session->declared.count(path)) {
          undeclare(path);
        }
        continue;
      }

      // First time this session sees the counter: send its type description.
      if (session->declared.count(path) == 0) {
        ldout(cct,20) << " declare " << path << dendl;
        PerfCounterType type;
        type.path = path;
        if (data.description) {
          type.description = data.description;
        }
        if (data.nick) {
          type.nick = data.nick;
        }
        type.type = data.type;
        type.priority = perf_counters.get_adjusted_priority(data.prio);
        type.unit = data.unit;
        report->declare_types.push_back(std::move(type));
        session->declared.insert(path);
      }

      // Value payload: always u64; long-running averages also carry
      // avgcount and avgcount2. Every field is widened to uint64_t.
      encode(static_cast<uint64_t>(data.u64), report->packed);
      if (data.type & PERFCOUNTER_LONGRUNAVG) {
        encode(static_cast<uint64_t>(data.avgcount), report->packed);
        encode(static_cast<uint64_t>(data.avgcount2), report->packed);
      }
    }
    ENCODE_FINISH(report->packed);

    ldout(cct, 20) << "sending " << session->declared.size() << " counters ("
                      "of possible " << by_path.size() << "), "
                   << report->declare_types.size() << " new, "
                   << report->undeclare_types.size() << " removed"
                   << dendl;
  });

  ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;

  // Prefer the registered daemon name; otherwise use the config entity id.
  if (daemon_name.size()) {
    report->daemon_name = daemon_name;
  } else {
    report->daemon_name = cct->_conf->name.get_id();
  }
  report->service_name = service_name;

  // Only send status when it changed since the last report.
  if (daemon_dirty_status) {
    report->daemon_status = daemon_status;
    daemon_dirty_status = false;
  }

  // Moved out: presumably left empty until update_daemon_health() supplies
  // new metrics — confirm that senders rely on this "send once" behavior.
  report->daemon_health_metrics = std::move(daemon_health_metrics);

  // NOTE(review): presumably fetches only config changes newer than
  // last_config_bl_version and advances it — verify against ConfigProxy.
  cct->_conf.get_config_bl(last_config_bl_version, &report->config_bl,
                           &last_config_bl_version);

  if (get_perf_report_cb) {
    get_perf_report_cb(&report->osd_perf_metric_reports);
  }

  session->con->send_message(report);
}
359 | ||
360 | void MgrClient::send_pgstats() | |
91327a77 | 361 | { |
11fdf7f2 | 362 | std::lock_guard l(lock); |
91327a77 AA |
363 | _send_pgstats(); |
364 | } | |
365 | ||
366 | void MgrClient::_send_pgstats() | |
31f18b77 FG |
367 | { |
368 | if (pgstats_cb && session) { | |
369 | session->con->send_message(pgstats_cb()); | |
7c673cae FG |
370 | } |
371 | } | |
372 | ||
373 | bool MgrClient::handle_mgr_configure(MMgrConfigure *m) | |
374 | { | |
11fdf7f2 | 375 | ceph_assert(lock.is_locked_by_me()); |
7c673cae FG |
376 | |
377 | ldout(cct, 20) << *m << dendl; | |
378 | ||
379 | if (!session) { | |
380 | lderr(cct) << "dropping unexpected configure message" << dendl; | |
381 | m->put(); | |
382 | return true; | |
383 | } | |
384 | ||
385 | ldout(cct, 4) << "stats_period=" << m->stats_period << dendl; | |
386 | ||
3efd9988 FG |
387 | if (stats_threshold != m->stats_threshold) { |
388 | ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl; | |
389 | stats_threshold = m->stats_threshold; | |
390 | } | |
391 | ||
11fdf7f2 TL |
392 | if (set_perf_queries_cb) { |
393 | set_perf_queries_cb(m->osd_perf_metric_queries); | |
394 | } | |
395 | ||
7c673cae FG |
396 | bool starting = (stats_period == 0) && (m->stats_period != 0); |
397 | stats_period = m->stats_period; | |
398 | if (starting) { | |
91327a77 | 399 | _send_stats(); |
7c673cae FG |
400 | } |
401 | ||
402 | m->put(); | |
403 | return true; | |
404 | } | |
405 | ||
11fdf7f2 TL |
406 | bool MgrClient::handle_mgr_close(MMgrClose *m) |
407 | { | |
408 | service_daemon = false; | |
409 | shutdown_cond.Signal(); | |
410 | m->put(); | |
411 | return true; | |
412 | } | |
413 | ||
7c673cae FG |
414 | int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl, |
415 | bufferlist *outbl, string *outs, | |
416 | Context *onfinish) | |
417 | { | |
11fdf7f2 | 418 | std::lock_guard l(lock); |
7c673cae FG |
419 | |
420 | ldout(cct, 20) << "cmd: " << cmd << dendl; | |
421 | ||
11fdf7f2 | 422 | if (map.epoch == 0 && mgr_optional) { |
7c673cae FG |
423 | ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl; |
424 | return -EACCES; | |
425 | } | |
426 | ||
427 | auto &op = command_table.start_command(); | |
428 | op.cmd = cmd; | |
429 | op.inbl = inbl; | |
430 | op.outbl = outbl; | |
431 | op.outs = outs; | |
432 | op.on_finish = onfinish; | |
433 | ||
434 | if (session && session->con) { | |
435 | // Leaving fsid argument null because it isn't used. | |
11fdf7f2 TL |
436 | auto m = op.get_message({}); |
437 | session->con->send_message2(std::move(m)); | |
438 | } else { | |
439 | ldout(cct, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl; | |
7c673cae FG |
440 | } |
441 | return 0; | |
442 | } | |
443 | ||
444 | bool MgrClient::handle_command_reply(MCommandReply *m) | |
445 | { | |
11fdf7f2 | 446 | ceph_assert(lock.is_locked_by_me()); |
7c673cae FG |
447 | |
448 | ldout(cct, 20) << *m << dendl; | |
449 | ||
450 | const auto tid = m->get_tid(); | |
451 | if (!command_table.exists(tid)) { | |
452 | ldout(cct, 4) << "handle_command_reply tid " << m->get_tid() | |
453 | << " not found" << dendl; | |
454 | m->put(); | |
455 | return true; | |
456 | } | |
457 | ||
458 | auto &op = command_table.get_command(tid); | |
459 | if (op.outbl) { | |
460 | op.outbl->claim(m->get_data()); | |
461 | } | |
462 | ||
463 | if (op.outs) { | |
464 | *(op.outs) = m->rs; | |
465 | } | |
466 | ||
467 | if (op.on_finish) { | |
468 | op.on_finish->complete(m->r); | |
469 | } | |
470 | ||
471 | command_table.erase(tid); | |
472 | ||
473 | m->put(); | |
474 | return true; | |
475 | } | |
476 | ||
224ce89b WB |
477 | int MgrClient::service_daemon_register( |
478 | const std::string& service, | |
479 | const std::string& name, | |
480 | const std::map<std::string,std::string>& metadata) | |
481 | { | |
11fdf7f2 TL |
482 | std::lock_guard l(lock); |
483 | if (service == "osd" || | |
484 | service == "mds" || | |
485 | service == "client" || | |
486 | service == "mon" || | |
487 | service == "mgr") { | |
224ce89b WB |
488 | // normal ceph entity types are not allowed! |
489 | return -EINVAL; | |
490 | } | |
491 | if (service_daemon) { | |
492 | return -EEXIST; | |
493 | } | |
494 | ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl; | |
495 | service_daemon = true; | |
496 | service_name = service; | |
497 | daemon_name = name; | |
498 | daemon_metadata = metadata; | |
499 | daemon_dirty_status = true; | |
500 | ||
501 | // late register? | |
c07f9fc5 | 502 | if (cct->_conf->name.is_client() && session && session->con) { |
224ce89b WB |
503 | _send_open(); |
504 | } | |
505 | ||
506 | return 0; | |
507 | } | |
508 | ||
509 | int MgrClient::service_daemon_update_status( | |
11fdf7f2 | 510 | std::map<std::string,std::string>&& status) |
224ce89b | 511 | { |
11fdf7f2 | 512 | std::lock_guard l(lock); |
224ce89b | 513 | ldout(cct,10) << status << dendl; |
11fdf7f2 | 514 | daemon_status = std::move(status); |
224ce89b WB |
515 | daemon_dirty_status = true; |
516 | return 0; | |
517 | } | |
b32b8144 | 518 | |
11fdf7f2 | 519 | void MgrClient::update_daemon_health(std::vector<DaemonHealthMetric>&& metrics) |
b32b8144 | 520 | { |
11fdf7f2 TL |
521 | std::lock_guard l(lock); |
522 | daemon_health_metrics = std::move(metrics); | |
b32b8144 | 523 | } |
11fdf7f2 | 524 |