1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
16 #include "RDMAStack.h"
18 #define dout_subsys ceph_subsys_ms
20 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
// Constructor. Stores the CephContext, the Infiniband handle, the dispatcher
// and the worker, builds the connection-event handler (C_handle_connection),
// and starts with connected/error at 0 and is_server/active/pending false.
// NOTE(review): the embedded source numbering skips line 23, so the
// declaration of the parameter `w` consumed by worker(w) is not visible in
// this extract; the braces of the body (lines 28, 40, 41) are missing too.
22 RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext
*cct
, Infiniband
* ib
, RDMADispatcher
* s
,
24 : cct(cct
), connected(0), error(0), infiniband(ib
),
25 dispatcher(s
), worker(w
), lock("RDMAConnectedSocketImpl::lock"),
26 is_server(false), con_handler(new C_handle_connection(this)),
27 active(false), pending(false)
// Only when the ms_async_rdma_cm option is off does this constructor create
// its own RC queue pair on the dispatcher's TX/RX completion queues.
29 if (!cct
->_conf
->ms_async_rdma_cm
) {
30 qp
= infiniband
->create_queue_pair(cct
, s
->get_tx_cq(), s
->get_rx_cq(), IBV_QPT_RC
, NULL
);
// Fill the local handshake message with this side's QP number, initial PSN,
// LID and GID.
// NOTE(review): source line 34 is missing between the lid and gid assignments.
31 my_msg
.qpn
= qp
->get_local_qp_number();
32 my_msg
.psn
= qp
->get_initial_psn();
33 my_msg
.lid
= infiniband
->get_lid();
35 my_msg
.gid
= infiniband
->get_gid();
// Non-blocking, close-on-exec eventfd; notify() writes to it and read()
// drains it.
36 notify_fd
= eventfd(0, EFD_CLOEXEC
|EFD_NONBLOCK
);
// Register the QP with the dispatcher and account for it in the
// created / active queue-pair counters.
37 dispatcher
->register_qp(qp
, this);
38 dispatcher
->perf_logger
->inc(l_msgr_rdma_created_queue_pair
);
39 dispatcher
->perf_logger
->inc(l_msgr_rdma_active_queue_pair
);
// Destructor. Logs, removes this socket from the worker's pending-connection
// list, erases its QP number from the dispatcher, and hands every chunk still
// referenced by a queued work completion or held in `buffers` back to the
// dispatcher's pool before taking `lock`.
// NOTE(review): source lines 44, 46, 49, 52, 55-56 and everything after
// line 57 are missing from this extract, so whatever runs under the lock
// is not visible here.
43 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
45 ldout(cct
, 20) << __func__
<< " destruct." << dendl
;
47 worker
->remove_pending_conn(this);
48 dispatcher
->erase_qpn(my_msg
.qpn
);
// Each queued completion's wr_id is cast back to the Chunk pointer it carries.
50 for (unsigned i
=0; i
< wc
.size(); ++i
) {
51 dispatcher
->post_chunk_to_pool(reinterpret_cast<Chunk
*>(wc
[i
].wr_id
));
// Chunks that were received but not yet fully consumed by read().
53 for (unsigned i
=0; i
< buffers
.size(); ++i
) {
54 dispatcher
->post_chunk_to_pool(buffers
[i
]);
57 Mutex::Locker
l(lock
);
// Accepts a batch of work completions `v` for this socket: under `lock`, the
// visible code appends all of them to the end of `wc`.
// NOTE(review): source lines 66, 68-70 and 72-74 are missing from this
// extract; any alternative path or follow-up action is not visible here.
65 void RDMAConnectedSocketImpl::pass_wc(std::vector
<ibv_wc
> &&v
)
67 Mutex::Locker
l(lock
);
71 wc
.insert(wc
.end(), v
.begin(), v
.end());
// Takes `lock`; the output parameter `w` is a vector of work completions.
// NOTE(review): source lines 76 and 78-82 are missing from this extract.
// Presumably the queued completions in `wc` are handed over to `w` here —
// TODO confirm against the full file.
75 void RDMAConnectedSocketImpl::get_wc(std::vector
<ibv_wc
> &w
)
77 Mutex::Locker
l(lock
);
// Brings the queue pair up using the peer's handshake message: first
// ibv_modify_qp() to RTR (destination QP number, receive PSN and address
// vector taken from peer_msg plus local config), then ibv_modify_qp() to RTS
// (send PSN from my_msg). Each failure is logged with cpp_strerror(errno);
// after both transitions `connected` is set to 1.
// NOTE(review): the declarations of `qpa` and `r`, the error guards, the
// remaining attribute-mask bits (source lines 112-115, 146-149), the
// timeout / retry-count assignments (lines 133-137) and all return
// statements are missing from this extract.
83 int RDMAConnectedSocketImpl::activate()
88 // now connect up the qps and switch to RTR
89 memset(&qpa
, 0, sizeof(qpa
));
90 qpa
.qp_state
= IBV_QPS_RTR
;
91 qpa
.path_mtu
= IBV_MTU_1024
;
92 qpa
.dest_qp_num
= peer_msg
.qpn
;
93 qpa
.rq_psn
= peer_msg
.psn
;
94 qpa
.max_dest_rd_atomic
= 1;
95 qpa
.min_rnr_timer
= 12;
// The address vector always carries a GRH (is_global = 1) addressed to the
// peer's GID, using the local device's GID index.
96 //qpa.ah_attr.is_global = 0;
97 qpa
.ah_attr
.is_global
= 1;
98 qpa
.ah_attr
.grh
.hop_limit
= 6;
99 qpa
.ah_attr
.grh
.dgid
= peer_msg
.gid
;
101 qpa
.ah_attr
.grh
.sgid_index
= infiniband
->get_device()->get_gid_idx();
// Service level and traffic class come from the ms_async_rdma_sl and
// ms_async_rdma_dscp configuration values.
103 qpa
.ah_attr
.dlid
= peer_msg
.lid
;
104 qpa
.ah_attr
.sl
= cct
->_conf
->ms_async_rdma_sl
;
105 qpa
.ah_attr
.grh
.traffic_class
= cct
->_conf
->ms_async_rdma_dscp
;
106 qpa
.ah_attr
.src_path_bits
= 0;
107 qpa
.ah_attr
.port_num
= (uint8_t)(infiniband
->get_ib_physical_port());
109 ldout(cct
, 20) << __func__
<< " Choosing gid_index " << (int)qpa
.ah_attr
.grh
.sgid_index
<< ", sl " << (int)qpa
.ah_attr
.sl
<< dendl
;
// First transition: -> RTR.
111 r
= ibv_modify_qp(qp
->get_qp(), &qpa
, IBV_QP_STATE
|
116 IBV_QP_MIN_RNR_TIMER
|
117 IBV_QP_MAX_DEST_RD_ATOMIC
);
119 lderr(cct
) << __func__
<< " failed to transition to RTR state: "
120 << cpp_strerror(errno
) << dendl
;
124 ldout(cct
, 20) << __func__
<< " transition to RTR state successfully." << dendl
;
// Second transition: -> RTS. `qpa` is reused, so only the fields that change
// are assigned below.
127 qpa
.qp_state
= IBV_QPS_RTS
;
// NOTE(review): the assignments the next two comments describe (source
// lines 133 and 136) are missing from this extract.
129 // How long to wait before retrying if packet lost or server dead.
130 // Supposedly the timeout is 4.096us*2^timeout. However, the actual
131 // timeout appears to be 4.096us*2^(timeout+1), so the setting
132 // below creates a 135ms timeout.
135 // How many times to retry after timeouts before giving up.
138 // How many times to retry after RNR (receiver not ready) condition
139 // before giving up. Occurs when the remote side has not yet posted
140 // a receive request.
141 qpa
.rnr_retry
= 7; // 7 is infinite retry.
142 qpa
.sq_psn
= my_msg
.psn
;
143 qpa
.max_rd_atomic
= 1;
145 r
= ibv_modify_qp(qp
->get_qp(), &qpa
, IBV_QP_STATE
|
150 IBV_QP_MAX_QP_RD_ATOMIC
);
152 lderr(cct
) << __func__
<< " failed to transition to RTS state: "
153 << cpp_strerror(errno
) << dendl
;
157 // the queue pair should be ready to use once the client has finished
158 // setting up their end.
159 ldout(cct
, 20) << __func__
<< " transition to RTS state successfully." << dendl
;
160 ldout(cct
, 20) << __func__
<< " QueuePair: " << qp
<< " with qp:" << qp
->get_qp() << dendl
;
163 connected
= 1; //indicate successfully
164 ldout(cct
, 20) << __func__
<< " handle fake send, wake it up. QP: " << my_msg
.qpn
<< dendl
;
// Client-side connection start. Opens a TCP connection to `peer_addr` (bound
// to opts.connect_bind_addr) into tcp_fd, applies the nodelay / rcbuf_size
// socket options and the priority, sends the local handshake message
// (my_msg) over that TCP socket, and registers con_handler for readable
// events on tcp_fd with the worker's event center.
// NOTE(review): the error checks after net.connect(), set_socket_options()
// and send_msg(), and the return statements, are missing from this extract.
172 int RDMAConnectedSocketImpl::try_connect(const entity_addr_t
& peer_addr
, const SocketOptions
&opts
) {
173 ldout(cct
, 20) << __func__
<< " nonblock:" << opts
.nonblock
<< ", nodelay:"
174 << opts
.nodelay
<< ", rbuf_size: " << opts
.rcbuf_size
<< dendl
;
176 tcp_fd
= net
.connect(peer_addr
, opts
.connect_bind_addr
);
182 int r
= net
.set_socket_options(tcp_fd
, opts
.nodelay
, opts
.rcbuf_size
);
189 ldout(cct
, 20) << __func__
<< " tcp_fd: " << tcp_fd
<< dendl
;
190 net
.set_priority(tcp_fd
, opts
.priority
, peer_addr
.get_family());
// Send the local QP parameters to the peer over TCP.
192 r
= infiniband
->send_msg(cct
, tcp_fd
, my_msg
);
// The peer's reply arrives as a readable event on tcp_fd, handled via
// con_handler.
196 worker
->center
.create_file_event(tcp_fd
, EVENT_READABLE
, con_handler
);
// Handshake step over the TCP socket: receives the peer's message into
// peer_msg and then acts according to role.
//  - client (!is_server): records the peer's QP number in my_msg.peer_qpn
//    and sends my_msg back as the acknowledgement;
//  - server, peer_msg.peer_qpn == 0 (initial message from the client):
//    replies with my_msg;
//  - server, otherwise (acknowledgement from the client): logs that the
//    handshake is done.
// Handshake failures are logged and counted in l_msgr_rdma_handshake_errors.
// NOTE(review): the conditions guarding the error branches and several
// statements inside each branch (e.g. source lines 203-204, 207-211,
// 222-226, 235, 237-240, 249, 251-255) are missing from this extract.
200 void RDMAConnectedSocketImpl::handle_connection() {
201 ldout(cct
, 20) << __func__
<< " QP: " << my_msg
.qpn
<< " tcp_fd: " << tcp_fd
<< " notify_fd: " << notify_fd
<< dendl
;
202 int r
= infiniband
->recv_msg(cct
, tcp_fd
, peer_msg
);
205 dispatcher
->perf_logger
->inc(l_msgr_rdma_handshake_errors
);
206 ldout(cct
, 1) << __func__
<< " recv handshake msg failed." << dendl
;
// A handshake message arriving on an already connected socket is unexpected.
212 if (1 == connected
) {
213 ldout(cct
, 1) << __func__
<< " warnning: logic failed: read len: " << r
<< dendl
;
218 if (!is_server
) {// syn + ack from server
219 my_msg
.peer_qpn
= peer_msg
.qpn
;
220 ldout(cct
, 20) << __func__
<< " peer msg : < " << peer_msg
.qpn
<< ", " << peer_msg
.psn
221 << ", " << peer_msg
.lid
<< ", " << peer_msg
.peer_qpn
<< "> " << dendl
;
227 r
= infiniband
->send_msg(cct
, tcp_fd
, my_msg
);
229 ldout(cct
, 1) << __func__
<< " send client ack failed." << dendl
;
230 dispatcher
->perf_logger
->inc(l_msgr_rdma_handshake_errors
);
234 if (peer_msg
.peer_qpn
== 0) {// syn from client
236 ldout(cct
, 10) << __func__
<< " server is already active." << dendl
;
241 r
= infiniband
->send_msg(cct
, tcp_fd
, my_msg
);
243 ldout(cct
, 1) << __func__
<< " server ack failed." << dendl
;
244 dispatcher
->perf_logger
->inc(l_msgr_rdma_handshake_errors
);
248 } else { // ack from client
250 ldout(cct
, 10) << __func__
<< " handshake of rdma is done. server connected: " << connected
<< dendl
;
// Reads up to `len` bytes into `buf`.
// Visible flow: drain the notify_fd eventfd; log when the socket is not yet
// active or not connected; serve data from already-buffered chunks through
// read_buffers(); then walk the completions in `cqe`, copying each chunk's
// payload into `buf` and parking in `buffers` whatever does not fit.
// A zero-length completion is logged as the remote close message and counted
// in l_msgr_rdma_rx_fin. On the server side, receiving data while
// `connected` is still 0 marks the socket connected.
// Returns the number of bytes read, or -EAGAIN when nothing was read.
// NOTE(review): the declarations of `i` and `read`, the code that fills
// `cqe`, the guards of the "not active" branch and several returns (e.g.
// source lines 259-260, 263-264, 266-268, 271-273, 278-292, 333-335, 337)
// are missing from this extract.
258 ssize_t
RDMAConnectedSocketImpl::read(char* buf
, size_t len
)
261 int r
= ::read(notify_fd
, &i
, sizeof(i
));
262 ldout(cct
, 20) << __func__
<< " notify_fd : " << i
<< " in " << my_msg
.qpn
<< " r = " << r
<< dendl
;
265 ldout(cct
, 1) << __func__
<< " when ib not active. len: " << len
<< dendl
;
269 if (0 == connected
) {
270 ldout(cct
, 1) << __func__
<< " when ib not connected. len: " << len
<<dendl
;
// Data left over from earlier completions is consumed first.
274 if (!buffers
.empty())
275 read
= read_buffers(buf
,len
);
277 std::vector
<ibv_wc
> cqe
;
280 if (!buffers
.empty()) {
293 ldout(cct
, 20) << __func__
<< " poll queue got " << cqe
.size() << " responses. QP: " << my_msg
.qpn
<< dendl
;
294 for (size_t i
= 0; i
< cqe
.size(); ++i
) {
295 ibv_wc
* response
= &cqe
[i
];
296 ceph_assert(response
->status
== IBV_WC_SUCCESS
);
// wr_id carries the pointer of the Chunk that received the data.
297 Chunk
* chunk
= reinterpret_cast<Chunk
*>(response
->wr_id
);
298 ldout(cct
, 25) << __func__
<< " chunk length: " << response
->byte_len
<< " bytes." << chunk
<< dendl
;
299 chunk
->prepare_read(response
->byte_len
);
300 worker
->perf_logger
->inc(l_msgr_rdma_rx_bytes
, response
->byte_len
);
// Zero-length message: the peer's close notification; the chunk goes
// straight back to the pool.
301 if (response
->byte_len
== 0) {
302 dispatcher
->perf_logger
->inc(l_msgr_rdma_rx_fin
);
305 ldout(cct
, 20) << __func__
<< " got remote close msg..." << dendl
;
307 dispatcher
->post_chunk_to_pool(chunk
);
// Three cases for a data chunk: caller's buffer already full -> park the
// chunk; chunk larger than the remaining space -> copy what fits and park
// the chunk; otherwise -> copy everything and recycle the chunk.
309 if (read
== (ssize_t
)len
) {
310 buffers
.push_back(chunk
);
311 ldout(cct
, 25) << __func__
<< " buffers add a chunk: " << response
->byte_len
<< dendl
;
312 } else if (read
+ response
->byte_len
> (ssize_t
)len
) {
313 read
+= chunk
->read(buf
+read
, (ssize_t
)len
-read
);
314 buffers
.push_back(chunk
);
315 ldout(cct
, 25) << __func__
<< " buffers add a chunk: " << chunk
->get_offset() << ":" << chunk
->get_bound() << dendl
;
317 read
+= chunk
->read(buf
+read
, response
->byte_len
);
318 dispatcher
->post_chunk_to_pool(chunk
);
319 update_post_backlog();
324 worker
->perf_logger
->inc(l_msgr_rdma_rx_chunks
, cqe
.size());
// Data arrived on the server before the client's final handshake message was
// processed: the connection is evidently usable, so mark it connected.
325 if (is_server
&& connected
== 0) {
326 ldout(cct
, 20) << __func__
<< " we do not need last handshake, QP: " << my_msg
.qpn
<< " peer QP: " << peer_msg
.qpn
<< dendl
;
327 connected
= 1; //if so, we don't need the last handshake
332 if (!buffers
.empty()) {
// NOTE(review): the statement executed when nothing was read and `error` is
// set (source line 337) is missing from this extract.
336 if (read
== 0 && error
)
338 return read
== 0 ? -EAGAIN
: read
;
// Copies data from the chunks parked in `buffers` into `buf`, front to back,
// up to `len` bytes. Chunks that are handed back to the dispatcher's pool
// trigger update_post_backlog(); afterwards all chunks before the stopping
// position are erased from `buffers`.
// NOTE(review): the conditions around the per-chunk recycling, the loop exit
// and the final return (e.g. source lines 347, 349, 353-358, 360, 363) are
// missing from this extract.
341 ssize_t
RDMAConnectedSocketImpl::read_buffers(char* buf
, size_t len
)
343 size_t read
= 0, tmp
= 0;
344 auto c
= buffers
.begin();
345 for (; c
!= buffers
.end() ; ++c
) {
// Each chunk is asked for at most the space still free in the caller's buffer.
346 tmp
= (*c
)->read(buf
+read
, len
-read
);
348 ldout(cct
, 25) << __func__
<< " this iter read: " << tmp
<< " bytes." << " offset: " << (*c
)->get_offset() << " ,bound: " << (*c
)->get_bound() << ". Chunk:" << *c
<< dendl
;
350 dispatcher
->post_chunk_to_pool(*c
);
351 update_post_backlog();
352 ldout(cct
, 25) << __func__
<< " one chunk over." << dendl
;
359 if (c
!= buffers
.end() && (*c
)->over())
// Drop every chunk in front of the position where the loop stopped.
361 buffers
.erase(buffers
.begin(), c
);
362 ldout(cct
, 25) << __func__
<< " got " << read
<< " bytes, buffers size: " << buffers
.size() << dendl
;
// Variant of read() that is meant to hand received data to the caller
// through the bufferptr `data`.
// Visible flow: look at the first chunk in `buffers`, if any; otherwise work
// through the completions in `cqe`, preparing each chunk for reading and
// pushing chunks onto `buffers`. Returns -EAGAIN when `size` is 0.
// The two FIXME comments show that releasing the chunk back to the receive
// queue is not handled yet.
// NOTE(review): most of the body (declarations of `size`, `loaded`, `chunk`
// and `response`, the code filling `cqe`, and the construction of `data`) is
// missing from this extract — confirm details against the full file.
366 ssize_t
RDMAConnectedSocketImpl::zero_copy_read(bufferptr
&data
)
370 static const int MAX_COMPLETIONS
= 16;
371 ibv_wc wc
[MAX_COMPLETIONS
];
377 auto iter
= buffers
.begin();
378 if (iter
!= buffers
.end()) {
380 // FIXME need to handle release
381 // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
387 std::vector
<ibv_wc
> cqe
;
390 return size
== 0 ? -EAGAIN
: size
;
392 ldout(cct
, 20) << __func__
<< " pool completion queue got " << cqe
.size() << " responses."<< dendl
;
394 for (size_t i
= 0; i
< cqe
.size(); ++i
) {
396 chunk
= reinterpret_cast<Chunk
*>(response
->wr_id
);
397 chunk
->prepare_read(response
->byte_len
);
// The first completion is treated specially when nothing has been loaded yet.
398 if (!loaded
&& i
== 0) {
399 // FIXME need to handle release
400 // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
404 buffers
.push_back(chunk
);
// Queues the contents of `bl` for transmission: under `lock`, `bl` is
// appended to pending_bl with claim_append(); submit(more) then tries to
// post the pending data.
// NOTE(review): the guards (including the one around the "fake send" log,
// presumably for a socket that is not connected yet — confirm), the handling
// of a failing submit() and the return statements are missing from this
// extract (e.g. source lines 414-419, 421-423, 426, 428-430, 434-436).
413 ssize_t
RDMAConnectedSocketImpl::send(bufferlist
&bl
, bool more
)
420 size_t bytes
= bl
.length();
424 Mutex::Locker
l(lock
);
425 pending_bl
.claim_append(bl
);
427 ldout(cct
, 20) << __func__
<< " fake send to upper, QP: " << my_msg
.qpn
<< dendl
;
431 ldout(cct
, 20) << __func__
<< " QP: " << my_msg
.qpn
<< dendl
;
432 ssize_t r
= submit(more
);
// -EAGAIN from submit() is not treated as an error here.
433 if (r
< 0 && r
!= -EAGAIN
)
// Turns the data in pending_bl into TX chunks and posts them.
// Under `lock`, the buffers of pending_bl are walked in order:
//  - a buffer that already lives in registered TX memory
//    (infiniband->is_tx_buffer) is sent as-is via its chunk;
//  - other buffers are accumulated (need_reserve_bytes) and copied into
//    freshly reserved chunks by the fill_tx_via_copy lambda.
// If not everything could be placed into chunks, the remainder is split off
// with splice() and swapped back into pending_bl (per the
// l_msgr_rdma_tx_parital_mem counter this is the partial-memory case).
// Returns -EAGAIN while pending_bl still holds data, otherwise 0.
// NOTE(review): several lines are missing from this extract (e.g. the
// remaining lambda parameters on source lines 450-451, declarations of
// `copied`, `copy_it`, `total`, `swapped`, the failure guards and early
// returns) — confirm details against the full file.
438 ssize_t
RDMAConnectedSocketImpl::submit(bool more
)
442 Mutex::Locker
l(lock
);
443 size_t bytes
= pending_bl
.length();
444 ldout(cct
, 20) << __func__
<< " we need " << bytes
<< " bytes. iov size: "
445 << pending_bl
.buffers().size() << dendl
;
// Helper: reserve registered memory from the worker and copy the buffers in
// [start, end) into the newly appended chunks; yields the bytes copied.
449 auto fill_tx_via_copy
= [this](std::vector
<Chunk
*> &tx_buffers
,
452 const auto& end
) -> unsigned {
453 ceph_assert(start
!= end
);
// Index of the first chunk this call appends to tx_buffers.
454 auto chunk_idx
= tx_buffers
.size();
455 int ret
= worker
->get_reged_mem(this, tx_buffers
, bytes
);
457 ldout(cct
, 1) << __func__
<< " no enough buffers in worker " << worker
<< dendl
;
458 worker
->perf_logger
->inc(l_msgr_rdma_tx_no_mem
);
462 unsigned total_copied
= 0;
463 Chunk
*current_chunk
= tx_buffers
[chunk_idx
];
464 while (start
!= end
) {
465 const uintptr_t addr
= reinterpret_cast<uintptr_t>(start
->c_str());
// Keep writing the current buffer into chunks, moving to the next reserved
// chunk whenever the current one is full.
467 while (copied
< start
->length()) {
468 uint32_t r
= current_chunk
->write((char*)addr
+copied
, start
->length() - copied
);
472 if (current_chunk
->full()){
473 if (++chunk_idx
== tx_buffers
.size())
475 current_chunk
= tx_buffers
[chunk_idx
];
480 ceph_assert(bytes
== 0);
484 std::vector
<Chunk
*> tx_buffers
;
485 auto it
= std::cbegin(pending_bl
.buffers());
488 unsigned need_reserve_bytes
= 0;
489 while (it
!= pending_bl
.buffers().end()) {
490 if (infiniband
->is_tx_buffer(it
->raw_c_str())) {
// Flush the buffers accumulated so far before appending the in-place chunk,
// so the order of the data is preserved.
491 if (need_reserve_bytes
) {
492 unsigned copied
= fill_tx_via_copy(tx_buffers
, need_reserve_bytes
, copy_it
, it
);
494 if (copied
< need_reserve_bytes
)
496 need_reserve_bytes
= 0;
498 ceph_assert(copy_it
== it
);
499 tx_buffers
.push_back(infiniband
->get_tx_chunk_by_buffer(it
->raw_c_str()));
500 total
+= it
->length();
503 need_reserve_bytes
+= it
->length();
// Copy whatever is still accumulated after the last buffer.
507 if (need_reserve_bytes
)
508 total
+= fill_tx_via_copy(tx_buffers
, need_reserve_bytes
, copy_it
, it
);
513 ceph_assert(total
<= pending_bl
.length());
515 if (total
< pending_bl
.length()) {
516 worker
->perf_logger
->inc(l_msgr_rdma_tx_parital_mem
);
517 pending_bl
.splice(total
, pending_bl
.length()-total
, &swapped
);
518 pending_bl
.swap(swapped
);
523 ldout(cct
, 20) << __func__
<< " left bytes: " << pending_bl
.length() << " in buffers "
524 << pending_bl
.buffers().size() << " tx chunks " << tx_buffers
.size() << dendl
;
526 int r
= post_work_request(tx_buffers
);
530 ldout(cct
, 20) << __func__
<< " finished sending " << bytes
<< " bytes." << dendl
;
531 return pending_bl
.length() ? -EAGAIN
: 0;
// Posts the chunks in `tx_buffers` to the queue pair's send queue.
// For every chunk one scatter/gather entry (buffer address, bytes written so
// far via get_offset(), lkey of the chunk's memory region) and one signaled
// IBV_WR_SEND work request are filled in; wr_id carries the Chunk pointer.
// The work requests are chained through `next` and handed to
// ibv_post_send() starting at iswr[0]. A failing post is logged and counted
// in l_msgr_rdma_tx_failed; the number of chunks is added to
// l_msgr_rdma_tx_chunks.
// The first log statement reads tx_buffers[0], so the vector is expected to
// be non-empty.
// NOTE(review): the C-style block comment opened on source line 561 is not
// closed anywhere in this extract (its closing line is among the missing
// ones), and the loop increments, the guard around the pre_wr chaining and
// the return statements are missing as well.
534 int RDMAConnectedSocketImpl::post_work_request(std::vector
<Chunk
*> &tx_buffers
)
536 ldout(cct
, 20) << __func__
<< " QP: " << my_msg
.qpn
<< " " << tx_buffers
[0] << dendl
;
537 vector
<Chunk
*>::iterator current_buffer
= tx_buffers
.begin();
// One SGE and one send WR per chunk (variable-length arrays).
538 ibv_sge isge
[tx_buffers
.size()];
539 uint32_t current_sge
= 0;
540 ibv_send_wr iswr
[tx_buffers
.size()];
541 uint32_t current_swr
= 0;
542 ibv_send_wr
* pre_wr
= NULL
;
545 // FIPS zeroization audit 20191115: these memsets are not security related.
546 memset(iswr
, 0, sizeof(iswr
));
547 memset(isge
, 0, sizeof(isge
));
549 while (current_buffer
!= tx_buffers
.end()) {
550 isge
[current_sge
].addr
= reinterpret_cast<uint64_t>((*current_buffer
)->buffer
);
551 isge
[current_sge
].length
= (*current_buffer
)->get_offset();
552 isge
[current_sge
].lkey
= (*current_buffer
)->mr
->lkey
;
553 ldout(cct
, 25) << __func__
<< " sending buffer: " << *current_buffer
<< " length: " << isge
[current_sge
].length
<< dendl
;
555 iswr
[current_swr
].wr_id
= reinterpret_cast<uint64_t>(*current_buffer
);
556 iswr
[current_swr
].next
= NULL
;
557 iswr
[current_swr
].sg_list
= &isge
[current_sge
];
558 iswr
[current_swr
].num_sge
= 1;
559 iswr
[current_swr
].opcode
= IBV_WR_SEND
;
560 iswr
[current_swr
].send_flags
= IBV_SEND_SIGNALED
;
561 /*if (isge[current_sge].length < infiniband->max_inline_data) {
562 iswr[current_swr].send_flags = IBV_SEND_INLINE;
563 ldout(cct, 20) << __func__ << " send_inline." << dendl;
567 worker
->perf_logger
->inc(l_msgr_rdma_tx_bytes
, isge
[current_sge
].length
);
569 pre_wr
->next
= &iswr
[current_swr
];
570 pre_wr
= &iswr
[current_swr
];
576 ibv_send_wr
*bad_tx_work_request
;
577 if (ibv_post_send(qp
->get_qp(), iswr
, &bad_tx_work_request
)) {
578 ldout(cct
, 1) << __func__
<< " failed to send data"
579 << " (most probably should be peer not ready): "
580 << cpp_strerror(errno
) << dendl
;
581 worker
->perf_logger
->inc(l_msgr_rdma_tx_failed
);
585 worker
->perf_logger
->inc(l_msgr_rdma_tx_chunks
, tx_buffers
.size());
586 ldout(cct
, 20) << __func__
<< " qp state is " << Infiniband::qp_state_string(qp
->get_state()) << dendl
;
// Posts a single signaled IBV_WR_SEND work request whose wr_id is the
// QueuePair pointer. No scatter/gather list is set in the visible code, so
// the request carries no payload; read() in this file treats a zero-length
// completion as the remote close message. A failing ibv_post_send() is
// logged and counted in l_msgr_rdma_tx_failed.
// NOTE(review): the declaration of `wr` and source lines 594, 596 and
// 605-608 are missing from this extract.
590 void RDMAConnectedSocketImpl::fin() {
592 // FIPS zeroization audit 20191115: this memset is not security related.
593 memset(&wr
, 0, sizeof(wr
));
595 wr
.wr_id
= reinterpret_cast<uint64_t>(qp
);
597 wr
.opcode
= IBV_WR_SEND
;
598 wr
.send_flags
= IBV_SEND_SIGNALED
;
599 ibv_send_wr
* bad_tx_work_request
;
600 if (ibv_post_send(qp
->get_qp(), &wr
, &bad_tx_work_request
)) {
601 ldout(cct
, 1) << __func__
<< " failed to send message="
602 << " ibv_post_send failed(most probably should be peer not ready): "
603 << cpp_strerror(errno
) << dendl
;
604 worker
->perf_logger
->inc(l_msgr_rdma_tx_failed
);
// Detaches the TCP handshake handler: when a handler exists and tcp_fd is
// valid, the handler's close() is called, a task is submitted to the
// worker's own event center to delete the readable file event on tcp_fd,
// and con_handler is reset to nullptr.
// NOTE(review): source lines 615-616 and 618-619 are missing from this
// extract, so the remaining arguments of submit_to() and anything done to
// the handler object itself are not visible here.
610 void RDMAConnectedSocketImpl::cleanup() {
611 if (con_handler
&& tcp_fd
>= 0) {
612 (static_cast<C_handle_connection
*>(con_handler
))->close();
613 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
614 worker
->center
.delete_file_event(tcp_fd
, EVENT_READABLE
);
617 con_handler
= nullptr;
// Signals the socket's eventfd by writing the integer `i` to notify_fd and
// asserts that the whole value was written.
// NOTE(review): the declaration and value of `i` (source lines 625-626) are
// missing from this extract; per the comment below it must be 64 bits wide.
621 void RDMAConnectedSocketImpl::notify()
623 // note: notify_fd is an event fd (man eventfd)
624 // write argument must be a 64bit integer
627 ceph_assert(sizeof(i
) == write(notify_fd
, &i
, sizeof(i
)));
// NOTE(review): only the signature of shutdown() survives in this extract;
// its body (source lines 631-636) is missing, so its behavior cannot be
// documented from here.
630 void RDMAConnectedSocketImpl::shutdown()
// NOTE(review): only the signature of close() survives in this extract; its
// body (source lines 639-644) is missing, so its behavior cannot be
// documented from here.
638 void RDMAConnectedSocketImpl::close()
// Fault handling entry point; the visible code logs the TCP fd at level 1.
// NOTE(review): the rest of the body (source lines 647, 649-656) is missing
// from this extract.
646 void RDMAConnectedSocketImpl::fault()
648 ldout(cct
, 1) << __func__
<< " tcp fd " << tcp_fd
<< dendl
;
// Server-side setup for an accepted TCP socket `sd`: a task is submitted to
// the worker's own event center that registers con_handler for readable
// events on tcp_fd.
// NOTE(review): source lines 659-661 and 664-665 are missing from this
// extract; presumably `sd` is stored into tcp_fd there — TODO confirm.
658 void RDMAConnectedSocketImpl::set_accept_fd(int sd
)
662 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
663 worker
->center
.create_file_event(tcp_fd
, EVENT_READABLE
, con_handler
);
// Asks the Infiniband layer to post `num` chunks to this queue pair's
// receive queue and adds the difference between `num` and the call's return
// value to post_backlog.
// NOTE(review): assumes post_chunks_to_rq() returns the number of chunks it
// actually posted, so the difference is the shortfall — confirm.
667 void RDMAConnectedSocketImpl::post_chunks_to_rq(int num
)
669 post_backlog
+= num
- infiniband
->post_chunks_to_rq(num
, qp
->get_qp());
// Retries posting the backlog of receive chunks through the dispatcher and
// adjusts post_backlog with the result.
// NOTE(review): as written, `x -= x - f(x)` leaves post_backlog equal to the
// value returned by post_chunks_to_rq() (provided the callee does not touch
// post_backlog itself). If that return value is the number of chunks posted,
// the backlog ends up holding the posted count rather than the remaining
// shortfall — confirm this is intended.
// NOTE(review): source lines 673-674 and 676 are missing from this extract,
// so any guard around this statement is not visible here.
672 void RDMAConnectedSocketImpl::update_post_backlog()
675 post_backlog
-= post_backlog
- dispatcher
->post_chunks_to_rq(post_backlog
, qp
->get_qp());