1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #ifndef CEPH_MONCLIENT_H
15 #define CEPH_MONCLIENT_H
25 #include "msg/Messenger.h"
30 #include "common/async/completion.h"
31 #include "common/Timer.h"
32 #include "common/config.h"
33 #include "messages/MMonGetVersion.h"
35 #include "auth/AuthClient.h"
36 #include "auth/AuthServer.h"
40 class MMonGetVersionReply
;
43 class AuthClientHandler
;
46 class RotatingKeyRing
;
50 MonConnection(CephContext
*cct
,
53 AuthRegistry
*auth_registry
);
55 MonConnection(MonConnection
&& rhs
) = default;
56 MonConnection
& operator=(MonConnection
&&) = default;
57 MonConnection(const MonConnection
& rhs
) = delete;
58 MonConnection
& operator=(const MonConnection
&) = delete;
59 int handle_auth(MAuthReply
*m
,
60 const EntityName
& entity_name
,
62 RotatingKeyRing
* keyring
);
63 int authenticate(MAuthReply
*m
);
64 void start(epoch_t epoch
,
65 const EntityName
& entity_name
);
66 bool have_session() const;
67 uint64_t get_global_id() const {
70 ConnectionRef
get_con() {
73 std::unique_ptr
<AuthClientHandler
>& get_auth() {
79 std::vector
<uint32_t> *preferred_modes
,
80 ceph::buffer::list
*out
,
81 const EntityName
& entity_name
,
83 RotatingKeyRing
* keyring
);
84 int handle_auth_reply_more(
85 AuthConnectionMeta
*auth_meta
,
86 const ceph::buffer::list
& bl
,
87 ceph::buffer::list
*reply
);
89 AuthConnectionMeta
*auth_meta
,
91 const ceph::buffer::list
& bl
,
92 CryptoKey
*session_key
,
93 std::string
*connection_secret
);
94 int handle_auth_bad_method(
95 uint32_t old_auth_method
,
97 const std::vector
<uint32_t>& allowed_methods
,
98 const std::vector
<uint32_t>& allowed_modes
);
100 bool is_con(Connection
*c
) const {
101 return con
.get() == c
;
103 void queue_command(Message
*m
) {
104 pending_tell_command
= m
;
108 int _negotiate(MAuthReply
*m
,
109 const EntityName
& entity_name
,
111 RotatingKeyRing
* keyring
);
112 int _init_auth(uint32_t method
,
113 const EntityName
& entity_name
,
115 RotatingKeyRing
* keyring
,
122 NEGOTIATING
, // v1 only
123 AUTHENTICATING
, // v1 and v2
126 State state
= State::NONE
;
128 int auth_method
= -1;
131 std::unique_ptr
<AuthClientHandler
> auth
;
134 MessageRef pending_tell_command
;
136 AuthRegistry
*auth_registry
;
140 struct MonClientPinger
: public Dispatcher
,
142 ceph::mutex lock
= ceph::make_mutex("MonClientPinger::lock");
143 ceph::condition_variable ping_recvd_cond
;
146 RotatingKeyRing
*keyring
;
147 std::unique_ptr
<MonConnection
> mc
;
149 MonClientPinger(CephContext
*cct_
,
150 RotatingKeyRing
*keyring
,
158 int wait_for_reply(double timeout
= 0.0) {
159 std::unique_lock locker
{lock
};
161 timeout
= std::chrono::duration
<double>(cct
->_conf
.get_val
<std::chrono::seconds
>("client_mount_timeout")).count();
164 if (ping_recvd_cond
.wait_for(locker
,
165 ceph::make_timespan(timeout
),
166 [this] { return done
; })) {
173 bool ms_dispatch(Message
*m
) override
{
175 std::lock_guard
l(lock
);
176 if (m
->get_type() != CEPH_MSG_PING
)
179 ceph::buffer::list
&payload
= m
->get_payload();
180 if (result
&& payload
.length() > 0) {
181 auto p
= std::cbegin(payload
);
185 ping_recvd_cond
.notify_all();
189 bool ms_handle_reset(Connection
*con
) override
{
190 std::lock_guard
l(lock
);
192 ping_recvd_cond
.notify_all();
195 void ms_handle_remote_reset(Connection
*con
) override
{}
196 bool ms_handle_refused(Connection
*con
) override
{
201 int get_auth_request(
203 AuthConnectionMeta
*auth_meta
,
204 uint32_t *auth_method
,
205 std::vector
<uint32_t> *preferred_modes
,
206 ceph::buffer::list
*bl
) override
{
207 return mc
->get_auth_request(auth_method
, preferred_modes
, bl
,
208 cct
->_conf
->name
, 0, keyring
);
210 int handle_auth_reply_more(
212 AuthConnectionMeta
*auth_meta
,
213 const ceph::buffer::list
& bl
,
214 ceph::buffer::list
*reply
) override
{
215 return mc
->handle_auth_reply_more(auth_meta
, bl
, reply
);
217 int handle_auth_done(
219 AuthConnectionMeta
*auth_meta
,
222 const ceph::buffer::list
& bl
,
223 CryptoKey
*session_key
,
224 std::string
*connection_secret
) override
{
225 return mc
->handle_auth_done(auth_meta
, global_id
, bl
,
226 session_key
, connection_secret
);
228 int handle_auth_bad_method(
230 AuthConnectionMeta
*auth_meta
,
231 uint32_t old_auth_method
,
233 const std::vector
<uint32_t>& allowed_methods
,
234 const std::vector
<uint32_t>& allowed_modes
) override
{
235 return mc
->handle_auth_bad_method(old_auth_method
, result
,
236 allowed_methods
, allowed_modes
);
240 const boost::system::error_category
& monc_category() noexcept
;
242 enum class monc_errc
{
243 shutting_down
= 1, // Command failed due to MonClient shutting down
244 session_reset
, // Monitor session was reset
245 rank_dne
, // Requested monitor rank does not exist
246 mon_dne
, // Requested monitor does not exist
247 timed_out
, // Monitor operation timed out
248 mon_unavailable
// Monitor unavailable
251 namespace boost::system
{
253 struct is_error_code_enum
<::monc_errc
> {
254 static const bool value
= true;
258 // implicit conversion:
259 inline boost::system::error_code
make_error_code(monc_errc e
) noexcept
{
260 return { static_cast<int>(e
), monc_category() };
263 // explicit conversion:
264 inline boost::system::error_condition
make_error_condition(monc_errc e
) noexcept
{
265 return { static_cast<int>(e
), monc_category() };
268 const boost::system::error_category
& monc_category() noexcept
;
270 class MonClient
: public Dispatcher
,
272 public AuthServer
/* for mgr, osd, mds */ {
273 static constexpr auto dout_subsys
= ceph_subsys_monc
;
275 // Error, Newest, Oldest
276 using VersionSig
= void(boost::system::error_code
, version_t
, version_t
);
277 using VersionCompletion
= ceph::async::Completion
<VersionSig
>;
279 using CommandSig
= void(boost::system::error_code
, std::string
,
281 using CommandCompletion
= ceph::async::Completion
<CommandSig
>;
284 std::map
<std::string
,std::string
> config_mgr
;
287 Messenger
*messenger
;
289 std::unique_ptr
<MonConnection
> active_con
;
290 std::map
<entity_addrvec_t
, MonConnection
> pending_cons
;
291 std::set
<unsigned> tried
;
293 EntityName entity_name
;
295 mutable ceph::mutex monc_lock
= ceph::make_mutex("MonClient::monc_lock");
297 boost::asio::io_context
& service
;
298 boost::asio::io_context::strand finish_strand
{service
};
301 bool stopping
= false;
303 LogClient
*log_client
;
304 bool more_log_pending
;
306 void send_log(bool flush
= false);
308 bool ms_dispatch(Message
*m
) override
;
309 bool ms_handle_reset(Connection
*con
) override
;
310 void ms_handle_remote_reset(Connection
*con
) override
{}
311 bool ms_handle_refused(Connection
*con
) override
{ return false; }
313 void handle_monmap(MMonMap
*m
);
314 void handle_config(MConfig
*m
);
316 void handle_auth(MAuthReply
*m
);
319 utime_t last_keepalive
;
320 utime_t last_send_log
;
323 void schedule_tick();
327 ceph::condition_variable map_cond
;
328 bool passthrough_monmap
= false;
330 bool want_bootstrap_config
= false;
331 ceph::ref_t
<MConfig
> bootstrap_config
;
334 std::unique_ptr
<AuthClientHandler
> auth
;
335 uint32_t want_keys
= 0;
336 uint64_t global_id
= 0;
337 ceph::condition_variable auth_cond
;
338 int authenticate_err
= 0;
339 bool authenticated
= false;
341 std::list
<MessageRef
> waiting_for_session
;
342 utime_t last_rotating_renew_sent
;
343 bool had_a_connection
;
344 double reopen_interval_multiplier
;
346 Dispatcher
*handle_authentication_dispatcher
= nullptr;
347 bool _opened() const;
348 bool _hunting() const;
349 void _start_hunting();
350 void _finish_hunting(int auth_err
);
351 void _finish_auth(int auth_err
);
352 void _reopen_session(int rank
= -1);
353 void _add_conn(unsigned rank
);
356 void _send_mon_message(MessageRef m
);
358 std::map
<entity_addrvec_t
, MonConnection
>::iterator
_find_pending_con(
359 const ConnectionRef
& con
) {
360 for (auto i
= pending_cons
.begin(); i
!= pending_cons
.end(); ++i
) {
361 if (i
->second
.get_con() == con
) {
365 return pending_cons
.end();
370 int get_auth_request(
372 AuthConnectionMeta
*auth_meta
,
374 std::vector
<uint32_t> *preferred_modes
,
375 ceph::buffer::list
*bl
) override
;
376 int handle_auth_reply_more(
378 AuthConnectionMeta
*auth_meta
,
379 const ceph::buffer::list
& bl
,
380 ceph::buffer::list
*reply
) override
;
381 int handle_auth_done(
383 AuthConnectionMeta
*auth_meta
,
386 const ceph::buffer::list
& bl
,
387 CryptoKey
*session_key
,
388 std::string
*connection_secret
) override
;
389 int handle_auth_bad_method(
391 AuthConnectionMeta
*auth_meta
,
392 uint32_t old_auth_method
,
394 const std::vector
<uint32_t>& allowed_methods
,
395 const std::vector
<uint32_t>& allowed_modes
) override
;
397 int handle_auth_request(
399 AuthConnectionMeta
*auth_meta
,
401 uint32_t auth_method
,
402 const ceph::buffer::list
& bl
,
403 ceph::buffer::list
*reply
) override
;
405 void set_entity_name(EntityName name
) { entity_name
= name
; }
406 void set_handle_authentication_dispatcher(Dispatcher
*d
) {
407 handle_authentication_dispatcher
= d
;
409 int _check_auth_tickets();
410 int _check_auth_rotating();
411 int wait_auth_rotating(double timeout
);
413 int authenticate(double timeout
=0.0);
414 bool is_authenticated() const {return authenticated
;}
416 bool is_connected() const { return active_con
!= nullptr; }
419 * Try to flush as many log messages as we can in a single
420 * message. Use this before shutting down to transmit your
429 void handle_subscribe_ack(MMonSubscribeAck
* m
);
433 std::lock_guard
l(monc_lock
);
436 bool sub_want(std::string what
, version_t start
, unsigned flags
) {
437 std::lock_guard
l(monc_lock
);
438 return sub
.want(what
, start
, flags
);
440 void sub_got(std::string what
, version_t have
) {
441 std::lock_guard
l(monc_lock
);
444 void sub_unwant(std::string what
) {
445 std::lock_guard
l(monc_lock
);
448 bool sub_want_increment(std::string what
, version_t start
, unsigned flags
) {
449 std::lock_guard
l(monc_lock
);
450 return sub
.inc_want(what
, start
, flags
);
453 std::unique_ptr
<KeyRing
> keyring
;
454 std::unique_ptr
<RotatingKeyRing
> rotating_secrets
;
457 MonClient(CephContext
*cct_
, boost::asio::io_context
& service
);
458 MonClient(const MonClient
&) = delete;
459 MonClient
& operator=(const MonClient
&) = delete;
460 ~MonClient() override
;
465 void set_log_client(LogClient
*clog
) {
468 LogClient
*get_log_client() {
472 int build_initial_monmap();
474 int get_monmap_and_config();
476 * If you want to see MonMap messages, set this and
477 * the MonClient will tell the Messenger it hasn't
479 * Note that if you do this, *you* are of course responsible for
480 * putting the message reference!
482 void set_passthrough_monmap() {
483 std::lock_guard
l(monc_lock
);
484 passthrough_monmap
= true;
486 void unset_passthrough_monmap() {
487 std::lock_guard
l(monc_lock
);
488 passthrough_monmap
= false;
491 * Ping monitor with ID @p mon_id and record the resulting
492 * reply in @p result_reply.
494 * @param[in] mon_id Target monitor's ID
495 * @param[out] result_reply reply from mon.ID, if param != NULL
496 * @returns 0 in case of success; < 0 in case of error,
497 * -ETIMEDOUT if monitor didn't reply before timeout
498 * expired (default: conf->client_mount_timeout).
500 int ping_monitor(const std::string
&mon_id
, std::string
*result_reply
);
502 void send_mon_message(Message
*m
) {
503 send_mon_message(MessageRef
{m
, false});
505 void send_mon_message(MessageRef m
);
507 void reopen_session() {
508 std::lock_guard
l(monc_lock
);
512 const uuid_d
& get_fsid() const {
516 entity_addrvec_t
get_mon_addrs(unsigned i
) const {
517 std::lock_guard
l(monc_lock
);
518 if (i
< monmap
.size())
519 return monmap
.get_addrs(i
);
520 return entity_addrvec_t();
522 int get_num_mon() const {
523 std::lock_guard
l(monc_lock
);
524 return monmap
.size();
527 uint64_t get_global_id() const {
528 std::lock_guard
l(monc_lock
);
532 void set_messenger(Messenger
*m
) { messenger
= m
; }
533 entity_addrvec_t
get_myaddrs() const { return messenger
->get_myaddrs(); }
534 AuthAuthorizer
* build_authorizer(int service_id
) const;
536 void set_want_keys(uint32_t want
) {
542 uint64_t last_mon_command_tid
;
546 std::string target_name
;
547 int target_rank
= -1;
548 ConnectionRef target_con
;
549 std::unique_ptr
<MonConnection
> target_session
;
550 unsigned send_attempts
= 0; ///< attempt count for legacy mons
551 utime_t last_send_attempt
;
553 std::vector
<std::string
> cmd
;
554 ceph::buffer::list inbl
;
555 std::unique_ptr
<CommandCompletion
> onfinish
;
556 std::optional
<boost::asio::steady_timer
> cancel_timer
;
558 MonCommand(MonClient
& monc
, uint64_t t
, std::unique_ptr
<CommandCompletion
> onfinish
)
559 : tid(t
), onfinish(std::move(onfinish
)) {
561 monc
.cct
->_conf
.get_val
<std::chrono::seconds
>("rados_mon_op_timeout");
562 if (timeout
.count() > 0) {
563 cancel_timer
.emplace(monc
.service
, timeout
);
564 cancel_timer
->async_wait(
565 [this, &monc
](boost::system::error_code ec
) {
568 std::scoped_lock
l(monc
.monc_lock
);
569 monc
._cancel_mon_command(tid
);
574 bool is_tell() const {
575 return target_name
.size() || target_rank
>= 0;
579 std::map
<uint64_t,MonCommand
*> mon_commands
;
581 void _send_command(MonCommand
*r
);
582 void _check_tell_commands();
583 void _resend_mon_commands();
584 int _cancel_mon_command(uint64_t tid
);
585 void _finish_command(MonCommand
*r
, boost::system::error_code ret
, std::string_view rs
,
588 void handle_mon_command_ack(MMonCommandAck
*ack
);
589 void handle_command_reply(MCommandReply
*reply
);
592 template<typename CompletionToken
>
593 auto start_mon_command(const std::vector
<std::string
>& cmd
,
594 const ceph::buffer::list
& inbl
,
595 CompletionToken
&& token
) {
596 ldout(cct
,10) << __func__
<< " cmd=" << cmd
<< dendl
;
597 boost::asio::async_completion
<CompletionToken
, CommandSig
> init(token
);
599 std::scoped_lock
l(monc_lock
);
600 auto h
= CommandCompletion::create(service
.get_executor(),
601 std::move(init
.completion_handler
));
602 if (!initialized
|| stopping
) {
603 ceph::async::post(std::move(h
), monc_errc::shutting_down
, std::string
{},
606 auto r
= new MonCommand(*this, ++last_mon_command_tid
, std::move(h
));
609 mon_commands
.emplace(r
->tid
, r
);
613 return init
.result
.get();
616 template<typename CompletionToken
>
617 auto start_mon_command(int mon_rank
, const std::vector
<std::string
>& cmd
,
618 const ceph::buffer::list
& inbl
, CompletionToken
&& token
) {
619 ldout(cct
,10) << __func__
<< " cmd=" << cmd
<< dendl
;
620 boost::asio::async_completion
<CompletionToken
, CommandSig
> init(token
);
622 std::scoped_lock
l(monc_lock
);
623 auto h
= CommandCompletion::create(service
.get_executor(),
624 std::move(init
.completion_handler
));
625 if (!initialized
|| stopping
) {
626 ceph::async::post(std::move(h
), monc_errc::shutting_down
, std::string
{},
629 auto r
= new MonCommand(*this, ++last_mon_command_tid
, std::move(h
));
630 r
->target_rank
= mon_rank
;
633 mon_commands
.emplace(r
->tid
, r
);
637 return init
.result
.get();
640 template<typename CompletionToken
>
641 auto start_mon_command(const std::string
& mon_name
,
642 const std::vector
<std::string
>& cmd
,
643 const ceph::buffer::list
& inbl
,
644 CompletionToken
&& token
) {
645 ldout(cct
,10) << __func__
<< " cmd=" << cmd
<< dendl
;
646 boost::asio::async_completion
<CompletionToken
, CommandSig
> init(token
);
648 std::scoped_lock
l(monc_lock
);
649 auto h
= CommandCompletion::create(service
.get_executor(),
650 std::move(init
.completion_handler
));
651 if (!initialized
|| stopping
) {
652 ceph::async::post(std::move(h
), monc_errc::shutting_down
, std::string
{},
655 auto r
= new MonCommand(*this, ++last_mon_command_tid
, std::move(h
));
656 // detect/tolerate mon *rank* passed as a string
658 int rank
= strict_strtoll(mon_name
.c_str(), 10, &err
);
659 if (err
.size() == 0 && rank
>= 0) {
660 ldout(cct
,10) << __func__
<< " interpreting name '" << mon_name
661 << "' as rank " << rank
<< dendl
;
662 r
->target_rank
= rank
;
664 r
->target_name
= mon_name
;
668 mon_commands
.emplace(r
->tid
, r
);
672 return init
.result
.get();
675 class ContextVerter
{
677 ceph::bufferlist
* outbl
;
681 ContextVerter(std::string
* outs
, ceph::bufferlist
* outbl
, Context
* onfinish
)
682 : outs(outs
), outbl(outbl
), onfinish(onfinish
) {}
683 ~ContextVerter() = default;
684 ContextVerter(const ContextVerter
&) = default;
685 ContextVerter
& operator =(const ContextVerter
&) = default;
686 ContextVerter(ContextVerter
&&) = default;
687 ContextVerter
& operator =(ContextVerter
&&) = default;
689 void operator()(boost::system::error_code e
,
691 ceph::bufferlist bl
) {
693 *outs
= std::move(s
);
695 *outbl
= std::move(bl
);
697 onfinish
->complete(ceph::from_error_code(e
));
701 void start_mon_command(const std::vector
<std::string
>& cmd
, const bufferlist
& inbl
,
702 bufferlist
*outbl
, std::string
*outs
,
704 start_mon_command(cmd
, inbl
, ContextVerter(outs
, outbl
, onfinish
));
706 void start_mon_command(int mon_rank
,
707 const std::vector
<std::string
>& cmd
, const bufferlist
& inbl
,
708 bufferlist
*outbl
, std::string
*outs
,
710 start_mon_command(mon_rank
, cmd
, inbl
, ContextVerter(outs
, outbl
, onfinish
));
712 void start_mon_command(const std::string
&mon_name
, ///< mon name, with mon. prefix
713 const std::vector
<std::string
>& cmd
, const bufferlist
& inbl
,
714 bufferlist
*outbl
, std::string
*outs
,
716 start_mon_command(mon_name
, cmd
, inbl
, ContextVerter(outs
, outbl
, onfinish
));
723 * get latest known version(s) of cluster map
725 * @param map string name of map (e.g., 'osdmap')
726 * @param token context that will be triggered on completion
727 * @return (via Completion) {} on success,
728 * boost::system::errc::resource_unavailable_try_again if we need to
729 * resubmit our request
731 template<typename CompletionToken
>
732 auto get_version(std::string
&& map
, CompletionToken
&& token
) {
733 boost::asio::async_completion
<CompletionToken
, VersionSig
> init(token
);
735 std::scoped_lock
l(monc_lock
);
736 auto m
= ceph::make_message
<MMonGetVersion
>();
737 m
->what
= std::move(map
);
738 m
->handle
= ++version_req_id
;
739 version_requests
.emplace(m
->handle
,
740 VersionCompletion::create(
741 service
.get_executor(),
742 std::move(init
.completion_handler
)));
743 _send_mon_message(m
);
745 return init
.result
.get();
749 * Run a callback within our lock, with a reference
752 template<typename Callback
, typename
...Args
>
753 auto with_monmap(Callback
&& cb
, Args
&&...args
) const ->
754 decltype(cb(monmap
, std::forward
<Args
>(args
)...)) {
755 std::lock_guard
l(monc_lock
);
756 return std::forward
<Callback
>(cb
)(monmap
, std::forward
<Args
>(args
)...);
759 void register_config_callback(md_config_t::config_callback fn
);
760 void register_config_notify_callback(std::function
<void(void)> f
) {
761 config_notify_cb
= f
;
763 md_config_t::config_callback
get_config_callback();
767 std::map
<ceph_tid_t
, std::unique_ptr
<VersionCompletion
>> version_requests
;
768 ceph_tid_t version_req_id
;
769 void handle_get_version_reply(MMonGetVersionReply
* m
);
770 md_config_t::config_callback config_cb
;
771 std::function
<void(void)> config_notify_cb
;