]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 John Spray <john.spray@redhat.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | */ | |
13 | ||
14 | ||
15 | #include "MgrClient.h" | |
16 | ||
17 | #include "mgr/MgrContext.h" | |
18 | ||
19 | #include "msg/Messenger.h" | |
20 | #include "messages/MMgrMap.h" | |
21 | #include "messages/MMgrReport.h" | |
22 | #include "messages/MMgrOpen.h" | |
23 | #include "messages/MMgrConfigure.h" | |
24 | #include "messages/MCommand.h" | |
25 | #include "messages/MCommandReply.h" | |
26 | #include "messages/MPGStats.h" | |
27 | ||
28 | #define dout_subsys ceph_subsys_mgrc | |
29 | #undef dout_prefix | |
30 | #define dout_prefix *_dout << "mgrc " << __func__ << " " | |
31 | ||
32 | MgrClient::MgrClient(CephContext *cct_, Messenger *msgr_) | |
33 | : Dispatcher(cct_), cct(cct_), msgr(msgr_), | |
34 | timer(cct_, lock) | |
35 | { | |
36 | assert(cct != nullptr); | |
37 | } | |
38 | ||
39 | void MgrClient::init() | |
40 | { | |
41 | Mutex::Locker l(lock); | |
42 | ||
43 | assert(msgr != nullptr); | |
44 | ||
45 | timer.init(); | |
46 | } | |
47 | ||
48 | void MgrClient::shutdown() | |
49 | { | |
50 | Mutex::Locker l(lock); | |
51 | ||
52 | if (connect_retry_callback) { | |
53 | timer.cancel_event(connect_retry_callback); | |
54 | connect_retry_callback = nullptr; | |
55 | } | |
56 | ||
57 | // forget about in-flight commands if we are prematurely shut down | |
58 | // (e.g., by control-C) | |
59 | command_table.clear(); | |
60 | ||
61 | timer.shutdown(); | |
31f18b77 FG |
62 | if (session) { |
63 | session->con->mark_down(); | |
64 | session.reset(); | |
65 | } | |
7c673cae FG |
66 | } |
67 | ||
68 | bool MgrClient::ms_dispatch(Message *m) | |
69 | { | |
70 | Mutex::Locker l(lock); | |
71 | ||
72 | switch(m->get_type()) { | |
73 | case MSG_MGR_MAP: | |
74 | return handle_mgr_map(static_cast<MMgrMap*>(m)); | |
75 | case MSG_MGR_CONFIGURE: | |
76 | return handle_mgr_configure(static_cast<MMgrConfigure*>(m)); | |
77 | case MSG_COMMAND_REPLY: | |
78 | if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) { | |
79 | handle_command_reply(static_cast<MCommandReply*>(m)); | |
80 | return true; | |
81 | } else { | |
82 | return false; | |
83 | } | |
84 | default: | |
85 | ldout(cct, 30) << "Not handling " << *m << dendl; | |
86 | return false; | |
87 | } | |
88 | } | |
89 | ||
90 | void MgrClient::reconnect() | |
91 | { | |
92 | assert(lock.is_locked_by_me()); | |
93 | ||
94 | if (session) { | |
95 | ldout(cct, 4) << "Terminating session with " | |
96 | << session->con->get_peer_addr() << dendl; | |
97 | session->con->mark_down(); | |
98 | session.reset(); | |
99 | stats_period = 0; | |
100 | if (report_callback != nullptr) { | |
101 | timer.cancel_event(report_callback); | |
102 | report_callback = nullptr; | |
103 | } | |
104 | } | |
105 | ||
106 | if (!map.get_available()) { | |
107 | ldout(cct, 4) << "No active mgr available yet" << dendl; | |
108 | return; | |
109 | } | |
110 | ||
111 | if (last_connect_attempt != utime_t()) { | |
112 | utime_t now = ceph_clock_now(); | |
113 | utime_t when = last_connect_attempt; | |
114 | when += cct->_conf->mgr_connect_retry_interval; | |
115 | if (now < when) { | |
116 | if (!connect_retry_callback) { | |
117 | connect_retry_callback = new FunctionContext([this](int r){ | |
118 | connect_retry_callback = nullptr; | |
119 | reconnect(); | |
120 | }); | |
121 | timer.add_event_at(when, connect_retry_callback); | |
122 | } | |
123 | ldout(cct, 4) << "waiting to retry connect until " << when << dendl; | |
124 | return; | |
125 | } | |
126 | } | |
127 | ||
128 | if (connect_retry_callback) { | |
129 | timer.cancel_event(connect_retry_callback); | |
130 | connect_retry_callback = nullptr; | |
131 | } | |
132 | ||
133 | ldout(cct, 4) << "Starting new session with " << map.get_active_addr() | |
134 | << dendl; | |
135 | entity_inst_t inst; | |
136 | inst.addr = map.get_active_addr(); | |
137 | inst.name = entity_name_t::MGR(map.get_active_gid()); | |
138 | last_connect_attempt = ceph_clock_now(); | |
139 | ||
140 | session.reset(new MgrSessionState()); | |
141 | session->con = msgr->get_connection(inst); | |
142 | ||
224ce89b WB |
143 | if (service_daemon) { |
144 | daemon_dirty_status = true; | |
145 | } | |
146 | ||
7c673cae FG |
147 | // Don't send an open if we're just a client (i.e. doing |
148 | // command-sending, not stats etc) | |
224ce89b WB |
149 | if ((g_conf && !g_conf->name.is_client()) || |
150 | service_daemon) { | |
151 | _send_open(); | |
7c673cae FG |
152 | } |
153 | ||
154 | // resend any pending commands | |
155 | for (const auto &p : command_table.get_commands()) { | |
156 | MCommand *m = p.second.get_message({}); | |
157 | assert(session); | |
158 | assert(session->con); | |
159 | session->con->send_message(m); | |
160 | } | |
161 | } | |
162 | ||
224ce89b WB |
163 | void MgrClient::_send_open() |
164 | { | |
165 | if (session && session->con) { | |
166 | auto open = new MMgrOpen(); | |
167 | if (!service_name.empty()) { | |
168 | open->service_name = service_name; | |
169 | open->daemon_name = daemon_name; | |
170 | } else { | |
171 | open->daemon_name = g_conf->name.get_id(); | |
172 | } | |
173 | if (service_daemon) { | |
174 | open->service_daemon = service_daemon; | |
175 | open->daemon_metadata = daemon_metadata; | |
176 | } | |
177 | session->con->send_message(open); | |
178 | } | |
179 | } | |
180 | ||
7c673cae FG |
181 | bool MgrClient::handle_mgr_map(MMgrMap *m) |
182 | { | |
183 | assert(lock.is_locked_by_me()); | |
184 | ||
185 | ldout(cct, 20) << *m << dendl; | |
186 | ||
187 | map = m->get_map(); | |
188 | ldout(cct, 4) << "Got map version " << map.epoch << dendl; | |
189 | m->put(); | |
190 | ||
191 | ldout(cct, 4) << "Active mgr is now " << map.get_active_addr() << dendl; | |
192 | ||
193 | // Reset session? | |
194 | if (!session || | |
195 | session->con->get_peer_addr() != map.get_active_addr()) { | |
196 | reconnect(); | |
197 | } | |
198 | ||
199 | return true; | |
200 | } | |
201 | ||
202 | bool MgrClient::ms_handle_reset(Connection *con) | |
203 | { | |
204 | Mutex::Locker l(lock); | |
205 | if (session && con == session->con) { | |
206 | ldout(cct, 4) << __func__ << " con " << con << dendl; | |
207 | reconnect(); | |
208 | return true; | |
209 | } | |
210 | return false; | |
211 | } | |
212 | ||
213 | bool MgrClient::ms_handle_refused(Connection *con) | |
214 | { | |
215 | // do nothing for now | |
216 | return false; | |
217 | } | |
218 | ||
219 | void MgrClient::send_report() | |
220 | { | |
221 | assert(lock.is_locked_by_me()); | |
222 | assert(session); | |
223 | report_callback = nullptr; | |
224 | ||
225 | auto report = new MMgrReport(); | |
226 | auto pcc = cct->get_perfcounters_collection(); | |
227 | ||
228 | pcc->with_counters([this, report]( | |
229 | const PerfCountersCollection::CounterMap &by_path) | |
230 | { | |
231 | ENCODE_START(1, 1, report->packed); | |
232 | for (auto p = session->declared.begin(); p != session->declared.end(); ) { | |
233 | if (by_path.count(*p) == 0) { | |
234 | report->undeclare_types.push_back(*p); | |
235 | ldout(cct,20) << __func__ << " undeclare " << *p << dendl; | |
236 | p = session->declared.erase(p); | |
237 | } else { | |
238 | ++p; | |
239 | } | |
240 | } | |
241 | for (const auto &i : by_path) { | |
242 | auto& path = i.first; | |
243 | auto& data = *(i.second); | |
244 | ||
245 | if (session->declared.count(path) == 0) { | |
246 | ldout(cct,20) << __func__ << " declare " << path << dendl; | |
247 | PerfCounterType type; | |
248 | type.path = path; | |
249 | if (data.description) { | |
250 | type.description = data.description; | |
251 | } | |
252 | if (data.nick) { | |
253 | type.nick = data.nick; | |
254 | } | |
255 | type.type = data.type; | |
256 | report->declare_types.push_back(std::move(type)); | |
257 | session->declared.insert(path); | |
258 | } | |
259 | ||
31f18b77 | 260 | ::encode(static_cast<uint64_t>(data.u64), report->packed); |
7c673cae | 261 | if (data.type & PERFCOUNTER_LONGRUNAVG) { |
31f18b77 FG |
262 | ::encode(static_cast<uint64_t>(data.avgcount), report->packed); |
263 | ::encode(static_cast<uint64_t>(data.avgcount2), report->packed); | |
7c673cae FG |
264 | } |
265 | } | |
266 | ENCODE_FINISH(report->packed); | |
267 | ||
268 | ldout(cct, 20) << by_path.size() << " counters, of which " | |
269 | << report->declare_types.size() << " new" << dendl; | |
270 | }); | |
271 | ||
272 | ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl; | |
273 | ||
224ce89b WB |
274 | if (daemon_name.size()) { |
275 | report->daemon_name = daemon_name; | |
276 | } else { | |
277 | report->daemon_name = g_conf->name.get_id(); | |
278 | } | |
279 | report->service_name = service_name; | |
280 | ||
281 | if (daemon_dirty_status) { | |
282 | report->daemon_status = daemon_status; | |
283 | daemon_dirty_status = false; | |
284 | } | |
7c673cae FG |
285 | |
286 | session->con->send_message(report); | |
287 | ||
288 | if (stats_period != 0) { | |
289 | report_callback = new FunctionContext([this](int r){send_report();}); | |
290 | timer.add_event_after(stats_period, report_callback); | |
291 | } | |
292 | ||
31f18b77 FG |
293 | send_pgstats(); |
294 | } | |
295 | ||
296 | void MgrClient::send_pgstats() | |
297 | { | |
298 | if (pgstats_cb && session) { | |
299 | session->con->send_message(pgstats_cb()); | |
7c673cae FG |
300 | } |
301 | } | |
302 | ||
303 | bool MgrClient::handle_mgr_configure(MMgrConfigure *m) | |
304 | { | |
305 | assert(lock.is_locked_by_me()); | |
306 | ||
307 | ldout(cct, 20) << *m << dendl; | |
308 | ||
309 | if (!session) { | |
310 | lderr(cct) << "dropping unexpected configure message" << dendl; | |
311 | m->put(); | |
312 | return true; | |
313 | } | |
314 | ||
315 | ldout(cct, 4) << "stats_period=" << m->stats_period << dendl; | |
316 | ||
317 | bool starting = (stats_period == 0) && (m->stats_period != 0); | |
318 | stats_period = m->stats_period; | |
319 | if (starting) { | |
320 | send_report(); | |
321 | } | |
322 | ||
323 | m->put(); | |
324 | return true; | |
325 | } | |
326 | ||
327 | int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl, | |
328 | bufferlist *outbl, string *outs, | |
329 | Context *onfinish) | |
330 | { | |
331 | Mutex::Locker l(lock); | |
332 | ||
333 | ldout(cct, 20) << "cmd: " << cmd << dendl; | |
334 | ||
335 | if (map.epoch == 0) { | |
336 | ldout(cct,20) << " no MgrMap, assuming EACCES" << dendl; | |
337 | return -EACCES; | |
338 | } | |
339 | ||
340 | auto &op = command_table.start_command(); | |
341 | op.cmd = cmd; | |
342 | op.inbl = inbl; | |
343 | op.outbl = outbl; | |
344 | op.outs = outs; | |
345 | op.on_finish = onfinish; | |
346 | ||
347 | if (session && session->con) { | |
348 | // Leaving fsid argument null because it isn't used. | |
349 | MCommand *m = op.get_message({}); | |
350 | session->con->send_message(m); | |
351 | } | |
352 | return 0; | |
353 | } | |
354 | ||
355 | bool MgrClient::handle_command_reply(MCommandReply *m) | |
356 | { | |
357 | assert(lock.is_locked_by_me()); | |
358 | ||
359 | ldout(cct, 20) << *m << dendl; | |
360 | ||
361 | const auto tid = m->get_tid(); | |
362 | if (!command_table.exists(tid)) { | |
363 | ldout(cct, 4) << "handle_command_reply tid " << m->get_tid() | |
364 | << " not found" << dendl; | |
365 | m->put(); | |
366 | return true; | |
367 | } | |
368 | ||
369 | auto &op = command_table.get_command(tid); | |
370 | if (op.outbl) { | |
371 | op.outbl->claim(m->get_data()); | |
372 | } | |
373 | ||
374 | if (op.outs) { | |
375 | *(op.outs) = m->rs; | |
376 | } | |
377 | ||
378 | if (op.on_finish) { | |
379 | op.on_finish->complete(m->r); | |
380 | } | |
381 | ||
382 | command_table.erase(tid); | |
383 | ||
384 | m->put(); | |
385 | return true; | |
386 | } | |
387 | ||
224ce89b WB |
388 | int MgrClient::service_daemon_register( |
389 | const std::string& service, | |
390 | const std::string& name, | |
391 | const std::map<std::string,std::string>& metadata) | |
392 | { | |
393 | Mutex::Locker l(lock); | |
394 | if (name == "osd" || | |
395 | name == "mds" || | |
396 | name == "client" || | |
397 | name == "mon" || | |
398 | name == "mgr") { | |
399 | // normal ceph entity types are not allowed! | |
400 | return -EINVAL; | |
401 | } | |
402 | if (service_daemon) { | |
403 | return -EEXIST; | |
404 | } | |
405 | ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl; | |
406 | service_daemon = true; | |
407 | service_name = service; | |
408 | daemon_name = name; | |
409 | daemon_metadata = metadata; | |
410 | daemon_dirty_status = true; | |
411 | ||
412 | // late register? | |
413 | if (g_conf->name.is_client() && session && session->con) { | |
414 | _send_open(); | |
415 | } | |
416 | ||
417 | return 0; | |
418 | } | |
419 | ||
420 | int MgrClient::service_daemon_update_status( | |
421 | const std::map<std::string,std::string>& status) | |
422 | { | |
423 | Mutex::Locker l(lock); | |
424 | ldout(cct,10) << status << dendl; | |
425 | daemon_status = status; | |
426 | daemon_dirty_status = true; | |
427 | return 0; | |
428 | } |