]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * Portions Copyright (C) 2013 CohortFS, LLC | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | #ifndef XIO_CONNECTION_H | |
17 | #define XIO_CONNECTION_H | |
18 | ||
31f18b77 FG |
19 | #include <atomic> |
20 | ||
7c673cae FG |
21 | #include <boost/intrusive/avl_set.hpp> |
22 | #include <boost/intrusive/list.hpp> | |
31f18b77 | 23 | |
7c673cae FG |
24 | extern "C" { |
25 | #include "libxio.h" | |
26 | } | |
31f18b77 | 27 | |
7c673cae FG |
28 | #include "XioInSeq.h" |
29 | #include "XioSubmit.h" | |
30 | #include "msg/Connection.h" | |
31 | #include "msg/Messenger.h" | |
7c673cae FG |
32 | #include "auth/AuthSessionHandler.h" |
33 | ||
34 | #define XIO_ALL_FEATURES (CEPH_FEATURES_ALL) | |
35 | ||
36 | ||
37 | #define XIO_NOP_TAG_MARKDOWN 0x0001 | |
38 | ||
39 | namespace bi = boost::intrusive; | |
40 | ||
41 | class XioPortal; | |
42 | class XioMessenger; | |
43 | class XioSend; | |
44 | ||
45 | class XioConnection : public Connection | |
46 | { | |
47 | public: | |
48 | enum type { ACTIVE, PASSIVE }; | |
49 | ||
31f18b77 | 50 | enum class session_states : unsigned { |
7c673cae FG |
51 | INIT = 0, |
52 | START, | |
53 | UP, | |
54 | FLOW_CONTROLLED, | |
55 | DISCONNECTED, | |
56 | DELETED, | |
57 | BARRIER | |
58 | }; | |
59 | ||
31f18b77 | 60 | enum class session_startup_states : unsigned { |
7c673cae FG |
61 | IDLE = 0, |
62 | CONNECTING, | |
63 | ACCEPTING, | |
64 | READY, | |
65 | FAIL | |
66 | }; | |
67 | ||
68 | private: | |
69 | XioConnection::type xio_conn_type; | |
70 | XioPortal *portal; | |
31f18b77 | 71 | std::atomic<bool> connected = { false }; |
7c673cae FG |
72 | entity_inst_t peer; |
73 | struct xio_session *session; | |
74 | struct xio_connection *conn; | |
75 | pthread_spinlock_t sp; | |
31f18b77 FG |
76 | std::atomic<int64_t> send = { 0 }; |
77 | std::atomic<int64_t> recv = { 0 }; | |
7c673cae FG |
78 | uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials) |
79 | uint32_t magic; | |
80 | uint32_t special_handling; | |
81 | uint64_t scount; | |
82 | uint32_t send_ctr; | |
83 | int q_high_mark; | |
84 | int q_low_mark; | |
85 | ||
86 | struct lifecycle { | |
87 | // different from Pipe states? | |
88 | enum lf_state { | |
89 | INIT, | |
90 | LOCAL_DISCON, | |
91 | REMOTE_DISCON, | |
92 | RECONNECTING, | |
93 | UP, | |
94 | DEAD } state; | |
95 | ||
96 | /* XXX */ | |
97 | uint32_t reconnects; | |
98 | uint32_t connect_seq, peer_global_seq; | |
99 | uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt | |
31f18b77 | 100 | std::atomic<int64_t> out_seq = { 0 }; |
7c673cae FG |
101 | |
102 | lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0), | |
31f18b77 FG |
103 | peer_global_seq(0), in_seq(0), out_seq_acked(0) |
104 | {} | |
7c673cae FG |
105 | |
106 | void set_in_seq(uint64_t seq) { | |
107 | in_seq = seq; | |
108 | } | |
109 | ||
110 | uint64_t next_out_seq() { | |
31f18b77 | 111 | return ++out_seq; |
7c673cae FG |
112 | } |
113 | ||
114 | } state; | |
115 | ||
116 | /* batching */ | |
117 | XioInSeq in_seq; | |
118 | ||
119 | class CState | |
120 | { | |
121 | public: | |
122 | static const int FLAG_NONE = 0x0000; | |
123 | static const int FLAG_BAD_AUTH = 0x0001; | |
124 | static const int FLAG_MAPPED = 0x0002; | |
125 | static const int FLAG_RESET = 0x0004; | |
126 | ||
127 | static const int OP_FLAG_NONE = 0x0000; | |
128 | static const int OP_FLAG_LOCKED = 0x0001; | |
129 | static const int OP_FLAG_LRU = 0x0002; | |
130 | ||
131 | uint64_t features; | |
132 | Messenger::Policy policy; | |
133 | ||
134 | CryptoKey session_key; | |
135 | ceph::shared_ptr<AuthSessionHandler> session_security; | |
136 | AuthAuthorizer *authorizer; | |
137 | XioConnection *xcon; | |
138 | uint32_t protocol_version; | |
139 | ||
31f18b77 FG |
140 | std::atomic<session_states> session_state = { 0 }; |
141 | std::atomic<session_startup_state> startup_state = { 0 }; | |
7c673cae FG |
142 | |
143 | uint32_t reconnects; | |
144 | uint32_t connect_seq, global_seq, peer_global_seq; | |
145 | uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt | |
31f18b77 | 146 | std::atomic<uint64_t> out_seq = { 0 }; |
7c673cae FG |
147 | |
148 | uint32_t flags; | |
149 | ||
150 | explicit CState(XioConnection* _xcon) | |
151 | : features(0), | |
152 | authorizer(NULL), | |
153 | xcon(_xcon), | |
154 | protocol_version(0), | |
155 | session_state(INIT), | |
156 | startup_state(IDLE), | |
157 | reconnects(0), | |
158 | connect_seq(0), | |
159 | global_seq(0), | |
160 | peer_global_seq(0), | |
161 | in_seq(0), | |
162 | out_seq_acked(0), | |
7c673cae FG |
163 | flags(FLAG_NONE) {} |
164 | ||
165 | uint64_t get_session_state() { | |
31f18b77 | 166 | return session_state; |
7c673cae FG |
167 | } |
168 | ||
169 | uint64_t get_startup_state() { | |
31f18b77 | 170 | return startup_state; |
7c673cae FG |
171 | } |
172 | ||
173 | void set_in_seq(uint64_t seq) { | |
174 | in_seq = seq; | |
175 | } | |
176 | ||
177 | uint64_t next_out_seq() { | |
31f18b77 | 178 | return ++out_seq; |
7c673cae FG |
179 | }; |
180 | ||
181 | // state machine | |
182 | int init_state(); | |
183 | int next_state(Message* m); | |
184 | #if 0 // future (session startup) | |
185 | int msg_connect(MConnect *m); | |
186 | int msg_connect_reply(MConnectReply *m); | |
187 | int msg_connect_reply(MConnectAuthReply *m); | |
188 | int msg_connect_auth(MConnectAuth *m); | |
189 | int msg_connect_auth_reply(MConnectAuthReply *m); | |
190 | #endif | |
191 | int state_up_ready(uint32_t flags); | |
192 | int state_flow_controlled(uint32_t flags); | |
193 | int state_discon(); | |
194 | int state_fail(Message* m, uint32_t flags); | |
195 | ||
196 | } cstate; /* CState */ | |
197 | ||
198 | // message submission queue | |
199 | struct SendQ { | |
200 | bool keepalive; | |
201 | bool ack; | |
202 | utime_t ack_time; | |
203 | Message::Queue mqueue; // deferred | |
204 | XioSubmit::Queue requeue; | |
205 | ||
206 | SendQ():keepalive(false), ack(false){} | |
207 | } outgoing; | |
208 | ||
209 | // conns_entity_map comparison functor | |
210 | struct EntityComp | |
211 | { | |
212 | // for internal ordering | |
213 | bool operator()(const XioConnection &lhs, const XioConnection &rhs) const | |
214 | { return lhs.get_peer() < rhs.get_peer(); } | |
215 | ||
216 | // for external search by entity_inst_t(peer) | |
217 | bool operator()(const entity_inst_t &peer, const XioConnection &c) const | |
218 | { return peer < c.get_peer(); } | |
219 | ||
220 | bool operator()(const XioConnection &c, const entity_inst_t &peer) const | |
221 | { return c.get_peer() < peer; } | |
222 | }; | |
223 | ||
224 | bi::list_member_hook<> conns_hook; | |
225 | bi::avl_set_member_hook<> conns_entity_map_hook; | |
226 | ||
227 | typedef bi::list< XioConnection, | |
228 | bi::member_hook<XioConnection, bi::list_member_hook<>, | |
229 | &XioConnection::conns_hook > > ConnList; | |
230 | ||
231 | typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>, | |
232 | &XioConnection::conns_entity_map_hook> EntityHook; | |
233 | ||
234 | typedef bi::avl_set< XioConnection, EntityHook, | |
235 | bi::compare<EntityComp> > EntitySet; | |
236 | ||
237 | friend class XioPortal; | |
238 | friend class XioMessenger; | |
239 | friend class XioDispatchHook; | |
240 | friend class XioMarkDownHook; | |
241 | friend class XioSend; | |
242 | ||
243 | int on_disconnect_event() { | |
31f18b77 | 244 | connected = false; |
7c673cae FG |
245 | pthread_spin_lock(&sp); |
246 | discard_out_queues(CState::OP_FLAG_LOCKED); | |
247 | pthread_spin_unlock(&sp); | |
248 | return 0; | |
249 | } | |
250 | ||
251 | int on_teardown_event() { | |
252 | pthread_spin_lock(&sp); | |
253 | if (conn) | |
254 | xio_connection_destroy(conn); | |
255 | conn = NULL; | |
256 | pthread_spin_unlock(&sp); | |
257 | this->put(); | |
258 | return 0; | |
259 | } | |
260 | ||
261 | int xio_qdepth_high_mark() { | |
262 | return q_high_mark; | |
263 | } | |
264 | ||
265 | int xio_qdepth_low_mark() { | |
266 | return q_low_mark; | |
267 | } | |
268 | ||
269 | public: | |
270 | XioConnection(XioMessenger *m, XioConnection::type _type, | |
271 | const entity_inst_t& peer); | |
272 | ||
273 | ~XioConnection() { | |
274 | if (conn) | |
275 | xio_connection_destroy(conn); | |
276 | } | |
277 | ostream& conn_prefix(std::ostream *_dout); | |
278 | ||
31f18b77 | 279 | bool is_connected() override { return connected; } |
7c673cae FG |
280 | |
281 | int send_message(Message *m) override; | |
282 | void send_keepalive() override {send_keepalive_or_ack();} | |
283 | void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr); | |
284 | void mark_down() override; | |
285 | int _mark_down(uint32_t flags); | |
286 | void mark_disposable() override; | |
287 | int _mark_disposable(uint32_t flags); | |
288 | ||
289 | const entity_inst_t& get_peer() const { return peer; } | |
290 | ||
291 | XioConnection* get() { | |
292 | #if 0 | |
31f18b77 | 293 | cout << "XioConnection::get " << this << " " << nref.load() << std::endl; |
7c673cae FG |
294 | #endif |
295 | RefCountedObject::get(); | |
296 | return this; | |
297 | } | |
298 | ||
299 | void put() { | |
300 | RefCountedObject::put(); | |
301 | #if 0 | |
31f18b77 | 302 | cout << "XioConnection::put " << this << " " << nref.load() << std::endl; |
7c673cae FG |
303 | #endif |
304 | } | |
305 | ||
306 | void disconnect() { | |
307 | if (is_connected()) { | |
31f18b77 | 308 | connected = false; |
7c673cae FG |
309 | xio_disconnect(conn); // normal teardown will clean up conn |
310 | } | |
311 | } | |
312 | ||
313 | uint32_t get_magic() { return magic; } | |
314 | void set_magic(int _magic) { magic = _magic; } | |
315 | uint32_t get_special_handling() { return special_handling; } | |
316 | void set_special_handling(int n) { special_handling = n; } | |
317 | uint64_t get_scount() { return scount; } | |
318 | ||
319 | int passive_setup(); /* XXX */ | |
320 | ||
321 | int handle_data_msg(struct xio_session *session, struct xio_msg *msg, | |
322 | int more_in_batch, void *cb_user_context); | |
323 | int on_msg(struct xio_session *session, struct xio_msg *msg, | |
324 | int more_in_batch, void *cb_user_context); | |
325 | int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg, | |
326 | void *conn_user_context); | |
327 | int on_msg_error(struct xio_session *session, enum xio_status error, | |
328 | struct xio_msg *msg, void *conn_user_context); | |
329 | void msg_send_fail(XioSend *xsend, int code); | |
330 | void msg_release_fail(struct xio_msg *msg, int code); | |
331 | private: | |
332 | void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr); | |
333 | int flush_out_queues(uint32_t flags); | |
334 | int discard_out_queues(uint32_t flags); | |
335 | int adjust_clru(uint32_t flags); | |
336 | }; | |
337 | ||
338 | typedef boost::intrusive_ptr<XioConnection> XioConnectionRef; | |
339 | ||
340 | class XioLoopbackConnection : public Connection | |
341 | { | |
342 | private: | |
31f18b77 | 343 | std::atomic<uint64_t> seq = { 0 }; |
7c673cae | 344 | public: |
31f18b77 | 345 | explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m) |
7c673cae FG |
346 | { |
347 | const entity_inst_t& m_inst = m->get_myinst(); | |
348 | peer_addr = m_inst.addr; | |
349 | peer_type = m_inst.name.type(); | |
350 | set_features(XIO_ALL_FEATURES); /* XXXX set to ours */ | |
351 | } | |
352 | ||
353 | XioLoopbackConnection* get() { | |
354 | return static_cast<XioLoopbackConnection*>(RefCountedObject::get()); | |
355 | } | |
356 | ||
357 | bool is_connected() override { return true; } | |
358 | ||
359 | int send_message(Message *m) override; | |
360 | void send_keepalive() override; | |
361 | void mark_down() override {} | |
362 | void mark_disposable() override {} | |
363 | ||
364 | uint64_t get_seq() { | |
31f18b77 | 365 | return seq; |
7c673cae FG |
366 | } |
367 | ||
368 | uint64_t next_seq() { | |
31f18b77 | 369 | return ++seq; |
7c673cae FG |
370 | } |
371 | }; | |
372 | ||
373 | typedef boost::intrusive_ptr<XioLoopbackConnection> XioLoopbackConnectionRef; | |
374 | ||
375 | #endif /* XIO_CONNECTION_H */ |