]> git.proxmox.com Git - ceph.git/blame - ceph/src/mgr/MgrClient.cc
update sources to v12.1.2
[ceph.git] / ceph / src / mgr / MgrClient.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14
15#include "MgrClient.h"
16
17#include "mgr/MgrContext.h"
18
19#include "msg/Messenger.h"
20#include "messages/MMgrMap.h"
21#include "messages/MMgrReport.h"
22#include "messages/MMgrOpen.h"
23#include "messages/MMgrConfigure.h"
24#include "messages/MCommand.h"
25#include "messages/MCommandReply.h"
26#include "messages/MPGStats.h"
27
28#define dout_subsys ceph_subsys_mgrc
29#undef dout_prefix
30#define dout_prefix *_dout << "mgrc " << __func__ << " "
31
32MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_)
33 : Dispatcher(cct_), cct(cct_), msgr(msgr_),
34 timer(cct_, lock)
35{
36 assert(cct != nullptr);
37}
38
39void MgrClient::init()
40{
41 Mutex::Locker l(lock);
42
43 assert(msgr != nullptr);
44
45 timer.init();
46}
47
48void MgrClient::shutdown()
49{
50 Mutex::Locker l(lock);
51
52 if (connect_retry_callback) {
53 timer.cancel_event(connect_retry_callback);
54 connect_retry_callback = nullptr;
55 }
56
57 // forget about in-flight commands if we are prematurely shut down
58 // (e.g., by control-C)
59 command_table.clear();
60
61 timer.shutdown();
31f18b77
FG
62 if (session) {
63 session->con->mark_down();
64 session.reset();
65 }
7c673cae
FG
66}
67
68bool MgrClient::ms_dispatch(Message *m)
69{
70 Mutex::Locker l(lock);
71
72 switch(m->get_type()) {
73 case MSG_MGR_MAP:
74 return handle_mgr_map(static_cast<MMgrMap*>(m));
75 case MSG_MGR_CONFIGURE:
76 return handle_mgr_configure(static_cast<MMgrConfigure*>(m));
77 case MSG_COMMAND_REPLY:
78 if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
79 handle_command_reply(static_cast<MCommandReply*>(m));
80 return true;
81 } else {
82 return false;
83 }
84 default:
85 ldout(cct, 30) << "Not handling " << *m << dendl;
86 return false;
87 }
88}
89
90void MgrClient::reconnect()
91{
92 assert(lock.is_locked_by_me());
93
94 if (session) {
95 ldout(cct, 4) << "Terminating session with "
96 << session->con->get_peer_addr() << dendl;
97 session->con->mark_down();
98 session.reset();
99 stats_period = 0;
100 if (report_callback != nullptr) {
101 timer.cancel_event(report_callback);
102 report_callback = nullptr;
103 }
104 }
105
106 if (!map.get_available()) {
107 ldout(cct, 4) << "No active mgr available yet" << dendl;
108 return;
109 }
110
111 if (last_connect_attempt != utime_t()) {
112 utime_t now = ceph_clock_now();
113 utime_t when = last_connect_attempt;
114 when += cct->_conf->mgr_connect_retry_interval;
115 if (now < when) {
116 if (!connect_retry_callback) {
117 connect_retry_callback = new FunctionContext([this](int r){
118 connect_retry_callback = nullptr;
119 reconnect();
120 });
121 timer.add_event_at(when, connect_retry_callback);
122 }
123 ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
124 return;
125 }
126 }
127
128 if (connect_retry_callback) {
129 timer.cancel_event(connect_retry_callback);
130 connect_retry_callback = nullptr;
131 }
132
133 ldout(cct, 4) << "Starting new session with " << map.get_active_addr()
134 << dendl;
135 entity_inst_t inst;
136 inst.addr = map.get_active_addr();
137 inst.name = entity_name_t::MGR(map.get_active_gid());
138 last_connect_attempt = ceph_clock_now();
139
140 session.reset(new MgrSessionState());
141 session->con = msgr->get_connection(inst);
142
224ce89b
WB
143 if (service_daemon) {
144 daemon_dirty_status = true;
145 }
146
7c673cae
FG
147 // Don't send an open if we're just a client (i.e. doing
148 // command-sending, not stats etc)
c07f9fc5 149 if (!cct->_conf->name.is_client() || service_daemon) {
224ce89b 150 _send_open();
7c673cae
FG
151 }
152
153 // resend any pending commands
154 for (const auto &p : command_table.get_commands()) {
155 MCommand *m = p.second.get_message({});
156 assert(session);
157 assert(session->con);
158 session->con->send_message(m);
159 }
160}
161
224ce89b
WB
162void MgrClient::_send_open()
163{
164 if (session && session->con) {
165 auto open = new MMgrOpen();
166 if (!service_name.empty()) {
167 open->service_name = service_name;
168 open->daemon_name = daemon_name;
169 } else {
c07f9fc5 170 open->daemon_name = cct->_conf->name.get_id();
224ce89b
WB
171 }
172 if (service_daemon) {
173 open->service_daemon = service_daemon;
174 open->daemon_metadata = daemon_metadata;
175 }
176 session->con->send_message(open);
177 }
178}
179
7c673cae
FG
180bool MgrClient::handle_mgr_map(MMgrMap *m)
181{
182 assert(lock.is_locked_by_me());
183
184 ldout(cct, 20) << *m << dendl;
185
186 map = m->get_map();
187 ldout(cct, 4) << "Got map version " << map.epoch << dendl;
188 m->put();
189
190 ldout(cct, 4) << "Active mgr is now " << map.get_active_addr() << dendl;
191
192 // Reset session?
193 if (!session ||
194 session->con->get_peer_addr() != map.get_active_addr()) {
195 reconnect();
196 }
197
198 return true;
199}
200
201bool MgrClient::ms_handle_reset(Connection *con)
202{
203 Mutex::Locker l(lock);
204 if (session && con == session->con) {
205 ldout(cct, 4) << __func__ << " con " << con << dendl;
206 reconnect();
207 return true;
208 }
209 return false;
210}
211
212bool MgrClient::ms_handle_refused(Connection *con)
213{
214 // do nothing for now
215 return false;
216}
217
218void MgrClient::send_report()
219{
220 assert(lock.is_locked_by_me());
221 assert(session);
222 report_callback = nullptr;
223
224 auto report = new MMgrReport();
225 auto pcc = cct->get_perfcounters_collection();
226
227 pcc->with_counters([this, report](
228 const PerfCountersCollection::CounterMap &by_path)
229 {
230 ENCODE_START(1, 1, report->packed);
231 for (auto p = session->declared.begin(); p != session->declared.end(); ) {
232 if (by_path.count(*p) == 0) {
233 report->undeclare_types.push_back(*p);
234 ldout(cct,20) << __func__ << " undeclare " << *p << dendl;
235 p = session->declared.erase(p);
236 } else {
237 ++p;
238 }
239 }
240 for (const auto &i : by_path) {
241 auto& path = i.first;
242 auto& data = *(i.second);
243
244 if (session->declared.count(path) == 0) {
245 ldout(cct,20) << __func__ << " declare " << path << dendl;
246 PerfCounterType type;
247 type.path = path;
248 if (data.description) {
249 type.description = data.description;
250 }
251 if (data.nick) {
252 type.nick = data.nick;
253 }
254 type.type = data.type;
255 report->declare_types.push_back(std::move(type));
256 session->declared.insert(path);
257 }
258
31f18b77 259 ::encode(static_cast<uint64_t>(data.u64), report->packed);
7c673cae 260 if (data.type & PERFCOUNTER_LONGRUNAVG) {
31f18b77
FG
261 ::encode(static_cast<uint64_t>(data.avgcount), report->packed);
262 ::encode(static_cast<uint64_t>(data.avgcount2), report->packed);
7c673cae
FG
263 }
264 }
265 ENCODE_FINISH(report->packed);
266
267 ldout(cct, 20) << by_path.size() << " counters, of which "
268 << report->declare_types.size() << " new" << dendl;
269 });
270
271 ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
272
224ce89b
WB
273 if (daemon_name.size()) {
274 report->daemon_name = daemon_name;
275 } else {
c07f9fc5 276 report->daemon_name = cct->_conf->name.get_id();
224ce89b
WB
277 }
278 report->service_name = service_name;
279
280 if (daemon_dirty_status) {
281 report->daemon_status = daemon_status;
282 daemon_dirty_status = false;
283 }
7c673cae
FG
284
285 session->con->send_message(report);
286
287 if (stats_period != 0) {
288 report_callback = new FunctionContext([this](int r){send_report();});
289 timer.add_event_after(stats_period, report_callback);
290 }
291
31f18b77
FG
292 send_pgstats();
293}
294
295void MgrClient::send_pgstats()
296{
297 if (pgstats_cb && session) {
298 session->con->send_message(pgstats_cb());
7c673cae
FG
299 }
300}
301
302bool MgrClient::handle_mgr_configure(MMgrConfigure *m)
303{
304 assert(lock.is_locked_by_me());
305
306 ldout(cct, 20) << *m << dendl;
307
308 if (!session) {
309 lderr(cct) << "dropping unexpected configure message" << dendl;
310 m->put();
311 return true;
312 }
313
314 ldout(cct, 4) << "stats_period=" << m->stats_period << dendl;
315
316 bool starting = (stats_period == 0) && (m->stats_period != 0);
317 stats_period = m->stats_period;
318 if (starting) {
319 send_report();
320 }
321
322 m->put();
323 return true;
324}
325
326int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
327 bufferlist *outbl, string *outs,
328 Context *onfinish)
329{
330 Mutex::Locker l(lock);
331
332 ldout(cct, 20) << "cmd: " << cmd << dendl;
333
334 if (map.epoch == 0) {
335 ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl;
336 return -EACCES;
337 }
338
339 auto &op = command_table.start_command();
340 op.cmd = cmd;
341 op.inbl = inbl;
342 op.outbl = outbl;
343 op.outs = outs;
344 op.on_finish = onfinish;
345
346 if (session && session->con) {
347 // Leaving fsid argument null because it isn't used.
348 MCommand *m = op.get_message({});
349 session->con->send_message(m);
350 }
351 return 0;
352}
353
354bool MgrClient::handle_command_reply(MCommandReply *m)
355{
356 assert(lock.is_locked_by_me());
357
358 ldout(cct, 20) << *m << dendl;
359
360 const auto tid = m->get_tid();
361 if (!command_table.exists(tid)) {
362 ldout(cct, 4) << "handle_command_reply tid " << m->get_tid()
363 << " not found" << dendl;
364 m->put();
365 return true;
366 }
367
368 auto &op = command_table.get_command(tid);
369 if (op.outbl) {
370 op.outbl->claim(m->get_data());
371 }
372
373 if (op.outs) {
374 *(op.outs) = m->rs;
375 }
376
377 if (op.on_finish) {
378 op.on_finish->complete(m->r);
379 }
380
381 command_table.erase(tid);
382
383 m->put();
384 return true;
385}
386
224ce89b
WB
387int MgrClient::service_daemon_register(
388 const std::string& service,
389 const std::string& name,
390 const std::map<std::string,std::string>& metadata)
391{
392 Mutex::Locker l(lock);
393 if (name == "osd" ||
394 name == "mds" ||
395 name == "client" ||
396 name == "mon" ||
397 name == "mgr") {
398 // normal ceph entity types are not allowed!
399 return -EINVAL;
400 }
401 if (service_daemon) {
402 return -EEXIST;
403 }
404 ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
405 service_daemon = true;
406 service_name = service;
407 daemon_name = name;
408 daemon_metadata = metadata;
409 daemon_dirty_status = true;
410
411 // late register?
c07f9fc5 412 if (cct->_conf->name.is_client() && session && session->con) {
224ce89b
WB
413 _send_open();
414 }
415
416 return 0;
417}
418
419int MgrClient::service_daemon_update_status(
420 const std::map<std::string,std::string>& status)
421{
422 Mutex::Locker l(lock);
423 ldout(cct,10) << status << dendl;
424 daemon_status = status;
425 daemon_dirty_status = true;
426 return 0;
427}