#include <ostream>
#include <boost/intrusive_ptr.hpp>
-// Because intusive_ptr clobbers our assert...
-#include "include/assert.h"
-
-#include "include/types.h"
-#include "include/buffer.h"
+#include "auth/Auth.h"
#include "common/RefCountedObj.h"
-
-#include "common/debug.h"
#include "common/config.h"
+#include "common/debug.h"
+#include "common/Mutex.h"
+#include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert...
+#include "include/buffer.h"
+#include "include/types.h"
+#include "common/item_history.h"
+#include "msg/MessageRef.h"
// ======================================================
// abstract Connection, for keeping per-connection state
-class Message;
class Messenger;
+#ifdef UNIT_TESTS_BUILT
+class Interceptor;
+#endif
+
struct Connection : public RefCountedObject {
mutable Mutex lock;
Messenger *msgr;
- RefCountedObject *priv;
+ RefCountedPtr priv;
int peer_type;
- entity_addr_t peer_addr;
+ int64_t peer_id = -1; // [msgr2 only] the 0 of osd.0, 4567 or client.4567
+ safe_item_history<entity_addrvec_t> peer_addrs;
utime_t last_keepalive, last_keepalive_ack;
private:
uint64_t features;
int rx_buffers_version;
map<ceph_tid_t,pair<bufferlist,int> > rx_buffers;
+ // authentication state
+ // FIXME make these private after ms_handle_authorizer is removed
+public:
+ AuthCapsInfo peer_caps_info;
+ EntityName peer_name;
+ uint64_t peer_global_id = 0;
+
+#ifdef UNIT_TESTS_BUILT
+ Interceptor *interceptor;
+#endif
+
friend class boost::intrusive_ptr<Connection>;
friend class PipeConnection;
public:
Connection(CephContext *cct, Messenger *m)
- // we are managed exlusively by ConnectionRef; make it so you can
+ // we are managed exclusively by ConnectionRef; make it so you can
// ConnectionRef foo = new Connection;
: RefCountedObject(cct, 0),
lock("Connection::lock"),
msgr(m),
- priv(NULL),
peer_type(-1),
features(0),
failed(false),
~Connection() override {
//generic_dout(0) << "~Connection " << this << dendl;
- if (priv) {
- //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl;
- priv->put();
- }
}
- void set_priv(RefCountedObject *o) {
+ void set_priv(const RefCountedPtr& o) {
Mutex::Locker l(lock);
- if (priv)
- priv->put();
priv = o;
}
- RefCountedObject *get_priv() {
+ RefCountedPtr get_priv() {
Mutex::Locker l(lock);
- if (priv)
- return priv->get();
- return NULL;
+ return priv;
}
/**
* @return 0 on success, or -errno on failure.
*/
virtual int send_message(Message *m) = 0;
+
+ virtual int send_message2(MessageRef m)
+ {
+ return send_message(m.detach()); /* send_message(Message *m) consumes a reference */
+ }
+
/**
* Send a "keepalive" ping along the given Connection, if it's working.
* If the underlying connection has broken, this function does nothing.
*/
virtual void mark_disposable() = 0;
+ // WARNING / FIXME: this is not populated for loopback connections
+ AuthCapsInfo& get_peer_caps_info() {
+ return peer_caps_info;
+ }
+ const EntityName& get_peer_entity_name() {
+ return peer_name;
+ }
+ uint64_t get_peer_global_id() {
+ return peer_global_id;
+ }
int get_peer_type() const { return peer_type; }
void set_peer_type(int t) { peer_type = t; }
+ // peer_id is only defined for msgr2
+ int64_t get_peer_id() const { return peer_id; }
+ void set_peer_id(int64_t t) { peer_id = t; }
+
bool peer_is_mon() const { return peer_type == CEPH_ENTITY_TYPE_MON; }
bool peer_is_mgr() const { return peer_type == CEPH_ENTITY_TYPE_MGR; }
bool peer_is_mds() const { return peer_type == CEPH_ENTITY_TYPE_MDS; }
bool peer_is_osd() const { return peer_type == CEPH_ENTITY_TYPE_OSD; }
bool peer_is_client() const { return peer_type == CEPH_ENTITY_TYPE_CLIENT; }
- const entity_addr_t& get_peer_addr() const { return peer_addr; }
- void set_peer_addr(const entity_addr_t& a) { peer_addr = a; }
+ /// which of the peer's addrs is actually in use for this connection
+ virtual entity_addr_t get_peer_socket_addr() const = 0;
+
+ entity_addr_t get_peer_addr() const {
+ return peer_addrs->front();
+ }
+ const entity_addrvec_t& get_peer_addrs() const {
+ return *peer_addrs;
+ }
+ void set_peer_addr(const entity_addr_t& a) {
+ peer_addrs = entity_addrvec_t(a);
+ }
+ void set_peer_addrs(const entity_addrvec_t& av) { peer_addrs = av; }
uint64_t get_features() const { return features; }
bool has_feature(uint64_t f) const { return features & f; }
void set_features(uint64_t f) { features = f; }
void set_feature(uint64_t f) { features |= f; }
+ virtual int get_con_mode() const {
+ return CEPH_CON_MODE_CRC;
+ }
+
void post_rx_buffer(ceph_tid_t tid, bufferlist& bl) {
+#if 0
Mutex::Locker l(lock);
++rx_buffers_version;
rx_buffers[tid] = pair<bufferlist,int>(bl, rx_buffers_version);
+#endif
}
void revoke_rx_buffer(ceph_tid_t tid) {
+#if 0
Mutex::Locker l(lock);
rx_buffers.erase(tid);
+#endif
}
utime_t get_last_keepalive() const {