git.proxmox.com Git - ceph.git/blob - ceph/src/msg/xio/XioConnection.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / msg / xio / XioConnection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Portions Copyright (C) 2013 CohortFS, LLC
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #ifndef XIO_CONNECTION_H
17 #define XIO_CONNECTION_H
18
19 #include <boost/intrusive/avl_set.hpp>
20 #include <boost/intrusive/list.hpp>
21 extern "C" {
22 #include "libxio.h"
23 }
24 #include "XioInSeq.h"
25 #include "XioSubmit.h"
26 #include "msg/Connection.h"
27 #include "msg/Messenger.h"
28 #include "include/atomic.h"
29 #include "auth/AuthSessionHandler.h"
30
31 #define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
32
33
34 #define XIO_NOP_TAG_MARKDOWN 0x0001
35
36 namespace bi = boost::intrusive;
37
38 class XioPortal;
39 class XioMessenger;
40 class XioSend;
41
42 class XioConnection : public Connection
43 {
44 public:
45 enum type { ACTIVE, PASSIVE };
46
47 enum session_states {
48 INIT = 0,
49 START,
50 UP,
51 FLOW_CONTROLLED,
52 DISCONNECTED,
53 DELETED,
54 BARRIER
55 };
56
57 enum session_startup_states {
58 IDLE = 0,
59 CONNECTING,
60 ACCEPTING,
61 READY,
62 FAIL
63 };
64
65 private:
66 XioConnection::type xio_conn_type;
67 XioPortal *portal;
68 atomic_t connected;
69 entity_inst_t peer;
70 struct xio_session *session;
71 struct xio_connection *conn;
72 pthread_spinlock_t sp;
73 atomic_t send;
74 atomic_t recv;
75 uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials)
76 uint32_t magic;
77 uint32_t special_handling;
78 uint64_t scount;
79 uint32_t send_ctr;
80 int q_high_mark;
81 int q_low_mark;
82
83 struct lifecycle {
84 // different from Pipe states?
85 enum lf_state {
86 INIT,
87 LOCAL_DISCON,
88 REMOTE_DISCON,
89 RECONNECTING,
90 UP,
91 DEAD } state;
92
93 /* XXX */
94 uint32_t reconnects;
95 uint32_t connect_seq, peer_global_seq;
96 uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
97 atomic64_t out_seq; // atomic<uint32_t>
98
99 lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0),
100 peer_global_seq(0), in_seq(0), out_seq_acked(0),
101 out_seq(0) {}
102
103 void set_in_seq(uint64_t seq) {
104 in_seq = seq;
105 }
106
107 uint64_t next_out_seq() {
108 return out_seq.inc();
109 }
110
111 } state;
112
113 /* batching */
114 XioInSeq in_seq;
115
116 class CState
117 {
118 public:
119 static const int FLAG_NONE = 0x0000;
120 static const int FLAG_BAD_AUTH = 0x0001;
121 static const int FLAG_MAPPED = 0x0002;
122 static const int FLAG_RESET = 0x0004;
123
124 static const int OP_FLAG_NONE = 0x0000;
125 static const int OP_FLAG_LOCKED = 0x0001;
126 static const int OP_FLAG_LRU = 0x0002;
127
128 uint64_t features;
129 Messenger::Policy policy;
130
131 CryptoKey session_key;
132 ceph::shared_ptr<AuthSessionHandler> session_security;
133 AuthAuthorizer *authorizer;
134 XioConnection *xcon;
135 uint32_t protocol_version;
136
137 atomic_t session_state;
138 atomic_t startup_state;
139
140 uint32_t reconnects;
141 uint32_t connect_seq, global_seq, peer_global_seq;
142 uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
143 atomic64_t out_seq; // atomic<uint64_t>
144
145 uint32_t flags;
146
147 explicit CState(XioConnection* _xcon)
148 : features(0),
149 authorizer(NULL),
150 xcon(_xcon),
151 protocol_version(0),
152 session_state(INIT),
153 startup_state(IDLE),
154 reconnects(0),
155 connect_seq(0),
156 global_seq(0),
157 peer_global_seq(0),
158 in_seq(0),
159 out_seq_acked(0),
160 out_seq(0),
161 flags(FLAG_NONE) {}
162
163 uint64_t get_session_state() {
164 return session_state.read();
165 }
166
167 uint64_t get_startup_state() {
168 return startup_state.read();
169 }
170
171 void set_in_seq(uint64_t seq) {
172 in_seq = seq;
173 }
174
175 uint64_t next_out_seq() {
176 return out_seq.inc();
177 };
178
179 // state machine
180 int init_state();
181 int next_state(Message* m);
182 #if 0 // future (session startup)
183 int msg_connect(MConnect *m);
184 int msg_connect_reply(MConnectReply *m);
185 int msg_connect_reply(MConnectAuthReply *m);
186 int msg_connect_auth(MConnectAuth *m);
187 int msg_connect_auth_reply(MConnectAuthReply *m);
188 #endif
189 int state_up_ready(uint32_t flags);
190 int state_flow_controlled(uint32_t flags);
191 int state_discon();
192 int state_fail(Message* m, uint32_t flags);
193
194 } cstate; /* CState */
195
196 // message submission queue
197 struct SendQ {
198 bool keepalive;
199 bool ack;
200 utime_t ack_time;
201 Message::Queue mqueue; // deferred
202 XioSubmit::Queue requeue;
203
204 SendQ():keepalive(false), ack(false){}
205 } outgoing;
206
207 // conns_entity_map comparison functor
208 struct EntityComp
209 {
210 // for internal ordering
211 bool operator()(const XioConnection &lhs, const XioConnection &rhs) const
212 { return lhs.get_peer() < rhs.get_peer(); }
213
214 // for external search by entity_inst_t(peer)
215 bool operator()(const entity_inst_t &peer, const XioConnection &c) const
216 { return peer < c.get_peer(); }
217
218 bool operator()(const XioConnection &c, const entity_inst_t &peer) const
219 { return c.get_peer() < peer; }
220 };
221
222 bi::list_member_hook<> conns_hook;
223 bi::avl_set_member_hook<> conns_entity_map_hook;
224
225 typedef bi::list< XioConnection,
226 bi::member_hook<XioConnection, bi::list_member_hook<>,
227 &XioConnection::conns_hook > > ConnList;
228
229 typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>,
230 &XioConnection::conns_entity_map_hook> EntityHook;
231
232 typedef bi::avl_set< XioConnection, EntityHook,
233 bi::compare<EntityComp> > EntitySet;
234
235 friend class XioPortal;
236 friend class XioMessenger;
237 friend class XioDispatchHook;
238 friend class XioMarkDownHook;
239 friend class XioSend;
240
241 int on_disconnect_event() {
242 connected.set(false);
243 pthread_spin_lock(&sp);
244 discard_out_queues(CState::OP_FLAG_LOCKED);
245 pthread_spin_unlock(&sp);
246 return 0;
247 }
248
249 int on_teardown_event() {
250 pthread_spin_lock(&sp);
251 if (conn)
252 xio_connection_destroy(conn);
253 conn = NULL;
254 pthread_spin_unlock(&sp);
255 this->put();
256 return 0;
257 }
258
259 int xio_qdepth_high_mark() {
260 return q_high_mark;
261 }
262
263 int xio_qdepth_low_mark() {
264 return q_low_mark;
265 }
266
267 public:
268 XioConnection(XioMessenger *m, XioConnection::type _type,
269 const entity_inst_t& peer);
270
271 ~XioConnection() {
272 if (conn)
273 xio_connection_destroy(conn);
274 }
275 ostream& conn_prefix(std::ostream *_dout);
276
277 bool is_connected() override { return connected.read(); }
278
279 int send_message(Message *m) override;
280 void send_keepalive() override {send_keepalive_or_ack();}
281 void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr);
282 void mark_down() override;
283 int _mark_down(uint32_t flags);
284 void mark_disposable() override;
285 int _mark_disposable(uint32_t flags);
286
287 const entity_inst_t& get_peer() const { return peer; }
288
289 XioConnection* get() {
290 #if 0
291 int refs = nref.read();
292 cout << "XioConnection::get " << this << " " << refs << std::endl;
293 #endif
294 RefCountedObject::get();
295 return this;
296 }
297
298 void put() {
299 RefCountedObject::put();
300 #if 0
301 int refs = nref.read();
302 cout << "XioConnection::put " << this << " " << refs << std::endl;
303 #endif
304 }
305
306 void disconnect() {
307 if (is_connected()) {
308 connected.set(false);
309 xio_disconnect(conn); // normal teardown will clean up conn
310 }
311 }
312
313 uint32_t get_magic() { return magic; }
314 void set_magic(int _magic) { magic = _magic; }
315 uint32_t get_special_handling() { return special_handling; }
316 void set_special_handling(int n) { special_handling = n; }
317 uint64_t get_scount() { return scount; }
318
319 int passive_setup(); /* XXX */
320
321 int handle_data_msg(struct xio_session *session, struct xio_msg *msg,
322 int more_in_batch, void *cb_user_context);
323 int on_msg(struct xio_session *session, struct xio_msg *msg,
324 int more_in_batch, void *cb_user_context);
325 int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
326 void *conn_user_context);
327 int on_msg_error(struct xio_session *session, enum xio_status error,
328 struct xio_msg *msg, void *conn_user_context);
329 void msg_send_fail(XioSend *xsend, int code);
330 void msg_release_fail(struct xio_msg *msg, int code);
331 private:
332 void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr);
333 int flush_out_queues(uint32_t flags);
334 int discard_out_queues(uint32_t flags);
335 int adjust_clru(uint32_t flags);
336 };
337
338 typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
339
340 class XioLoopbackConnection : public Connection
341 {
342 private:
343 atomic64_t seq;
344 public:
345 explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m), seq(0)
346 {
347 const entity_inst_t& m_inst = m->get_myinst();
348 peer_addr = m_inst.addr;
349 peer_type = m_inst.name.type();
350 set_features(XIO_ALL_FEATURES); /* XXXX set to ours */
351 }
352
353 XioLoopbackConnection* get() {
354 return static_cast<XioLoopbackConnection*>(RefCountedObject::get());
355 }
356
357 bool is_connected() override { return true; }
358
359 int send_message(Message *m) override;
360 void send_keepalive() override;
361 void mark_down() override {}
362 void mark_disposable() override {}
363
364 uint64_t get_seq() {
365 return seq.read();
366 }
367
368 uint64_t next_seq() {
369 return seq.inc();
370 }
371 };
372
373 typedef boost::intrusive_ptr<XioLoopbackConnection> XioLoopbackConnectionRef;
374
375 #endif /* XIO_CONNECTION_H */