]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/Connection.h
import 15.2.2 octopus source
[ceph.git] / ceph / src / msg / Connection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_CONNECTION_H
16 #define CEPH_CONNECTION_H
17
18 #include <stdlib.h>
19 #include <ostream>
20
21 #include "auth/Auth.h"
22 #include "common/RefCountedObj.h"
23 #include "common/config.h"
24 #include "common/debug.h"
25 #include "common/ref.h"
26 #include "common/ceph_mutex.h"
27 #include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert...
28 #include "include/buffer.h"
29 #include "include/types.h"
30 #include "common/item_history.h"
31 #include "msg/MessageRef.h"
32
33 // ======================================================
34
35 // abstract Connection, for keeping per-connection state
36
37 class Messenger; // forward declaration: Connection only stores a Messenger* (msgr)
38
39 #ifdef UNIT_TESTS_BUILT
40 class Interceptor; // forward declaration, test builds only: Connection stores an Interceptor* in these builds
41 #endif
42
43 struct Connection : public RefCountedObjectSafe {
44 mutable ceph::mutex lock = ceph::make_mutex("Connection::lock");
45 Messenger *msgr;
46 RefCountedPtr priv;
47 int peer_type = -1;
48 int64_t peer_id = -1; // [msgr2 only] the 0 of osd.0, 4567 or client.4567
49 safe_item_history<entity_addrvec_t> peer_addrs;
50 utime_t last_keepalive, last_keepalive_ack;
51 bool anon = false; ///< anonymous outgoing connection
52 private:
53 uint64_t features = 0;
54 public:
55 bool is_loopback = false;
56 bool failed = false; // true if we are a lossy connection that has failed.
57
58 int rx_buffers_version = 0;
59 std::map<ceph_tid_t,std::pair<ceph::buffer::list, int>> rx_buffers;
60
61 // authentication state
62 // FIXME make these private after ms_handle_authorizer is removed
63 public:
64 AuthCapsInfo peer_caps_info;
65 EntityName peer_name;
66 uint64_t peer_global_id = 0;
67
68 #ifdef UNIT_TESTS_BUILT
69 Interceptor *interceptor;
70 #endif
71
72 friend class PipeConnection;
73
74 public:
75 void set_priv(const RefCountedPtr& o) {
76 std::lock_guard l{lock};
77 priv = o;
78 }
79
80 RefCountedPtr get_priv() {
81 std::lock_guard l{lock};
82 return priv;
83 }
84
85 void clear_priv() {
86 std::lock_guard l{lock};
87 priv.reset(nullptr);
88 }
89
90 /**
91 * Used to judge whether this connection is ready to send. Usually, the
92 * implementation need to build a own shakehand or sesson then it can be
93 * ready to send.
94 *
95 * @return true if ready to send, or false otherwise
96 */
97 virtual bool is_connected() = 0;
98
99 virtual bool is_msgr2() const {
100 return false;
101 }
102
103 bool is_anon() const {
104 return anon;
105 }
106
107 Messenger *get_messenger() {
108 return msgr;
109 }
110
111 /**
112 * Queue the given Message to send out on the given Connection.
113 * Success in this function does not guarantee Message delivery, only
114 * success in queueing the Message. Other guarantees may be provided based
115 * on the Connection policy.
116 *
117 * @param m The Message to send. The Messenger consumes a single reference
118 * when you pass it in.
119 *
120 * @return 0 on success, or -errno on failure.
121 */
122 virtual int send_message(Message *m) = 0;
123
124 virtual int send_message2(MessageRef m)
125 {
126 return send_message(m.detach()); /* send_message(Message *m) consumes a reference */
127 }
128
129 /**
130 * Send a "keepalive" ping along the given Connection, if it's working.
131 * If the underlying connection has broken, this function does nothing.
132 *
133 * @return 0, or implementation-defined error numbers.
134 */
135 virtual void send_keepalive() = 0;
136 /**
137 * Mark down the given Connection.
138 *
139 * This will cause us to discard its outgoing queue, and if reset
140 * detection is enabled in the policy and the endpoint tries to
141 * reconnect they will discard their queue when we inform them of
142 * the session reset.
143 *
144 * It does not generate any notifications to the Dispatcher.
145 */
146 virtual void mark_down() = 0;
147
148 /**
149 * Mark a Connection as "disposable", setting it to lossy
150 * (regardless of initial Policy). This does not immediately close
151 * the Connection once Messages have been delivered, so as long as
152 * there are no errors you can continue to receive responses; but it
153 * will not attempt to reconnect for message delivery or preserve
154 * your old delivery semantics, either.
155 *
156 * TODO: There's some odd stuff going on in our SimpleMessenger
157 * implementation during connect that looks unused; is there
158 * more of a contract that that's enforcing?
159 */
160 virtual void mark_disposable() = 0;
161
162 // WARNING / FIXME: this is not populated for loopback connections
163 AuthCapsInfo& get_peer_caps_info() {
164 return peer_caps_info;
165 }
166 const EntityName& get_peer_entity_name() {
167 return peer_name;
168 }
169 uint64_t get_peer_global_id() {
170 return peer_global_id;
171 }
172
173 int get_peer_type() const { return peer_type; }
174 void set_peer_type(int t) { peer_type = t; }
175
176 // peer_id is only defined for msgr2
177 int64_t get_peer_id() const { return peer_id; }
178 void set_peer_id(int64_t t) { peer_id = t; }
179
180 bool peer_is_mon() const { return peer_type == CEPH_ENTITY_TYPE_MON; }
181 bool peer_is_mgr() const { return peer_type == CEPH_ENTITY_TYPE_MGR; }
182 bool peer_is_mds() const { return peer_type == CEPH_ENTITY_TYPE_MDS; }
183 bool peer_is_osd() const { return peer_type == CEPH_ENTITY_TYPE_OSD; }
184 bool peer_is_client() const { return peer_type == CEPH_ENTITY_TYPE_CLIENT; }
185
186 /// which of the peer's addrs is actually in use for this connection
187 virtual entity_addr_t get_peer_socket_addr() const = 0;
188
189 entity_addr_t get_peer_addr() const {
190 return peer_addrs->front();
191 }
192 const entity_addrvec_t& get_peer_addrs() const {
193 return *peer_addrs;
194 }
195 void set_peer_addr(const entity_addr_t& a) {
196 peer_addrs = entity_addrvec_t(a);
197 }
198 void set_peer_addrs(const entity_addrvec_t& av) { peer_addrs = av; }
199
200 uint64_t get_features() const { return features; }
201 bool has_feature(uint64_t f) const { return features & f; }
202 bool has_features(uint64_t f) const {
203 return (features & f) == f;
204 }
205 void set_features(uint64_t f) { features = f; }
206 void set_feature(uint64_t f) { features |= f; }
207
208 virtual int get_con_mode() const {
209 return CEPH_CON_MODE_CRC;
210 }
211
212 void post_rx_buffer(ceph_tid_t tid, ceph::buffer::list& bl) {
213 #if 0
214 std::lock_guard l{lock};
215 ++rx_buffers_version;
216 rx_buffers[tid] = pair<bufferlist,int>(bl, rx_buffers_version);
217 #endif
218 }
219
220 void revoke_rx_buffer(ceph_tid_t tid) {
221 #if 0
222 std::lock_guard l{lock};
223 rx_buffers.erase(tid);
224 #endif
225 }
226
227 utime_t get_last_keepalive() const {
228 std::lock_guard l{lock};
229 return last_keepalive;
230 }
231 void set_last_keepalive(utime_t t) {
232 std::lock_guard l{lock};
233 last_keepalive = t;
234 }
235 utime_t get_last_keepalive_ack() const {
236 std::lock_guard l{lock};
237 return last_keepalive_ack;
238 }
239 void set_last_keepalive_ack(utime_t t) {
240 std::lock_guard l{lock};
241 last_keepalive_ack = t;
242 }
243 bool is_blackhole() const;
244
245 protected:
246 Connection(CephContext *cct, Messenger *m)
247 : RefCountedObjectSafe(cct),
248 msgr(m)
249 {}
250
251 ~Connection() override {
252 //generic_dout(0) << "~Connection " << this << dendl;
253 }
254 };
255
256 using ConnectionRef = ceph::ref_t<Connection>; // shorthand for ceph::ref_t<Connection>; ref_t presumably is the ref-counting pointer from common/ref.h -- confirm there
257
258 #endif /* CEPH_CONNECTION_H */