1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_MONCLIENT_H
16 #define CEPH_MONCLIENT_H
20 #include "msg/Messenger.h"
24 #include "common/Timer.h"
25 #include "common/Finisher.h"
26 #include "common/config.h"
// Forward declarations for types that this header only refers to through
// pointers or smart-pointer template arguments (e.g. MMonSubscribeAck*,
// AuthAuthorizer*, std::unique_ptr<AuthClientHandler>, RotatingKeyRing*).
30 class MMonGetVersionReply
;
31 struct MMonSubscribeAck
;
36 struct AuthAuthorizer
;
38 class AuthClientHandler
;
40 class RotatingKeyRing
;
// Dispatcher specialisation that reacts to CEPH_MSG_PING messages (see its
// ms_dispatch()) and lets a caller block in wait_for_reply().
// NOTE(review): several lines of this struct (member declarations, remaining
// constructor initializers) are not visible in this listing.
42 struct MonClientPinger
: public Dispatcher
{
// Constructor: takes the CephContext and a string pointer, and names the
// internal lock "MonClientPinger::lock".
// NOTE(review): res_ presumably is where the ping reply text ends up -- confirm.
49 MonClientPinger(CephContext
*cct_
, string
*res_
) :
51 lock("MonClientPinger::lock"),
// Block until a ping reply arrives or a deadline passes.
//
// @param timeout seconds to wait; when it is not > 0 the configured
//                cct->_conf->client_mount_timeout is used instead.
//
// The deadline is ceph_clock_now() plus the chosen interval, and the wait is
// a timed wait on ping_recvd_cond using `lock`; its result is stored in `ret`.
// NOTE(review): the loop condition, the declaration of `ret` and the return
// statement are not visible in this listing.
56 int wait_for_reply(double timeout
= 0.0) {
57 utime_t until
= ceph_clock_now();
58 until
+= (timeout
> 0 ? timeout
: cct
->_conf
->client_mount_timeout
);
63 ret
= ping_recvd_cond
.WaitUntil(lock
, until
);
// Dispatcher callback for incoming messages.
// Holds `lock` for the duration of the call, tests the message type against
// CEPH_MSG_PING, and then looks at the message payload: when `result` is
// non-null and the payload is non-empty, an iterator over the payload is
// created. Finally every waiter on ping_recvd_cond is woken.
// NOTE(review): the statement taken for non-ping messages, the payload
// decoding step and the return value are not visible in this listing.
70 bool ms_dispatch(Message
*m
) override
{
71 Mutex::Locker
l(lock
);
72 if (m
->get_type() != CEPH_MSG_PING
)
75 bufferlist
&payload
= m
->get_payload();
76 if (result
&& payload
.length() > 0) {
77 bufferlist::iterator p
= payload
.begin();
81 ping_recvd_cond
.SignalAll();
// Dispatcher callback for a connection reset: takes `lock` and wakes every
// thread waiting on ping_recvd_cond so that a blocked wait_for_reply() can
// re-evaluate its state.
// NOTE(review): any state change made between taking the lock and signalling,
// and the return value, are not visible in this listing.
85 bool ms_handle_reset(Connection
*con
) override
{
86 Mutex::Locker
l(lock
);
88 ping_recvd_cond
.SignalAll();
// Remote resets are deliberately ignored: the override has an empty body.
91 void ms_handle_remote_reset(Connection
*con
) override
{}
// Dispatcher callback for a refused connection.
// NOTE(review): the body is not visible in this listing.
92 bool ms_handle_refused(Connection
*con
) override
{
// MonConnection construction and copy/move rules.
// The constructor takes the CephContext first.
// NOTE(review): the remaining constructor parameters are not visible in this
// listing.
99 MonConnection(CephContext
*cct
,
// Move construction and move assignment are compiler-generated, while copy
// construction and copy assignment are deleted: a MonConnection can be moved
// but never copied.
103 MonConnection(MonConnection
&& rhs
) = default;
104 MonConnection
& operator=(MonConnection
&&) = default;
105 MonConnection(const MonConnection
& rhs
) = delete;
106 MonConnection
& operator=(const MonConnection
&) = delete;
// Process an auth reply for this connection; takes the reply message, the
// entity name and a RotatingKeyRing pointer, and returns an int status.
// NOTE(review): one parameter between entity_name and keyring is not visible
// in this listing.
107 int handle_auth(MAuthReply
*m
,
108 const EntityName
& entity_name
,
110 RotatingKeyRing
* keyring
);
// Authentication step driven by an auth reply; returns an int status.
111 int authenticate(MAuthReply
*m
);
// Begin using this connection for the given epoch, entity name and list of
// supported auth methods.
112 void start(epoch_t epoch
,
113 const EntityName
& entity_name
,
114 const AuthMethodList
& auth_supported
);
// Const query for whether this connection has a session.
115 bool have_session() const;
// Inline accessors of MonConnection.
// NOTE(review): only the signatures are visible in this listing; the bodies
// are missing.
116 uint64_t get_global_id() const {
// Returns the ConnectionRef of this connection.
119 ConnectionRef
get_con() {
// Returns a mutable reference to the owning pointer of the auth handler, so
// the caller can inspect or replace/move the handler.
122 std::unique_ptr
<AuthClientHandler
>& get_auth() {
// Internal negotiation helper; mirrors the parameter list of handle_auth().
// NOTE(review): one parameter between entity_name and keyring is not visible
// in this listing.
127 int _negotiate(MAuthReply
*m
,
128 const EntityName
& entity_name
,
130 RotatingKeyRing
* keyring
);
// Connection state; a freshly constructed object starts in State::NONE.
140 State state
= State::NONE
;
// Auth handler exclusively owned by this connection.
143 std::unique_ptr
<AuthClientHandler
> auth
;
// Monitor client; receives messages by implementing the Dispatcher interface.
// NOTE(review): access specifiers and several members are not visible in this
// listing.
147 class MonClient
: public Dispatcher
{
// Messenger used by this client; assigned through set_messenger().
151 Messenger
*messenger
;
// The single active monitor connection (owned), plus connections that are
// still pending, keyed by monitor address.
153 std::unique_ptr
<MonConnection
> active_con
;
154 std::map
<entity_addr_t
, MonConnection
> pending_cons
;
156 EntityName entity_name
;
158 entity_addr_t my_addr
;
// Declared mutable so that const member functions (e.g. get_num_mon(),
// get_mon_addr()) can still lock it.
160 mutable Mutex monc_lock
;
165 bool no_keyring_disabled_cephx
;
// Log client registered through set_log_client().
167 LogClient
*log_client
;
168 bool more_log_pending
;
// Send pending log entries; `flush` defaults to false.
170 void send_log(bool flush
= false);
172 std::unique_ptr
<AuthMethodList
> auth_supported
;
// Dispatcher interface. ms_dispatch() and ms_handle_reset() are defined out
// of line.
174 bool ms_dispatch(Message
*m
) override
;
175 bool ms_handle_reset(Connection
*con
) override
;
// Remote resets are ignored (empty body).
176 void ms_handle_remote_reset(Connection
*con
) override
{}
// Refused connections are never reported as handled: always returns false.
177 bool ms_handle_refused(Connection
*con
) override
{ return false; }
// Per-message-type handlers, defined out of line.
179 void handle_monmap(MMonMap
*m
);
181 void handle_auth(MAuthReply
*m
);
185 void schedule_tick();
// Toggled by set_passthrough_monmap() / unset_passthrough_monmap(); off by
// default.
190 bool passthrough_monmap
= false;
// Authentication state. All of these have in-class defaults (null handler,
// zero ids/flags, not authenticated).
193 std::unique_ptr
<AuthClientHandler
> auth
;
194 uint32_t want_keys
= 0;
195 uint64_t global_id
= 0;
197 int authenticate_err
= 0;
198 bool authenticated
= false;
// Messages held as raw pointers while waiting for a session.
200 list
<Message
*> waiting_for_session
;
201 utime_t last_rotating_renew_sent
;
// Owned callback; reopen_session() installs a new one via reset().
202 std::unique_ptr
<Context
> session_established_context
;
// NOTE(review): these two have no in-class initializer -- presumably they are
// set in the constructor; confirm.
203 bool had_a_connection
;
204 double reopen_interval_multiplier
;
// Internal helpers (leading underscore). The inline public wrappers visible
// in this class (e.g. send_mon_message(), sub_want()) take monc_lock before
// calling their underscore counterpart.
206 bool _opened() const;
207 bool _hunting() const;
208 void _start_hunting();
209 void _finish_hunting();
210 void _finish_auth(int auth_err
);
// `rank` defaults to -1.
// NOTE(review): presumably -1 means "no specific monitor" -- confirm.
211 void _reopen_session(int rank
= -1);
// Returns a reference to the connection object for the given rank.
212 MonConnection
& _add_conn(unsigned rank
, uint64_t global_id
);
214 void _add_conns(uint64_t global_id
);
215 void _send_mon_message(Message
*m
);
// Replaces entity_name with a copy of `name`; does not take monc_lock.
218 void set_entity_name(EntityName name
) { entity_name
= name
; }
220 int _check_auth_tickets();
221 int _check_auth_rotating();
222 int wait_auth_rotating(double timeout
);
// `timeout` defaults to 0.0.
224 int authenticate(double timeout
=0.0);
// Plain read of the `authenticated` flag; monc_lock is not taken here.
225 bool is_authenticated() const {return authenticated
;}
228 * Try to flush as many log messages as we can in a single
229 * message. Use this before shutting down to transmit your
// Subscription bookkeeping: both maps are keyed by the subscription name
// (`what` in the helpers below) and hold a ceph_mon_subscribe_item with
// `start` and `flags` fields.
236 map
<string
,ceph_mon_subscribe_item
> sub_sent
; // my subs, and current versions
237 map
<string
,ceph_mon_subscribe_item
> sub_new
; // unsent new subs
238 utime_t sub_renew_sent
, sub_renew_after
;
// Handler for the monitor's subscribe acknowledgement; defined out of line.
241 void handle_subscribe_ack(MMonSubscribeAck
* m
);
// Record interest in subscription `what` from version `start` with `flags`.
// Does no locking of its own; the public sub_want() wrapper takes monc_lock
// before calling it.
//
// Steps visible here:
//   1. look `what` up in sub_new and compare its start/flags with the request;
//   2. do the same lookup/comparison in sub_sent;
//   3. store start and flags into sub_new[what] (creating the entry if absent).
// NOTE(review): the statements executed when a comparison matches, and the
// return statements, are not visible in this listing -- presumably a match
// means "nothing to do"; confirm.
243 bool _sub_want(const string
&what
, version_t start
, unsigned flags
) {
244 auto sub
= sub_new
.find(what
);
245 if (sub
!= sub_new
.end() &&
246 sub
->second
.start
== start
&&
247 sub
->second
.flags
== flags
) {
250 sub
= sub_sent
.find(what
);
251 if (sub
!= sub_sent
.end() &&
252 sub
->second
.start
== start
&&
253 sub
->second
.flags
== flags
)
257 sub_new
[what
].start
= start
;
258 sub_new
[what
].flags
= flags
;
// Note that version `got` of subscription `what` has been received.
// Does no locking of its own; the public sub_got() wrapper takes monc_lock.
//
// sub_new is consulted first; sub_sent is only consulted when `what` is not
// in sub_new. In either map the entry is touched only if its start <= got:
// the CEPH_SUBSCRIBE_ONETIME flag is tested, and start is set to got + 1.
// For sub_sent the one-time case erases the entry.
// NOTE(review): the statement for the one-time case of sub_new and the
// `else` keywords between the two outcomes are not visible in this listing.
261 void _sub_got(const string
&what
, version_t got
) {
262 if (sub_new
.count(what
)) {
263 if (sub_new
[what
].start
<= got
) {
264 if (sub_new
[what
].flags
& CEPH_SUBSCRIBE_ONETIME
)
267 sub_new
[what
].start
= got
+ 1;
269 } else if (sub_sent
.count(what
)) {
270 if (sub_sent
[what
].start
<= got
) {
271 if (sub_sent
[what
].flags
& CEPH_SUBSCRIBE_ONETIME
)
272 sub_sent
.erase(what
);
274 sub_sent
[what
].start
= got
+ 1;
// Drop subscription `what`: removes its entry from sub_sent.
// NOTE(review): the remainder of the body is not visible in this listing.
278 void _sub_unwant(const string
&what
) {
279 sub_sent
.erase(what
);
285 Mutex::Locker
l(monc_lock
);
// Thread-safe entry point for _sub_want(): holds monc_lock for the duration
// of the call and forwards the arguments unchanged, returning its result.
288 bool sub_want(string what
, version_t start
, unsigned flags
) {
289 Mutex::Locker
l(monc_lock
);
290 return _sub_want(what
, start
, flags
);
// Thread-safe entry point for _sub_got(): holds monc_lock while recording
// that version `have` of `what` was received.
292 void sub_got(string what
, version_t have
) {
293 Mutex::Locker
l(monc_lock
);
294 _sub_got(what
, have
);
// Locked public counterpart of _sub_unwant(); takes monc_lock first.
// NOTE(review): the call made under the lock is not visible in this listing.
296 void sub_unwant(string what
) {
297 Mutex::Locker
l(monc_lock
);
301 * Increase the requested subscription start point. If you do increase
302 * the value, apply the passed-in flags as well; otherwise do nothing.
// Raise the requested start version for subscription `what`, under monc_lock.
//
// If `what` is already in sub_new, its current start is compared against the
// request (start >= requested is the "no increase" case) and otherwise start
// and flags are overwritten with the new values.
// If it is not in sub_new, sub_sent is checked: when `what` is absent there,
// or was sent with a smaller start, an entry is created/fetched in sub_new.
// NOTE(review): the return statements and the assignments to `item` are not
// visible in this listing.
304 bool sub_want_increment(string what
, version_t start
, unsigned flags
) {
305 Mutex::Locker
l(monc_lock
);
306 map
<string
,ceph_mon_subscribe_item
>::iterator i
= sub_new
.find(what
);
307 if (i
!= sub_new
.end()) {
308 if (i
->second
.start
>= start
)
310 i
->second
.start
= start
;
311 i
->second
.flags
= flags
;
315 i
= sub_sent
.find(what
);
316 if (i
== sub_sent
.end() || i
->second
.start
< start
) {
317 ceph_mon_subscribe_item
& item
= sub_new
[what
];
// Key material exclusively owned by the MonClient.
325 std::unique_ptr
<KeyRing
> keyring
;
326 std::unique_ptr
<RotatingKeyRing
> rotating_secrets
;
// Construction: explicit to prevent implicit conversion from CephContext*.
329 explicit MonClient(CephContext
*cct_
);
// Copying is disabled (copy constructor and copy assignment are deleted).
330 MonClient(const MonClient
&) = delete;
331 MonClient
& operator=(const MonClient
&) = delete;
// Overrides the virtual destructor of the base class.
332 ~MonClient() override
;
// Register the LogClient to use.
// NOTE(review): the body is not visible in this listing.
337 void set_log_client(LogClient
*clog
) {
// Monmap bootstrap helpers; both return an int status.
341 int build_initial_monmap();
343 int get_monmap_privately();
345 * If you want to see MonMap messages, set this and
346 * the MonClient will tell the Messenger it hasn't
348 * Note that if you do this, *you* are of course responsible for
349 * putting the message reference!
// Turn monmap passthrough on; the flag is written while holding monc_lock.
351 void set_passthrough_monmap() {
352 Mutex::Locker
l(monc_lock
);
353 passthrough_monmap
= true;
// Turn monmap passthrough off again; also done under monc_lock.
355 void unset_passthrough_monmap() {
356 Mutex::Locker
l(monc_lock
);
357 passthrough_monmap
= false;
360 * Ping monitor with ID @p mon_id and record the resulting
361 * reply in @p result_reply.
363 * @param[in] mon_id Target monitor's ID
364 * @param[out] result_reply reply from mon.ID, if param != NULL
365 * @returns 0 in case of success; < 0 in case of error,
366 * -ETIMEDOUT if monitor didn't reply before timeout
367 * expired (default: conf->client_mount_timeout).
// Ping the monitor identified by `mon_id`; `result_reply` is an output
// pointer for the reply text. Defined out of line.
369 int ping_monitor(const string
&mon_id
, string
*result_reply
);
// Thread-safe entry point for _send_mon_message(): takes monc_lock and
// forwards the message pointer unchanged.
371 void send_mon_message(Message
*m
) {
372 Mutex::Locker
l(monc_lock
);
373 _send_mon_message(m
);
376 * If you specify a callback, you should not call
377 * reopen_session() again until it has been triggered. The MonClient
378 * will behave, but the first callback could be triggered after
379 * the session has been killed and the MonClient has started trying
380 * to reconnect to another monitor.
// Reopen the monitor session, optionally registering a callback.
//
// @param cb optional callback (defaults to NULL). It is handed to
//           session_established_context via reset(), so the MonClient takes
//           ownership of it and any previously stored callback is destroyed.
//
// Runs under monc_lock.
// NOTE(review): the statement between taking the lock and storing the
// callback is not visible in this listing.
382 void reopen_session(Context
*cb
=NULL
) {
383 Mutex::Locker
l(monc_lock
);
385 session_established_context
.reset(cb
);
// Read-only accessors.
// NOTE(review): the bodies of get_my_addr(), get_fsid() and the return
// statement of get_global_id() are not visible in this listing.
390 entity_addr_t
get_my_addr() const {
// Returns the fsid by const reference.
394 const uuid_d
& get_fsid() const {
// Address of monitor number `i` in the monmap, read under monc_lock.
// An out-of-range index yields a default-constructed entity_addr_t.
398 entity_addr_t
get_mon_addr(unsigned i
) const {
399 Mutex::Locker
l(monc_lock
);
400 if (i
< monmap
.size())
401 return monmap
.get_addr(i
);
402 return entity_addr_t();
// Same as get_mon_addr(), but returns the entity_inst_t; an out-of-range
// index yields a default-constructed entity_inst_t.
404 entity_inst_t
get_mon_inst(unsigned i
) const {
405 Mutex::Locker
l(monc_lock
);
406 if (i
< monmap
.size())
407 return monmap
.get_inst(i
);
408 return entity_inst_t();
// Number of monitors in the monmap (monmap.size() returned as int), read
// under monc_lock.
410 int get_num_mon() const {
411 Mutex::Locker
l(monc_lock
);
412 return monmap
.size();
// Takes monc_lock before reading the id.
415 uint64_t get_global_id() const {
416 Mutex::Locker
l(monc_lock
);
// Store the Messenger pointer; no lock is taken here.
420 void set_messenger(Messenger
*m
) { messenger
= m
; }
// Forwards to messenger->get_myaddr(). There is no null check, so
// set_messenger() must have been called with a valid pointer beforehand.
421 entity_addr_t
get_myaddr() const { return messenger
->get_myaddr(); }
// Build an authorizer for the given service id; defined out of line.
// NOTE(review): ownership of the returned raw pointer is not shown here --
// confirm against the implementation/callers.
422 AuthAuthorizer
* build_authorizer(int service_id
) const;
// Set the wanted-keys mask.
// NOTE(review): the body is not visible in this listing.
424 void set_want_keys(uint32_t want
) {
// Counter member for monitor command tids.
430 uint64_t last_mon_command_tid
;
// Fragment of the MonCommand record.
// NOTE(review): its struct header and most members are not visible in this
// listing.
440 Context
*onfinish
, *ontimeout
;
// The constructor takes the tid and starts with every output pointer and
// callback (poutbl, prs, prval, onfinish, ontimeout) set to NULL.
442 explicit MonCommand(uint64_t t
)
445 poutbl(NULL
), prs(NULL
), prval(NULL
), onfinish(NULL
), ontimeout(NULL
)
// MonCommand records held as raw pointers, keyed by a uint64_t.
// NOTE(review): presumably keyed by command tid -- confirm.
448 map
<uint64_t,MonCommand
*> mon_commands
;
// Internal command helpers (leading underscore), all defined out of line.
450 void _send_command(MonCommand
*r
);
451 void _resend_mon_commands();
// Cancel the command with the given tid; returns an int status.
452 int _cancel_mon_command(uint64_t tid
);
// Complete command `r` with return code `ret` and status string `rs`.
453 void _finish_command(MonCommand
*r
, int ret
, string rs
);
// Handler for the monitor's command acknowledgement message.
455 void handle_mon_command_ack(MMonCommandAck
*ack
);
// start_mon_command() overloads. All take the command as a vector of strings
// plus an input bufferlist, and output pointers `outbl` / `outs`.
// NOTE(review): the trailing parameter(s) of every overload are not visible
// in this listing.
//
// Overload 1: no explicit target monitor.
458 void start_mon_command(const vector
<string
>& cmd
, const bufferlist
& inbl
,
459 bufferlist
*outbl
, string
*outs
,
// Overload 2: target monitor selected by rank.
461 void start_mon_command(int mon_rank
,
462 const vector
<string
>& cmd
, const bufferlist
& inbl
,
463 bufferlist
*outbl
, string
*outs
,
// Overload 3: target monitor selected by name.
465 void start_mon_command(const string
&mon_name
, ///< mon name, with mon. prefix
466 const vector
<string
>& cmd
, const bufferlist
& inbl
,
467 bufferlist
*outbl
, string
*outs
,
473 * get latest known version(s) of cluster map
475 * @param map string name of map (e.g., 'osdmap')
476 * @param newest pointer where newest map version will be stored
477 * @param oldest pointer where oldest map version will be stored
478 * @param onfinish context that will be triggered on completion
479 * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
// Request version information for the named map; `newest` and `oldest` are
// output pointers and `onfinish` is the completion callback. Defined out of
// line.
481 void get_version(string map
, version_t
*newest
, version_t
*oldest
, Context
*onfinish
);
484 * Run a callback within our lock, with a reference
// Invoke `cb(monmap, args...)` while holding monc_lock and return whatever
// the callback returns.
//
// The return type is deduced with a trailing decltype on the call expression,
// and both the callback and the extra arguments are perfectly forwarded.
// Because the lock is held for the whole call, the callback must not call
// back into MonClient methods that take monc_lock themselves, unless the
// Mutex type permits recursive locking (NOTE(review): confirm).
487 template<typename Callback
, typename
...Args
>
488 auto with_monmap(Callback
&& cb
, Args
&&...args
) const ->
489 decltype(cb(monmap
, std::forward
<Args
>(args
)...)) {
490 Mutex::Locker
l(monc_lock
);
491 return std::forward
<Callback
>(cb
)(monmap
, std::forward
<Args
>(args
)...);
// Book-keeping record for one get_version()-style request: the completion
// context plus the two output pointers.
// NOTE(review): the declaration of the `context` member is not visible in
// this listing.
495 struct version_req_d
{
497 version_t
*newest
, *oldest
;
// Stores the three pointers as given; nothing is copied or validated.
498 version_req_d(Context
*con
, version_t
*n
, version_t
*o
) : context(con
),newest(n
), oldest(o
) {}
// Outstanding requests held as raw pointers, keyed by ceph_tid_t.
501 map
<ceph_tid_t
, version_req_d
*> version_requests
;
502 ceph_tid_t version_req_id
;
// Handler for the monitor's get-version reply; defined out of line.
503 void handle_get_version_reply(MMonGetVersionReply
* m
);