1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_CONNECTION_H
16 #define CEPH_CONNECTION_H
21 #include "auth/Auth.h"
22 #include "common/RefCountedObj.h"
23 #include "common/config.h"
24 #include "common/debug.h"
25 #include "common/ref.h"
26 #include "common/ceph_mutex.h"
27 #include "include/ceph_assert.h" // Because intusive_ptr clobbers our assert...
28 #include "include/buffer.h"
29 #include "include/types.h"
30 #include "common/item_history.h"
31 #include "msg/MessageRef.h"
33 // ======================================================
35 // abstract Connection, for keeping per-connection state
39 #ifdef UNIT_TESTS_BUILT
43 struct Connection
: public RefCountedObjectSafe
{
44 mutable ceph::mutex lock
= ceph::make_mutex("Connection::lock");
48 int64_t peer_id
= -1; // [msgr2 only] the 0 of osd.0, 4567 or client.4567
49 safe_item_history
<entity_addrvec_t
> peer_addrs
;
50 utime_t last_keepalive
, last_keepalive_ack
;
51 bool anon
= false; ///< anonymous outgoing connection
53 uint64_t features
= 0;
55 bool is_loopback
= false;
56 bool failed
= false; // true if we are a lossy connection that has failed.
58 int rx_buffers_version
= 0;
59 std::map
<ceph_tid_t
,std::pair
<ceph::buffer::list
, int>> rx_buffers
;
61 // authentication state
62 // FIXME make these private after ms_handle_authorizer is removed
64 AuthCapsInfo peer_caps_info
;
66 uint64_t peer_global_id
= 0;
68 #ifdef UNIT_TESTS_BUILT
69 Interceptor
*interceptor
;
73 void set_priv(const RefCountedPtr
& o
) {
74 std::lock_guard l
{lock
};
78 RefCountedPtr
get_priv() {
79 std::lock_guard l
{lock
};
84 std::lock_guard l
{lock
};
89 * Used to judge whether this connection is ready to send. Usually, the
90 * implementation need to build a own shakehand or sesson then it can be
93 * @return true if ready to send, or false otherwise
95 virtual bool is_connected() = 0;
97 virtual bool is_msgr2() const {
101 bool is_anon() const {
105 Messenger
*get_messenger() {
110 * Queue the given Message to send out on the given Connection.
111 * Success in this function does not guarantee Message delivery, only
112 * success in queueing the Message. Other guarantees may be provided based
113 * on the Connection policy.
115 * @param m The Message to send. The Messenger consumes a single reference
116 * when you pass it in.
118 * @return 0 on success, or -errno on failure.
120 virtual int send_message(Message
*m
) = 0;
122 virtual int send_message2(MessageRef m
)
124 return send_message(m
.detach()); /* send_message(Message *m) consumes a reference */
128 * Send a "keepalive" ping along the given Connection, if it's working.
129 * If the underlying connection has broken, this function does nothing.
131 * @return 0, or implementation-defined error numbers.
133 virtual void send_keepalive() = 0;
135 * Mark down the given Connection.
137 * This will cause us to discard its outgoing queue, and if reset
138 * detection is enabled in the policy and the endpoint tries to
139 * reconnect they will discard their queue when we inform them of
142 * It does not generate any notifications to the Dispatcher.
144 virtual void mark_down() = 0;
147 * Mark a Connection as "disposable", setting it to lossy
148 * (regardless of initial Policy). This does not immediately close
149 * the Connection once Messages have been delivered, so as long as
150 * there are no errors you can continue to receive responses; but it
151 * will not attempt to reconnect for message delivery or preserve
152 * your old delivery semantics, either.
154 * TODO: There's some odd stuff going on in our SimpleMessenger
155 * implementation during connect that looks unused; is there
156 * more of a contract that that's enforcing?
158 virtual void mark_disposable() = 0;
160 // WARNING / FIXME: this is not populated for loopback connections
161 AuthCapsInfo
& get_peer_caps_info() {
162 return peer_caps_info
;
164 const EntityName
& get_peer_entity_name() {
167 uint64_t get_peer_global_id() {
168 return peer_global_id
;
171 int get_peer_type() const { return peer_type
; }
172 void set_peer_type(int t
) { peer_type
= t
; }
174 // peer_id is only defined for msgr2
175 int64_t get_peer_id() const { return peer_id
; }
176 void set_peer_id(int64_t t
) { peer_id
= t
; }
178 bool peer_is_mon() const { return peer_type
== CEPH_ENTITY_TYPE_MON
; }
179 bool peer_is_mgr() const { return peer_type
== CEPH_ENTITY_TYPE_MGR
; }
180 bool peer_is_mds() const { return peer_type
== CEPH_ENTITY_TYPE_MDS
; }
181 bool peer_is_osd() const { return peer_type
== CEPH_ENTITY_TYPE_OSD
; }
182 bool peer_is_client() const { return peer_type
== CEPH_ENTITY_TYPE_CLIENT
; }
184 /// which of the peer's addrs is actually in use for this connection
185 virtual entity_addr_t
get_peer_socket_addr() const = 0;
187 entity_addr_t
get_peer_addr() const {
188 return peer_addrs
->front();
190 const entity_addrvec_t
& get_peer_addrs() const {
193 void set_peer_addr(const entity_addr_t
& a
) {
194 peer_addrs
= entity_addrvec_t(a
);
196 void set_peer_addrs(const entity_addrvec_t
& av
) { peer_addrs
= av
; }
198 uint64_t get_features() const { return features
; }
199 bool has_feature(uint64_t f
) const { return features
& f
; }
200 bool has_features(uint64_t f
) const {
201 return (features
& f
) == f
;
203 void set_features(uint64_t f
) { features
= f
; }
204 void set_feature(uint64_t f
) { features
|= f
; }
206 virtual int get_con_mode() const {
207 return CEPH_CON_MODE_CRC
;
210 void post_rx_buffer(ceph_tid_t tid
, ceph::buffer::list
& bl
) {
212 std::lock_guard l
{lock
};
213 ++rx_buffers_version
;
214 rx_buffers
[tid
] = pair
<bufferlist
,int>(bl
, rx_buffers_version
);
218 void revoke_rx_buffer(ceph_tid_t tid
) {
220 std::lock_guard l
{lock
};
221 rx_buffers
.erase(tid
);
225 utime_t
get_last_keepalive() const {
226 std::lock_guard l
{lock
};
227 return last_keepalive
;
229 void set_last_keepalive(utime_t t
) {
230 std::lock_guard l
{lock
};
233 utime_t
get_last_keepalive_ack() const {
234 std::lock_guard l
{lock
};
235 return last_keepalive_ack
;
237 void set_last_keepalive_ack(utime_t t
) {
238 std::lock_guard l
{lock
};
239 last_keepalive_ack
= t
;
241 bool is_blackhole() const;
244 Connection(CephContext
*cct
, Messenger
*m
)
245 : RefCountedObjectSafe(cct
),
249 ~Connection() override
{
250 //generic_dout(0) << "~Connection " << this << dendl;
254 using ConnectionRef
= ceph::ref_t
<Connection
>;
256 #endif /* CEPH_CONNECTION_H */