1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include <boost/range/adaptor/map.hpp>
19 #include <boost/range/adaptor/filtered.hpp>
20 #include <boost/range/algorithm/copy.hpp>
21 #include <boost/range/algorithm_ext/copy_n.hpp>
22 #include "common/weighted_shuffle.h"
24 #include "include/random.h"
25 #include "include/scope_guard.h"
26 #include "include/stringify.h"
28 #include "messages/MMonGetMap.h"
29 #include "messages/MMonGetVersion.h"
30 #include "messages/MMonGetMap.h"
31 #include "messages/MMonGetVersionReply.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MConfig.h"
34 #include "messages/MAuth.h"
35 #include "messages/MLogAck.h"
36 #include "messages/MAuthReply.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MCommand.h"
40 #include "messages/MCommandReply.h"
41 #include "messages/MPing.h"
43 #include "messages/MMonSubscribe.h"
44 #include "messages/MMonSubscribeAck.h"
45 #include "common/errno.h"
46 #include "common/hostname.h"
47 #include "common/LogClient.h"
49 #include "MonClient.h"
50 #include "error_code.h"
53 #include "auth/Auth.h"
54 #include "auth/KeyRing.h"
55 #include "auth/AuthClientHandler.h"
56 #include "auth/AuthRegistry.h"
57 #include "auth/RotatingKeyRing.h"
59 #define dout_subsys ceph_subsys_monc
61 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
63 namespace bs
= boost::system
;
65 using namespace std::literals
;
67 MonClient::MonClient(CephContext
*cct_
, boost::asio::io_context
& service
) :
71 timer(cct_
, monc_lock
),
75 more_log_pending(false),
77 had_a_connection(false),
78 reopen_interval_multiplier(
79 cct_
->_conf
.get_val
<double>("mon_client_hunt_interval_min_multiple")),
80 last_mon_command_tid(0),
84 MonClient::~MonClient()
88 int MonClient::build_initial_monmap()
90 ldout(cct
, 10) << __func__
<< dendl
;
91 int r
= monmap
.build_initial(cct
, false, std::cerr
);
92 ldout(cct
,10) << "monmap:\n";
98 int MonClient::get_monmap()
100 ldout(cct
, 10) << __func__
<< dendl
;
101 std::unique_lock
l(monc_lock
);
103 sub
.want("monmap", 0, 0);
106 map_cond
.wait(l
, [this] { return !want_monmap
; });
107 ldout(cct
, 10) << __func__
<< " done" << dendl
;
111 int MonClient::get_monmap_and_config()
113 ldout(cct
, 10) << __func__
<< dendl
;
114 ceph_assert(!messenger
);
119 auto shutdown_crypto
= make_scope_guard([this] {
120 cct
->shutdown_crypto();
123 int r
= build_initial_monmap();
125 lderr(cct
) << __func__
<< " cannot identify monitors to contact" << dendl
;
129 messenger
= Messenger::create_client_messenger(
130 cct
, "temp_mon_client");
131 ceph_assert(messenger
);
132 messenger
->add_dispatcher_head(this);
134 auto shutdown_msgr
= make_scope_guard([this] {
135 messenger
->shutdown();
139 if (!monmap
.fsid
.is_zero()) {
140 cct
->_conf
.set_val("fsid", stringify(monmap
.fsid
));
144 want_bootstrap_config
= true;
145 auto shutdown_config
= make_scope_guard([this] {
146 std::unique_lock
l(monc_lock
);
147 want_bootstrap_config
= false;
148 bootstrap_config
.reset();
151 ceph::ref_t
<MConfig
> config
;
152 while (tries
-- > 0) {
158 cct
->_conf
.get_val
<std::chrono::seconds
>("client_mount_timeout").count());
163 std::unique_lock
l(monc_lock
);
164 if (monmap
.get_epoch() &&
165 !monmap
.persistent_features
.contains_all(
166 ceph::features::mon::FEATURE_MIMIC
)) {
167 ldout(cct
,10) << __func__
<< " pre-mimic monitor, no config to fetch"
172 while ((!bootstrap_config
|| monmap
.get_epoch() == 0) && r
== 0) {
173 ldout(cct
,20) << __func__
<< " waiting for monmap|config" << dendl
;
174 auto status
= map_cond
.wait_for(l
, ceph::make_timespan(
175 cct
->_conf
->mon_client_hunt_interval
));
176 if (status
== std::cv_status::timeout
) {
181 if (bootstrap_config
) {
182 ldout(cct
,10) << __func__
<< " success" << dendl
;
183 config
= std::move(bootstrap_config
);
188 lderr(cct
) << __func__
<< " failed to get config" << dendl
;
194 // apply the bootstrap config to ensure it's applied prior to completing
196 cct
->_conf
.set_mon_vals(cct
, config
->config
, config_cb
);
205 * Ping the monitor with id @p mon_id and set the resulting reply in
206 * the provided @p result_reply, if this last parameter is not NULL.
208 * So that we don't rely on the MonClient's default messenger, set up
209 * during connect(), we create our own messenger to communicate with the
210 * specified monitor. This is advantageous in the following ways:
212 * - Isolate the ping procedure from the rest of the MonClient's operations,
213 * allowing us to not acquire or manage the big monc_lock, thus not
214 * having to block waiting for some other operation to finish before we
216 * * for instance, we can ping mon.FOO even if we are currently hunting
217 * or blocked waiting for auth to complete with mon.BAR.
219 * - Ping a monitor prior to establishing a connection (using connect())
220 * and properly establish the MonClient's messenger. This frees us
221 * from dealing with the complex foo that happens in connect().
223 * We also don't rely on MonClient as a dispatcher for this messenger,
224 * unlike what happens with the MonClient's default messenger. This allows
225 * us to sandbox the whole ping, having it much as a separate entity in
226 * the MonClient class, considerably simplifying the handling and dispatching
227 * of messages without needing to consider monc_lock.
229 * Current drawback is that we will establish a messenger for each ping
230 * we want to issue, instead of keeping a single messenger instance that
231 * would be used for all pings.
233 int MonClient::ping_monitor(const string
&mon_id
, string
*result_reply
)
235 ldout(cct
, 10) << __func__
<< dendl
;
238 if (monmap
.contains("noname-"+mon_id
)) {
239 new_mon_id
= "noname-"+mon_id
;
244 if (new_mon_id
.empty()) {
245 ldout(cct
, 10) << __func__
<< " specified mon id is empty!" << dendl
;
247 } else if (!monmap
.contains(new_mon_id
)) {
248 ldout(cct
, 10) << __func__
<< " no such monitor 'mon." << new_mon_id
<< "'"
253 // N.B. monc isn't initialized
255 auth_registry
.refresh_config();
258 keyring
.from_ceph_context(cct
);
259 RotatingKeyRing
rkeyring(cct
, cct
->get_module_type(), &keyring
);
261 MonClientPinger
*pinger
= new MonClientPinger(cct
,
265 Messenger
*smsgr
= Messenger::create_client_messenger(cct
, "temp_ping_client");
266 smsgr
->add_dispatcher_head(pinger
);
267 smsgr
->set_auth_client(pinger
);
270 ConnectionRef con
= smsgr
->connect_to_mon(monmap
.get_addrs(new_mon_id
));
271 ldout(cct
, 10) << __func__
<< " ping mon." << new_mon_id
272 << " " << con
->get_peer_addr() << dendl
;
274 pinger
->mc
.reset(new MonConnection(cct
, con
, 0, &auth_registry
));
275 pinger
->mc
->start(monmap
.get_epoch(), entity_name
);
276 con
->send_message(new MPing
);
278 int ret
= pinger
->wait_for_reply(cct
->_conf
->mon_client_ping_timeout
);
280 ldout(cct
,10) << __func__
<< " got ping reply" << dendl
;
294 bool MonClient::ms_dispatch(Message
*m
)
296 // we only care about these message types
297 switch (m
->get_type()) {
298 case CEPH_MSG_MON_MAP
:
299 case CEPH_MSG_AUTH_REPLY
:
300 case CEPH_MSG_MON_SUBSCRIBE_ACK
:
301 case CEPH_MSG_MON_GET_VERSION_REPLY
:
302 case MSG_MON_COMMAND_ACK
:
303 case MSG_COMMAND_REPLY
:
314 std::lock_guard
lock(monc_lock
);
316 if (!m
->get_connection()->is_anon() &&
317 m
->get_source().type() == CEPH_ENTITY_TYPE_MON
) {
319 auto p
= _find_pending_con(m
->get_connection());
320 if (p
== pending_cons
.end()) {
321 // ignore any messages outside hunting sessions
322 ldout(cct
, 10) << "discarding stray monitor message " << *m
<< dendl
;
326 } else if (!active_con
|| active_con
->get_con() != m
->get_connection()) {
327 // ignore any messages outside our session(s)
328 ldout(cct
, 10) << "discarding stray monitor message " << *m
<< dendl
;
334 switch (m
->get_type()) {
335 case CEPH_MSG_MON_MAP
:
336 handle_monmap(static_cast<MMonMap
*>(m
));
337 if (passthrough_monmap
) {
343 case CEPH_MSG_AUTH_REPLY
:
344 handle_auth(static_cast<MAuthReply
*>(m
));
346 case CEPH_MSG_MON_SUBSCRIBE_ACK
:
347 handle_subscribe_ack(static_cast<MMonSubscribeAck
*>(m
));
349 case CEPH_MSG_MON_GET_VERSION_REPLY
:
350 handle_get_version_reply(static_cast<MMonGetVersionReply
*>(m
));
352 case MSG_MON_COMMAND_ACK
:
353 handle_mon_command_ack(static_cast<MMonCommandAck
*>(m
));
355 case MSG_COMMAND_REPLY
:
356 if (m
->get_connection()->is_anon() &&
357 m
->get_source().type() == CEPH_ENTITY_TYPE_MON
) {
358 // this connection is from 'tell'... ignore everything except our command
359 // reply. (we'll get misc other message because we authenticated, but we
361 handle_command_reply(static_cast<MCommandReply
*>(m
));
364 // leave the message for another dispatch handler (e.g., Objecter)
368 log_client
->handle_log_ack(static_cast<MLogAck
*>(m
));
370 if (more_log_pending
) {
378 handle_config(static_cast<MConfig
*>(m
));
384 void MonClient::send_log(bool flush
)
387 auto lm
= log_client
->get_mon_log_message(flush
);
389 _send_mon_message(std::move(lm
));
390 more_log_pending
= log_client
->are_pending();
394 void MonClient::flush_log()
396 std::lock_guard
l(monc_lock
);
400 /* Unlike all the other message-handling functions, we don't put away a reference
401 * because we want to support MMonMap passthrough to other Dispatchers. */
402 void MonClient::handle_monmap(MMonMap
*m
)
404 ldout(cct
, 10) << __func__
<< " " << *m
<< dendl
;
405 auto con_addrs
= m
->get_source_addrs();
406 string old_name
= monmap
.get_name(con_addrs
);
407 const auto old_epoch
= monmap
.get_epoch();
409 auto p
= m
->monmapbl
.cbegin();
412 ldout(cct
, 10) << " got monmap " << monmap
.epoch
413 << " from mon." << old_name
414 << " (according to old e" << monmap
.get_epoch() << ")"
416 ldout(cct
, 10) << "dump:\n";
417 monmap
.print(*_dout
);
420 if (old_epoch
!= monmap
.get_epoch()) {
423 if (old_name
.size() == 0) {
424 ldout(cct
,10) << " can't identify which mon we were connected to" << dendl
;
427 auto new_name
= monmap
.get_name(con_addrs
);
428 if (new_name
.empty()) {
429 ldout(cct
, 10) << "mon." << old_name
<< " at " << con_addrs
430 << " went away" << dendl
;
431 // can't find the mon we were talking to (above)
433 } else if (messenger
->should_use_msgr2() &&
434 monmap
.get_addrs(new_name
).has_msgr2() &&
435 !con_addrs
.has_msgr2()) {
436 ldout(cct
,1) << " mon." << new_name
<< " has (v2) addrs "
437 << monmap
.get_addrs(new_name
) << " but i'm connected to "
438 << con_addrs
<< ", reconnecting" << dendl
;
443 cct
->set_mon_addrs(monmap
);
445 sub
.got("monmap", monmap
.get_epoch());
446 map_cond
.notify_all();
449 if (authenticate_err
== 1) {
454 void MonClient::handle_config(MConfig
*m
)
456 ldout(cct
,10) << __func__
<< " " << *m
<< dendl
;
458 if (want_bootstrap_config
) {
459 // get_monmap_and_config is waiting for config which it will apply
461 bootstrap_config
= ceph::ref_t
<MConfig
>(m
, false);
462 map_cond
.notify_all();
466 // Take the sledgehammer approach to ensuring we don't depend on
467 // anything in MonClient.
468 boost::asio::post(finish_strand
,
469 [m
, cct
= boost::intrusive_ptr
<CephContext
>(cct
),
470 config_notify_cb
= config_notify_cb
,
471 config_cb
= config_cb
]() {
472 cct
->_conf
.set_mon_vals(cct
.get(), m
->config
, config_cb
);
473 if (config_notify_cb
) {
480 // ----------------------
482 int MonClient::init()
484 ldout(cct
, 10) << __func__
<< dendl
;
486 entity_name
= cct
->_conf
->name
;
488 auth_registry
.refresh_config();
490 std::lock_guard
l(monc_lock
);
491 keyring
.reset(new KeyRing
);
492 if (auth_registry
.is_supported_method(messenger
->get_mytype(),
494 // this should succeed, because auth_registry just checked!
495 int r
= keyring
->from_ceph_context(cct
);
497 // but be somewhat graceful in case there was a race condition
498 lderr(cct
) << "keyring not found" << dendl
;
502 if (!auth_registry
.any_supported_methods(messenger
->get_mytype())) {
506 rotating_secrets
.reset(
507 new RotatingKeyRing(cct
, cct
->get_module_type(), keyring
.get()));
511 messenger
->set_auth_client(this);
512 messenger
->add_dispatcher_head(this);
517 cct
->get_admin_socket()->register_command(
520 "rotate live authentication key");
525 void MonClient::shutdown()
527 ldout(cct
, 10) << __func__
<< dendl
;
529 cct
->get_admin_socket()->unregister_commands(this);
533 while (!version_requests
.empty()) {
534 ceph::async::post(std::move(version_requests
.begin()->second
),
535 monc_errc::shutting_down
, 0, 0);
536 ldout(cct
, 20) << __func__
<< " canceling and discarding version request "
537 << version_requests
.begin()->first
<< dendl
;
538 version_requests
.erase(version_requests
.begin());
540 while (!mon_commands
.empty()) {
541 auto tid
= mon_commands
.begin()->first
;
542 _cancel_mon_command(tid
);
544 ldout(cct
, 20) << __func__
<< " discarding " << waiting_for_session
.size()
545 << " pending message(s)" << dendl
;
546 waiting_for_session
.clear();
549 pending_cons
.clear();
553 authenticate_err
= 0;
554 authenticated
= false;
567 int MonClient::authenticate(double timeout
)
569 std::unique_lock lock
{monc_lock
};
572 ldout(cct
, 5) << "already authenticated" << dendl
;
575 sub
.want("monmap", monmap
.get_epoch() ? monmap
.get_epoch() + 1 : 0, 0);
576 sub
.want("config", 0, 0);
580 auto until
= ceph::mono_clock::now();
581 until
+= ceph::make_timespan(timeout
);
583 ldout(cct
, 10) << "authenticate will time out at " << until
<< dendl
;
584 while (!active_con
&& authenticate_err
>= 0) {
586 auto r
= auth_cond
.wait_until(lock
, until
);
587 if (r
== std::cv_status::timeout
&& !active_con
) {
588 ldout(cct
, 0) << "authenticate timed out after " << timeout
<< dendl
;
589 authenticate_err
= -ETIMEDOUT
;
592 auth_cond
.wait(lock
);
597 ldout(cct
, 5) << __func__
<< " success, global_id "
598 << active_con
->get_global_id() << dendl
;
599 // active_con should not have been set if there was an error
600 ceph_assert(authenticate_err
>= 0);
601 authenticated
= true;
604 if (authenticate_err
< 0 && auth_registry
.no_keyring_disabled_cephx()) {
605 lderr(cct
) << __func__
<< " NOTE: no keyring found; disabled cephx authentication" << dendl
;
608 return authenticate_err
;
612 std::string_view command
,
613 const cmdmap_t
& cmdmap
,
614 const ceph::buffer::list
&inbl
,
617 ceph::buffer::list
& out
)
619 if (command
== "rotate-key") {
622 key
.decode_base64(inbl
.to_str());
623 } catch (buffer::error
& e
) {
624 errss
<< "error decoding key: " << e
.what();
628 ldout(cct
, 1) << "rotate live key for " << entity_name
<< dendl
;
629 keyring
->add(entity_name
, key
);
631 errss
<< "cephx not enabled; no key to rotate";
638 void MonClient::handle_auth(MAuthReply
*m
)
640 ceph_assert(ceph_mutex_is_locked(monc_lock
));
642 if (m
->get_connection()->is_anon()) {
643 // anon connection, used for mon tell commands
644 for (auto& p
: mon_commands
) {
645 if (p
.second
->target_con
== m
->get_connection()) {
646 auto& mc
= p
.second
->target_session
;
647 int ret
= mc
->handle_auth(m
, entity_name
,
648 CEPH_ENTITY_TYPE_MON
,
649 rotating_secrets
.get());
650 (void)ret
; // we don't care
659 std::swap(active_con
->get_auth(), auth
);
660 int ret
= active_con
->authenticate(m
);
662 std::swap(auth
, active_con
->get_auth());
663 if (global_id
!= active_con
->get_global_id()) {
664 lderr(cct
) << __func__
<< " peer assigned me a different global_id: "
665 << active_con
->get_global_id() << dendl
;
667 if (ret
!= -EAGAIN
) {
674 auto found
= _find_pending_con(m
->get_connection());
675 ceph_assert(found
!= pending_cons
.end());
676 int auth_err
= found
->second
.handle_auth(m
, entity_name
, want_keys
,
677 rotating_secrets
.get());
679 if (auth_err
== -EAGAIN
) {
683 pending_cons
.erase(found
);
684 if (!pending_cons
.empty()) {
685 // keep trying with pending connections
688 // the last try just failed, give up.
690 auto& mc
= found
->second
;
691 ceph_assert(mc
.have_session());
692 active_con
.reset(new MonConnection(std::move(mc
)));
693 pending_cons
.clear();
696 _finish_hunting(auth_err
);
697 _finish_auth(auth_err
);
700 void MonClient::_finish_auth(int auth_err
)
702 ldout(cct
,10) << __func__
<< " " << auth_err
<< dendl
;
703 authenticate_err
= auth_err
;
704 // _resend_mon_commands() could _reopen_session() if the connected mon is not
705 // the one the MonCommand is targeting.
706 if (!auth_err
&& active_con
) {
708 _check_auth_tickets();
709 } else if (auth_err
== -EAGAIN
&& !active_con
) {
710 ldout(cct
,10) << __func__
711 << " auth returned EAGAIN, reopening the session to try again"
715 auth_cond
.notify_all();
// Locked entry point for sending a message to the monitor.
// Holds monc_lock (via std::lock_guard) for the duration of the call and
// forwards ownership of `m` to _send_mon_message(), which asserts that the
// lock is held.
720 void MonClient::send_mon_message(MessageRef m
)
722 std::lock_guard l
{monc_lock
};
723 _send_mon_message(std::move(m
));
726 void MonClient::_send_mon_message(MessageRef m
)
728 ceph_assert(ceph_mutex_is_locked(monc_lock
));
730 auto cur_con
= active_con
->get_con();
731 ldout(cct
, 10) << "_send_mon_message to mon."
732 << monmap
.get_name(cur_con
->get_peer_addr())
733 << " at " << cur_con
->get_peer_addr() << dendl
;
734 cur_con
->send_message2(std::move(m
));
736 waiting_for_session
.push_back(std::move(m
));
740 void MonClient::_reopen_session(int rank
)
742 ceph_assert(ceph_mutex_is_locked(monc_lock
));
743 ldout(cct
, 10) << __func__
<< " rank " << rank
<< dendl
;
746 pending_cons
.clear();
748 authenticate_err
= 1; // == in progress
758 // throw out old queued messages
759 waiting_for_session
.clear();
761 // throw out version check requests
762 while (!version_requests
.empty()) {
763 ceph::async::post(std::move(version_requests
.begin()->second
),
764 monc_errc::session_reset
, 0, 0);
765 version_requests
.erase(version_requests
.begin());
768 for (auto& c
: pending_cons
) {
769 c
.second
.start(monmap
.get_epoch(), entity_name
);
777 void MonClient::_add_conn(unsigned rank
)
779 auto peer
= monmap
.get_addrs(rank
);
780 auto conn
= messenger
->connect_to_mon(peer
);
781 MonConnection
mc(cct
, conn
, global_id
, &auth_registry
);
783 mc
.get_auth().reset(auth
->clone());
785 pending_cons
.insert(std::make_pair(peer
, std::move(mc
)));
786 ldout(cct
, 10) << "picked mon." << monmap
.get_name(rank
)
792 void MonClient::_add_conns()
794 // collect the next batch of candidates who are listed right next to the ones
796 auto get_next_batch
= [this]() -> std::vector
<unsigned> {
797 std::multimap
<uint16_t, unsigned> ranks_by_priority
;
799 monmap
.mon_info
| boost::adaptors::filtered(
801 auto rank
= monmap
.get_rank(info
.first
);
802 return tried
.count(rank
) == 0;
803 }) | boost::adaptors::transformed(
805 auto rank
= monmap
.get_rank(info
.first
);
806 return std::make_pair(info
.second
.priority
, rank
);
807 }), std::inserter(ranks_by_priority
, end(ranks_by_priority
)));
808 if (ranks_by_priority
.empty()) {
811 // only choose the monitors with lowest priority
812 auto cands
= boost::make_iterator_range(
813 ranks_by_priority
.equal_range(ranks_by_priority
.begin()->first
));
814 std::vector
<unsigned> ranks
;
815 boost::range::copy(cands
| boost::adaptors::map_values
,
816 std::back_inserter(ranks
));
819 auto ranks
= get_next_batch();
821 tried
.clear(); // start over
822 ranks
= get_next_batch();
824 ceph_assert(!ranks
.empty());
825 if (ranks
.size() > 1) {
826 std::vector
<uint16_t> weights
;
827 for (auto i
: ranks
) {
828 auto rank_name
= monmap
.get_name(i
);
829 weights
.push_back(monmap
.get_weight(rank_name
));
832 if (std::accumulate(begin(weights
), end(weights
), 0u) == 0) {
833 std::shuffle(begin(ranks
), end(ranks
), std::mt19937
{rd()});
835 weighted_shuffle(begin(ranks
), end(ranks
), begin(weights
), end(weights
),
839 ldout(cct
, 10) << __func__
<< " ranks=" << ranks
<< dendl
;
840 unsigned n
= cct
->_conf
->mon_client_hunt_parallel
;
841 if (n
== 0 || n
> ranks
.size()) {
844 for (unsigned i
= 0; i
< n
; i
++) {
846 tried
.insert(ranks
[i
]);
850 bool MonClient::ms_handle_reset(Connection
*con
)
852 std::lock_guard
lock(monc_lock
);
854 if (con
->get_peer_type() != CEPH_ENTITY_TYPE_MON
)
857 if (con
->is_anon()) {
858 auto p
= mon_commands
.begin();
859 while (p
!= mon_commands
.end()) {
860 auto cmd
= p
->second
;
862 if (cmd
->target_con
== con
) {
863 _send_command(cmd
); // may retry or fail
871 if (pending_cons
.count(con
->get_peer_addrs())) {
872 ldout(cct
, 10) << __func__
<< " hunted mon " << con
->get_peer_addrs()
875 ldout(cct
, 10) << __func__
<< " stray mon " << con
->get_peer_addrs()
880 if (active_con
&& con
== active_con
->get_con()) {
881 ldout(cct
, 10) << __func__
<< " current mon " << con
->get_peer_addrs()
886 ldout(cct
, 10) << "ms_handle_reset stray mon " << con
->get_peer_addrs()
// Reports whether a monitor session exists or is being established:
// returns true when active_con is set or when _hunting() is true.
// Asserts that monc_lock is locked before reading that state.
893 bool MonClient::_opened() const
895 ceph_assert(ceph_mutex_is_locked(monc_lock
));
896 return active_con
|| _hunting();
// Returns true while pending_cons is non-empty, i.e. while there is at least
// one candidate monitor connection still being tried.
// The dout_prefix macro in this file also calls this to tag log lines with
// "(hunting)".
899 bool MonClient::_hunting() const
901 return !pending_cons
.empty();
904 void MonClient::_start_hunting()
906 ceph_assert(!_hunting());
907 // adjust timeouts if necessary
908 if (!had_a_connection
)
910 reopen_interval_multiplier
*= cct
->_conf
->mon_client_hunt_interval_backoff
;
911 if (reopen_interval_multiplier
>
912 cct
->_conf
->mon_client_hunt_interval_max_multiple
) {
913 reopen_interval_multiplier
=
914 cct
->_conf
->mon_client_hunt_interval_max_multiple
;
918 void MonClient::_finish_hunting(int auth_err
)
920 ldout(cct
,10) << __func__
<< " " << auth_err
<< dendl
;
921 ceph_assert(ceph_mutex_is_locked(monc_lock
));
922 // the pending conns have been cleaned.
923 ceph_assert(!_hunting());
925 auto con
= active_con
->get_con();
926 ldout(cct
, 1) << "found mon."
927 << monmap
.get_name(con
->get_peer_addr())
930 ldout(cct
, 1) << "no mon sessions established" << dendl
;
933 had_a_connection
= true;
937 last_rotating_renew_sent
= utime_t();
938 while (!waiting_for_session
.empty()) {
939 _send_mon_message(std::move(waiting_for_session
.front()));
940 waiting_for_session
.pop_front();
942 _resend_mon_commands();
945 auth
= std::move(active_con
->get_auth());
946 if (global_id
&& global_id
!= active_con
->get_global_id()) {
947 lderr(cct
) << __func__
<< " global_id changed from " << global_id
948 << " to " << active_con
->get_global_id() << dendl
;
950 global_id
= active_con
->get_global_id();
955 void MonClient::tick()
957 ldout(cct
, 10) << __func__
<< dendl
;
959 utime_t now
= ceph_clock_now();
961 auto reschedule_tick
= make_scope_guard([this] {
965 _check_auth_tickets();
966 _check_tell_commands();
969 ldout(cct
, 1) << "continuing hunt" << dendl
;
970 return _reopen_session();
971 } else if (active_con
) {
972 // just renew as needed
973 auto cur_con
= active_con
->get_con();
974 if (!cur_con
->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB
)) {
975 const bool maybe_renew
= sub
.need_renew();
976 ldout(cct
, 10) << "renew subs? -- " << (maybe_renew
? "yes" : "no")
983 if (now
> last_keepalive
+ cct
->_conf
->mon_client_ping_interval
) {
984 cur_con
->send_keepalive();
985 last_keepalive
= now
;
987 if (cct
->_conf
->mon_client_ping_timeout
> 0 &&
988 cur_con
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
989 utime_t lk
= cur_con
->get_last_keepalive_ack();
990 utime_t interval
= now
- lk
;
991 if (interval
> cct
->_conf
->mon_client_ping_timeout
) {
992 ldout(cct
, 1) << "no keepalive since " << lk
<< " (" << interval
993 << " seconds), reconnecting" << dendl
;
994 return _reopen_session();
1001 if (now
> last_send_log
+ cct
->_conf
->mon_client_log_interval
) {
1003 last_send_log
= now
;
// Relaxes the reconnect backoff by one step.
// reopen_interval_multiplier is divided by the
// "mon_client_hunt_interval_backoff" option, and std::max() keeps the result
// from dropping below "mon_client_hunt_interval_min_multiple". The new value
// is then logged at debug level 20.
1008 void MonClient::_un_backoff()
1010 // un-backoff our reconnect interval
1011 reopen_interval_multiplier
= std::max(
1012 cct
->_conf
.get_val
<double>("mon_client_hunt_interval_min_multiple"),
1013 reopen_interval_multiplier
/
1014 cct
->_conf
.get_val
<double>("mon_client_hunt_interval_backoff"));
1015 ldout(cct
, 20) << __func__
<< " reopen_interval_multipler now "
1016 << reopen_interval_multiplier
<< dendl
;
1019 void MonClient::schedule_tick()
1021 auto do_tick
= make_lambda_context([this](int) { tick(); });
1022 if (!is_connected()) {
1023 // start another round of hunting
1024 const auto hunt_interval
= (cct
->_conf
->mon_client_hunt_interval
*
1025 reopen_interval_multiplier
);
1026 timer
.add_event_after(hunt_interval
, do_tick
);
1029 timer
.add_event_after(std::min(cct
->_conf
->mon_client_ping_interval
,
1030 cct
->_conf
->mon_client_log_interval
),
1037 void MonClient::_renew_subs()
1039 ceph_assert(ceph_mutex_is_locked(monc_lock
));
1040 if (!sub
.have_new()) {
1041 ldout(cct
, 10) << __func__
<< " - empty" << dendl
;
1045 ldout(cct
, 10) << __func__
<< dendl
;
1049 auto m
= ceph::make_message
<MMonSubscribe
>();
1050 m
->what
= sub
.get_subs();
1051 m
->hostname
= ceph_get_short_hostname();
1052 _send_mon_message(std::move(m
));
1057 void MonClient::handle_subscribe_ack(MMonSubscribeAck
*m
)
1059 sub
.acked(m
->interval
);
1063 int MonClient::_check_auth_tickets()
1065 ldout(cct
, 10) << __func__
<< dendl
;
1066 ceph_assert(ceph_mutex_is_locked(monc_lock
));
1067 if (active_con
&& auth
) {
1068 if (auth
->need_tickets()) {
1069 ldout(cct
, 10) << __func__
<< " getting new tickets!" << dendl
;
1070 auto m
= ceph::make_message
<MAuth
>();
1071 m
->protocol
= auth
->get_protocol();
1072 auth
->prepare_build_request();
1073 auth
->build_request(m
->auth_payload
);
1074 _send_mon_message(m
);
1077 _check_auth_rotating();
1082 int MonClient::_check_auth_rotating()
1084 ceph_assert(ceph_mutex_is_locked(monc_lock
));
1085 if (!rotating_secrets
||
1086 !auth_principal_needs_rotating_keys(entity_name
)) {
1087 ldout(cct
, 20) << "_check_auth_rotating not needed by " << entity_name
<< dendl
;
1091 if (!active_con
|| !auth
) {
1092 ldout(cct
, 10) << "_check_auth_rotating waiting for auth session" << dendl
;
1096 utime_t now
= ceph_clock_now();
1097 utime_t cutoff
= now
;
1098 cutoff
-= std::min(30.0, cct
->_conf
->auth_service_ticket_ttl
/ 4.0);
1099 utime_t issued_at_lower_bound
= now
;
1100 issued_at_lower_bound
-= cct
->_conf
->auth_service_ticket_ttl
;
1101 if (!rotating_secrets
->need_new_secrets(cutoff
)) {
1102 ldout(cct
, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff
<< ")" << dendl
;
1103 rotating_secrets
->dump_rotating();
1107 ldout(cct
, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff
<< ")" << dendl
;
1108 if (!rotating_secrets
->need_new_secrets() &&
1109 rotating_secrets
->need_new_secrets(issued_at_lower_bound
)) {
1110 // the key has expired before it has been issued?
1111 lderr(cct
) << __func__
<< " possible clock skew, rotating keys expired way too early"
1112 << " (before " << issued_at_lower_bound
<< ")" << dendl
;
1114 if ((now
> last_rotating_renew_sent
) &&
1115 double(now
- last_rotating_renew_sent
) < 1) {
1116 ldout(cct
, 10) << __func__
<< " called too often (last: "
1117 << last_rotating_renew_sent
<< "), skipping refresh" << dendl
;
1120 auto m
= ceph::make_message
<MAuth
>();
1121 m
->protocol
= auth
->get_protocol();
1122 if (auth
->build_rotating_request(m
->auth_payload
)) {
1123 last_rotating_renew_sent
= now
;
1124 _send_mon_message(std::move(m
));
1129 int MonClient::wait_auth_rotating(double timeout
)
1131 std::unique_lock
l(monc_lock
);
1133 // Must be initialized
1134 ceph_assert(auth
!= nullptr);
1136 if (auth
->get_protocol() == CEPH_AUTH_NONE
)
1139 if (!rotating_secrets
)
1142 ldout(cct
, 10) << __func__
<< " waiting for " << timeout
<< dendl
;
1143 utime_t cutoff
= ceph_clock_now();
1144 cutoff
-= std::min(30.0, cct
->_conf
->auth_service_ticket_ttl
/ 4.0);
1145 if (auth_cond
.wait_for(l
, ceph::make_timespan(timeout
), [this, cutoff
] {
1146 return (!auth_principal_needs_rotating_keys(entity_name
) ||
1147 !rotating_secrets
->need_new_secrets(cutoff
));
1149 ldout(cct
, 10) << __func__
<< " done" << dendl
;
1152 ldout(cct
, 0) << __func__
<< " timed out after " << timeout
<< dendl
;
1159 void MonClient::_send_command(MonCommand
*r
)
1163 if (r
->send_attempts
> cct
->_conf
->mon_client_directed_command_retry
) {
1164 _finish_command(r
, monc_errc::mon_unavailable
, "mon unavailable", {});
1167 // tell-style command
1168 if (monmap
.min_mon_release
>= ceph_release_t::octopus
) {
1169 if (r
->target_con
) {
1170 r
->target_con
->mark_down();
1172 if (r
->target_rank
>= 0) {
1173 if (r
->target_rank
>= (int)monmap
.size()) {
1174 ldout(cct
, 10) << " target " << r
->target_rank
1175 << " >= max mon " << monmap
.size() << dendl
;
1176 _finish_command(r
, monc_errc::rank_dne
, "mon rank dne"sv
, {});
1179 r
->target_con
= messenger
->connect_to_mon(
1180 monmap
.get_addrs(r
->target_rank
), true /* anon */);
1182 if (!monmap
.contains(r
->target_name
)) {
1183 ldout(cct
, 10) << " target " << r
->target_name
1184 << " not present in monmap" << dendl
;
1185 _finish_command(r
, monc_errc::mon_dne
, "mon dne"sv
, {});
1188 r
->target_con
= messenger
->connect_to_mon(
1189 monmap
.get_addrs(r
->target_name
), true /* anon */);
1192 r
->target_session
.reset(new MonConnection(cct
, r
->target_con
, 0,
1194 r
->target_session
->start(monmap
.get_epoch(), entity_name
);
1195 r
->last_send_attempt
= ceph_clock_now();
1197 MCommand
*m
= new MCommand(monmap
.fsid
);
1200 m
->set_data(r
->inbl
);
1201 r
->target_session
->queue_command(m
);
1205 // ugly legacy handling of pre-octopus mons
1208 peer
= active_con
->get_con()->get_peer_addr();
1211 if (r
->target_rank
>= 0 &&
1212 r
->target_rank
!= monmap
.get_rank(peer
)) {
1213 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
1214 << " wants rank " << r
->target_rank
1215 << ", reopening session"
1217 if (r
->target_rank
>= (int)monmap
.size()) {
1218 ldout(cct
, 10) << " target " << r
->target_rank
1219 << " >= max mon " << monmap
.size() << dendl
;
1220 _finish_command(r
, monc_errc::rank_dne
, "mon rank dne"sv
, {});
1223 _reopen_session(r
->target_rank
);
1226 if (r
->target_name
.length() &&
1227 r
->target_name
!= monmap
.get_name(peer
)) {
1228 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
1229 << " wants mon " << r
->target_name
1230 << ", reopening session"
1232 if (!monmap
.contains(r
->target_name
)) {
1233 ldout(cct
, 10) << " target " << r
->target_name
1234 << " not present in monmap" << dendl
;
1235 _finish_command(r
, monc_errc::mon_dne
, "mon dne"sv
, {});
1238 _reopen_session(monmap
.get_rank(r
->target_name
));
1241 // fall-thru to send 'normal' CLI command
1244 // normal CLI command
1245 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
<< dendl
;
1246 auto m
= ceph::make_message
<MMonCommand
>(monmap
.fsid
);
1249 m
->set_data(r
->inbl
);
1250 _send_mon_message(std::move(m
));
1254 void MonClient::_check_tell_commands()
1256 // resend any requests
1257 auto now
= ceph_clock_now();
1258 auto p
= mon_commands
.begin();
1259 while (p
!= mon_commands
.end()) {
1260 auto cmd
= p
->second
;
1262 if (cmd
->is_tell() &&
1263 cmd
->last_send_attempt
!= utime_t() &&
1264 now
- cmd
->last_send_attempt
> cct
->_conf
->mon_client_hunt_interval
) {
1265 ldout(cct
,5) << __func__
<< " timeout tell command " << cmd
->tid
<< dendl
;
1266 _send_command(cmd
); // might remove cmd from mon_commands
1271 void MonClient::_resend_mon_commands()
1273 // resend any requests
1274 auto p
= mon_commands
.begin();
1275 while (p
!= mon_commands
.end()) {
1276 auto cmd
= p
->second
;
1278 if (cmd
->is_tell() && monmap
.min_mon_release
>= ceph_release_t::octopus
) {
1279 // starting with octopus, tell commands use their own connection and need no
1280 // special resend when we finish hunting.
1282 _send_command(cmd
); // might remove cmd from mon_commands
1287 void MonClient::handle_mon_command_ack(MMonCommandAck
*ack
)
1289 MonCommand
*r
= NULL
;
1290 uint64_t tid
= ack
->get_tid();
1292 if (tid
== 0 && !mon_commands
.empty()) {
1293 r
= mon_commands
.begin()->second
;
1294 ldout(cct
, 10) << __func__
<< " has tid 0, assuming it is " << r
->tid
<< dendl
;
1296 auto p
= mon_commands
.find(tid
);
1297 if (p
== mon_commands
.end()) {
1298 ldout(cct
, 10) << __func__
<< " " << ack
->get_tid() << " not found" << dendl
;
1305 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
<< dendl
;
1306 auto ec
= ack
->r
< 0 ? bs::error_code(-ack
->r
, mon_category())
1308 _finish_command(r
, ec
, ack
->rs
,
1309 std::move(ack
->get_data()));
1313 void MonClient::handle_command_reply(MCommandReply
*reply
)
1315 MonCommand
*r
= NULL
;
1316 uint64_t tid
= reply
->get_tid();
1318 if (tid
== 0 && !mon_commands
.empty()) {
1319 r
= mon_commands
.begin()->second
;
1320 ldout(cct
, 10) << __func__
<< " has tid 0, assuming it is " << r
->tid
1323 auto p
= mon_commands
.find(tid
);
1324 if (p
== mon_commands
.end()) {
1325 ldout(cct
, 10) << __func__
<< " " << reply
->get_tid() << " not found"
1333 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " " << r
->cmd
<< dendl
;
1334 auto ec
= reply
->r
< 0 ? bs::error_code(-reply
->r
, mon_category())
1336 _finish_command(r
, ec
, reply
->rs
, std::move(reply
->get_data()));
1340 int MonClient::_cancel_mon_command(uint64_t tid
)
1342 ceph_assert(ceph_mutex_is_locked(monc_lock
));
1344 auto it
= mon_commands
.find(tid
);
1345 if (it
== mon_commands
.end()) {
1346 ldout(cct
, 10) << __func__
<< " tid " << tid
<< " dne" << dendl
;
1350 ldout(cct
, 10) << __func__
<< " tid " << tid
<< dendl
;
1352 MonCommand
*cmd
= it
->second
;
1353 _finish_command(cmd
, monc_errc::timed_out
, "timed out"sv
, {});
1357 void MonClient::_finish_command(MonCommand
*r
, bs::error_code ret
,
1358 std::string_view rs
, ceph::buffer::list
&& bl
)
1360 ldout(cct
, 10) << __func__
<< " " << r
->tid
<< " = " << ret
<< " " << rs
1362 ceph::async::post(std::move(r
->onfinish
), ret
, std::string(rs
),
1364 if (r
->target_con
) {
1365 r
->target_con
->mark_down();
1367 mon_commands
.erase(r
->tid
);
1373 void MonClient::handle_get_version_reply(MMonGetVersionReply
* m
)
1375 ceph_assert(ceph_mutex_is_locked(monc_lock
));
1376 auto iter
= version_requests
.find(m
->handle
);
1377 if (iter
== version_requests
.end()) {
1378 ldout(cct
, 0) << __func__
<< " version request with handle " << m
->handle
1379 << " not found" << dendl
;
1381 auto req
= std::move(iter
->second
);
1382 ldout(cct
, 10) << __func__
<< " finishing " << iter
->first
<< " version "
1383 << m
->version
<< dendl
;
1384 version_requests
.erase(iter
);
1385 ceph::async::post(std::move(req
), bs::error_code(),
1386 m
->version
, m
->oldest_version
);
1391 int MonClient::get_auth_request(
1393 AuthConnectionMeta
*auth_meta
,
1394 uint32_t *auth_method
,
1395 std::vector
<uint32_t> *preferred_modes
,
1396 ceph::buffer::list
*bl
)
1398 std::lock_guard
l(monc_lock
);
1399 ldout(cct
,10) << __func__
<< " con " << con
<< " auth_method " << *auth_method
1402 // connection to mon?
1403 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
1404 ceph_assert(!auth_meta
->authorizer
);
1405 if (con
->is_anon()) {
1406 for (auto& i
: mon_commands
) {
1407 if (i
.second
->target_con
== con
) {
1408 return i
.second
->target_session
->get_auth_request(
1409 auth_method
, preferred_modes
, bl
,
1410 entity_name
, want_keys
, rotating_secrets
.get());
1414 for (auto& i
: pending_cons
) {
1415 if (i
.second
.is_con(con
)) {
1416 return i
.second
.get_auth_request(
1417 auth_method
, preferred_modes
, bl
,
1418 entity_name
, want_keys
, rotating_secrets
.get());
1424 // generate authorizer
1426 lderr(cct
) << __func__
<< " but no auth handler is set up" << dendl
;
1429 auth_meta
->authorizer
.reset(auth
->build_authorizer(con
->get_peer_type()));
1430 if (!auth_meta
->authorizer
) {
1431 lderr(cct
) << __func__
<< " failed to build_authorizer for type "
1432 << ceph_entity_type_name(con
->get_peer_type()) << dendl
;
1435 auth_meta
->auth_method
= auth_meta
->authorizer
->protocol
;
1436 auth_registry
.get_supported_modes(con
->get_peer_type(),
1437 auth_meta
->auth_method
,
1439 *bl
= auth_meta
->authorizer
->bl
;
1443 int MonClient::handle_auth_reply_more(
1445 AuthConnectionMeta
*auth_meta
,
1446 const ceph::buffer::list
& bl
,
1447 ceph::buffer::list
*reply
)
1449 std::lock_guard
l(monc_lock
);
1451 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
1452 if (con
->is_anon()) {
1453 for (auto& i
: mon_commands
) {
1454 if (i
.second
->target_con
== con
) {
1455 return i
.second
->target_session
->handle_auth_reply_more(
1456 auth_meta
, bl
, reply
);
1460 for (auto& i
: pending_cons
) {
1461 if (i
.second
.is_con(con
)) {
1462 return i
.second
.handle_auth_reply_more(auth_meta
, bl
, reply
);
1468 // authorizer challenges
1469 if (!auth
|| !auth_meta
->authorizer
) {
1470 lderr(cct
) << __func__
<< " no authorizer?" << dendl
;
1473 auth_meta
->authorizer
->add_challenge(cct
, bl
);
1474 *reply
= auth_meta
->authorizer
->bl
;
1478 int MonClient::handle_auth_done(
1480 AuthConnectionMeta
*auth_meta
,
1483 const ceph::buffer::list
& bl
,
1484 CryptoKey
*session_key
,
1485 std::string
*connection_secret
)
1487 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
1488 std::lock_guard
l(monc_lock
);
1489 if (con
->is_anon()) {
1490 for (auto& i
: mon_commands
) {
1491 if (i
.second
->target_con
== con
) {
1492 return i
.second
->target_session
->handle_auth_done(
1493 auth_meta
, global_id
, bl
,
1494 session_key
, connection_secret
);
1498 for (auto& i
: pending_cons
) {
1499 if (i
.second
.is_con(con
)) {
1500 int r
= i
.second
.handle_auth_done(
1501 auth_meta
, global_id
, bl
,
1502 session_key
, connection_secret
);
1504 pending_cons
.erase(i
.first
);
1505 if (!pending_cons
.empty()) {
1509 active_con
.reset(new MonConnection(std::move(i
.second
)));
1510 pending_cons
.clear();
1511 ceph_assert(active_con
->have_session());
1515 if (r
|| monmap
.get_epoch() > 0) {
1523 // verify authorizer reply
1524 auto p
= bl
.begin();
1525 if (!auth_meta
->authorizer
->verify_reply(p
, &auth_meta
->connection_secret
)) {
1526 ldout(cct
, 0) << __func__
<< " failed verifying authorizer reply"
1530 auth_meta
->session_key
= auth_meta
->authorizer
->session_key
;
1535 int MonClient::handle_auth_bad_method(
1537 AuthConnectionMeta
*auth_meta
,
1538 uint32_t old_auth_method
,
1540 const std::vector
<uint32_t>& allowed_methods
,
1541 const std::vector
<uint32_t>& allowed_modes
)
1543 auth_meta
->allowed_methods
= allowed_methods
;
1545 std::lock_guard
l(monc_lock
);
1546 if (con
->get_peer_type() == CEPH_ENTITY_TYPE_MON
) {
1547 if (con
->is_anon()) {
1548 for (auto& i
: mon_commands
) {
1549 if (i
.second
->target_con
== con
) {
1550 int r
= i
.second
->target_session
->handle_auth_bad_method(
1556 auto ec
= bs::error_code(-r
, mon_category());
1557 _finish_command(i
.second
, ec
, "auth failed"sv
, {});
1563 for (auto& i
: pending_cons
) {
1564 if (i
.second
.is_con(con
)) {
1565 int r
= i
.second
.handle_auth_bad_method(old_auth_method
,
1570 return r
; // try another method on this con
1572 pending_cons
.erase(i
.first
);
1573 if (!pending_cons
.empty()) {
1574 return r
; // fail this con, maybe another con will succeed
1585 ldout(cct
,10) << __func__
<< " hmm, they didn't like " << old_auth_method
1586 << " result " << cpp_strerror(result
)
1587 << " and auth is " << (auth
? auth
->get_protocol() : 0)
1593 int MonClient::handle_auth_request(
1595 AuthConnectionMeta
*auth_meta
,
1597 uint32_t auth_method
,
1598 const ceph::buffer::list
& payload
,
1599 ceph::buffer::list
*reply
)
1601 if (payload
.length() == 0) {
1602 // for some channels prior to nautilus (osd heartbeat), we
1603 // tolerate the lack of an authorizer.
1604 if (!con
->get_messenger()->require_authorizer
) {
1605 handle_authentication_dispatcher
->ms_handle_fast_authentication(con
);
1610 auth_meta
->auth_mode
= payload
[0];
1611 if (auth_meta
->auth_mode
< AUTH_MODE_AUTHORIZER
||
1612 auth_meta
->auth_mode
> AUTH_MODE_AUTHORIZER_MAX
) {
1615 AuthAuthorizeHandler
*ah
= get_auth_authorize_handler(con
->get_peer_type(),
1618 lderr(cct
) << __func__
<< " no AuthAuthorizeHandler found for auth method "
1619 << auth_method
<< dendl
;
1623 auto ac
= &auth_meta
->authorizer_challenge
;
1624 if (auth_meta
->skip_authorizer_challenge
) {
1625 ldout(cct
, 10) << __func__
<< " skipping challenge on " << con
<< dendl
;
1629 bool was_challenge
= (bool)auth_meta
->authorizer_challenge
;
1630 bool isvalid
= ah
->verify_authorizer(
1634 auth_meta
->get_connection_secret_length(),
1637 &con
->peer_global_id
,
1638 &con
->peer_caps_info
,
1639 &auth_meta
->session_key
,
1640 &auth_meta
->connection_secret
,
1643 handle_authentication_dispatcher
->ms_handle_fast_authentication(con
);
1646 if (!more
&& !was_challenge
&& auth_meta
->authorizer_challenge
) {
1647 ldout(cct
,10) << __func__
<< " added challenge on " << con
<< dendl
;
1650 ldout(cct
,10) << __func__
<< " bad authorizer on " << con
<< dendl
;
1651 // discard old challenge
1652 auth_meta
->authorizer_challenge
.reset();
1656 AuthAuthorizer
* MonClient::build_authorizer(int service_id
) const {
1657 std::lock_guard
l(monc_lock
);
1659 return auth
->build_authorizer(service_id
);
1661 ldout(cct
, 0) << __func__
<< " for " << ceph_entity_type_name(service_id
)
1662 << ", but no auth is available now" << dendl
;
1667 #define dout_subsys ceph_subsys_monc
1669 #define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1671 MonConnection::MonConnection(
1672 CephContext
*cct
, ConnectionRef con
, uint64_t global_id
,
1674 : cct(cct
), con(con
), global_id(global_id
), auth_registry(ar
)
1677 MonConnection::~MonConnection()
1685 bool MonConnection::have_session() const
1687 return state
== State::HAVE_SESSION
;
1690 void MonConnection::start(epoch_t epoch
,
1691 const EntityName
& entity_name
)
1694 auth_start
= ceph_clock_now();
1696 if (con
->get_peer_addr().is_msgr2()) {
1697 ldout(cct
, 10) << __func__
<< " opening mon connection" << dendl
;
1698 state
= State::AUTHENTICATING
;
1699 con
->send_message(new MMonGetMap());
1703 // restart authentication handshake
1704 state
= State::NEGOTIATING
;
1706 // send an initial keepalive to ensure our timestamp is valid by the
1707 // time we are in an OPENED state (by sequencing this before
1709 con
->send_keepalive();
1712 m
->protocol
= CEPH_AUTH_UNKNOWN
;
1713 m
->monmap_epoch
= epoch
;
1715 encode(struct_v
, m
->auth_payload
);
1716 std::vector
<uint32_t> auth_supported
;
1717 auth_registry
->get_supported_methods(con
->get_peer_type(), &auth_supported
);
1718 encode(auth_supported
, m
->auth_payload
);
1719 encode(entity_name
, m
->auth_payload
);
1720 encode(global_id
, m
->auth_payload
);
1721 con
->send_message(m
);
1724 int MonConnection::get_auth_request(
1726 std::vector
<uint32_t> *preferred_modes
,
1727 ceph::buffer::list
*bl
,
1728 const EntityName
& entity_name
,
1730 RotatingKeyRing
* keyring
)
1734 if (auth_method
< 0) {
1735 std::vector
<uint32_t> as
;
1736 auth_registry
->get_supported_methods(con
->get_peer_type(), &as
);
1740 auth_method
= as
.front();
1742 *method
= auth_method
;
1743 auth_registry
->get_supported_modes(con
->get_peer_type(), auth_method
,
1745 ldout(cct
,10) << __func__
<< " method " << *method
1746 << " preferred_modes " << *preferred_modes
<< dendl
;
1747 if (preferred_modes
->empty()) {
1751 int r
= _init_auth(*method
, entity_name
, want_keys
, keyring
, true);
1752 ceph_assert(r
== 0);
1754 // initial requset includes some boilerplate...
1755 encode((char)AUTH_MODE_MON
, *bl
);
1756 encode(entity_name
, *bl
);
1757 encode(global_id
, *bl
);
1759 // and (maybe) some method-specific initial payload
1760 auth
->build_initial_request(bl
);
1765 int MonConnection::handle_auth_reply_more(
1766 AuthConnectionMeta
*auth_meta
,
1767 const ceph::buffer::list
& bl
,
1768 ceph::buffer::list
*reply
)
1770 ldout(cct
, 10) << __func__
<< " payload " << bl
.length() << dendl
;
1771 ldout(cct
, 30) << __func__
<< " got\n";
1775 auto p
= bl
.cbegin();
1776 ldout(cct
, 10) << __func__
<< " payload_len " << bl
.length() << dendl
;
1777 int r
= auth
->handle_response(0, p
, &auth_meta
->session_key
,
1778 &auth_meta
->connection_secret
);
1780 auth
->prepare_build_request();
1781 auth
->build_request(*reply
);
1782 ldout(cct
, 10) << __func__
<< " responding with " << reply
->length()
1783 << " bytes" << dendl
;
1786 lderr(cct
) << __func__
<< " handle_response returned " << r
<< dendl
;
1788 ldout(cct
, 10) << __func__
<< " authenticated!" << dendl
;
1790 ceph_abort(cct
, "write me");
1795 int MonConnection::handle_auth_done(
1796 AuthConnectionMeta
*auth_meta
,
1797 uint64_t new_global_id
,
1798 const ceph::buffer::list
& bl
,
1799 CryptoKey
*session_key
,
1800 std::string
*connection_secret
)
1802 ldout(cct
,10) << __func__
<< " global_id " << new_global_id
1803 << " payload " << bl
.length()
1805 global_id
= new_global_id
;
1806 auth
->set_global_id(global_id
);
1807 auto p
= bl
.begin();
1808 int auth_err
= auth
->handle_response(0, p
, &auth_meta
->session_key
,
1809 &auth_meta
->connection_secret
);
1810 if (auth_err
>= 0) {
1811 state
= State::HAVE_SESSION
;
1813 con
->set_last_keepalive_ack(auth_start
);
1815 if (pending_tell_command
) {
1816 con
->send_message2(std::move(pending_tell_command
));
1821 int MonConnection::handle_auth_bad_method(
1822 uint32_t old_auth_method
,
1824 const std::vector
<uint32_t>& allowed_methods
,
1825 const std::vector
<uint32_t>& allowed_modes
)
1827 ldout(cct
,10) << __func__
<< " old_auth_method " << old_auth_method
1828 << " result " << cpp_strerror(result
)
1829 << " allowed_methods " << allowed_methods
<< dendl
;
1830 std::vector
<uint32_t> auth_supported
;
1831 auth_registry
->get_supported_methods(con
->get_peer_type(), &auth_supported
);
1832 auto p
= std::find(auth_supported
.begin(), auth_supported
.end(),
1834 assert(p
!= auth_supported
.end());
1835 p
= std::find_first_of(std::next(p
), auth_supported
.end(),
1836 allowed_methods
.begin(), allowed_methods
.end());
1837 if (p
== auth_supported
.end()) {
1838 lderr(cct
) << __func__
<< " server allowed_methods " << allowed_methods
1839 << " but i only support " << auth_supported
<< dendl
;
1843 ldout(cct
,10) << __func__
<< " will try " << auth_method
<< " next" << dendl
;
1847 int MonConnection::handle_auth(MAuthReply
* m
,
1848 const EntityName
& entity_name
,
1850 RotatingKeyRing
* keyring
)
1852 if (state
== State::NEGOTIATING
) {
1853 int r
= _negotiate(m
, entity_name
, want_keys
, keyring
);
1857 state
= State::AUTHENTICATING
;
1859 int r
= authenticate(m
);
1861 state
= State::HAVE_SESSION
;
1866 int MonConnection::_negotiate(MAuthReply
*m
,
1867 const EntityName
& entity_name
,
1869 RotatingKeyRing
* keyring
)
1871 ldout(cct
, 10) << __func__
<< dendl
;
1872 int r
= _init_auth(m
->protocol
, entity_name
, want_keys
, keyring
, false);
1873 if (r
== -ENOTSUP
) {
1874 if (m
->result
== -ENOTSUP
) {
1875 ldout(cct
, 10) << "none of our auth protocols are supported by the server"
1883 int MonConnection::_init_auth(
1885 const EntityName
& entity_name
,
1887 RotatingKeyRing
* keyring
,
1890 ldout(cct
, 10) << __func__
<< " method " << method
<< dendl
;
1891 if (auth
&& auth
->get_protocol() == (int)method
) {
1892 ldout(cct
, 10) << __func__
<< " already have auth, reseting" << dendl
;
1897 ldout(cct
, 10) << __func__
<< " creating new auth" << dendl
;
1898 auth
.reset(AuthClientHandler::create(cct
, method
, keyring
));
1900 ldout(cct
, 10) << " no handler for protocol " << method
<< dendl
;
1904 // do not request MGR key unless the mon has the SERVER_KRAKEN
1905 // feature. otherwise it will give us an auth error. note that
1906 // we have to use the FEATUREMASK because pre-jewel the kraken
1907 // feature bit was used for something else.
1909 (want_keys
& CEPH_ENTITY_TYPE_MGR
) &&
1910 !(con
->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN
))) {
1911 ldout(cct
, 1) << __func__
1912 << " not requesting MGR keys from pre-kraken monitor"
1914 want_keys
&= ~CEPH_ENTITY_TYPE_MGR
;
1916 auth
->set_want_keys(want_keys
);
1917 auth
->init(entity_name
);
1918 auth
->set_global_id(global_id
);
1922 int MonConnection::authenticate(MAuthReply
*m
)
1925 if (!m
->global_id
) {
1926 ldout(cct
, 1) << "peer sent an invalid global_id" << dendl
;
1928 if (m
->global_id
!= global_id
) {
1929 // it's a new session
1931 global_id
= m
->global_id
;
1932 auth
->set_global_id(global_id
);
1933 ldout(cct
, 10) << "my global_id is " << m
->global_id
<< dendl
;
1935 auto p
= m
->result_bl
.cbegin();
1936 int ret
= auth
->handle_response(m
->result
, p
, nullptr, nullptr);
1937 if (ret
== -EAGAIN
) {
1938 auto ma
= new MAuth
;
1939 ma
->protocol
= auth
->get_protocol();
1940 auth
->prepare_build_request();
1941 auth
->build_request(ma
->auth_payload
);
1942 con
->send_message(ma
);
1944 if (ret
== 0 && pending_tell_command
) {
1945 con
->send_message2(std::move(pending_tell_command
));
1951 void MonClient::register_config_callback(md_config_t::config_callback fn
) {
1952 ceph_assert(!config_cb
);
1956 md_config_t::config_callback
MonClient::get_config_callback() {
1960 #pragma GCC diagnostic push
1961 #pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
1962 #pragma clang diagnostic push
1963 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1964 class monc_error_category
: public ceph::converting_category
{
1966 monc_error_category(){}
1967 const char* name() const noexcept override
;
1968 const char* message(int ev
, char*, std::size_t) const noexcept override
;
1969 std::string
message(int ev
) const override
;
1970 bs::error_condition
default_error_condition(int ev
) const noexcept
1972 bool equivalent(int ev
, const bs::error_condition
& c
) const
1974 using ceph::converting_category::equivalent
;
1975 int from_code(int ev
) const noexcept override
;
1977 #pragma GCC diagnostic pop
1978 #pragma clang diagnostic pop
1980 const char* monc_error_category::name() const noexcept
{
1984 const char* monc_error_category::message(int ev
, char*, std::size_t) const noexcept
{
1988 switch (static_cast<monc_errc
>(ev
)) {
1989 case monc_errc::shutting_down
: // Command failed due to MonClient shutting down
1990 return "Command failed due to MonClient shutting down";
1991 case monc_errc::session_reset
:
1992 return "Monitor session was reset";
1993 case monc_errc::rank_dne
:
1994 return "Requested monitor rank does not exist";
1995 case monc_errc::mon_dne
:
1996 return "Requested monitor does not exist";
1997 case monc_errc::timed_out
:
1998 return "Monitor operation timed out";
1999 case monc_errc::mon_unavailable
:
2000 return "Monitor unavailable";
2003 return "Unknown error";
2006 std::string
monc_error_category::message(int ev
) const {
2007 return message(ev
, nullptr, 0);
2010 bs::error_condition
monc_error_category::default_error_condition(int ev
) const noexcept
{
2011 switch (static_cast<monc_errc
>(ev
)) {
2012 case monc_errc::shutting_down
:
2013 return bs::errc::operation_canceled
;
2014 case monc_errc::session_reset
:
2015 return bs::errc::resource_unavailable_try_again
;
2016 case monc_errc::rank_dne
:
2018 case monc_errc::mon_dne
:
2019 return ceph::errc::not_in_map
;
2020 case monc_errc::timed_out
:
2021 return bs::errc::timed_out
;
2022 case monc_errc::mon_unavailable
:
2023 return bs::errc::no_such_device
;
2025 return { ev
, *this };
2028 bool monc_error_category::equivalent(int ev
, const bs::error_condition
& c
) const noexcept
{
2029 switch (static_cast<monc_errc
>(ev
)) {
2030 case monc_errc::rank_dne
:
2032 case monc_errc::mon_dne
:
2033 return c
== bs::errc::no_such_file_or_directory
;
2035 return default_error_condition(ev
) == c
;
2039 int monc_error_category::from_code(int ev
) const noexcept
{
2043 switch (static_cast<monc_errc
>(ev
)) {
2044 case monc_errc::shutting_down
:
2046 case monc_errc::session_reset
:
2048 case monc_errc::rank_dne
:
2050 case monc_errc::mon_dne
:
2052 case monc_errc::timed_out
:
2054 case monc_errc::mon_unavailable
:
2060 const bs::error_category
& monc_category() noexcept
{
2061 static const monc_error_category c
;