1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include "messages/MMonGetMap.h"
18 #include "messages/MMonGetVersion.h"
19 #include "messages/MMonGetVersionReply.h"
20 #include "messages/MMonMap.h"
21 #include "messages/MAuth.h"
22 #include "messages/MLogAck.h"
23 #include "messages/MAuthReply.h"
24 #include "messages/MMonCommand.h"
25 #include "messages/MMonCommandAck.h"
26 #include "messages/MPing.h"
28 #include "messages/MMonSubscribe.h"
29 #include "messages/MMonSubscribeAck.h"
30 #include "common/errno.h"
31 #include "common/LogClient.h"
33 #include "MonClient.h"
36 #include "auth/Auth.h"
37 #include "auth/KeyRing.h"
38 #include "auth/AuthClientHandler.h"
39 #include "auth/AuthMethodList.h"
40 #include "auth/RotatingKeyRing.h"
42 #define dout_subsys ceph_subsys_monc
44 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
46 MonClient::MonClient(CephContext
*cct_
) :
49 monc_lock("MonClient::monc_lock"),
50 timer(cct_
, monc_lock
),
53 no_keyring_disabled_cephx(false),
55 more_log_pending(false),
57 had_a_connection(false),
58 reopen_interval_multiplier(1.0),
59 last_mon_command_tid(0),
64 MonClient::~MonClient()
68 int MonClient::build_initial_monmap()
70 ldout(cct
, 10) << __func__
<< dendl
;
71 return monmap
.build_initial(cct
, cerr
);
74 int MonClient::get_monmap()
76 ldout(cct
, 10) << __func__
<< dendl
;
77 Mutex::Locker
l(monc_lock
);
79 _sub_want("monmap", 0, 0);
84 map_cond
.Wait(monc_lock
);
86 ldout(cct
, 10) << __func__
<< " done" << dendl
;
90 int MonClient::get_monmap_privately()
92 ldout(cct
, 10) << __func__
<< dendl
;
93 Mutex::Locker
l(monc_lock
);
95 bool temp_msgr
= false;
96 Messenger
* smessenger
= NULL
;
98 messenger
= smessenger
= Messenger::create_client_messenger(cct
, "temp_mon_client");
99 if (NULL
== messenger
) {
102 messenger
->add_dispatcher_head(this);
109 ldout(cct
, 10) << "have " << monmap
.epoch
<< " fsid " << monmap
.fsid
<< dendl
;
111 std::random_device rd
;
112 std::mt19937
rng(rd());
113 assert(monmap
.size() > 0);
114 std::uniform_int_distribution
<unsigned> ranks(0, monmap
.size() - 1);
115 while (monmap
.fsid
.is_zero()) {
116 auto rank
= ranks(rng
);
117 auto& pending_con
= _add_conn(rank
, 0);
118 auto con
= pending_con
.get_con();
119 ldout(cct
, 10) << "querying mon." << monmap
.get_name(rank
) << " "
120 << con
->get_peer_addr() << dendl
;
121 con
->send_message(new MMonGetMap
);
127 interval
.set_from_double(cct
->_conf
->mon_client_hunt_interval
);
128 map_cond
.WaitInterval(monc_lock
, interval
);
130 if (monmap
.fsid
.is_zero() && con
) {
131 con
->mark_down(); // nope, clean that connection up
136 pending_cons
.clear();
138 messenger
->shutdown();
146 pending_cons
.clear();
148 if (!monmap
.fsid
.is_zero())
155 * Ping the monitor with id @p mon_id and set the resulting reply in
156 * the provided @p result_reply, if this last parameter is not NULL.
158 * So that we don't rely on the MonClient's default messenger, set up
159 * during connect(), we create our own messenger to comunicate with the
160 * specified monitor. This is advantageous in the following ways:
162 * - Isolate the ping procedure from the rest of the MonClient's operations,
163 * allowing us to not acquire or manage the big monc_lock, thus not
164 * having to block waiting for some other operation to finish before we
166 * * for instance, we can ping mon.FOO even if we are currently hunting
167 * or blocked waiting for auth to complete with mon.BAR.
169 * - Ping a monitor prior to establishing a connection (using connect())
170 * and properly establish the MonClient's messenger. This frees us
171 * from dealing with the complex foo that happens in connect().
173 * We also don't rely on MonClient as a dispatcher for this messenger,
174 * unlike what happens with the MonClient's default messenger. This allows
175 * us to sandbox the whole ping, having it much as a separate entity in
176 * the MonClient class, considerably simplifying the handling and dispatching
177 * of messages without needing to consider monc_lock.
179 * Current drawback is that we will establish a messenger for each ping
180 * we want to issue, instead of keeping a single messenger instance that
181 * would be used for all pings.
183 int MonClient::ping_monitor(const string
&mon_id
, string
*result_reply
)
185 ldout(cct
, 10) << __func__
<< dendl
;
188 if (monmap
.contains("noname-"+mon_id
)) {
189 new_mon_id
= "noname-"+mon_id
;
194 if (new_mon_id
.empty()) {
195 ldout(cct
, 10) << __func__
<< " specified mon id is empty!" << dendl
;
197 } else if (!monmap
.contains(new_mon_id
)) {
198 ldout(cct
, 10) << __func__
<< " no such monitor 'mon." << new_mon_id
<< "'"
203 MonClientPinger
*pinger
= new MonClientPinger(cct
, result_reply
);
205 Messenger
*smsgr
= Messenger::create_client_messenger(cct
, "temp_ping_client");
206 smsgr
->add_dispatcher_head(pinger
);
209 ConnectionRef con
= smsgr
->get_connection(monmap
.get_inst(new_mon_id
));
210 ldout(cct
, 10) << __func__
<< " ping mon." << new_mon_id
211 << " " << con
->get_peer_addr() << dendl
;
212 con
->send_message(new MPing
);
215 int ret
= pinger
->wait_for_reply(cct
->_conf
->client_mount_timeout
);
217 ldout(cct
,10) << __func__
<< " got ping reply" << dendl
;
221 pinger
->lock
.Unlock();
231 bool MonClient::ms_dispatch(Message
*m
)
233 if (my_addr
== entity_addr_t())
234 my_addr
= messenger
->get_myaddr();
236 // we only care about these message types
237 switch (m
->get_type()) {
238 case CEPH_MSG_MON_MAP
:
239 case CEPH_MSG_AUTH_REPLY
:
240 case CEPH_MSG_MON_SUBSCRIBE_ACK
:
241 case CEPH_MSG_MON_GET_VERSION_REPLY
:
242 case MSG_MON_COMMAND_ACK
:
249 Mutex::Locker
lock(monc_lock
);
252 auto pending_con
= pending_cons
.find(m
->get_source_addr());
253 if (pending_con
== pending_cons
.end() ||
254 pending_con
->second
.get_con() != m
->get_connection()) {
255 // ignore any messages outside hunting sessions
256 ldout(cct
, 10) << "discarding stray monitor message " << *m
<< dendl
;
260 } else if (!active_con
|| active_con
->get_con() != m
->get_connection()) {
261 // ignore any messages outside our session(s)
262 ldout(cct
, 10) << "discarding stray monitor message " << *m
<< dendl
;
267 switch (m
->get_type()) {
268 case CEPH_MSG_MON_MAP
:
269 handle_monmap(static_cast<MMonMap
*>(m
));
270 if (passthrough_monmap
) {
276 case CEPH_MSG_AUTH_REPLY
:
277 handle_auth(static_cast<MAuthReply
*>(m
));
279 case CEPH_MSG_MON_SUBSCRIBE_ACK
:
280 handle_subscribe_ack(static_cast<MMonSubscribeAck
*>(m
));
282 case CEPH_MSG_MON_GET_VERSION_REPLY
:
283 handle_get_version_reply(static_cast<MMonGetVersionReply
*>(m
));
285 case MSG_MON_COMMAND_ACK
:
286 handle_mon_command_ack(static_cast<MMonCommandAck
*>(m
));
290 log_client
->handle_log_ack(static_cast<MLogAck
*>(m
));
292 if (more_log_pending
) {
303 void MonClient::send_log(bool flush
)
306 Message
*lm
= log_client
->get_mon_log_message(flush
);
308 _send_mon_message(lm
);
309 more_log_pending
= log_client
->are_pending();
313 void MonClient::flush_log()
315 Mutex::Locker
l(monc_lock
);
319 /* Unlike all the other message-handling functions, we don't put away a reference
320 * because we want to support MMonMap passthrough to other Dispatchers. */
321 void MonClient::handle_monmap(MMonMap
*m
)
323 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
324 auto peer
= m
->get_source_addr();
325 string cur_mon
= monmap
.get_name(peer
);
327 bufferlist::iterator p
= m
->monmapbl
.begin();
330 ldout(cct
, 10) << " got monmap " << monmap
.epoch
331 << ", mon." << cur_mon
<< " is now rank " << monmap
.get_rank(cur_mon
)
333 ldout(cct
, 10) << "dump:\n";
334 monmap
.print(*_dout
);
337 _sub_got("monmap", monmap
.get_epoch());
339 if (!monmap
.get_addr_name(peer
, cur_mon
)) {
340 ldout(cct
, 10) << "mon." << cur_mon
<< " went away" << dendl
;
341 // can't find the mon we were talking to (above)
349 // ----------------------
351 int MonClient::init()
353 ldout(cct
, 10) << __func__
<< dendl
;
355 messenger
->add_dispatcher_head(this);
357 entity_name
= cct
->_conf
->name
;
359 Mutex::Locker
l(monc_lock
);
362 if (!cct
->_conf
->auth_supported
.empty())
363 method
= cct
->_conf
->auth_supported
;
364 else if (entity_name
.get_type() == CEPH_ENTITY_TYPE_OSD
||
365 entity_name
.get_type() == CEPH_ENTITY_TYPE_MDS
||
366 entity_name
.get_type() == CEPH_ENTITY_TYPE_MON
)
367 method
= cct
->_conf
->auth_cluster_required
;
369 method
= cct
->_conf
->auth_client_required
;
370 auth_supported
.reset(new AuthMethodList(cct
, method
));
371 ldout(cct
, 10) << "auth_supported " << auth_supported
->get_supported_set() << " method " << method
<< dendl
;
374 keyring
.reset(new KeyRing
); // initializing keyring anyway
376 if (auth_supported
->is_supported_auth(CEPH_AUTH_CEPHX
)) {
377 r
= keyring
->from_ceph_context(cct
);
379 auth_supported
->remove_supported_auth(CEPH_AUTH_CEPHX
);
380 if (!auth_supported
->get_supported_set().empty()) {
382 no_keyring_disabled_cephx
= true;
384 lderr(cct
) << "ERROR: missing keyring, cannot use cephx for authentication" << dendl
;
393 rotating_secrets
.reset(
394 new RotatingKeyRing(cct
, cct
->get_module_type(), keyring
.get()));
405 void MonClient::shutdown()
407 ldout(cct
, 10) << __func__
<< dendl
;
409 while (!version_requests
.empty()) {
410 version_requests
.begin()->second
->context
->complete(-ECANCELED
);
411 ldout(cct
, 20) << __func__
<< " canceling and discarding version request "
412 << version_requests
.begin()->second
<< dendl
;
413 delete version_requests
.begin()->second
;
414 version_requests
.erase(version_requests
.begin());
416 while (!mon_commands
.empty()) {
417 auto tid
= mon_commands
.begin()->first
;
418 _cancel_mon_command(tid
);
420 while (!waiting_for_session
.empty()) {
421 ldout(cct
, 20) << __func__
<< " discarding pending message " << *waiting_for_session
.front() << dendl
;
422 waiting_for_session
.front()->put();
423 waiting_for_session
.pop_front();
427 pending_cons
.clear();
433 finisher
.wait_for_empty();
442 int MonClient::authenticate(double timeout
)
444 Mutex::Locker
lock(monc_lock
);
447 ldout(cct
, 5) << "already authenticated" << dendl
;
451 _sub_want("monmap", monmap
.get_epoch() ? monmap
.get_epoch() + 1 : 0, 0);
455 utime_t until
= ceph_clock_now();
458 ldout(cct
, 10) << "authenticate will time out at " << until
<< dendl
;
459 while (!active_con
&& !authenticate_err
) {
461 int r
= auth_cond
.WaitUntil(monc_lock
, until
);
462 if (r
== ETIMEDOUT
) {
463 ldout(cct
, 0) << "authenticate timed out after " << timeout
<< dendl
;
464 authenticate_err
= -r
;
467 auth_cond
.Wait(monc_lock
);
472 ldout(cct
, 5) << __func__
<< " success, global_id "
473 << active_con
->get_global_id() << dendl
;
474 // active_con should not have been set if there was an error
475 assert(authenticate_err
== 0);
476 authenticated
= true;
479 if (authenticate_err
< 0 && no_keyring_disabled_cephx
) {
480 lderr(cct
) << __func__
<< " NOTE: no keyring found; disabled cephx authentication" << dendl
;
483 return authenticate_err
;
486 void MonClient::handle_auth(MAuthReply
*m
)
488 assert(monc_lock
.is_locked());
490 std::swap(active_con
->get_auth(), auth
);
491 int ret
= active_con
->authenticate(m
);
493 std::swap(auth
, active_con
->get_auth());
494 if (global_id
!= active_con
->get_global_id()) {
495 lderr(cct
) << __func__
<< " peer assigned me a different global_id: "
496 << active_con
->get_global_id() << dendl
;
498 if (ret
!= -EAGAIN
) {
505 auto found
= pending_cons
.find(m
->get_source_addr());
506 assert(found
!= pending_cons
.end());
507 int auth_err
= found
->second
.handle_auth(m
, entity_name
, want_keys
,
508 rotating_secrets
.get());
510 if (auth_err
== -EAGAIN
) {
514 pending_cons
.erase(found
);
515 if (!pending_cons
.empty()) {
516 // keep trying with pending connections
519 // the last try just failed, give up.
521 auto& mc
= found
->second
;
522 assert(mc
.have_session());
523 active_con
.reset(new MonConnection(std::move(mc
)));
524 pending_cons
.clear();
530 last_rotating_renew_sent
= utime_t();
531 while (!waiting_for_session
.empty()) {
532 _send_mon_message(waiting_for_session
.front());
533 waiting_for_session
.pop_front();
535 _resend_mon_commands();
538 std::swap(auth
, active_con
->get_auth());
539 global_id
= active_con
->get_global_id();
542 _finish_auth(auth_err
);
544 Context
*cb
= nullptr;
545 if (session_established_context
) {
546 cb
= session_established_context
.release();
556 void MonClient::_finish_auth(int auth_err
)
558 authenticate_err
= auth_err
;
559 // _resend_mon_commands() could _reopen_session() if the connected mon is not
560 // the one the MonCommand is targeting.
561 if (!auth_err
&& active_con
) {
563 _check_auth_tickets();
565 auth_cond
.SignalAll();
570 void MonClient::_send_mon_message(Message
*m
)
572 assert(monc_lock
.is_locked());
574 auto cur_con
= active_con
->get_con();
575 ldout(cct
, 10) << "_send_mon_message to mon."
576 << monmap
.get_name(cur_con
->get_peer_addr())
577 << " at " << cur_con
->get_peer_addr() << dendl
;
578 cur_con
->send_message(m
);
580 waiting_for_session
.push_back(m
);
584 void MonClient::_reopen_session(int rank
)
586 assert(monc_lock
.is_locked());
587 ldout(cct
, 10) << __func__
<< " rank " << rank
<< dendl
;
590 pending_cons
.clear();
595 _add_conn(rank
, global_id
);
597 _add_conns(global_id
);
600 // throw out old queued messages
601 while (!waiting_for_session
.empty()) {
602 waiting_for_session
.front()->put();
603 waiting_for_session
.pop_front();
606 // throw out version check requests
607 while (!version_requests
.empty()) {
608 finisher
.queue(version_requests
.begin()->second
->context
, -EAGAIN
);
609 delete version_requests
.begin()->second
;
610 version_requests
.erase(version_requests
.begin());
613 for (auto& c
: pending_cons
) {
614 c
.second
.start(monmap
.get_epoch(), entity_name
, *auth_supported
);
617 for (map
<string
,ceph_mon_subscribe_item
>::iterator p
= sub_sent
.begin();
620 if (sub_new
.count(p
->first
) == 0)
621 sub_new
[p
->first
] = p
->second
;
623 if (!sub_new
.empty())
627 MonConnection
& MonClient::_add_conn(unsigned rank
, uint64_t global_id
)
629 auto peer
= monmap
.get_addr(rank
);
630 auto conn
= messenger
->get_connection(monmap
.get_inst(rank
));
631 MonConnection
mc(cct
, conn
, global_id
);
632 auto inserted
= pending_cons
.insert(make_pair(peer
, move(mc
)));
633 ldout(cct
, 10) << "picked mon." << monmap
.get_name(rank
)
635 << " addr " << conn
->get_peer_addr()
637 return inserted
.first
->second
;
640 void MonClient::_add_conns(uint64_t global_id
)
642 uint16_t min_priority
= std::numeric_limits
<uint16_t>::max();
643 for (const auto& m
: monmap
.mon_info
) {
644 if (m
.second
.priority
< min_priority
) {
645 min_priority
= m
.second
.priority
;
648 vector
<unsigned> ranks
;
649 for (const auto& m
: monmap
.mon_info
) {
650 if (m
.second
.priority
== min_priority
) {
651 ranks
.push_back(monmap
.get_rank(m
.first
));
654 std::random_device rd
;
655 std::mt19937
rng(rd());
656 std::shuffle(ranks
.begin(), ranks
.end(), rng
);
657 unsigned n
= cct
->_conf
->mon_client_hunt_parallel
;
658 if (n
== 0 || n
> ranks
.size()) {
661 for (unsigned i
= 0; i
< n
; i
++) {
662 _add_conn(ranks
[i
], global_id
);
666 bool MonClient::ms_handle_reset(Connection
*con
)
668 Mutex::Locker
lock(monc_lock
);
670 if (con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
)
674 if (pending_cons
.count(con
->get_peer_addr())) {
675 ldout(cct
, 10) << __func__
<< " hunted mon " << con
->get_peer_addr() << dendl
;
677 ldout(cct
, 10) << __func__
<< " stray mon " << con
->get_peer_addr() << dendl
;
681 if (active_con
&& con
== active_con
->get_con()) {
682 ldout(cct
, 10) << __func__
<< " current mon " << con
->get_peer_addr() << dendl
;
686 ldout(cct
, 10) << "ms_handle_reset stray mon " << con
->get_peer_addr() << dendl
;
692 bool MonClient::_opened() const
694 assert(monc_lock
.is_locked());
695 return active_con
|| _hunting();
698 bool MonClient::_hunting() const
700 return !pending_cons
.empty();
703 void MonClient::_start_hunting()
706 // adjust timeouts if necessary
707 if (!had_a_connection
)
709 reopen_interval_multiplier
*= cct
->_conf
->mon_client_hunt_interval_backoff
;
710 if (reopen_interval_multiplier
>
711 cct
->_conf
->mon_client_hunt_interval_max_multiple
) {
712 reopen_interval_multiplier
=
713 cct
->_conf
->mon_client_hunt_interval_max_multiple
;
717 void MonClient::_finish_hunting()
719 assert(monc_lock
.is_locked());
720 // the pending conns have been cleaned.
723 auto con
= active_con
->get_con();
724 ldout(cct
, 1) << "found mon."
725 << monmap
.get_name(con
->get_peer_addr())
728 ldout(cct
, 1) << "no mon sessions established" << dendl
;
731 had_a_connection
= true;
732 reopen_interval_multiplier
/= 2.0;
733 if (reopen_interval_multiplier
< 1.0)
734 reopen_interval_multiplier
= 1.0;
737 void MonClient::tick()
739 ldout(cct
, 10) << __func__
<< dendl
;
741 _check_auth_tickets();
744 ldout(cct
, 1) << "continuing hunt" << dendl
;
746 } else if (active_con
) {
747 // just renew as needed
748 utime_t now
= ceph_clock_now();
749 auto cur_con
= active_con
->get_con();
750 if (!cur_con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
)) {
751 ldout(cct
, 10) << "renew subs? (now: " << now
752 << "; renew after: " << sub_renew_after
<< ") -- "
753 << (now
> sub_renew_after
? "yes" : "no")
755 if (now
> sub_renew_after
)
759 cur_con
->send_keepalive();
761 if (cct
->_conf
->mon_client_ping_timeout
> 0 &&
762 cur_con
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
763 utime_t lk
= cur_con
->get_last_keepalive_ack();
764 utime_t interval
= now
- lk
;
765 if (interval
> cct
->_conf
->mon_client_ping_timeout
) {
766 ldout(cct
, 1) << "no keepalive since " << lk
<< " (" << interval
767 << " seconds), reconnecting" << dendl
;
778 void MonClient::schedule_tick()
780 struct C_Tick
: public Context
{
782 explicit C_Tick(MonClient
*m
) : monc(m
) {}
783 void finish(int r
) override
{
789 timer
.add_event_after(cct
->_conf
->mon_client_hunt_interval
790 * reopen_interval_multiplier
,
793 timer
.add_event_after(cct
->_conf
->mon_client_ping_interval
, new C_Tick(this));
798 void MonClient::_renew_subs()
800 assert(monc_lock
.is_locked());
801 if (sub_new
.empty()) {
802 ldout(cct
, 10) << __func__
<< " - empty" << dendl
;
806 ldout(cct
, 10) << __func__
<< dendl
;
810 if (sub_renew_sent
== utime_t())
811 sub_renew_sent
= ceph_clock_now();
813 MMonSubscribe
*m
= new MMonSubscribe
;
815 _send_mon_message(m
);
817 // update sub_sent with sub_new
818 sub_new
.insert(sub_sent
.begin(), sub_sent
.end());
819 std::swap(sub_new
, sub_sent
);
824 void MonClient::handle_subscribe_ack(MMonSubscribeAck
*m
)
826 if (sub_renew_sent
!= utime_t()) {
827 // NOTE: this is only needed for legacy (infernalis or older)
829 sub_renew_after
= sub_renew_sent
;
830 sub_renew_after
+= m
->interval
/ 2.0;
831 ldout(cct
, 10) << __func__
<< " sent " << sub_renew_sent
<< " renew after " << sub_renew_after
<< dendl
;
832 sub_renew_sent
= utime_t();
834 ldout(cct
, 10) << __func__
<< " sent " << sub_renew_sent
<< ", ignoring" << dendl
;
840 int MonClient::_check_auth_tickets()
842 assert(monc_lock
.is_locked());
843 if (active_con
&& auth
) {
844 if (auth
->need_tickets()) {
845 ldout(cct
, 10) << __func__
<< " getting new tickets!" << dendl
;
846 MAuth
*m
= new MAuth
;
847 m
->protocol
= auth
->get_protocol();
848 auth
->prepare_build_request();
849 auth
->build_request(m
->auth_payload
);
850 _send_mon_message(m
);
853 _check_auth_rotating();
858 int MonClient::_check_auth_rotating()
860 assert(monc_lock
.is_locked());
861 if (!rotating_secrets
||
862 !auth_principal_needs_rotating_keys(entity_name
)) {
863 ldout(cct
, 20) << "_check_auth_rotating not needed by " << entity_name
<< dendl
;
867 if (!active_con
|| !auth
) {
868 ldout(cct
, 10) << "_check_auth_rotating waiting for auth session" << dendl
;
872 utime_t now
= ceph_clock_now();
873 utime_t cutoff
= now
;
874 cutoff
-= MIN(30.0, cct
->_conf
->auth_service_ticket_ttl
/ 4.0);
875 utime_t issued_at_lower_bound
= now
;
876 issued_at_lower_bound
-= cct
->_conf
->auth_service_ticket_ttl
;
877 if (!rotating_secrets
->need_new_secrets(cutoff
)) {
878 ldout(cct
, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff
<< ")" << dendl
;
879 rotating_secrets
->dump_rotating();
883 ldout(cct
, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff
<< ")" << dendl
;
884 if (!rotating_secrets
->need_new_secrets() &&
885 rotating_secrets
->need_new_secrets(issued_at_lower_bound
)) {
886 // the key has expired before it has been issued?
887 lderr(cct
) << __func__
<< " possible clock skew, rotating keys expired way too early"
888 << " (before " << issued_at_lower_bound
<< ")" << dendl
;
890 if ((now
> last_rotating_renew_sent
) &&
891 double(now
- last_rotating_renew_sent
) < 1) {
892 ldout(cct
, 10) << __func__
<< " called too often (last: "
893 << last_rotating_renew_sent
<< "), skipping refresh" << dendl
;
896 MAuth
*m
= new MAuth
;
897 m
->protocol
= auth
->get_protocol();
898 if (auth
->build_rotating_request(m
->auth_payload
)) {
899 last_rotating_renew_sent
= now
;
900 _send_mon_message(m
);
907 int MonClient::wait_auth_rotating(double timeout
)
909 Mutex::Locker
l(monc_lock
);
910 utime_t now
= ceph_clock_now();
914 // Must be initialized
915 assert(auth
!= nullptr);
917 if (auth
->get_protocol() == CEPH_AUTH_NONE
)
920 if (!rotating_secrets
)
923 while (auth_principal_needs_rotating_keys(entity_name
) &&
924 rotating_secrets
->need_new_secrets(now
)) {
926 ldout(cct
, 0) << __func__
<< " timed out after " << timeout
<< dendl
;
929 ldout(cct
, 10) << __func__
<< " waiting (until " << until
<< ")" << dendl
;
930 auth_cond
.WaitUntil(monc_lock
, until
);
931 now
= ceph_clock_now();
933 ldout(cct
, 10) << __func__
<< " done" << dendl
;
939 void MonClient::_send_command(MonCommand
*r
)
943 peer
= active_con
->get_con()->get_peer_addr();
946 if (r
->target_rank
>= 0 &&
947 r
->target_rank
!= monmap
.get_rank(peer
)) {
948 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
949 << " wants rank " << r
->target_rank
950 << ", reopening session"
952 if (r
->target_rank
>= (int)monmap
.size()) {
953 ldout(cct
, 10) << " target " << r
->target_rank
<< " >= max mon " << monmap
.size() << dendl
;
954 _finish_command(r
, -ENOENT
, "mon rank dne");
957 _reopen_session(r
->target_rank
);
961 if (r
->target_name
.length() &&
962 r
->target_name
!= monmap
.get_name(peer
)) {
963 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
964 << " wants mon " << r
->target_name
965 << ", reopening session"
967 if (!monmap
.contains(r
->target_name
)) {
968 ldout(cct
, 10) << " target " << r
->target_name
<< " not present in monmap" << dendl
;
969 _finish_command(r
, -ENOENT
, "mon dne");
972 _reopen_session(monmap
.get_rank(r
->target_name
));
976 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
<< dendl
;
977 MMonCommand
*m
= new MMonCommand(monmap
.fsid
);
980 m
->set_data(r
->inbl
);
981 _send_mon_message(m
);
985 void MonClient::_resend_mon_commands()
987 // resend any requests
988 for (map
<uint64_t,MonCommand
*>::iterator p
= mon_commands
.begin();
989 p
!= mon_commands
.end();
991 _send_command(p
->second
);
995 void MonClient::handle_mon_command_ack(MMonCommandAck
*ack
)
997 MonCommand
*r
= NULL
;
998 uint64_t tid
= ack
->get_tid();
1000 if (tid
== 0 && !mon_commands
.empty()) {
1001 r
= mon_commands
.begin()->second
;
1002 ldout(cct
, 10) << __func__
<< " has tid 0, assuming it is " << r
->tid
<< dendl
;
1004 map
<uint64_t,MonCommand
*>::iterator p
= mon_commands
.find(tid
);
1005 if (p
== mon_commands
.end()) {
1006 ldout(cct
, 10) << __func__
<< " " << ack
->get_tid() << " not found" << dendl
;
1013 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
<< dendl
;
1015 r
->poutbl
->claim(ack
->get_data());
1016 _finish_command(r
, ack
->r
, ack
->rs
);
1020 int MonClient::_cancel_mon_command(uint64_t tid
)
1022 assert(monc_lock
.is_locked());
1024 map
<ceph_tid_t
, MonCommand
*>::iterator it
= mon_commands
.find(tid
);
1025 if (it
== mon_commands
.end()) {
1026 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
1030 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
1032 MonCommand
*cmd
= it
->second
;
1033 _finish_command(cmd
, -ETIMEDOUT
, "");
1037 void MonClient::_finish_command(MonCommand
*r
, int ret
, string rs
)
1039 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " = " << ret
<< " " << rs
<< dendl
;
1045 finisher
.queue(r
->onfinish
, ret
);
1046 mon_commands
.erase(r
->tid
);
1050 void MonClient::start_mon_command(const vector
<string
>& cmd
,
1051 const bufferlist
& inbl
,
1052 bufferlist
*outbl
, string
*outs
,
1055 Mutex::Locker
l(monc_lock
);
1056 MonCommand
*r
= new MonCommand(++last_mon_command_tid
);
1061 r
->onfinish
= onfinish
;
1062 if (cct
->_conf
->rados_mon_op_timeout
> 0) {
1063 class C_CancelMonCommand
: public Context
1068 C_CancelMonCommand(uint64_t tid
, MonClient
*monc
) : tid(tid
), monc(monc
) {}
1069 void finish(int r
) override
{
1070 monc
->_cancel_mon_command(tid
);
1073 r
->ontimeout
= new C_CancelMonCommand(r
->tid
, this);
1074 timer
.add_event_after(cct
->_conf
->rados_mon_op_timeout
, r
->ontimeout
);
1076 mon_commands
[r
->tid
] = r
;
1080 void MonClient::start_mon_command(const string
&mon_name
,
1081 const vector
<string
>& cmd
,
1082 const bufferlist
& inbl
,
1083 bufferlist
*outbl
, string
*outs
,
1086 Mutex::Locker
l(monc_lock
);
1087 MonCommand
*r
= new MonCommand(++last_mon_command_tid
);
1088 r
->target_name
= mon_name
;
1093 r
->onfinish
= onfinish
;
1094 mon_commands
[r
->tid
] = r
;
1098 void MonClient::start_mon_command(int rank
,
1099 const vector
<string
>& cmd
,
1100 const bufferlist
& inbl
,
1101 bufferlist
*outbl
, string
*outs
,
1104 Mutex::Locker
l(monc_lock
);
1105 MonCommand
*r
= new MonCommand(++last_mon_command_tid
);
1106 r
->target_rank
= rank
;
1111 r
->onfinish
= onfinish
;
1112 mon_commands
[r
->tid
] = r
;
1118 void MonClient::get_version(string map
, version_t
*newest
, version_t
*oldest
, Context
*onfinish
)
1120 version_req_d
*req
= new version_req_d(onfinish
, newest
, oldest
);
1121 ldout(cct
, 10) << "get_version " << map
<< " req " << req
<< dendl
;
1122 Mutex::Locker
l(monc_lock
);
1123 MMonGetVersion
*m
= new MMonGetVersion();
1125 m
->handle
= ++version_req_id
;
1126 version_requests
[m
->handle
] = req
;
1127 _send_mon_message(m
);
1130 void MonClient::handle_get_version_reply(MMonGetVersionReply
* m
)
1132 assert(monc_lock
.is_locked());
1133 map
<ceph_tid_t
, version_req_d
*>::iterator iter
= version_requests
.find(m
->handle
);
1134 if (iter
== version_requests
.end()) {
1135 ldout(cct
, 0) << __func__
<< " version request with handle " << m
->handle
1136 << " not found" << dendl
;
1138 version_req_d
*req
= iter
->second
;
1139 ldout(cct
, 10) << __func__
<< " finishing " << req
<< " version " << m
->version
<< dendl
;
1140 version_requests
.erase(iter
);
1142 *req
->newest
= m
->version
;
1144 *req
->oldest
= m
->oldest_version
;
1145 finisher
.queue(req
->context
, 0);
1151 AuthAuthorizer
* MonClient::build_authorizer(int service_id
) const {
1152 Mutex::Locker
l(monc_lock
);
1154 return auth
->build_authorizer(service_id
);
1156 ldout(cct
, 0) << __func__
<< " for " << ceph_entity_type_name(service_id
)
1157 << ", but no auth is available now" << dendl
;
1162 #define dout_subsys ceph_subsys_monc
1164 #define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1166 MonConnection::MonConnection(CephContext
*cct
, ConnectionRef con
, uint64_t global_id
)
1167 : cct(cct
), con(con
), global_id(global_id
)
1170 MonConnection::~MonConnection()
1178 bool MonConnection::have_session() const
1180 return state
== State::HAVE_SESSION
;
1183 void MonConnection::start(epoch_t epoch
,
1184 const EntityName
& entity_name
,
1185 const AuthMethodList
& auth_supported
)
1187 // restart authentication handshake
1188 state
= State::NEGOTIATING
;
1190 // send an initial keepalive to ensure our timestamp is valid by the
1191 // time we are in an OPENED state (by sequencing this before
1193 con
->send_keepalive();
1197 m
->monmap_epoch
= epoch
;
1199 ::encode(struct_v
, m
->auth_payload
);
1200 ::encode(auth_supported
.get_supported_set(), m
->auth_payload
);
1201 ::encode(entity_name
, m
->auth_payload
);
1202 ::encode(global_id
, m
->auth_payload
);
1203 con
->send_message(m
);
1206 int MonConnection::handle_auth(MAuthReply
* m
,
1207 const EntityName
& entity_name
,
1209 RotatingKeyRing
* keyring
)
1211 if (state
== State::NEGOTIATING
) {
1212 int r
= _negotiate(m
, entity_name
, want_keys
, keyring
);
1216 state
= State::AUTHENTICATING
;
1218 int r
= authenticate(m
);
1220 state
= State::HAVE_SESSION
;
1225 int MonConnection::_negotiate(MAuthReply
*m
,
1226 const EntityName
& entity_name
,
1228 RotatingKeyRing
* keyring
)
1230 if (auth
&& (int)m
->protocol
== auth
->get_protocol()) {
1231 // good, negotiation completed
1236 auth
.reset(get_auth_client_handler(cct
, m
->protocol
, keyring
));
1238 ldout(cct
, 10) << "no handler for protocol " << m
->protocol
<< dendl
;
1239 if (m
->result
== -ENOTSUP
) {
1240 ldout(cct
, 10) << "none of our auth protocols are supported by the server"
1246 // do not request MGR key unless the mon has the SERVER_KRAKEN
1247 // feature. otherwise it will give us an auth error. note that
1248 // we have to use the FEATUREMASK because pre-jewel the kraken
1249 // feature bit was used for something else.
1250 if ((want_keys
& CEPH_ENTITY_TYPE_MGR
) &&
1251 !(m
->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN
))) {
1252 ldout(cct
, 1) << __func__
1253 << " not requesting MGR keys from pre-kraken monitor"
1255 want_keys
&= ~CEPH_ENTITY_TYPE_MGR
;
1257 auth
->set_want_keys(want_keys
);
1258 auth
->init(entity_name
);
1259 auth
->set_global_id(global_id
);
1263 int MonConnection::authenticate(MAuthReply
*m
)
1266 if (!m
->global_id
) {
1267 ldout(cct
, 1) << "peer sent an invalid global_id" << dendl
;
1269 if (m
->global_id
!= global_id
) {
1270 // it's a new session
1272 global_id
= m
->global_id
;
1273 auth
->set_global_id(global_id
);
1274 ldout(cct
, 10) << "my global_id is " << m
->global_id
<< dendl
;
1276 auto p
= m
->result_bl
.begin();
1277 int ret
= auth
->handle_response(m
->result
, p
);
1278 if (ret
== -EAGAIN
) {
1279 auto ma
= new MAuth
;
1280 ma
->protocol
= auth
->get_protocol();
1281 auth
->prepare_build_request();
1282 auth
->build_request(ma
->auth_payload
);
1283 con
->send_message(ma
);