]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mon / MonClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <random>
16
17 #include "messages/MMonGetMap.h"
18 #include "messages/MMonGetVersion.h"
19 #include "messages/MMonGetVersionReply.h"
20 #include "messages/MMonMap.h"
21 #include "messages/MAuth.h"
22 #include "messages/MLogAck.h"
23 #include "messages/MAuthReply.h"
24 #include "messages/MMonCommand.h"
25 #include "messages/MMonCommandAck.h"
26 #include "messages/MPing.h"
27
28 #include "messages/MMonSubscribe.h"
29 #include "messages/MMonSubscribeAck.h"
30 #include "common/errno.h"
31 #include "common/LogClient.h"
32
33 #include "MonClient.h"
34 #include "MonMap.h"
35
36 #include "auth/Auth.h"
37 #include "auth/KeyRing.h"
38 #include "auth/AuthClientHandler.h"
39 #include "auth/AuthMethodList.h"
40 #include "auth/RotatingKeyRing.h"
41
// Logging setup for the MonClient methods below: messages go to the monc
// subsystem and are prefixed with "monclient", followed by "(hunting)"
// whenever _hunting() is true at the time the line is emitted.
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
45
46 MonClient::MonClient(CephContext *cct_) :
47 Dispatcher(cct_),
48 messenger(NULL),
49 monc_lock("MonClient::monc_lock"),
50 timer(cct_, monc_lock),
51 finisher(cct_),
52 initialized(false),
53 no_keyring_disabled_cephx(false),
54 log_client(NULL),
55 more_log_pending(false),
56 want_monmap(true),
57 had_a_connection(false),
58 reopen_interval_multiplier(1.0),
59 last_mon_command_tid(0),
60 version_req_id(0)
61 {
62 }
63
64 MonClient::~MonClient()
65 {
66 }
67
68 int MonClient::build_initial_monmap()
69 {
70 ldout(cct, 10) << __func__ << dendl;
71 return monmap.build_initial(cct, cerr);
72 }
73
74 int MonClient::get_monmap()
75 {
76 ldout(cct, 10) << __func__ << dendl;
77 Mutex::Locker l(monc_lock);
78
79 _sub_want("monmap", 0, 0);
80 if (!_opened())
81 _reopen_session();
82
83 while (want_monmap)
84 map_cond.Wait(monc_lock);
85
86 ldout(cct, 10) << __func__ << " done" << dendl;
87 return 0;
88 }
89
90 int MonClient::get_monmap_privately()
91 {
92 ldout(cct, 10) << __func__ << dendl;
93 Mutex::Locker l(monc_lock);
94
95 bool temp_msgr = false;
96 Messenger* smessenger = NULL;
97 if (!messenger) {
98 messenger = smessenger = Messenger::create_client_messenger(cct, "temp_mon_client");
99 if (NULL == messenger) {
100 return -1;
101 }
102 messenger->add_dispatcher_head(this);
103 smessenger->start();
104 temp_msgr = true;
105 }
106
107 int attempt = 10;
108
109 ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl;
110
111 std::random_device rd;
112 std::mt19937 rng(rd());
113 assert(monmap.size() > 0);
114 std::uniform_int_distribution<unsigned> ranks(0, monmap.size() - 1);
115 while (monmap.fsid.is_zero()) {
116 auto rank = ranks(rng);
117 auto& pending_con = _add_conn(rank, 0);
118 auto con = pending_con.get_con();
119 ldout(cct, 10) << "querying mon." << monmap.get_name(rank) << " "
120 << con->get_peer_addr() << dendl;
121 con->send_message(new MMonGetMap);
122
123 if (--attempt == 0)
124 break;
125
126 utime_t interval;
127 interval.set_from_double(cct->_conf->mon_client_hunt_interval);
128 map_cond.WaitInterval(monc_lock, interval);
129
130 if (monmap.fsid.is_zero() && con) {
131 con->mark_down(); // nope, clean that connection up
132 }
133 }
134
135 if (temp_msgr) {
136 pending_cons.clear();
137 monc_lock.Unlock();
138 messenger->shutdown();
139 if (smessenger)
140 smessenger->wait();
141 delete messenger;
142 messenger = 0;
143 monc_lock.Lock();
144 }
145
146 pending_cons.clear();
147
148 if (!monmap.fsid.is_zero())
149 return 0;
150 return -1;
151 }
152
153
154 /**
155 * Ping the monitor with id @p mon_id and set the resulting reply in
156 * the provided @p result_reply, if this last parameter is not NULL.
157 *
158 * So that we don't rely on the MonClient's default messenger, set up
159 * during connect(), we create our own messenger to comunicate with the
160 * specified monitor. This is advantageous in the following ways:
161 *
162 * - Isolate the ping procedure from the rest of the MonClient's operations,
163 * allowing us to not acquire or manage the big monc_lock, thus not
164 * having to block waiting for some other operation to finish before we
165 * can proceed.
166 * * for instance, we can ping mon.FOO even if we are currently hunting
167 * or blocked waiting for auth to complete with mon.BAR.
168 *
169 * - Ping a monitor prior to establishing a connection (using connect())
170 * and properly establish the MonClient's messenger. This frees us
171 * from dealing with the complex foo that happens in connect().
172 *
173 * We also don't rely on MonClient as a dispatcher for this messenger,
174 * unlike what happens with the MonClient's default messenger. This allows
175 * us to sandbox the whole ping, having it much as a separate entity in
176 * the MonClient class, considerably simplifying the handling and dispatching
177 * of messages without needing to consider monc_lock.
178 *
179 * Current drawback is that we will establish a messenger for each ping
180 * we want to issue, instead of keeping a single messenger instance that
181 * would be used for all pings.
182 */
183 int MonClient::ping_monitor(const string &mon_id, string *result_reply)
184 {
185 ldout(cct, 10) << __func__ << dendl;
186
187 string new_mon_id;
188 if (monmap.contains("noname-"+mon_id)) {
189 new_mon_id = "noname-"+mon_id;
190 } else {
191 new_mon_id = mon_id;
192 }
193
194 if (new_mon_id.empty()) {
195 ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
196 return -EINVAL;
197 } else if (!monmap.contains(new_mon_id)) {
198 ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
199 << dendl;
200 return -ENOENT;
201 }
202
203 MonClientPinger *pinger = new MonClientPinger(cct, result_reply);
204
205 Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
206 smsgr->add_dispatcher_head(pinger);
207 smsgr->start();
208
209 ConnectionRef con = smsgr->get_connection(monmap.get_inst(new_mon_id));
210 ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
211 << " " << con->get_peer_addr() << dendl;
212 con->send_message(new MPing);
213
214 pinger->lock.Lock();
215 int ret = pinger->wait_for_reply(cct->_conf->client_mount_timeout);
216 if (ret == 0) {
217 ldout(cct,10) << __func__ << " got ping reply" << dendl;
218 } else {
219 ret = -ret;
220 }
221 pinger->lock.Unlock();
222
223 con->mark_down();
224 smsgr->shutdown();
225 smsgr->wait();
226 delete smsgr;
227 delete pinger;
228 return ret;
229 }
230
231 bool MonClient::ms_dispatch(Message *m)
232 {
233 if (my_addr == entity_addr_t())
234 my_addr = messenger->get_myaddr();
235
236 // we only care about these message types
237 switch (m->get_type()) {
238 case CEPH_MSG_MON_MAP:
239 case CEPH_MSG_AUTH_REPLY:
240 case CEPH_MSG_MON_SUBSCRIBE_ACK:
241 case CEPH_MSG_MON_GET_VERSION_REPLY:
242 case MSG_MON_COMMAND_ACK:
243 case MSG_LOGACK:
244 break;
245 default:
246 return false;
247 }
248
249 Mutex::Locker lock(monc_lock);
250
251 if (_hunting()) {
252 auto pending_con = pending_cons.find(m->get_source_addr());
253 if (pending_con == pending_cons.end() ||
254 pending_con->second.get_con() != m->get_connection()) {
255 // ignore any messages outside hunting sessions
256 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
257 m->put();
258 return true;
259 }
260 } else if (!active_con || active_con->get_con() != m->get_connection()) {
261 // ignore any messages outside our session(s)
262 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
263 m->put();
264 return true;
265 }
266
267 switch (m->get_type()) {
268 case CEPH_MSG_MON_MAP:
269 handle_monmap(static_cast<MMonMap*>(m));
270 if (passthrough_monmap) {
271 return false;
272 } else {
273 m->put();
274 }
275 break;
276 case CEPH_MSG_AUTH_REPLY:
277 handle_auth(static_cast<MAuthReply*>(m));
278 break;
279 case CEPH_MSG_MON_SUBSCRIBE_ACK:
280 handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
281 break;
282 case CEPH_MSG_MON_GET_VERSION_REPLY:
283 handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
284 break;
285 case MSG_MON_COMMAND_ACK:
286 handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
287 break;
288 case MSG_LOGACK:
289 if (log_client) {
290 log_client->handle_log_ack(static_cast<MLogAck*>(m));
291 m->put();
292 if (more_log_pending) {
293 send_log();
294 }
295 } else {
296 m->put();
297 }
298 break;
299 }
300 return true;
301 }
302
303 void MonClient::send_log(bool flush)
304 {
305 if (log_client) {
306 Message *lm = log_client->get_mon_log_message(flush);
307 if (lm)
308 _send_mon_message(lm);
309 more_log_pending = log_client->are_pending();
310 }
311 }
312
313 void MonClient::flush_log()
314 {
315 Mutex::Locker l(monc_lock);
316 send_log();
317 }
318
/* Unlike all the other message-handling functions, we don't put away a reference
 * because we want to support MMonMap passthrough to other Dispatchers. */
