]>
git.proxmox.com Git - ceph.git/blob - ceph/src/msg/xio/XioConnection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Portions Copyright (C) 2013 CohortFS, LLC
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
16 #ifndef XIO_CONNECTION_H
17 #define XIO_CONNECTION_H
19 #include <boost/intrusive/avl_set.hpp>
20 #include <boost/intrusive/list.hpp>
25 #include "XioSubmit.h"
26 #include "msg/Connection.h"
27 #include "msg/Messenger.h"
28 #include "include/atomic.h"
29 #include "auth/AuthSessionHandler.h"
31 #define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
34 #define XIO_NOP_TAG_MARKDOWN 0x0001
36 namespace bi
= boost::intrusive
;
42 class XioConnection
: public Connection
45 enum type
{ ACTIVE
, PASSIVE
};
57 enum session_startup_states
{
66 XioConnection::type xio_conn_type
;
70 struct xio_session
*session
;
71 struct xio_connection
*conn
;
72 pthread_spinlock_t sp
;
75 uint32_t n_reqs
; // Accelio-initiated reqs in progress (!counting partials)
77 uint32_t special_handling
;
84 // different from Pipe states?
95 uint32_t connect_seq
, peer_global_seq
;
96 uint64_t in_seq
, out_seq_acked
; // atomic<uint64_t>, got receipt
97 atomic64_t out_seq
; // atomic<uint32_t>
99 lifecycle() : state(lifecycle::INIT
), reconnects(0), connect_seq(0),
100 peer_global_seq(0), in_seq(0), out_seq_acked(0),
103 void set_in_seq(uint64_t seq
) {
107 uint64_t next_out_seq() {
108 return out_seq
.inc();
119 static const int FLAG_NONE
= 0x0000;
120 static const int FLAG_BAD_AUTH
= 0x0001;
121 static const int FLAG_MAPPED
= 0x0002;
122 static const int FLAG_RESET
= 0x0004;
124 static const int OP_FLAG_NONE
= 0x0000;
125 static const int OP_FLAG_LOCKED
= 0x0001;
126 static const int OP_FLAG_LRU
= 0x0002;
129 Messenger::Policy policy
;
131 CryptoKey session_key
;
132 ceph::shared_ptr
<AuthSessionHandler
> session_security
;
133 AuthAuthorizer
*authorizer
;
135 uint32_t protocol_version
;
137 atomic_t session_state
;
138 atomic_t startup_state
;
141 uint32_t connect_seq
, global_seq
, peer_global_seq
;
142 uint64_t in_seq
, out_seq_acked
; // atomic<uint64_t>, got receipt
143 atomic64_t out_seq
; // atomic<uint64_t>
147 explicit CState(XioConnection
* _xcon
)
163 uint64_t get_session_state() {
164 return session_state
.read();
167 uint64_t get_startup_state() {
168 return startup_state
.read();
171 void set_in_seq(uint64_t seq
) {
175 uint64_t next_out_seq() {
176 return out_seq
.inc();
181 int next_state(Message
* m
);
182 #if 0 // future (session startup)
183 int msg_connect(MConnect
*m
);
184 int msg_connect_reply(MConnectReply
*m
);
185 int msg_connect_reply(MConnectAuthReply
*m
);
186 int msg_connect_auth(MConnectAuth
*m
);
187 int msg_connect_auth_reply(MConnectAuthReply
*m
);
189 int state_up_ready(uint32_t flags
);
190 int state_flow_controlled(uint32_t flags
);
192 int state_fail(Message
* m
, uint32_t flags
);
194 } cstate
; /* CState */
196 // message submission queue
201 Message::Queue mqueue
; // deferred
202 XioSubmit::Queue requeue
;
204 SendQ():keepalive(false), ack(false){}
207 // conns_entity_map comparison functor
210 // for internal ordering
211 bool operator()(const XioConnection
&lhs
, const XioConnection
&rhs
) const
212 { return lhs
.get_peer() < rhs
.get_peer(); }
214 // for external search by entity_inst_t(peer)
215 bool operator()(const entity_inst_t
&peer
, const XioConnection
&c
) const
216 { return peer
< c
.get_peer(); }
218 bool operator()(const XioConnection
&c
, const entity_inst_t
&peer
) const
219 { return c
.get_peer() < peer
; }
222 bi::list_member_hook
<> conns_hook
;
223 bi::avl_set_member_hook
<> conns_entity_map_hook
;
225 typedef bi::list
< XioConnection
,
226 bi::member_hook
<XioConnection
, bi::list_member_hook
<>,
227 &XioConnection::conns_hook
> > ConnList
;
229 typedef bi::member_hook
<XioConnection
, bi::avl_set_member_hook
<>,
230 &XioConnection::conns_entity_map_hook
> EntityHook
;
232 typedef bi::avl_set
< XioConnection
, EntityHook
,
233 bi::compare
<EntityComp
> > EntitySet
;
235 friend class XioPortal
;
236 friend class XioMessenger
;
237 friend class XioDispatchHook
;
238 friend class XioMarkDownHook
;
239 friend class XioSend
;
241 int on_disconnect_event() {
242 connected
.set(false);
243 pthread_spin_lock(&sp
);
244 discard_out_queues(CState::OP_FLAG_LOCKED
);
245 pthread_spin_unlock(&sp
);
249 int on_teardown_event() {
250 pthread_spin_lock(&sp
);
252 xio_connection_destroy(conn
);
254 pthread_spin_unlock(&sp
);
259 int xio_qdepth_high_mark() {
263 int xio_qdepth_low_mark() {
268 XioConnection(XioMessenger
*m
, XioConnection::type _type
,
269 const entity_inst_t
& peer
);
273 xio_connection_destroy(conn
);
275 ostream
& conn_prefix(std::ostream
*_dout
);
277 bool is_connected() override
{ return connected
.read(); }
279 int send_message(Message
*m
) override
;
280 void send_keepalive() override
{send_keepalive_or_ack();}
281 void send_keepalive_or_ack(bool ack
= false, const utime_t
*tp
= nullptr);
282 void mark_down() override
;
283 int _mark_down(uint32_t flags
);
284 void mark_disposable() override
;
285 int _mark_disposable(uint32_t flags
);
287 const entity_inst_t
& get_peer() const { return peer
; }
289 XioConnection
* get() {
291 int refs
= nref
.read();
292 cout
<< "XioConnection::get " << this << " " << refs
<< std::endl
;
294 RefCountedObject::get();
299 RefCountedObject::put();
301 int refs
= nref
.read();
302 cout
<< "XioConnection::put " << this << " " << refs
<< std::endl
;
307 if (is_connected()) {
308 connected
.set(false);
309 xio_disconnect(conn
); // normal teardown will clean up conn
313 uint32_t get_magic() { return magic
; }
314 void set_magic(int _magic
) { magic
= _magic
; }
315 uint32_t get_special_handling() { return special_handling
; }
316 void set_special_handling(int n
) { special_handling
= n
; }
317 uint64_t get_scount() { return scount
; }
319 int passive_setup(); /* XXX */
321 int handle_data_msg(struct xio_session
*session
, struct xio_msg
*msg
,
322 int more_in_batch
, void *cb_user_context
);
323 int on_msg(struct xio_session
*session
, struct xio_msg
*msg
,
324 int more_in_batch
, void *cb_user_context
);
325 int on_ow_msg_send_complete(struct xio_session
*session
, struct xio_msg
*msg
,
326 void *conn_user_context
);
327 int on_msg_error(struct xio_session
*session
, enum xio_status error
,
328 struct xio_msg
*msg
, void *conn_user_context
);
329 void msg_send_fail(XioSend
*xsend
, int code
);
330 void msg_release_fail(struct xio_msg
*msg
, int code
);
332 void send_keepalive_or_ack_internal(bool ack
= false, const utime_t
*tp
= nullptr);
333 int flush_out_queues(uint32_t flags
);
334 int discard_out_queues(uint32_t flags
);
335 int adjust_clru(uint32_t flags
);
338 typedef boost::intrusive_ptr
<XioConnection
> XioConnectionRef
;
340 class XioLoopbackConnection
: public Connection
345 explicit XioLoopbackConnection(Messenger
*m
) : Connection(m
->cct
, m
), seq(0)
347 const entity_inst_t
& m_inst
= m
->get_myinst();
348 peer_addr
= m_inst
.addr
;
349 peer_type
= m_inst
.name
.type();
350 set_features(XIO_ALL_FEATURES
); /* XXXX set to ours */
353 XioLoopbackConnection
* get() {
354 return static_cast<XioLoopbackConnection
*>(RefCountedObject::get());
357 bool is_connected() override
{ return true; }
359 int send_message(Message
*m
) override
;
360 void send_keepalive() override
;
361 void mark_down() override
{}
362 void mark_disposable() override
{}
368 uint64_t next_seq() {
373 typedef boost::intrusive_ptr
<XioLoopbackConnection
> XioLoopbackConnectionRef
;
375 #endif /* XIO_CONNECTION_H */