#include <stdlib.h>
#include <ostream>
-#include <boost/intrusive_ptr.hpp>
-
#include "auth/Auth.h"
#include "common/RefCountedObj.h"
#include "common/config.h"
#include "common/debug.h"
-#include "common/Mutex.h"
+#include "common/ref.h"
+#include "common/ceph_mutex.h"
#include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert...
#include "include/buffer.h"
#include "include/types.h"
#include "common/item_history.h"
#include "msg/MessageRef.h"
-
// ======================================================
// abstract Connection, for keeping per-connection state
class Interceptor;
#endif
-struct Connection : public RefCountedObject {
- mutable Mutex lock;
+struct Connection : public RefCountedObjectSafe {
+ mutable ceph::mutex lock = ceph::make_mutex("Connection::lock");
Messenger *msgr;
RefCountedPtr priv;
- int peer_type;
+ int peer_type = -1;
int64_t peer_id = -1; // [msgr2 only] the 0 of osd.0, 4567 or client.4567
safe_item_history<entity_addrvec_t> peer_addrs;
utime_t last_keepalive, last_keepalive_ack;
+ bool anon = false; ///< anonymous outgoing connection
private:
- uint64_t features;
+ uint64_t features = 0;
public:
- bool failed; // true if we are a lossy connection that has failed.
+ bool is_loopback = false;
+ bool failed = false; // true if we are a lossy connection that has failed.
- int rx_buffers_version;
- map<ceph_tid_t,pair<bufferlist,int> > rx_buffers;
+ int rx_buffers_version = 0;
+ std::map<ceph_tid_t,std::pair<ceph::buffer::list, int>> rx_buffers;
// authentication state
// FIXME make these private after ms_handle_authorizer is removed
Interceptor *interceptor;
#endif
- friend class boost::intrusive_ptr<Connection>;
friend class PipeConnection;
public:
- Connection(CephContext *cct, Messenger *m)
- // we are managed exclusively by ConnectionRef; make it so you can
- // ConnectionRef foo = new Connection;
- : RefCountedObject(cct, 0),
- lock("Connection::lock"),
- msgr(m),
- peer_type(-1),
- features(0),
- failed(false),
- rx_buffers_version(0) {
- }
-
- ~Connection() override {
- //generic_dout(0) << "~Connection " << this << dendl;
- }
-
void set_priv(const RefCountedPtr& o) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
priv = o;
}
RefCountedPtr get_priv() {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return priv;
}
+ void clear_priv() {
+ std::lock_guard l{lock};
+ priv.reset(nullptr);
+ }
+
/**
* Used to judge whether this connection is ready to send. Usually, the
* implementation need to build a own shakehand or sesson then it can be
*/
virtual bool is_connected() = 0;
+ virtual bool is_msgr2() const {
+ return false;
+ }
+
+ bool is_anon() const {
+ return anon;
+ }
+
Messenger *get_messenger() {
return msgr;
}
return CEPH_CON_MODE_CRC;
}
- void post_rx_buffer(ceph_tid_t tid, bufferlist& bl) {
+ void post_rx_buffer(ceph_tid_t tid, ceph::buffer::list& bl) {
#if 0
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
++rx_buffers_version;
rx_buffers[tid] = pair<bufferlist,int>(bl, rx_buffers_version);
#endif
void revoke_rx_buffer(ceph_tid_t tid) {
#if 0
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
rx_buffers.erase(tid);
#endif
}
utime_t get_last_keepalive() const {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return last_keepalive;
}
void set_last_keepalive(utime_t t) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
last_keepalive = t;
}
utime_t get_last_keepalive_ack() const {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
return last_keepalive_ack;
}
void set_last_keepalive_ack(utime_t t) {
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
last_keepalive_ack = t;
}
+ bool is_blackhole() const;
-};
+protected:
+ Connection(CephContext *cct, Messenger *m)
+ : RefCountedObjectSafe(cct),
+ msgr(m)
+ {}
-typedef boost::intrusive_ptr<Connection> ConnectionRef;
+ ~Connection() override {
+ //generic_dout(0) << "~Connection " << this << dendl;
+ }
+};
+using ConnectionRef = ceph::ref_t<Connection>;
#endif /* CEPH_CONNECTION_H */