1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #ifndef CEPH_MONCLIENT_H
15 #define CEPH_MONCLIENT_H
25 #include "msg/Messenger.h"
30 #include "common/Timer.h"
31 #include "common/Finisher.h"
32 #include "common/config.h"
34 #include "auth/AuthClient.h"
35 #include "auth/AuthServer.h"
39 class MMonGetVersionReply
;
40 struct MMonSubscribeAck
;
45 class AuthClientHandler
;
48 class RotatingKeyRing
;
52 MonConnection(CephContext
*cct
,
55 AuthRegistry
*auth_registry
);
57 MonConnection(MonConnection
&& rhs
) = default;
58 MonConnection
& operator=(MonConnection
&&) = default;
59 MonConnection(const MonConnection
& rhs
) = delete;
60 MonConnection
& operator=(const MonConnection
&) = delete;
61 int handle_auth(MAuthReply
*m
,
62 const EntityName
& entity_name
,
64 RotatingKeyRing
* keyring
);
65 int authenticate(MAuthReply
*m
);
66 void start(epoch_t epoch
,
67 const EntityName
& entity_name
);
68 bool have_session() const;
69 uint64_t get_global_id() const {
72 ConnectionRef
get_con() {
75 std::unique_ptr
<AuthClientHandler
>& get_auth() {
81 std::vector
<uint32_t> *preferred_modes
,
82 ceph::buffer::list
*out
,
83 const EntityName
& entity_name
,
85 RotatingKeyRing
* keyring
);
86 int handle_auth_reply_more(
87 AuthConnectionMeta
*auth_meta
,
88 const ceph::buffer::list
& bl
,
89 ceph::buffer::list
*reply
);
91 AuthConnectionMeta
*auth_meta
,
93 const ceph::buffer::list
& bl
,
94 CryptoKey
*session_key
,
95 std::string
*connection_secret
);
96 int handle_auth_bad_method(
97 uint32_t old_auth_method
,
99 const std::vector
<uint32_t>& allowed_methods
,
100 const std::vector
<uint32_t>& allowed_modes
);
102 bool is_con(Connection
*c
) const {
103 return con
.get() == c
;
105 void queue_command(Message
*m
) {
106 pending_tell_command
= m
;
110 int _negotiate(MAuthReply
*m
,
111 const EntityName
& entity_name
,
113 RotatingKeyRing
* keyring
);
114 int _init_auth(uint32_t method
,
115 const EntityName
& entity_name
,
117 RotatingKeyRing
* keyring
,
124 NEGOTIATING
, // v1 only
125 AUTHENTICATING
, // v1 and v2
128 State state
= State::NONE
;
130 int auth_method
= -1;
133 std::unique_ptr
<AuthClientHandler
> auth
;
136 MessageRef pending_tell_command
;
138 AuthRegistry
*auth_registry
;
142 struct MonClientPinger
: public Dispatcher
,
145 ceph::mutex lock
= ceph::make_mutex("MonClientPinger::lock");
146 ceph::condition_variable ping_recvd_cond
;
149 RotatingKeyRing
*keyring
;
150 std::unique_ptr
<MonConnection
> mc
;
152 MonClientPinger(CephContext
*cct_
,
153 RotatingKeyRing
*keyring
,
161 int wait_for_reply(double timeout
= 0.0) {
162 std::unique_lock locker
{lock
};
164 timeout
= cct
->_conf
->client_mount_timeout
;
167 if (ping_recvd_cond
.wait_for(locker
,
168 ceph::make_timespan(timeout
),
169 [this] { return done
; })) {
176 bool ms_dispatch(Message
*m
) override
{
178 std::lock_guard
l(lock
);
179 if (m
->get_type() != CEPH_MSG_PING
)
182 ceph::buffer::list
&payload
= m
->get_payload();
183 if (result
&& payload
.length() > 0) {
184 auto p
= std::cbegin(payload
);
188 ping_recvd_cond
.notify_all();
192 bool ms_handle_reset(Connection
*con
) override
{
193 std::lock_guard
l(lock
);
195 ping_recvd_cond
.notify_all();
198 void ms_handle_remote_reset(Connection
*con
) override
{}
199 bool ms_handle_refused(Connection
*con
) override
{
204 int get_auth_request(
206 AuthConnectionMeta
*auth_meta
,
207 uint32_t *auth_method
,
208 std::vector
<uint32_t> *preferred_modes
,
209 ceph::buffer::list
*bl
) override
{
210 return mc
->get_auth_request(auth_method
, preferred_modes
, bl
,
211 cct
->_conf
->name
, 0, keyring
);
213 int handle_auth_reply_more(
215 AuthConnectionMeta
*auth_meta
,
216 const ceph::buffer::list
& bl
,
217 ceph::buffer::list
*reply
) override
{
218 return mc
->handle_auth_reply_more(auth_meta
, bl
, reply
);
220 int handle_auth_done(
222 AuthConnectionMeta
*auth_meta
,
225 const ceph::buffer::list
& bl
,
226 CryptoKey
*session_key
,
227 std::string
*connection_secret
) override
{
228 return mc
->handle_auth_done(auth_meta
, global_id
, bl
,
229 session_key
, connection_secret
);
231 int handle_auth_bad_method(
233 AuthConnectionMeta
*auth_meta
,
234 uint32_t old_auth_method
,
236 const std::vector
<uint32_t>& allowed_methods
,
237 const std::vector
<uint32_t>& allowed_modes
) override
{
238 return mc
->handle_auth_bad_method(old_auth_method
, result
,
239 allowed_methods
, allowed_modes
);
244 class MonClient
: public Dispatcher
,
246 public AuthServer
/* for mgr, osd, mds */ {
249 std::map
<std::string
,std::string
> config_mgr
;
252 Messenger
*messenger
;
254 std::unique_ptr
<MonConnection
> active_con
;
255 std::map
<entity_addrvec_t
, MonConnection
> pending_cons
;
256 std::set
<unsigned> tried
;
258 EntityName entity_name
;
260 mutable ceph::mutex monc_lock
= ceph::make_mutex("MonClient::monc_lock");
265 bool stopping
= false;
267 LogClient
*log_client
;
268 bool more_log_pending
;
270 void send_log(bool flush
= false);
272 bool ms_dispatch(Message
*m
) override
;
273 bool ms_handle_reset(Connection
*con
) override
;
274 void ms_handle_remote_reset(Connection
*con
) override
{}
275 bool ms_handle_refused(Connection
*con
) override
{ return false; }
277 void handle_monmap(MMonMap
*m
);
278 void handle_config(MConfig
*m
);
280 void handle_auth(MAuthReply
*m
);
283 utime_t last_keepalive
;
284 utime_t last_send_log
;
287 void schedule_tick();
291 ceph::condition_variable map_cond
;
292 bool passthrough_monmap
= false;
293 bool got_config
= false;
296 std::unique_ptr
<AuthClientHandler
> auth
;
297 uint32_t want_keys
= 0;
298 uint64_t global_id
= 0;
299 ceph::condition_variable auth_cond
;
300 int authenticate_err
= 0;
301 bool authenticated
= false;
303 std::list
<MessageRef
> waiting_for_session
;
304 utime_t last_rotating_renew_sent
;
305 std::unique_ptr
<Context
> session_established_context
;
306 bool had_a_connection
;
307 double reopen_interval_multiplier
;
309 Dispatcher
*handle_authentication_dispatcher
= nullptr;
311 bool _opened() const;
312 bool _hunting() const;
313 void _start_hunting();
314 void _finish_hunting(int auth_err
);
315 void _finish_auth(int auth_err
);
316 void _reopen_session(int rank
= -1);
317 MonConnection
& _add_conn(unsigned rank
, uint64_t global_id
);
319 void _add_conns(uint64_t global_id
);
320 void _send_mon_message(MessageRef m
);
322 std::map
<entity_addrvec_t
, MonConnection
>::iterator
_find_pending_con(
323 const ConnectionRef
& con
) {
324 for (auto i
= pending_cons
.begin(); i
!= pending_cons
.end(); ++i
) {
325 if (i
->second
.get_con() == con
) {
329 return pending_cons
.end();
334 int get_auth_request(
336 AuthConnectionMeta
*auth_meta
,
338 std::vector
<uint32_t> *preferred_modes
,
339 ceph::buffer::list
*bl
) override
;
340 int handle_auth_reply_more(
342 AuthConnectionMeta
*auth_meta
,
343 const ceph::buffer::list
& bl
,
344 ceph::buffer::list
*reply
) override
;
345 int handle_auth_done(
347 AuthConnectionMeta
*auth_meta
,
350 const ceph::buffer::list
& bl
,
351 CryptoKey
*session_key
,
352 std::string
*connection_secret
) override
;
353 int handle_auth_bad_method(
355 AuthConnectionMeta
*auth_meta
,
356 uint32_t old_auth_method
,
358 const std::vector
<uint32_t>& allowed_methods
,
359 const std::vector
<uint32_t>& allowed_modes
) override
;
361 int handle_auth_request(
363 AuthConnectionMeta
*auth_meta
,
365 uint32_t auth_method
,
366 const ceph::buffer::list
& bl
,
367 ceph::buffer::list
*reply
);
369 void set_entity_name(EntityName name
) { entity_name
= name
; }
370 void set_handle_authentication_dispatcher(Dispatcher
*d
) {
371 handle_authentication_dispatcher
= d
;
373 int _check_auth_tickets();
374 int _check_auth_rotating();
375 int wait_auth_rotating(double timeout
);
377 int authenticate(double timeout
=0.0);
378 bool is_authenticated() const {return authenticated
;}
380 bool is_connected() const { return active_con
!= nullptr; }
383 * Try to flush as many log messages as we can in a single
384 * message. Use this before shutting down to transmit your
393 void handle_subscribe_ack(MMonSubscribeAck
* m
);
397 std::lock_guard
l(monc_lock
);
400 bool sub_want(std::string what
, version_t start
, unsigned flags
) {
401 std::lock_guard
l(monc_lock
);
402 return sub
.want(what
, start
, flags
);
404 void sub_got(std::string what
, version_t have
) {
405 std::lock_guard
l(monc_lock
);
408 void sub_unwant(std::string what
) {
409 std::lock_guard
l(monc_lock
);
412 bool sub_want_increment(std::string what
, version_t start
, unsigned flags
) {
413 std::lock_guard
l(monc_lock
);
414 return sub
.inc_want(what
, start
, flags
);
417 std::unique_ptr
<KeyRing
> keyring
;
418 std::unique_ptr
<RotatingKeyRing
> rotating_secrets
;
421 explicit MonClient(CephContext
*cct_
);
422 MonClient(const MonClient
&) = delete;
423 MonClient
& operator=(const MonClient
&) = delete;
424 ~MonClient() override
;
429 void set_log_client(LogClient
*clog
) {
432 LogClient
*get_log_client() {
436 int build_initial_monmap();
438 int get_monmap_and_config();
440 * If you want to see MonMap messages, set this and
441 * the MonClient will tell the Messenger it hasn't
443 * Note that if you do this, *you* are of course responsible for
444 * putting the message reference!
446 void set_passthrough_monmap() {
447 std::lock_guard
l(monc_lock
);
448 passthrough_monmap
= true;
450 void unset_passthrough_monmap() {
451 std::lock_guard
l(monc_lock
);
452 passthrough_monmap
= false;
455 * Ping monitor with ID @p mon_id and record the resulting
456 * reply in @p result_reply.
458 * @param[in] mon_id Target monitor's ID
459 * @param[out] result_reply reply from mon.ID, if param != NULL
460 * @returns 0 in case of success; < 0 in case of error,
461 * -ETIMEDOUT if monitor didn't reply before timeout
462 * expired (default: conf->client_mount_timeout).
464 int ping_monitor(const std::string
&mon_id
, std::string
*result_reply
);
466 void send_mon_message(Message
*m
) {
467 send_mon_message(MessageRef
{m
, false});
469 void send_mon_message(MessageRef m
);
471 * If you specify a callback, you should not call
472 * reopen_session() again until it has been triggered. The MonClient
473 * will behave, but the first callback could be triggered after
474 * the session has been killed and the MonClient has started trying
475 * to reconnect to another monitor.
477 void reopen_session(Context
*cb
=NULL
) {
478 std::lock_guard
l(monc_lock
);
480 session_established_context
.reset(cb
);
485 const uuid_d
& get_fsid() const {
489 entity_addrvec_t
get_mon_addrs(unsigned i
) const {
490 std::lock_guard
l(monc_lock
);
491 if (i
< monmap
.size())
492 return monmap
.get_addrs(i
);
493 return entity_addrvec_t();
495 int get_num_mon() const {
496 std::lock_guard
l(monc_lock
);
497 return monmap
.size();
500 uint64_t get_global_id() const {
501 std::lock_guard
l(monc_lock
);
505 void set_messenger(Messenger
*m
) { messenger
= m
; }
506 entity_addrvec_t
get_myaddrs() const { return messenger
->get_myaddrs(); }
507 AuthAuthorizer
* build_authorizer(int service_id
) const;
509 void set_want_keys(uint32_t want
) {
515 uint64_t last_mon_command_tid
;
519 std::string target_name
;
521 ConnectionRef target_con
;
522 std::unique_ptr
<MonConnection
> target_session
;
523 unsigned send_attempts
= 0; ///< attempt count for legacy mons
524 utime_t last_send_attempt
;
527 std::vector
<std::string
> cmd
;
528 ceph::buffer::list inbl
;
529 ceph::buffer::list
*poutbl
;
532 Context
*onfinish
, *ontimeout
;
534 explicit MonCommand(uint64_t t
)
537 poutbl(NULL
), prs(NULL
), prval(NULL
), onfinish(NULL
), ontimeout(NULL
)
540 bool is_tell() const {
541 return target_name
.size() || target_rank
>= 0;
544 std::map
<uint64_t,MonCommand
*> mon_commands
;
546 void _send_command(MonCommand
*r
);
547 void _check_tell_commands();
548 void _resend_mon_commands();
549 int _cancel_mon_command(uint64_t tid
);
550 void _finish_command(MonCommand
*r
, int ret
, std::string rs
);
552 void handle_mon_command_ack(MMonCommandAck
*ack
);
553 void handle_command_reply(MCommandReply
*reply
);
556 void start_mon_command(const std::vector
<std::string
>& cmd
, const ceph::buffer::list
& inbl
,
557 ceph::buffer::list
*outbl
, std::string
*outs
,
559 void start_mon_command(int mon_rank
,
560 const std::vector
<std::string
>& cmd
, const ceph::buffer::list
& inbl
,
561 ceph::buffer::list
*outbl
, std::string
*outs
,
563 void start_mon_command(const std::string
&mon_name
, ///< mon name, with mon. prefix
564 const std::vector
<std::string
>& cmd
, const ceph::buffer::list
& inbl
,
565 ceph::buffer::list
*outbl
, std::string
*outs
,
571 * get latest known version(s) of cluster map
573 * @param map std::string name of map (e.g., 'osdmap')
574 * @param newest pointer where newest map version will be stored
575 * @param oldest pointer where oldest map version will be stored
576 * @param onfinish context that will be triggered on completion
577 * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
579 void get_version(std::string map
, version_t
*newest
, version_t
*oldest
, Context
*onfinish
);
581 * Run a callback within our lock, with a reference
584 template<typename Callback
, typename
...Args
>
585 auto with_monmap(Callback
&& cb
, Args
&&...args
) const ->
586 decltype(cb(monmap
, std::forward
<Args
>(args
)...)) {
587 std::lock_guard
l(monc_lock
);
588 return std::forward
<Callback
>(cb
)(monmap
, std::forward
<Args
>(args
)...);
591 void register_config_callback(md_config_t::config_callback fn
);
592 void register_config_notify_callback(std::function
<void(void)> f
) {
593 config_notify_cb
= f
;
595 md_config_t::config_callback
get_config_callback();
598 struct version_req_d
{
600 version_t
*newest
, *oldest
;
601 version_req_d(Context
*con
, version_t
*n
, version_t
*o
) : context(con
),newest(n
), oldest(o
) {}
604 std::map
<ceph_tid_t
, version_req_d
*> version_requests
;
605 ceph_tid_t version_req_id
;
606 void handle_get_version_reply(MMonGetVersionReply
* m
);
608 md_config_t::config_callback config_cb
;
609 std::function
<void(void)> config_notify_cb
;