1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include "include/scope_guard.h"
19 #include "messages/MMonGetMap.h"
20 #include "messages/MMonGetVersion.h"
21 #include "messages/MMonGetVersionReply.h"
22 #include "messages/MMonMap.h"
23 #include "messages/MAuth.h"
24 #include "messages/MLogAck.h"
25 #include "messages/MAuthReply.h"
26 #include "messages/MMonCommand.h"
27 #include "messages/MMonCommandAck.h"
28 #include "messages/MPing.h"
30 #include "messages/MMonSubscribe.h"
31 #include "messages/MMonSubscribeAck.h"
32 #include "common/errno.h"
33 #include "common/LogClient.h"
35 #include "MonClient.h"
38 #include "auth/Auth.h"
39 #include "auth/KeyRing.h"
40 #include "auth/AuthClientHandler.h"
41 #include "auth/AuthMethodList.h"
42 #include "auth/RotatingKeyRing.h"
// Logging setup for the MonClient methods that follow: ceph_subsys_monc is
// selected as the dout subsystem, and every message is prefixed with
// "monclient", followed by "(hunting)" whenever _hunting() returns true.
44 #define dout_subsys ceph_subsys_monc
46 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
// Constructor. Receives the CephContext as cct_ and initialises, among
// others: monc_lock (named "MonClient::monc_lock"), the timer (built from
// cct_ and monc_lock), the flags no_keyring_disabled_cephx, more_log_pending
// and had_a_connection (all false), and last_mon_command_tid (0).
// reopen_interval_multiplier starts at the configured value of
// "mon_client_hunt_interval_min_multiple".
// NOTE(review): the initialiser list shown here is partial (the leading
// numbers, which look like original line numbers, skip several entries) and
// the constructor body is not visible.
48 MonClient::MonClient(CephContext
*cct_
) :
51 monc_lock("MonClient::monc_lock"),
52 timer(cct_
, monc_lock
),
55 no_keyring_disabled_cephx(false),
57 more_log_pending(false),
59 had_a_connection(false),
60 reopen_interval_multiplier(
61 cct_
->_conf
->get_val
<double>("mon_client_hunt_interval_min_multiple")),
62 last_mon_command_tid(0),
// Destructor. NOTE(review): the body is not visible in this listing.
67 MonClient::~MonClient()
// Logs the call at debug level 10, then delegates to
// monmap.build_initial(cct, cerr) and returns that call's result unchanged.
71 int MonClient::build_initial_monmap()
73 ldout(cct
, 10) << __func__
<< dendl
;
74 return monmap
.build_initial(cct
, cerr
);
// Requests the monmap: takes monc_lock, registers interest in "monmap" via
// _sub_want("monmap", 0, 0), waits on map_cond (with monc_lock) and finally
// logs " done".
// NOTE(review): the leading numbers look like original line numbers and
// 83-86 are absent, so the condition that guards the wait, any session
// (re)opening and the return value are not visible here.
77 int MonClient::get_monmap()
79 ldout(cct
, 10) << __func__
<< dendl
;
80 Mutex::Locker
l(monc_lock
);
82 _sub_want("monmap", 0, 0);
87 map_cond
.Wait(monc_lock
);
89 ldout(cct
, 10) << __func__
<< " done" << dendl
;
// Fetches the monmap by querying monitors picked at random.
// Visible steps, all under monc_lock:
//  - a client messenger named "temp_mon_client" is created and stored in
//    both messenger and smessenger, and this object is added as a
//    dispatcher head;
//  - while monmap.fsid is still zero: draw a uniformly random rank in
//    [0, monmap.size()-1], open a pending connection with _add_conn(rank, 0),
//    send MMonGetMap on it, and wait on map_cond for
//    mon_client_hunt_interval; if the fsid is still zero afterwards the
//    connection is marked down;
//  - pending_cons is cleared and the messenger is shut down.
// The assert requires the monmap to contain at least one monitor before a
// rank is drawn.
// NOTE(review): many original lines are absent from this listing, so the
// conditions around creating/tearing down the temporary messenger, the
// attempt limit of the loop and the returned error codes cannot be confirmed.
93 int MonClient::get_monmap_privately()
95 ldout(cct
, 10) << __func__
<< dendl
;
96 Mutex::Locker
l(monc_lock
);
98 bool temp_msgr
= false;
99 Messenger
* smessenger
= NULL
;
101 messenger
= smessenger
= Messenger::create_client_messenger(cct
, "temp_mon_client");
102 if (NULL
== messenger
) {
105 messenger
->add_dispatcher_head(this);
112 ldout(cct
, 10) << "have " << monmap
.epoch
<< " fsid " << monmap
.fsid
<< dendl
;
114 std::random_device rd
;
115 std::mt19937
rng(rd());
116 assert(monmap
.size() > 0);
117 std::uniform_int_distribution
<unsigned> ranks(0, monmap
.size() - 1);
118 while (monmap
.fsid
.is_zero()) {
119 auto rank
= ranks(rng
);
120 auto& pending_con
= _add_conn(rank
, 0);
121 auto con
= pending_con
.get_con();
122 ldout(cct
, 10) << "querying mon." << monmap
.get_name(rank
) << " "
123 << con
->get_peer_addr() << dendl
;
124 con
->send_message(new MMonGetMap
);
130 interval
.set_from_double(cct
->_conf
->mon_client_hunt_interval
);
131 map_cond
.WaitInterval(monc_lock
, interval
);
133 if (monmap
.fsid
.is_zero() && con
) {
134 con
->mark_down(); // nope, clean that connection up
139 pending_cons
.clear();
141 messenger
->shutdown();
149 pending_cons
.clear();
151 if (!monmap
.fsid
.is_zero())
158 * Ping the monitor with id @p mon_id and set the resulting reply in
159 * the provided @p result_reply, if this last parameter is not NULL.
161 * So that we don't rely on the MonClient's default messenger, set up
162 * during connect(), we create our own messenger to communicate with the
163 * specified monitor. This is advantageous in the following ways:
165 * - Isolate the ping procedure from the rest of the MonClient's operations,
166 * allowing us to not acquire or manage the big monc_lock, thus not
167 * having to block waiting for some other operation to finish before we
169 * * for instance, we can ping mon.FOO even if we are currently hunting
170 * or blocked waiting for auth to complete with mon.BAR.
172 * - Ping a monitor prior to establishing a connection (using connect())
173 * and properly establish the MonClient's messenger. This frees us
174 * from dealing with the complex foo that happens in connect().
176 * We also don't rely on MonClient as a dispatcher for this messenger,
177 * unlike what happens with the MonClient's default messenger. This allows
178 * us to sandbox the whole ping, having it much as a separate entity in
179 * the MonClient class, considerably simplifying the handling and dispatching
180 * of messages without needing to consider monc_lock.
182 * Current drawback is that we will establish a messenger for each ping
183 * we want to issue, instead of keeping a single messenger instance that
184 * would be used for all pings.
// Pings monitor mon_id through a dedicated temporary messenger.
// Visible steps:
//  - if the monmap contains "noname-" + mon_id, that prefixed id is used;
//  - an empty id, or an id missing from the monmap, is logged;
//  - a MonClientPinger is created with result_reply, and a messenger named
//    "temp_ping_client" gets that pinger as its dispatcher head;
//  - MPing is sent over a connection to the monitor's address and
//    pinger->wait_for_reply() is called with client_mount_timeout;
//  - pinger->lock is unlocked near the end.
// NOTE(review): the error returns, the place where pinger->lock is taken and
// the teardown of pinger/smsgr are not visible in this listing, so ownership
// and cleanup cannot be confirmed from here.
186 int MonClient::ping_monitor(const string
&mon_id
, string
*result_reply
)
188 ldout(cct
, 10) << __func__
<< dendl
;
191 if (monmap
.contains("noname-"+mon_id
)) {
192 new_mon_id
= "noname-"+mon_id
;
197 if (new_mon_id
.empty()) {
198 ldout(cct
, 10) << __func__
<< " specified mon id is empty!" << dendl
;
200 } else if (!monmap
.contains(new_mon_id
)) {
201 ldout(cct
, 10) << __func__
<< " no such monitor 'mon." << new_mon_id
<< "'"
206 MonClientPinger
*pinger
= new MonClientPinger(cct
, result_reply
);
208 Messenger
*smsgr
= Messenger::create_client_messenger(cct
, "temp_ping_client");
209 smsgr
->add_dispatcher_head(pinger
);
212 ConnectionRef con
= smsgr
->get_connection(monmap
.get_inst(new_mon_id
));
213 ldout(cct
, 10) << __func__
<< " ping mon." << new_mon_id
214 << " " << con
->get_peer_addr() << dendl
;
215 con
->send_message(new MPing
);
218 int ret
= pinger
->wait_for_reply(cct
->_conf
->client_mount_timeout
);
220 ldout(cct
,10) << __func__
<< " got ping reply" << dendl
;
224 pinger
->lock
.Unlock();
// Dispatch entry point for incoming messages.
// Visible steps:
//  - my_addr is filled in from messenger->get_myaddr() while it still equals
//    a default-constructed entity_addr_t;
//  - a first switch names the message types this client cares about;
//  - under monc_lock, a message is logged as a discarded stray when its
//    connection matches neither the pending connection registered for its
//    source address nor active_con;
//  - a second switch routes by type to handle_monmap(), handle_auth(),
//    handle_subscribe_ack(), handle_get_version_reply() and
//    handle_mon_command_ack(); log acks go to log_client->handle_log_ack().
// NOTE(review): the return statements, the remaining case labels, the
// condition selecting between the two "stray" checks and the message
// reference handling are not visible in this listing.
234 bool MonClient::ms_dispatch(Message
*m
)
236 if (my_addr
== entity_addr_t())
237 my_addr
= messenger
->get_myaddr();
239 // we only care about these message types
240 switch (m
->get_type()) {
241 case CEPH_MSG_MON_MAP
:
242 case CEPH_MSG_AUTH_REPLY
:
243 case CEPH_MSG_MON_SUBSCRIBE_ACK
:
244 case CEPH_MSG_MON_GET_VERSION_REPLY
:
245 case MSG_MON_COMMAND_ACK
:
252 Mutex::Locker
lock(monc_lock
);
255 auto pending_con
= pending_cons
.find(m
->get_source_addr());
256 if (pending_con
== pending_cons
.end() ||
257 pending_con
->second
.get_con() != m
->get_connection()) {
258 // ignore any messages outside hunting sessions
259 ldout(cct
, 10) << "discarding stray monitor message " << *m
<< dendl
;
263 } else if (!active_con
|| active_con
->get_con() != m
->get_connection()) {
264 // ignore any messages outside our session(s)
265 ldout(cct
, 10) << "discarding stray monitor message " << *m
<< dendl
;
270 switch (m
->get_type()) {
271 case CEPH_MSG_MON_MAP
:
272 handle_monmap(static_cast<MMonMap
*>(m
));
273 if (passthrough_monmap
) {
279 case CEPH_MSG_AUTH_REPLY
:
280 handle_auth(static_cast<MAuthReply
*>(m
));
282 case CEPH_MSG_MON_SUBSCRIBE_ACK
:
283 handle_subscribe_ack(static_cast<MMonSubscribeAck
*>(m
));
285 case CEPH_MSG_MON_GET_VERSION_REPLY
:
286 handle_get_version_reply(static_cast<MMonGetVersionReply
*>(m
));
288 case MSG_MON_COMMAND_ACK
:
289 handle_mon_command_ack(static_cast<MMonCommandAck
*>(m
));
293 log_client
->handle_log_ack(static_cast<MLogAck
*>(m
));
295 if (more_log_pending
) {
// Asks log_client for its next monitor log message (passing flush through),
// hands it to _send_mon_message() and refreshes more_log_pending from
// log_client->are_pending().
// NOTE(review): any guards around these statements (e.g. a null check on
// log_client or lm) are not visible in this listing.
306 void MonClient::send_log(bool flush
)
309 Message
*lm
= log_client
->get_mon_log_message(flush
);
311 _send_mon_message(lm
);
312 more_log_pending
= log_client
->are_pending();
// Takes monc_lock for the duration of the call.
// NOTE(review): the statement(s) executed under the lock are not visible in
// this listing.
316 void MonClient::flush_log()
318 Mutex::Locker
l(monc_lock
);
// Handles an MMonMap message.
// Visible steps: remember the sender's address (peer) and the name that
// address has in the current monmap (cur_mon); obtain an iterator over
// m->monmapbl; log the monmap epoch and cur_mon's rank; dump the monmap to
// the log; report the epoch through _sub_got("monmap", ...); and log
// "went away" when peer can no longer be resolved to a name.
// NOTE(review): the decode of monmapbl and the reaction to a vanished
// monitor are not visible in this listing.
322 /* Unlike all the other message-handling functions, we don't put away a reference
323 * because we want to support MMonMap passthrough to other Dispatchers. */
324 void MonClient::handle_monmap(MMonMap
*m
)
326 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
327 auto peer
= m
->get_source_addr();
328 string cur_mon
= monmap
.get_name(peer
);
330 bufferlist::iterator p
= m
->monmapbl
.begin();
333 ldout(cct
, 10) << " got monmap " << monmap
.epoch
334 << ", mon." << cur_mon
<< " is now rank " << monmap
.get_rank(cur_mon
)
336 ldout(cct
, 10) << "dump:\n";
337 monmap
.print(*_dout
);
340 _sub_got("monmap", monmap
.get_epoch());
342 if (!monmap
.get_addr_name(peer
, cur_mon
)) {
343 ldout(cct
, 10) << "mon." << cur_mon
<< " went away" << dendl
;
344 // can't find the mon we were talking to (above)
352 // ----------------------
// Initialises the client.
// Visible steps:
//  - register this object as a dispatcher head on messenger and copy the
//    entity name from the configuration;
//  - under monc_lock, choose the auth method string: auth_supported when it
//    is non-empty; otherwise auth_cluster_required for OSD, MDS, MON and MGR
//    entities; auth_client_required is the remaining assignment (presumably
//    the final else arm, which is not visible);
//  - build auth_supported as an AuthMethodList from that string and create
//    an empty KeyRing;
//  - when cephx is among the supported methods, load the keyring with
//    from_ceph_context(); the visible fallback removes cephx from the
//    supported set and either sets no_keyring_disabled_cephx (other methods
//    remain) or logs the "missing keyring" error;
//  - create rotating_secrets as a RotatingKeyRing over the keyring.
// NOTE(review): the conditions on r, the error returns and the remaining
// start-up calls are not visible in this listing.
354 int MonClient::init()
356 ldout(cct
, 10) << __func__
<< dendl
;
358 messenger
->add_dispatcher_head(this);
360 entity_name
= cct
->_conf
->name
;
362 Mutex::Locker
l(monc_lock
);
365 if (!cct
->_conf
->auth_supported
.empty())
366 method
= cct
->_conf
->auth_supported
;
367 else if (entity_name
.get_type() == CEPH_ENTITY_TYPE_OSD
||
368 entity_name
.get_type() == CEPH_ENTITY_TYPE_MDS
||
369 entity_name
.get_type() == CEPH_ENTITY_TYPE_MON
||
370 entity_name
.get_type() == CEPH_ENTITY_TYPE_MGR
)
371 method
= cct
->_conf
->auth_cluster_required
;
373 method
= cct
->_conf
->auth_client_required
;
374 auth_supported
.reset(new AuthMethodList(cct
, method
));
375 ldout(cct
, 10) << "auth_supported " << auth_supported
->get_supported_set() << " method " << method
<< dendl
;
378 keyring
.reset(new KeyRing
); // initializing keyring anyway
380 if (auth_supported
->is_supported_auth(CEPH_AUTH_CEPHX
)) {
381 r
= keyring
->from_ceph_context(cct
);
383 auth_supported
->remove_supported_auth(CEPH_AUTH_CEPHX
);
384 if (!auth_supported
->get_supported_set().empty()) {
386 no_keyring_disabled_cephx
= true;
388 lderr(cct
) << "ERROR: missing keyring, cannot use cephx for authentication" << dendl
;
397 rotating_secrets
.reset(
398 new RotatingKeyRing(cct
, cct
->get_module_type(), keyring
.get()));
// Shuts the client down, draining all outstanding work.
// Visible steps:
//  - every pending version request has its context completed with
//    -ECANCELED, then is deleted and erased from version_requests;
//  - every pending monitor command is cancelled via _cancel_mon_command();
//  - every message queued in waiting_for_session is released with put();
//  - pending_cons is cleared and finisher.wait_for_empty() is called.
// NOTE(review): locking and the timer/finisher/active connection teardown
// are not visible in this listing.
409 void MonClient::shutdown()
411 ldout(cct
, 10) << __func__
<< dendl
;
413 while (!version_requests
.empty()) {
414 version_requests
.begin()->second
->context
->complete(-ECANCELED
);
415 ldout(cct
, 20) << __func__
<< " canceling and discarding version request "
416 << version_requests
.begin()->second
<< dendl
;
417 delete version_requests
.begin()->second
;
418 version_requests
.erase(version_requests
.begin());
420 while (!mon_commands
.empty()) {
421 auto tid
= mon_commands
.begin()->first
;
422 _cancel_mon_command(tid
);
424 while (!waiting_for_session
.empty()) {
425 ldout(cct
, 20) << __func__
<< " discarding pending message " << *waiting_for_session
.front() << dendl
;
426 waiting_for_session
.front()->put();
427 waiting_for_session
.pop_front();
431 pending_cons
.clear();
437 finisher
.wait_for_empty();
// Waits, under monc_lock, until a session is established or authentication
// fails, and returns authenticate_err.
// Visible steps:
//  - an "already authenticated" early path exists (its condition is not
//    visible);
//  - subscribe to "monmap", starting at epoch+1 when an epoch is known and
//    at 0 otherwise;
//  - loop while there is neither an active_con nor an authenticate_err,
//    waiting on auth_cond; a WaitUntil() result of ETIMEDOUT is logged and
//    stored negated in authenticate_err; an untimed Wait() is also present
//    (presumably for the no-timeout case);
//  - on success the global id is logged, authenticate_err is asserted to be
//    0 and authenticated is set;
//  - a failure while no_keyring_disabled_cephx is set logs a hint that
//    cephx was disabled for lack of a keyring.
// NOTE(review): how `until` is derived from timeout and the branch
// conditions are not visible in this listing.
446 int MonClient::authenticate(double timeout
)
448 Mutex::Locker
lock(monc_lock
);
451 ldout(cct
, 5) << "already authenticated" << dendl
;
455 _sub_want("monmap", monmap
.get_epoch() ? monmap
.get_epoch() + 1 : 0, 0);
459 utime_t until
= ceph_clock_now();
462 ldout(cct
, 10) << "authenticate will time out at " << until
<< dendl
;
463 while (!active_con
&& !authenticate_err
) {
465 int r
= auth_cond
.WaitUntil(monc_lock
, until
);
466 if (r
== ETIMEDOUT
) {
467 ldout(cct
, 0) << "authenticate timed out after " << timeout
<< dendl
;
468 authenticate_err
= -r
;
471 auth_cond
.Wait(monc_lock
);
476 ldout(cct
, 5) << __func__
<< " success, global_id "
477 << active_con
->get_global_id() << dendl
;
478 // active_con should not have been set if there was an error
479 assert(authenticate_err
== 0);
480 authenticated
= true;
483 if (authenticate_err
< 0 && no_keyring_disabled_cephx
) {
484 lderr(cct
) << __func__
<< " NOTE: no keyring found; disabled cephx authentication" << dendl
;
487 return authenticate_err
;
// Handles an MAuthReply; monc_lock must already be held (asserted).
// Two paths are visible:
//  1. Existing session: the auth handler is swapped into active_con, the
//     reply is fed to active_con->authenticate(), the handler is swapped
//     back, and a global_id that differs from the connection's is logged as
//     an error.
//  2. Hunting: the pending connection for the sender's address is looked up
//     (asserted to exist) and its handle_auth() is called. The visible code
//     tests for -EAGAIN, erases a connection from pending_cons, and has
//     "keep trying" / "give up" arms. A connection with a session is moved
//     into active_con, pending_cons is cleared, queued messages in
//     waiting_for_session are sent, monitor commands are resent, and auth
//     and global_id are taken from the new active connection.
// _finish_auth(auth_err) is then called and session_established_context, if
// set, is released into cb.
// NOTE(review): the condition separating the two paths, the returns and
// what is done with cb are not visible in this listing.
490 void MonClient::handle_auth(MAuthReply
*m
)
492 assert(monc_lock
.is_locked());
494 std::swap(active_con
->get_auth(), auth
);
495 int ret
= active_con
->authenticate(m
);
497 std::swap(auth
, active_con
->get_auth());
498 if (global_id
!= active_con
->get_global_id()) {
499 lderr(cct
) << __func__
<< " peer assigned me a different global_id: "
500 << active_con
->get_global_id() << dendl
;
502 if (ret
!= -EAGAIN
) {
509 auto found
= pending_cons
.find(m
->get_source_addr());
510 assert(found
!= pending_cons
.end());
511 int auth_err
= found
->second
.handle_auth(m
, entity_name
, want_keys
,
512 rotating_secrets
.get());
514 if (auth_err
== -EAGAIN
) {
518 pending_cons
.erase(found
);
519 if (!pending_cons
.empty()) {
520 // keep trying with pending connections
523 // the last try just failed, give up.
525 auto& mc
= found
->second
;
526 assert(mc
.have_session());
527 active_con
.reset(new MonConnection(std::move(mc
)));
528 pending_cons
.clear();
534 last_rotating_renew_sent
= utime_t();
535 while (!waiting_for_session
.empty()) {
536 _send_mon_message(waiting_for_session
.front());
537 waiting_for_session
.pop_front();
539 _resend_mon_commands();
542 std::swap(auth
, active_con
->get_auth());
543 global_id
= active_con
->get_global_id();
546 _finish_auth(auth_err
);
548 Context
*cb
= nullptr;
549 if (session_established_context
) {
550 cb
= session_established_context
.release();
// Records the authentication outcome in authenticate_err and wakes every
// waiter on auth_cond. When auth_err is 0 and an active connection exists,
// a success branch is entered (its first statement is not visible here);
// _check_auth_tickets() is also called (whether inside that branch cannot be
// told from this listing).
560 void MonClient::_finish_auth(int auth_err
)
562 authenticate_err
= auth_err
;
563 // _resend_mon_commands() could _reopen_session() if the connected mon is not
564 // the one the MonCommand is targeting.
565 if (!auth_err
&& active_con
) {
567 _check_auth_tickets();
569 auth_cond
.SignalAll();
// Sends m to the current monitor, or queues it; monc_lock must be held
// (asserted).
// Visible behaviour: the message is logged and sent over active_con's
// connection, and there is a path that appends it to waiting_for_session
// instead.
// NOTE(review): the condition choosing between the two paths (presumably
// whether active_con is set) is not visible in this listing.
574 void MonClient::_send_mon_message(Message
*m
)
576 assert(monc_lock
.is_locked());
578 auto cur_con
= active_con
->get_con();
579 ldout(cct
, 10) << "_send_mon_message to mon."
580 << monmap
.get_name(cur_con
->get_peer_addr())
581 << " at " << cur_con
->get_peer_addr() << dendl
;
582 cur_con
->send_message(m
);
584 waiting_for_session
.push_back(m
);
// Drops the current hunting state and starts connecting again; monc_lock
// must be held (asserted).
// Visible steps:
//  - clear pending_cons, then add either a single connection for `rank`
//    (_add_conn) or a set of candidates (_add_conns), both seeded with
//    global_id;
//  - release (put) and drop every message queued in waiting_for_session;
//  - fail every outstanding version request by queueing its context on the
//    finisher with -EAGAIN, then delete and erase it;
//  - start() every pending connection with the monmap epoch, entity_name and
//    the supported auth methods;
//  - copy each sub_sent entry whose key is absent from sub_new into sub_new.
// NOTE(review): the condition choosing _add_conn vs _add_conns, the handling
// of active_con and the action taken when sub_new is non-empty are not
// visible in this listing.
588 void MonClient::_reopen_session(int rank
)
590 assert(monc_lock
.is_locked());
591 ldout(cct
, 10) << __func__
<< " rank " << rank
<< dendl
;
594 pending_cons
.clear();
599 _add_conn(rank
, global_id
);
601 _add_conns(global_id
);
604 // throw out old queued messages
605 while (!waiting_for_session
.empty()) {
606 waiting_for_session
.front()->put();
607 waiting_for_session
.pop_front();
610 // throw out version check requests
611 while (!version_requests
.empty()) {
612 finisher
.queue(version_requests
.begin()->second
->context
, -EAGAIN
);
613 delete version_requests
.begin()->second
;
614 version_requests
.erase(version_requests
.begin());
617 for (auto& c
: pending_cons
) {
618 c
.second
.start(monmap
.get_epoch(), entity_name
, *auth_supported
);
621 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= sub_sent
.begin();
624 if (sub_new
.count(p
->first
) == 0)
625 sub_new
[p
->first
] = p
->second
;
627 if (!sub_new
.empty())
// Creates a pending connection to the monitor with the given rank.
// Looks up the monitor's address and obtains a messenger connection to it,
// wraps that in a MonConnection carrying global_id, and inserts it into
// pending_cons keyed by the monitor address.
// Returns a reference to the map element at the iterator returned by
// insert().
631 MonConnection
& MonClient::_add_conn(unsigned rank
, uint64_t global_id
)
633 auto peer
= monmap
.get_addr(rank
);
634 auto conn
= messenger
->get_connection(monmap
.get_inst(rank
));
635 MonConnection
mc(cct
, conn
, global_id
);
636 auto inserted
= pending_cons
.insert(make_pair(peer
, move(mc
)));
637 ldout(cct
, 10) << "picked mon." << monmap
.get_name(rank
)
639 << " addr " << conn
->get_peer_addr()
641 return inserted
.first
->second
;
// Opens pending connections to a random selection of monitors.
// Visible steps:
//  - find the smallest priority value among monmap.mon_info entries;
//  - collect the ranks of all monitors having exactly that priority;
//  - shuffle those ranks with a freshly seeded mt19937;
//  - read n from mon_client_hunt_parallel and call _add_conn() for the first
//    n shuffled ranks.
// NOTE(review): the body of the `n == 0 || n > ranks.size()` branch is not
// visible; presumably it clamps n to ranks.size() — confirm against the
// real source.
644 void MonClient::_add_conns(uint64_t global_id
)
646 uint16_t min_priority
= std::numeric_limits
<uint16_t>::max();
647 for (const auto& m
: monmap
.mon_info
) {
648 if (m
.second
.priority
< min_priority
) {
649 min_priority
= m
.second
.priority
;
652 vector
<unsigned> ranks
;
653 for (const auto& m
: monmap
.mon_info
) {
654 if (m
.second
.priority
== min_priority
) {
655 ranks
.push_back(monmap
.get_rank(m
.first
));
658 std::random_device rd
;
659 std::mt19937
rng(rd());
660 std::shuffle(ranks
.begin(), ranks
.end(), rng
);
661 unsigned n
= cct
->_conf
->mon_client_hunt_parallel
;
662 if (n
== 0 || n
> ranks
.size()) {
665 for (unsigned i
= 0; i
< n
; i
++) {
666 _add_conn(ranks
[i
], global_id
);
// Messenger callback for a reset connection; runs under monc_lock.
// Visible behaviour: connections whose peer is not a monitor take an early
// path; a connection whose peer address is in pending_cons is logged as a
// "hunted mon", otherwise as a "stray mon"; a connection equal to
// active_con's is logged as the "current mon", otherwise as a stray.
// NOTE(review): the return values, the condition separating the hunting and
// non-hunting cases and the recovery actions are not visible in this
// listing.
670 bool MonClient::ms_handle_reset(Connection
*con
)
672 Mutex::Locker
lock(monc_lock
);
674 if (con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
)
678 if (pending_cons
.count(con
->get_peer_addr())) {
679 ldout(cct
, 10) << __func__
<< " hunted mon " << con
->get_peer_addr() << dendl
;
681 ldout(cct
, 10) << __func__
<< " stray mon " << con
->get_peer_addr() << dendl
;
685 if (active_con
&& con
== active_con
->get_con()) {
686 ldout(cct
, 10) << __func__
<< " current mon " << con
->get_peer_addr() << dendl
;
690 ldout(cct
, 10) << "ms_handle_reset stray mon " << con
->get_peer_addr() << dendl
;
// Returns true when there is an active connection or a hunt is in progress
// (_hunting()). monc_lock must be held (asserted).
696 bool MonClient::_opened() const
698 assert(monc_lock
.is_locked());
699 return active_con
|| _hunting();
// Returns true while pending_cons is non-empty, i.e. while at least one
// candidate monitor connection is outstanding.
702 bool MonClient::_hunting() const
704 return !pending_cons
.empty();
// Applies back-off to the reconnect interval: reopen_interval_multiplier is
// multiplied by mon_client_hunt_interval_backoff and then capped at
// mon_client_hunt_interval_max_multiple.
// NOTE(review): the statement controlled by `if (!had_a_connection)` is not
// visible in this listing (presumably an early return), so it is unclear
// from here when the back-off is skipped.
707 void MonClient::_start_hunting()
710 // adjust timeouts if necessary
711 if (!had_a_connection
)
713 reopen_interval_multiplier
*= cct
->_conf
->mon_client_hunt_interval_backoff
;
714 if (reopen_interval_multiplier
>
715 cct
->_conf
->mon_client_hunt_interval_max_multiple
) {
716 reopen_interval_multiplier
=
717 cct
->_conf
->mon_client_hunt_interval_max_multiple
;
// Concludes a hunt; monc_lock must be held (asserted).
// Visible behaviour: logs either the monitor that was found (named via the
// active connection's peer address) or "no mon sessions established", and
// sets had_a_connection to true.
// NOTE(review): the condition choosing between the two log messages and any
// further bookkeeping are not visible in this listing.
721 void MonClient::_finish_hunting()
723 assert(monc_lock
.is_locked());
724 // the pending conns have been cleaned.
727 auto con
= active_con
->get_con();
728 ldout(cct
, 1) << "found mon."
729 << monmap
.get_name(con
->get_peer_addr())
732 ldout(cct
, 1) << "no mon sessions established" << dendl
;
735 had_a_connection
= true;
// Periodic maintenance.
// Visible steps:
//  - create a scope guard (reschedule_tick) from a lambda capturing this; its
//    body is not visible here;
//  - call _check_auth_tickets();
//  - on the "continuing hunt" path, return _reopen_session();
//  - otherwise, with an active connection: when the peer lacks
//    CEPH_FEATURE_MON_STATEFUL_SUB, decide from sub_renew_after whether
//    subscriptions need renewing; send a keepalive; and when
//    mon_client_ping_timeout is positive and the peer supports
//    CEPH_FEATURE_MSGR_KEEPALIVE2, reopen the session if the last keepalive
//    ack is older than that timeout.
// NOTE(review): the hunting condition and the renew call itself are not
// visible in this listing.
739 void MonClient::tick()
741 ldout(cct
, 10) << __func__
<< dendl
;
743 auto reschedule_tick
= make_scope_guard([this] {
747 _check_auth_tickets();
750 ldout(cct
, 1) << "continuing hunt" << dendl
;
751 return _reopen_session();
752 } else if (active_con
) {
753 // just renew as needed
754 utime_t now
= ceph_clock_now();
755 auto cur_con
= active_con
->get_con();
756 if (!cur_con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
)) {
757 ldout(cct
, 10) << "renew subs? (now: " << now
758 << "; renew after: " << sub_renew_after
<< ") -- "
759 << (now
> sub_renew_after
? "yes" : "no")
761 if (now
> sub_renew_after
)
765 cur_con
->send_keepalive();
767 if (cct
->_conf
->mon_client_ping_timeout
> 0 &&
768 cur_con
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
769 utime_t lk
= cur_con
->get_last_keepalive_ack();
770 utime_t interval
= now
- lk
;
771 if (interval
> cct
->_conf
->mon_client_ping_timeout
) {
772 ldout(cct
, 1) << "no keepalive since " << lk
<< " (" << interval
773 << " seconds), reconnecting" << dendl
;
774 return _reopen_session();
// Relaxes the reconnect back-off: reopen_interval_multiplier is divided by
// "mon_client_hunt_interval_backoff", but never drops below
// "mon_client_hunt_interval_min_multiple". The new value is logged at
// level 20.
783 void MonClient::_un_backoff()
785 // un-backoff our reconnect interval
786 reopen_interval_multiplier
= std::max(
787 cct
->_conf
->get_val
<double>("mon_client_hunt_interval_min_multiple"),
788 reopen_interval_multiplier
/
789 cct
->_conf
->get_val
<double>("mon_client_hunt_interval_backoff"));
790 ldout(cct
, 20) << __func__
<< " reopen_interval_multipler now "
791 << reopen_interval_multiplier
<< dendl
;
// Arms the timer for the next tick using a local Context subclass, C_Tick,
// which stores the MonClient pointer it is given.
// Two schedules are visible: one after
// mon_client_hunt_interval * reopen_interval_multiplier, and one after
// mon_client_ping_interval.
// NOTE(review): the body of C_Tick::finish() and the condition choosing
// between the two intervals (presumably _hunting()) are not visible in this
// listing.
794 void MonClient::schedule_tick()
796 struct C_Tick
: public Context
{
798 explicit C_Tick(MonClient
*m
) : monc(m
) {}
799 void finish(int r
) override
{
805 timer
.add_event_after(cct
->_conf
->mon_client_hunt_interval
806 * reopen_interval_multiplier
,
809 timer
.add_event_after(cct
->_conf
->mon_client_ping_interval
, new C_Tick(this));
// Sends pending subscriptions to the monitor; monc_lock must be held
// (asserted).
// Visible steps: log " - empty" when sub_new is empty; otherwise stamp
// sub_renew_sent with the current time if it is unset, send an
// MMonSubscribe, then merge sub_sent into sub_new (insert() keeps entries
// already present in sub_new) and swap the two maps, leaving sub_sent with
// the merged set.
// NOTE(review): the early return for the empty case, how the message is
// populated and the hunting checks are not visible in this listing.
814 void MonClient::_renew_subs()
816 assert(monc_lock
.is_locked());
817 if (sub_new
.empty()) {
818 ldout(cct
, 10) << __func__
<< " - empty" << dendl
;
822 ldout(cct
, 10) << __func__
<< dendl
;
826 if (sub_renew_sent
== utime_t())
827 sub_renew_sent
= ceph_clock_now();
829 MMonSubscribe
*m
= new MMonSubscribe
;
831 _send_mon_message(m
);
833 // update sub_sent with sub_new
834 sub_new
.insert(sub_sent
.begin(), sub_sent
.end());
835 std::swap(sub_new
, sub_sent
);
// Handles a subscription acknowledgement.
// When a renewal is outstanding (sub_renew_sent is set), the next renewal
// time becomes sub_renew_sent plus half of the interval reported in the ack,
// and sub_renew_sent is reset. A second log message (", ignoring") is also
// visible, presumably for the case with no outstanding renewal.
// NOTE(review): the else arm and the release of m are not visible in this
// listing.
840 void MonClient::handle_subscribe_ack(MMonSubscribeAck
*m
)
842 if (sub_renew_sent
!= utime_t()) {
843 // NOTE: this is only needed for legacy (infernalis or older)
845 sub_renew_after
= sub_renew_sent
;
846 sub_renew_after
+= m
->interval
/ 2.0;
847 ldout(cct
, 10) << __func__
<< " sent " << sub_renew_sent
<< " renew after " << sub_renew_after
<< dendl
;
848 sub_renew_sent
= utime_t();
850 ldout(cct
, 10) << __func__
<< " sent " << sub_renew_sent
<< ", ignoring" << dendl
;
// Requests fresh tickets when needed; monc_lock must be held (asserted).
// With an active connection and an auth handler present: if
// auth->need_tickets() is true, an MAuth carrying the handler's protocol and
// a freshly built request is sent to the monitor; _check_auth_rotating() is
// called as well.
// NOTE(review): the return statement(s) are not visible in this listing.
856 int MonClient::_check_auth_tickets()
858 assert(monc_lock
.is_locked());
859 if (active_con
&& auth
) {
860 if (auth
->need_tickets()) {
861 ldout(cct
, 10) << __func__
<< " getting new tickets!" << dendl
;
862 MAuth
*m
= new MAuth
;
863 m
->protocol
= auth
->get_protocol();
864 auth
->prepare_build_request();
865 auth
->build_request(m
->auth_payload
);
866 _send_mon_message(m
);
869 _check_auth_rotating();
// Renews rotating service keys when they are about to expire; monc_lock
// must be held (asserted).
// Visible steps:
//  - log "not needed" when there is no rotating_secrets or the entity does
//    not need rotating keys;
//  - log "waiting for auth session" when there is no active connection or
//    auth handler;
//  - cutoff = now - min(30s, auth_service_ticket_ttl / 4); when
//    need_new_secrets(cutoff) is false the secrets are logged as up to date
//    and dumped;
//  - otherwise log the renewal; additionally log a possible clock skew when
//    the keys are still valid now yet already count as expired relative to
//    now - auth_service_ticket_ttl;
//  - log "called too often" when the previous renewal was sent less than one
//    second ago;
//  - build an MAuth with a rotating-key request and, if
//    build_rotating_request() succeeds, record the send time and send it.
// NOTE(review): the return statements after each log line and the handling
// of m when build_rotating_request() fails are not visible in this listing.
874 int MonClient::_check_auth_rotating()
876 assert(monc_lock
.is_locked());
877 if (!rotating_secrets
||
878 !auth_principal_needs_rotating_keys(entity_name
)) {
879 ldout(cct
, 20) << "_check_auth_rotating not needed by " << entity_name
<< dendl
;
883 if (!active_con
|| !auth
) {
884 ldout(cct
, 10) << "_check_auth_rotating waiting for auth session" << dendl
;
888 utime_t now
= ceph_clock_now();
889 utime_t cutoff
= now
;
890 cutoff
-= MIN(30.0, cct
->_conf
->auth_service_ticket_ttl
/ 4.0);
891 utime_t issued_at_lower_bound
= now
;
892 issued_at_lower_bound
-= cct
->_conf
->auth_service_ticket_ttl
;
893 if (!rotating_secrets
->need_new_secrets(cutoff
)) {
894 ldout(cct
, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff
<< ")" << dendl
;
895 rotating_secrets
->dump_rotating();
899 ldout(cct
, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff
<< ")" << dendl
;
900 if (!rotating_secrets
->need_new_secrets() &&
901 rotating_secrets
->need_new_secrets(issued_at_lower_bound
)) {
902 // the key has expired before it has been issued?
903 lderr(cct
) << __func__
<< " possible clock skew, rotating keys expired way too early"
904 << " (before " << issued_at_lower_bound
<< ")" << dendl
;
906 if ((now
> last_rotating_renew_sent
) &&
907 double(now
- last_rotating_renew_sent
) < 1) {
908 ldout(cct
, 10) << __func__
<< " called too often (last: "
909 << last_rotating_renew_sent
<< "), skipping refresh" << dendl
;
912 MAuth
*m
= new MAuth
;
913 m
->protocol
= auth
->get_protocol();
914 if (auth
->build_rotating_request(m
->auth_payload
)) {
915 last_rotating_renew_sent
= now
;
916 _send_mon_message(m
);
// Blocks, under monc_lock, until rotating keys are available.
// Visible steps: assert that auth is set; special-case CEPH_AUTH_NONE and a
// missing rotating_secrets (the actions are not visible); then, while the
// entity needs rotating keys and need_new_secrets(now) is true, wait on
// auth_cond until `until` and refresh `now`. A "timed out after" message is
// logged inside the loop.
// NOTE(review): how `until` is derived from timeout, the timeout test and
// the return values are not visible in this listing.
923 int MonClient::wait_auth_rotating(double timeout
)
925 Mutex::Locker
l(monc_lock
);
926 utime_t now
= ceph_clock_now();
930 // Must be initialized
931 assert(auth
!= nullptr);
933 if (auth
->get_protocol() == CEPH_AUTH_NONE
)
936 if (!rotating_secrets
)
939 while (auth_principal_needs_rotating_keys(entity_name
) &&
940 rotating_secrets
->need_new_secrets(now
)) {
942 ldout(cct
, 0) << __func__
<< " timed out after " << timeout
<< dendl
;
945 ldout(cct
, 10) << __func__
<< " waiting (until " << until
<< ")" << dendl
;
946 auth_cond
.WaitUntil(monc_lock
, until
);
947 now
= ceph_clock_now();
949 ldout(cct
, 10) << __func__
<< " done" << dendl
;
// Sends monitor command r, first steering the session to the requested
// monitor if the command names one.
// Visible steps:
//  - peer is taken from the active connection's peer address;
//  - if r->target_rank is >= 0 and differs from the connected monitor's
//    rank: finish the command with -ENOENT ("mon rank dne") when the rank is
//    beyond the monmap size, otherwise reopen the session to that rank;
//  - if r->target_name is set and differs from the connected monitor's name:
//    finish with -ENOENT ("mon dne") when the name is not in the monmap,
//    otherwise reopen the session to that monitor's rank;
//  - otherwise build an MMonCommand for monmap.fsid, attach r->inbl as data
//    and send it.
// NOTE(review): the guard on active_con, the returns after reopening or
// finishing, and the tid/cmd assignment on the message are not visible in
// this listing.
955 void MonClient::_send_command(MonCommand
*r
)
959 peer
= active_con
->get_con()->get_peer_addr();
962 if (r
->target_rank
>= 0 &&
963 r
->target_rank
!= monmap
.get_rank(peer
)) {
964 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
965 << " wants rank " << r
->target_rank
966 << ", reopening session"
968 if (r
->target_rank
>= (int)monmap
.size()) {
969 ldout(cct
, 10) << " target " << r
->target_rank
<< " >= max mon " << monmap
.size() << dendl
;
970 _finish_command(r
, -ENOENT
, "mon rank dne");
973 _reopen_session(r
->target_rank
);
977 if (r
->target_name
.length() &&
978 r
->target_name
!= monmap
.get_name(peer
)) {
979 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
980 << " wants mon " << r
->target_name
981 << ", reopening session"
983 if (!monmap
.contains(r
->target_name
)) {
984 ldout(cct
, 10) << " target " << r
->target_name
<< " not present in monmap" << dendl
;
985 _finish_command(r
, -ENOENT
, "mon dne");
988 _reopen_session(monmap
.get_rank(r
->target_name
));
992 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
<< dendl
;
993 MMonCommand
*m
= new MMonCommand(monmap
.fsid
);
996 m
->set_data(r
->inbl
);
997 _send_mon_message(m
);
// Walks mon_commands and passes each registered command to _send_command().
// NOTE(review): the iterator increment (original line 1006) is not visible
// in this listing, so how iteration copes with _send_command() erasing an
// entry cannot be confirmed from here.
1001 void MonClient::_resend_mon_commands()
1003 // resend any requests
1004 for (map
<uint64_t,MonCommand
*>::iterator p
= mon_commands
.begin();
1005 p
!= mon_commands
.end();
1007 _send_command(p
->second
);
// Completes the monitor command that ack refers to.
// Visible steps: an ack with tid 0 is matched to the first entry of
// mon_commands when any exist; otherwise the tid is looked up, and a miss is
// logged as "not found". For the matched command, the ack's data is claimed
// into r->poutbl and _finish_command() is called with the ack's result code
// and status string.
// NOTE(review): the assignment of r on the lookup path, the null checks on
// the output pointers and the release of ack are not visible in this
// listing.
1011 void MonClient::handle_mon_command_ack(MMonCommandAck
*ack
)
1013 MonCommand
*r
= NULL
;
1014 uint64_t tid
= ack
->get_tid();
1016 if (tid
== 0 && !mon_commands
.empty()) {
1017 r
= mon_commands
.begin()->second
;
1018 ldout(cct
, 10) << __func__
<< " has tid 0, assuming it is " << r
->tid
<< dendl
;
1020 map
<uint64_t,MonCommand
*>::iterator p
= mon_commands
.find(tid
);
1021 if (p
== mon_commands
.end()) {
1022 ldout(cct
, 10) << __func__
<< " " << ack
->get_tid() << " not found" << dendl
;
1029 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
<< dendl
;
1031 r
->poutbl
->claim(ack
->get_data());
1032 _finish_command(r
, ack
->r
, ack
->rs
);
// Cancels the monitor command with the given tid; monc_lock must be held
// (asserted).
// An unknown tid is logged as "dne"; a known command is finished through
// _finish_command() with -ETIMEDOUT and an empty status string.
// NOTE(review): the return values are not visible in this listing.
1036 int MonClient::_cancel_mon_command(uint64_t tid
)
1038 assert(monc_lock
.is_locked());
1040 map
<ceph_tid_t
, MonCommand
*>::iterator it
= mon_commands
.find(tid
);
1041 if (it
== mon_commands
.end()) {
1042 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
1046 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
1048 MonCommand
*cmd
= it
->second
;
1049 _finish_command(cmd
, -ETIMEDOUT
, "");
// Finishes command r with result ret and status string rs: logs the
// outcome, queues r->onfinish on the finisher with ret, and removes the
// command from mon_commands.
// NOTE(review): the propagation of ret/rs to the caller's output pointers,
// the timeout-event cancellation and the deletion of r are not visible in
// this listing.
1053 void MonClient::_finish_command(MonCommand
*r
, int ret
, string rs
)
1055 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " = " << ret
<< " " << rs
<< dendl
;
1061 finisher
.queue(r
->onfinish
, ret
);
1062 mon_commands
.erase(r
->tid
);
// Starts a monitor command that may be served by any monitor.
// Under monc_lock: allocate a MonCommand with the next tid, store onfinish,
// and register it in mon_commands. When rados_mon_op_timeout is positive, a
// local Context (C_CancelMonCommand) that calls _cancel_mon_command(tid) is
// created, stored in r->ontimeout and scheduled on the timer after that
// timeout.
// NOTE(review): the final parameter(s) of the signature, the copying of
// cmd/inbl/outbl/outs into r and the call that actually sends the command
// are not visible in this listing.
1066 void MonClient::start_mon_command(const vector
<string
>& cmd
,
1067 const bufferlist
& inbl
,
1068 bufferlist
*outbl
, string
*outs
,
1071 Mutex::Locker
l(monc_lock
);
1072 MonCommand
*r
= new MonCommand(++last_mon_command_tid
);
1077 r
->onfinish
= onfinish
;
1078 if (cct
->_conf
->rados_mon_op_timeout
> 0) {
1079 class C_CancelMonCommand
: public Context
1084 C_CancelMonCommand(uint64_t tid
, MonClient
*monc
) : tid(tid
), monc(monc
) {}
1085 void finish(int r
) override
{
1086 monc
->_cancel_mon_command(tid
);
1089 r
->ontimeout
= new C_CancelMonCommand(r
->tid
, this);
1090 timer
.add_event_after(cct
->_conf
->rados_mon_op_timeout
, r
->ontimeout
);
1092 mon_commands
[r
->tid
] = r
;
// Starts a monitor command addressed to the monitor named mon_name.
// Under monc_lock: allocate a MonCommand with the next tid, record
// mon_name as its target_name, store onfinish and register the command in
// mon_commands. No timeout event is visible in this overload.
// NOTE(review): the final parameter(s) of the signature, the copying of
// cmd/inbl/outbl/outs into r and the call that actually sends the command
// are not visible in this listing.
1096 void MonClient::start_mon_command(const string
&mon_name
,
1097 const vector
<string
>& cmd
,
1098 const bufferlist
& inbl
,
1099 bufferlist
*outbl
, string
*outs
,
1102 Mutex::Locker
l(monc_lock
);
1103 MonCommand
*r
= new MonCommand(++last_mon_command_tid
);
1104 r
->target_name
= mon_name
;
1109 r
->onfinish
= onfinish
;
1110 mon_commands
[r
->tid
] = r
;
// Starts a monitor command addressed to the monitor with the given rank.
// Under monc_lock: allocate a MonCommand with the next tid, record rank as
// its target_rank, store onfinish and register the command in mon_commands.
// No timeout event is visible in this overload.
// NOTE(review): the final parameter(s) of the signature, the copying of
// cmd/inbl/outbl/outs into r and the call that actually sends the command
// are not visible in this listing.
1114 void MonClient::start_mon_command(int rank
,
1115 const vector
<string
>& cmd
,
1116 const bufferlist
& inbl
,
1117 bufferlist
*outbl
, string
*outs
,
1120 Mutex::Locker
l(monc_lock
);
1121 MonCommand
*r
= new MonCommand(++last_mon_command_tid
);
1122 r
->target_rank
= rank
;
1127 r
->onfinish
= onfinish
;
1128 mon_commands
[r
->tid
] = r
;
// Asks the monitor for the version range of the named map.
// A version_req_d holding onfinish, newest and oldest is allocated, then,
// under monc_lock, an MMonGetVersion is created with a fresh handle
// (++version_req_id), the request is stored in version_requests under that
// handle, and the message is sent.
// NOTE(review): the assignment of the map name to the message (original
// line 1140) is not visible in this listing.
1134 void MonClient::get_version(string map
, version_t
*newest
, version_t
*oldest
, Context
*onfinish
)
1136 version_req_d
*req
= new version_req_d(onfinish
, newest
, oldest
);
1137 ldout(cct
, 10) << "get_version " << map
<< " req " << req
<< dendl
;
1138 Mutex::Locker
l(monc_lock
);
1139 MMonGetVersion
*m
= new MMonGetVersion();
1141 m
->handle
= ++version_req_id
;
1142 version_requests
[m
->handle
] = req
;
1143 _send_mon_message(m
);
// Completes the version request matching the reply's handle; monc_lock must
// be held (asserted).
// An unknown handle is logged at level 0. Otherwise the request is removed
// from version_requests, the reply's version and oldest_version are written
// through req->newest and req->oldest, and req->context is queued on the
// finisher with result 0.
// NOTE(review): the null checks guarding the two output pointers, the
// deletion of req and the release of m are not visible in this listing.
1146 void MonClient::handle_get_version_reply(MMonGetVersionReply
* m
)
1148 assert(monc_lock
.is_locked());
1149 map
<ceph_tid_t
, version_req_d
*>::iterator iter
= version_requests
.find(m
->handle
);
1150 if (iter
== version_requests
.end()) {
1151 ldout(cct
, 0) << __func__
<< " version request with handle " << m
->handle
1152 << " not found" << dendl
;
1154 version_req_d
*req
= iter
->second
;
1155 ldout(cct
, 10) << __func__
<< " finishing " << req
<< " version " << m
->version
<< dendl
;
1156 version_requests
.erase(iter
);
1158 *req
->newest
= m
->version
;
1160 *req
->oldest
= m
->oldest_version
;
1161 finisher
.queue(req
->context
, 0);
// Builds an authorizer for service_id, under monc_lock, by delegating to
// auth->build_authorizer(). A second path logs that no auth is available.
// NOTE(review): the condition selecting between the two paths (presumably
// whether auth is set) and the value returned on the second path are not
// visible in this listing.
1167 AuthAuthorizer
* MonClient::build_authorizer(int service_id
) const {
1168 Mutex::Locker
l(monc_lock
);
1170 return auth
->build_authorizer(service_id
);
1172 ldout(cct
, 0) << __func__
<< " for " << ceph_entity_type_name(service_id
)
1173 << ", but no auth is available now" << dendl
;
// Logging setup for the MonConnection methods that follow: the subsystem
// stays ceph_subsys_monc, and the prefix is "monclient" followed by ": "
// when have_session() is true and by "(hunting): " otherwise.
1178 #define dout_subsys ceph_subsys_monc
1180 #define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
// Constructor: stores the context, the connection reference and the global
// id in the members of the same names.
1182 MonConnection::MonConnection(CephContext
*cct
, ConnectionRef con
, uint64_t global_id
)
1183 : cct(cct
), con(con
), global_id(global_id
)
// Destructor. NOTE(review): the body (original lines 1187-1192) is not
// visible in this listing.
1186 MonConnection::~MonConnection()
// Returns true when the connection's state is State::HAVE_SESSION.
1194 bool MonConnection::have_session() const
1196 return state
== State::HAVE_SESSION
;
// (Re)starts the authentication handshake on this connection.
// Sets the state to NEGOTIATING, sends a keepalive, then sends a message m
// whose monmap_epoch is `epoch` and whose auth_payload contains, in order:
// struct_v, the supported auth-method set, the entity name and the global
// id.
// NOTE(review): the creation of m, its protocol field and the value of
// struct_v are not visible in this listing.
1199 void MonConnection::start(epoch_t epoch
,
1200 const EntityName
& entity_name
,
1201 const AuthMethodList
& auth_supported
)
1203 // restart authentication handshake
1204 state
= State::NEGOTIATING
;
1206 // send an initial keepalive to ensure our timestamp is valid by the
1207 // time we are in an OPENED state (by sequencing this before
1209 con
->send_keepalive();
1213 m
->monmap_epoch
= epoch
;
1215 ::encode(struct_v
, m
->auth_payload
);
1216 ::encode(auth_supported
.get_supported_set(), m
->auth_payload
);
1217 ::encode(entity_name
, m
->auth_payload
);
1218 ::encode(global_id
, m
->auth_payload
);
1219 con
->send_message(m
);
// Advances the connection's handshake state machine with reply m.
// In state NEGOTIATING the reply goes through _negotiate() and the state
// then becomes AUTHENTICATING; the reply is passed to authenticate() and the
// state can become HAVE_SESSION.
// NOTE(review): the want_keys parameter (original line 1224), the checks on
// each r and the return value are not visible in this listing.
1222 int MonConnection::handle_auth(MAuthReply
* m
,
1223 const EntityName
& entity_name
,
1225 RotatingKeyRing
* keyring
)
1227 if (state
== State::NEGOTIATING
) {
1228 int r
= _negotiate(m
, entity_name
, want_keys
, keyring
);
1232 state
= State::AUTHENTICATING
;
1234 int r
= authenticate(m
);
1236 state
= State::HAVE_SESSION
;
// Settles on the auth protocol announced in reply m.
// Visible steps:
//  - if an auth handler already exists for m->protocol, negotiation is
//    complete;
//  - otherwise a handler for m->protocol is created with
//    get_auth_client_handler(); a missing handler is logged, with an extra
//    message when m->result is -ENOTSUP;
//  - the MGR bit is cleared from want_keys when the peer connection lacks
//    CEPH_FEATUREMASK_SERVER_KRAKEN;
//  - the handler receives want_keys, is initialised with entity_name and is
//    given the global id.
// NOTE(review): the want_keys parameter (original line 1243) and all return
// statements are not visible in this listing.
1241 int MonConnection::_negotiate(MAuthReply
*m
,
1242 const EntityName
& entity_name
,
1244 RotatingKeyRing
* keyring
)
1246 if (auth
&& (int)m
->protocol
== auth
->get_protocol()) {
1247 // good, negotiation completed
1252 auth
.reset(get_auth_client_handler(cct
, m
->protocol
, keyring
));
1254 ldout(cct
, 10) << "no handler for protocol " << m
->protocol
<< dendl
;
1255 if (m
->result
== -ENOTSUP
) {
1256 ldout(cct
, 10) << "none of our auth protocols are supported by the server"
1262 // do not request MGR key unless the mon has the SERVER_KRAKEN
1263 // feature. otherwise it will give us an auth error. note that
1264 // we have to use the FEATUREMASK because pre-jewel the kraken
1265 // feature bit was used for something else.
1266 if ((want_keys
& CEPH_ENTITY_TYPE_MGR
) &&
1267 !(m
->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN
))) {
1268 ldout(cct
, 1) << __func__
1269 << " not requesting MGR keys from pre-kraken monitor"
1271 want_keys
&= ~CEPH_ENTITY_TYPE_MGR
;
1273 auth
->set_want_keys(want_keys
);
1274 auth
->init(entity_name
);
1275 auth
->set_global_id(global_id
);
1279 int MonConnection::authenticate(MAuthReply
*m
)
1282 if (!m
->global_id
) {
1283 ldout(cct
, 1) << "peer sent an invalid global_id" << dendl
;
1285 if (m
->global_id
!= global_id
) {
1286 // it's a new session
1288 global_id
= m
->global_id
;
1289 auth
->set_global_id(global_id
);
1290 ldout(cct
, 10) << "my global_id is " << m
->global_id
<< dendl
;
1292 auto p
= m
->result_bl
.begin();
1293 int ret
= auth
->handle_response(m
->result
, p
);
1294 if (ret
== -EAGAIN
) {
1295 auto ma
= new MAuth
;
1296 ma
->protocol
= auth
->get_protocol();
1297 auth
->prepare_build_request();
1298 auth
->build_request(ma
->auth_payload
);
1299 con
->send_message(ma
);