// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
16 #include "RDMAStack.h"
18 class C_handle_connection_established
: public EventCallback
{
19 RDMAConnectedSocketImpl
*csi
;
22 C_handle_connection_established(RDMAConnectedSocketImpl
*w
) : csi(w
) {}
23 void do_request(uint64_t fd
) final
{
25 csi
->handle_connection_established();
32 class C_handle_connection_read
: public EventCallback
{
33 RDMAConnectedSocketImpl
*csi
;
36 explicit C_handle_connection_read(RDMAConnectedSocketImpl
*w
): csi(w
) {}
37 void do_request(uint64_t fd
) final
{
39 csi
->handle_connection();
46 #define dout_subsys ceph_subsys_ms
48 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
50 RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext
*cct
, std::shared_ptr
<Infiniband
> &ib
,
51 std::shared_ptr
<RDMADispatcher
>& rdma_dispatcher
,
53 : cct(cct
), connected(0), error(0), ib(ib
),
54 dispatcher(rdma_dispatcher
), worker(w
),
55 is_server(false), read_handler(new C_handle_connection_read(this)),
56 established_handler(new C_handle_connection_established(this)),
57 active(false), pending(false)
59 if (!cct
->_conf
->ms_async_rdma_cm
) {
60 qp
= ib
->create_queue_pair(cct
, dispatcher
->get_tx_cq(), dispatcher
->get_rx_cq(), IBV_QPT_RC
, NULL
);
62 lderr(cct
) << __func__
<< " queue pair create failed" << dendl
;
65 local_qpn
= qp
->get_local_qp_number();
66 notify_fd
= eventfd(0, EFD_CLOEXEC
|EFD_NONBLOCK
);
67 dispatcher
->register_qp(qp
, this);
68 dispatcher
->perf_logger
->inc(l_msgr_rdma_created_queue_pair
);
69 dispatcher
->perf_logger
->inc(l_msgr_rdma_active_queue_pair
);
73 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
75 ldout(cct
, 20) << __func__
<< " destruct." << dendl
;
77 worker
->remove_pending_conn(this);
78 dispatcher
->schedule_qp_destroy(local_qpn
);
80 for (unsigned i
=0; i
< wc
.size(); ++i
) {
81 dispatcher
->post_chunk_to_pool(reinterpret_cast<Chunk
*>(wc
[i
].wr_id
));
83 for (unsigned i
=0; i
< buffers
.size(); ++i
) {
84 dispatcher
->post_chunk_to_pool(buffers
[i
]);
87 std::lock_guard l
{lock
};
95 void RDMAConnectedSocketImpl::pass_wc(std::vector
<ibv_wc
> &&v
)
97 std::lock_guard l
{lock
};
101 wc
.insert(wc
.end(), v
.begin(), v
.end());
105 void RDMAConnectedSocketImpl::get_wc(std::vector
<ibv_wc
> &w
)
107 std::lock_guard l
{lock
};
113 int RDMAConnectedSocketImpl::activate()
115 qp
->get_local_cm_meta().peer_qpn
= qp
->get_peer_cm_meta().local_qpn
;
116 if (qp
->modify_qp_to_rtr() != 0)
119 if (qp
->modify_qp_to_rts() != 0)
123 connected
= 1; //indicate successfully
124 ldout(cct
, 20) << __func__
<< " handle fake send, wake it up. QP: " << local_qpn
<< dendl
;
128 peer_qpn
= qp
->get_local_cm_meta().peer_qpn
;
133 int RDMAConnectedSocketImpl::try_connect(const entity_addr_t
& peer_addr
, const SocketOptions
&opts
) {
134 ldout(cct
, 20) << __func__
<< " nonblock:" << opts
.nonblock
<< ", nodelay:"
135 << opts
.nodelay
<< ", rbuf_size: " << opts
.rcbuf_size
<< dendl
;
136 ceph::NetHandler
net(cct
);
138 // we construct a socket to transport ib sync message
139 // but we shouldn't block in tcp connecting
141 tcp_fd
= net
.nonblock_connect(peer_addr
, opts
.connect_bind_addr
);
143 tcp_fd
= net
.connect(peer_addr
, opts
.connect_bind_addr
);
150 int r
= net
.set_socket_options(tcp_fd
, opts
.nodelay
, opts
.rcbuf_size
);
157 ldout(cct
, 20) << __func__
<< " tcp_fd: " << tcp_fd
<< dendl
;
158 net
.set_priority(tcp_fd
, opts
.priority
, peer_addr
.get_family());
161 worker
->center
.create_file_event(tcp_fd
, EVENT_READABLE
| EVENT_WRITABLE
, established_handler
);
163 r
= handle_connection_established(false);
168 int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault
) {
169 ldout(cct
, 20) << __func__
<< " start " << dendl
;
171 worker
->center
.delete_file_event(tcp_fd
, EVENT_READABLE
| EVENT_WRITABLE
);
172 if (1 == connected
) {
173 ldout(cct
, 1) << __func__
<< " warnning: logic failed " << dendl
;
174 if (need_set_fault
) {
179 // send handshake msg to server
180 qp
->get_local_cm_meta().peer_qpn
= 0;
181 int r
= qp
->send_cm_meta(cct
, tcp_fd
);
183 ldout(cct
, 1) << __func__
<< " send handshake msg failed." << r
<< dendl
;
184 if (need_set_fault
) {
189 worker
->center
.create_file_event(tcp_fd
, EVENT_READABLE
, read_handler
);
190 ldout(cct
, 20) << __func__
<< " finish " << dendl
;
194 void RDMAConnectedSocketImpl::handle_connection() {
195 ldout(cct
, 20) << __func__
<< " QP: " << local_qpn
<< " tcp_fd: " << tcp_fd
<< " notify_fd: " << notify_fd
<< dendl
;
196 int r
= qp
->recv_cm_meta(cct
, tcp_fd
);
199 dispatcher
->perf_logger
->inc(l_msgr_rdma_handshake_errors
);
200 ldout(cct
, 1) << __func__
<< " recv handshake msg failed." << dendl
;
206 if (1 == connected
) {
207 ldout(cct
, 1) << __func__
<< " warnning: logic failed: read len: " << r
<< dendl
;
212 if (!is_server
) {// first time: cm meta sync + ack from server
218 r
= qp
->send_cm_meta(cct
, tcp_fd
);
220 ldout(cct
, 1) << __func__
<< " send client ack failed." << dendl
;
221 dispatcher
->perf_logger
->inc(l_msgr_rdma_handshake_errors
);
225 if (qp
->get_peer_cm_meta().peer_qpn
== 0) {// first time: cm meta sync from client
227 ldout(cct
, 10) << __func__
<< " server is already active." << dendl
;
232 r
= qp
->send_cm_meta(cct
, tcp_fd
);
234 ldout(cct
, 1) << __func__
<< " server ack failed." << dendl
;
235 dispatcher
->perf_logger
->inc(l_msgr_rdma_handshake_errors
);
239 } else { // second time: cm meta ack from client
241 ldout(cct
, 10) << __func__
<< " handshake of rdma is done. server connected: " << connected
<< dendl
;
249 ssize_t
RDMAConnectedSocketImpl::read(char* buf
, size_t len
)
251 eventfd_t event_val
= 0;
252 int r
= eventfd_read(notify_fd
, &event_val
);
253 ldout(cct
, 20) << __func__
<< " notify_fd : " << event_val
<< " in " << local_qpn
254 << " r = " << r
<< dendl
;
257 ldout(cct
, 1) << __func__
<< " when ib not active. len: " << len
<< dendl
;
261 if (0 == connected
) {
262 ldout(cct
, 1) << __func__
<< " when ib not connected. len: " << len
<<dendl
;
266 read
= read_buffers(buf
,len
);
268 if (is_server
&& connected
== 0) {
269 ldout(cct
, 20) << __func__
<< " we do not need last handshake, QP: " << local_qpn
<< " peer QP: " << peer_qpn
<< dendl
;
270 connected
= 1; //if so, we don't need the last handshake
275 if (!buffers
.empty()) {
279 if (read
== 0 && error
)
281 return read
== 0 ? -EAGAIN
: read
;
284 void RDMAConnectedSocketImpl::buffer_prefetch(void)
286 std::vector
<ibv_wc
> cqe
;
291 for(size_t i
= 0; i
< cqe
.size(); ++i
) {
292 ibv_wc
* response
= &cqe
[i
];
293 ceph_assert(response
->status
== IBV_WC_SUCCESS
);
294 Chunk
* chunk
= reinterpret_cast<Chunk
*>(response
->wr_id
);
295 chunk
->prepare_read(response
->byte_len
);
297 if (chunk
->get_size() == 0) {
298 chunk
->reset_read_chunk();
299 dispatcher
->perf_logger
->inc(l_msgr_rdma_rx_fin
);
302 ldout(cct
, 20) << __func__
<< " got remote close msg..." << dendl
;
304 dispatcher
->post_chunk_to_pool(chunk
);
307 buffers
.push_back(chunk
);
308 ldout(cct
, 25) << __func__
<< " buffers add a chunk: " << chunk
->get_offset() << ":" << chunk
->get_bound() << dendl
;
311 worker
->perf_logger
->inc(l_msgr_rdma_rx_chunks
, cqe
.size());
314 ssize_t
RDMAConnectedSocketImpl::read_buffers(char* buf
, size_t len
)
316 size_t read_size
= 0, tmp
= 0;
318 auto pchunk
= buffers
.begin();
319 while (pchunk
!= buffers
.end()) {
320 tmp
= (*pchunk
)->read(buf
+ read_size
, len
- read_size
);
322 ldout(cct
, 25) << __func__
<< " read chunk " << *pchunk
<< " bytes length" << tmp
<< " offset: "
323 << (*pchunk
)->get_offset() << " ,bound: " << (*pchunk
)->get_bound() << dendl
;
325 if ((*pchunk
)->get_size() == 0) {
326 (*pchunk
)->reset_read_chunk();
327 dispatcher
->post_chunk_to_pool(*pchunk
);
328 update_post_backlog();
329 ldout(cct
, 25) << __func__
<< " read over one chunk " << dendl
;
333 if (read_size
== len
) {
338 buffers
.erase(buffers
.begin(), pchunk
);
339 ldout(cct
, 25) << __func__
<< " got " << read_size
<< " bytes, buffers size: " << buffers
.size() << dendl
;
340 worker
->perf_logger
->inc(l_msgr_rdma_rx_bytes
, read_size
);
344 ssize_t
RDMAConnectedSocketImpl::send(ceph::buffer::list
&bl
, bool more
)
351 size_t bytes
= bl
.length();
355 std::lock_guard l
{lock
};
356 pending_bl
.claim_append(bl
);
358 ldout(cct
, 20) << __func__
<< " fake send to upper, QP: " << local_qpn
<< dendl
;
362 ldout(cct
, 20) << __func__
<< " QP: " << local_qpn
<< dendl
;
363 ssize_t r
= submit(more
);
364 if (r
< 0 && r
!= -EAGAIN
)
369 size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector
<Chunk
*> &tx_buffers
,
370 size_t req_copy_len
, decltype(std::cbegin(pending_bl
.buffers()))& start
,
371 const decltype(std::cbegin(pending_bl
.buffers()))& end
)
373 ceph_assert(start
!= end
);
374 auto chunk_idx
= tx_buffers
.size();
375 if (0 == worker
->get_reged_mem(this, tx_buffers
, req_copy_len
)) {
376 ldout(cct
, 1) << __func__
<< " no enough buffers in worker " << worker
<< dendl
;
377 worker
->perf_logger
->inc(l_msgr_rdma_tx_no_mem
);
381 Chunk
*current_chunk
= tx_buffers
[chunk_idx
];
382 size_t write_len
= 0;
383 while (start
!= end
) {
384 const uintptr_t addr
= reinterpret_cast<uintptr_t>(start
->c_str());
386 size_t slice_write_len
= 0;
387 while (slice_write_len
< start
->length()) {
388 size_t real_len
= current_chunk
->write((char*)addr
+ slice_write_len
, start
->length() - slice_write_len
);
390 slice_write_len
+= real_len
;
391 write_len
+= real_len
;
392 req_copy_len
-= real_len
;
394 if (current_chunk
->full()) {
395 if (++chunk_idx
== tx_buffers
.size())
397 current_chunk
= tx_buffers
[chunk_idx
];
403 ceph_assert(req_copy_len
== 0);
407 ssize_t
RDMAConnectedSocketImpl::submit(bool more
)
411 std::lock_guard l
{lock
};
412 size_t bytes
= pending_bl
.length();
413 ldout(cct
, 20) << __func__
<< " we need " << bytes
<< " bytes. iov size: "
414 << pending_bl
.get_num_buffers() << dendl
;
418 std::vector
<Chunk
*> tx_buffers
;
419 auto it
= std::cbegin(pending_bl
.buffers());
420 auto copy_start
= it
;
421 size_t total_copied
= 0, wait_copy_len
= 0;
422 while (it
!= pending_bl
.buffers().end()) {
423 if (ib
->is_tx_buffer(it
->raw_c_str())) {
425 size_t copied
= tx_copy_chunk(tx_buffers
, wait_copy_len
, copy_start
, it
);
426 total_copied
+= copied
;
427 if (copied
< wait_copy_len
)
431 ceph_assert(copy_start
== it
);
432 tx_buffers
.push_back(ib
->get_tx_chunk_by_buffer(it
->raw_c_str()));
433 total_copied
+= it
->length();
436 wait_copy_len
+= it
->length();
441 total_copied
+= tx_copy_chunk(tx_buffers
, wait_copy_len
, copy_start
, it
);
444 if (total_copied
== 0)
446 ceph_assert(total_copied
<= pending_bl
.length());
447 ceph::buffer::list swapped
;
448 if (total_copied
< pending_bl
.length()) {
449 worker
->perf_logger
->inc(l_msgr_rdma_tx_parital_mem
);
450 pending_bl
.splice(total_copied
, pending_bl
.length() - total_copied
, &swapped
);
451 pending_bl
.swap(swapped
);
456 ldout(cct
, 20) << __func__
<< " left bytes: " << pending_bl
.length() << " in buffers "
457 << pending_bl
.get_num_buffers() << " tx chunks " << tx_buffers
.size() << dendl
;
459 int r
= post_work_request(tx_buffers
);
463 ldout(cct
, 20) << __func__
<< " finished sending " << total_copied
<< " bytes." << dendl
;
464 return pending_bl
.length() ? -EAGAIN
: 0;
467 int RDMAConnectedSocketImpl::post_work_request(std::vector
<Chunk
*> &tx_buffers
)
469 ldout(cct
, 20) << __func__
<< " QP: " << local_qpn
<< " " << tx_buffers
[0] << dendl
;
470 auto current_buffer
= tx_buffers
.begin();
471 ibv_sge isge
[tx_buffers
.size()];
472 uint32_t current_sge
= 0;
473 ibv_send_wr iswr
[tx_buffers
.size()];
474 uint32_t current_swr
= 0;
475 ibv_send_wr
* pre_wr
= NULL
;
478 // FIPS zeroization audit 20191115: these memsets are not security related.
479 memset(iswr
, 0, sizeof(iswr
));
480 memset(isge
, 0, sizeof(isge
));
482 while (current_buffer
!= tx_buffers
.end()) {
483 isge
[current_sge
].addr
= reinterpret_cast<uint64_t>((*current_buffer
)->buffer
);
484 isge
[current_sge
].length
= (*current_buffer
)->get_offset();
485 isge
[current_sge
].lkey
= (*current_buffer
)->mr
->lkey
;
486 ldout(cct
, 25) << __func__
<< " sending buffer: " << *current_buffer
<< " length: " << isge
[current_sge
].length
<< dendl
;
488 iswr
[current_swr
].wr_id
= reinterpret_cast<uint64_t>(*current_buffer
);
489 iswr
[current_swr
].next
= NULL
;
490 iswr
[current_swr
].sg_list
= &isge
[current_sge
];
491 iswr
[current_swr
].num_sge
= 1;
492 iswr
[current_swr
].opcode
= IBV_WR_SEND
;
493 iswr
[current_swr
].send_flags
= IBV_SEND_SIGNALED
;
496 worker
->perf_logger
->inc(l_msgr_rdma_tx_bytes
, isge
[current_sge
].length
);
498 pre_wr
->next
= &iswr
[current_swr
];
499 pre_wr
= &iswr
[current_swr
];
505 ibv_send_wr
*bad_tx_work_request
= nullptr;
506 if (ibv_post_send(qp
->get_qp(), iswr
, &bad_tx_work_request
)) {
507 ldout(cct
, 1) << __func__
<< " failed to send data"
508 << " (most probably should be peer not ready): "
509 << cpp_strerror(errno
) << dendl
;
510 worker
->perf_logger
->inc(l_msgr_rdma_tx_failed
);
513 worker
->perf_logger
->inc(l_msgr_rdma_tx_chunks
, tx_buffers
.size());
514 ldout(cct
, 20) << __func__
<< " qp state is " << get_qp_state() << dendl
;
518 void RDMAConnectedSocketImpl::fin() {
520 // FIPS zeroization audit 20191115: this memset is not security related.
521 memset(&wr
, 0, sizeof(wr
));
523 wr
.wr_id
= reinterpret_cast<uint64_t>(qp
);
525 wr
.opcode
= IBV_WR_SEND
;
526 wr
.send_flags
= IBV_SEND_SIGNALED
;
527 ibv_send_wr
* bad_tx_work_request
= nullptr;
528 if (ibv_post_send(qp
->get_qp(), &wr
, &bad_tx_work_request
)) {
529 ldout(cct
, 1) << __func__
<< " failed to send message="
530 << " ibv_post_send failed(most probably should be peer not ready): "
531 << cpp_strerror(errno
) << dendl
;
532 worker
->perf_logger
->inc(l_msgr_rdma_tx_failed
);
537 void RDMAConnectedSocketImpl::cleanup() {
538 if (read_handler
&& tcp_fd
>= 0) {
539 (static_cast<C_handle_connection_read
*>(read_handler
))->close();
540 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
541 worker
->center
.delete_file_event(tcp_fd
, EVENT_READABLE
| EVENT_WRITABLE
);
544 read_handler
= nullptr;
546 if (established_handler
) {
547 (static_cast<C_handle_connection_established
*>(established_handler
))->close();
548 delete established_handler
;
549 established_handler
= nullptr;
553 void RDMAConnectedSocketImpl::notify()
555 eventfd_t event_val
= 1;
556 int r
= eventfd_write(notify_fd
, event_val
);
560 void RDMAConnectedSocketImpl::shutdown()
568 void RDMAConnectedSocketImpl::close()
576 void RDMAConnectedSocketImpl::set_priority(int sd
, int prio
, int domain
) {
577 ceph::NetHandler
net(cct
);
578 net
.set_priority(sd
, prio
, domain
);
581 void RDMAConnectedSocketImpl::fault()
583 ldout(cct
, 1) << __func__
<< " tcp fd " << tcp_fd
<< dendl
;
589 void RDMAConnectedSocketImpl::set_accept_fd(int sd
)
593 worker
->center
.submit_to(worker
->center
.get_id(), [this]() {
594 worker
->center
.create_file_event(tcp_fd
, EVENT_READABLE
, read_handler
);
598 void RDMAConnectedSocketImpl::post_chunks_to_rq(int num
)
600 post_backlog
+= num
- ib
->post_chunks_to_rq(num
, qp
);
603 void RDMAConnectedSocketImpl::update_post_backlog()
606 post_backlog
-= post_backlog
- dispatcher
->post_chunks_to_rq(post_backlog
, qp
);