1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
16 #include "RDMAStack.h"
// Event callback bound to one RDMAConnectedSocketImpl; the visible part of
// do_request() forwards to handle_connection_established() on that socket.
// NOTE(review): this listing has gaps (the embedded numbering skips lines), so
// access specifiers, further members and any guard around the forwarded call
// are not visible here.
18 class C_handle_connection_established
: public EventCallback
{
19 RDMAConnectedSocketImpl
*csi
;
// NOTE(review): single-argument constructor without `explicit`, unlike
// C_handle_connection_read in this file -- consider aligning the two.
22 C_handle_connection_established(RDMAConnectedSocketImpl
*w
) : csi(w
) {}
// The fd argument is not used by the visible part of the body.
23 void do_request(uint64_t fd
) final
{
25 csi
->handle_connection_established();
// Event callback bound to one RDMAConnectedSocketImpl; the visible part of
// do_request() forwards to handle_connection() on that socket.
// NOTE(review): this listing has gaps (the embedded numbering skips lines), so
// access specifiers, further members and any guard around the forwarded call
// are not visible here.
32 class C_handle_connection_read
: public EventCallback
{
33 RDMAConnectedSocketImpl
*csi
;
// Stores the socket pointer; explicit to prevent implicit conversion.
36 explicit C_handle_connection_read(RDMAConnectedSocketImpl
*w
): csi(w
) {}
// The fd argument is not used by the visible part of the body.
37 void do_request(uint64_t fd
) final
{
39 csi
->handle_connection();
// Logging configuration macros for this file: the subsystem identifier and
// the text streamed in front of each message. Presumably consumed by the
// ldout/dendl logging macros used below -- their definitions are not in view.
46 #define dout_subsys ceph_subsys_ms
48 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
// Constructor: stores the context, Infiniband handle, dispatcher and worker,
// allocates the read and established event handlers, and starts with
// connected, error, is_server, active and pending all cleared.
// NOTE(review): the parameter that supplies `w` and the body's braces fall on
// lines missing from this listing.
50 RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext
*cct
, shared_ptr
<Infiniband
> &ib
,
51 shared_ptr
<RDMADispatcher
>& rdma_dispatcher
,
53 : cct(cct
), connected(0), error(0), ib(ib
),
54 dispatcher(rdma_dispatcher
), worker(w
),
55 is_server(false), read_handler(new C_handle_connection_read(this)),
56 established_handler(new C_handle_connection_established(this)),
57 active(false), pending(false)
// Only when the ms_async_rdma_cm option is off: create an IBV_QPT_RC queue
// pair on the dispatcher's tx/rx completion queues, remember its local QP
// number, open a non-blocking close-on-exec eventfd as notify_fd, register the
// QP with the dispatcher and bump the created/active queue-pair counters.
59 if (!cct
->_conf
->ms_async_rdma_cm
) {
60 qp
= ib
->create_queue_pair(cct
, dispatcher
->get_tx_cq(), dispatcher
->get_rx_cq(), IBV_QPT_RC
, NULL
);
61 local_qpn
= qp
->get_local_qp_number();
62 notify_fd
= eventfd(0, EFD_CLOEXEC
|EFD_NONBLOCK
);
63 dispatcher
->register_qp(qp
, this);
64 dispatcher
->perf_logger
->inc(l_msgr_rdma_created_queue_pair
);
65 dispatcher
->perf_logger
->inc(l_msgr_rdma_active_queue_pair
);
// Destructor: logs, removes this socket from the worker's pending
// connections, schedules destruction of the queue pair identified by
// local_qpn, and hands every chunk still referenced by `wc` or held in
// `buffers` to dispatcher->post_chunk_to_pool().
// NOTE(review): several lines are missing from this listing; whatever is done
// after the lock is taken at the end is not visible here.
69 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
71 ldout(cct
, 20) << __func__
<< " destruct." << dendl
;
73 worker
->remove_pending_conn(this);
74 dispatcher
->schedule_qp_destroy(local_qpn
);
// Each leftover work completion carries its Chunk pointer in wr_id.
76 for (unsigned i
=0; i
< wc
.size(); ++i
) {
77 dispatcher
->post_chunk_to_pool(reinterpret_cast<Chunk
*>(wc
[i
].wr_id
));
// Chunks received but not yet consumed by a reader.
79 for (unsigned i
=0; i
< buffers
.size(); ++i
) {
80 dispatcher
->post_chunk_to_pool(buffers
[i
]);
83 std::lock_guard l
{lock
};
// Accepts a batch of work completions `v` and, while holding `lock`, appends
// its elements to the member vector `wc`.
// NOTE(review): lines between the lock and the insert, and after it, are
// missing from this listing, so any alternative branch or follow-up action is
// not visible here. The visible insert copies the elements even though `v` is
// an rvalue reference.
91 void RDMAConnectedSocketImpl::pass_wc(std::vector
<ibv_wc
> &&v
)
93 std::lock_guard l
{lock
};
97 wc
.insert(wc
.end(), v
.begin(), v
.end());
// Takes `lock` and receives the caller's vector `w` by reference.
// NOTE(review): the statements that use `w` are on lines missing from this
// listing; presumably pending work completions are handed over to `w` --
// confirm against the full file.
101 void RDMAConnectedSocketImpl::get_wc(std::vector
<ibv_wc
> &w
)
103 std::lock_guard l
{lock
};
// Brings the queue pair up after the handshake: records the peer's QP number
// in the local connection-management metadata, calls modify_qp_to_rtr() and
// then modify_qp_to_rts(), marks the socket connected, and caches peer_qpn.
// NOTE(review): the bodies of the two failure checks, the condition guarding
// the log line and the return statements are missing from this listing.
109 int RDMAConnectedSocketImpl::activate()
// The peer reports its own QP number as local_qpn in its metadata.
111 qp
->get_local_cm_meta().peer_qpn
= qp
->get_peer_cm_meta().local_qpn
;
112 if (qp
->modify_qp_to_rtr() != 0)
115 if (qp
->modify_qp_to_rts() != 0)
119 connected
= 1; //indicate successfully
120 ldout(cct
, 20) << __func__
<< " handle fake send, wake it up. QP: " << local_qpn
<< dendl
;
124 peer_qpn
= qp
->get_local_cm_meta().peer_qpn
;
// Opens the TCP side channel to peer_addr that carries the handshake
// messages: connects (non-blocking or blocking variant), applies the socket
// options and priority from `opts`, then either registers
// established_handler for readable/writable events on tcp_fd or calls
// handle_connection_established(false) directly.
// NOTE(review): the conditions selecting between the alternatives, the error
// checks on tcp_fd / r, and the return statements are on lines missing from
// this listing.
129 int RDMAConnectedSocketImpl::try_connect(const entity_addr_t
& peer_addr
, const SocketOptions
&opts
) {
130 ldout(cct
, 20) << __func__
<< " nonblock:" << opts
.nonblock
<< ", nodelay:"
131 << opts
.nodelay
<< ", rbuf_size: " << opts
.rcbuf_size
<< dendl
;
134 // we construct a socket to transport ib sync message
135 // but we shouldn't block in tcp connecting
137 tcp_fd
= net
.nonblock_connect(peer_addr
, opts
.connect_bind_addr
);
139 tcp_fd
= net
.connect(peer_addr
, opts
.connect_bind_addr
);
146 int r
= net
.set_socket_options(tcp_fd
, opts
.nodelay
, opts
.rcbuf_size
);
153 ldout(cct
, 20) << __func__
<< " tcp_fd: " << tcp_fd
<< dendl
;
154 net
.set_priority(tcp_fd
, opts
.priority
, peer_addr
.get_family());
// Event-driven path: established_handler runs once tcp_fd signals.
157 worker
->center
.create_file_event(tcp_fd
, EVENT_READABLE
| EVENT_WRITABLE
, established_handler
);
// Direct path: run the handshake step now, with need_set_fault = false.
159 r
= handle_connection_established(false);
// Client-side handshake step on the TCP side channel: drops the
// readable/writable event registration on tcp_fd, logs a warning if the
// socket is already marked connected, sends the local connection-management
// metadata with peer_qpn set to 0, and finally registers read_handler for
// readable events on tcp_fd.
// NOTE(review): what happens inside the `need_set_fault` branches, the
// condition checking the send result, and all return statements are on lines
// missing from this listing.
164 int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault
) {
165 ldout(cct
, 20) << __func__
<< " start " << dendl
;
167 worker
->center
.delete_file_event(tcp_fd
, EVENT_READABLE
| EVENT_WRITABLE
);
168 if (1 == connected
) {
169 ldout(cct
, 1) << __func__
<< " warnning: logic failed " << dendl
;
170 if (need_set_fault
) {
175 // send handshake msg to server
// handle_connection() treats a received peer_qpn of 0 as the first message
// from a client.
176 qp
->get_local_cm_meta().peer_qpn
= 0;
177 int r
= qp
->send_cm_meta(cct
, tcp_fd
);
179 ldout(cct
, 1) << __func__
<< " send handshake msg failed." << r
<< dendl
;
180 if (need_set_fault
) {
// From here on, incoming handshake replies are handled by read_handler.
185 worker
->center
.create_file_event(tcp_fd
, EVENT_READABLE
, read_handler
);
186 ldout(cct
, 20) << __func__
<< " finish " << dendl
;
// Handles one incoming handshake message on the TCP side channel. Receives
// the peer's connection-management metadata, then:
//  - client (!is_server): sends its metadata back as the acknowledgement;
//  - server, peer_qpn == 0 in the peer's metadata: first message from the
//    client, answered with the server's own metadata;
//  - server otherwise: second message from the client, handshake complete.
// Failures on the visible paths are logged and counted in
// l_msgr_rdma_handshake_errors.
// NOTE(review): the result checks, the calls that activate the queue pair or
// raise a fault, and the early returns are on lines missing from this listing.
190 void RDMAConnectedSocketImpl::handle_connection() {
191 ldout(cct
, 20) << __func__
<< " QP: " << local_qpn
<< " tcp_fd: " << tcp_fd
<< " notify_fd: " << notify_fd
<< dendl
;
192 int r
= qp
->recv_cm_meta(cct
, tcp_fd
);
195 dispatcher
->perf_logger
->inc(l_msgr_rdma_handshake_errors
);
196 ldout(cct
, 1) << __func__
<< " recv handshake msg failed." << dendl
;
202 if (1 == connected
) {
203 ldout(cct
, 1) << __func__
<< " warnning: logic failed: read len: " << r
<< dendl
;
208 if (!is_server
) {// first time: cm meta sync + ack from server
214 r
= qp
->send_cm_meta(cct
, tcp_fd
);
216 ldout(cct
, 1) << __func__
<< " send client ack failed." << dendl
;
217 dispatcher
->perf_logger
->inc(l_msgr_rdma_handshake_errors
);
221 if (qp
->get_peer_cm_meta().peer_qpn
== 0) {// first time: cm meta sync from client
223 ldout(cct
, 10) << __func__
<< " server is already active." << dendl
;
228 r
= qp
->send_cm_meta(cct
, tcp_fd
);
230 ldout(cct
, 1) << __func__
<< " server ack failed." << dendl
;
231 dispatcher
->perf_logger
->inc(l_msgr_rdma_handshake_errors
);
235 } else { // second time: cm meta ack from client
237 ldout(cct
, 10) << __func__
<< " handshake of rdma is done. server connected: " << connected
<< dendl
;
// Reads up to `len` bytes into `buf` from already received chunks.
// Drains the notify_fd eventfd first, logs when the socket is not active or
// not connected, then copies data out via read_buffers(). On the server side
// a socket not yet marked connected is marked connected at this point.
// The final visible statement returns -EAGAIN when nothing was read,
// otherwise the byte count.
// NOTE(review): the declaration of the local `read`, the conditions around
// the first log lines, the bodies of the last two `if`s and any early returns
// are on lines missing from this listing.
245 ssize_t
RDMAConnectedSocketImpl::read(char* buf
, size_t len
)
247 eventfd_t event_val
= 0;
248 int r
= eventfd_read(notify_fd
, &event_val
);
249 ldout(cct
, 20) << __func__
<< " notify_fd : " << event_val
<< " in " << local_qpn
250 << " r = " << r
<< dendl
;
253 ldout(cct
, 1) << __func__
<< " when ib not active. len: " << len
<< dendl
;
257 if (0 == connected
) {
258 ldout(cct
, 1) << __func__
<< " when ib not connected. len: " << len
<<dendl
;
262 read
= read_buffers(buf
,len
);
264 if (is_server
&& connected
== 0) {
265 ldout(cct
, 20) << __func__
<< " we do not need last handshake, QP: " << local_qpn
<< " peer QP: " << peer_qpn
<< dendl
;
266 connected
= 1; //if so, we don't need the last handshake
271 if (!buffers
.empty()) {
275 if (read
== 0 && error
)
277 return read
== 0 ? -EAGAIN
: read
;
// Turns receive completions into readable chunks. For every entry in the
// local `cqe` vector: asserts the completion succeeded, recovers the Chunk
// pointer from wr_id and calls prepare_read() with the received byte count.
// A chunk whose get_size() is 0 after that is reset, counted as
// l_msgr_rdma_rx_fin, logged as a remote close and handed back to the pool;
// other chunks are appended to `buffers`. Finally counts cqe.size() in
// l_msgr_rdma_rx_chunks.
// NOTE(review): how `cqe` gets filled, the else/closing lines between the two
// cases, and any state change made on remote close are on lines missing from
// this listing.
280 void RDMAConnectedSocketImpl::buffer_prefetch(void)
282 std::vector
<ibv_wc
> cqe
;
287 for(size_t i
= 0; i
< cqe
.size(); ++i
) {
288 ibv_wc
* response
= &cqe
[i
];
289 ceph_assert(response
->status
== IBV_WC_SUCCESS
);
290 Chunk
* chunk
= reinterpret_cast<Chunk
*>(response
->wr_id
);
291 chunk
->prepare_read(response
->byte_len
);
293 if (chunk
->get_size() == 0) {
294 chunk
->reset_read_chunk();
295 dispatcher
->perf_logger
->inc(l_msgr_rdma_rx_fin
);
298 ldout(cct
, 20) << __func__
<< " got remote close msg..." << dendl
;
300 dispatcher
->post_chunk_to_pool(chunk
);
303 buffers
.push_back(chunk
);
304 ldout(cct
, 25) << __func__
<< " buffers add a chunk: " << chunk
->get_offset() << ":" << chunk
->get_bound() << dendl
;
307 worker
->perf_logger
->inc(l_msgr_rdma_rx_chunks
, cqe
.size());
// Copies data from the queued chunks in `buffers` into `buf`, at most `len`
// bytes. Walks the chunks in order; each chunk's read() is given the
// remaining space. A chunk whose get_size() reaches 0 is reset, handed back
// to the pool, and update_post_backlog() is called. Chunks before the final
// iterator position are then erased from `buffers`, and the byte count is
// added to l_msgr_rdma_rx_bytes.
// NOTE(review): the line accumulating `tmp` into `read_size`, the iterator
// advance, the body of the `read_size == len` check and the return statement
// are on lines missing from this listing.
310 ssize_t
RDMAConnectedSocketImpl::read_buffers(char* buf
, size_t len
)
312 size_t read_size
= 0, tmp
= 0;
314 auto pchunk
= buffers
.begin();
315 while (pchunk
!= buffers
.end()) {
316 tmp
= (*pchunk
)->read(buf
+ read_size
, len
- read_size
);
318 ldout(cct
, 25) << __func__
<< " read chunk " << *pchunk
<< " bytes length" << tmp
<< " offset: "
319 << (*pchunk
)->get_offset() << " ,bound: " << (*pchunk
)->get_bound() << dendl
;
// Chunk fully consumed: recycle it.
321 if ((*pchunk
)->get_size() == 0) {
322 (*pchunk
)->reset_read_chunk();
323 dispatcher
->post_chunk_to_pool(*pchunk
);
324 update_post_backlog();
325 ldout(cct
, 25) << __func__
<< " read over one chunk " << dendl
;
329 if (read_size
== len
) {
// Drop the chunks in front of the current position from the queue.
334 buffers
.erase(buffers
.begin(), pchunk
);
335 ldout(cct
, 25) << __func__
<< " got " << read_size
<< " bytes, buffers size: " << buffers
.size() << dendl
;
336 worker
->perf_logger
->inc(l_msgr_rdma_rx_bytes
, read_size
);
// Queues the contents of `bl` for transmission: under `lock`, moves the data
// into pending_bl with claim_append(), then calls submit(more). A negative
// result other than -EAGAIN from submit() is checked separately.
// NOTE(review): the early checks before the lock, the condition around the
// "fake send" log, the body of the final `if` and the return statements are
// on lines missing from this listing.
340 ssize_t
RDMAConnectedSocketImpl::send(bufferlist
&bl
, bool more
)
347 size_t bytes
= bl
.length();
351 std::lock_guard l
{lock
};
352 pending_bl
.claim_append(bl
);
354 ldout(cct
, 20) << __func__
<< " fake send to upper, QP: " << local_qpn
<< dendl
;
358 ldout(cct
, 20) << __func__
<< " QP: " << local_qpn
<< dendl
;
359 ssize_t r
= submit(more
);
360 if (r
< 0 && r
!= -EAGAIN
)
// Copies the buffers in [start, end) of pending_bl into registered chunks.
// Asks the worker for registered memory via get_reged_mem(), which appends
// chunks to `tx_buffers`; a result of 0 is logged and counted in
// l_msgr_rdma_tx_no_mem. Then writes each source buffer into the new chunks,
// moving to the next chunk when the current one is full.
//   tx_buffers   - receives the chunks to be sent (appended)
//   req_copy_len - number of bytes the caller wants copied
//   start        - first buffer to copy; taken by non-const reference
//   end          - one past the last buffer to copy
// NOTE(review): the advance of `start`, the body of the
// `++chunk_idx == tx_buffers.size()` check, the early return on allocation
// failure and the final return value are on lines missing from this listing.
// NOTE(review): `(char*)addr` is a C-style cast; the file uses
// reinterpret_cast elsewhere.
365 size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector
<Chunk
*> &tx_buffers
,
366 size_t req_copy_len
, decltype(std::cbegin(pending_bl
.buffers()))& start
,
367 const decltype(std::cbegin(pending_bl
.buffers()))& end
)
369 ceph_assert(start
!= end
);
// Index of the first chunk that get_reged_mem() will append.
370 auto chunk_idx
= tx_buffers
.size();
371 if (0 == worker
->get_reged_mem(this, tx_buffers
, req_copy_len
)) {
372 ldout(cct
, 1) << __func__
<< " no enough buffers in worker " << worker
<< dendl
;
373 worker
->perf_logger
->inc(l_msgr_rdma_tx_no_mem
);
377 Chunk
*current_chunk
= tx_buffers
[chunk_idx
];
378 size_t write_len
= 0;
379 while (start
!= end
) {
380 const uintptr_t addr
= reinterpret_cast<uintptr_t>(start
->c_str());
// A source buffer may span several chunks; keep writing until it is used up.
382 size_t slice_write_len
= 0;
383 while (slice_write_len
< start
->length()) {
384 size_t real_len
= current_chunk
->write((char*)addr
+ slice_write_len
, start
->length() - slice_write_len
);
386 slice_write_len
+= real_len
;
387 write_len
+= real_len
;
388 req_copy_len
-= real_len
;
390 if (current_chunk
->full()) {
391 if (++chunk_idx
== tx_buffers
.size())
393 current_chunk
= tx_buffers
[chunk_idx
];
399 ceph_assert(req_copy_len
== 0);
// Sends as much of pending_bl as possible. Under `lock`, walks the pending
// buffers: a buffer that already lives in registered tx memory
// (ib->is_tx_buffer) is sent in place via its own chunk, after the run of
// ordinary buffers before it has been copied with tx_copy_chunk(); ordinary
// buffers only add to wait_copy_len. Whatever remains uncopied at the end of
// the walk is copied too. If only part of the data could be staged, the
// unsent tail is kept in pending_bl and l_msgr_rdma_tx_parital_mem is bumped.
// The staged chunks are handed to post_work_request().
// The final visible statement returns -EAGAIN while data is still pending,
// else 0.
// NOTE(review): the iterator advances, break/goto statements, the
// declaration of `swapped`, the check on post_work_request()'s result and the
// early returns are on lines missing from this listing. `more` is not used on
// any visible line.
403 ssize_t
RDMAConnectedSocketImpl::submit(bool more
)
407 std::lock_guard l
{lock
};
408 size_t bytes
= pending_bl
.length();
409 ldout(cct
, 20) << __func__
<< " we need " << bytes
<< " bytes. iov size: "
410 << pending_bl
.get_num_buffers() << dendl
;
414 std::vector
<Chunk
*> tx_buffers
;
415 auto it
= std::cbegin(pending_bl
.buffers());
416 auto copy_start
= it
;
417 size_t total_copied
= 0, wait_copy_len
= 0;
418 while (it
!= pending_bl
.buffers().end()) {
419 if (ib
->is_tx_buffer(it
->raw_c_str())) {
// Flush the run of ordinary buffers collected so far.
421 size_t copied
= tx_copy_chunk(tx_buffers
, wait_copy_len
, copy_start
, it
);
422 total_copied
+= copied
;
423 if (copied
< wait_copy_len
)
427 ceph_assert(copy_start
== it
);
// This buffer is already registered memory: send its chunk without copying.
428 tx_buffers
.push_back(ib
->get_tx_chunk_by_buffer(it
->raw_c_str()));
429 total_copied
+= it
->length();
432 wait_copy_len
+= it
->length();
437 total_copied
+= tx_copy_chunk(tx_buffers
, wait_copy_len
, copy_start
, it
);
440 if (total_copied
== 0)
442 ceph_assert(total_copied
<= pending_bl
.length());
444 if (total_copied
< pending_bl
.length()) {
445 worker
->perf_logger
->inc(l_msgr_rdma_tx_parital_mem
);
// Move the unsent tail into `swapped`, then make it the new pending_bl.
446 pending_bl
.splice(total_copied
, pending_bl
.length() - total_copied
, &swapped
);
447 pending_bl
.swap(swapped
);
452 ldout(cct
, 20) << __func__
<< " left bytes: " << pending_bl
.length() << " in buffers "
453 << pending_bl
.get_num_buffers() << " tx chunks " << tx_buffers
.size() << dendl
;
455 int r
= post_work_request(tx_buffers
);
459 ldout(cct
, 20) << __func__
<< " finished sending " << total_copied
<< " bytes." << dendl
;
460 return pending_bl
.length() ? -EAGAIN
: 0;
// Builds one signaled IBV_WR_SEND work request per chunk in `tx_buffers`,
// each with a single scatter/gather entry covering the chunk's buffer up to
// get_offset(), links them through `next`, and posts the list with
// ibv_post_send(). A posting failure is logged and counted in
// l_msgr_rdma_tx_failed; the chunk count is added to l_msgr_rdma_tx_chunks.
// NOTE(review): the increments of current_sge / current_swr / current_buffer,
// the condition guarding `pre_wr->next`, and the return statements are on
// lines missing from this listing.
// NOTE(review): `tx_buffers[0]` is read in the first log line with no visible
// emptiness check -- confirm callers never pass an empty vector.
// NOTE(review): `isge[...]` and `iswr[...]` are sized at run time
// (variable-length arrays, a compiler extension in C++).
463 int RDMAConnectedSocketImpl::post_work_request(std::vector
<Chunk
*> &tx_buffers
)
465 ldout(cct
, 20) << __func__
<< " QP: " << local_qpn
<< " " << tx_buffers
[0] << dendl
;
466 vector
<Chunk
*>::iterator current_buffer
= tx_buffers
.begin();
467 ibv_sge isge
[tx_buffers
.size()];
468 uint32_t current_sge
= 0;
469 ibv_send_wr iswr
[tx_buffers
.size()];
470 uint32_t current_swr
= 0;
471 ibv_send_wr
* pre_wr
= NULL
;
474 // FIPS zeroization audit 20191115: these memsets are not security related.
475 memset(iswr
, 0, sizeof(iswr
));
476 memset(isge
, 0, sizeof(isge
));
478 while (current_buffer
!= tx_buffers
.end()) {
// Scatter/gather entry: the chunk's buffer, filled length and memory-region key.
479 isge
[current_sge
].addr
= reinterpret_cast<uint64_t>((*current_buffer
)->buffer
);
480 isge
[current_sge
].length
= (*current_buffer
)->get_offset();
481 isge
[current_sge
].lkey
= (*current_buffer
)->mr
->lkey
;
482 ldout(cct
, 25) << __func__
<< " sending buffer: " << *current_buffer
<< " length: " << isge
[current_sge
].length
<< dendl
;
// The chunk pointer travels in wr_id.
484 iswr
[current_swr
].wr_id
= reinterpret_cast<uint64_t>(*current_buffer
);
485 iswr
[current_swr
].next
= NULL
;
486 iswr
[current_swr
].sg_list
= &isge
[current_sge
];
487 iswr
[current_swr
].num_sge
= 1;
488 iswr
[current_swr
].opcode
= IBV_WR_SEND
;
489 iswr
[current_swr
].send_flags
= IBV_SEND_SIGNALED
;
492 worker
->perf_logger
->inc(l_msgr_rdma_tx_bytes
, isge
[current_sge
].length
);
// Chain this request behind the previous one.
494 pre_wr
->next
= &iswr
[current_swr
];
495 pre_wr
= &iswr
[current_swr
];
501 ibv_send_wr
*bad_tx_work_request
= nullptr;
502 if (ibv_post_send(qp
->get_qp(), iswr
, &bad_tx_work_request
)) {
503 ldout(cct
, 1) << __func__
<< " failed to send data"
504 << " (most probably should be peer not ready): "
505 << cpp_strerror(errno
) << dendl
;
506 worker
->perf_logger
->inc(l_msgr_rdma_tx_failed
);
509 worker
->perf_logger
->inc(l_msgr_rdma_tx_chunks
, tx_buffers
.size());
510 ldout(cct
, 20) << __func__
<< " qp state is " << get_qp_state() << dendl
;
// Posts a single signaled IBV_WR_SEND work request whose wr_id is the `qp`
// pointer; no scatter/gather list is set on any visible line. A posting
// failure is logged and counted in l_msgr_rdma_tx_failed.
// NOTE(review): the declaration of `wr` and any further field assignments are
// on lines missing from this listing. Presumably the empty send is what
// buffer_prefetch() on the peer reports as "got remote close msg" -- confirm.
514 void RDMAConnectedSocketImpl::fin() {
516 // FIPS zeroization audit 20191115: this memset is not security related.
517 memset(&wr
, 0, sizeof(wr
));
519 wr
.wr_id
= reinterpret_cast<uint64_t>(qp
);
521 wr
.opcode
= IBV_WR_SEND
;
522 wr
.send_flags
= IBV_SEND_SIGNALED
;
523 ibv_send_wr
* bad_tx_work_request
= nullptr;
524 if (ibv_post_send(qp
->get_qp(), &wr
, &bad_tx_work_request
)) {
525 ldout(cct
, 1) << __func__
<< " failed to send message="
526 << " ibv_post_send failed(most probably should be peer not ready): "
527 << cpp_strerror(errno
) << dendl
;
528 worker
->perf_logger
->inc(l_msgr_rdma_tx_failed
);
// Detaches the two event handlers. If read_handler exists and tcp_fd is
// valid, calls close() on the handler, submits a task to the worker's event
// center that removes the file events for tcp_fd, and clears the pointer.
// If established_handler exists, calls close() on it, deletes it and clears
// the pointer.
// NOTE(review): the remaining arguments of submit_to(), the lambda's closing
// lines and whether read_handler is also deleted are on lines missing from
// this listing. The lambda captures `this`; confirm the object outlives the
// submitted task.
533 void RDMAConnectedSocketImpl::cleanup() {
534 if (read_handler
&& tcp_fd
>= 0) {
535 (static_cast<C_handle_connection_read
*>(read_handler
))->close();
536 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
537 worker
->center
.delete_file_event(tcp_fd
, EVENT_READABLE
| EVENT_WRITABLE
);
540 read_handler
= nullptr;
542 if (established_handler
) {
543 (static_cast<C_handle_connection_established
*>(established_handler
))->close();
544 delete established_handler
;
545 established_handler
= nullptr;
// Signals the notify_fd eventfd by writing the value 1 to it; read() drains
// the same descriptor with eventfd_read().
// NOTE(review): what is done with the result `r` is on lines missing from
// this listing.
549 void RDMAConnectedSocketImpl::notify()
551 eventfd_t event_val
= 1;
552 int r
= eventfd_write(notify_fd
, event_val
);
556 void RDMAConnectedSocketImpl::shutdown()
564 void RDMAConnectedSocketImpl::close()
// Fault handling entry point; the only visible statement logs tcp_fd.
// NOTE(review): the state changes performed on a fault are on lines missing
// from this listing.
572 void RDMAConnectedSocketImpl::fault()
574 ldout(cct
, 1) << __func__
<< " tcp fd " << tcp_fd
<< dendl
;
// Server-side setup for an accepted TCP side channel: submits a task to the
// worker's event center that registers read_handler for readable events on
// tcp_fd.
// NOTE(review): the lines that store `sd` (presumably into tcp_fd) and mark
// the socket as server side, plus the remaining submit_to() arguments, are
// missing from this listing. The lambda captures `this`.
580 void RDMAConnectedSocketImpl::set_accept_fd(int sd
)
584 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
585 worker
->center
.create_file_event(tcp_fd
, EVENT_READABLE
, read_handler
);
// Calls ib->post_chunks_to_rq(num, qp) and adds the difference between `num`
// and the call's result to post_backlog.
// Presumably the call returns how many chunks were actually posted, so the
// shortfall accumulates in post_backlog -- confirm against Infiniband.
589 void RDMAConnectedSocketImpl::post_chunks_to_rq(int num
)
591 post_backlog
+= num
- ib
->post_chunks_to_rq(num
, qp
);
// Retries posting the backlog: calls
// dispatcher->post_chunks_to_rq(post_backlog, qp) and adjusts post_backlog
// with the result.
// NOTE(review): `x -= x - r` leaves post_backlog equal to the call's return
// value `r`. If `r` is the number of chunks posted (as the `num - ...` form in
// post_chunks_to_rq suggests for the ib variant), the backlog ends up as the
// posted count rather than the remainder -- confirm the dispatcher's return
// semantics. Any guard on a line missing from this listing is not visible.
594 void RDMAConnectedSocketImpl::update_post_backlog()
597 post_backlog
-= post_backlog
- dispatcher
->post_chunks_to_rq(post_backlog
, qp
);