]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / async / rdma / RDMAConnectedSocketImpl.cc
index 37a4eef4c03afddaac5e6827910e766a2a50225f..c897f94f4d5350bef88020b1afdd5d77e6307750 100644 (file)
  */
 #include "RDMAStack.h"
 
+class C_handle_connection_established : public EventCallback {
+  RDMAConnectedSocketImpl *csi;
+  bool active = true;
+ public:
+  C_handle_connection_established(RDMAConnectedSocketImpl *w) : csi(w) {}
+  void do_request(uint64_t fd) final {
+    if (active)
+      csi->handle_connection_established();
+  }
+  void close() {
+    active = false;
+  }
+};
+
+class C_handle_connection_read : public EventCallback {
+  RDMAConnectedSocketImpl *csi;
+  bool active = true;
+ public:
+  explicit C_handle_connection_read(RDMAConnectedSocketImpl *w): csi(w) {}
+  void do_request(uint64_t fd) final {
+    if (active)
+      csi->handle_connection();
+  }
+  void close() {
+    active = false;
+  }
+};
+
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
 
-RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
-                                                RDMAWorker *w)
-  : cct(cct), connected(0), error(0), infiniband(ib),
-    dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
-    is_server(false), con_handler(new C_handle_connection(this)),
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib,
+                                                 shared_ptr<RDMADispatcher>& rdma_dispatcher,
+                                                 RDMAWorker *w)
+  : cct(cct), connected(0), error(0), ib(ib),
+    dispatcher(rdma_dispatcher), worker(w),
+    is_server(false), read_handler(new C_handle_connection_read(this)),
+    established_handler(new C_handle_connection_established(this)),
     active(false), pending(false)
 {
   if (!cct->_conf->ms_async_rdma_cm) {
-    qp = infiniband->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
-    my_msg.qpn = qp->get_local_qp_number();
-    my_msg.psn = qp->get_initial_psn();
-    my_msg.lid = infiniband->get_lid();
-    my_msg.peer_qpn = 0;
-    my_msg.gid = infiniband->get_gid();
+    qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
+    local_qpn = qp->get_local_qp_number();
     notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
     dispatcher->register_qp(qp, this);
     dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
@@ -45,7 +71,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
   ldout(cct, 20) << __func__ << " destruct." << dendl;
   cleanup();
   worker->remove_pending_conn(this);
-  dispatcher->erase_qpn(my_msg.qpn);
+  dispatcher->schedule_qp_destroy(local_qpn);
 
   for (unsigned i=0; i < wc.size(); ++i) {
     dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
@@ -54,7 +80,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
     dispatcher->post_chunk_to_pool(buffers[i]);
   }
 
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (notify_fd >= 0)
     ::close(notify_fd);
   if (tcp_fd >= 0)
@@ -64,7 +90,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
 
 void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (wc.empty())
     wc = std::move(v);
   else
@@ -74,7 +100,7 @@ void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
 
 void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   if (wc.empty())
     return ;
   w.swap(wc);
@@ -82,89 +108,20 @@ void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
 
 int RDMAConnectedSocketImpl::activate()
 {
-  ibv_qp_attr qpa;
-  int r;
-
-  // now connect up the qps and switch to RTR
-  memset(&qpa, 0, sizeof(qpa));
-  qpa.qp_state = IBV_QPS_RTR;
-  qpa.path_mtu = IBV_MTU_1024;
-  qpa.dest_qp_num = peer_msg.qpn;
-  qpa.rq_psn = peer_msg.psn;
-  qpa.max_dest_rd_atomic = 1;
-  qpa.min_rnr_timer = 12;
-  //qpa.ah_attr.is_global = 0;
-  qpa.ah_attr.is_global = 1;
-  qpa.ah_attr.grh.hop_limit = 6;
-  qpa.ah_attr.grh.dgid = peer_msg.gid;
-
-  qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
-
-  qpa.ah_attr.dlid = peer_msg.lid;
-  qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
-  qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
-  qpa.ah_attr.src_path_bits = 0;
-  qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
-
-  ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
-
-  r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
-      IBV_QP_AV |
-      IBV_QP_PATH_MTU |
-      IBV_QP_DEST_QPN |
-      IBV_QP_RQ_PSN |
-      IBV_QP_MIN_RNR_TIMER |
-      IBV_QP_MAX_DEST_RD_ATOMIC);
-  if (r) {
-    lderr(cct) << __func__ << " failed to transition to RTR state: "
-               << cpp_strerror(errno) << dendl;
+  qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn;
+  if (qp->modify_qp_to_rtr() != 0)
     return -1;
-  }
 
-  ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
-
-  // now move to RTS
-  qpa.qp_state = IBV_QPS_RTS;
-
-  // How long to wait before retrying if packet lost or server dead.
-  // Supposedly the timeout is 4.096us*2^timeout.  However, the actual
-  // timeout appears to be 4.096us*2^(timeout+1), so the setting
-  // below creates a 135ms timeout.
-  qpa.timeout = 14;
-
-  // How many times to retry after timeouts before giving up.
-  qpa.retry_cnt = 7;
-
-  // How many times to retry after RNR (receiver not ready) condition
-  // before giving up. Occurs when the remote side has not yet posted
-  // a receive request.
-  qpa.rnr_retry = 7; // 7 is infinite retry.
-  qpa.sq_psn = my_msg.psn;
-  qpa.max_rd_atomic = 1;
-
-  r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
-      IBV_QP_TIMEOUT |
-      IBV_QP_RETRY_CNT |
-      IBV_QP_RNR_RETRY |
-      IBV_QP_SQ_PSN |
-      IBV_QP_MAX_QP_RD_ATOMIC);
-  if (r) {
-    lderr(cct) << __func__ << " failed to transition to RTS state: "
-               << cpp_strerror(errno) << dendl;
+  if (qp->modify_qp_to_rts() != 0)
     return -1;
-  }
-
-  // the queue pair should be ready to use once the client has finished
-  // setting up their end.
-  ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
-  ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;
 
   if (!is_server) {
     connected = 1; //indicate successfully
-    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
+    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl;
     submit(false);
   }
   active = true;
+  peer_qpn = qp->get_local_cm_meta().peer_qpn;
 
   return 0;
 }
