#include "msg/async/net_handler.h"
#include "RDMAStack.h"
-#include "Device.h"
-#include "RDMAConnTCP.h"
+
+#include "include/compat.h"
+#include "include/sock_compat.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " RDMAServerSocketImpl "
-RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
- : cct(cct), infiniband(i), dispatcher(s), worker(w), sa(a)
+RDMAServerSocketImpl::RDMAServerSocketImpl(
+ CephContext *cct, shared_ptr<Infiniband>& ib,
+ shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w, entity_addr_t& a, unsigned slot)
+ : ServerSocketImpl(a.get_type(), slot),
+ cct(cct), net(cct), server_setup_socket(-1), ib(ib),
+ dispatcher(rdma_dispatcher), worker(w), sa(a)
{
}
-RDMAServerConnTCP::RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
- : RDMAServerSocketImpl(cct, i, s, w, a), net(cct), server_setup_socket(-1)
-{
- ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
- ibport = cct->_conf->ms_async_rdma_port_num;
-
- assert(ibdev);
- assert(ibport > 0);
-
- ibdev->init(ibport);
-}
-
-int RDMAServerConnTCP::listen(entity_addr_t &sa, const SocketOptions &opt)
+int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
{
int rc = 0;
server_setup_socket = net.create_socket(sa.get_family(), true);
if (rc < 0) {
goto err;
}
- net.set_close_on_exec(server_setup_socket);
rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());
if (rc < 0) {
goto err;
}
- rc = ::listen(server_setup_socket, 128);
+ rc = ::listen(server_setup_socket, cct->_conf->ms_tcp_listen_backlog);
if (rc < 0) {
rc = -errno;
lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(errno) << dendl;
err:
::close(server_setup_socket);
server_setup_socket = -1;
- return -errno;
+ return rc;
}
-int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
+int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
{
ldout(cct, 15) << __func__ << dendl;
- assert(sock);
+ ceph_assert(sock);
+
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
- int sd = ::accept(server_setup_socket, (sockaddr*)&ss, &slen);
+ int sd = accept_cloexec(server_setup_socket, (sockaddr*)&ss, &slen);
if (sd < 0) {
return -errno;
}
- net.set_close_on_exec(sd);
int r = net.set_nonblock(sd);
if (r < 0) {
::close(sd);
return -errno;
}
- assert(NULL != out); //out should not be NULL in accept connection
+ ceph_assert(NULL != out); //out should not be NULL in accept connection
+ out->set_type(addr_type);
out->set_sockaddr((sockaddr*)&ss);
net.set_priority(sd, opt.priority, out->get_family());
- RDMAConnectedSocketImpl *server;
+ RDMAConnectedSocketImpl* server;
//Worker* w = dispatcher->get_stack()->get_worker();
- RDMAConnTCPInfo conn_info = { sd };
- server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w), &conn_info);
+ server = new RDMAConnectedSocketImpl(cct, ib, dispatcher, dynamic_cast<RDMAWorker*>(w));
+ server->set_accept_fd(sd);
ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;
std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
*sock = ConnectedSocket(std::move(csi));
return 0;
}
-void RDMAServerConnTCP::abort_accept()
+void RDMAServerSocketImpl::abort_accept()
{
if (server_setup_socket >= 0)
::close(server_setup_socket);