*/
#include "RDMAStack.h"
+/**
+ * Event callback registered on tcp_fd for the non-blocking connect path; each
+ * event is forwarded to RDMAConnectedSocketImpl::handle_connection_established().
+ *
+ * close() disarms the callback: once it has been called, do_request() no
+ * longer touches the socket object.
+ */
+class C_handle_connection_established : public EventCallback {
+  RDMAConnectedSocketImpl *csi;
+  bool active = true;
+ public:
+  // explicit, matching C_handle_connection_read, so that a socket pointer is
+  // never converted into a callback implicitly
+  explicit C_handle_connection_established(RDMAConnectedSocketImpl *w) : csi(w) {}
+  // @param fd descriptor that raised the event (not used)
+  void do_request(uint64_t fd) final {
+    if (active)
+      csi->handle_connection_established();
+  }
+  // Turn every later do_request() into a no-op.
+  void close() {
+    active = false;
+  }
+};
+
+/**
+ * Event callback for the handshake descriptor: while armed, every event is
+ * forwarded to RDMAConnectedSocketImpl::handle_connection().
+ */
+class C_handle_connection_read : public EventCallback {
+  RDMAConnectedSocketImpl *csi;
+  bool active = true;
+
+ public:
+  explicit C_handle_connection_read(RDMAConnectedSocketImpl *socket)
+    : csi(socket) {}
+
+  // @param fd descriptor that became ready (not used)
+  void do_request(uint64_t fd) final {
+    if (!active) {
+      return;  // close() was called; ignore the event
+    }
+    csi->handle_connection();
+  }
+
+  // Disarm the callback so that later events are dropped.
+  void close() { active = false; }
+};
+
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
-RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
- RDMAWorker *w)
- : cct(cct), connected(0), error(0), infiniband(ib),
- dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
- is_server(false), con_handler(new C_handle_connection(this)),
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<Infiniband> &ib,
+ shared_ptr<RDMADispatcher>& rdma_dispatcher,
+ RDMAWorker *w)
+ : cct(cct), connected(0), error(0), ib(ib),
+ dispatcher(rdma_dispatcher), worker(w),
+ is_server(false), read_handler(new C_handle_connection_read(this)),
+ established_handler(new C_handle_connection_established(this)),
active(false), pending(false)
{
if (!cct->_conf->ms_async_rdma_cm) {
- qp = infiniband->create_queue_pair(cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC, NULL);
- my_msg.qpn = qp->get_local_qp_number();
- my_msg.psn = qp->get_initial_psn();
- my_msg.lid = infiniband->get_lid();
- my_msg.peer_qpn = 0;
- my_msg.gid = infiniband->get_gid();
+ qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
+ local_qpn = qp->get_local_qp_number();
notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
ldout(cct, 20) << __func__ << " destruct." << dendl;
cleanup();
worker->remove_pending_conn(this);
- dispatcher->erase_qpn(my_msg.qpn);
+ dispatcher->schedule_qp_destroy(local_qpn);
for (unsigned i=0; i < wc.size(); ++i) {
dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
dispatcher->post_chunk_to_pool(buffers[i]);
}
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (notify_fd >= 0)
::close(notify_fd);
if (tcp_fd >= 0)
void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (wc.empty())
wc = std::move(v);
else
void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (wc.empty())
return ;
w.swap(wc);
// Bring the queue pair into service: record the peer's QP number in the local
// CM metadata, then call qp->modify_qp_to_rtr() followed by
// qp->modify_qp_to_rts(). Returns -1 as soon as either call reports failure,
// 0 otherwise. A client (!is_server) additionally marks itself connected and
// calls submit(false) before the socket is flagged active.
int RDMAConnectedSocketImpl::activate()
{
- ibv_qp_attr qpa;
- int r;
-
- // now connect up the qps and switch to RTR
- memset(&qpa, 0, sizeof(qpa));
- qpa.qp_state = IBV_QPS_RTR;
- qpa.path_mtu = IBV_MTU_1024;
- qpa.dest_qp_num = peer_msg.qpn;
- qpa.rq_psn = peer_msg.psn;
- qpa.max_dest_rd_atomic = 1;
- qpa.min_rnr_timer = 12;
- //qpa.ah_attr.is_global = 0;
- qpa.ah_attr.is_global = 1;
- qpa.ah_attr.grh.hop_limit = 6;
- qpa.ah_attr.grh.dgid = peer_msg.gid;
-
- qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
-
- qpa.ah_attr.dlid = peer_msg.lid;
- qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
- qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
- qpa.ah_attr.src_path_bits = 0;
- qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
-
- ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
-
- r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
- IBV_QP_AV |
- IBV_QP_PATH_MTU |
- IBV_QP_DEST_QPN |
- IBV_QP_RQ_PSN |
- IBV_QP_MIN_RNR_TIMER |
- IBV_QP_MAX_DEST_RD_ATOMIC);
- if (r) {
- lderr(cct) << __func__ << " failed to transition to RTR state: "
- << cpp_strerror(errno) << dendl;
+ qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn; // copy the peer's local QP number into our metadata
+ if (qp->modify_qp_to_rtr() != 0) // replaces the removed inline RTR transition
return -1;
- }
- ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
-
- // now move to RTS
- qpa.qp_state = IBV_QPS_RTS;
-
- // How long to wait before retrying if packet lost or server dead.
- // Supposedly the timeout is 4.096us*2^timeout. However, the actual
- // timeout appears to be 4.096us*2^(timeout+1), so the setting
- // below creates a 135ms timeout.
- qpa.timeout = 14;
-
- // How many times to retry after timeouts before giving up.
- qpa.retry_cnt = 7;
-
- // How many times to retry after RNR (receiver not ready) condition
- // before giving up. Occurs when the remote side has not yet posted
- // a receive request.
- qpa.rnr_retry = 7; // 7 is infinite retry.
- qpa.sq_psn = my_msg.psn;
- qpa.max_rd_atomic = 1;
-
- r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
- IBV_QP_TIMEOUT |
- IBV_QP_RETRY_CNT |
- IBV_QP_RNR_RETRY |
- IBV_QP_SQ_PSN |
- IBV_QP_MAX_QP_RD_ATOMIC);
- if (r) {
- lderr(cct) << __func__ << " failed to transition to RTS state: "
- << cpp_strerror(errno) << dendl;
+ if (qp->modify_qp_to_rts() != 0) // replaces the removed inline RTS transition
return -1;
- }
-
- // the queue pair should be ready to use once the client has finished
- // setting up their end.
- ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
- ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;
if (!is_server) {
connected = 1; //indicate successfully
- ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
+ ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl;
submit(false);
}
active = true;
+ peer_qpn = qp->get_local_cm_meta().peer_qpn; // mirror the value stored above into the member
return 0;
}
ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
<< opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
NetHandler net(cct);
- tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
+
+ // we construct a socket to transport ib sync message
+ // but we shouldn't block in tcp connecting
+ if (opts.nonblock) {
+ tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr);
+ } else {
+ tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
+ }
if (tcp_fd < 0) {
return -errno;
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
- my_msg.peer_qpn = 0;
- r = infiniband->send_msg(cct, tcp_fd, my_msg);
- if (r < 0)
- return r;
+ r = 0;
+ if (opts.nonblock) {
+ worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE , established_handler);
+ } else {
+ r = handle_connection_established(false);
+ }
+ return r;
+}
- worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
// Sends the first handshake message over tcp_fd and then waits for the reply.
// need_set_fault: when true, fault() is called before returning an error.
// Returns 0 on success, -1 if the socket is already marked connected, or the
// negative result of qp->send_cm_meta() if sending failed.
+int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault) {
+ ldout(cct, 20) << __func__ << " start " << dendl;
+ // stop watching tcp_fd for both readable and writable events
+ worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
+ if (1 == connected) {
+ ldout(cct, 1) << __func__ << " warnning: logic failed " << dendl;
+ if (need_set_fault) {
+ fault();
+ }
+ return -1;
+ }
+ // send handshake msg to server
+ qp->get_local_cm_meta().peer_qpn = 0; // handle_connection() treats peer_qpn == 0 as the first message from a client
+ int r = qp->send_cm_meta(cct, tcp_fd);
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " send handshake msg failed." << r << dendl;
+ if (need_set_fault) {
+ fault();
+ }
+ return r;
+ }
+ worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler); // from here on replies are handled by read_handler
+ ldout(cct, 20) << __func__ << " finish " << dendl;
return 0;
}
void RDMAConnectedSocketImpl::handle_connection() {
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
- int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
+ ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
+ int r = qp->recv_cm_meta(cct, tcp_fd);
if (r <= 0) {
if (r != -EAGAIN) {
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
return;
}
- if (!is_server) {// syn + ack from server
- my_msg.peer_qpn = peer_msg.qpn;
- ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn
- << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
+ if (!is_server) {// first time: cm meta sync + ack from server
if (!connected) {
r = activate();
ceph_assert(!r);
}
notify();
- r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ r = qp->send_cm_meta(cct, tcp_fd);
if (r < 0) {
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
}
} else {
- if (peer_msg.peer_qpn == 0) {// syn from client
+ if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client
if (active) {
ldout(cct, 10) << __func__ << " server is already active." << dendl;
return ;
}
r = activate();
ceph_assert(!r);
- r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ r = qp->send_cm_meta(cct, tcp_fd);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
return ;
}
- } else { // ack from client
+ } else { // second time: cm meta ack from client
connected = 1;
ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
//cleanup();
// Reads up to len bytes of received data into buf.
// The notify_fd eventfd counter is consumed first; -EAGAIN is returned while
// the socket is not active or not connected. Otherwise data comes from
// read_buffers(), and notify() is called again if chunks remain buffered.
// Returns the number of bytes read, -error when nothing was read and an error
// is recorded, or -EAGAIN when nothing was read and no error is recorded.
ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
{
- uint64_t i = 0;
- int r = ::read(notify_fd, &i, sizeof(i));
- ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
-
+ eventfd_t event_val = 0;
+ int r = eventfd_read(notify_fd, &event_val); // r is only logged, not checked
+ ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn
+ << " r = " << r << dendl;
+
if (!active) {
ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
return -EAGAIN;
}
-
+
if (0 == connected) {
ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
return -EAGAIN;
}
ssize_t read = 0;
- if (!buffers.empty())
- read = read_buffers(buf,len);
+ read = read_buffers(buf,len); // read_buffers() drains new completions itself, so it is called unconditionally
+
+ if (is_server && connected == 0) { // NOTE(review): looks unreachable, connected == 0 already returned -EAGAIN above; confirm
+ ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl;
+ connected = 1; //if so, we don't need the last handshake
+ cleanup();
+ submit(false);
+ }
+
+ if (!buffers.empty()) { // data left over that did not fit into buf
+ notify();
+ }
+
+ if (read == 0 && error)
+ return -error;
+ return read == 0 ? -EAGAIN : read;
+}
// Takes the completions queued for this socket (get_wc) and appends every
// non-empty chunk to `buffers`. A zero-length chunk is counted as rx_fin,
// returned to the pool and, if the socket is connected, sets error to
// ECONNRESET. Every completion is asserted to have status IBV_WC_SUCCESS.
+void RDMAConnectedSocketImpl::buffer_prefetch(void)
+{
std::vector<ibv_wc> cqe;
get_wc(cqe);
- if (cqe.empty()) {
- if (!buffers.empty()) {
- notify();
- }
- if (read > 0) {
- return read;
- }
- if (error) {
- return -error;
- } else {
- return -EAGAIN;
- }
- }
+ if(cqe.empty())
+ return;
- ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
- for (size_t i = 0; i < cqe.size(); ++i) {
+ for(size_t i = 0; i < cqe.size(); ++i) {
ibv_wc* response = &cqe[i];
ceph_assert(response->status == IBV_WC_SUCCESS);
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
- ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
chunk->prepare_read(response->byte_len);
- worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
- if (response->byte_len == 0) {
+
+ if (chunk->get_size() == 0) { // zero-length receive; the log below calls it a remote close message
+ chunk->reset_read_chunk();
dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
if (connected) {
error = ECONNRESET;
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
dispatcher->post_chunk_to_pool(chunk);
+ continue;
} else {
- if (read == (ssize_t)len) {
- buffers.push_back(chunk);
- ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
- } else if (read + response->byte_len > (ssize_t)len) {
- read += chunk->read(buf+read, (ssize_t)len-read);
- buffers.push_back(chunk);
- ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
- } else {
- read += chunk->read(buf+read, response->byte_len);
- dispatcher->post_chunk_to_pool(chunk);
- update_post_backlog();
- }
+ buffers.push_back(chunk); // copying into the caller's buffer now happens in read_buffers()
+ ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
}
}
-
worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
- if (is_server && connected == 0) {
- ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
- connected = 1; //if so, we don't need the last handshake
- cleanup();
- submit(false);
- }
-
- if (!buffers.empty()) {
- notify();
- }
-
- if (read == 0 && error)
- return -error;
- return read == 0 ? -EAGAIN : read;
}
// Copies up to len bytes from the buffered receive chunks into buf.
// buffer_prefetch() is called first; chunks that end up with get_size() == 0
// are reset, returned to the pool and erased from `buffers`.
// Returns the number of bytes copied (0 if nothing was available).
ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
{
- size_t read = 0, tmp = 0;
- auto c = buffers.begin();
- for (; c != buffers.end() ; ++c) {
- tmp = (*c)->read(buf+read, len-read);
- read += tmp;
- ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
- if ((*c)->over()) {
- dispatcher->post_chunk_to_pool(*c);
+ size_t read_size = 0, tmp = 0;
+ buffer_prefetch(); // move newly completed chunks into buffers before reading
+ auto pchunk = buffers.begin();
+ while (pchunk != buffers.end()) {
+ tmp = (*pchunk)->read(buf + read_size, len - read_size);
+ read_size += tmp;
+ ldout(cct, 25) << __func__ << " read chunk " << *pchunk << " bytes length" << tmp << " offset: "
+ << (*pchunk)->get_offset() << " ,bound: " << (*pchunk)->get_bound() << dendl;
+
+ if ((*pchunk)->get_size() == 0) { // nothing left in this chunk
+ (*pchunk)->reset_read_chunk();
+ dispatcher->post_chunk_to_pool(*pchunk);
update_post_backlog();
- ldout(cct, 25) << __func__ << " one chunk over." << dendl;
- }
- if (read == len) {
- break;
+ ldout(cct, 25) << __func__ << " read over one chunk " << dendl;
+ pchunk++; // only drained chunks are stepped over, so pchunk marks the erase boundary
}
- }
- if (c != buffers.end() && (*c)->over())
- ++c;
- buffers.erase(buffers.begin(), c);
- ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl;
- return read;
-}
-
-ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
-{
- if (error)
- return -error;
- static const int MAX_COMPLETIONS = 16;
- ibv_wc wc[MAX_COMPLETIONS];
- ssize_t size = 0;
-
- ibv_wc* response;
- Chunk* chunk;
- bool loaded = false;
- auto iter = buffers.begin();
- if (iter != buffers.end()) {
- chunk = *iter;
- // FIXME need to handle release
- // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
- buffers.erase(iter);
- loaded = true;
- size = chunk->bound;
- }
-
- std::vector<ibv_wc> cqe;
- get_wc(cqe);
- if (cqe.empty())
- return size == 0 ? -EAGAIN : size;
-
- ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
-
- for (size_t i = 0; i < cqe.size(); ++i) {
- response = &wc[i];
- chunk = reinterpret_cast<Chunk*>(response->wr_id);
- chunk->prepare_read(response->byte_len);
- if (!loaded && i == 0) {
- // FIXME need to handle release
- // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
- size = chunk->bound;
- continue;
+ if (read_size == len) { // caller's buffer is full
+ break;
}
- buffers.push_back(chunk);
- iter++;
}
- if (size == 0)
- return -EAGAIN;
- return size;
+ buffers.erase(buffers.begin(), pchunk); // forget the chunks that were returned to the pool
+ ldout(cct, 25) << __func__ << " got " << read_size << " bytes, buffers size: " << buffers.size() << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
+ return read_size;
}
ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
if (!bytes)
return 0;
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
pending_bl.claim_append(bl);
if (!connected) {
- ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
+ ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl;
return bytes;
}
}
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
+ ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl;
ssize_t r = submit(more);
if (r < 0 && r != -EAGAIN)
return r;
return bytes;
}
+/**
+ * Copy the bufferlist segments in [start, end) into registered tx chunks.
+ *
+ * Registered memory for req_copy_len bytes is requested from the worker, which
+ * appends the chunks it grants to tx_buffers; the segments are then written
+ * into those chunks in order.
+ *
+ * @param tx_buffers   chunk list to extend; the appended chunks receive the data
+ * @param req_copy_len total number of bytes held by the segments in [start, end)
+ * @param start        first segment to copy; advanced past each fully copied segment
+ * @param end          one past the last segment to copy
+ * @return number of bytes copied; 0 if the worker granted no memory, and less
+ *         than req_copy_len if the granted chunks filled up early
+ */
+size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector<Chunk*> &tx_buffers,
+    size_t req_copy_len, decltype(std::cbegin(pending_bl.buffers()))& start,
+    const decltype(std::cbegin(pending_bl.buffers()))& end)
+{
+  ceph_assert(start != end);
+  // position of the first chunk the worker is about to append
+  auto next_chunk = tx_buffers.size();
+  const auto granted = worker->get_reged_mem(this, tx_buffers, req_copy_len);
+  if (granted == 0) {
+    ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
+    return 0;
+  }
+
+  Chunk *chunk = tx_buffers[next_chunk];
+  size_t copied_total = 0;
+  for (; start != end; ++start) {
+    char *const src = const_cast<char*>(start->c_str());
+    const size_t seg_len = start->length();
+
+    for (size_t seg_done = 0; seg_done < seg_len; ) {
+      const size_t n = chunk->write(src + seg_done, seg_len - seg_done);
+      seg_done += n;
+      copied_total += n;
+      req_copy_len -= n;
+
+      if (!chunk->full())
+        continue;
+      // current chunk is full: move on, or stop if none are left
+      if (++next_chunk == tx_buffers.size())
+        return copied_total;
+      chunk = tx_buffers[next_chunk];
+    }
+  }
+  ceph_assert(req_copy_len == 0);
+  return copied_total;
+}
+
// Posts as much of pending_bl as possible to the queue pair.
// Segments already located in tx buffers are sent in place; all other
// segments are copied into registered chunks via tx_copy_chunk().
// Returns -error if an error is recorded, 0 if nothing was pending or
// everything was posted, -EAGAIN if nothing could be posted or data is still
// pending afterwards, or the negative result of post_work_request().
ssize_t RDMAConnectedSocketImpl::submit(bool more)
{
if (error)
return -error;
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
size_t bytes = pending_bl.length();
ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
- << pending_bl.buffers().size() << dendl;
+ << pending_bl.get_num_buffers() << dendl;
if (!bytes)
return 0;
- auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers,
- unsigned bytes,
- auto& start,
- const auto& end) -> unsigned {
- ceph_assert(start != end);
- auto chunk_idx = tx_buffers.size();
- int ret = worker->get_reged_mem(this, tx_buffers, bytes);
- if (ret == 0) {
- ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
- worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
- return 0;
- }
-
- unsigned total_copied = 0;
- Chunk *current_chunk = tx_buffers[chunk_idx];
- while (start != end) {
- const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
- unsigned copied = 0;
- while (copied < start->length()) {
- uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
- copied += r;
- total_copied += r;
- bytes -= r;
- if (current_chunk->full()){
- if (++chunk_idx == tx_buffers.size())
- return total_copied;
- current_chunk = tx_buffers[chunk_idx];
- }
- }
- ++start;
- }
- ceph_assert(bytes == 0);
- return total_copied;
- };
-
std::vector<Chunk*> tx_buffers;
auto it = std::cbegin(pending_bl.buffers());
- auto copy_it = it;
- unsigned total = 0;
- unsigned need_reserve_bytes = 0;
+ auto copy_start = it; // first segment of the current run that still has to be copied
+ size_t total_copied = 0, wait_copy_len = 0; // wait_copy_len: bytes in [copy_start, it) awaiting copy
while (it != pending_bl.buffers().end()) {
- if (infiniband->is_tx_buffer(it->raw_c_str())) {
- if (need_reserve_bytes) {
- unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
- total += copied;
- if (copied < need_reserve_bytes)
+ if (ib->is_tx_buffer(it->raw_c_str())) {
+ if (wait_copy_len) { // flush the run of segments collected before this tx buffer
+ size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
+ total_copied += copied;
+ if (copied < wait_copy_len) // ran out of chunks: send what we have
goto sending;
- need_reserve_bytes = 0;
+ wait_copy_len = 0;
}
- ceph_assert(copy_it == it);
- tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
- total += it->length();
- ++copy_it;
+ ceph_assert(copy_start == it);
+ tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str())); // no copy: the chunk backing this segment is queued directly
+ total_copied += it->length();
+ ++copy_start;
} else {
- need_reserve_bytes += it->length();
+ wait_copy_len += it->length();
}
++it;
}
- if (need_reserve_bytes)
- total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
+ if (wait_copy_len)
+ total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
sending:
- if (total == 0)
+ if (total_copied == 0)
return -EAGAIN;
- ceph_assert(total <= pending_bl.length());
+ ceph_assert(total_copied <= pending_bl.length());
bufferlist swapped;
- if (total < pending_bl.length()) {
+ if (total_copied < pending_bl.length()) {
worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
- pending_bl.splice(total, pending_bl.length()-total, &swapped);
+ pending_bl.splice(total_copied, pending_bl.length() - total_copied, &swapped); // keep only the unsent tail in pending_bl
pending_bl.swap(swapped);
} else {
pending_bl.clear();
}
ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
- << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
+ << pending_bl.get_num_buffers() << " tx chunks " << tx_buffers.size() << dendl;
int r = post_work_request(tx_buffers);
if (r < 0)
return r;
- ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;
+ ldout(cct, 20) << __func__ << " finished sending " << total_copied << " bytes." << dendl;
return pending_bl.length() ? -EAGAIN : 0;
}
int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
{
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
+ ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
ibv_sge isge[tx_buffers.size()];
uint32_t current_sge = 0;
iswr[current_swr].num_sge = 1;
iswr[current_swr].opcode = IBV_WR_SEND;
iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
- /*if (isge[current_sge].length < infiniband->max_inline_data) {
- iswr[current_swr].send_flags = IBV_SEND_INLINE;
- ldout(cct, 20) << __func__ << " send_inline." << dendl;
- }*/
num++;
worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
++current_buffer;
}
- ibv_send_wr *bad_tx_work_request;
+ ibv_send_wr *bad_tx_work_request = nullptr;
if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
ldout(cct, 1) << __func__ << " failed to send data"
<< " (most probably should be peer not ready): "
worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return -errno;
}
- qp->add_tx_wr(num);
worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
- ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+ ldout(cct, 20) << __func__ << " qp state is " << get_qp_state() << dendl;
return 0;
}
wr.num_sge = 0;
wr.opcode = IBV_WR_SEND;
wr.send_flags = IBV_SEND_SIGNALED;
- ibv_send_wr* bad_tx_work_request;
+ ibv_send_wr* bad_tx_work_request = nullptr;
if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
ldout(cct, 1) << __func__ << " failed to send message="
<< " ibv_post_send failed(most probably should be peer not ready): "
worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return ;
}
- qp->add_tx_wr(1);
}
// Disarms and frees both handshake callbacks. If read_handler exists and
// tcp_fd is valid, the tcp_fd file event is removed through the worker's
// event center before read_handler is deleted.
void RDMAConnectedSocketImpl::cleanup() {
- if (con_handler && tcp_fd >= 0) {
- (static_cast<C_handle_connection*>(con_handler))->close();
+ if (read_handler && tcp_fd >= 0) {
+ (static_cast<C_handle_connection_read*>(read_handler))->close();
worker->center.submit_to(worker->center.get_id(), [this]() {
- worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
+ worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE); // connect() may have registered both event types
}, false);
- delete con_handler;
- con_handler = nullptr;
+ delete read_handler;
+ read_handler = nullptr;
+ }
+ if (established_handler) {
+ (static_cast<C_handle_connection_established*>(established_handler))->close();
+ delete established_handler;
+ established_handler = nullptr;
}
}
// Adds 1 to the notify_fd eventfd counter and asserts that the write succeeded.
void RDMAConnectedSocketImpl::notify()
{
- // note: notify_fd is an event fd (man eventfd)
- // write argument must be a 64bit integer
- uint64_t i = 1;
-
- ceph_assert(sizeof(i) == write(notify_fd, &i, sizeof(i)));
+ eventfd_t event_val = 1;
+ int r = eventfd_write(notify_fd, event_val); // kept outside ceph_assert, unlike the removed write() call
+ ceph_assert(r == 0);
}
void RDMAConnectedSocketImpl::shutdown()
void RDMAConnectedSocketImpl::fault()
{
ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
- /*if (qp) {
- qp->to_dead();
- qp = NULL;
- }*/
error = ECONNRESET;
connected = 1;
notify();
tcp_fd = sd;
is_server = true;
worker->center.submit_to(worker->center.get_id(), [this]() {
- worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+ worker->center.create_file_event(tcp_fd, EVENT_READABLE, read_handler);
}, true);
}
// Asks ib to post num chunks for this queue pair and adds the difference
// between num and the value ib returns to post_backlog.
void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
{
- post_backlog += num - infiniband->post_chunks_to_rq(num, qp->get_qp());
+ post_backlog += num - ib->post_chunks_to_rq(num, qp); // presumably the return value is the number actually posted; confirm
}
// Retries posting the backlog of post_backlog chunks through the dispatcher
// and updates post_backlog from the dispatcher's return value.
void RDMAConnectedSocketImpl::update_post_backlog()
{
if (post_backlog)
- post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp->get_qp());
+ post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp); // NOTE(review): this sets post_backlog to the returned value; if that is the number posted, the remaining backlog would be post_backlog minus it; confirm
}