1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #ifndef CEPH_MONCLIENT_H
15 #define CEPH_MONCLIENT_H
25 #include "msg/Messenger.h"
30 #include "common/admin_socket.h"
31 #include "common/async/completion.h"
32 #include "common/Timer.h"
33 #include "common/config.h"
34 #include "messages/MMonGetVersion.h"
36 #include "auth/AuthClient.h"
37 #include "auth/AuthServer.h"
41 class MMonGetVersionReply
;
44 class AuthClientHandler
;
47 class RotatingKeyRing
;
51 MonConnection(CephContext
*cct
,
54 AuthRegistry
*auth_registry
);
// MonConnection is a move-only type.
// Move construction and move assignment use the compiler-generated versions.
56 MonConnection(MonConnection
&& rhs
) = default;
57 MonConnection
& operator=(MonConnection
&&) = default;
// Copy construction and copy assignment are explicitly deleted.
58 MonConnection(const MonConnection
& rhs
) = delete;
59 MonConnection
& operator=(const MonConnection
&) = delete;
60 int handle_auth(MAuthReply
*m
,
61 const EntityName
& entity_name
,
63 RotatingKeyRing
* keyring
);
64 int authenticate(MAuthReply
*m
);
65 void start(epoch_t epoch
,
66 const EntityName
& entity_name
);
67 bool have_session() const;
68 uint64_t get_global_id() const {
71 ConnectionRef
get_con() {
74 std::unique_ptr
<AuthClientHandler
>& get_auth() {
80 std::vector
<uint32_t> *preferred_modes
,
81 ceph::buffer::list
*out
,
82 const EntityName
& entity_name
,
84 RotatingKeyRing
* keyring
);
85 int handle_auth_reply_more(
86 AuthConnectionMeta
*auth_meta
,
87 const ceph::buffer::list
& bl
,
88 ceph::buffer::list
*reply
);
90 AuthConnectionMeta
*auth_meta
,
92 const ceph::buffer::list
& bl
,
93 CryptoKey
*session_key
,
94 std::string
*connection_secret
);
95 int handle_auth_bad_method(
96 uint32_t old_auth_method
,
98 const std::vector
<uint32_t>& allowed_methods
,
99 const std::vector
<uint32_t>& allowed_modes
);
101 bool is_con(Connection
*c
) const {
102 return con
.get() == c
;
104 void queue_command(Message
*m
) {
105 pending_tell_command
= m
;
109 int _negotiate(MAuthReply
*m
,
110 const EntityName
& entity_name
,
112 RotatingKeyRing
* keyring
);
113 int _init_auth(uint32_t method
,
114 const EntityName
& entity_name
,
116 RotatingKeyRing
* keyring
,
123 NEGOTIATING
, // v1 only
124 AUTHENTICATING
, // v1 and v2
127 State state
= State::NONE
;
129 int auth_method
= -1;
132 std::unique_ptr
<AuthClientHandler
> auth
;
135 MessageRef pending_tell_command
;
137 AuthRegistry
*auth_registry
;
141 struct MonClientPinger
: public Dispatcher
,
143 ceph::mutex lock
= ceph::make_mutex("MonClientPinger::lock");
144 ceph::condition_variable ping_recvd_cond
;
147 RotatingKeyRing
*keyring
;
148 std::unique_ptr
<MonConnection
> mc
;
150 MonClientPinger(CephContext
*cct_
,
151 RotatingKeyRing
*keyring
,
159 int wait_for_reply(double timeout
= 0.0) {
160 std::unique_lock locker
{lock
};
162 timeout
= std::chrono::duration
<double>(cct
->_conf
.get_val
<std::chrono::seconds
>("client_mount_timeout")).count();
165 if (ping_recvd_cond
.wait_for(locker
,
166 ceph::make_timespan(timeout
),
167 [this] { return done
; })) {
174 bool ms_dispatch(Message
*m
) override
{
176 std::lock_guard
l(lock
);
177 if (m
->get_type() != CEPH_MSG_PING
)
180 ceph::buffer::list
&payload
= m
->get_payload();
181 if (result
&& payload
.length() > 0) {
182 auto p
= std::cbegin(payload
);
186 ping_recvd_cond
.notify_all();
190 bool ms_handle_reset(Connection
*con
) override
{
191 std::lock_guard
l(lock
);
193 ping_recvd_cond
.notify_all();
// Dispatcher override with an empty body: a remote reset on `con` triggers
// no action in the pinger, and `con` is not used.
196 void ms_handle_remote_reset(Connection
*con
) override
{}
197 bool ms_handle_refused(Connection
*con
) override
{
202 int get_auth_request(
204 AuthConnectionMeta
*auth_meta
,
205 uint32_t *auth_method
,
206 std::vector
<uint32_t> *preferred_modes
,
207 ceph::buffer::list
*bl
) override
{
208 return mc
->get_auth_request(auth_method
, preferred_modes
, bl
,
209 cct
->_conf
->name
, 0, keyring
);
211 int handle_auth_reply_more(
213 AuthConnectionMeta
*auth_meta
,
214 const ceph::buffer::list
& bl
,
215 ceph::buffer::list
*reply
) override
{
216 return mc
->handle_auth_reply_more(auth_meta
, bl
, reply
);
218 int handle_auth_done(
220 AuthConnectionMeta
*auth_meta
,
223 const ceph::buffer::list
& bl
,
224 CryptoKey
*session_key
,
225 std::string
*connection_secret
) override
{
226 return mc
->handle_auth_done(auth_meta
, global_id
, bl
,
227 session_key
, connection_secret
);
229 int handle_auth_bad_method(
231 AuthConnectionMeta
*auth_meta
,
232 uint32_t old_auth_method
,
234 const std::vector
<uint32_t>& allowed_methods
,
235 const std::vector
<uint32_t>& allowed_modes
) override
{
236 return mc
->handle_auth_bad_method(old_auth_method
, result
,
237 allowed_methods
, allowed_modes
);
// Returns the boost::system error category that the monc_errc conversion
// helpers below (make_error_code / make_error_condition) pair with the
// enum's integer value. Declared noexcept; only the declaration is visible
// in this header, and the same declaration is repeated after those helpers.
241 const boost::system::error_category
& monc_category() noexcept
;
243 enum class monc_errc
{
244 shutting_down
= 1, // Command failed due to MonClient shutting down
245 session_reset
, // Monitor session was reset
246 rank_dne
, // Requested monitor rank does not exist
247 mon_dne
, // Requested monitor does not exist
248 timed_out
, // Monitor operation timed out
249 mon_unavailable
// Monitor unavailable
252 namespace boost::system
{
254 struct is_error_code_enum
<::monc_errc
> {
255 static const bool value
= true;
259 // implicit conversion:
260 inline boost::system::error_code
make_error_code(monc_errc e
) noexcept
{
261 return { static_cast<int>(e
), monc_category() };
264 // explicit conversion:
265 inline boost::system::error_condition
make_error_condition(monc_errc e
) noexcept
{
266 return { static_cast<int>(e
), monc_category() };
269 const boost::system::error_category
& monc_category() noexcept
;
271 class MonClient
: public Dispatcher
,
273 public AuthServer
, /* for mgr, osd, mds */
274 public AdminSocketHook
{
275 static constexpr auto dout_subsys
= ceph_subsys_monc
;
277 // Error, Newest, Oldest
278 using VersionSig
= void(boost::system::error_code
, version_t
, version_t
);
279 using VersionCompletion
= ceph::async::Completion
<VersionSig
>;
281 using CommandSig
= void(boost::system::error_code
, std::string
,
283 using CommandCompletion
= ceph::async::Completion
<CommandSig
>;
286 std::map
<std::string
,std::string
> config_mgr
;
289 Messenger
*messenger
;
291 std::unique_ptr
<MonConnection
> active_con
;
292 std::map
<entity_addrvec_t
, MonConnection
> pending_cons
;
293 std::set
<unsigned> tried
;
295 EntityName entity_name
;
297 mutable ceph::mutex monc_lock
= ceph::make_mutex("MonClient::monc_lock");
299 boost::asio::io_context
& service
;
300 boost::asio::io_context::strand finish_strand
{service
};
303 bool stopping
= false;
305 LogClient
*log_client
;
306 bool more_log_pending
;
308 void send_log(bool flush
= false);
310 bool ms_dispatch(Message
*m
) override
;
311 bool ms_handle_reset(Connection
*con
) override
;
// Dispatcher override with an empty body: MonClient takes no action on a
// remote reset, and `con` is not used.
312 void ms_handle_remote_reset(Connection
*con
) override
{}
// Dispatcher override that unconditionally returns false; `con` is not
// inspected.
// NOTE(review): false presumably signals "not handled" to the messenger —
// confirm against the Dispatcher contract.
313 bool ms_handle_refused(Connection
*con
) override
{ return false; }
315 void handle_monmap(MMonMap
*m
);
316 void handle_config(MConfig
*m
);
318 void handle_auth(MAuthReply
*m
);
321 std::string_view command
,
322 const cmdmap_t
& cmdmap
,
323 const ceph::buffer::list
&inbl
,
326 ceph::buffer::list
& out
) override
;
329 utime_t last_keepalive
;
330 utime_t last_send_log
;
333 void schedule_tick();
337 ceph::condition_variable map_cond
;
338 bool passthrough_monmap
= false;
340 bool want_bootstrap_config
= false;
341 ceph::ref_t
<MConfig
> bootstrap_config
;
344 std::unique_ptr
<AuthClientHandler
> auth
;
345 uint32_t want_keys
= 0;
346 uint64_t global_id
= 0;
347 ceph::condition_variable auth_cond
;
348 int authenticate_err
= 0;
349 bool authenticated
= false;
351 std::list
<MessageRef
> waiting_for_session
;
352 utime_t last_rotating_renew_sent
;
353 bool had_a_connection
;
354 double reopen_interval_multiplier
;
356 Dispatcher
*handle_authentication_dispatcher
= nullptr;
357 bool _opened() const;
358 bool _hunting() const;
359 void _start_hunting();
360 void _finish_hunting(int auth_err
);
361 void _finish_auth(int auth_err
);
362 void _reopen_session(int rank
= -1);
363 void _add_conn(unsigned rank
);
366 void _send_mon_message(MessageRef m
);
368 std::map
<entity_addrvec_t
, MonConnection
>::iterator
_find_pending_con(
369 const ConnectionRef
& con
) {
370 for (auto i
= pending_cons
.begin(); i
!= pending_cons
.end(); ++i
) {
371 if (i
->second
.get_con() == con
) {
375 return pending_cons
.end();
380 int get_auth_request(
382 AuthConnectionMeta
*auth_meta
,
384 std::vector
<uint32_t> *preferred_modes
,
385 ceph::buffer::list
*bl
) override
;
386 int handle_auth_reply_more(
388 AuthConnectionMeta
*auth_meta
,
389 const ceph::buffer::list
& bl
,
390 ceph::buffer::list
*reply
) override
;
391 int handle_auth_done(
393 AuthConnectionMeta
*auth_meta
,
396 const ceph::buffer::list
& bl
,
397 CryptoKey
*session_key
,
398 std::string
*connection_secret
) override
;
399 int handle_auth_bad_method(
401 AuthConnectionMeta
*auth_meta
,
402 uint32_t old_auth_method
,
404 const std::vector
<uint32_t>& allowed_methods
,
405 const std::vector
<uint32_t>& allowed_modes
) override
;
407 int handle_auth_request(
409 AuthConnectionMeta
*auth_meta
,
411 uint32_t auth_method
,
412 const ceph::buffer::list
& bl
,
413 ceph::buffer::list
*reply
) override
;
415 void set_entity_name(EntityName name
) { entity_name
= name
; }
416 void set_handle_authentication_dispatcher(Dispatcher
*d
) {
417 handle_authentication_dispatcher
= d
;
419 int _check_auth_tickets();
420 int _check_auth_rotating();
421 int wait_auth_rotating(double timeout
);
423 int authenticate(double timeout
=0.0);
424 bool is_authenticated() const {return authenticated
;}
426 bool is_connected() const { return active_con
!= nullptr; }
429 * Try to flush as many log messages as we can in a single
430 * message. Use this before shutting down to transmit your
439 void handle_subscribe_ack(MMonSubscribeAck
* m
);
443 std::lock_guard
l(monc_lock
);
446 bool sub_want(std::string what
, version_t start
, unsigned flags
) {
447 std::lock_guard
l(monc_lock
);
448 return sub
.want(what
, start
, flags
);
450 void sub_got(std::string what
, version_t have
) {
451 std::lock_guard
l(monc_lock
);
454 void sub_unwant(std::string what
) {
455 std::lock_guard
l(monc_lock
);
458 bool sub_want_increment(std::string what
, version_t start
, unsigned flags
) {
459 std::lock_guard
l(monc_lock
);
460 return sub
.inc_want(what
, start
, flags
);
463 std::unique_ptr
<KeyRing
> keyring
;
464 std::unique_ptr
<RotatingKeyRing
> rotating_secrets
;
// Constructs a MonClient from a CephContext pointer and an io_context
// reference; the definition is not part of this header.
// NOTE(review): `service` presumably binds the `service` reference member
// declared above — confirm in the implementation file.
467 MonClient(CephContext
*cct_
, boost::asio::io_context
& service
);
// MonClient is non-copyable: copy construction and copy assignment are deleted.
468 MonClient(const MonClient
&) = delete;
469 MonClient
& operator=(const MonClient
&) = delete;
// Destructor overrides a virtual base destructor; defined out of line.
470 ~MonClient() override
;
475 void set_log_client(LogClient
*clog
) {
478 LogClient
*get_log_client() {
482 int build_initial_monmap();
484 int get_monmap_and_config();
486 * If you want to see MonMap messages, set this and
487 * the MonClient will tell the Messenger it hasn't
489 * Note that if you do this, *you* are of course responsible for
490 * putting the message reference!
492 void set_passthrough_monmap() {
493 std::lock_guard
l(monc_lock
);
494 passthrough_monmap
= true;
496 void unset_passthrough_monmap() {
497 std::lock_guard
l(monc_lock
);
498 passthrough_monmap
= false;
501 * Ping monitor with ID @p mon_id and record the resulting
502 * reply in @p result_reply.
504 * @param[in] mon_id Target monitor's ID
505 * @param[out] result_reply reply from mon.ID, if param != NULL
506 * @returns 0 in case of success; < 0 in case of error,
507 * -ETIMEDOUT if monitor didn't reply before timeout
508 * expired (default: conf->client_mount_timeout).
510 int ping_monitor(const std::string
&mon_id
, std::string
*result_reply
);
512 void send_mon_message(Message
*m
) {
513 send_mon_message(MessageRef
{m
, false});
515 void send_mon_message(MessageRef m
);
517 void reopen_session() {
518 std::lock_guard
l(monc_lock
);
522 const uuid_d
& get_fsid() const {
526 entity_addrvec_t
get_mon_addrs(unsigned i
) const {
527 std::lock_guard
l(monc_lock
);
528 if (i
< monmap
.size())
529 return monmap
.get_addrs(i
);
530 return entity_addrvec_t();
532 int get_num_mon() const {
533 std::lock_guard
l(monc_lock
);
534 return monmap
.size();
537 uint64_t get_global_id() const {
538 std::lock_guard
l(monc_lock
);
// Stores the raw Messenger pointer `m` in the `messenger` member.
// No lock is taken and no null check is made here.
542 void set_messenger(Messenger
*m
) { messenger
= m
; }
// Forwards to messenger->get_myaddrs() and returns its result.
// `messenger` is dereferenced unconditionally, so a messenger must have
// been assigned (see set_messenger) before this is called.
543 entity_addrvec_t
get_myaddrs() const { return messenger
->get_myaddrs(); }
544 AuthAuthorizer
* build_authorizer(int service_id
) const;
546 void set_want_keys(uint32_t want
) {
552 uint64_t last_mon_command_tid
;
556 std::string target_name
;
557 int target_rank
= -1;
558 ConnectionRef target_con
;
559 std::unique_ptr
<MonConnection
> target_session
;
560 unsigned send_attempts
= 0; ///< attempt count for legacy mons
561 utime_t last_send_attempt
;
563 std::vector
<std::string
> cmd
;
564 ceph::buffer::list inbl
;
565 std::unique_ptr
<CommandCompletion
> onfinish
;
566 std::optional
<boost::asio::steady_timer
> cancel_timer
;
568 MonCommand(MonClient
& monc
, uint64_t t
, std::unique_ptr
<CommandCompletion
> onfinish
)
569 : tid(t
), onfinish(std::move(onfinish
)) {
571 monc
.cct
->_conf
.get_val
<std::chrono::seconds
>("rados_mon_op_timeout");
572 if (timeout
.count() > 0) {
573 cancel_timer
.emplace(monc
.service
, timeout
);
574 cancel_timer
->async_wait(
575 [this, &monc
](boost::system::error_code ec
) {
578 std::scoped_lock
l(monc
.monc_lock
);
579 monc
._cancel_mon_command(tid
);
584 bool is_tell() const {
585 return target_name
.size() || target_rank
>= 0;
589 std::map
<uint64_t,MonCommand
*> mon_commands
;
591 void _send_command(MonCommand
*r
);
592 void _check_tell_commands();
593 void _resend_mon_commands();
594 int _cancel_mon_command(uint64_t tid
);
595 void _finish_command(MonCommand
*r
, boost::system::error_code ret
, std::string_view rs
,
598 void handle_mon_command_ack(MMonCommandAck
*ack
);
599 void handle_command_reply(MCommandReply
*reply
);
602 template<typename CompletionToken
>
603 auto start_mon_command(const std::vector
<std::string
>& cmd
,
604 const ceph::buffer::list
& inbl
,
605 CompletionToken
&& token
) {
606 ldout(cct
,10) << __func__
<< " cmd=" << cmd
<< dendl
;
607 boost::asio::async_completion
<CompletionToken
, CommandSig
> init(token
);
609 std::scoped_lock
l(monc_lock
);
610 auto h
= CommandCompletion::create(service
.get_executor(),
611 std::move(init
.completion_handler
));
612 if (!initialized
|| stopping
) {
613 ceph::async::post(std::move(h
), monc_errc::shutting_down
, std::string
{},
616 auto r
= new MonCommand(*this, ++last_mon_command_tid
, std::move(h
));
619 mon_commands
.emplace(r
->tid
, r
);
623 return init
.result
.get();
626 template<typename CompletionToken
>
627 auto start_mon_command(int mon_rank
, const std::vector
<std::string
>& cmd
,
628 const ceph::buffer::list
& inbl
, CompletionToken
&& token
) {
629 ldout(cct
,10) << __func__
<< " cmd=" << cmd
<< dendl
;
630 boost::asio::async_completion
<CompletionToken
, CommandSig
> init(token
);
632 std::scoped_lock
l(monc_lock
);
633 auto h
= CommandCompletion::create(service
.get_executor(),
634 std::move(init
.completion_handler
));
635 if (!initialized
|| stopping
) {
636 ceph::async::post(std::move(h
), monc_errc::shutting_down
, std::string
{},
639 auto r
= new MonCommand(*this, ++last_mon_command_tid
, std::move(h
));
640 r
->target_rank
= mon_rank
;
643 mon_commands
.emplace(r
->tid
, r
);
647 return init
.result
.get();
650 template<typename CompletionToken
>
651 auto start_mon_command(const std::string
& mon_name
,
652 const std::vector
<std::string
>& cmd
,
653 const ceph::buffer::list
& inbl
,
654 CompletionToken
&& token
) {
655 ldout(cct
,10) << __func__
<< " cmd=" << cmd
<< dendl
;
656 boost::asio::async_completion
<CompletionToken
, CommandSig
> init(token
);
658 std::scoped_lock
l(monc_lock
);
659 auto h
= CommandCompletion::create(service
.get_executor(),
660 std::move(init
.completion_handler
));
661 if (!initialized
|| stopping
) {
662 ceph::async::post(std::move(h
), monc_errc::shutting_down
, std::string
{},
665 auto r
= new MonCommand(*this, ++last_mon_command_tid
, std::move(h
));
666 // detect/tolerate mon *rank* passed as a string
668 int rank
= strict_strtoll(mon_name
.c_str(), 10, &err
);
669 if (err
.size() == 0 && rank
>= 0) {
670 ldout(cct
,10) << __func__
<< " interpreting name '" << mon_name
671 << "' as rank " << rank
<< dendl
;
672 r
->target_rank
= rank
;
674 r
->target_name
= mon_name
;
678 mon_commands
.emplace(r
->tid
, r
);
682 return init
.result
.get();
685 class ContextVerter
{
687 ceph::bufferlist
* outbl
;
// Stores the three pointers for later use by operator(), which writes
// through `outs` and `outbl` and calls onfinish->complete(...).
//
// @param outs      destination for the status string
// @param outbl     destination for the output buffer
// @param onfinish  Context completed when the functor is invoked
691 ContextVerter(std::string
* outs
, ceph::bufferlist
* outbl
, Context
* onfinish
)
692 : outs(outs
), outbl(outbl
), onfinish(onfinish
) {}
// All five special members are explicitly defaulted, so ContextVerter is
// copyable and movable with member-wise semantics (the stored pointers are
// copied as pointers).
693 ~ContextVerter() = default;
694 ContextVerter(const ContextVerter
&) = default;
695 ContextVerter
& operator =(const ContextVerter
&) = default;
696 ContextVerter(ContextVerter
&&) = default;
697 ContextVerter
& operator =(ContextVerter
&&) = default;
699 void operator()(boost::system::error_code e
,
701 ceph::bufferlist bl
) {
703 *outs
= std::move(s
);
705 *outbl
= std::move(bl
);
707 onfinish
->complete(ceph::from_error_code(e
));
711 void start_mon_command(const std::vector
<std::string
>& cmd
, const bufferlist
& inbl
,
712 bufferlist
*outbl
, std::string
*outs
,
714 start_mon_command(cmd
, inbl
, ContextVerter(outs
, outbl
, onfinish
));
716 void start_mon_command(int mon_rank
,
717 const std::vector
<std::string
>& cmd
, const bufferlist
& inbl
,
718 bufferlist
*outbl
, std::string
*outs
,
720 start_mon_command(mon_rank
, cmd
, inbl
, ContextVerter(outs
, outbl
, onfinish
));
722 void start_mon_command(const std::string
&mon_name
, ///< mon name, with mon. prefix
723 const std::vector
<std::string
>& cmd
, const bufferlist
& inbl
,
724 bufferlist
*outbl
, std::string
*outs
,
726 start_mon_command(mon_name
, cmd
, inbl
, ContextVerter(outs
, outbl
, onfinish
));
733 * get latest known version(s) of cluster map
735 * @param map string name of map (e.g., 'osdmap')
736 * @param token context that will be triggered on completion
737 * @return (via Completion) {} on success,
738 * boost::system::errc::resource_unavailable_try_again if we need to
739 * resubmit our request
741 template<typename CompletionToken
>
742 auto get_version(std::string
&& map
, CompletionToken
&& token
) {
743 boost::asio::async_completion
<CompletionToken
, VersionSig
> init(token
);
745 std::scoped_lock
l(monc_lock
);
746 auto m
= ceph::make_message
<MMonGetVersion
>();
747 m
->what
= std::move(map
);
748 m
->handle
= ++version_req_id
;
749 version_requests
.emplace(m
->handle
,
750 VersionCompletion::create(
751 service
.get_executor(),
752 std::move(init
.completion_handler
)));
753 _send_mon_message(m
);
755 return init
.result
.get();
759 * Run a callback within our lock, with a reference
762 template<typename Callback
, typename
...Args
>
763 auto with_monmap(Callback
&& cb
, Args
&&...args
) const ->
764 decltype(cb(monmap
, std::forward
<Args
>(args
)...)) {
765 std::lock_guard
l(monc_lock
);
766 return std::forward
<Callback
>(cb
)(monmap
, std::forward
<Args
>(args
)...);
769 void register_config_callback(md_config_t::config_callback fn
);
770 void register_config_notify_callback(std::function
<void(void)> f
) {
771 config_notify_cb
= f
;
773 md_config_t::config_callback
get_config_callback();
777 std::map
<ceph_tid_t
, std::unique_ptr
<VersionCompletion
>> version_requests
;
778 ceph_tid_t version_req_id
;
779 void handle_get_version_reply(MMonGetVersionReply
* m
);
780 md_config_t::config_callback config_cb
;
781 std::function
<void(void)> config_notify_cb
;