@@ -173,7 +130,14 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
   ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
                  << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
   NetHandler net(cct);
-  tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
+
+  // we construct a socket to transport ib sync message
+  // but we shouldn't block in tcp connecting
+  if (opts.nonblock) {
+    tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr);
+  } else {
+    tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
+  }
 
   if (tcp_fd < 0) {
     return -errno;
@@ -188,18 +152,44 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
 
   ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
   net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
-  my_msg.peer_qpn = 0;
-  r = infiniband->send_msg(cct, tcp_fd, my_msg);
-  if (r < 0)
-    return r;
+  r = 0;
+  if (opts.nonblock) {
+    worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE , established_handler);
+  } else {
+    r = handle_connection_established(false);
+  }
+  return r;
+}
 
-  worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault) {
+  ldout(cct, 20) << __func__ << " start " << dendl;
+  // delete read event
+  worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
+  if (1 == connected) {
+    ldout(cct, 1) << __func__ << " warnning: logic failed " << dendl;
+    if (need_set_fault) {
+      fault();
+    }
+    return -1;
+  }
+  // send handshake msg to server
+  qp->get_local_cm_meta().peer_qpn = 0;
+  int r = qp->send_cm_meta(cct, tcp_fd);
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " send handshake msg failed." << r << dendl;
+    if (need_set_fault) {
+      fault();
+    }
+    return r;
+  }
+  worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
+  ldout(cct, 20) << __func__ << " finish " << dendl;
   return 0;
 }
 
 void RDMAConnectedSocketImpl::handle_connection() {
-  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
-  int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
+  ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
+  int r = qp->recv_cm_meta(cct, tcp_fd);
   if (r <= 0) {
     if (r != -EAGAIN) {
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
@@ -215,37 +205,34 @@ void RDMAConnectedSocketImpl::handle_connection() {
     return;
   }
 
-  if (!is_server) {// syn + ack from server
-    my_msg.peer_qpn = peer_msg.qpn;
-    ldout(cct, 20) << __func__ << " peer msg :  < " << peer_msg.qpn << ", " << peer_msg.psn
-                   <<  ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
+  if (!is_server) {// first time: cm meta sync + ack from server
     if (!connected) {
       r = activate();
       ceph_assert(!r);
     }
     notify();
-    r = infiniband->send_msg(cct, tcp_fd, my_msg);
+    r = qp->send_cm_meta(cct, tcp_fd);
     if (r < 0) {
       ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
       fault();
     }
   } else {
-    if (peer_msg.peer_qpn == 0) {// syn from client
+    if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client
       if (active) {
         ldout(cct, 10) << __func__ << " server is already active." << dendl;
         return ;
       }
       r = activate();
       ceph_assert(!r);
-      r = infiniband->send_msg(cct, tcp_fd, my_msg);
+      r = qp->send_cm_meta(cct, tcp_fd);
       if (r < 0) {
         ldout(cct, 1) << __func__ << " server ack failed." << dendl;
         dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
         fault();
         return ;
       }
-    } else { // ack from client
+    } else { // second time: cm meta ack from client
       connected = 1;
       ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
       //cleanup();
@@ -257,157 +244,97 @@ void RDMAConnectedSocketImpl::handle_connection() {
 
 ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
 {
-  uint64_t i = 0;
-  int r = ::read(notify_fd, &i, sizeof(i));
-  ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
-  
+  eventfd_t event_val = 0;
+  int r = eventfd_read(notify_fd, &event_val);
+  ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn
+                 << " r = " << r << dendl;
+
   if (!active) {
     ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
     return -EAGAIN;
   }
-  
+
   if (0 == connected) {
     ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
     return -EAGAIN;
   }
   ssize_t read = 0;
-  if (!buffers.empty())
-    read = read_buffers(buf,len);
+  read = read_buffers(buf,len);
+
+  if (is_server && connected == 0) {
+    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl;
+    connected = 1; //if so, we don't need the last handshake
+    cleanup();
+    submit(false);
+  }
+
+  if (!buffers.empty()) {
+    notify();
+  }
+
+  if (read == 0 && error)
+    return -error;
+  return read == 0 ? -EAGAIN : read;
+}
 
+void RDMAConnectedSocketImpl::buffer_prefetch(void)
+{
   std::vector<ibv_wc> cqe;
   get_wc(cqe);
-  if (cqe.empty()) {
-    if (!buffers.empty()) {
-      notify();
-    }
-    if (read > 0) {
-      return read;
-    }
-    if (error) {
-      return -error;
-    } else {
-      return -EAGAIN;
-    }
-  }
+  if(cqe.empty())
+    return;
 
-  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
-  for (size_t i = 0; i < cqe.size(); ++i) {
+  for(size_t i = 0; i < cqe.size(); ++i) {
     ibv_wc* response = &cqe[i];
     ceph_assert(response->status == IBV_WC_SUCCESS);
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-    ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
     chunk->prepare_read(response->byte_len);
-    worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
-    if (response->byte_len == 0) {
+
+    if (chunk->get_size() == 0) {
+      chunk->reset_read_chunk();
       dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
       if (connected) {
         error = ECONNRESET;
         ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
       }
       dispatcher->post_chunk_to_pool(chunk);
+      continue;
     } else {
-      if (read == (ssize_t)len) {
-        buffers.push_back(chunk);
-        ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
-      } else if (read + response->byte_len > (ssize_t)len) {
-        read += chunk->read(buf+read, (ssize_t)len-read);
-        buffers.push_back(chunk);
-        ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
-      } else {
-        read += chunk->read(buf+read, response->byte_len);
-        dispatcher->post_chunk_to_pool(chunk);
-        update_post_backlog();
-      }
+      buffers.push_back(chunk);
+      ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
     }
   }
-
   worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
-  if (is_server && connected == 0) {
-    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
-    connected = 1; //if so, we don't need the last handshake
-    cleanup();
-    submit(false);
-  }
-
-  if (!buffers.empty()) {
-    notify();
-  }
-
-  if (read == 0 && error)
-    return -error;
-  return read == 0 ? -EAGAIN : read;
 }
 
 ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
 {
-  size_t read = 0, tmp = 0;
-  auto c = buffers.begin();
-  for (; c != buffers.end() ; ++c) {
-    tmp = (*c)->read(buf+read, len-read);
-    read += tmp;
-    ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound()  << ". Chunk:" << *c  << dendl;
-    if ((*c)->over()) {
-      dispatcher->post_chunk_to_pool(*c);
+  size_t read_size = 0, tmp = 0;
+  buffer_prefetch();
+  auto pchunk = buffers.begin();
+  while (pchunk != buffers.end()) {
+    tmp = (*pchunk)->read(buf + read_size, len - read_size);
+    read_size += tmp;
+    ldout(cct, 25) << __func__ << " read chunk " << *pchunk << " bytes length" << tmp << " offset: "
+                   << (*pchunk)->get_offset() << " ,bound: " << (*pchunk)->get_bound() << dendl;
+
+    if ((*pchunk)->get_size() == 0) {
+      (*pchunk)->reset_read_chunk();
+      dispatcher->post_chunk_to_pool(*pchunk);
       update_post_backlog();
-      ldout(cct, 25) << __func__ << " one chunk over." << dendl;
-    }
-    if (read == len) {
-      break;
+      ldout(cct, 25) << __func__ << " read over one chunk " << dendl;
+      pchunk++;
     }
-  }
 
-  if (c != buffers.end() && (*c)->over())
-    ++c;
-  buffers.erase(buffers.begin(), c);
-  ldout(cct, 25) << __func__ << " got " << read  << " bytes, buffers size: " << buffers.size() << dendl;
-  return read;
-}
-
-ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
-{
-  if (error)
-    return -error;
-  static const int MAX_COMPLETIONS = 16;
-  ibv_wc wc[MAX_COMPLETIONS];
-  ssize_t size = 0;
-
-  ibv_wc*  response;
-  Chunk* chunk;
-  bool loaded = false;
-  auto iter = buffers.begin();
-  if (iter != buffers.end()) {
-    chunk = *iter;
-    // FIXME need to handle release
-    // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
-    buffers.erase(iter);
-    loaded = true;
-    size = chunk->bound;
-  }
-
-  std::vector<ibv_wc> cqe;
-  get_wc(cqe);
-  if (cqe.empty())
-    return size == 0 ? -EAGAIN : size;
-
-  ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
-
-  for (size_t i = 0; i < cqe.size(); ++i) {
-    response = &wc[i];
-    chunk = reinterpret_cast<Chunk*>(response->wr_id);
-    chunk->prepare_read(response->byte_len);
-    if (!loaded && i == 0) {
-      // FIXME need to handle release
-      // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
-      size = chunk->bound;
-      continue;
+    if (read_size == len) {
+      break;
     }
-    buffers.push_back(chunk);
-    iter++;
   }
 
-  if (size == 0)
-    return -EAGAIN;
-  return size;
+  buffers.erase(buffers.begin(), pchunk);
+  ldout(cct, 25) << __func__ << " got " << read_size  << " bytes, buffers size: " << buffers.size() << dendl;
+  worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
+  return read_size;
 }
 
 ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
@@ -421,119 +348,121 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
   if (!bytes)
     return 0;
   {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     pending_bl.claim_append(bl);
     if (!connected) {
-      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
+      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl;
       return bytes;
     }
   }
-  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
+  ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl;
   ssize_t r = submit(more);
   if (r < 0 && r != -EAGAIN)
     return r;
   return bytes;
 }
 
+size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector<Chunk*> &tx_buffers,
+    size_t req_copy_len, decltype(std::cbegin(pending_bl.buffers()))& start,
+    const decltype(std::cbegin(pending_bl.buffers()))& end)
+{
+  ceph_assert(start != end);
+  auto chunk_idx = tx_buffers.size();
+  if (0 == worker->get_reged_mem(this, tx_buffers, req_copy_len)) {
+    ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
+    return 0;
+  }
+
+  Chunk *current_chunk = tx_buffers[chunk_idx];
+  size_t write_len = 0;
+  while (start != end) {
+    const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
+
+    size_t slice_write_len = 0;
+    while (slice_write_len < start->length()) {
+      size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len);
+
+      slice_write_len += real_len;
+      write_len += real_len;
+      req_copy_len -= real_len;
+
+      if (current_chunk->full()) {
+        if (++chunk_idx == tx_buffers.size())
+          return write_len;
+        current_chunk = tx_buffers[chunk_idx];
+      }
+    }
+
+    ++start;
+  }
+  ceph_assert(req_copy_len == 0);
+  return write_len;
+}
+
 ssize_t RDMAConnectedSocketImpl::submit(bool more)
 {
   if (error)
     return -error;
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   size_t bytes = pending_bl.length();
   ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
-                 << pending_bl.buffers().size() << dendl;
+                 << pending_bl.get_num_buffers() << dendl;
   if (!bytes)
     return 0;
 
-  auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers,
-                                 unsigned bytes,
-                                 auto& start,
-                                 const auto& end) -> unsigned {
-    ceph_assert(start != end);
-    auto chunk_idx = tx_buffers.size();
-    int ret = worker->get_reged_mem(this, tx_buffers, bytes);
-    if (ret == 0) {
-      ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
-      worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
-      return 0;
-    }
-
-    unsigned total_copied = 0;
-    Chunk *current_chunk = tx_buffers[chunk_idx];
-    while (start != end) {
-      const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
-      unsigned copied = 0;
-      while (copied < start->length()) {
-        uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
-        copied += r;
-        total_copied += r;
-        bytes -= r;
-        if (current_chunk->full()){
-          if (++chunk_idx == tx_buffers.size())
-            return total_copied;
-          current_chunk = tx_buffers[chunk_idx];
-        }
-      }
-      ++start;
-    }
-    ceph_assert(bytes == 0);
-    return total_copied;
-  };
-
   std::vector<Chunk*> tx_buffers;
   auto it = std::cbegin(pending_bl.buffers());
-  auto copy_it = it;
-  unsigned total = 0;
-  unsigned need_reserve_bytes = 0;
+  auto copy_start = it;
+  size_t total_copied = 0, wait_copy_len = 0;
   while (it != pending_bl.buffers().end()) {
-    if (infiniband->is_tx_buffer(it->raw_c_str())) {
-      if (need_reserve_bytes) {
-        unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
-        total += copied;
-        if (copied < need_reserve_bytes)
+    if (ib->is_tx_buffer(it->raw_c_str())) {
+      if (wait_copy_len) {
+        size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
+        total_copied += copied;
+        if (copied < wait_copy_len)
           goto sending;
-        need_reserve_bytes = 0;
+        wait_copy_len = 0;
       }
-      ceph_assert(copy_it == it);
-      tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
-      total += it->length();
-      ++copy_it;
+      ceph_assert(copy_start == it);
+      tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str()));
+      total_copied += it->length();
+      ++copy_start;
     } else {
-      need_reserve_bytes += it->length();
+      wait_copy_len += it->length();
     }
     ++it;
   }
-  if (need_reserve_bytes)
-    total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
+  if (wait_copy_len)
+    total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
 
  sending:
-  if (total == 0)
+  if (total_copied == 0)
     return -EAGAIN;
-  ceph_assert(total <= pending_bl.length());
+  ceph_assert(total_copied <= pending_bl.length());
   bufferlist swapped;
-  if (total < pending_bl.length()) {
+  if (total_copied < pending_bl.length()) {
     worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
-    pending_bl.splice(total, pending_bl.length()-total, &swapped);
+    pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped);
     pending_bl.swap(swapped);
   } else {
     pending_bl.clear();
   }
 
   ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
-                 << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
+                 << pending_bl.get_num_buffers() << " tx chunks " << tx_buffers.size() << dendl;
 
   int r = post_work_request(tx_buffers);
   if (r < 0)
     return r;
 
-  ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;
+  ldout(cct, 20) << __func__ << " finished sending " << total_copied << " bytes." << dendl;
   return pending_bl.length() ? -EAGAIN : 0;
 }
 
 int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
 {
-  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
+  ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
   vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
   ibv_sge isge[tx_buffers.size()];
   uint32_t current_sge = 0;
@@ -558,10 +487,6 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
     iswr[current_swr].num_sge = 1;
     iswr[current_swr].opcode = IBV_WR_SEND;
     iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
-    /*if (isge[current_sge].length < infiniband->max_inline_data) {
-      iswr[current_swr].send_flags = IBV_SEND_INLINE;
-      ldout(cct, 20) << __func__ << " send_inline." << dendl;
-      }*/
 
     num++;
     worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
@@ -573,7 +498,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
     ++current_buffer;
   }
 
-  ibv_send_wr *bad_tx_work_request;
+  ibv_send_wr *bad_tx_work_request = nullptr;
   if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
     ldout(cct, 1) << __func__ << " failed to send data"
                   << " (most probably should be peer not ready): "
@@ -581,9 +506,8 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
     worker->perf_logger->inc(l_msgr_rdma_tx_failed);
     return -errno;
   }
-  qp->add_tx_wr(num);
   worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
-  ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+  ldout(cct, 20) << __func__ << " qp state is " << get_qp_state() << dendl;
   return 0;
 }
 
@@ -596,7 +520,7 @@ void RDMAConnectedSocketImpl::fin() {
   wr.num_sge = 0;
   wr.opcode = IBV_WR_SEND;
   wr.send_flags = IBV_SEND_SIGNALED;
-  ibv_send_wr* bad_tx_work_request;
+  ibv_send_wr* bad_tx_work_request = nullptr;
   if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
     ldout(cct, 1) << __func__ << " failed to send message="
                   << " ibv_post_send failed(most probably should be peer not ready): "
@@ -604,27 +528,29 @@ void RDMAConnectedSocketImpl::fin() {
     worker->perf_logger->inc(l_msgr_rdma_tx_failed);
     return ;
   }
-  qp->add_tx_wr(1);
 }
 
 void RDMAConnectedSocketImpl::cleanup() {
-  if (con_handler && tcp_fd >= 0) {
-    (static_cast<C_handle_connection*>(con_handler))->close();
+  if (read_handler && tcp_fd >= 0) {
+    (static_cast<C_handle_connection_read*>(read_handler))->close();
     worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
+      worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
     }, false);
-    delete con_handler;
-    con_handler = nullptr;
+    delete read_handler;
+    read_handler = nullptr;
+  }
+  if (established_handler) {
+    (static_cast<C_handle_connection_established*>(established_handler))->close();
+    delete established_handler;
+    established_handler = nullptr;
   }
 }
 
 void RDMAConnectedSocketImpl::notify()
 {
-  // note: notify_fd is an event fd (man eventfd)
-  // write argument must be a 64bit integer
-  uint64_t i = 1;
-
-  ceph_assert(sizeof(i) == write(notify_fd, &i, sizeof(i)));
+  eventfd_t event_val = 1;
+  int r = eventfd_write(notify_fd, event_val);
+  ceph_assert(r == 0);
 }
 
 void RDMAConnectedSocketImpl::shutdown()
@@ -646,10 +572,6 @@ void RDMAConnectedSocketImpl::close()
 void RDMAConnectedSocketImpl::fault()
 {
   ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
-  /*if (qp) {
-    qp->to_dead();
-    qp = NULL;
-    }*/
   error = ECONNRESET;
   connected = 1;
   notify();
@@ -660,17 +582,17 @@ void RDMAConnectedSocketImpl::set_accept_fd(int sd)
   tcp_fd = sd;
   is_server = true;
   worker->center.submit_to(worker->center.get_id(), [this]() {
-                          worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+                          worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
                           }, true);
 }
 
 void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
 {
-  post_backlog += num - infiniband->post_chunks_to_rq(num, qp->get_qp());
+  post_backlog += num - ib->post_chunks_to_rq(num, qp);
 }
 
 void RDMAConnectedSocketImpl::update_post_backlog()
 {
   if (post_backlog)
-    post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp->get_qp());
+    post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp);
 }