/**
 * Replace our monmap with the one carried by @p m.
 *
 * After decoding, the "monmap" subscription is marked as satisfied for the
 * new epoch. If the sender's address can no longer be found in the new map,
 * the session is reopened. Finally anyone waiting on map_cond is woken and
 * want_monmap is cleared.
 */
void MonClient::handle_monmap(MMonMap *m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;
  auto peer = m->get_source_addr();
  // sender's name looked up in the map we hold *before* decoding the new one
  string cur_mon = monmap.get_name(peer);

  bufferlist::iterator p = m->monmapbl.begin();
  ::decode(monmap, p);

  ldout(cct, 10) << " got monmap " << monmap.epoch
		 << ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon)
		 << dendl;
  // the next three lines form a single log entry: the map dump is streamed
  // into the entry opened by ldout and closed by dendl
  ldout(cct, 10) << "dump:\n";
  monmap.print(*_dout);
  *_dout << dendl;

  _sub_got("monmap", monmap.get_epoch());

  // look the sender up again, this time in the new map
  if (!monmap.get_addr_name(peer, cur_mon)) {
    ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl;
    // can't find the mon we were talking to (above)
    _reopen_session();
  }

  map_cond.Signal();
  want_monmap = false;
}
348
349 // ----------------------
350
351 int MonClient::init()
352 {
353 ldout(cct, 10) << __func__ << dendl;
354
355 messenger->add_dispatcher_head(this);
356
357 entity_name = cct->_conf->name;
358
359 Mutex::Locker l(monc_lock);
360
361 string method;
362 if (!cct->_conf->auth_supported.empty())
363 method = cct->_conf->auth_supported;
364 else if (entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
365 entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
366 entity_name.get_type() == CEPH_ENTITY_TYPE_MON)
367 method = cct->_conf->auth_cluster_required;
368 else
369 method = cct->_conf->auth_client_required;
370 auth_supported.reset(new AuthMethodList(cct, method));
371 ldout(cct, 10) << "auth_supported " << auth_supported->get_supported_set() << " method " << method << dendl;
372
373 int r = 0;
374 keyring.reset(new KeyRing); // initializing keyring anyway
375
376 if (auth_supported->is_supported_auth(CEPH_AUTH_CEPHX)) {
377 r = keyring->from_ceph_context(cct);
378 if (r == -ENOENT) {
379 auth_supported->remove_supported_auth(CEPH_AUTH_CEPHX);
380 if (!auth_supported->get_supported_set().empty()) {
381 r = 0;
382 no_keyring_disabled_cephx = true;
383 } else {
384 lderr(cct) << "ERROR: missing keyring, cannot use cephx for authentication" << dendl;
385 }
386 }
387 }
388
389 if (r < 0) {
390 return r;
391 }
392
393 rotating_secrets.reset(
394 new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
395
396 initialized = true;
397
398 timer.init();
399 finisher.start();
400 schedule_tick();
401
402 return 0;
403 }
404
405 void MonClient::shutdown()
406 {
407 ldout(cct, 10) << __func__ << dendl;
408 monc_lock.Lock();
409 while (!version_requests.empty()) {
410 version_requests.begin()->second->context->complete(-ECANCELED);
411 ldout(cct, 20) << __func__ << " canceling and discarding version request "
412 << version_requests.begin()->second << dendl;
413 delete version_requests.begin()->second;
414 version_requests.erase(version_requests.begin());
415 }
416 while (!mon_commands.empty()) {
417 auto tid = mon_commands.begin()->first;
418 _cancel_mon_command(tid);
419 }
420 while (!waiting_for_session.empty()) {
421 ldout(cct, 20) << __func__ << " discarding pending message " << *waiting_for_session.front() << dendl;
422 waiting_for_session.front()->put();
423 waiting_for_session.pop_front();
424 }
425
426 active_con.reset();
427 pending_cons.clear();
428 auth.reset();
429
430 monc_lock.Unlock();
431
432 if (initialized) {
433 finisher.wait_for_empty();
434 finisher.stop();
435 }
436 monc_lock.Lock();
437 timer.shutdown();
438
439 monc_lock.Unlock();
440 }
441
442 int MonClient::authenticate(double timeout)
443 {
444 Mutex::Locker lock(monc_lock);
445
446 if (active_con) {
447 ldout(cct, 5) << "already authenticated" << dendl;
448 return 0;
449 }
450
451 _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
452 if (!_opened())
453 _reopen_session();
454
455 utime_t until = ceph_clock_now();
456 until += timeout;
457 if (timeout > 0.0)
458 ldout(cct, 10) << "authenticate will time out at " << until << dendl;
459 while (!active_con && !authenticate_err) {
460 if (timeout > 0.0) {
461 int r = auth_cond.WaitUntil(monc_lock, until);
462 if (r == ETIMEDOUT) {
463 ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
464 authenticate_err = -r;
465 }
466 } else {
467 auth_cond.Wait(monc_lock);
468 }
469 }
470
471 if (active_con) {
472 ldout(cct, 5) << __func__ << " success, global_id "
473 << active_con->get_global_id() << dendl;
474 // active_con should not have been set if there was an error
475 assert(authenticate_err == 0);
476 authenticated = true;
477 }
478
479 if (authenticate_err < 0 && no_keyring_disabled_cephx) {
480 lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
481 }
482
483 return authenticate_err;
484 }
485
/**
 * Process an authentication reply. Consumes the reference on @p m.
 *
 * Two paths:
 *  - not hunting: the reply belongs to the active session and is fed to
 *    active_con->authenticate();
 *  - hunting: the reply belongs to one of the pending connections. A failure
 *    drops that connection (and we keep waiting while others remain); a
 *    success promotes it to active_con and discards the rest.
 *
 * Must be called with monc_lock held. The lock is dropped temporarily while
 * the session-established callback runs.
 */
void MonClient::handle_auth(MAuthReply *m)
{
  assert(monc_lock.is_locked());
  if (!_hunting()) {
    // lend our auth handler to the connection for the duration of the
    // authenticate() call, then take it back
    std::swap(active_con->get_auth(), auth);
    int ret = active_con->authenticate(m);
    m->put();
    std::swap(auth, active_con->get_auth());
    if (global_id != active_con->get_global_id()) {
      lderr(cct) << __func__ << " peer assigned me a different global_id: "
		 << active_con->get_global_id() << dendl;
    }
    // -EAGAIN means the exchange is not finished yet
    if (ret != -EAGAIN) {
      _finish_auth(ret);
    }
    return;
  }

  // hunting
  auto found = pending_cons.find(m->get_source_addr());
  assert(found != pending_cons.end());
  int auth_err = found->second.handle_auth(m, entity_name, want_keys,
					   rotating_secrets.get());
  m->put();
  if (auth_err == -EAGAIN) {
    return;
  }
  if (auth_err) {
    pending_cons.erase(found);
    if (!pending_cons.empty()) {
      // keep trying with pending connections
      return;
    }
    // the last try just failed, give up.
  } else {
    auto& mc = found->second;
    assert(mc.have_session());
    // promote the winner; clearing pending_cons ends the hunt
    active_con.reset(new MonConnection(std::move(mc)));
    pending_cons.clear();
  }

  _finish_hunting();

  if (!auth_err) {
    last_rotating_renew_sent = utime_t();
    // flush everything that was queued while we had no session
    while (!waiting_for_session.empty()) {
      _send_mon_message(waiting_for_session.front());
      waiting_for_session.pop_front();
    }
    _resend_mon_commands();
    send_log(true);
    // active_con is re-checked because _resend_mon_commands() may have
    // reopened the session (see the note in _finish_auth())
    if (active_con) {
      std::swap(auth, active_con->get_auth());
      global_id = active_con->get_global_id();
    }
  }
  _finish_auth(auth_err);
  if (!auth_err) {
    Context *cb = nullptr;
    if (session_established_context) {
      cb = session_established_context.release();
    }
    if (cb) {
      // run the callback without holding monc_lock
      monc_lock.Unlock();
      cb->complete(0);
      monc_lock.Lock();
    }
  }
}
555
556 void MonClient::_finish_auth(int auth_err)
557 {
558 authenticate_err = auth_err;
559 // _resend_mon_commands() could _reopen_session() if the connected mon is not
560 // the one the MonCommand is targeting.
561 if (!auth_err && active_con) {
562 assert(auth);
563 _check_auth_tickets();
564 }
565 auth_cond.SignalAll();
566 }
567
568 // ---------
569
570 void MonClient::_send_mon_message(Message *m)
571 {
572 assert(monc_lock.is_locked());
573 if (active_con) {
574 auto cur_con = active_con->get_con();
575 ldout(cct, 10) << "_send_mon_message to mon."
576 << monmap.get_name(cur_con->get_peer_addr())
577 << " at " << cur_con->get_peer_addr() << dendl;
578 cur_con->send_message(m);
579 } else {
580 waiting_for_session.push_back(m);
581 }
582 }
583
584 void MonClient::_reopen_session(int rank)
585 {
586 assert(monc_lock.is_locked());
587 ldout(cct, 10) << __func__ << " rank " << rank << dendl;
588
589 active_con.reset();
590 pending_cons.clear();
591
592 _start_hunting();
593
594 if (rank >= 0) {
595 _add_conn(rank, global_id);
596 } else {
597 _add_conns(global_id);
598 }
599
600 // throw out old queued messages
601 while (!waiting_for_session.empty()) {
602 waiting_for_session.front()->put();
603 waiting_for_session.pop_front();
604 }
605
606 // throw out version check requests
607 while (!version_requests.empty()) {
608 finisher.queue(version_requests.begin()->second->context, -EAGAIN);
609 delete version_requests.begin()->second;
610 version_requests.erase(version_requests.begin());
611 }
612
613 for (auto& c : pending_cons) {
614 c.second.start(monmap.get_epoch(), entity_name, *auth_supported);
615 }
616
617 for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin();
618 p != sub_sent.end();
619 ++p) {
620 if (sub_new.count(p->first) == 0)
621 sub_new[p->first] = p->second;
622 }
623 if (!sub_new.empty())
624 _renew_subs();
625 }
626
627 MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id)
628 {
629 auto peer = monmap.get_addr(rank);
630 auto conn = messenger->get_connection(monmap.get_inst(rank));
631 MonConnection mc(cct, conn, global_id);
632 auto inserted = pending_cons.insert(make_pair(peer, move(mc)));
633 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
634 << " con " << conn
635 << " addr " << conn->get_peer_addr()
636 << dendl;
637 return inserted.first->second;
638 }
639
640 void MonClient::_add_conns(uint64_t global_id)
641 {
642 uint16_t min_priority = std::numeric_limits<uint16_t>::max();
643 for (const auto& m : monmap.mon_info) {
644 if (m.second.priority < min_priority) {
645 min_priority = m.second.priority;
646 }
647 }
648 vector<unsigned> ranks;
649 for (const auto& m : monmap.mon_info) {
650 if (m.second.priority == min_priority) {
651 ranks.push_back(monmap.get_rank(m.first));
652 }
653 }
654 std::random_device rd;
655 std::mt19937 rng(rd());
656 std::shuffle(ranks.begin(), ranks.end(), rng);
657 unsigned n = cct->_conf->mon_client_hunt_parallel;
658 if (n == 0 || n > ranks.size()) {
659 n = ranks.size();
660 }
661 for (unsigned i = 0; i < n; i++) {
662 _add_conn(ranks[i], global_id);
663 }
664 }
665
666 bool MonClient::ms_handle_reset(Connection *con)
667 {
668 Mutex::Locker lock(monc_lock);
669
670 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
671 return false;
672
673 if (_hunting()) {
674 if (pending_cons.count(con->get_peer_addr())) {
675 ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addr() << dendl;
676 } else {
677 ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl;
678 }
679 return true;
680 } else {
681 if (active_con && con == active_con->get_con()) {
682 ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addr() << dendl;
683 _reopen_session();
684 return false;
685 } else {
686 ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl;
687 return true;
688 }
689 }
690 }
691
692 bool MonClient::_opened() const
693 {
694 assert(monc_lock.is_locked());
695 return active_con || _hunting();
696 }
697
698 bool MonClient::_hunting() const
699 {
700 return !pending_cons.empty();
701 }
702
703 void MonClient::_start_hunting()
704 {
705 assert(!_hunting());
706 // adjust timeouts if necessary
707 if (!had_a_connection)
708 return;
709 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
710 if (reopen_interval_multiplier >
711 cct->_conf->mon_client_hunt_interval_max_multiple) {
712 reopen_interval_multiplier =
713 cct->_conf->mon_client_hunt_interval_max_multiple;
714 }
715 }
716
717 void MonClient::_finish_hunting()
718 {
719 assert(monc_lock.is_locked());
720 // the pending conns have been cleaned.
721 assert(!_hunting());
722 if (active_con) {
723 auto con = active_con->get_con();
724 ldout(cct, 1) << "found mon."
725 << monmap.get_name(con->get_peer_addr())
726 << dendl;
727 } else {
728 ldout(cct, 1) << "no mon sessions established" << dendl;
729 }
730
731 had_a_connection = true;
732 reopen_interval_multiplier /= 2.0;
733 if (reopen_interval_multiplier < 1.0)
734 reopen_interval_multiplier = 1.0;
735 }
736
/**
 * Periodic maintenance, run from the timer (see schedule_tick()).
 *
 * Checks auth tickets, then either restarts the hunt (if still hunting) or
 * services the active session: renews subscriptions for monitors lacking
 * CEPH_FEATURE_MON_STATEFUL_SUB, sends a keepalive and, when a ping timeout
 * is configured and the peer supports KEEPALIVE2, reconnects if the last
 * keepalive ack is too old. Always reschedules itself.
 */
