]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include <random> | |
16 | ||
17 | #include "messages/MMonGetMap.h" | |
18 | #include "messages/MMonGetVersion.h" | |
19 | #include "messages/MMonGetVersionReply.h" | |
20 | #include "messages/MMonMap.h" | |
21 | #include "messages/MAuth.h" | |
22 | #include "messages/MLogAck.h" | |
23 | #include "messages/MAuthReply.h" | |
24 | #include "messages/MMonCommand.h" | |
25 | #include "messages/MMonCommandAck.h" | |
26 | #include "messages/MPing.h" | |
27 | ||
28 | #include "messages/MMonSubscribe.h" | |
29 | #include "messages/MMonSubscribeAck.h" | |
30 | #include "common/errno.h" | |
31 | #include "common/LogClient.h" | |
32 | ||
33 | #include "MonClient.h" | |
34 | #include "MonMap.h" | |
35 | ||
36 | #include "auth/Auth.h" | |
37 | #include "auth/KeyRing.h" | |
38 | #include "auth/AuthClientHandler.h" | |
39 | #include "auth/AuthMethodList.h" | |
40 | #include "auth/RotatingKeyRing.h" | |
41 | ||
7c673cae FG |
42 | #define dout_subsys ceph_subsys_monc |
43 | #undef dout_prefix | |
44 | #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": " | |
45 | ||
46 | MonClient::MonClient(CephContext *cct_) : | |
47 | Dispatcher(cct_), | |
48 | messenger(NULL), | |
49 | monc_lock("MonClient::monc_lock"), | |
31f18b77 FG |
50 | timer(cct_, monc_lock), |
51 | finisher(cct_), | |
7c673cae FG |
52 | initialized(false), |
53 | no_keyring_disabled_cephx(false), | |
54 | log_client(NULL), | |
55 | more_log_pending(false), | |
56 | want_monmap(true), | |
57 | had_a_connection(false), | |
58 | reopen_interval_multiplier(1.0), | |
59 | last_mon_command_tid(0), | |
60 | version_req_id(0) | |
61 | { | |
62 | } | |
63 | ||
64 | MonClient::~MonClient() | |
65 | { | |
66 | } | |
67 | ||
68 | int MonClient::build_initial_monmap() | |
69 | { | |
70 | ldout(cct, 10) << __func__ << dendl; | |
71 | return monmap.build_initial(cct, cerr); | |
72 | } | |
73 | ||
74 | int MonClient::get_monmap() | |
75 | { | |
76 | ldout(cct, 10) << __func__ << dendl; | |
77 | Mutex::Locker l(monc_lock); | |
78 | ||
79 | _sub_want("monmap", 0, 0); | |
80 | if (!_opened()) | |
81 | _reopen_session(); | |
82 | ||
83 | while (want_monmap) | |
84 | map_cond.Wait(monc_lock); | |
85 | ||
86 | ldout(cct, 10) << __func__ << " done" << dendl; | |
87 | return 0; | |
88 | } | |
89 | ||
90 | int MonClient::get_monmap_privately() | |
91 | { | |
92 | ldout(cct, 10) << __func__ << dendl; | |
93 | Mutex::Locker l(monc_lock); | |
94 | ||
95 | bool temp_msgr = false; | |
96 | Messenger* smessenger = NULL; | |
97 | if (!messenger) { | |
98 | messenger = smessenger = Messenger::create_client_messenger(cct, "temp_mon_client"); | |
99 | if (NULL == messenger) { | |
100 | return -1; | |
101 | } | |
102 | messenger->add_dispatcher_head(this); | |
103 | smessenger->start(); | |
104 | temp_msgr = true; | |
105 | } | |
106 | ||
107 | int attempt = 10; | |
108 | ||
109 | ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl; | |
110 | ||
111 | std::random_device rd; | |
112 | std::mt19937 rng(rd()); | |
113 | assert(monmap.size() > 0); | |
114 | std::uniform_int_distribution<unsigned> ranks(0, monmap.size() - 1); | |
115 | while (monmap.fsid.is_zero()) { | |
116 | auto rank = ranks(rng); | |
117 | auto& pending_con = _add_conn(rank, 0); | |
118 | auto con = pending_con.get_con(); | |
119 | ldout(cct, 10) << "querying mon." << monmap.get_name(rank) << " " | |
120 | << con->get_peer_addr() << dendl; | |
121 | con->send_message(new MMonGetMap); | |
122 | ||
123 | if (--attempt == 0) | |
124 | break; | |
125 | ||
126 | utime_t interval; | |
127 | interval.set_from_double(cct->_conf->mon_client_hunt_interval); | |
128 | map_cond.WaitInterval(monc_lock, interval); | |
129 | ||
130 | if (monmap.fsid.is_zero() && con) { | |
131 | con->mark_down(); // nope, clean that connection up | |
132 | } | |
133 | } | |
134 | ||
135 | if (temp_msgr) { | |
136 | pending_cons.clear(); | |
137 | monc_lock.Unlock(); | |
138 | messenger->shutdown(); | |
139 | if (smessenger) | |
140 | smessenger->wait(); | |
141 | delete messenger; | |
142 | messenger = 0; | |
143 | monc_lock.Lock(); | |
144 | } | |
145 | ||
146 | pending_cons.clear(); | |
147 | ||
148 | if (!monmap.fsid.is_zero()) | |
149 | return 0; | |
150 | return -1; | |
151 | } | |
152 | ||
153 | ||
154 | /** | |
155 | * Ping the monitor with id @p mon_id and set the resulting reply in | |
156 | * the provided @p result_reply, if this last parameter is not NULL. | |
157 | * | |
158 | * So that we don't rely on the MonClient's default messenger, set up | |
159 | * during connect(), we create our own messenger to comunicate with the | |
160 | * specified monitor. This is advantageous in the following ways: | |
161 | * | |
162 | * - Isolate the ping procedure from the rest of the MonClient's operations, | |
163 | * allowing us to not acquire or manage the big monc_lock, thus not | |
164 | * having to block waiting for some other operation to finish before we | |
165 | * can proceed. | |
166 | * * for instance, we can ping mon.FOO even if we are currently hunting | |
167 | * or blocked waiting for auth to complete with mon.BAR. | |
168 | * | |
169 | * - Ping a monitor prior to establishing a connection (using connect()) | |
170 | * and properly establish the MonClient's messenger. This frees us | |
171 | * from dealing with the complex foo that happens in connect(). | |
172 | * | |
173 | * We also don't rely on MonClient as a dispatcher for this messenger, | |
174 | * unlike what happens with the MonClient's default messenger. This allows | |
175 | * us to sandbox the whole ping, having it much as a separate entity in | |
176 | * the MonClient class, considerably simplifying the handling and dispatching | |
177 | * of messages without needing to consider monc_lock. | |
178 | * | |
179 | * Current drawback is that we will establish a messenger for each ping | |
180 | * we want to issue, instead of keeping a single messenger instance that | |
181 | * would be used for all pings. | |
182 | */ | |
183 | int MonClient::ping_monitor(const string &mon_id, string *result_reply) | |
184 | { | |
185 | ldout(cct, 10) << __func__ << dendl; | |
186 | ||
187 | string new_mon_id; | |
188 | if (monmap.contains("noname-"+mon_id)) { | |
189 | new_mon_id = "noname-"+mon_id; | |
190 | } else { | |
191 | new_mon_id = mon_id; | |
192 | } | |
193 | ||
194 | if (new_mon_id.empty()) { | |
195 | ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl; | |
196 | return -EINVAL; | |
197 | } else if (!monmap.contains(new_mon_id)) { | |
198 | ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'" | |
199 | << dendl; | |
200 | return -ENOENT; | |
201 | } | |
202 | ||
203 | MonClientPinger *pinger = new MonClientPinger(cct, result_reply); | |
204 | ||
205 | Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client"); | |
206 | smsgr->add_dispatcher_head(pinger); | |
207 | smsgr->start(); | |
208 | ||
209 | ConnectionRef con = smsgr->get_connection(monmap.get_inst(new_mon_id)); | |
210 | ldout(cct, 10) << __func__ << " ping mon." << new_mon_id | |
211 | << " " << con->get_peer_addr() << dendl; | |
212 | con->send_message(new MPing); | |
213 | ||
214 | pinger->lock.Lock(); | |
215 | int ret = pinger->wait_for_reply(cct->_conf->client_mount_timeout); | |
216 | if (ret == 0) { | |
217 | ldout(cct,10) << __func__ << " got ping reply" << dendl; | |
218 | } else { | |
219 | ret = -ret; | |
220 | } | |
221 | pinger->lock.Unlock(); | |
222 | ||
223 | con->mark_down(); | |
224 | smsgr->shutdown(); | |
225 | smsgr->wait(); | |
226 | delete smsgr; | |
227 | delete pinger; | |
228 | return ret; | |
229 | } | |
230 | ||
231 | bool MonClient::ms_dispatch(Message *m) | |
232 | { | |
233 | if (my_addr == entity_addr_t()) | |
234 | my_addr = messenger->get_myaddr(); | |
235 | ||
236 | // we only care about these message types | |
237 | switch (m->get_type()) { | |
238 | case CEPH_MSG_MON_MAP: | |
239 | case CEPH_MSG_AUTH_REPLY: | |
240 | case CEPH_MSG_MON_SUBSCRIBE_ACK: | |
241 | case CEPH_MSG_MON_GET_VERSION_REPLY: | |
242 | case MSG_MON_COMMAND_ACK: | |
243 | case MSG_LOGACK: | |
244 | break; | |
245 | default: | |
246 | return false; | |
247 | } | |
248 | ||
249 | Mutex::Locker lock(monc_lock); | |
250 | ||
251 | if (_hunting()) { | |
252 | auto pending_con = pending_cons.find(m->get_source_addr()); | |
253 | if (pending_con == pending_cons.end() || | |
254 | pending_con->second.get_con() != m->get_connection()) { | |
255 | // ignore any messages outside hunting sessions | |
256 | ldout(cct, 10) << "discarding stray monitor message " << *m << dendl; | |
257 | m->put(); | |
258 | return true; | |
259 | } | |
260 | } else if (!active_con || active_con->get_con() != m->get_connection()) { | |
261 | // ignore any messages outside our session(s) | |
262 | ldout(cct, 10) << "discarding stray monitor message " << *m << dendl; | |
263 | m->put(); | |
264 | return true; | |
265 | } | |
266 | ||
267 | switch (m->get_type()) { | |
268 | case CEPH_MSG_MON_MAP: | |
269 | handle_monmap(static_cast<MMonMap*>(m)); | |
31f18b77 FG |
270 | if (passthrough_monmap) { |
271 | return false; | |
272 | } else { | |
273 | m->put(); | |
274 | } | |
7c673cae FG |
275 | break; |
276 | case CEPH_MSG_AUTH_REPLY: | |
277 | handle_auth(static_cast<MAuthReply*>(m)); | |
278 | break; | |
279 | case CEPH_MSG_MON_SUBSCRIBE_ACK: | |
280 | handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m)); | |
281 | break; | |
282 | case CEPH_MSG_MON_GET_VERSION_REPLY: | |
283 | handle_get_version_reply(static_cast<MMonGetVersionReply*>(m)); | |
284 | break; | |
285 | case MSG_MON_COMMAND_ACK: | |
286 | handle_mon_command_ack(static_cast<MMonCommandAck*>(m)); | |
287 | break; | |
288 | case MSG_LOGACK: | |
289 | if (log_client) { | |
290 | log_client->handle_log_ack(static_cast<MLogAck*>(m)); | |
291 | m->put(); | |
292 | if (more_log_pending) { | |
293 | send_log(); | |
294 | } | |
295 | } else { | |
296 | m->put(); | |
297 | } | |
298 | break; | |
299 | } | |
300 | return true; | |
301 | } | |
302 | ||
303 | void MonClient::send_log(bool flush) | |
304 | { | |
305 | if (log_client) { | |
306 | Message *lm = log_client->get_mon_log_message(flush); | |
307 | if (lm) | |
308 | _send_mon_message(lm); | |
309 | more_log_pending = log_client->are_pending(); | |
310 | } | |
311 | } | |
312 | ||
313 | void MonClient::flush_log() | |
314 | { | |
315 | Mutex::Locker l(monc_lock); | |
316 | send_log(); | |
317 | } | |
318 | ||
31f18b77 FG |
319 | /* Unlike all the other message-handling functions, we don't put away a reference |
320 | * because we want to support MMonMap passthrough to other Dispatchers. */ | |
7c673cae FG |
321 | void MonClient::handle_monmap(MMonMap *m) |
322 | { | |
323 | ldout(cct, 10) << __func__ << " " << *m << dendl; | |
324 | auto peer = m->get_source_addr(); | |
325 | string cur_mon = monmap.get_name(peer); | |
326 | ||
327 | bufferlist::iterator p = m->monmapbl.begin(); | |
328 | ::decode(monmap, p); | |
329 | ||
330 | ldout(cct, 10) << " got monmap " << monmap.epoch | |
331 | << ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon) | |
332 | << dendl; | |
333 | ldout(cct, 10) << "dump:\n"; | |
334 | monmap.print(*_dout); | |
335 | *_dout << dendl; | |
336 | ||
337 | _sub_got("monmap", monmap.get_epoch()); | |
338 | ||
339 | if (!monmap.get_addr_name(peer, cur_mon)) { | |
340 | ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl; | |
341 | // can't find the mon we were talking to (above) | |
342 | _reopen_session(); | |
343 | } | |
344 | ||
345 | map_cond.Signal(); | |
346 | want_monmap = false; | |
7c673cae FG |
347 | } |
348 | ||
349 | // ---------------------- | |
350 | ||
351 | int MonClient::init() | |
352 | { | |
353 | ldout(cct, 10) << __func__ << dendl; | |
354 | ||
355 | messenger->add_dispatcher_head(this); | |
356 | ||
357 | entity_name = cct->_conf->name; | |
358 | ||
359 | Mutex::Locker l(monc_lock); | |
360 | ||
361 | string method; | |
362 | if (!cct->_conf->auth_supported.empty()) | |
363 | method = cct->_conf->auth_supported; | |
364 | else if (entity_name.get_type() == CEPH_ENTITY_TYPE_OSD || | |
365 | entity_name.get_type() == CEPH_ENTITY_TYPE_MDS || | |
366 | entity_name.get_type() == CEPH_ENTITY_TYPE_MON) | |
367 | method = cct->_conf->auth_cluster_required; | |
368 | else | |
369 | method = cct->_conf->auth_client_required; | |
370 | auth_supported.reset(new AuthMethodList(cct, method)); | |
371 | ldout(cct, 10) << "auth_supported " << auth_supported->get_supported_set() << " method " << method << dendl; | |
372 | ||
373 | int r = 0; | |
374 | keyring.reset(new KeyRing); // initializing keyring anyway | |
375 | ||
376 | if (auth_supported->is_supported_auth(CEPH_AUTH_CEPHX)) { | |
377 | r = keyring->from_ceph_context(cct); | |
378 | if (r == -ENOENT) { | |
379 | auth_supported->remove_supported_auth(CEPH_AUTH_CEPHX); | |
380 | if (!auth_supported->get_supported_set().empty()) { | |
381 | r = 0; | |
382 | no_keyring_disabled_cephx = true; | |
383 | } else { | |
384 | lderr(cct) << "ERROR: missing keyring, cannot use cephx for authentication" << dendl; | |
385 | } | |
386 | } | |
387 | } | |
388 | ||
389 | if (r < 0) { | |
390 | return r; | |
391 | } | |
392 | ||
393 | rotating_secrets.reset( | |
394 | new RotatingKeyRing(cct, cct->get_module_type(), keyring.get())); | |
395 | ||
396 | initialized = true; | |
397 | ||
398 | timer.init(); | |
399 | finisher.start(); | |
400 | schedule_tick(); | |
401 | ||
402 | return 0; | |
403 | } | |
404 | ||
405 | void MonClient::shutdown() | |
406 | { | |
407 | ldout(cct, 10) << __func__ << dendl; | |
408 | monc_lock.Lock(); | |
409 | while (!version_requests.empty()) { | |
410 | version_requests.begin()->second->context->complete(-ECANCELED); | |
411 | ldout(cct, 20) << __func__ << " canceling and discarding version request " | |
412 | << version_requests.begin()->second << dendl; | |
413 | delete version_requests.begin()->second; | |
414 | version_requests.erase(version_requests.begin()); | |
415 | } | |
31f18b77 FG |
416 | while (!mon_commands.empty()) { |
417 | auto tid = mon_commands.begin()->first; | |
418 | _cancel_mon_command(tid); | |
419 | } | |
7c673cae FG |
420 | while (!waiting_for_session.empty()) { |
421 | ldout(cct, 20) << __func__ << " discarding pending message " << *waiting_for_session.front() << dendl; | |
422 | waiting_for_session.front()->put(); | |
423 | waiting_for_session.pop_front(); | |
424 | } | |
425 | ||
426 | active_con.reset(); | |
427 | pending_cons.clear(); | |
428 | auth.reset(); | |
429 | ||
430 | monc_lock.Unlock(); | |
431 | ||
432 | if (initialized) { | |
433 | finisher.wait_for_empty(); | |
434 | finisher.stop(); | |
435 | } | |
436 | monc_lock.Lock(); | |
437 | timer.shutdown(); | |
438 | ||
439 | monc_lock.Unlock(); | |
440 | } | |
441 | ||
442 | int MonClient::authenticate(double timeout) | |
443 | { | |
444 | Mutex::Locker lock(monc_lock); | |
445 | ||
446 | if (active_con) { | |
447 | ldout(cct, 5) << "already authenticated" << dendl; | |
448 | return 0; | |
449 | } | |
450 | ||
451 | _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0); | |
452 | if (!_opened()) | |
453 | _reopen_session(); | |
454 | ||
455 | utime_t until = ceph_clock_now(); | |
456 | until += timeout; | |
457 | if (timeout > 0.0) | |
458 | ldout(cct, 10) << "authenticate will time out at " << until << dendl; | |
459 | while (!active_con && !authenticate_err) { | |
460 | if (timeout > 0.0) { | |
461 | int r = auth_cond.WaitUntil(monc_lock, until); | |
462 | if (r == ETIMEDOUT) { | |
463 | ldout(cct, 0) << "authenticate timed out after " << timeout << dendl; | |
464 | authenticate_err = -r; | |
465 | } | |
466 | } else { | |
467 | auth_cond.Wait(monc_lock); | |
468 | } | |
469 | } | |
470 | ||
471 | if (active_con) { | |
472 | ldout(cct, 5) << __func__ << " success, global_id " | |
473 | << active_con->get_global_id() << dendl; | |
474 | // active_con should not have been set if there was an error | |
475 | assert(authenticate_err == 0); | |
476 | authenticated = true; | |
477 | } | |
478 | ||
479 | if (authenticate_err < 0 && no_keyring_disabled_cephx) { | |
480 | lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl; | |
481 | } | |
482 | ||
483 | return authenticate_err; | |
484 | } | |
485 | ||
486 | void MonClient::handle_auth(MAuthReply *m) | |
487 | { | |
488 | assert(monc_lock.is_locked()); | |
489 | if (!_hunting()) { | |
490 | std::swap(active_con->get_auth(), auth); | |
491 | int ret = active_con->authenticate(m); | |
492 | m->put(); | |
493 | std::swap(auth, active_con->get_auth()); | |
494 | if (global_id != active_con->get_global_id()) { | |
495 | lderr(cct) << __func__ << " peer assigned me a different global_id: " | |
496 | << active_con->get_global_id() << dendl; | |
497 | } | |
498 | if (ret != -EAGAIN) { | |
499 | _finish_auth(ret); | |
500 | } | |
501 | return; | |
502 | } | |
503 | ||
504 | // hunting | |
505 | auto found = pending_cons.find(m->get_source_addr()); | |
506 | assert(found != pending_cons.end()); | |
507 | int auth_err = found->second.handle_auth(m, entity_name, want_keys, | |
508 | rotating_secrets.get()); | |
509 | m->put(); | |
510 | if (auth_err == -EAGAIN) { | |
511 | return; | |
512 | } | |
513 | if (auth_err) { | |
514 | pending_cons.erase(found); | |
515 | if (!pending_cons.empty()) { | |
516 | // keep trying with pending connections | |
517 | return; | |
518 | } | |
519 | // the last try just failed, give up. | |
520 | } else { | |
521 | auto& mc = found->second; | |
522 | assert(mc.have_session()); | |
523 | active_con.reset(new MonConnection(std::move(mc))); | |
524 | pending_cons.clear(); | |
525 | } | |
526 | ||
527 | _finish_hunting(); | |
528 | ||
529 | if (!auth_err) { | |
530 | last_rotating_renew_sent = utime_t(); | |
531 | while (!waiting_for_session.empty()) { | |
532 | _send_mon_message(waiting_for_session.front()); | |
533 | waiting_for_session.pop_front(); | |
534 | } | |
535 | _resend_mon_commands(); | |
536 | send_log(true); | |
537 | if (active_con) { | |
538 | std::swap(auth, active_con->get_auth()); | |
539 | global_id = active_con->get_global_id(); | |
540 | } | |
541 | } | |
542 | _finish_auth(auth_err); | |
543 | if (!auth_err) { | |
544 | Context *cb = nullptr; | |
545 | if (session_established_context) { | |
546 | cb = session_established_context.release(); | |
547 | } | |
548 | if (cb) { | |
549 | monc_lock.Unlock(); | |
550 | cb->complete(0); | |
551 | monc_lock.Lock(); | |
552 | } | |
553 | } | |
554 | } | |
555 | ||
556 | void MonClient::_finish_auth(int auth_err) | |
557 | { | |
558 | authenticate_err = auth_err; | |
559 | // _resend_mon_commands() could _reopen_session() if the connected mon is not | |
560 | // the one the MonCommand is targeting. | |
561 | if (!auth_err && active_con) { | |
562 | assert(auth); | |
563 | _check_auth_tickets(); | |
564 | } | |
565 | auth_cond.SignalAll(); | |
566 | } | |
567 | ||
568 | // --------- | |
569 | ||
570 | void MonClient::_send_mon_message(Message *m) | |
571 | { | |
572 | assert(monc_lock.is_locked()); | |
573 | if (active_con) { | |
574 | auto cur_con = active_con->get_con(); | |
575 | ldout(cct, 10) << "_send_mon_message to mon." | |
576 | << monmap.get_name(cur_con->get_peer_addr()) | |
577 | << " at " << cur_con->get_peer_addr() << dendl; | |
578 | cur_con->send_message(m); | |
579 | } else { | |
580 | waiting_for_session.push_back(m); | |
581 | } | |
582 | } | |
583 | ||
584 | void MonClient::_reopen_session(int rank) | |
585 | { | |
586 | assert(monc_lock.is_locked()); | |
587 | ldout(cct, 10) << __func__ << " rank " << rank << dendl; | |
588 | ||
589 | active_con.reset(); | |
590 | pending_cons.clear(); | |
591 | ||
592 | _start_hunting(); | |
593 | ||
594 | if (rank >= 0) { | |
595 | _add_conn(rank, global_id); | |
596 | } else { | |
597 | _add_conns(global_id); | |
598 | } | |
599 | ||
600 | // throw out old queued messages | |
601 | while (!waiting_for_session.empty()) { | |
602 | waiting_for_session.front()->put(); | |
603 | waiting_for_session.pop_front(); | |
604 | } | |
605 | ||
606 | // throw out version check requests | |
607 | while (!version_requests.empty()) { | |
608 | finisher.queue(version_requests.begin()->second->context, -EAGAIN); | |
609 | delete version_requests.begin()->second; | |
610 | version_requests.erase(version_requests.begin()); | |
611 | } | |
612 | ||
613 | for (auto& c : pending_cons) { | |
614 | c.second.start(monmap.get_epoch(), entity_name, *auth_supported); | |
615 | } | |
616 | ||
617 | for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin(); | |
618 | p != sub_sent.end(); | |
619 | ++p) { | |
620 | if (sub_new.count(p->first) == 0) | |
621 | sub_new[p->first] = p->second; | |
622 | } | |
623 | if (!sub_new.empty()) | |
624 | _renew_subs(); | |
625 | } | |
626 | ||
627 | MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id) | |
628 | { | |
629 | auto peer = monmap.get_addr(rank); | |
630 | auto conn = messenger->get_connection(monmap.get_inst(rank)); | |
631 | MonConnection mc(cct, conn, global_id); | |
632 | auto inserted = pending_cons.insert(make_pair(peer, move(mc))); | |
633 | ldout(cct, 10) << "picked mon." << monmap.get_name(rank) | |
634 | << " con " << conn | |
635 | << " addr " << conn->get_peer_addr() | |
636 | << dendl; | |
637 | return inserted.first->second; | |
638 | } | |
639 | ||
640 | void MonClient::_add_conns(uint64_t global_id) | |
641 | { | |
224ce89b WB |
642 | uint16_t min_priority = std::numeric_limits<uint16_t>::max(); |
643 | for (const auto& m : monmap.mon_info) { | |
644 | if (m.second.priority < min_priority) { | |
645 | min_priority = m.second.priority; | |
646 | } | |
647 | } | |
648 | vector<unsigned> ranks; | |
649 | for (const auto& m : monmap.mon_info) { | |
650 | if (m.second.priority == min_priority) { | |
651 | ranks.push_back(monmap.get_rank(m.first)); | |
652 | } | |
7c673cae FG |
653 | } |
654 | std::random_device rd; | |
655 | std::mt19937 rng(rd()); | |
656 | std::shuffle(ranks.begin(), ranks.end(), rng); | |
7c673cae | 657 | unsigned n = cct->_conf->mon_client_hunt_parallel; |
224ce89b WB |
658 | if (n == 0 || n > ranks.size()) { |
659 | n = ranks.size(); | |
7c673cae FG |
660 | } |
661 | for (unsigned i = 0; i < n; i++) { | |
662 | _add_conn(ranks[i], global_id); | |
663 | } | |
664 | } | |
665 | ||
666 | bool MonClient::ms_handle_reset(Connection *con) | |
667 | { | |
668 | Mutex::Locker lock(monc_lock); | |
669 | ||
670 | if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) | |
671 | return false; | |
672 | ||
673 | if (_hunting()) { | |
674 | if (pending_cons.count(con->get_peer_addr())) { | |
675 | ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addr() << dendl; | |
676 | } else { | |
677 | ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl; | |
678 | } | |
679 | return true; | |
680 | } else { | |
681 | if (active_con && con == active_con->get_con()) { | |
682 | ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addr() << dendl; | |
683 | _reopen_session(); | |
684 | return false; | |
685 | } else { | |
686 | ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl; | |
687 | return true; | |
688 | } | |
689 | } | |
690 | } | |
691 | ||
692 | bool MonClient::_opened() const | |
693 | { | |
694 | assert(monc_lock.is_locked()); | |
695 | return active_con || _hunting(); | |
696 | } | |
697 | ||
698 | bool MonClient::_hunting() const | |
699 | { | |
700 | return !pending_cons.empty(); | |
701 | } | |
702 | ||
703 | void MonClient::_start_hunting() | |
704 | { | |
705 | assert(!_hunting()); | |
706 | // adjust timeouts if necessary | |
707 | if (!had_a_connection) | |
708 | return; | |
709 | reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff; | |
710 | if (reopen_interval_multiplier > | |
711 | cct->_conf->mon_client_hunt_interval_max_multiple) { | |
712 | reopen_interval_multiplier = | |
713 | cct->_conf->mon_client_hunt_interval_max_multiple; | |
714 | } | |
715 | } | |
716 | ||
717 | void MonClient::_finish_hunting() | |
718 | { | |
719 | assert(monc_lock.is_locked()); | |
720 | // the pending conns have been cleaned. | |
721 | assert(!_hunting()); | |
722 | if (active_con) { | |
723 | auto con = active_con->get_con(); | |
724 | ldout(cct, 1) << "found mon." | |
725 | << monmap.get_name(con->get_peer_addr()) | |
726 | << dendl; | |
727 | } else { | |
728 | ldout(cct, 1) << "no mon sessions established" << dendl; | |
729 | } | |
730 | ||
731 | had_a_connection = true; | |
732 | reopen_interval_multiplier /= 2.0; | |
733 | if (reopen_interval_multiplier < 1.0) | |
734 | reopen_interval_multiplier = 1.0; | |
735 | } | |
736 | ||
737 | void MonClient::tick() | |
738 | { | |
739 | ldout(cct, 10) << __func__ << dendl; | |
740 | ||
741 | _check_auth_tickets(); | |
742 | ||
743 | if (_hunting()) { | |
744 | ldout(cct, 1) << "continuing hunt" << dendl; | |
745 | _reopen_session(); | |
746 | } else if (active_con) { | |
747 | // just renew as needed | |
748 | utime_t now = ceph_clock_now(); | |
749 | auto cur_con = active_con->get_con(); | |
750 | if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) { | |
751 | ldout(cct, 10) << "renew subs? (now: " << now | |
752 | << "; renew after: " << sub_renew_after << ") -- " | |
753 | << (now > sub_renew_after ? "yes" : "no") | |
754 | << dendl; | |
755 | if (now > sub_renew_after) | |
756 | _renew_subs(); | |
757 | } | |
758 | ||
759 | cur_con->send_keepalive(); | |
760 | ||
761 | if (cct->_conf->mon_client_ping_timeout > 0 && | |
762 | cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { | |
763 | utime_t lk = cur_con->get_last_keepalive_ack(); | |
764 | utime_t interval = now - lk; | |
765 | if (interval > cct->_conf->mon_client_ping_timeout) { | |
766 | ldout(cct, 1) << "no keepalive since " << lk << " (" << interval | |
767 | << " seconds), reconnecting" << dendl; | |
768 | _reopen_session(); | |
769 | } | |
770 | ||
771 | send_log(); | |
772 | } | |
773 | } | |
774 | ||
775 | schedule_tick(); | |
776 | } | |
777 | ||
778 | void MonClient::schedule_tick() | |
779 | { | |
780 | struct C_Tick : public Context { | |
781 | MonClient *monc; | |
782 | explicit C_Tick(MonClient *m) : monc(m) {} | |
783 | void finish(int r) override { | |
784 | monc->tick(); | |
785 | } | |
786 | }; | |
787 | ||
788 | if (_hunting()) { | |
789 | timer.add_event_after(cct->_conf->mon_client_hunt_interval | |
790 | * reopen_interval_multiplier, | |
791 | new C_Tick(this)); | |
792 | } else | |
793 | timer.add_event_after(cct->_conf->mon_client_ping_interval, new C_Tick(this)); | |
794 | } | |
795 | ||
796 | // --------- | |
797 | ||
798 | void MonClient::_renew_subs() | |
799 | { | |
800 | assert(monc_lock.is_locked()); | |
801 | if (sub_new.empty()) { | |
802 | ldout(cct, 10) << __func__ << " - empty" << dendl; | |
803 | return; | |
804 | } | |
805 | ||
806 | ldout(cct, 10) << __func__ << dendl; | |
807 | if (!_opened()) | |
808 | _reopen_session(); | |
809 | else { | |
810 | if (sub_renew_sent == utime_t()) | |
811 | sub_renew_sent = ceph_clock_now(); | |
812 | ||
813 | MMonSubscribe *m = new MMonSubscribe; | |
814 | m->what = sub_new; | |
815 | _send_mon_message(m); | |
816 | ||
817 | // update sub_sent with sub_new | |
818 | sub_new.insert(sub_sent.begin(), sub_sent.end()); | |
819 | std::swap(sub_new, sub_sent); | |
820 | sub_new.clear(); | |
821 | } | |
822 | } | |
823 | ||
824 | void MonClient::handle_subscribe_ack(MMonSubscribeAck *m) | |
825 | { | |
826 | if (sub_renew_sent != utime_t()) { | |
827 | // NOTE: this is only needed for legacy (infernalis or older) | |
828 | // mons; see tick(). | |
829 | sub_renew_after = sub_renew_sent; | |
830 | sub_renew_after += m->interval / 2.0; | |
831 | ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl; | |
832 | sub_renew_sent = utime_t(); | |
833 | } else { | |
834 | ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << ", ignoring" << dendl; | |
835 | } | |
836 | ||
837 | m->put(); | |
838 | } | |
839 | ||
840 | int MonClient::_check_auth_tickets() | |
841 | { | |
842 | assert(monc_lock.is_locked()); | |
843 | if (active_con && auth) { | |
844 | if (auth->need_tickets()) { | |
845 | ldout(cct, 10) << __func__ << " getting new tickets!" << dendl; | |
846 | MAuth *m = new MAuth; | |
847 | m->protocol = auth->get_protocol(); | |
848 | auth->prepare_build_request(); | |
849 | auth->build_request(m->auth_payload); | |
850 | _send_mon_message(m); | |
851 | } | |
852 | ||
853 | _check_auth_rotating(); | |
854 | } | |
855 | return 0; | |
856 | } | |
857 | ||
858 | int MonClient::_check_auth_rotating() | |
859 | { | |
860 | assert(monc_lock.is_locked()); | |
861 | if (!rotating_secrets || | |
862 | !auth_principal_needs_rotating_keys(entity_name)) { | |
863 | ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl; | |
864 | return 0; | |
865 | } | |
866 | ||
867 | if (!active_con || !auth) { | |
868 | ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl; | |
869 | return 0; | |
870 | } | |
871 | ||
872 | utime_t now = ceph_clock_now(); | |
873 | utime_t cutoff = now; | |
874 | cutoff -= MIN(30.0, cct->_conf->auth_service_ticket_ttl / 4.0); | |
875 | utime_t issued_at_lower_bound = now; | |
876 | issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl; | |
877 | if (!rotating_secrets->need_new_secrets(cutoff)) { | |
878 | ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl; | |
879 | rotating_secrets->dump_rotating(); | |
880 | return 0; | |
881 | } | |
882 | ||
883 | ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl; | |
884 | if (!rotating_secrets->need_new_secrets() && | |
885 | rotating_secrets->need_new_secrets(issued_at_lower_bound)) { | |
886 | // the key has expired before it has been issued? | |
887 | lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early" | |
888 | << " (before " << issued_at_lower_bound << ")" << dendl; | |
889 | } | |
890 | if ((now > last_rotating_renew_sent) && | |
891 | double(now - last_rotating_renew_sent) < 1) { | |
892 | ldout(cct, 10) << __func__ << " called too often (last: " | |
893 | << last_rotating_renew_sent << "), skipping refresh" << dendl; | |
894 | return 0; | |
895 | } | |
896 | MAuth *m = new MAuth; | |
897 | m->protocol = auth->get_protocol(); | |
898 | if (auth->build_rotating_request(m->auth_payload)) { | |
899 | last_rotating_renew_sent = now; | |
900 | _send_mon_message(m); | |
901 | } else { | |
902 | m->put(); | |
903 | } | |
904 | return 0; | |
905 | } | |
906 | ||
907 | int MonClient::wait_auth_rotating(double timeout) | |
908 | { | |
909 | Mutex::Locker l(monc_lock); | |
910 | utime_t now = ceph_clock_now(); | |
911 | utime_t until = now; | |
912 | until += timeout; | |
913 | ||
914 | // Must be initialized | |
915 | assert(auth != nullptr); | |
916 | ||
917 | if (auth->get_protocol() == CEPH_AUTH_NONE) | |
918 | return 0; | |
919 | ||
920 | if (!rotating_secrets) | |
921 | return 0; | |
922 | ||
923 | while (auth_principal_needs_rotating_keys(entity_name) && | |
924 | rotating_secrets->need_new_secrets(now)) { | |
925 | if (now >= until) { | |
926 | ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl; | |
927 | return -ETIMEDOUT; | |
928 | } | |
929 | ldout(cct, 10) << __func__ << " waiting (until " << until << ")" << dendl; | |
930 | auth_cond.WaitUntil(monc_lock, until); | |
931 | now = ceph_clock_now(); | |
932 | } | |
933 | ldout(cct, 10) << __func__ << " done" << dendl; | |
934 | return 0; | |
935 | } | |
936 | ||
937 | // --------- | |
938 | ||
939 | void MonClient::_send_command(MonCommand *r) | |
940 | { | |
941 | entity_addr_t peer; | |
942 | if (active_con) { | |
943 | peer = active_con->get_con()->get_peer_addr(); | |
944 | } | |
945 | ||
946 | if (r->target_rank >= 0 && | |
947 | r->target_rank != monmap.get_rank(peer)) { | |
948 | ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd | |
949 | << " wants rank " << r->target_rank | |
950 | << ", reopening session" | |
951 | << dendl; | |
952 | if (r->target_rank >= (int)monmap.size()) { | |
953 | ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl; | |
954 | _finish_command(r, -ENOENT, "mon rank dne"); | |
955 | return; | |
956 | } | |
957 | _reopen_session(r->target_rank); | |
958 | return; | |
959 | } | |
960 | ||
961 | if (r->target_name.length() && | |
962 | r->target_name != monmap.get_name(peer)) { | |
963 | ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd | |
964 | << " wants mon " << r->target_name | |
965 | << ", reopening session" | |
966 | << dendl; | |
967 | if (!monmap.contains(r->target_name)) { | |
968 | ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl; | |
969 | _finish_command(r, -ENOENT, "mon dne"); | |
970 | return; | |
971 | } | |
972 | _reopen_session(monmap.get_rank(r->target_name)); | |
973 | return; | |
974 | } | |
975 | ||
976 | ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl; | |
977 | MMonCommand *m = new MMonCommand(monmap.fsid); | |
978 | m->set_tid(r->tid); | |
979 | m->cmd = r->cmd; | |
980 | m->set_data(r->inbl); | |
981 | _send_mon_message(m); | |
982 | return; | |
983 | } | |
984 | ||
985 | void MonClient::_resend_mon_commands() | |
986 | { | |
987 | // resend any requests | |
988 | for (map<uint64_t,MonCommand*>::iterator p = mon_commands.begin(); | |
989 | p != mon_commands.end(); | |
990 | ++p) { | |
991 | _send_command(p->second); | |
992 | } | |
993 | } | |
994 | ||
995 | void MonClient::handle_mon_command_ack(MMonCommandAck *ack) | |
996 | { | |
997 | MonCommand *r = NULL; | |
998 | uint64_t tid = ack->get_tid(); | |
999 | ||
1000 | if (tid == 0 && !mon_commands.empty()) { | |
1001 | r = mon_commands.begin()->second; | |
1002 | ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl; | |
1003 | } else { | |
1004 | map<uint64_t,MonCommand*>::iterator p = mon_commands.find(tid); | |
1005 | if (p == mon_commands.end()) { | |
1006 | ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl; | |
1007 | ack->put(); | |
1008 | return; | |
1009 | } | |
1010 | r = p->second; | |
1011 | } | |
1012 | ||
1013 | ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl; | |
1014 | if (r->poutbl) | |
1015 | r->poutbl->claim(ack->get_data()); | |
1016 | _finish_command(r, ack->r, ack->rs); | |
1017 | ack->put(); | |
1018 | } | |
1019 | ||
31f18b77 | 1020 | int MonClient::_cancel_mon_command(uint64_t tid) |
7c673cae FG |
1021 | { |
1022 | assert(monc_lock.is_locked()); | |
1023 | ||
1024 | map<ceph_tid_t, MonCommand*>::iterator it = mon_commands.find(tid); | |
1025 | if (it == mon_commands.end()) { | |
1026 | ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; | |
1027 | return -ENOENT; | |
1028 | } | |
1029 | ||
1030 | ldout(cct, 10) << __func__ << " tid " << tid << dendl; | |
1031 | ||
1032 | MonCommand *cmd = it->second; | |
1033 | _finish_command(cmd, -ETIMEDOUT, ""); | |
1034 | return 0; | |
1035 | } | |
1036 | ||
1037 | void MonClient::_finish_command(MonCommand *r, int ret, string rs) | |
1038 | { | |
1039 | ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl; | |
1040 | if (r->prval) | |
1041 | *(r->prval) = ret; | |
1042 | if (r->prs) | |
1043 | *(r->prs) = rs; | |
1044 | if (r->onfinish) | |
1045 | finisher.queue(r->onfinish, ret); | |
1046 | mon_commands.erase(r->tid); | |
1047 | delete r; | |
1048 | } | |
1049 | ||
1050 | void MonClient::start_mon_command(const vector<string>& cmd, | |
1051 | const bufferlist& inbl, | |
1052 | bufferlist *outbl, string *outs, | |
1053 | Context *onfinish) | |
1054 | { | |
1055 | Mutex::Locker l(monc_lock); | |
1056 | MonCommand *r = new MonCommand(++last_mon_command_tid); | |
1057 | r->cmd = cmd; | |
1058 | r->inbl = inbl; | |
1059 | r->poutbl = outbl; | |
1060 | r->prs = outs; | |
1061 | r->onfinish = onfinish; | |
1062 | if (cct->_conf->rados_mon_op_timeout > 0) { | |
1063 | class C_CancelMonCommand : public Context | |
1064 | { | |
1065 | uint64_t tid; | |
1066 | MonClient *monc; | |
1067 | public: | |
1068 | C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {} | |
1069 | void finish(int r) override { | |
31f18b77 | 1070 | monc->_cancel_mon_command(tid); |
7c673cae FG |
1071 | } |
1072 | }; | |
1073 | r->ontimeout = new C_CancelMonCommand(r->tid, this); | |
1074 | timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout); | |
1075 | } | |
1076 | mon_commands[r->tid] = r; | |
1077 | _send_command(r); | |
1078 | } | |
1079 | ||
1080 | void MonClient::start_mon_command(const string &mon_name, | |
1081 | const vector<string>& cmd, | |
1082 | const bufferlist& inbl, | |
1083 | bufferlist *outbl, string *outs, | |
1084 | Context *onfinish) | |
1085 | { | |
1086 | Mutex::Locker l(monc_lock); | |
1087 | MonCommand *r = new MonCommand(++last_mon_command_tid); | |
1088 | r->target_name = mon_name; | |
1089 | r->cmd = cmd; | |
1090 | r->inbl = inbl; | |
1091 | r->poutbl = outbl; | |
1092 | r->prs = outs; | |
1093 | r->onfinish = onfinish; | |
1094 | mon_commands[r->tid] = r; | |
1095 | _send_command(r); | |
1096 | } | |
1097 | ||
1098 | void MonClient::start_mon_command(int rank, | |
1099 | const vector<string>& cmd, | |
1100 | const bufferlist& inbl, | |
1101 | bufferlist *outbl, string *outs, | |
1102 | Context *onfinish) | |
1103 | { | |
1104 | Mutex::Locker l(monc_lock); | |
1105 | MonCommand *r = new MonCommand(++last_mon_command_tid); | |
1106 | r->target_rank = rank; | |
1107 | r->cmd = cmd; | |
1108 | r->inbl = inbl; | |
1109 | r->poutbl = outbl; | |
1110 | r->prs = outs; | |
1111 | r->onfinish = onfinish; | |
1112 | mon_commands[r->tid] = r; | |
1113 | _send_command(r); | |
1114 | } | |
1115 | ||
1116 | // --------- | |
1117 | ||
1118 | void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish) | |
1119 | { | |
1120 | version_req_d *req = new version_req_d(onfinish, newest, oldest); | |
1121 | ldout(cct, 10) << "get_version " << map << " req " << req << dendl; | |
1122 | Mutex::Locker l(monc_lock); | |
1123 | MMonGetVersion *m = new MMonGetVersion(); | |
1124 | m->what = map; | |
1125 | m->handle = ++version_req_id; | |
1126 | version_requests[m->handle] = req; | |
1127 | _send_mon_message(m); | |
1128 | } | |
1129 | ||
1130 | void MonClient::handle_get_version_reply(MMonGetVersionReply* m) | |
1131 | { | |
1132 | assert(monc_lock.is_locked()); | |
1133 | map<ceph_tid_t, version_req_d*>::iterator iter = version_requests.find(m->handle); | |
1134 | if (iter == version_requests.end()) { | |
1135 | ldout(cct, 0) << __func__ << " version request with handle " << m->handle | |
1136 | << " not found" << dendl; | |
1137 | } else { | |
1138 | version_req_d *req = iter->second; | |
1139 | ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl; | |
1140 | version_requests.erase(iter); | |
1141 | if (req->newest) | |
1142 | *req->newest = m->version; | |
1143 | if (req->oldest) | |
1144 | *req->oldest = m->oldest_version; | |
1145 | finisher.queue(req->context, 0); | |
1146 | delete req; | |
1147 | } | |
1148 | m->put(); | |
1149 | } | |
1150 | ||
1151 | AuthAuthorizer* MonClient::build_authorizer(int service_id) const { | |
1152 | Mutex::Locker l(monc_lock); | |
1153 | if (auth) { | |
1154 | return auth->build_authorizer(service_id); | |
1155 | } else { | |
1156 | ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id) | |
1157 | << ", but no auth is available now" << dendl; | |
1158 | return nullptr; | |
1159 | } | |
1160 | } | |
1161 | ||
1162 | #define dout_subsys ceph_subsys_monc | |
1163 | #undef dout_prefix | |
1164 | #define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ") | |
1165 | ||
1166 | MonConnection::MonConnection(CephContext *cct, ConnectionRef con, uint64_t global_id) | |
1167 | : cct(cct), con(con), global_id(global_id) | |
1168 | {} | |
1169 | ||
1170 | MonConnection::~MonConnection() | |
1171 | { | |
1172 | if (con) { | |
1173 | con->mark_down(); | |
1174 | con.reset(); | |
1175 | } | |
1176 | } | |
1177 | ||
1178 | bool MonConnection::have_session() const | |
1179 | { | |
1180 | return state == State::HAVE_SESSION; | |
1181 | } | |
1182 | ||
1183 | void MonConnection::start(epoch_t epoch, | |
1184 | const EntityName& entity_name, | |
1185 | const AuthMethodList& auth_supported) | |
1186 | { | |
1187 | // restart authentication handshake | |
1188 | state = State::NEGOTIATING; | |
1189 | ||
1190 | // send an initial keepalive to ensure our timestamp is valid by the | |
1191 | // time we are in an OPENED state (by sequencing this before | |
1192 | // authentication). | |
1193 | con->send_keepalive(); | |
1194 | ||
1195 | auto m = new MAuth; | |
1196 | m->protocol = 0; | |
1197 | m->monmap_epoch = epoch; | |
1198 | __u8 struct_v = 1; | |
1199 | ::encode(struct_v, m->auth_payload); | |
1200 | ::encode(auth_supported.get_supported_set(), m->auth_payload); | |
1201 | ::encode(entity_name, m->auth_payload); | |
1202 | ::encode(global_id, m->auth_payload); | |
1203 | con->send_message(m); | |
1204 | } | |
1205 | ||
1206 | int MonConnection::handle_auth(MAuthReply* m, | |
1207 | const EntityName& entity_name, | |
1208 | uint32_t want_keys, | |
1209 | RotatingKeyRing* keyring) | |
1210 | { | |
1211 | if (state == State::NEGOTIATING) { | |
1212 | int r = _negotiate(m, entity_name, want_keys, keyring); | |
1213 | if (r) { | |
1214 | return r; | |
1215 | } | |
1216 | state = State::AUTHENTICATING; | |
1217 | } | |
1218 | int r = authenticate(m); | |
1219 | if (!r) { | |
1220 | state = State::HAVE_SESSION; | |
1221 | } | |
1222 | return r; | |
1223 | } | |
1224 | ||
1225 | int MonConnection::_negotiate(MAuthReply *m, | |
1226 | const EntityName& entity_name, | |
1227 | uint32_t want_keys, | |
1228 | RotatingKeyRing* keyring) | |
1229 | { | |
1230 | if (auth && (int)m->protocol == auth->get_protocol()) { | |
1231 | // good, negotiation completed | |
1232 | auth->reset(); | |
1233 | return 0; | |
1234 | } | |
1235 | ||
1236 | auth.reset(get_auth_client_handler(cct, m->protocol, keyring)); | |
1237 | if (!auth) { | |
1238 | ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl; | |
1239 | if (m->result == -ENOTSUP) { | |
1240 | ldout(cct, 10) << "none of our auth protocols are supported by the server" | |
1241 | << dendl; | |
1242 | } | |
1243 | return m->result; | |
1244 | } | |
1245 | ||
1246 | // do not request MGR key unless the mon has the SERVER_KRAKEN | |
1247 | // feature. otherwise it will give us an auth error. note that | |
1248 | // we have to use the FEATUREMASK because pre-jewel the kraken | |
1249 | // feature bit was used for something else. | |
1250 | if ((want_keys & CEPH_ENTITY_TYPE_MGR) && | |
1251 | !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) { | |
1252 | ldout(cct, 1) << __func__ | |
1253 | << " not requesting MGR keys from pre-kraken monitor" | |
1254 | << dendl; | |
1255 | want_keys &= ~CEPH_ENTITY_TYPE_MGR; | |
1256 | } | |
1257 | auth->set_want_keys(want_keys); | |
1258 | auth->init(entity_name); | |
1259 | auth->set_global_id(global_id); | |
1260 | return 0; | |
1261 | } | |
1262 | ||
1263 | int MonConnection::authenticate(MAuthReply *m) | |
1264 | { | |
1265 | assert(auth); | |
1266 | if (!m->global_id) { | |
1267 | ldout(cct, 1) << "peer sent an invalid global_id" << dendl; | |
1268 | } | |
1269 | if (m->global_id != global_id) { | |
1270 | // it's a new session | |
1271 | auth->reset(); | |
1272 | global_id = m->global_id; | |
1273 | auth->set_global_id(global_id); | |
1274 | ldout(cct, 10) << "my global_id is " << m->global_id << dendl; | |
1275 | } | |
1276 | auto p = m->result_bl.begin(); | |
1277 | int ret = auth->handle_response(m->result, p); | |
1278 | if (ret == -EAGAIN) { | |
1279 | auto ma = new MAuth; | |
1280 | ma->protocol = auth->get_protocol(); | |
1281 | auth->prepare_build_request(); | |
1282 | auth->build_request(ma->auth_payload); | |
1283 | con->send_message(ma); | |
1284 | } | |
1285 | return ret; | |
1286 | } |