]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/xio/XioConnection.h
update download target update for octopus release
[ceph.git] / ceph / src / msg / xio / XioConnection.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Portions Copyright (C) 2013 CohortFS, LLC
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16#ifndef XIO_CONNECTION_H
17#define XIO_CONNECTION_H
18
31f18b77
FG
19#include <atomic>
20
7c673cae
FG
21#include <boost/intrusive/avl_set.hpp>
22#include <boost/intrusive/list.hpp>
31f18b77 23
7c673cae
FG
24extern "C" {
25#include "libxio.h"
26}
31f18b77 27
7c673cae
FG
28#include "XioInSeq.h"
29#include "XioSubmit.h"
30#include "msg/Connection.h"
31#include "msg/Messenger.h"
7c673cae
FG
32#include "auth/AuthSessionHandler.h"
33
34#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
35
36
37#define XIO_NOP_TAG_MARKDOWN 0x0001
38
39namespace bi = boost::intrusive;
40
41class XioPortal;
42class XioMessenger;
43class XioSend;
44
45class XioConnection : public Connection
46{
47public:
48 enum type { ACTIVE, PASSIVE };
49
31f18b77 50 enum class session_states : unsigned {
7c673cae
FG
51 INIT = 0,
52 START,
53 UP,
54 FLOW_CONTROLLED,
55 DISCONNECTED,
56 DELETED,
57 BARRIER
58 };
59
31f18b77 60 enum class session_startup_states : unsigned {
7c673cae
FG
61 IDLE = 0,
62 CONNECTING,
63 ACCEPTING,
64 READY,
65 FAIL
66 };
67
68private:
69 XioConnection::type xio_conn_type;
70 XioPortal *portal;
31f18b77 71 std::atomic<bool> connected = { false };
7c673cae
FG
72 entity_inst_t peer;
73 struct xio_session *session;
74 struct xio_connection *conn;
11fdf7f2 75 ceph::util::spinlock sp;
31f18b77
FG
76 std::atomic<int64_t> send = { 0 };
77 std::atomic<int64_t> recv = { 0 };
7c673cae
FG
78 uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials)
79 uint32_t magic;
80 uint32_t special_handling;
81 uint64_t scount;
82 uint32_t send_ctr;
83 int q_high_mark;
84 int q_low_mark;
85
/**
 * Connection life-cycle bookkeeping: a coarse state value plus reconnect
 * and sequence counters.  Every counter starts at zero and the state
 * starts at INIT.
 */
struct lifecycle {
  // different from Pipe states?
  enum lf_state {
    INIT,
    LOCAL_DISCON,
    REMOTE_DISCON,
    RECONNECTING,
    UP,
    DEAD
  } state;

  /* XXX */
  uint32_t reconnects;
  uint32_t connect_seq, peer_global_seq;
  uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
  // Unsigned, like the other sequence counters here and like the value
  // next_out_seq() hands back, so no signed/unsigned conversion happens
  // on return.
  std::atomic<uint64_t> out_seq = { 0 };

  lifecycle()
    : state(lifecycle::INIT),
      reconnects(0),
      connect_seq(0),
      peer_global_seq(0),
      in_seq(0),
      out_seq_acked(0)
  {}

  /** Record @p seq as the current inbound sequence number. */
  void set_in_seq(uint64_t seq) {
    in_seq = seq;
  }