void MonClient::tick()
{
  ldout(cct, 10) << __func__ << dendl;

  _check_auth_tickets();

  if (_hunting()) {
    // the previous hunt has not produced a session yet: start over
    ldout(cct, 1) << "continuing hunt" << dendl;
    _reopen_session();
  } else if (active_con) {
    // just renew as needed
    utime_t now = ceph_clock_now();
    auto cur_con = active_con->get_con();
    if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
      ldout(cct, 10) << "renew subs? (now: " << now
		     << "; renew after: " << sub_renew_after << ") -- "
		     << (now > sub_renew_after ? "yes" : "no")
		     << dendl;
      if (now > sub_renew_after)
	_renew_subs();
    }

    cur_con->send_keepalive();

    if (cct->_conf->mon_client_ping_timeout > 0 &&
	cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
      utime_t lk = cur_con->get_last_keepalive_ack();
      utime_t interval = now - lk;
      if (interval > cct->_conf->mon_client_ping_timeout) {
	ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
		      << " seconds), reconnecting" << dendl;
	_reopen_session();
      }

      // NOTE(review): send_log() is only reached when the ping timeout is
      // enabled and the peer supports KEEPALIVE2 — confirm this nesting is
      // intentional.
      send_log();
    }
  }

  schedule_tick();
}
777
778 void MonClient::schedule_tick()
779 {
780 struct C_Tick : public Context {
781 MonClient *monc;
782 explicit C_Tick(MonClient *m) : monc(m) {}
783 void finish(int r) override {
784 monc->tick();
785 }
786 };
787
788 if (_hunting()) {
789 timer.add_event_after(cct->_conf->mon_client_hunt_interval
790 * reopen_interval_multiplier,
791 new C_Tick(this));
792 } else
793 timer.add_event_after(cct->_conf->mon_client_ping_interval, new C_Tick(this));
794 }
795
796 // ---------
797
798 void MonClient::_renew_subs()
799 {
800 assert(monc_lock.is_locked());
801 if (sub_new.empty()) {
802 ldout(cct, 10) << __func__ << " - empty" << dendl;
803 return;
804 }
805
806 ldout(cct, 10) << __func__ << dendl;
807 if (!_opened())
808 _reopen_session();
809 else {
810 if (sub_renew_sent == utime_t())
811 sub_renew_sent = ceph_clock_now();
812
813 MMonSubscribe *m = new MMonSubscribe;
814 m->what = sub_new;
815 _send_mon_message(m);
816
817 // update sub_sent with sub_new
818 sub_new.insert(sub_sent.begin(), sub_sent.end());
819 std::swap(sub_new, sub_sent);
820 sub_new.clear();
821 }
822 }
823
824 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
825 {
826 if (sub_renew_sent != utime_t()) {
827 // NOTE: this is only needed for legacy (infernalis or older)
828 // mons; see tick().
829 sub_renew_after = sub_renew_sent;
830 sub_renew_after += m->interval / 2.0;
831 ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl;
832 sub_renew_sent = utime_t();
833 } else {
834 ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << ", ignoring" << dendl;
835 }
836
837 m->put();
838 }
839
840 int MonClient::_check_auth_tickets()
841 {
842 assert(monc_lock.is_locked());
843 if (active_con && auth) {
844 if (auth->need_tickets()) {
845 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
846 MAuth *m = new MAuth;
847 m->protocol = auth->get_protocol();
848 auth->prepare_build_request();
849 auth->build_request(m->auth_payload);
850 _send_mon_message(m);
851 }
852
853 _check_auth_rotating();
854 }
855 return 0;
856 }
857
/**
 * Request fresh rotating service keys from the monitor when the ones we
 * hold are about to expire. Requires monc_lock.
 *
 * Nothing happens if this entity does not need rotating keys, if there is
 * no authenticated session, if the keys are still fresh, or if a renewal
 * was sent less than a second ago.
 *
 * @return always 0
 */
