-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
+ * License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
- *
+ *
*/
#define CEPH_MESSENGER_H
#include <map>
-using namespace std;
+#include <deque>
+
+#include <errno.h>
+#include <sstream>
+#include <memory>
#include "Message.h"
#include "Dispatcher.h"
-#include "common/Mutex.h"
+#include "Policy.h"
#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Throttle.h"
#include "include/Context.h"
#include "include/types.h"
#include "include/ceph_features.h"
#include "auth/Crypto.h"
+#include "common/item_history.h"
+#include "auth/AuthRegistry.h"
+#include "include/ceph_assert.h"
#include <errno.h>
#include <sstream>
class Timer;
+class AuthClient;
+class AuthServer;
+
+#ifdef UNIT_TESTS_BUILT
+
+struct Interceptor {
+ std::mutex lock;
+ std::condition_variable cond_var;
+
+ enum ACTION : uint32_t {
+ CONTINUE = 0,
+ FAIL,
+ STOP
+ };
+
+ virtual ~Interceptor() {}
+ virtual ACTION intercept(Connection *conn, uint32_t step) = 0;
+};
+
+#endif
class Messenger {
private:
- list<Dispatcher*> dispatchers;
- list <Dispatcher*> fast_dispatchers;
+ std::deque<Dispatcher*> dispatchers;
+ std::deque<Dispatcher*> fast_dispatchers;
ZTracer::Endpoint trace_endpoint;
+protected:
void set_endpoint_addr(const entity_addr_t& a,
const entity_name_t &name);
protected:
/// the "name" of the local daemon. eg client.99
- entity_inst_t my_inst;
+ entity_name_t my_name;
+
+ /// my addr
+ safe_item_history<entity_addrvec_t> my_addrs;
+
int default_send_priority;
/// set to true once the Messenger has started, and set to false on shutdown
bool started;
int socket_priority;
public:
+ AuthClient *auth_client = 0;
+ AuthServer *auth_server = 0;
+
+#ifdef UNIT_TESTS_BUILT
+ Interceptor *interceptor = nullptr;
+#endif
+
/**
* Various Messenger conditional config/type flags to allow
* different "transport" Messengers to tune themselves
CephContext *cct;
int crcflags;
- /**
- * A Policy describes the rules of a Connection. Is there a limit on how
- * much data this Connection can have locally? When the underlying connection
- * experiences an error, does the Connection disappear? Can this Messenger
- * re-establish the underlying connection?
- */
- struct Policy {
- /// If true, the Connection is tossed out on errors.
- bool lossy;
- /// If true, the underlying connection can't be re-established from this end.
- bool server;
- /// If true, we will standby when idle
- bool standby;
- /// If true, we will try to detect session resets
- bool resetcheck;
- /**
- * The throttler is used to limit how much data is held by Messages from
- * the associated Connection(s). When reading in a new Message, the Messenger
- * will call throttler->throttle() for the size of the new Message.
- */
- Throttle *throttler_bytes;
- Throttle *throttler_messages;
-
- /// Specify features supported locally by the endpoint.
- uint64_t features_supported;
- /// Specify features any remotes must have to talk to this endpoint.
- uint64_t features_required;
-
- Policy()
- : lossy(false), server(false), standby(false), resetcheck(true),
- throttler_bytes(NULL),
- throttler_messages(NULL),
- features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
- features_required(0) {}
- private:
- Policy(bool l, bool s, bool st, bool r, uint64_t req)
- : lossy(l), server(s), standby(st), resetcheck(r),
- throttler_bytes(NULL),
- throttler_messages(NULL),
- features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
- features_required(req) {}
-
- public:
- static Policy stateful_server(uint64_t req) {
- return Policy(false, true, true, true, req);
- }
- static Policy stateless_server(uint64_t req) {
- return Policy(true, true, false, false, req);
- }
- static Policy lossless_peer(uint64_t req) {
- return Policy(false, false, true, false, req);
- }
- static Policy lossless_peer_reuse(uint64_t req) {
- return Policy(false, false, true, true, req);
- }
- static Policy lossy_client(uint64_t req) {
- return Policy(true, false, false, false, req);
- }
- static Policy lossless_client(uint64_t req) {
- return Policy(false, false, false, true, req);
- }
- };
+ using Policy = ceph::net::Policy<Throttle>;
+protected:
+ // for authentication
+ AuthRegistry auth_registry;
+
+public:
/**
* Messenger constructor. Call this from your implementation.
* Messenger users should construct full implementations directly,
* or use the create() function.
*/
- Messenger(CephContext *cct_, entity_name_t w)
- : trace_endpoint("0.0.0.0", 0, "Messenger"),
- my_inst(),
- default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
- magic(0),
- socket_priority(-1),
- cct(cct_),
- crcflags(get_default_crc_flags(cct->_conf))
- {
- my_inst.name = w;
- }
+ Messenger(CephContext *cct_, entity_name_t w);
virtual ~Messenger() {}
/**
* @defgroup Accessors
* @{
*/
+ int get_mytype() const { return my_name.type(); }
+
/**
- * Retrieve the Messenger's instance.
+ * Retrieve the Messenger's name
*
- * @return A const reference to the instance this Messenger
+ * @return A const reference to the name this Messenger
* currently believes to be its own.
*/
- const entity_inst_t& get_myinst() { return my_inst; }
- /**
- * set messenger's instance
- */
- void set_myinst(entity_inst_t i) { my_inst = i; }
-
- uint32_t get_magic() { return magic; }
- void set_magic(int _magic) { magic = _magic; }
+ const entity_name_t& get_myname() { return my_name; }
/**
* Retrieve the Messenger's address.
* @return A const reference to the address this Messenger
* currently believes to be its own.
*/
- const entity_addr_t& get_myaddr() { return my_inst.addr; }
+ const entity_addrvec_t& get_myaddrs() {
+ return *my_addrs;
+ }
+
+ /**
+ * get legacy addr for myself, suitable for protocol v1
+ *
+ * Note that myaddrs might be a proper addrvec with v1 in it, or it might be an
+ * ANY addr (if i am a pure client).
+ */
+ entity_addr_t get_myaddr_legacy() {
+ return my_addrs->as_legacy_addr();
+ }
+
+
+ /**
+ * set messenger's instance
+ */
+ uint32_t get_magic() { return magic; }
+ void set_magic(int _magic) { magic = _magic; }
+
+ void set_auth_client(AuthClient *ac) {
+ auth_client = ac;
+ }
+ void set_auth_server(AuthServer *as) {
+ auth_server = as;
+ }
+
protected:
/**
* set messenger's address
*/
- virtual void set_myaddr(const entity_addr_t& a) {
- my_inst.addr = a;
- set_endpoint_addr(a, my_inst.name);
+ virtual void set_myaddrs(const entity_addrvec_t& a) {
+ my_addrs = a;
+ set_endpoint_addr(a.front(), my_name);
}
public:
/**
return &trace_endpoint;
}
- /**
- * Retrieve the Messenger's name.
- *
- * @return A const reference to the name this Messenger
- * currently believes to be its own.
- */
- const entity_name_t& get_myname() { return my_inst.name; }
/**
* Set the name of the local entity. The name is reported to others and
* can be changed while the system is running, but doing so at incorrect
*
* @param m The name to set.
*/
- void set_myname(const entity_name_t& m) { my_inst.name = m; }
+ void set_myname(const entity_name_t& m) { my_name = m; }
+
/**
* Set the unknown address components for this Messenger.
* This is useful if the Messenger doesn't know its full address just by
*
* @param addr The address to use as a template.
*/
- virtual void set_addr_unknowns(const entity_addr_t &addr) = 0;
+ virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0;
/**
* Set the address for this Messenger. This is useful if the Messenger
* binds to a specific address but advertises a different address on the
*
* @param addr The address to use.
*/
- virtual void set_addr(const entity_addr_t &addr) = 0;
+ virtual void set_addrs(const entity_addrvec_t &addr) = 0;
/// Get the default send priority.
int get_default_send_priority() { return default_send_priority; }
/**
* (0 if the queue is empty)
*/
virtual double get_dispatch_queue_max_age(utime_t now) = 0;
- /**
- * Get the default crc flags for this messenger.
- * but not yet dispatched.
- */
- static int get_default_crc_flags(md_config_t *);
/**
* @} // Accessors
* @param p The cluster protocol to use. Defined externally.
*/
void set_default_send_priority(int p) {
- assert(!started);
+ ceph_assert(!started);
default_send_priority = p;
}
/**
*
* @param d The Dispatcher to insert into the list.
*/
- void add_dispatcher_head(Dispatcher *d) {
+ void add_dispatcher_head(Dispatcher *d) {
bool first = dispatchers.empty();
dispatchers.push_front(d);
if (d->ms_can_fast_dispatch_any())
*
* @param d The Dispatcher to insert into the list.
*/
- void add_dispatcher_tail(Dispatcher *d) {
+ void add_dispatcher_tail(Dispatcher *d) {
bool first = dispatchers.empty();
dispatchers.push_back(d);
if (d->ms_can_fast_dispatch_any())
* we can be more specific about the failure.
*/
virtual int bind(const entity_addr_t& bind_addr) = 0;
+
/**
* This function performs a full restart of the Messenger component,
* whatever that means. Other entities who connect to this
* @return 0 on success, or -1 on error, or -errno if
*/
virtual int client_bind(const entity_addr_t& bind_addr) = 0;
+
+ virtual int bindv(const entity_addrvec_t& addrs);
+
+
+ virtual bool should_use_msgr2() {
+ return false;
+ }
+
/**
* @} // Configuration
*/
*
* @return 0 on success, or -errno on failure.
*/
- virtual int send_message(Message *m, const entity_inst_t& dest) = 0;
+ virtual int send_to(
+ Message *m,
+ int type,
+ const entity_addrvec_t& addr) = 0;
+ int send_to_mon(
+ Message *m, const entity_addrvec_t& addrs) {
+ return send_to(m, CEPH_ENTITY_TYPE_MON, addrs);
+ }
+ int send_to_mds(
+ Message *m, const entity_addrvec_t& addrs) {
+ return send_to(m, CEPH_ENTITY_TYPE_MDS, addrs);
+ }
+ int send_to_osd(
+ Message *m, const entity_addrvec_t& addrs) {
+ return send_to(m, CEPH_ENTITY_TYPE_OSD, addrs);
+ }
+ int send_to_mgr(
+ Message *m, const entity_addrvec_t& addrs) {
+ return send_to(m, CEPH_ENTITY_TYPE_MGR, addrs);
+ }
/**
* @} // Messaging
*
* @param dest The entity to get a connection for.
*/
- virtual ConnectionRef get_connection(const entity_inst_t& dest) = 0;
+ virtual ConnectionRef connect_to(
+ int type, const entity_addrvec_t& dest) = 0;
+ ConnectionRef connect_to_mon(const entity_addrvec_t& dest) {
+ return connect_to(CEPH_ENTITY_TYPE_MON, dest);
+ }
+ ConnectionRef connect_to_mds(const entity_addrvec_t& dest) {
+ return connect_to(CEPH_ENTITY_TYPE_MDS, dest);
+ }
+ ConnectionRef connect_to_osd(const entity_addrvec_t& dest) {
+ return connect_to(CEPH_ENTITY_TYPE_OSD, dest);
+ }
+ ConnectionRef connect_to_mgr(const entity_addrvec_t& dest) {
+ return connect_to(CEPH_ENTITY_TYPE_MGR, dest);
+ }
+
/**
* Get the Connection object associated with ourselves.
*/
* @param a The address to mark down.
*/
virtual void mark_down(const entity_addr_t& a) = 0;
+ virtual void mark_down_addrs(const entity_addrvec_t& a) {
+ mark_down(a.legacy_addr());
+ }
/**
* Mark all the existing Connections down. This is equivalent
* to iterating over all Connections and calling mark_down()
} else {
blocked = true;
int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
- assert(r == 0);
+ ceph_assert(r == 0);
}
}
~sigpipe_stopper() {
if (blocked) {
struct timespec nowait{0};
int r = sigtimedwait(&pipe_mask, 0, &nowait);
- assert(r == EAGAIN || r == 0);
+ ceph_assert(r == EAGAIN || r == 0);
r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
- assert(r == 0);
+ ceph_assert(r == 0);
}
}
};
*
* @param m The Message we are testing.
*/
- bool ms_can_fast_dispatch(const Message *m) {
- for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
- p != fast_dispatchers.end();
- ++p) {
- if ((*p)->ms_can_fast_dispatch(m))
+ bool ms_can_fast_dispatch(const Message::const_ref& m) {
+ for (const auto &dispatcher : fast_dispatchers) {
+ if (dispatcher->ms_can_fast_dispatch2(m))
return true;
}
return false;
/**
* Deliver a single Message via "fast dispatch".
*
- * @param m The Message we are fast dispatching. We take ownership
- * of one reference to it.
+ * @param m The Message we are fast dispatching.
* If none of our Dispatchers can handle it, ceph_abort().
*/
- void ms_fast_dispatch(Message *m) {
+ void ms_fast_dispatch(const Message::ref &m) {
m->set_dispatch_stamp(ceph_clock_now());
- for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
- p != fast_dispatchers.end();
- ++p) {
- if ((*p)->ms_can_fast_dispatch(m)) {
- (*p)->ms_fast_dispatch(m);
+ for (const auto &dispatcher : fast_dispatchers) {
+ if (dispatcher->ms_can_fast_dispatch2(m)) {
+ dispatcher->ms_fast_dispatch2(m);
return;
}
}
ceph_abort();
}
+ void ms_fast_dispatch(Message *m) {
+ return ms_fast_dispatch(Message::ref(m, false)); /* consume ref */
+ }
/**
*
*/
- void ms_fast_preprocess(Message *m) {
- for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
- p != fast_dispatchers.end();
- ++p) {
- (*p)->ms_fast_preprocess(m);
+ void ms_fast_preprocess(const Message::ref &m) {
+ for (const auto &dispatcher : fast_dispatchers) {
+ dispatcher->ms_fast_preprocess2(m);
}
}
/**
* Deliver a single Message. Send it to each Dispatcher
* in sequence until one of them handles it.
- * If none of our Dispatchers can handle it, assert(0).
+ * If none of our Dispatchers can handle it, ceph_abort().
*
- * @param m The Message to deliver. We take ownership of
- * one reference to it.
+ * @param m The Message to deliver.
*/
- void ms_deliver_dispatch(Message *m) {
+ void ms_deliver_dispatch(const Message::ref &m) {
m->set_dispatch_stamp(ceph_clock_now());
- for (list<Dispatcher*>::iterator p = dispatchers.begin();
- p != dispatchers.end();
- ++p) {
- if ((*p)->ms_dispatch(m))
+ for (const auto &dispatcher : dispatchers) {
+ if (dispatcher->ms_dispatch2(m))
return;
}
lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
<< m->get_source_inst() << dendl;
- assert(!cct->_conf->ms_die_on_unhandled_msg);
- m->put();
+ ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
+ }
+ void ms_deliver_dispatch(Message *m) {
+ return ms_deliver_dispatch(Message::ref(m, false)); /* consume ref */
}
/**
* Notify each Dispatcher of a new Connection. Call
* @param con Pointer to the new Connection.
*/
void ms_deliver_handle_connect(Connection *con) {
- for (list<Dispatcher*>::iterator p = dispatchers.begin();
- p != dispatchers.end();
- ++p)
- (*p)->ms_handle_connect(con);
+ for (const auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_connect(con);
+ }
}
/**
* @param con Pointer to the new Connection.
*/
void ms_deliver_handle_fast_connect(Connection *con) {
- for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
- p != fast_dispatchers.end();
- ++p)
- (*p)->ms_handle_fast_connect(con);
+ for (const auto& dispatcher : fast_dispatchers) {
+ dispatcher->ms_handle_fast_connect(con);
+ }
}
/**
- * Notify each Dispatcher of a new incomming Connection. Call
+ * Notify each Dispatcher of a new incoming Connection. Call
* this function whenever a new Connection is accepted.
*
* @param con Pointer to the new Connection.
*/
void ms_deliver_handle_accept(Connection *con) {
- for (list<Dispatcher*>::iterator p = dispatchers.begin();
- p != dispatchers.end();
- ++p)
- (*p)->ms_handle_accept(con);
+ for (const auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_accept(con);
+ }
}
/**
* @param con Pointer to the new Connection.
*/
void ms_deliver_handle_fast_accept(Connection *con) {
- for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
- p != fast_dispatchers.end();
- ++p)
- (*p)->ms_handle_fast_accept(con);
+ for (const auto& dispatcher : fast_dispatchers) {
+ dispatcher->ms_handle_fast_accept(con);
+ }
}
/**
* @param con Pointer to the broken Connection.
*/
void ms_deliver_handle_reset(Connection *con) {
- for (list<Dispatcher*>::iterator p = dispatchers.begin();
- p != dispatchers.end();
- ++p) {
- if ((*p)->ms_handle_reset(con))
+ for (const auto& dispatcher : dispatchers) {
+ if (dispatcher->ms_handle_reset(con))
return;
}
}
* @param con Pointer to the broken Connection.
*/
void ms_deliver_handle_remote_reset(Connection *con) {
- for (list<Dispatcher*>::iterator p = dispatchers.begin();
- p != dispatchers.end();
- ++p)
- (*p)->ms_handle_remote_reset(con);
+ for (const auto& dispatcher : dispatchers) {
+ dispatcher->ms_handle_remote_reset(con);
+ }
}
/**
* @param con Pointer to the broken Connection.
*/
void ms_deliver_handle_refused(Connection *con) {
- for (list<Dispatcher*>::iterator p = dispatchers.begin();
- p != dispatchers.end();
- ++p) {
- if ((*p)->ms_handle_refused(con))
+ for (const auto& dispatcher : dispatchers) {
+ if (dispatcher->ms_handle_refused(con))
return;
}
}
* @param force_new True if we want to wait for new keys, false otherwise.
* @return A pointer to the AuthAuthorizer, if we have one; NULL otherwise
*/
- AuthAuthorizer *ms_deliver_get_authorizer(int peer_type, bool force_new) {
+ AuthAuthorizer *ms_deliver_get_authorizer(int peer_type) {
AuthAuthorizer *a = 0;
- for (list<Dispatcher*>::iterator p = dispatchers.begin();
- p != dispatchers.end();
- ++p) {
- if ((*p)->ms_get_authorizer(peer_type, &a, force_new))
+ for (const auto& dispatcher : dispatchers) {
+ if (dispatcher->ms_get_authorizer(peer_type, &a))
return a;
}
return NULL;
* @return True if we were able to prove or disprove correctness of
* authorizer, false otherwise.
*/
- bool ms_deliver_verify_authorizer(Connection *con, int peer_type,
- int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
- bool& isvalid, CryptoKey& session_key,
- std::unique_ptr<AuthAuthorizerChallenge> *challenge) {
- for (list<Dispatcher*>::iterator p = dispatchers.begin();
- p != dispatchers.end();
- ++p) {
- if ((*p)->ms_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply,
- isvalid, session_key, challenge))
- return true;
- }
- return false;
- }
+ bool ms_deliver_verify_authorizer(
+ Connection *con, int peer_type,
+ int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
+ bool& isvalid,
+ CryptoKey& session_key,
+ std::string *connection_secret,
+ std::unique_ptr<AuthAuthorizerChallenge> *challenge);
/**
* @} // Dispatcher Interfacing