  /**
   * Atomically advance the outbound sequence counter.
   * @return the counter value after the increment (first call yields 1)
   */
  uint64_t next_out_seq() {
    return ++out_seq;
  }

} state;
115
116 /* batching */
117 XioInSeq in_seq;
118
119 class CState
120 {
121 public:
122 static const int FLAG_NONE = 0x0000;
123 static const int FLAG_BAD_AUTH = 0x0001;
124 static const int FLAG_MAPPED = 0x0002;
125 static const int FLAG_RESET = 0x0004;
126
127 static const int OP_FLAG_NONE = 0x0000;
128 static const int OP_FLAG_LOCKED = 0x0001;
129 static const int OP_FLAG_LRU = 0x0002;
130
131 uint64_t features;
132 Messenger::Policy policy;
133
134 CryptoKey session_key;
11fdf7f2 135 std::shared_ptr<AuthSessionHandler> session_security;
7c673cae
FG
136 AuthAuthorizer *authorizer;
137 XioConnection *xcon;
138 uint32_t protocol_version;
139
31f18b77
FG
140 std::atomic<session_states> session_state = { 0 };
141 std::atomic<session_startup_state> startup_state = { 0 };
7c673cae
FG
142
143 uint32_t reconnects;
144 uint32_t connect_seq, global_seq, peer_global_seq;
145 uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
31f18b77 146 std::atomic<uint64_t> out_seq = { 0 };
7c673cae
FG
147
148 uint32_t flags;
149
150 explicit CState(XioConnection* _xcon)
151 : features(0),
152 authorizer(NULL),
153 xcon(_xcon),
154 protocol_version(0),
155 session_state(INIT),
156 startup_state(IDLE),
157 reconnects(0),
158 connect_seq(0),
159 global_seq(0),
160 peer_global_seq(0),
161 in_seq(0),
162 out_seq_acked(0),
7c673cae
FG
163 flags(FLAG_NONE) {}
164
165 uint64_t get_session_state() {
31f18b77 166 return session_state;
7c673cae
FG
167 }
168
169 uint64_t get_startup_state() {
31f18b77 170 return startup_state;
7c673cae
FG
171 }
172
173 void set_in_seq(uint64_t seq) {
174 in_seq = seq;
175 }
176
177 uint64_t next_out_seq() {
31f18b77 178 return ++out_seq;
7c673cae
FG
179 };
180
181 // state machine
182 int init_state();
183 int next_state(Message* m);
184#if 0 // future (session startup)
185 int msg_connect(MConnect *m);
186 int msg_connect_reply(MConnectReply *m);
187 int msg_connect_reply(MConnectAuthReply *m);
188 int msg_connect_auth(MConnectAuth *m);
189 int msg_connect_auth_reply(MConnectAuthReply *m);
190#endif
191 int state_up_ready(uint32_t flags);
192 int state_flow_controlled(uint32_t flags);
193 int state_discon();
194 int state_fail(Message* m, uint32_t flags);
195
196 } cstate; /* CState */
197
198 // message submission queue
199 struct SendQ {
200 bool keepalive;
201 bool ack;
202 utime_t ack_time;
203 Message::Queue mqueue; // deferred
204 XioSubmit::Queue requeue;
205
206 SendQ():keepalive(false), ack(false){}
207 } outgoing;
208
209 // conns_entity_map comparison functor
210 struct EntityComp
211 {
212 // for internal ordering
213 bool operator()(const XioConnection &lhs, const XioConnection &rhs) const
214 { return lhs.get_peer() < rhs.get_peer(); }
215
216 // for external search by entity_inst_t(peer)
217 bool operator()(const entity_inst_t &peer, const XioConnection &c) const
218 { return peer < c.get_peer(); }
219
220 bool operator()(const XioConnection &c, const entity_inst_t &peer) const
221 { return c.get_peer() < peer; }
222 };
223
224 bi::list_member_hook<> conns_hook;
225 bi::avl_set_member_hook<> conns_entity_map_hook;
226
227 typedef bi::list< XioConnection,
228 bi::member_hook<XioConnection, bi::list_member_hook<>,
229 &XioConnection::conns_hook > > ConnList;
230
231 typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>,
232 &XioConnection::conns_entity_map_hook> EntityHook;
233
234 typedef bi::avl_set< XioConnection, EntityHook,
235 bi::compare<EntityComp> > EntitySet;
236
237 friend class XioPortal;
238 friend class XioMessenger;
239 friend class XioDispatchHook;
240 friend class XioMarkDownHook;
241 friend class XioSend;
242
243 int on_disconnect_event() {
11fdf7f2
TL
244 std::lock_guard<ceph::spinlock> lg(sp);
245
31f18b77 246 connected = false;
7c673cae 247 discard_out_queues(CState::OP_FLAG_LOCKED);
11fdf7f2 248
7c673cae
FG
249 return 0;
250 }
251
252 int on_teardown_event() {
11fdf7f2
TL
253
254 {
255 std::lock_guard<ceph::spinlock> lg(sp);
256
7c673cae
FG
257 if (conn)
258 xio_connection_destroy(conn);
259 conn = NULL;
11fdf7f2
TL
260 }
261
7c673cae
FG
262 this->put();
263 return 0;
264 }
265
266 int xio_qdepth_high_mark() {
267 return q_high_mark;
268 }
269
270 int xio_qdepth_low_mark() {
271 return q_low_mark;
272 }
273
274public:
275 XioConnection(XioMessenger *m, XioConnection::type _type,
276 const entity_inst_t& peer);
277
278 ~XioConnection() {
279 if (conn)
280 xio_connection_destroy(conn);
281 }
// Return type is qualified like the parameter so this header does not
// depend on a using-directive leaking in from another include.
std::ostream& conn_prefix(std::ostream *_dout);
283
31f18b77 284 bool is_connected() override { return connected; }
7c673cae
FG
285
286 int send_message(Message *m) override;
287 void send_keepalive() override {send_keepalive_or_ack();}
288 void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr);
289 void mark_down() override;
290 int _mark_down(uint32_t flags);
291 void mark_disposable() override;
292 int _mark_disposable(uint32_t flags);
293
294 const entity_inst_t& get_peer() const { return peer; }
295
296 XioConnection* get() {
297#if 0
31f18b77 298 cout << "XioConnection::get " << this << " " << nref.load() << std::endl;
7c673cae
FG
299#endif
300 RefCountedObject::get();
301 return this;
302 }
303
304 void put() {
305 RefCountedObject::put();
306#if 0
31f18b77 307 cout << "XioConnection::put " << this << " " << nref.load() << std::endl;
7c673cae
FG
308#endif
309 }
310
311 void disconnect() {
312 if (is_connected()) {
31f18b77 313 connected = false;
7c673cae
FG
314 xio_disconnect(conn); // normal teardown will clean up conn
315 }
316 }
317
318 uint32_t get_magic() { return magic; }
319 void set_magic(int _magic) { magic = _magic; }
320 uint32_t get_special_handling() { return special_handling; }
321 void set_special_handling(int n) { special_handling = n; }
322 uint64_t get_scount() { return scount; }
323
324 int passive_setup(); /* XXX */
325
326 int handle_data_msg(struct xio_session *session, struct xio_msg *msg,
327 int more_in_batch, void *cb_user_context);
328 int on_msg(struct xio_session *session, struct xio_msg *msg,
329 int more_in_batch, void *cb_user_context);
330 int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
331 void *conn_user_context);
332 int on_msg_error(struct xio_session *session, enum xio_status error,
333 struct xio_msg *msg, void *conn_user_context);
334 void msg_send_fail(XioSend *xsend, int code);
335 void msg_release_fail(struct xio_msg *msg, int code);
336private:
337 void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr);
338 int flush_out_queues(uint32_t flags);
339 int discard_out_queues(uint32_t flags);
340 int adjust_clru(uint32_t flags);
341};
342
343typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
344
345class XioLoopbackConnection : public Connection
346{
347private:
31f18b77 348 std::atomic<uint64_t> seq = { 0 };
7c673cae 349public:
31f18b77 350 explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m)
7c673cae
FG
351 {
352 const entity_inst_t& m_inst = m->get_myinst();
353 peer_addr = m_inst.addr;
354 peer_type = m_inst.name.type();
355 set_features(XIO_ALL_FEATURES); /* XXXX set to ours */
356 }
357
358 XioLoopbackConnection* get() {
359 return static_cast<XioLoopbackConnection*>(RefCountedObject::get());
360 }
361
362 bool is_connected() override { return true; }
363
364 int send_message(Message *m) override;
365 void send_keepalive() override;
366 void mark_down() override {}
367 void mark_disposable() override {}
368
369 uint64_t get_seq() {
31f18b77 370 return seq;
7c673cae
FG
371 }
372
373 uint64_t next_seq() {
31f18b77 374 return ++seq;
7c673cae
FG
375 }
376};
377
378typedef boost::intrusive_ptr<XioLoopbackConnection> XioLoopbackConnectionRef;
379
380#endif /* XIO_CONNECTION_H */