1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include <boost/range/adaptor/map.hpp>
19 #include <boost/range/adaptor/filtered.hpp>
20 #include <boost/range/algorithm/copy.hpp>
21 #include <boost/range/algorithm_ext/copy_n.hpp>
22 #include "common/weighted_shuffle.h"
24 #include "include/scope_guard.h"
25 #include "include/stringify.h"
27 #include "messages/MMonGetMap.h"
28 #include "messages/MMonGetVersion.h"
29 #include "messages/MMonGetVersionReply.h"
30 #include "messages/MMonMap.h"
31 #include "messages/MConfig.h"
32 #include "messages/MGetConfig.h"
33 #include "messages/MAuth.h"
34 #include "messages/MLogAck.h"
35 #include "messages/MAuthReply.h"
36 #include "messages/MMonCommand.h"
37 #include "messages/MMonCommandAck.h"
38 #include "messages/MCommand.h"
39 #include "messages/MCommandReply.h"
40 #include "messages/MPing.h"
42 #include "messages/MMonSubscribe.h"
43 #include "messages/MMonSubscribeAck.h"
44 #include "common/errno.h"
45 #include "common/hostname.h"
46 #include "common/LogClient.h"
48 #include "MonClient.h"
51 #include "auth/Auth.h"
52 #include "auth/KeyRing.h"
53 #include "auth/AuthClientHandler.h"
54 #include "auth/AuthRegistry.h"
55 #include "auth/RotatingKeyRing.h"
57 #define dout_subsys ceph_subsys_monc
59 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
63 MonClient::MonClient(CephContext
*cct_
) :
67 timer(cct_
, monc_lock
),
71 more_log_pending(false),
73 had_a_connection(false),
74 reopen_interval_multiplier(
75 cct_
->_conf
.get_val
<double>("mon_client_hunt_interval_min_multiple")),
76 last_mon_command_tid(0),
80 MonClient::~MonClient()
84 int MonClient::build_initial_monmap()
86 ldout(cct
, 10) << __func__
<< dendl
;
87 int r
= monmap
.build_initial(cct
, false, std::cerr
);
88 ldout(cct
,10) << "monmap:\n";
94 int MonClient::get_monmap()
96 ldout(cct
, 10) << __func__
<< dendl
;
97 std::unique_lock
l(monc_lock
);
99 sub
.want("monmap", 0, 0);
102 map_cond
.wait(l
, [this] { return !want_monmap
; });
103 ldout(cct
, 10) << __func__
<< " done" << dendl
;
107 int MonClient::get_monmap_and_config()
109 ldout(cct
, 10) << __func__
<< dendl
;
110 ceph_assert(!messenger
);
115 auto shutdown_crypto
= make_scope_guard([this] {
116 cct
->shutdown_crypto();
119 int r
= build_initial_monmap();
121 lderr(cct
) << __func__
<< " cannot identify monitors to contact" << dendl
;
125 messenger
= Messenger::create_client_messenger(
126 cct
, "temp_mon_client");
127 ceph_assert(messenger
);
128 messenger
->add_dispatcher_head(this);
130 auto shutdown_msgr
= make_scope_guard([this] {
131 messenger
->shutdown();
135 if (!monmap
.fsid
.is_zero()) {
136 cct
->_conf
.set_val("fsid", stringify(monmap
.fsid
));
140 while (tries
-- > 0) {
145 r
= authenticate(cct
->_conf
->client_mount_timeout
);
146 if (r
== -ETIMEDOUT
) {
154 std::unique_lock
l(monc_lock
);
155 if (monmap
.get_epoch() &&
156 !monmap
.persistent_features
.contains_all(
157 ceph::features::mon::FEATURE_MIMIC
)) {
158 ldout(cct
,10) << __func__
<< " pre-mimic monitor, no config to fetch"
163 while ((!got_config
|| monmap
.get_epoch() == 0) && r
== 0) {
164 ldout(cct
,20) << __func__
<< " waiting for monmap|config" << dendl
;
165 auto status
= map_cond
.wait_for(l
, ceph::make_timespan(
166 cct
->_conf
->mon_client_hunt_interval
));
167 if (status
== std::cv_status::timeout
) {
172 ldout(cct
,10) << __func__
<< " success" << dendl
;
177 lderr(cct
) << __func__
<< " failed to get config" << dendl
;
188 * Ping the monitor with id @p mon_id and set the resulting reply in
189 * the provided @p result_reply, if this last parameter is not NULL.
191 * So that we don't rely on the MonClient's default messenger, set up
192 * during connect(), we create our own messenger to comunicate with the
193 * specified monitor. This is advantageous in the following ways:
195 * - Isolate the ping procedure from the rest of the MonClient's operations,
196 * allowing us to not acquire or manage the big monc_lock, thus not
197 * having to block waiting for some other operation to finish before we
199 * * for instance, we can ping mon.FOO even if we are currently hunting
200 * or blocked waiting for auth to complete with mon.BAR.
202 * - Ping a monitor prior to establishing a connection (using connect())
203 * and properly establish the MonClient's messenger. This frees us
204 * from dealing with the complex foo that happens in connect().
206 * We also don't rely on MonClient as a dispatcher for this messenger,
207 * unlike what happens with the MonClient's default messenger. This allows
208 * us to sandbox the whole ping, having it much as a separate entity in
209 * the MonClient class, considerably simplifying the handling and dispatching
210 * of messages without needing to consider monc_lock.
212 * Current drawback is that we will establish a messenger for each ping
213 * we want to issue, instead of keeping a single messenger instance that
214 * would be used for all pings.
216 int MonClient::ping_monitor(const string
&mon_id
, string
*result_reply
)
218 ldout(cct
, 10) << __func__
<< dendl
;
221 if (monmap
.contains("noname-"+mon_id
)) {
222 new_mon_id
= "noname-"+mon_id
;
227 if (new_mon_id
.empty()) {
228 ldout(cct
, 10) << __func__
<< " specified mon id is empty!" << dendl
;
230 } else if (!monmap
.contains(new_mon_id
)) {
231 ldout(cct
, 10) << __func__
<< " no such monitor 'mon." << new_mon_id
<< "'"
236 // N.B. monc isn't initialized
238 auth_registry
.refresh_config();
241 keyring
.from_ceph_context(cct
);
242 RotatingKeyRing
rkeyring(cct
, cct
->get_module_type(), &keyring
);
244 MonClientPinger
*pinger
= new MonClientPinger(cct
,
248 Messenger
*smsgr
= Messenger::create_client_messenger(cct
, "temp_ping_client");
249 smsgr
->add_dispatcher_head(pinger
);
250 smsgr
->set_auth_client(pinger
);
253 ConnectionRef con
= smsgr
->connect_to_mon(monmap
.get_addrs(new_mon_id
));
254 ldout(cct
, 10) << __func__
<< " ping mon." << new_mon_id
255 << " " << con
->get_peer_addr() << dendl
;
257 pinger
->mc
.reset(new MonConnection(cct
, con
, 0, &auth_registry
));
258 pinger
->mc
->start(monmap
.get_epoch(), entity_name
);
259 con
->send_message(new MPing
);
261 int ret
= pinger
->wait_for_reply(cct
->_conf
->mon_client_ping_timeout
);
263 ldout(cct
,10) << __func__
<< " got ping reply" << dendl
;
277 bool MonClient::ms_dispatch(Message
*m
)
279 // we only care about these message types
280 switch (m
->get_type()) {
281 case CEPH_MSG_MON_MAP
:
282 case CEPH_MSG_AUTH_REPLY
:
283 case CEPH_MSG_MON_SUBSCRIBE_ACK
:
284 case CEPH_MSG_MON_GET_VERSION_REPLY
:
285 case MSG_MON_COMMAND_ACK
:
286 case MSG_COMMAND_REPLY
:
297 std::lock_guard
lock(monc_lock
);
299 if (!m
->get_connection()->is_anon() &&
300 m
->get_source().type() == CEPH_ENTITY_TYPE_MON
) {
302 auto p
= _find_pending_con(m
->get_connection());
303 if (p
== pending_cons
.end()) {
304 // ignore any messages outside hunting sessions
305 ldout(cct
, 10) << "discarding stray monitor message " << *m
<< dendl
;
309 } else if (!active_con
|| active_con
->get_con() != m
->get_connection()) {
310 // ignore any messages outside our session(s)
311 ldout(cct
, 10) << "discarding stray monitor message " << *m
<< dendl
;
317 switch (m
->get_type()) {
318 case CEPH_MSG_MON_MAP
:
319 handle_monmap(static_cast<MMonMap
*>(m
));
320 if (passthrough_monmap
) {
326 case CEPH_MSG_AUTH_REPLY
:
327 handle_auth(static_cast<MAuthReply
*>(m
));
329 case CEPH_MSG_MON_SUBSCRIBE_ACK
:
330 handle_subscribe_ack(static_cast<MMonSubscribeAck
*>(m
));
332 case CEPH_MSG_MON_GET_VERSION_REPLY
:
333 handle_get_version_reply(static_cast<MMonGetVersionReply
*>(m
));
335 case MSG_MON_COMMAND_ACK
:
336 handle_mon_command_ack(static_cast<MMonCommandAck
*>(m
));
338 case MSG_COMMAND_REPLY
:
339 if (m
->get_connection()->is_anon() &&
340 m
->get_source().type() == CEPH_ENTITY_TYPE_MON
) {
341 // this connection is from 'tell'... ignore everything except our command
342 // reply. (we'll get misc other message because we authenticated, but we
344 handle_command_reply(static_cast<MCommandReply
*>(m
));
347 // leave the message for another dispatch handler (e.g., Objecter)
351 log_client
->handle_log_ack(static_cast<MLogAck
*>(m
));
353 if (more_log_pending
) {
361 handle_config(static_cast<MConfig
*>(m
));
367 void MonClient::send_log(bool flush
)
370 auto lm
= log_client
->get_mon_log_message(flush
);
372 _send_mon_message(std::move(lm
));
373 more_log_pending
= log_client
->are_pending();
377 void MonClient::flush_log()
379 std::lock_guard
l(monc_lock
);
383 /* Unlike all the other message-handling functions, we don't put away a reference
384 * because we want to support MMonMap passthrough to other Dispatchers. */
385 void MonClient::handle_monmap(MMonMap
*m
)
387 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
388 auto con_addrs
= m
->get_source_addrs();
389 string old_name
= monmap
.get_name(con_addrs
);
390 const auto old_epoch
= monmap
.get_epoch();
392 auto p
= m
->monmapbl
.cbegin();
395 ldout(cct
, 10) << " got monmap " << monmap
.epoch
396 << " from mon." << old_name
397 << " (according to old e" << monmap
.get_epoch() << ")"
399 ldout(cct
, 10) << "dump:\n";
400 monmap
.print(*_dout
);
403 if (old_epoch
!= monmap
.get_epoch()) {
406 if (old_name
.size() == 0) {
407 ldout(cct
,10) << " can't identify which mon we were connected to" << dendl
;
410 auto new_name
= monmap
.get_name(con_addrs
);
411 if (new_name
.empty()) {
412 ldout(cct
, 10) << "mon." << old_name
<< " at " << con_addrs
413 << " went away" << dendl
;
414 // can't find the mon we were talking to (above)
416 } else if (messenger
->should_use_msgr2() &&
417 monmap
.get_addrs(new_name
).has_msgr2() &&
418 !con_addrs
.has_msgr2()) {
419 ldout(cct
,1) << " mon." << new_name
<< " has (v2) addrs "
420 << monmap
.get_addrs(new_name
) << " but i'm connected to "
421 << con_addrs
<< ", reconnecting" << dendl
;
426 cct
->set_mon_addrs(monmap
);
428 sub
.got("monmap", monmap
.get_epoch());
429 map_cond
.notify_all();
432 if (authenticate_err
== 1) {
437 void MonClient::handle_config(MConfig
*m
)
439 ldout(cct
,10) << __func__
<< " " << *m
<< dendl
;
440 finisher
.queue(new LambdaContext([this, m
](int r
) {
441 cct
->_conf
.set_mon_vals(cct
, m
->config
, config_cb
);
442 if (config_notify_cb
) {
448 map_cond
.notify_all();
451 // ----------------------
453 int MonClient::init()
455 ldout(cct
, 10) << __func__
<< dendl
;
457 entity_name
= cct
->_conf
->name
;
459 auth_registry
.refresh_config();
461 std::lock_guard
l(monc_lock
);
462 keyring
.reset(new KeyRing
);
463 if (auth_registry
.is_supported_method(messenger
->get_mytype(),
465 // this should succeed, because auth_registry just checked!
466 int r
= keyring
->from_ceph_context(cct
);
468 // but be somewhat graceful in case there was a race condition
469 lderr(cct
) << "keyring not found" << dendl
;
473 if (!auth_registry
.any_supported_methods(messenger
->get_mytype())) {
477 rotating_secrets
.reset(
478 new RotatingKeyRing(cct
, cct
->get_module_type(), keyring
.get()));
482 messenger
->set_auth_client(this);
483 messenger
->add_dispatcher_head(this);
492 void MonClient::shutdown()
494 ldout(cct
, 10) << __func__
<< dendl
;
497 while (!version_requests
.empty()) {
498 version_requests
.begin()->second
->context
->complete(-ECANCELED
);
499 ldout(cct
, 20) << __func__
<< " canceling and discarding version request "
500 << version_requests
.begin()->second
<< dendl
;
501 delete version_requests
.begin()->second
;
502 version_requests
.erase(version_requests
.begin());
504 while (!mon_commands
.empty()) {
505 auto tid
= mon_commands
.begin()->first
;
506 _cancel_mon_command(tid
);
508 ldout(cct
, 20) << __func__
<< " discarding " << waiting_for_session
.size()
509 << " pending message(s)" << dendl
;
510 waiting_for_session
.clear();
513 pending_cons
.clear();
517 authenticate_err
= 0;
518 authenticated
= false;
523 finisher
.wait_for_empty();
533 int MonClient::authenticate(double timeout
)
535 std::unique_lock lock
{monc_lock
};
538 ldout(cct
, 5) << "already authenticated" << dendl
;
541 sub
.want("monmap", monmap
.get_epoch() ? monmap
.get_epoch() + 1 : 0, 0);
542 sub
.want("config", 0, 0);
546 auto until
= ceph::real_clock::now();
547 until
+= ceph::make_timespan(timeout
);
549 ldout(cct
, 10) << "authenticate will time out at " << until
<< dendl
;
550 authenticate_err
= 1; // == in progress
551 while (!active_con
&& authenticate_err
>= 0) {
553 auto r
= auth_cond
.wait_until(lock
, until
);
554 if (r
== cv_status::timeout
&& !active_con
) {
555 ldout(cct
, 0) << "authenticate timed out after " << timeout
<< dendl
;
556 authenticate_err
= -ETIMEDOUT
;
559 auth_cond
.wait(lock
);
564 ldout(cct
, 5) << __func__
<< " success, global_id "
565 << active_con
->get_global_id() << dendl
;
566 // active_con should not have been set if there was an error
567 ceph_assert(authenticate_err
>= 0);
568 authenticated
= true;
571 if (authenticate_err
< 0 && auth_registry
.no_keyring_disabled_cephx()) {
572 lderr(cct
) << __func__
<< " NOTE: no keyring found; disabled cephx authentication" << dendl
;
575 return authenticate_err
;
578 void MonClient::handle_auth(MAuthReply
*m
)
580 ceph_assert(ceph_mutex_is_locked(monc_lock
));
582 if (m
->get_connection()->is_anon()) {
583 // anon connection, used for mon tell commands
584 for (auto& p
: mon_commands
) {
585 if (p
.second
->target_con
== m
->get_connection()) {
586 auto& mc
= p
.second
->target_session
;
587 int ret
= mc
->handle_auth(m
, entity_name
,
588 CEPH_ENTITY_TYPE_MON
,
589 rotating_secrets
.get());
590 (void)ret
; // we don't care
599 std::swap(active_con
->get_auth(), auth
);
600 int ret
= active_con
->authenticate(m
);
602 std::swap(auth
, active_con
->get_auth());
603 if (global_id
!= active_con
->get_global_id()) {
604 lderr(cct
) << __func__
<< " peer assigned me a different global_id: "
605 << active_con
->get_global_id() << dendl
;
607 if (ret
!= -EAGAIN
) {
614 auto found
= _find_pending_con(m
->get_connection());
615 ceph_assert(found
!= pending_cons
.end());
616 int auth_err
= found
->second
.handle_auth(m
, entity_name
, want_keys
,
617 rotating_secrets
.get());
619 if (auth_err
== -EAGAIN
) {
623 pending_cons
.erase(found
);
624 if (!pending_cons
.empty()) {
625 // keep trying with pending connections
628 // the last try just failed, give up.
630 auto& mc
= found
->second
;
631 ceph_assert(mc
.have_session());
632 active_con
.reset(new MonConnection(std::move(mc
)));
633 pending_cons
.clear();
636 _finish_hunting(auth_err
);
637 _finish_auth(auth_err
);
640 void MonClient::_finish_auth(int auth_err
)
642 ldout(cct
,10) << __func__
<< " " << auth_err
<< dendl
;
643 authenticate_err
= auth_err
;
644 // _resend_mon_commands() could _reopen_session() if the connected mon is not
645 // the one the MonCommand is targeting.
646 if (!auth_err
&& active_con
) {
648 _check_auth_tickets();
650 auth_cond
.notify_all();
653 Context
*cb
= nullptr;
654 if (session_established_context
) {
655 cb
= session_established_context
.release();
667 void MonClient::send_mon_message(MessageRef m
)
669 std::lock_guard l
{monc_lock
};
670 _send_mon_message(std::move(m
));
673 void MonClient::_send_mon_message(MessageRef m
)
675 ceph_assert(ceph_mutex_is_locked(monc_lock
));
677 auto cur_con
= active_con
->get_con();
678 ldout(cct
, 10) << "_send_mon_message to mon."
679 << monmap
.get_name(cur_con
->get_peer_addr())
680 << " at " << cur_con
->get_peer_addr() << dendl
;
681 cur_con
->send_message2(std::move(m
));
683 waiting_for_session
.push_back(std::move(m
));
687 void MonClient::_reopen_session(int rank
)
689 ceph_assert(ceph_mutex_is_locked(monc_lock
));
690 ldout(cct
, 10) << __func__
<< " rank " << rank
<< dendl
;
693 pending_cons
.clear();
703 // throw out old queued messages
704 waiting_for_session
.clear();
706 // throw out version check requests
707 while (!version_requests
.empty()) {
708 finisher
.queue(version_requests
.begin()->second
->context
, -EAGAIN
);
709 delete version_requests
.begin()->second
;
710 version_requests
.erase(version_requests
.begin());
713 for (auto& c
: pending_cons
) {
714 c
.second
.start(monmap
.get_epoch(), entity_name
);
722 MonConnection
& MonClient::_add_conn(unsigned rank
)
724 auto peer
= monmap
.get_addrs(rank
);
725 auto conn
= messenger
->connect_to_mon(peer
);
726 MonConnection
mc(cct
, conn
, global_id
, &auth_registry
);
728 mc
.get_auth().reset(auth
->clone());
730 auto inserted
= pending_cons
.insert(std::make_pair(peer
, std::move(mc
)));
731 ldout(cct
, 10) << "picked mon." << monmap
.get_name(rank
)
735 return inserted
.first
->second
;
738 void MonClient::_add_conns()
740 // collect the next batch of candidates who are listed right next to the ones
742 auto get_next_batch
= [this]() -> std::vector
<unsigned> {
743 std::multimap
<uint16_t, unsigned> ranks_by_priority
;
745 monmap
.mon_info
| boost::adaptors::filtered(
747 auto rank
= monmap
.get_rank(info
.first
);
748 return tried
.count(rank
) == 0;
749 }) | boost::adaptors::transformed(
751 auto rank
= monmap
.get_rank(info
.first
);
752 return std::make_pair(info
.second
.priority
, rank
);
753 }), std::inserter(ranks_by_priority
, end(ranks_by_priority
)));
754 if (ranks_by_priority
.empty()) {
757 // only choose the monitors with lowest priority
758 auto cands
= boost::make_iterator_range(
759 ranks_by_priority
.equal_range(ranks_by_priority
.begin()->first
));
760 std::vector
<unsigned> ranks
;
761 boost::range::copy(cands
| boost::adaptors::map_values
,
762 std::back_inserter(ranks
));
765 auto ranks
= get_next_batch();
767 tried
.clear(); // start over
768 ranks
= get_next_batch();
770 ceph_assert(!ranks
.empty());
771 if (ranks
.size() > 1) {
772 std::vector
<uint16_t> weights
;
773 for (auto i
: ranks
) {
774 auto rank_name
= monmap
.get_name(i
);
775 weights
.push_back(monmap
.get_weight(rank_name
));
777 std::random_device rd
;
778 if (std::accumulate(begin(weights
), end(weights
), 0u) == 0) {
779 std::shuffle(begin(ranks
), end(ranks
), std::mt19937
{rd()});
781 weighted_shuffle(begin(ranks
), end(ranks
), begin(weights
), end(weights
),
785 ldout(cct
, 10) << __func__
<< " ranks=" << ranks
<< dendl
;
786 unsigned n
= cct
->_conf
->mon_client_hunt_parallel
;
787 if (n
== 0 || n
> ranks
.size()) {
790 for (unsigned i
= 0; i
< n
; i
++) {
792 tried
.insert(ranks
[i
]);
796 bool MonClient::ms_handle_reset(Connection
*con
)
798 std::lock_guard
lock(monc_lock
);
800 if (con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
)
803 if (con
->is_anon()) {
804 auto p
= mon_commands
.begin();
805 while (p
!= mon_commands
.end()) {
806 auto cmd
= p
->second
;
808 if (cmd
->target_con
== con
) {
809 _send_command(cmd
); // may retry or fail
817 if (pending_cons
.count(con
->get_peer_addrs())) {
818 ldout(cct
, 10) << __func__
<< " hunted mon " << con
->get_peer_addrs()
821 ldout(cct
, 10) << __func__
<< " stray mon " << con
->get_peer_addrs()
826 if (active_con
&& con
== active_con
->get_con()) {
827 ldout(cct
, 10) << __func__
<< " current mon " << con
->get_peer_addrs()
832 ldout(cct
, 10) << "ms_handle_reset stray mon " << con
->get_peer_addrs()
839 bool MonClient::_opened() const
841 ceph_assert(ceph_mutex_is_locked(monc_lock
));
842 return active_con
|| _hunting();
845 bool MonClient::_hunting() const
847 return !pending_cons
.empty();
850 void MonClient::_start_hunting()
852 ceph_assert(!_hunting());
853 // adjust timeouts if necessary
854 if (!had_a_connection
)
856 reopen_interval_multiplier
*= cct
->_conf
->mon_client_hunt_interval_backoff
;
857 if (reopen_interval_multiplier
>
858 cct
->_conf
->mon_client_hunt_interval_max_multiple
) {
859 reopen_interval_multiplier
=
860 cct
->_conf
->mon_client_hunt_interval_max_multiple
;
864 void MonClient::_finish_hunting(int auth_err
)
866 ldout(cct
,10) << __func__
<< " " << auth_err
<< dendl
;
867 ceph_assert(ceph_mutex_is_locked(monc_lock
));
868 // the pending conns have been cleaned.
869 ceph_assert(!_hunting());
871 auto con
= active_con
->get_con();
872 ldout(cct
, 1) << "found mon."
873 << monmap
.get_name(con
->get_peer_addr())
876 ldout(cct
, 1) << "no mon sessions established" << dendl
;
879 had_a_connection
= true;
883 last_rotating_renew_sent
= utime_t();
884 while (!waiting_for_session
.empty()) {
885 _send_mon_message(std::move(waiting_for_session
.front()));
886 waiting_for_session
.pop_front();
888 _resend_mon_commands();
891 auth
= std::move(active_con
->get_auth());
892 if (global_id
&& global_id
!= active_con
->get_global_id()) {
893 lderr(cct
) << __func__
<< " global_id changed from " << global_id
894 << " to " << active_con
->get_global_id() << dendl
;
896 global_id
= active_con
->get_global_id();
901 void MonClient::tick()
903 ldout(cct
, 10) << __func__
<< dendl
;
905 utime_t now
= ceph_clock_now();
907 auto reschedule_tick
= make_scope_guard([this] {
911 _check_auth_tickets();
912 _check_tell_commands();
915 ldout(cct
, 1) << "continuing hunt" << dendl
;
916 return _reopen_session();
917 } else if (active_con
) {
918 // just renew as needed
919 auto cur_con
= active_con
->get_con();
920 if (!cur_con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
)) {
921 const bool maybe_renew
= sub
.need_renew();
922 ldout(cct
, 10) << "renew subs? -- " << (maybe_renew
? "yes" : "no")
929 if (now
> last_keepalive
+ cct
->_conf
->mon_client_ping_interval
) {
930 cur_con
->send_keepalive();
931 last_keepalive
= now
;
933 if (cct
->_conf
->mon_client_ping_timeout
> 0 &&
934 cur_con
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
935 utime_t lk
= cur_con
->get_last_keepalive_ack();
936 utime_t interval
= now
- lk
;
937 if (interval
> cct
->_conf
->mon_client_ping_timeout
) {
938 ldout(cct
, 1) << "no keepalive since " << lk
<< " (" << interval
939 << " seconds), reconnecting" << dendl
;
940 return _reopen_session();
947 if (now
> last_send_log
+ cct
->_conf
->mon_client_log_interval
) {
954 void MonClient::_un_backoff()
956 // un-backoff our reconnect interval
957 reopen_interval_multiplier
= std::max(
958 cct
->_conf
.get_val
<double>("mon_client_hunt_interval_min_multiple"),
959 reopen_interval_multiplier
/
960 cct
->_conf
.get_val
<double>("mon_client_hunt_interval_backoff"));
961 ldout(cct
, 20) << __func__
<< " reopen_interval_multipler now "
962 << reopen_interval_multiplier
<< dendl
;
965 void MonClient::schedule_tick()
967 auto do_tick
= make_lambda_context([this](int) { tick(); });
968 if (!is_connected()) {
969 // start another round of hunting
970 const auto hunt_interval
= (cct
->_conf
->mon_client_hunt_interval
*
971 reopen_interval_multiplier
);
972 timer
.add_event_after(hunt_interval
, do_tick
);
975 timer
.add_event_after(std::min(cct
->_conf
->mon_client_ping_interval
,
976 cct
->_conf
->mon_client_log_interval
),
983 void MonClient::_renew_subs()
985 ceph_assert(ceph_mutex_is_locked(monc_lock
));
986 if (!sub
.have_new()) {
987 ldout(cct
, 10) << __func__
<< " - empty" << dendl
;
991 ldout(cct
, 10) << __func__
<< dendl
;
995 auto m
= ceph::make_message
<MMonSubscribe
>();
996 m
->what
= sub
.get_subs();
997 m
->hostname
= ceph_get_short_hostname();
998 _send_mon_message(std::move(m
));
1003 void MonClient::handle_subscribe_ack(MMonSubscribeAck
*m
)
1005 sub
.acked(m
->interval
);
1009 int MonClient::_check_auth_tickets()
1011 ceph_assert(ceph_mutex_is_locked(monc_lock
));
1012 if (active_con
&& auth
) {
1013 if (auth
->need_tickets()) {
1014 ldout(cct
, 10) << __func__
<< " getting new tickets!" << dendl
;
1015 auto m
= ceph::make_message
<MAuth
>();
1016 m
->protocol
= auth
->get_protocol();
1017 auth
->prepare_build_request();
1018 auth
->build_request(m
->auth_payload
);
1019 _send_mon_message(m
);
1022 _check_auth_rotating();
1027 int MonClient::_check_auth_rotating()
1029 ceph_assert(ceph_mutex_is_locked(monc_lock
));
1030 if (!rotating_secrets
||
1031 !auth_principal_needs_rotating_keys(entity_name
)) {
1032 ldout(cct
, 20) << "_check_auth_rotating not needed by " << entity_name
<< dendl
;
1036 if (!active_con
|| !auth
) {
1037 ldout(cct
, 10) << "_check_auth_rotating waiting for auth session" << dendl
;
1041 utime_t now
= ceph_clock_now();
1042 utime_t cutoff
= now
;
1043 cutoff
-= std::min(30.0, cct
->_conf
->auth_service_ticket_ttl
/ 4.0);
1044 utime_t issued_at_lower_bound
= now
;
1045 issued_at_lower_bound
-= cct
->_conf
->auth_service_ticket_ttl
;
1046 if (!rotating_secrets
->need_new_secrets(cutoff
)) {
1047 ldout(cct
, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff
<< ")" << dendl
;
1048 rotating_secrets
->dump_rotating();
1052 ldout(cct
, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff
<< ")" << dendl
;
1053 if (!rotating_secrets
->need_new_secrets() &&
1054 rotating_secrets
->need_new_secrets(issued_at_lower_bound
)) {
1055 // the key has expired before it has been issued?
1056 lderr(cct
) << __func__
<< " possible clock skew, rotating keys expired way too early"
1057 << " (before " << issued_at_lower_bound
<< ")" << dendl
;
1059 if ((now
> last_rotating_renew_sent
) &&
1060 double(now
- last_rotating_renew_sent
) < 1) {
1061 ldout(cct
, 10) << __func__
<< " called too often (last: "
1062 << last_rotating_renew_sent
<< "), skipping refresh" << dendl
;
1065 auto m
= ceph::make_message
<MAuth
>();
1066 m
->protocol
= auth
->get_protocol();
1067 if (auth
->build_rotating_request(m
->auth_payload
)) {
1068 last_rotating_renew_sent
= now
;
1069 _send_mon_message(std::move(m
));
1074 int MonClient::wait_auth_rotating(double timeout
)
1076 std::unique_lock
l(monc_lock
);
1078 // Must be initialized
1079 ceph_assert(auth
!= nullptr);
1081 if (auth
->get_protocol() == CEPH_AUTH_NONE
)
1084 if (!rotating_secrets
)
1087 ldout(cct
, 10) << __func__
<< " waiting for " << timeout
<< dendl
;
1088 utime_t now
= ceph_clock_now();
1089 if (auth_cond
.wait_for(l
, ceph::make_timespan(timeout
), [now
, this] {
1090 return (!auth_principal_needs_rotating_keys(entity_name
) ||
1091 !rotating_secrets
->need_new_secrets(now
));
1093 ldout(cct
, 10) << __func__
<< " done" << dendl
;
1096 ldout(cct
, 0) << __func__
<< " timed out after " << timeout
<< dendl
;
1103 void MonClient::_send_command(MonCommand
*r
)
1107 if (r
->send_attempts
> cct
->_conf
->mon_client_directed_command_retry
) {
1108 _finish_command(r
, -ENXIO
, "mon unavailable");
1112 // tell-style command
1113 if (monmap
.min_mon_release
>= ceph_release_t::octopus
) {
1114 if (r
->target_con
) {
1115 r
->target_con
->mark_down();
1117 if (r
->target_rank
>= 0) {
1118 if (r
->target_rank
>= (int)monmap
.size()) {
1119 ldout(cct
, 10) << " target " << r
->target_rank
1120 << " >= max mon " << monmap
.size() << dendl
;
1121 _finish_command(r
, -ENOENT
, "mon rank dne");
1124 r
->target_con
= messenger
->connect_to_mon(
1125 monmap
.get_addrs(r
->target_rank
), true /* anon */);
1127 if (!monmap
.contains(r
->target_name
)) {
1128 ldout(cct
, 10) << " target " << r
->target_name
1129 << " not present in monmap" << dendl
;
1130 _finish_command(r
, -ENOENT
, "mon dne");
1133 r
->target_con
= messenger
->connect_to_mon(
1134 monmap
.get_addrs(r
->target_name
), true /* anon */);
1137 r
->target_session
.reset(new MonConnection(cct
, r
->target_con
, 0,
1139 r
->target_session
->start(monmap
.get_epoch(), entity_name
);
1140 r
->last_send_attempt
= ceph_clock_now();
1142 MCommand
*m
= new MCommand(monmap
.fsid
);
1145 m
->set_data(r
->inbl
);
1146 r
->target_session
->queue_command(m
);
1150 // ugly legacy handling of pre-octopus mons
1153 peer
= active_con
->get_con()->get_peer_addr();
1156 if (r
->target_rank
>= 0 &&
1157 r
->target_rank
!= monmap
.get_rank(peer
)) {
1158 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
1159 << " wants rank " << r
->target_rank
1160 << ", reopening session"
1162 if (r
->target_rank
>= (int)monmap
.size()) {
1163 ldout(cct
, 10) << " target " << r
->target_rank
1164 << " >= max mon " << monmap
.size() << dendl
;
1165 _finish_command(r
, -ENOENT
, "mon rank dne");
1168 _reopen_session(r
->target_rank
);
1171 if (r
->target_name
.length() &&
1172 r
->target_name
!= monmap
.get_name(peer
)) {
1173 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
1174 << " wants mon " << r
->target_name
1175 << ", reopening session"
1177 if (!monmap
.contains(r
->target_name
)) {
1178 ldout(cct
, 10) << " target " << r
->target_name
1179 << " not present in monmap" << dendl
;
1180 _finish_command(r
, -ENOENT
, "mon dne");
1183 _reopen_session(monmap
.get_rank(r
->target_name
));
1186 // fall-thru to send 'normal' CLI command
1189 // normal CLI command
1190 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
<< dendl
;
1191 auto m
= ceph::make_message
<MMonCommand
>(monmap
.fsid
);
1194 m
->set_data(r
->inbl
);
1195 _send_mon_message(std::move(m
));
1199 void MonClient::_check_tell_commands()
1201 // resend any requests
1202 auto now
= ceph_clock_now();
1203 auto p
= mon_commands
.begin();
1204 while (p
!= mon_commands
.end()) {
1205 auto cmd
= p
->second
;
1207 if (cmd
->is_tell() &&
1208 cmd
->last_send_attempt
!= utime_t() &&
1209 now
- cmd
->last_send_attempt
> cct
->_conf
->mon_client_hunt_interval
) {
1210 ldout(cct
,5) << __func__
<< " timeout tell command " << cmd
->tid
<< dendl
;
1211 _send_command(cmd
); // might remove cmd from mon_commands
1216 void MonClient::_resend_mon_commands()
1218 // resend any requests
1219 auto p
= mon_commands
.begin();
1220 while (p
!= mon_commands
.end()) {
1221 auto cmd
= p
->second
;
1223 if (cmd
->is_tell() && monmap
.min_mon_release
>= ceph_release_t::octopus
) {
1224 // starting with octopus, tell commands use their own connetion and need no
1225 // special resend when we finish hunting.
1227 _send_command(cmd
); // might remove cmd from mon_commands
1232 void MonClient::handle_mon_command_ack(MMonCommandAck
*ack
)
1234 MonCommand
*r
= NULL
;
1235 uint64_t tid
= ack
->get_tid();
1237 if (tid
== 0 && !mon_commands
.empty()) {
1238 r
= mon_commands
.begin()->second
;
1239 ldout(cct
, 10) << __func__
<< " has tid 0, assuming it is " << r
->tid
<< dendl
;
1241 auto p
= mon_commands
.find(tid
);
1242 if (p
== mon_commands
.end()) {
1243 ldout(cct
, 10) << __func__
<< " " << ack
->get_tid() << " not found" << dendl
;
1250 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
<< dendl
;
1252 r
->poutbl
->claim(ack
->get_data());
1253 _finish_command(r
, ack
->r
, ack
->rs
);
1257 void MonClient::handle_command_reply(MCommandReply
*reply
)
1259 MonCommand
*r
= NULL
;
1260 uint64_t tid
= reply
->get_tid();
1262 if (tid
== 0 && !mon_commands
.empty()) {
1263 r
= mon_commands
.begin()->second
;
1264 ldout(cct
, 10) << __func__
<< " has tid 0, assuming it is " << r
->tid
1267 auto p
= mon_commands
.find(tid
);
1268 if (p
== mon_commands
.end()) {
1269 ldout(cct
, 10) << __func__
<< " " << reply
->get_tid() << " not found"
1277 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
<< dendl
;
1279 r
->poutbl
->claim(reply
->get_data());
1280 _finish_command(r
, reply
->r
, reply
->rs
);
1284 int MonClient::_cancel_mon_command(uint64_t tid
)
1286 ceph_assert(ceph_mutex_is_locked(monc_lock
));
1288 auto it
= mon_commands
.find(tid
);
1289 if (it
== mon_commands
.end()) {
1290 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
1294 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
1296 MonCommand
*cmd
= it
->second
;
1297 _finish_command(cmd
, -ETIMEDOUT
, "");
1301 void MonClient::_finish_command(MonCommand
*r
, int ret
, string rs
)
1303 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " = " << ret
<< " " << rs
<< dendl
;
1309 finisher
.queue(r
->onfinish
, ret
);
1310 if (r
->target_con
) {
1311 r
->target_con
->mark_down();
1313 mon_commands
.erase(r
->tid
);
1317 void MonClient::start_mon_command(const std::vector
<string
>& cmd
,
1318 const ceph::buffer::list
& inbl
,
1319 ceph::buffer::list
*outbl
, string
*outs
,
1322 ldout(cct
,10) << __func__
<< " cmd=" << cmd
<< dendl
;
1323 std::lock_guard
l(monc_lock
);
1324 if (!initialized
|| stopping
) {
1326 onfinish
->complete(-ECANCELED
);
1330 MonCommand
*r
= new MonCommand(++last_mon_command_tid
);
1335 r
->onfinish
= onfinish
;
1336 auto timeout
= cct
->_conf
.get_val
<std::chrono::seconds
>("rados_mon_op_timeout");
1337 if (timeout
.count() > 0) {
1338 class C_CancelMonCommand
: public Context
1343 C_CancelMonCommand(uint64_t tid
, MonClient
*monc
) : tid(tid
), monc(monc
) {}
1344 void finish(int r
) override
{
1345 monc
->_cancel_mon_command(tid
);
1348 r
->ontimeout
= new C_CancelMonCommand(r
->tid
, this);
1349 timer
.add_event_after(static_cast<double>(timeout
.count()), r
->ontimeout
);
1351 mon_commands
[r
->tid
] = r
;
1355 void MonClient::start_mon_command(const string
&mon_name
,
1356 const std::vector
<string
>& cmd
,
1357 const ceph::buffer::list
& inbl
,
1358 ceph::buffer::list
*outbl
, string
*outs
,
1361 ldout(cct
,10) << __func__
<< " mon." << mon_name
<< " cmd=" << cmd
<< dendl
;
1362 std::lock_guard
l(monc_lock
);
1363 if (!initialized
|| stopping
) {
1365 onfinish
->complete(-ECANCELED
);
1369 MonCommand
*r
= new MonCommand(++last_mon_command_tid
);
1371 // detect/tolerate mon *rank* passed as a string
1373 int rank
= strict_strtoll(mon_name
.c_str(), 10, &err
);
1374 if (err
.size() == 0 && rank
>= 0) {
1375 ldout(cct
,10) << __func__
<< " interpreting name '" << mon_name
1376 << "' as rank " << rank
<< dendl
;
1377 r
->target_rank
= rank
;
1379 r
->target_name
= mon_name
;
1385 r
->onfinish
= onfinish
;
1386 mon_commands
[r
->tid
] = r
;
1390 void MonClient::start_mon_command(int rank
,
1391 const std::vector
<string
>& cmd
,
1392 const ceph::buffer::list
& inbl
,
1393 ceph::buffer::list
*outbl
, string
*outs
,
1396 ldout(cct
,10) << __func__
<< " rank " << rank
<< " cmd=" << cmd
<< dendl
;
1397 std::lock_guard
l(monc_lock
);
1398 if (!initialized
|| stopping
) {
1400 onfinish
->complete(-ECANCELED
);
1404 MonCommand
*r
= new MonCommand(++last_mon_command_tid
);
1405 r
->target_rank
= rank
;
1410 r
->onfinish
= onfinish
;
1411 mon_commands
[r
->tid
] = r
;
1417 void MonClient::get_version(string map
, version_t
*newest
, version_t
*oldest
, Context
*onfinish
)
1419 version_req_d
*req
= new version_req_d(onfinish
, newest
, oldest
);
1420 ldout(cct
, 10) << "get_version " << map
<< " req " << req
<< dendl
;
1421 std::lock_guard
l(monc_lock
);
1422 auto m
= ceph::make_message
<MMonGetVersion
>();
1424 m
->handle
= ++version_req_id
;
1425 version_requests
[m
->handle
] = req
;
1426 _send_mon_message(std::move(m
));
1429 void MonClient::handle_get_version_reply(MMonGetVersionReply
* m
)
1431 ceph_assert(ceph_mutex_is_locked(monc_lock
));
1432 auto iter
= version_requests
.find(m
->handle
);
1433 if (iter
== version_requests
.end()) {
1434 ldout(cct
, 0) << __func__
<< " version request with handle " << m
->handle
1435 << " not found" << dendl
;
1437 version_req_d
*req
= iter
->second
;
1438 ldout(cct
, 10) << __func__
<< " finishing " << req
<< " version " << m
->version
<< dendl
;
1439 version_requests
.erase(iter
);
1441 *req
->newest
= m
->version
;
1443 *req
->oldest
= m
->oldest_version
;
1444 finisher
.queue(req
->context
, 0);
1450 int MonClient::get_auth_request(
1452 AuthConnectionMeta
*auth_meta
,
1453 uint32_t *auth_method
,
1454 std::vector
<uint32_t> *preferred_modes
,
1455 ceph::buffer::list
*bl
)
1457 std::lock_guard
l(monc_lock
);
1458 ldout(cct
,10) << __func__
<< " con " << con
<< " auth_method " << *auth_method
1461 // connection to mon?
1462 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
1463 ceph_assert(!auth_meta
->authorizer
);
1464 if (con
->is_anon()) {
1465 for (auto& i
: mon_commands
) {
1466 if (i
.second
->target_con
== con
) {
1467 return i
.second
->target_session
->get_auth_request(
1468 auth_method
, preferred_modes
, bl
,
1469 entity_name
, want_keys
, rotating_secrets
.get());
1473 for (auto& i
: pending_cons
) {
1474 if (i
.second
.is_con(con
)) {
1475 return i
.second
.get_auth_request(
1476 auth_method
, preferred_modes
, bl
,
1477 entity_name
, want_keys
, rotating_secrets
.get());
1483 // generate authorizer
1485 lderr(cct
) << __func__
<< " but no auth handler is set up" << dendl
;
1488 auth_meta
->authorizer
.reset(auth
->build_authorizer(con
->get_peer_type()));
1489 if (!auth_meta
->authorizer
) {
1490 lderr(cct
) << __func__
<< " failed to build_authorizer for type "
1491 << ceph_entity_type_name(con
->get_peer_type()) << dendl
;
1494 auth_meta
->auth_method
= auth_meta
->authorizer
->protocol
;
1495 auth_registry
.get_supported_modes(con
->get_peer_type(),
1496 auth_meta
->auth_method
,
1498 *bl
= auth_meta
->authorizer
->bl
;
1502 int MonClient::handle_auth_reply_more(
1504 AuthConnectionMeta
*auth_meta
,
1505 const ceph::buffer::list
& bl
,
1506 ceph::buffer::list
*reply
)
1508 std::lock_guard
l(monc_lock
);
1510 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
1511 if (con
->is_anon()) {
1512 for (auto& i
: mon_commands
) {
1513 if (i
.second
->target_con
== con
) {
1514 return i
.second
->target_session
->handle_auth_reply_more(
1515 auth_meta
, bl
, reply
);
1519 for (auto& i
: pending_cons
) {
1520 if (i
.second
.is_con(con
)) {
1521 return i
.second
.handle_auth_reply_more(auth_meta
, bl
, reply
);
1527 // authorizer challenges
1528 if (!auth
|| !auth_meta
->authorizer
) {
1529 lderr(cct
) << __func__
<< " no authorizer?" << dendl
;
1532 auth_meta
->authorizer
->add_challenge(cct
, bl
);
1533 *reply
= auth_meta
->authorizer
->bl
;
1537 int MonClient::handle_auth_done(
1539 AuthConnectionMeta
*auth_meta
,
1542 const ceph::buffer::list
& bl
,
1543 CryptoKey
*session_key
,
1544 std::string
*connection_secret
)
1546 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
1547 std::lock_guard
l(monc_lock
);
1548 if (con
->is_anon()) {
1549 for (auto& i
: mon_commands
) {
1550 if (i
.second
->target_con
== con
) {
1551 return i
.second
->target_session
->handle_auth_done(
1552 auth_meta
, global_id
, bl
,
1553 session_key
, connection_secret
);
1557 for (auto& i
: pending_cons
) {
1558 if (i
.second
.is_con(con
)) {
1559 int r
= i
.second
.handle_auth_done(
1560 auth_meta
, global_id
, bl
,
1561 session_key
, connection_secret
);
1563 pending_cons
.erase(i
.first
);
1564 if (!pending_cons
.empty()) {
1568 active_con
.reset(new MonConnection(std::move(i
.second
)));
1569 pending_cons
.clear();
1570 ceph_assert(active_con
->have_session());
1574 if (r
|| monmap
.get_epoch() > 0) {
1582 // verify authorizer reply
1583 auto p
= bl
.begin();
1584 if (!auth_meta
->authorizer
->verify_reply(p
, &auth_meta
->connection_secret
)) {
1585 ldout(cct
, 0) << __func__
<< " failed verifying authorizer reply"
1589 auth_meta
->session_key
= auth_meta
->authorizer
->session_key
;
1594 int MonClient::handle_auth_bad_method(
1596 AuthConnectionMeta
*auth_meta
,
1597 uint32_t old_auth_method
,
1599 const std::vector
<uint32_t>& allowed_methods
,
1600 const std::vector
<uint32_t>& allowed_modes
)
1602 auth_meta
->allowed_methods
= allowed_methods
;
1604 std::lock_guard
l(monc_lock
);
1605 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
1606 if (con
->is_anon()) {
1607 for (auto& i
: mon_commands
) {
1608 if (i
.second
->target_con
== con
) {
1609 int r
= i
.second
->target_session
->handle_auth_bad_method(
1615 _finish_command(i
.second
, r
, "auth failed");
1621 for (auto& i
: pending_cons
) {
1622 if (i
.second
.is_con(con
)) {
1623 int r
= i
.second
.handle_auth_bad_method(old_auth_method
,
1628 return r
; // try another method on this con
1630 pending_cons
.erase(i
.first
);
1631 if (!pending_cons
.empty()) {
1632 return r
; // fail this con, maybe another con will succeed
1643 ldout(cct
,10) << __func__
<< " hmm, they didn't like " << old_auth_method
1644 << " result " << cpp_strerror(result
)
1645 << " and auth is " << (auth
? auth
->get_protocol() : 0)
1651 int MonClient::handle_auth_request(
1653 AuthConnectionMeta
*auth_meta
,
1655 uint32_t auth_method
,
1656 const ceph::buffer::list
& payload
,
1657 ceph::buffer::list
*reply
)
1659 if (payload
.length() == 0) {
1660 // for some channels prior to nautilus (osd heartbeat), we
1661 // tolerate the lack of an authorizer.
1662 if (!con
->get_messenger()->require_authorizer
) {
1663 handle_authentication_dispatcher
->ms_handle_authentication(con
);
1668 auth_meta
->auth_mode
= payload
[0];
1669 if (auth_meta
->auth_mode
< AUTH_MODE_AUTHORIZER
||
1670 auth_meta
->auth_mode
> AUTH_MODE_AUTHORIZER_MAX
) {
1673 AuthAuthorizeHandler
*ah
= get_auth_authorize_handler(con
->get_peer_type(),
1676 lderr(cct
) << __func__
<< " no AuthAuthorizeHandler found for auth method "
1677 << auth_method
<< dendl
;
1681 auto ac
= &auth_meta
->authorizer_challenge
;
1682 if (auth_meta
->skip_authorizer_challenge
) {
1683 ldout(cct
, 10) << __func__
<< " skipping challenge on " << con
<< dendl
;
1687 bool was_challenge
= (bool)auth_meta
->authorizer_challenge
;
1688 bool isvalid
= ah
->verify_authorizer(
1692 auth_meta
->get_connection_secret_length(),
1695 &con
->peer_global_id
,
1696 &con
->peer_caps_info
,
1697 &auth_meta
->session_key
,
1698 &auth_meta
->connection_secret
,
1701 handle_authentication_dispatcher
->ms_handle_authentication(con
);
1704 if (!more
&& !was_challenge
&& auth_meta
->authorizer_challenge
) {
1705 ldout(cct
,10) << __func__
<< " added challenge on " << con
<< dendl
;
1708 ldout(cct
,10) << __func__
<< " bad authorizer on " << con
<< dendl
;
1709 // discard old challenge
1710 auth_meta
->authorizer_challenge
.reset();
1714 AuthAuthorizer
* MonClient::build_authorizer(int service_id
) const {
1715 std::lock_guard
l(monc_lock
);
1717 return auth
->build_authorizer(service_id
);
1719 ldout(cct
, 0) << __func__
<< " for " << ceph_entity_type_name(service_id
)
1720 << ", but no auth is available now" << dendl
;
1725 #define dout_subsys ceph_subsys_monc
1727 #define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1729 MonConnection::MonConnection(
1730 CephContext
*cct
, ConnectionRef con
, uint64_t global_id
,
1732 : cct(cct
), con(con
), global_id(global_id
), auth_registry(ar
)
1735 MonConnection::~MonConnection()
1743 bool MonConnection::have_session() const
1745 return state
== State::HAVE_SESSION
;
1748 void MonConnection::start(epoch_t epoch
,
1749 const EntityName
& entity_name
)
1752 auth_start
= ceph_clock_now();
1754 if (con
->get_peer_addr().is_msgr2()) {
1755 ldout(cct
, 10) << __func__
<< " opening mon connection" << dendl
;
1756 state
= State::AUTHENTICATING
;
1757 con
->send_message(new MMonGetMap());
1761 // restart authentication handshake
1762 state
= State::NEGOTIATING
;
1764 // send an initial keepalive to ensure our timestamp is valid by the
1765 // time we are in an OPENED state (by sequencing this before
1767 con
->send_keepalive();
1770 m
->protocol
= CEPH_AUTH_UNKNOWN
;
1771 m
->monmap_epoch
= epoch
;
1773 encode(struct_v
, m
->auth_payload
);
1774 std::vector
<uint32_t> auth_supported
;
1775 auth_registry
->get_supported_methods(con
->get_peer_type(), &auth_supported
);
1776 encode(auth_supported
, m
->auth_payload
);
1777 encode(entity_name
, m
->auth_payload
);
1778 encode(global_id
, m
->auth_payload
);
1779 con
->send_message(m
);
1782 int MonConnection::get_auth_request(
1784 std::vector
<uint32_t> *preferred_modes
,
1785 ceph::buffer::list
*bl
,
1786 const EntityName
& entity_name
,
1788 RotatingKeyRing
* keyring
)
1792 if (auth_method
< 0) {
1793 std::vector
<uint32_t> as
;
1794 auth_registry
->get_supported_methods(con
->get_peer_type(), &as
);
1798 auth_method
= as
.front();
1800 *method
= auth_method
;
1801 auth_registry
->get_supported_modes(con
->get_peer_type(), auth_method
,
1803 ldout(cct
,10) << __func__
<< " method " << *method
1804 << " preferred_modes " << *preferred_modes
<< dendl
;
1805 if (preferred_modes
->empty()) {
1809 int r
= _init_auth(*method
, entity_name
, want_keys
, keyring
, true);
1810 ceph_assert(r
== 0);
1812 // initial requset includes some boilerplate...
1813 encode((char)AUTH_MODE_MON
, *bl
);
1814 encode(entity_name
, *bl
);
1815 encode(global_id
, *bl
);
1817 // and (maybe) some method-specific initial payload
1818 auth
->build_initial_request(bl
);
1823 int MonConnection::handle_auth_reply_more(
1824 AuthConnectionMeta
*auth_meta
,
1825 const ceph::buffer::list
& bl
,
1826 ceph::buffer::list
*reply
)
1828 ldout(cct
, 10) << __func__
<< " payload " << bl
.length() << dendl
;
1829 ldout(cct
, 30) << __func__
<< " got\n";
1833 auto p
= bl
.cbegin();
1834 ldout(cct
, 10) << __func__
<< " payload_len " << bl
.length() << dendl
;
1835 int r
= auth
->handle_response(0, p
, &auth_meta
->session_key
,
1836 &auth_meta
->connection_secret
);
1838 auth
->prepare_build_request();
1839 auth
->build_request(*reply
);
1840 ldout(cct
, 10) << __func__
<< " responding with " << reply
->length()
1841 << " bytes" << dendl
;
1844 lderr(cct
) << __func__
<< " handle_response returned " << r
<< dendl
;
1846 ldout(cct
, 10) << __func__
<< " authenticated!" << dendl
;
1848 ceph_abort(cct
, "write me");
1853 int MonConnection::handle_auth_done(
1854 AuthConnectionMeta
*auth_meta
,
1855 uint64_t new_global_id
,
1856 const ceph::buffer::list
& bl
,
1857 CryptoKey
*session_key
,
1858 std::string
*connection_secret
)
1860 ldout(cct
,10) << __func__
<< " global_id " << new_global_id
1861 << " payload " << bl
.length()
1863 global_id
= new_global_id
;
1864 auth
->set_global_id(global_id
);
1865 auto p
= bl
.begin();
1866 int auth_err
= auth
->handle_response(0, p
, &auth_meta
->session_key
,
1867 &auth_meta
->connection_secret
);
1868 if (auth_err
>= 0) {
1869 state
= State::HAVE_SESSION
;
1871 con
->set_last_keepalive_ack(auth_start
);
1873 if (pending_tell_command
) {
1874 con
->send_message2(std::move(pending_tell_command
));
1879 int MonConnection::handle_auth_bad_method(
1880 uint32_t old_auth_method
,
1882 const std::vector
<uint32_t>& allowed_methods
,
1883 const std::vector
<uint32_t>& allowed_modes
)
1885 ldout(cct
,10) << __func__
<< " old_auth_method " << old_auth_method
1886 << " result " << cpp_strerror(result
)
1887 << " allowed_methods " << allowed_methods
<< dendl
;
1888 std::vector
<uint32_t> auth_supported
;
1889 auth_registry
->get_supported_methods(con
->get_peer_type(), &auth_supported
);
1890 auto p
= std::find(auth_supported
.begin(), auth_supported
.end(),
1892 assert(p
!= auth_supported
.end());
1893 p
= std::find_first_of(std::next(p
), auth_supported
.end(),
1894 allowed_methods
.begin(), allowed_methods
.end());
1895 if (p
== auth_supported
.end()) {
1896 lderr(cct
) << __func__
<< " server allowed_methods " << allowed_methods
1897 << " but i only support " << auth_supported
<< dendl
;
1901 ldout(cct
,10) << __func__
<< " will try " << auth_method
<< " next" << dendl
;
1905 int MonConnection::handle_auth(MAuthReply
* m
,
1906 const EntityName
& entity_name
,
1908 RotatingKeyRing
* keyring
)
1910 if (state
== State::NEGOTIATING
) {
1911 int r
= _negotiate(m
, entity_name
, want_keys
, keyring
);
1915 state
= State::AUTHENTICATING
;
1917 int r
= authenticate(m
);
1919 state
= State::HAVE_SESSION
;
1924 int MonConnection::_negotiate(MAuthReply
*m
,
1925 const EntityName
& entity_name
,
1927 RotatingKeyRing
* keyring
)
1929 int r
= _init_auth(m
->protocol
, entity_name
, want_keys
, keyring
, false);
1930 if (r
== -ENOTSUP
) {
1931 if (m
->result
== -ENOTSUP
) {
1932 ldout(cct
, 10) << "none of our auth protocols are supported by the server"
1940 int MonConnection::_init_auth(
1942 const EntityName
& entity_name
,
1944 RotatingKeyRing
* keyring
,
1947 ldout(cct
, 10) << __func__
<< " method " << method
<< dendl
;
1948 if (auth
&& auth
->get_protocol() == (int)method
) {
1949 ldout(cct
, 10) << __func__
<< " already have auth, reseting" << dendl
;
1954 ldout(cct
, 10) << __func__
<< " creating new auth" << dendl
;
1955 auth
.reset(AuthClientHandler::create(cct
, method
, keyring
));
1957 ldout(cct
, 10) << " no handler for protocol " << method
<< dendl
;
1961 // do not request MGR key unless the mon has the SERVER_KRAKEN
1962 // feature. otherwise it will give us an auth error. note that
1963 // we have to use the FEATUREMASK because pre-jewel the kraken
1964 // feature bit was used for something else.
1966 (want_keys
& CEPH_ENTITY_TYPE_MGR
) &&
1967 !(con
->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN
))) {
1968 ldout(cct
, 1) << __func__
1969 << " not requesting MGR keys from pre-kraken monitor"
1971 want_keys
&= ~CEPH_ENTITY_TYPE_MGR
;
1973 auth
->set_want_keys(want_keys
);
1974 auth
->init(entity_name
);
1975 auth
->set_global_id(global_id
);
1979 int MonConnection::authenticate(MAuthReply
*m
)
1982 if (!m
->global_id
) {
1983 ldout(cct
, 1) << "peer sent an invalid global_id" << dendl
;
1985 if (m
->global_id
!= global_id
) {
1986 // it's a new session
1988 global_id
= m
->global_id
;
1989 auth
->set_global_id(global_id
);
1990 ldout(cct
, 10) << "my global_id is " << m
->global_id
<< dendl
;
1992 auto p
= m
->result_bl
.cbegin();
1993 int ret
= auth
->handle_response(m
->result
, p
, nullptr, nullptr);
1994 if (ret
== -EAGAIN
) {
1995 auto ma
= new MAuth
;
1996 ma
->protocol
= auth
->get_protocol();
1997 auth
->prepare_build_request();
1998 auth
->build_request(ma
->auth_payload
);
1999 con
->send_message(ma
);
2001 if (ret
== 0 && pending_tell_command
) {
2002 con
->send_message2(std::move(pending_tell_command
));
2008 void MonClient::register_config_callback(md_config_t::config_callback fn
) {
2009 ceph_assert(!config_cb
);
2013 md_config_t::config_callback
MonClient::get_config_callback() {