* Foundation. See file COPYING.
*
*/
-
#ifndef CEPH_MONCLIENT_H
#define CEPH_MONCLIENT_H
#include "msg/Messenger.h"
#include "MonMap.h"
+#include "MonSub.h"
#include "common/Timer.h"
#include "common/Finisher.h"
#include "common/config.h"
+#include "auth/AuthClient.h"
+#include "auth/AuthServer.h"
class MMonMap;
+class MConfig;
class MMonGetVersionReply;
struct MMonSubscribeAck;
class MMonCommandAck;
struct MAuthReply;
-class MAuthRotating;
class LogClient;
-struct AuthAuthorizer;
-class AuthMethodList;
+class AuthAuthorizer;
class AuthClientHandler;
+class AuthMethodList;
+class AuthRegistry;
class KeyRing;
class RotatingKeyRing;
-struct MonClientPinger : public Dispatcher {
+class MonConnection {
+public:
+ MonConnection(CephContext *cct,
+ ConnectionRef conn,
+ uint64_t global_id,
+ AuthRegistry *auth_registry);
+ ~MonConnection();
+ MonConnection(MonConnection&& rhs) = default;
+ MonConnection& operator=(MonConnection&&) = default;
+ MonConnection(const MonConnection& rhs) = delete;
+ MonConnection& operator=(const MonConnection&) = delete;
+ int handle_auth(MAuthReply *m,
+ const EntityName& entity_name,
+ uint32_t want_keys,
+ RotatingKeyRing* keyring);
+ int authenticate(MAuthReply *m);
+ void start(epoch_t epoch,
+ const EntityName& entity_name);
+ bool have_session() const;
+ uint64_t get_global_id() const {
+ return global_id;
+ }
+ ConnectionRef get_con() {
+ return con;
+ }
+ std::unique_ptr<AuthClientHandler>& get_auth() {
+ return auth;
+ }
+
+ int get_auth_request(
+ uint32_t *method,
+ std::vector<uint32_t> *preferred_modes,
+ bufferlist *out,
+ const EntityName& entity_name,
+ uint32_t want_keys,
+ RotatingKeyRing* keyring);
+ int handle_auth_reply_more(
+ AuthConnectionMeta *auth_meta,
+ const bufferlist& bl,
+ bufferlist *reply);
+ int handle_auth_done(
+ AuthConnectionMeta *auth_meta,
+ uint64_t global_id,
+ const bufferlist& bl,
+ CryptoKey *session_key,
+ std::string *connection_secret);
+ int handle_auth_bad_method(
+ uint32_t old_auth_method,
+ int result,
+ const std::vector<uint32_t>& allowed_methods,
+ const std::vector<uint32_t>& allowed_modes);
+
+ bool is_con(Connection *c) const {
+ return con.get() == c;
+ }
+
+private:
+ int _negotiate(MAuthReply *m,
+ const EntityName& entity_name,
+ uint32_t want_keys,
+ RotatingKeyRing* keyring);
+ int _init_auth(uint32_t method,
+ const EntityName& entity_name,
+ uint32_t want_keys,
+ RotatingKeyRing* keyring,
+ bool msgr2);
+
+private:
+ CephContext *cct;
+ enum class State {
+ NONE,
+ NEGOTIATING, // v1 only
+ AUTHENTICATING, // v1 and v2
+ HAVE_SESSION,
+ };
+ State state = State::NONE;
+ ConnectionRef con;
+ int auth_method = -1;
+ utime_t auth_start;
+
+ std::unique_ptr<AuthClientHandler> auth;
+ uint64_t global_id;
+
+ AuthRegistry *auth_registry;
+};
+
+
+struct MonClientPinger : public Dispatcher,
+ public AuthClient {
Mutex lock;
Cond ping_recvd_cond;
string *result;
bool done;
+ RotatingKeyRing *keyring;
+ std::unique_ptr<MonConnection> mc;
- MonClientPinger(CephContext *cct_, string *res_) :
+ MonClientPinger(CephContext *cct_,
+ RotatingKeyRing *keyring,
+ string *res_) :
Dispatcher(cct_),
lock("MonClientPinger::lock"),
result(res_),
- done(false)
+ done(false),
+ keyring(keyring)
{ }
int wait_for_reply(double timeout = 0.0) {
}
bool ms_dispatch(Message *m) override {
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
if (m->get_type() != CEPH_MSG_PING)
return false;
bufferlist &payload = m->get_payload();
if (result && payload.length() > 0) {
- bufferlist::iterator p = payload.begin();
- ::decode(*result, p);
+ auto p = std::cbegin(payload);
+ decode(*result, p);
}
done = true;
ping_recvd_cond.SignalAll();
return true;
}
bool ms_handle_reset(Connection *con) override {
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
done = true;
ping_recvd_cond.SignalAll();
return true;
bool ms_handle_refused(Connection *con) override {
return false;
}
-};
-class MonConnection {
-public:
- MonConnection(CephContext *cct,
- ConnectionRef conn,
- uint64_t global_id);
- ~MonConnection();
- MonConnection(MonConnection&& rhs) = default;
- MonConnection& operator=(MonConnection&&) = default;
- MonConnection(const MonConnection& rhs) = delete;
- MonConnection& operator=(const MonConnection&) = delete;
- int handle_auth(MAuthReply *m,
- const EntityName& entity_name,
- uint32_t want_keys,
- RotatingKeyRing* keyring);
- int authenticate(MAuthReply *m);
- void start(epoch_t epoch,
- const EntityName& entity_name,
- const AuthMethodList& auth_supported);
- bool have_session() const;
- uint64_t get_global_id() const {
- return global_id;
+ // AuthClient
+ int get_auth_request(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint32_t *auth_method,
+ std::vector<uint32_t> *preferred_modes,
+ bufferlist *bl) override {
+ return mc->get_auth_request(auth_method, preferred_modes, bl,
+ cct->_conf->name, 0, keyring);
}
- ConnectionRef get_con() {
- return con;
+ int handle_auth_reply_more(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ const bufferlist& bl,
+ bufferlist *reply) override {
+ return mc->handle_auth_reply_more(auth_meta, bl, reply);
}
- std::unique_ptr<AuthClientHandler>& get_auth() {
- return auth;
+ int handle_auth_done(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint64_t global_id,
+ uint32_t con_mode,
+ const bufferlist& bl,
+ CryptoKey *session_key,
+ std::string *connection_secret) override {
+ return mc->handle_auth_done(auth_meta, global_id, bl,
+ session_key, connection_secret);
+ }
+ int handle_auth_bad_method(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint32_t old_auth_method,
+ int result,
+ const std::vector<uint32_t>& allowed_methods,
+ const std::vector<uint32_t>& allowed_modes) override {
+ return mc->handle_auth_bad_method(old_auth_method, result,
+ allowed_methods, allowed_modes);
}
-
-private:
- int _negotiate(MAuthReply *m,
- const EntityName& entity_name,
- uint32_t want_keys,
- RotatingKeyRing* keyring);
-
-private:
- CephContext *cct;
- enum class State {
- NONE,
- NEGOTIATING,
- AUTHENTICATING,
- HAVE_SESSION,
- };
- State state = State::NONE;
- ConnectionRef con;
-
- std::unique_ptr<AuthClientHandler> auth;
- uint64_t global_id;
};
-class MonClient : public Dispatcher {
+
+class MonClient : public Dispatcher,
+ public AuthClient,
+ public AuthServer /* for mgr, osd, mds */ {
public:
MonMap monmap;
+ map<string,string> config_mgr;
+
private:
Messenger *messenger;
std::unique_ptr<MonConnection> active_con;
- std::map<entity_addr_t, MonConnection> pending_cons;
+ std::map<entity_addrvec_t, MonConnection> pending_cons;
EntityName entity_name;
- entity_addr_t my_addr;
-
mutable Mutex monc_lock;
SafeTimer timer;
Finisher finisher;
bool initialized;
- bool no_keyring_disabled_cephx;
+ bool stopping = false;
LogClient *log_client;
bool more_log_pending;
void send_log(bool flush = false);
- std::unique_ptr<AuthMethodList> auth_supported;
-
bool ms_dispatch(Message *m) override;
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override { return false; }
void handle_monmap(MMonMap *m);
+ void handle_config(MConfig *m);
void handle_auth(MAuthReply *m);
bool want_monmap;
Cond map_cond;
bool passthrough_monmap = false;
+ bool got_config = false;
// authenticate
std::unique_ptr<AuthClientHandler> auth;
bool had_a_connection;
double reopen_interval_multiplier;
+ Dispatcher *handle_authentication_dispatcher = nullptr;
+
bool _opened() const;
bool _hunting() const;
void _start_hunting();
- void _finish_hunting();
+ void _finish_hunting(int auth_err);
void _finish_auth(int auth_err);
void _reopen_session(int rank = -1);
MonConnection& _add_conn(unsigned rank, uint64_t global_id);
void _add_conns(uint64_t global_id);
void _send_mon_message(Message *m);
+ std::map<entity_addrvec_t, MonConnection>::iterator _find_pending_con(
+ const ConnectionRef& con) {
+ for (auto i = pending_cons.begin(); i != pending_cons.end(); ++i) {
+ if (i->second.get_con() == con) {
+ return i;
+ }
+ }
+ return pending_cons.end();
+ }
+
public:
- void set_entity_name(EntityName name) { entity_name = name; }
+ // AuthClient
+ int get_auth_request(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint32_t *method,
+ std::vector<uint32_t> *preferred_modes,
+ bufferlist *bl) override;
+ int handle_auth_reply_more(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ const bufferlist& bl,
+ bufferlist *reply) override;
+ int handle_auth_done(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint64_t global_id,
+ uint32_t con_mode,
+ const bufferlist& bl,
+ CryptoKey *session_key,
+ std::string *connection_secret) override;
+ int handle_auth_bad_method(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint32_t old_auth_method,
+ int result,
+ const std::vector<uint32_t>& allowed_methods,
+ const std::vector<uint32_t>& allowed_modes) override;
+ // AuthServer
+ int handle_auth_request(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ bool more,
+ uint32_t auth_method,
+ const bufferlist& bl,
+ bufferlist *reply);
+ void set_entity_name(EntityName name) { entity_name = name; }
+ void set_handle_authentication_dispatcher(Dispatcher *d) {
+ handle_authentication_dispatcher = d;
+ }
int _check_auth_tickets();
int _check_auth_rotating();
int wait_auth_rotating(double timeout);
*/
void flush_log();
- // mon subscriptions
private:
- map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
- map<string,ceph_mon_subscribe_item> sub_new; // unsent new subs
- utime_t sub_renew_sent, sub_renew_after;
-
+ // mon subscriptions
+ MonSub sub;
void _renew_subs();
void handle_subscribe_ack(MMonSubscribeAck* m);
- bool _sub_want(const string &what, version_t start, unsigned flags) {
- auto sub = sub_new.find(what);
- if (sub != sub_new.end() &&
- sub->second.start == start &&
- sub->second.flags == flags) {
- return false;
- } else {
- sub = sub_sent.find(what);
- if (sub != sub_sent.end() &&
- sub->second.start == start &&
- sub->second.flags == flags)
- return false;
- }
-
- sub_new[what].start = start;
- sub_new[what].flags = flags;
- return true;
- }
- void _sub_got(const string &what, version_t got) {
- if (sub_new.count(what)) {
- if (sub_new[what].start <= got) {
- if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
- sub_new.erase(what);
- else
- sub_new[what].start = got + 1;
- }
- } else if (sub_sent.count(what)) {
- if (sub_sent[what].start <= got) {
- if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
- sub_sent.erase(what);
- else
- sub_sent[what].start = got + 1;
- }
- }
- }
- void _sub_unwant(const string &what) {
- sub_sent.erase(what);
- sub_new.erase(what);
- }
-
public:
void renew_subs() {
- Mutex::Locker l(monc_lock);
+ std::lock_guard l(monc_lock);
_renew_subs();
}
bool sub_want(string what, version_t start, unsigned flags) {
- Mutex::Locker l(monc_lock);
- return _sub_want(what, start, flags);
+ std::lock_guard l(monc_lock);
+ return sub.want(what, start, flags);
}
void sub_got(string what, version_t have) {
- Mutex::Locker l(monc_lock);
- _sub_got(what, have);
+ std::lock_guard l(monc_lock);
+ sub.got(what, have);
}
void sub_unwant(string what) {
- Mutex::Locker l(monc_lock);
- _sub_unwant(what);
+ std::lock_guard l(monc_lock);
+ sub.unwant(what);
}
- /**
- * Increase the requested subscription start point. If you do increase
- * the value, apply the passed-in flags as well; otherwise do nothing.
- */
bool sub_want_increment(string what, version_t start, unsigned flags) {
- Mutex::Locker l(monc_lock);
- map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
- if (i != sub_new.end()) {
- if (i->second.start >= start)
- return false;
- i->second.start = start;
- i->second.flags = flags;
- return true;
- }
-
- i = sub_sent.find(what);
- if (i == sub_sent.end() || i->second.start < start) {
- ceph_mon_subscribe_item& item = sub_new[what];
- item.start = start;
- item.flags = flags;
- return true;
- }
- return false;
+ std::lock_guard l(monc_lock);
+ return sub.inc_want(what, start, flags);
}
std::unique_ptr<KeyRing> keyring;
int build_initial_monmap();
int get_monmap();
- int get_monmap_privately();
+ int get_monmap_and_config();
/**
* If you want to see MonMap messages, set this and
* the MonClient will tell the Messenger it hasn't
* putting the message reference!
*/
void set_passthrough_monmap() {
- Mutex::Locker l(monc_lock);
+ std::lock_guard l(monc_lock);
passthrough_monmap = true;
}
void unset_passthrough_monmap() {
- Mutex::Locker l(monc_lock);
+ std::lock_guard l(monc_lock);
passthrough_monmap = false;
}
/**
int ping_monitor(const string &mon_id, string *result_reply);
void send_mon_message(Message *m) {
- Mutex::Locker l(monc_lock);
+ std::lock_guard l(monc_lock);
_send_mon_message(m);
}
/**
* to reconnect to another monitor.
*/
void reopen_session(Context *cb=NULL) {
- Mutex::Locker l(monc_lock);
+ std::lock_guard l(monc_lock);
if (cb) {
session_established_context.reset(cb);
}
_reopen_session();
}
- entity_addr_t get_my_addr() const {
- return my_addr;
- }
-
const uuid_d& get_fsid() const {
return monmap.fsid;
}
- entity_addr_t get_mon_addr(unsigned i) const {
- Mutex::Locker l(monc_lock);
- if (i < monmap.size())
- return monmap.get_addr(i);
- return entity_addr_t();
- }
- entity_inst_t get_mon_inst(unsigned i) const {
- Mutex::Locker l(monc_lock);
+ entity_addrvec_t get_mon_addrs(unsigned i) const {
+ std::lock_guard l(monc_lock);
if (i < monmap.size())
- return monmap.get_inst(i);
- return entity_inst_t();
+ return monmap.get_addrs(i);
+ return entity_addrvec_t();
}
int get_num_mon() const {
- Mutex::Locker l(monc_lock);
+ std::lock_guard l(monc_lock);
return monmap.size();
}
uint64_t get_global_id() const {
- Mutex::Locker l(monc_lock);
+ std::lock_guard l(monc_lock);
return global_id;
}
void set_messenger(Messenger *m) { messenger = m; }
- entity_addr_t get_myaddr() const { return messenger->get_myaddr(); }
+ entity_addrvec_t get_myaddrs() const { return messenger->get_myaddrs(); }
AuthAuthorizer* build_authorizer(int service_id) const;
void set_want_keys(uint32_t want) {
// admin commands
private:
uint64_t last_mon_command_tid;
+
struct MonCommand {
string target_name;
int target_rank;
* @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
*/
void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish);
-
/**
* Run a callback within our lock, with a reference
* to the MonMap
template<typename Callback, typename...Args>
auto with_monmap(Callback&& cb, Args&&...args) const ->
decltype(cb(monmap, std::forward<Args>(args)...)) {
- Mutex::Locker l(monc_lock);
+ std::lock_guard l(monc_lock);
return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
}
+ void register_config_callback(md_config_t::config_callback fn);
+ void register_config_notify_callback(std::function<void(void)> f) {
+ config_notify_cb = f;
+ }
+ md_config_t::config_callback get_config_callback();
+
private:
struct version_req_d {
Context *context;
ceph_tid_t version_req_id;
void handle_get_version_reply(MMonGetVersionReply* m);
-
+ md_config_t::config_callback config_cb;
+ std::function<void(void)> config_notify_cb;
};
#endif