int MonClient::_check_auth_rotating()
{
  assert(monc_lock.is_locked());
  if (!rotating_secrets ||
      !auth_principal_needs_rotating_keys(entity_name)) {
    ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
    return 0;
  }

  if (!active_con || !auth) {
    ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
    return 0;
  }

  utime_t now = ceph_clock_now();
  // cutoff: now minus the smaller of 30s and a quarter of the ticket ttl
  utime_t cutoff = now;
  cutoff -= MIN(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
  // one full ticket ttl before now; used for the clock skew check below
  utime_t issued_at_lower_bound = now;
  issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
  if (!rotating_secrets->need_new_secrets(cutoff)) {
    ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
    rotating_secrets->dump_rotating();
    return 0;
  }

  ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
  if (!rotating_secrets->need_new_secrets() &&
      rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
    // the key has expired before it has been issued?
    lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
               << " (before " << issued_at_lower_bound << ")" << dendl;
  }
  // rate limit: skip if the previous renewal went out under a second ago
  if ((now > last_rotating_renew_sent) &&
      double(now - last_rotating_renew_sent) < 1) {
    ldout(cct, 10) << __func__ << " called too often (last: "
                   << last_rotating_renew_sent << "), skipping refresh" << dendl;
    return 0;
  }
  MAuth *m = new MAuth;
  m->protocol = auth->get_protocol();
  if (auth->build_rotating_request(m->auth_payload)) {
    last_rotating_renew_sent = now;
    _send_mon_message(m);
  } else {
    // no request was built; release the unused message
    m->put();
  }
  return 0;
}
906
907 int MonClient::wait_auth_rotating(double timeout)
908 {
909 Mutex::Locker l(monc_lock);
910 utime_t now = ceph_clock_now();
911 utime_t until = now;
912 until += timeout;
913
914 // Must be initialized
915 assert(auth != nullptr);
916
917 if (auth->get_protocol() == CEPH_AUTH_NONE)
918 return 0;
919
920 if (!rotating_secrets)
921 return 0;
922
923 while (auth_principal_needs_rotating_keys(entity_name) &&
924 rotating_secrets->need_new_secrets(now)) {
925 if (now >= until) {
926 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
927 return -ETIMEDOUT;
928 }
929 ldout(cct, 10) << __func__ << " waiting (until " << until << ")" << dendl;
930 auth_cond.WaitUntil(monc_lock, until);
931 now = ceph_clock_now();
932 }
933 ldout(cct, 10) << __func__ << " done" << dendl;
934 return 0;
935 }
936
937 // ---------
938
939 void MonClient::_send_command(MonCommand *r)
940 {
941 entity_addr_t peer;
942 if (active_con) {
943 peer = active_con->get_con()->get_peer_addr();
944 }
945
946 if (r->target_rank >= 0 &&
947 r->target_rank != monmap.get_rank(peer)) {
948 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
949 << " wants rank " << r->target_rank
950 << ", reopening session"
951 << dendl;
952 if (r->target_rank >= (int)monmap.size()) {
953 ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl;
954 _finish_command(r, -ENOENT, "mon rank dne");
955 return;
956 }
957 _reopen_session(r->target_rank);
958 return;
959 }
960
961 if (r->target_name.length() &&
962 r->target_name != monmap.get_name(peer)) {
963 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
964 << " wants mon " << r->target_name
965 << ", reopening session"
966 << dendl;
967 if (!monmap.contains(r->target_name)) {
968 ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl;
969 _finish_command(r, -ENOENT, "mon dne");
970 return;
971 }
972 _reopen_session(monmap.get_rank(r->target_name));
973 return;
974 }
975
976 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
977 MMonCommand *m = new MMonCommand(monmap.fsid);
978 m->set_tid(r->tid);
979 m->cmd = r->cmd;
980 m->set_data(r->inbl);
981 _send_mon_message(m);
982 return;
983 }
984
985 void MonClient::_resend_mon_commands()
986 {
987 // resend any requests
988 for (map<uint64_t,MonCommand*>::iterator p = mon_commands.begin();
989 p != mon_commands.end();
990 ++p) {
991 _send_command(p->second);
992 }
993 }
994
995 void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
996 {
997 MonCommand *r = NULL;
998 uint64_t tid = ack->get_tid();
999
1000 if (tid == 0 && !mon_commands.empty()) {
1001 r = mon_commands.begin()->second;
1002 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1003 } else {
1004 map<uint64_t,MonCommand*>::iterator p = mon_commands.find(tid);
1005 if (p == mon_commands.end()) {
1006 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1007 ack->put();
1008 return;
1009 }
1010 r = p->second;
1011 }
1012
1013 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1014 if (r->poutbl)
1015 r->poutbl->claim(ack->get_data());
1016 _finish_command(r, ack->r, ack->rs);
1017 ack->put();
1018 }
1019
1020 int MonClient::_cancel_mon_command(uint64_t tid)
1021 {
1022 assert(monc_lock.is_locked());
1023
1024 map<ceph_tid_t, MonCommand*>::iterator it = mon_commands.find(tid);
1025 if (it == mon_commands.end()) {
1026 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1027 return -ENOENT;
1028 }
1029
1030 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1031
1032 MonCommand *cmd = it->second;
1033 _finish_command(cmd, -ETIMEDOUT, "");
1034 return 0;
1035 }
1036
1037 void MonClient::_finish_command(MonCommand *r, int ret, string rs)
1038 {
1039 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
1040 if (r->prval)
1041 *(r->prval) = ret;
1042 if (r->prs)
1043 *(r->prs) = rs;
1044 if (r->onfinish)
1045 finisher.queue(r->onfinish, ret);
1046 mon_commands.erase(r->tid);
1047 delete r;
1048 }
1049
1050 void MonClient::start_mon_command(const vector<string>& cmd,
1051 const bufferlist& inbl,
1052 bufferlist *outbl, string *outs,
1053 Context *onfinish)
1054 {
1055 Mutex::Locker l(monc_lock);
1056 MonCommand *r = new MonCommand(++last_mon_command_tid);
1057 r->cmd = cmd;
1058 r->inbl = inbl;
1059 r->poutbl = outbl;
1060 r->prs = outs;
1061 r->onfinish = onfinish;
1062 if (cct->_conf->rados_mon_op_timeout > 0) {
1063 class C_CancelMonCommand : public Context
1064 {
1065 uint64_t tid;
1066 MonClient *monc;
1067 public:
1068 C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
1069 void finish(int r) override {
1070 monc->_cancel_mon_command(tid);
1071 }
1072 };
1073 r->ontimeout = new C_CancelMonCommand(r->tid, this);
1074 timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
1075 }
1076 mon_commands[r->tid] = r;
1077 _send_command(r);
1078 }
1079
1080 void MonClient::start_mon_command(const string &mon_name,
1081 const vector<string>& cmd,
1082 const bufferlist& inbl,
1083 bufferlist *outbl, string *outs,
1084 Context *onfinish)
1085 {
1086 Mutex::Locker l(monc_lock);
1087 MonCommand *r = new MonCommand(++last_mon_command_tid);
1088 r->target_name = mon_name;
1089 r->cmd = cmd;
1090 r->inbl = inbl;
1091 r->poutbl = outbl;
1092 r->prs = outs;
1093 r->onfinish = onfinish;
1094 mon_commands[r->tid] = r;
1095 _send_command(r);
1096 }
1097
1098 void MonClient::start_mon_command(int rank,
1099 const vector<string>& cmd,
1100 const bufferlist& inbl,
1101 bufferlist *outbl, string *outs,
1102 Context *onfinish)
1103 {
1104 Mutex::Locker l(monc_lock);
1105 MonCommand *r = new MonCommand(++last_mon_command_tid);
1106 r->target_rank = rank;
1107 r->cmd = cmd;
1108 r->inbl = inbl;
1109 r->poutbl = outbl;
1110 r->prs = outs;
1111 r->onfinish = onfinish;
1112 mon_commands[r->tid] = r;
1113 _send_command(r);
1114 }
1115
1116 // ---------
1117
1118 void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
1119 {
1120 version_req_d *req = new version_req_d(onfinish, newest, oldest);
1121 ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
1122 Mutex::Locker l(monc_lock);
1123 MMonGetVersion *m = new MMonGetVersion();
1124 m->what = map;
1125 m->handle = ++version_req_id;
1126 version_requests[m->handle] = req;
1127 _send_mon_message(m);
1128 }
1129
1130 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1131 {
1132 assert(monc_lock.is_locked());
1133 map<ceph_tid_t, version_req_d*>::iterator iter = version_requests.find(m->handle);
1134 if (iter == version_requests.end()) {
1135 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1136 << " not found" << dendl;
1137 } else {
1138 version_req_d *req = iter->second;
1139 ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
1140 version_requests.erase(iter);
1141 if (req->newest)
1142 *req->newest = m->version;
1143 if (req->oldest)
1144 *req->oldest = m->oldest_version;
1145 finisher.queue(req->context, 0);
1146 delete req;
1147 }
1148 m->put();
1149 }
1150
1151 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1152 Mutex::Locker l(monc_lock);
1153 if (auth) {
1154 return auth->build_authorizer(service_id);
1155 } else {
1156 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1157 << ", but no auth is available now" << dendl;
1158 return nullptr;
1159 }
1160 }
1161
1162 #define dout_subsys ceph_subsys_monc
1163 #undef dout_prefix
1164 #define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1165
1166 MonConnection::MonConnection(CephContext *cct, ConnectionRef con, uint64_t global_id)
1167 : cct(cct), con(con), global_id(global_id)
1168 {}
1169
1170 MonConnection::~MonConnection()
1171 {
1172 if (con) {
1173 con->mark_down();
1174 con.reset();
1175 }
1176 }
1177
1178 bool MonConnection::have_session() const
1179 {
1180 return state == State::HAVE_SESSION;
1181 }
1182
1183 void MonConnection::start(epoch_t epoch,
1184 const EntityName& entity_name,
1185 const AuthMethodList& auth_supported)
1186 {
1187 // restart authentication handshake
1188 state = State::NEGOTIATING;
1189
1190 // send an initial keepalive to ensure our timestamp is valid by the
1191 // time we are in an OPENED state (by sequencing this before
1192 // authentication).
1193 con->send_keepalive();
1194
1195 auto m = new MAuth;
1196 m->protocol = 0;
1197 m->monmap_epoch = epoch;
1198 __u8 struct_v = 1;
1199 ::encode(struct_v, m->auth_payload);
1200 ::encode(auth_supported.get_supported_set(), m->auth_payload);
1201 ::encode(entity_name, m->auth_payload);
1202 ::encode(global_id, m->auth_payload);
1203 con->send_message(m);
1204 }
1205
1206 int MonConnection::handle_auth(MAuthReply* m,
1207 const EntityName& entity_name,
1208 uint32_t want_keys,
1209 RotatingKeyRing* keyring)
1210 {
1211 if (state == State::NEGOTIATING) {
1212 int r = _negotiate(m, entity_name, want_keys, keyring);
1213 if (r) {
1214 return r;
1215 }
1216 state = State::AUTHENTICATING;
1217 }
1218 int r = authenticate(m);
1219 if (!r) {
1220 state = State::HAVE_SESSION;
1221 }
1222 return r;
1223 }
1224
1225 int MonConnection::_negotiate(MAuthReply *m,
1226 const EntityName& entity_name,
1227 uint32_t want_keys,
1228 RotatingKeyRing* keyring)
1229 {
1230 if (auth && (int)m->protocol == auth->get_protocol()) {
1231 // good, negotiation completed
1232 auth->reset();
1233 return 0;
1234 }
1235
1236 auth.reset(get_auth_client_handler(cct, m->protocol, keyring));
1237 if (!auth) {
1238 ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl;
1239 if (m->result == -ENOTSUP) {
1240 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1241 << dendl;
1242 }
1243 return m->result;
1244 }
1245
1246 // do not request MGR key unless the mon has the SERVER_KRAKEN
1247 // feature. otherwise it will give us an auth error. note that
1248 // we have to use the FEATUREMASK because pre-jewel the kraken
1249 // feature bit was used for something else.
1250 if ((want_keys & CEPH_ENTITY_TYPE_MGR) &&
1251 !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
1252 ldout(cct, 1) << __func__
1253 << " not requesting MGR keys from pre-kraken monitor"
1254 << dendl;
1255 want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1256 }
1257 auth->set_want_keys(want_keys);
1258 auth->init(entity_name);
1259 auth->set_global_id(global_id);
1260 return 0;
1261 }
1262
1263 int MonConnection::authenticate(MAuthReply *m)
1264 {
1265 assert(auth);
1266 if (!m->global_id) {
1267 ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1268 }
1269 if (m->global_id != global_id) {
1270 // it's a new session
1271 auth->reset();
1272 global_id = m->global_id;
1273 auth->set_global_id(global_id);
1274 ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1275 }
1276 auto p = m->result_bl.begin();
1277 int ret = auth->handle_response(m->result, p);
1278 if (ret == -EAGAIN) {
1279 auto ma = new MAuth;
1280 ma->protocol = auth->get_protocol();
1281 auth->prepare_build_request();
1282 auth->build_request(ma->auth_payload);
1283 con->send_message(ma);
1284 }
1285 return ret;
1286 }