1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_MONCLIENT_H
16 #define CEPH_MONCLIENT_H
20 #include "msg/Messenger.h"
24 #include "common/Timer.h"
25 #include "common/Finisher.h"
26 #include "common/config.h"
30 class MMonGetVersionReply
;
31 struct MMonSubscribeAck
;
36 struct AuthAuthorizer
;
38 class AuthClientHandler
;
40 class RotatingKeyRing
;
42 struct MonClientPinger
: public Dispatcher
{
49 MonClientPinger(CephContext
*cct_
, string
*res_
) :
51 lock("MonClientPinger::lock"),
56 int wait_for_reply(double timeout
= 0.0) {
57 utime_t until
= ceph_clock_now();
58 until
+= (timeout
> 0 ? timeout
: cct
->_conf
->client_mount_timeout
);
63 ret
= ping_recvd_cond
.WaitUntil(lock
, until
);
70 bool ms_dispatch(Message
*m
) override
{
71 Mutex::Locker
l(lock
);
72 if (m
->get_type() != CEPH_MSG_PING
)
75 bufferlist
&payload
= m
->get_payload();
76 if (result
&& payload
.length() > 0) {
77 bufferlist::iterator p
= payload
.begin();
81 ping_recvd_cond
.SignalAll();
85 bool ms_handle_reset(Connection
*con
) override
{
86 Mutex::Locker
l(lock
);
88 ping_recvd_cond
.SignalAll();
91 void ms_handle_remote_reset(Connection
*con
) override
{}
92 bool ms_handle_refused(Connection
*con
) override
{
99 MonConnection(CephContext
*cct
,
103 MonConnection(MonConnection
&& rhs
) = default;
104 MonConnection
& operator=(MonConnection
&&) = default;
105 MonConnection(const MonConnection
& rhs
) = delete;
106 MonConnection
& operator=(const MonConnection
&) = delete;
107 int handle_auth(MAuthReply
*m
,
108 const EntityName
& entity_name
,
110 RotatingKeyRing
* keyring
);
111 int authenticate(MAuthReply
*m
);
112 void start(epoch_t epoch
,
113 const EntityName
& entity_name
,
114 const AuthMethodList
& auth_supported
);
115 bool have_session() const;
116 uint64_t get_global_id() const {
119 ConnectionRef
get_con() {
122 std::unique_ptr
<AuthClientHandler
>& get_auth() {
127 int _negotiate(MAuthReply
*m
,
128 const EntityName
& entity_name
,
130 RotatingKeyRing
* keyring
);
140 State state
= State::NONE
;
143 std::unique_ptr
<AuthClientHandler
> auth
;
147 class MonClient
: public Dispatcher
{
151 Messenger
*messenger
;
153 std::unique_ptr
<MonConnection
> active_con
;
154 std::map
<entity_addr_t
, MonConnection
> pending_cons
;
156 EntityName entity_name
;
158 entity_addr_t my_addr
;
160 mutable Mutex monc_lock
;
165 bool no_keyring_disabled_cephx
;
167 LogClient
*log_client
;
168 bool more_log_pending
;
170 void send_log(bool flush
= false);
172 std::unique_ptr
<AuthMethodList
> auth_supported
;
174 bool ms_dispatch(Message
*m
) override
;
175 bool ms_handle_reset(Connection
*con
) override
;
176 void ms_handle_remote_reset(Connection
*con
) override
{}
177 bool ms_handle_refused(Connection
*con
) override
{ return false; }
179 void handle_monmap(MMonMap
*m
);
181 void handle_auth(MAuthReply
*m
);
185 void schedule_tick();
190 bool passthrough_monmap
= false;
193 std::unique_ptr
<AuthClientHandler
> auth
;
194 uint32_t want_keys
= 0;
195 uint64_t global_id
= 0;
197 int authenticate_err
= 0;
198 bool authenticated
= false;
200 list
<Message
*> waiting_for_session
;
201 utime_t last_rotating_renew_sent
;
202 std::unique_ptr
<Context
> session_established_context
;
203 bool had_a_connection
;
204 double reopen_interval_multiplier
;
206 bool _opened() const;
207 bool _hunting() const;
208 void _start_hunting();
209 void _finish_hunting();
210 void _finish_auth(int auth_err
);
211 void _reopen_session(int rank
= -1);
212 MonConnection
& _add_conn(unsigned rank
, uint64_t global_id
);
213 void _add_conns(uint64_t global_id
);
214 void _send_mon_message(Message
*m
);
217 void set_entity_name(EntityName name
) { entity_name
= name
; }
219 int _check_auth_tickets();
220 int _check_auth_rotating();
221 int wait_auth_rotating(double timeout
);
223 int authenticate(double timeout
=0.0);
224 bool is_authenticated() const {return authenticated
;}
227 * Try to flush as many log messages as we can in a single
228 * message. Use this before shutting down to transmit your
235 map
<string
,ceph_mon_subscribe_item
> sub_sent
; // my subs, and current versions
236 map
<string
,ceph_mon_subscribe_item
> sub_new
; // unsent new subs
237 utime_t sub_renew_sent
, sub_renew_after
;
240 void handle_subscribe_ack(MMonSubscribeAck
* m
);
242 bool _sub_want(const string
&what
, version_t start
, unsigned flags
) {
243 auto sub
= sub_new
.find(what
);
244 if (sub
!= sub_new
.end() &&
245 sub
->second
.start
== start
&&
246 sub
->second
.flags
== flags
) {
249 sub
= sub_sent
.find(what
);
250 if (sub
!= sub_sent
.end() &&
251 sub
->second
.start
== start
&&
252 sub
->second
.flags
== flags
)
256 sub_new
[what
].start
= start
;
257 sub_new
[what
].flags
= flags
;
260 void _sub_got(const string
&what
, version_t got
) {
261 if (sub_new
.count(what
)) {
262 if (sub_new
[what
].start
<= got
) {
263 if (sub_new
[what
].flags
& CEPH_SUBSCRIBE_ONETIME
)
266 sub_new
[what
].start
= got
+ 1;
268 } else if (sub_sent
.count(what
)) {
269 if (sub_sent
[what
].start
<= got
) {
270 if (sub_sent
[what
].flags
& CEPH_SUBSCRIBE_ONETIME
)
271 sub_sent
.erase(what
);
273 sub_sent
[what
].start
= got
+ 1;
277 void _sub_unwant(const string
&what
) {
278 sub_sent
.erase(what
);
284 Mutex::Locker
l(monc_lock
);
287 bool sub_want(string what
, version_t start
, unsigned flags
) {
288 Mutex::Locker
l(monc_lock
);
289 return _sub_want(what
, start
, flags
);
291 void sub_got(string what
, version_t have
) {
292 Mutex::Locker
l(monc_lock
);
293 _sub_got(what
, have
);
295 void sub_unwant(string what
) {
296 Mutex::Locker
l(monc_lock
);
300 * Increase the requested subscription start point. If you do increase
301 * the value, apply the passed-in flags as well; otherwise do nothing.
303 bool sub_want_increment(string what
, version_t start
, unsigned flags
) {
304 Mutex::Locker
l(monc_lock
);
305 map
<string
,ceph_mon_subscribe_item
>::iterator i
= sub_new
.find(what
);
306 if (i
!= sub_new
.end()) {
307 if (i
->second
.start
>= start
)
309 i
->second
.start
= start
;
310 i
->second
.flags
= flags
;
314 i
= sub_sent
.find(what
);
315 if (i
== sub_sent
.end() || i
->second
.start
< start
) {
316 ceph_mon_subscribe_item
& item
= sub_new
[what
];
324 std::unique_ptr
<KeyRing
> keyring
;
325 std::unique_ptr
<RotatingKeyRing
> rotating_secrets
;
328 explicit MonClient(CephContext
*cct_
);
329 MonClient(const MonClient
&) = delete;
330 MonClient
& operator=(const MonClient
&) = delete;
331 ~MonClient() override
;
336 void set_log_client(LogClient
*clog
) {
340 int build_initial_monmap();
342 int get_monmap_privately();
344 * If you want to see MonMap messages, set this and
345 * the MonClient will tell the Messenger it hasn't
347 * Note that if you do this, *you* are of course responsible for
348 * putting the message reference!
350 void set_passthrough_monmap() {
351 Mutex::Locker
l(monc_lock
);
352 passthrough_monmap
= true;
354 void unset_passthrough_monmap() {
355 Mutex::Locker
l(monc_lock
);
356 passthrough_monmap
= false;
359 * Ping monitor with ID @p mon_id and record the resulting
360 * reply in @p result_reply.
362 * @param[in] mon_id Target monitor's ID
363 * @param[out] result_reply reply from mon.ID, if param != NULL
364 * @returns 0 in case of success; < 0 in case of error,
365 * -ETIMEDOUT if monitor didn't reply before timeout
366 * expired (default: conf->client_mount_timeout).
368 int ping_monitor(const string
&mon_id
, string
*result_reply
);
370 void send_mon_message(Message
*m
) {
371 Mutex::Locker
l(monc_lock
);
372 _send_mon_message(m
);
375 * If you specify a callback, you should not call
376 * reopen_session() again until it has been triggered. The MonClient
377 * will behave, but the first callback could be triggered after
378 * the session has been killed and the MonClient has started trying
379 * to reconnect to another monitor.
381 void reopen_session(Context
*cb
=NULL
) {
382 Mutex::Locker
l(monc_lock
);
384 session_established_context
.reset(cb
);
389 entity_addr_t
get_my_addr() const {
393 const uuid_d
& get_fsid() const {
397 entity_addr_t
get_mon_addr(unsigned i
) const {
398 Mutex::Locker
l(monc_lock
);
399 if (i
< monmap
.size())
400 return monmap
.get_addr(i
);
401 return entity_addr_t();
403 entity_inst_t
get_mon_inst(unsigned i
) const {
404 Mutex::Locker
l(monc_lock
);
405 if (i
< monmap
.size())
406 return monmap
.get_inst(i
);
407 return entity_inst_t();
409 int get_num_mon() const {
410 Mutex::Locker
l(monc_lock
);
411 return monmap
.size();
414 uint64_t get_global_id() const {
415 Mutex::Locker
l(monc_lock
);
419 void set_messenger(Messenger
*m
) { messenger
= m
; }
420 entity_addr_t
get_myaddr() const { return messenger
->get_myaddr(); }
421 AuthAuthorizer
* build_authorizer(int service_id
) const;
423 void set_want_keys(uint32_t want
) {
429 uint64_t last_mon_command_tid
;
439 Context
*onfinish
, *ontimeout
;
441 explicit MonCommand(uint64_t t
)
444 poutbl(NULL
), prs(NULL
), prval(NULL
), onfinish(NULL
), ontimeout(NULL
)
447 map
<uint64_t,MonCommand
*> mon_commands
;
449 void _send_command(MonCommand
*r
);
450 void _resend_mon_commands();
451 int _cancel_mon_command(uint64_t tid
);
452 void _finish_command(MonCommand
*r
, int ret
, string rs
);
454 void handle_mon_command_ack(MMonCommandAck
*ack
);
457 void start_mon_command(const vector
<string
>& cmd
, const bufferlist
& inbl
,
458 bufferlist
*outbl
, string
*outs
,
460 void start_mon_command(int mon_rank
,
461 const vector
<string
>& cmd
, const bufferlist
& inbl
,
462 bufferlist
*outbl
, string
*outs
,
464 void start_mon_command(const string
&mon_name
, ///< mon name, with mon. prefix
465 const vector
<string
>& cmd
, const bufferlist
& inbl
,
466 bufferlist
*outbl
, string
*outs
,
472 * get latest known version(s) of cluster map
474 * @param map string name of map (e.g., 'osdmap')
475 * @param newest pointer where newest map version will be stored
476 * @param oldest pointer where oldest map version will be stored
477 * @param onfinish context that will be triggered on completion
478 * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
480 void get_version(string map
, version_t
*newest
, version_t
*oldest
, Context
*onfinish
);
483 * Run a callback within our lock, with a reference
486 template<typename Callback
, typename
...Args
>
487 auto with_monmap(Callback
&& cb
, Args
&&...args
) const ->
488 decltype(cb(monmap
, std::forward
<Args
>(args
)...)) {
489 Mutex::Locker
l(monc_lock
);
490 return std::forward
<Callback
>(cb
)(monmap
, std::forward
<Args
>(args
)...);
494 struct version_req_d
{
496 version_t
*newest
, *oldest
;
497 version_req_d(Context
*con
, version_t
*n
, version_t
*o
) : context(con
),newest(n
), oldest(o
) {}
500 map
<ceph_tid_t
, version_req_d
*> version_requests
;
501 ceph_tid_t version_req_id
;
502 void handle_get_version_reply(MMonGetVersionReply
* m
);