// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
15 | ||
#ifndef XIO_CONNECTION_H
#define XIO_CONNECTION_H

#include <atomic>
#include <cstdint>
#include <iosfwd>
#include <memory>
#include <mutex>

#include <boost/intrusive/avl_set.hpp>
#include <boost/intrusive/list.hpp>

extern "C" {
#include "libxio.h"
}

#include "XioInSeq.h"
#include "XioSubmit.h"
#include "msg/Connection.h"
#include "msg/Messenger.h"
#include "auth/AuthSessionHandler.h"
33 | ||
34 | #define XIO_ALL_FEATURES (CEPH_FEATURES_ALL) | |
35 | ||
36 | ||
37 | #define XIO_NOP_TAG_MARKDOWN 0x0001 | |
38 | ||
39 | namespace bi = boost::intrusive; | |
40 | ||
41 | class XioPortal; | |
42 | class XioMessenger; | |
43 | class XioSend; | |
44 | ||
45 | class XioConnection : public Connection | |
46 | { | |
47 | public: | |
48 | enum type { ACTIVE, PASSIVE }; | |
49 | ||
31f18b77 | 50 | enum class session_states : unsigned { |
7c673cae FG |
51 | INIT = 0, |
52 | START, | |
53 | UP, | |
54 | FLOW_CONTROLLED, | |
55 | DISCONNECTED, | |
56 | DELETED, | |
57 | BARRIER | |
58 | }; | |
59 | ||
31f18b77 | 60 | enum class session_startup_states : unsigned { |
7c673cae FG |
61 | IDLE = 0, |
62 | CONNECTING, | |
63 | ACCEPTING, | |
64 | READY, | |
65 | FAIL | |
66 | }; | |
67 | ||
68 | private: | |
69 | XioConnection::type xio_conn_type; | |
70 | XioPortal *portal; | |
31f18b77 | 71 | std::atomic<bool> connected = { false }; |
7c673cae FG |
72 | entity_inst_t peer; |
73 | struct xio_session *session; | |
74 | struct xio_connection *conn; | |
11fdf7f2 | 75 | ceph::util::spinlock sp; |
31f18b77 FG |
76 | std::atomic<int64_t> send = { 0 }; |
77 | std::atomic<int64_t> recv = { 0 }; | |
7c673cae FG |
78 | uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials) |
79 | uint32_t magic; | |
80 | uint32_t special_handling; | |
81 | uint64_t scount; | |
82 | uint32_t send_ctr; | |
83 | int q_high_mark; | |
84 | int q_low_mark; | |
85 | ||
86 | struct lifecycle { | |
87 | // different from Pipe states? | |
88 | enum lf_state { | |
89 | INIT, | |
90 | LOCAL_DISCON, | |
91 | REMOTE_DISCON, | |
92 | RECONNECTING, | |
93 | UP, | |
94 | DEAD } state; | |
95 | ||
96 | /* XXX */ | |
97 | uint32_t reconnects; | |
98 | uint32_t connect_seq, peer_global_seq; | |
99 | uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt | |
31f18b77 | 100 | std::atomic<int64_t> out_seq = { 0 }; |
7c673cae FG |
101 | |
102 | lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0), | |
31f18b77 FG |
103 | peer_global_seq(0), in_seq(0), out_seq_acked(0) |
104 | {} | |
7c673cae FG |
105 | |
106 | void set_in_seq(uint64_t seq) { | |
107 | in_seq = seq; | |
108 | } | |
109 | ||
110 | uint64_t next_out_seq() { | |
31f18b77 | 111 | return ++out_seq; |
7c673cae FG |
112 | } |
113 | ||
114 | } state; | |
115 | ||
116 | /* batching */ | |
117 | XioInSeq in_seq; | |
118 | ||
119 | class CState | |
120 | { | |
121 | public: | |
122 | static const int FLAG_NONE = 0x0000; | |
123 | static const int FLAG_BAD_AUTH = 0x0001; | |
124 | static const int FLAG_MAPPED = 0x0002; | |
125 | static const int FLAG_RESET = 0x0004; | |
126 | ||
127 | static const int OP_FLAG_NONE = 0x0000; | |
128 | static const int OP_FLAG_LOCKED = 0x0001; | |
129 | static const int OP_FLAG_LRU = 0x0002; | |
130 | ||
131 | uint64_t features; | |
132 | Messenger::Policy policy; | |
133 | ||
134 | CryptoKey session_key; | |
11fdf7f2 | 135 | std::shared_ptr<AuthSessionHandler> session_security; |
7c673cae FG |
136 | AuthAuthorizer *authorizer; |
137 | XioConnection *xcon; | |
138 | uint32_t protocol_version; | |
139 | ||
31f18b77 FG |
140 | std::atomic<session_states> session_state = { 0 }; |
141 | std::atomic<session_startup_state> startup_state = { 0 }; | |
7c673cae FG |
142 | |
143 | uint32_t reconnects; | |
144 | uint32_t connect_seq, global_seq, peer_global_seq; | |
145 | uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt | |
31f18b77 | 146 | std::atomic<uint64_t> out_seq = { 0 }; |
7c673cae FG |
147 | |
148 | uint32_t flags; | |
149 | ||
150 | explicit CState(XioConnection* _xcon) | |
151 | : features(0), | |
152 | authorizer(NULL), | |
153 | xcon(_xcon), | |
154 | protocol_version(0), | |
155 | session_state(INIT), | |
156 | startup_state(IDLE), | |
157 | reconnects(0), | |
158 | connect_seq(0), | |
159 | global_seq(0), | |
160 | peer_global_seq(0), | |
161 | in_seq(0), | |
162 | out_seq_acked(0), | |
7c673cae FG |
163 | flags(FLAG_NONE) {} |
164 | ||
165 | uint64_t get_session_state() { | |
31f18b77 | 166 | return session_state; |
7c673cae FG |
167 | } |
168 | ||
169 | uint64_t get_startup_state() { | |
31f18b77 | 170 | return startup_state; |
7c673cae FG |
171 | } |
172 | ||
173 | void set_in_seq(uint64_t seq) { | |
174 | in_seq = seq; | |
175 | } | |
176 | ||
177 | uint64_t next_out_seq() { | |
31f18b77 | 178 | return ++out_seq; |
7c673cae FG |
179 | }; |
180 | ||
181 | // state machine | |
182 | int init_state(); | |
183 | int next_state(Message* m); | |
184 | #if 0 // future (session startup) | |
185 | int msg_connect(MConnect *m); | |
186 | int msg_connect_reply(MConnectReply *m); | |
187 | int msg_connect_reply(MConnectAuthReply *m); | |
188 | int msg_connect_auth(MConnectAuth *m); | |
189 | int msg_connect_auth_reply(MConnectAuthReply *m); | |
190 | #endif | |
191 | int state_up_ready(uint32_t flags); | |
192 | int state_flow_controlled(uint32_t flags); | |
193 | int state_discon(); | |
194 | int state_fail(Message* m, uint32_t flags); | |
195 | ||
196 | } cstate; /* CState */ | |
197 | ||
198 | // message submission queue | |
199 | struct SendQ { | |
200 | bool keepalive; | |
201 | bool ack; | |
202 | utime_t ack_time; | |
203 | Message::Queue mqueue; // deferred | |
204 | XioSubmit::Queue requeue; | |
205 | ||
206 | SendQ():keepalive(false), ack(false){} | |
207 | } outgoing; | |
208 | ||
209 | // conns_entity_map comparison functor | |
210 | struct EntityComp | |
211 | { | |
212 | // for internal ordering | |
213 | bool operator()(const XioConnection &lhs, const XioConnection &rhs) const | |
214 | { return lhs.get_peer() < rhs.get_peer(); } | |
215 | ||
216 | // for external search by entity_inst_t(peer) | |
217 | bool operator()(const entity_inst_t &peer, const XioConnection &c) const | |
218 | { return peer < c.get_peer(); } | |
219 | ||
220 | bool operator()(const XioConnection &c, const entity_inst_t &peer) const | |
221 | { return c.get_peer() < peer; } | |
222 | }; | |
223 | ||
224 | bi::list_member_hook<> conns_hook; | |
225 | bi::avl_set_member_hook<> conns_entity_map_hook; | |
226 | ||
227 | typedef bi::list< XioConnection, | |
228 | bi::member_hook<XioConnection, bi::list_member_hook<>, | |
229 | &XioConnection::conns_hook > > ConnList; | |
230 | ||
231 | typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>, | |
232 | &XioConnection::conns_entity_map_hook> EntityHook; | |
233 | ||
234 | typedef bi::avl_set< XioConnection, EntityHook, | |
235 | bi::compare<EntityComp> > EntitySet; | |
236 | ||
237 | friend class XioPortal; | |
238 | friend class XioMessenger; | |
239 | friend class XioDispatchHook; | |
240 | friend class XioMarkDownHook; | |
241 | friend class XioSend; | |
242 | ||
243 | int on_disconnect_event() { | |
11fdf7f2 TL |
244 | std::lock_guard<ceph::spinlock> lg(sp); |
245 | ||
31f18b77 | 246 | connected = false; |
7c673cae | 247 | discard_out_queues(CState::OP_FLAG_LOCKED); |
11fdf7f2 | 248 | |
7c673cae FG |
249 | return 0; |
250 | } | |
251 | ||
252 | int on_teardown_event() { | |
11fdf7f2 TL |
253 | |
254 | { | |
255 | std::lock_guard<ceph::spinlock> lg(sp); | |
256 | ||
7c673cae FG |
257 | if (conn) |
258 | xio_connection_destroy(conn); | |
259 | conn = NULL; | |
11fdf7f2 TL |
260 | } |
261 | ||
7c673cae FG |
262 | this->put(); |
263 | return 0; | |
264 | } | |
265 | ||
266 | int xio_qdepth_high_mark() { | |
267 | return q_high_mark; | |
268 | } | |
269 | ||
270 | int xio_qdepth_low_mark() { | |
271 | return q_low_mark; | |
272 | } | |
273 | ||
274 | public: | |
275 | XioConnection(XioMessenger *m, XioConnection::type _type, | |
276 | const entity_inst_t& peer); | |
277 | ||
278 | ~XioConnection() { | |
279 | if (conn) | |
280 | xio_connection_destroy(conn); | |
281 | } | |
282 | ostream& conn_prefix(std::ostream *_dout); | |
283 | ||
31f18b77 | 284 | bool is_connected() override { return connected; } |
7c673cae FG |
285 | |
286 | int send_message(Message *m) override; | |
287 | void send_keepalive() override {send_keepalive_or_ack();} | |
288 | void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr); | |
289 | void mark_down() override; | |
290 | int _mark_down(uint32_t flags); | |
291 | void mark_disposable() override; | |
292 | int _mark_disposable(uint32_t flags); | |
293 | ||
294 | const entity_inst_t& get_peer() const { return peer; } | |
295 | ||
296 | XioConnection* get() { | |
297 | #if 0 | |
31f18b77 | 298 | cout << "XioConnection::get " << this << " " << nref.load() << std::endl; |
7c673cae FG |
299 | #endif |
300 | RefCountedObject::get(); | |
301 | return this; | |
302 | } | |
303 | ||
304 | void put() { | |
305 | RefCountedObject::put(); | |
306 | #if 0 | |
31f18b77 | 307 | cout << "XioConnection::put " << this << " " << nref.load() << std::endl; |
7c673cae FG |
308 | #endif |
309 | } | |
310 | ||
311 | void disconnect() { | |
312 | if (is_connected()) { | |
31f18b77 | 313 | connected = false; |
7c673cae FG |
314 | xio_disconnect(conn); // normal teardown will clean up conn |
315 | } | |
316 | } | |
317 | ||
318 | uint32_t get_magic() { return magic; } | |
319 | void set_magic(int _magic) { magic = _magic; } | |
320 | uint32_t get_special_handling() { return special_handling; } | |
321 | void set_special_handling(int n) { special_handling = n; } | |
322 | uint64_t get_scount() { return scount; } | |
323 | ||
324 | int passive_setup(); /* XXX */ | |
325 | ||
326 | int handle_data_msg(struct xio_session *session, struct xio_msg *msg, | |
327 | int more_in_batch, void *cb_user_context); | |
328 | int on_msg(struct xio_session *session, struct xio_msg *msg, | |
329 | int more_in_batch, void *cb_user_context); | |
330 | int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg, | |
331 | void *conn_user_context); | |
332 | int on_msg_error(struct xio_session *session, enum xio_status error, | |
333 | struct xio_msg *msg, void *conn_user_context); | |
334 | void msg_send_fail(XioSend *xsend, int code); | |
335 | void msg_release_fail(struct xio_msg *msg, int code); | |
336 | private: | |
337 | void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr); | |
338 | int flush_out_queues(uint32_t flags); | |
339 | int discard_out_queues(uint32_t flags); | |
340 | int adjust_clru(uint32_t flags); | |
341 | }; | |
342 | ||
343 | typedef boost::intrusive_ptr<XioConnection> XioConnectionRef; | |
344 | ||
345 | class XioLoopbackConnection : public Connection | |
346 | { | |
347 | private: | |
31f18b77 | 348 | std::atomic<uint64_t> seq = { 0 }; |
7c673cae | 349 | public: |
31f18b77 | 350 | explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m) |
7c673cae FG |
351 | { |
352 | const entity_inst_t& m_inst = m->get_myinst(); | |
353 | peer_addr = m_inst.addr; | |
354 | peer_type = m_inst.name.type(); | |
355 | set_features(XIO_ALL_FEATURES); /* XXXX set to ours */ | |
356 | } | |
357 | ||
358 | XioLoopbackConnection* get() { | |
359 | return static_cast<XioLoopbackConnection*>(RefCountedObject::get()); | |
360 | } | |
361 | ||
362 | bool is_connected() override { return true; } | |
363 | ||
364 | int send_message(Message *m) override; | |
365 | void send_keepalive() override; | |
366 | void mark_down() override {} | |
367 | void mark_disposable() override {} | |
368 | ||
369 | uint64_t get_seq() { | |
31f18b77 | 370 | return seq; |
7c673cae FG |
371 | } |
372 | ||
373 | uint64_t next_seq() { | |
31f18b77 | 374 | return ++seq; |
7c673cae FG |
375 | } |
376 | }; | |
377 | ||
378 | typedef boost::intrusive_ptr<XioLoopbackConnection> XioLoopbackConnectionRef; | |
379 | ||
380 | #endif /* XIO_CONNECTION_H */ |