// ceph/src/msg/async/rdma/RDMAStack.cc
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <poll.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/resource.h>

#include "include/str_list.h"
#include "include/compat.h"
#include "common/Cycles.h"
#include "common/deleter.h"
#include "common/Tub.h"
#include "RDMAStack.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "RDMAStack "

RDMADispatcher::~RDMADispatcher()
{
  ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
  polling_stop();

  ceph_assert(qp_conns.empty());
  ceph_assert(num_qp_conn == 0);
  ceph_assert(dead_queue_pairs.empty());
}

RDMADispatcher::RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib)
  : cct(c), ib(ib)
{
  PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);

  plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
  plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
  plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
  plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers");

  plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work completions");
  plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
  plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
  plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");

  plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completions");
  plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completions");
  plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work requests");

  plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
  plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");

  plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");

  plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "The number of queue pairs created");
  plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "The number of active queue pairs");

  perf_logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perf_logger);
  Cycles::init();
}
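
/*
 * These counters are visible at runtime through the admin socket; for
 * example (the daemon name "osd.0" is illustrative, not part of this file):
 *
 *   ceph daemon osd.0 perf dump AsyncMessenger::RDMADispatcher
 */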

void RDMADispatcher::polling_start()
{
  // take lock because listen/connect can happen from different worker threads
  std::lock_guard l{lock};

  if (t.joinable())
    return; // dispatcher thread already running

  ib->get_memory_manager()->set_rx_stat_logger(perf_logger);

  tx_cc = ib->create_comp_channel(cct);
  ceph_assert(tx_cc);
  rx_cc = ib->create_comp_channel(cct);
  ceph_assert(rx_cc);
  tx_cq = ib->create_comp_queue(cct, tx_cc);
  ceph_assert(tx_cq);
  rx_cq = ib->create_comp_queue(cct, rx_cc);
  ceph_assert(rx_cq);

  t = std::thread(&RDMADispatcher::polling, this);
  ceph_pthread_setname(t.native_handle(), "rdma-polling");
}
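
/*
 * The tx/rx completion channels and queues created above wrap the plain
 * libibverbs objects. A minimal sketch of the same pairing in raw verbs
 * (assumes a valid ibv_context* ctxt; the CQ depth 512 is illustrative):
 *
 *   struct ibv_comp_channel *cc = ibv_create_comp_channel(ctxt);
 *   struct ibv_cq *cq = ibv_create_cq(ctxt, 512, NULL, cc, 0);
 *   ibv_req_notify_cq(cq, 0);  // arm: the next completion wakes cc's fd
 */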

void RDMADispatcher::polling_stop()
{
  {
    std::lock_guard l{lock};
    done = true;
  }

  if (!t.joinable())
    return;

  t.join();

  tx_cc->ack_events();
  rx_cc->ack_events();
  delete tx_cq;
  delete rx_cq;
  delete tx_cc;
  delete rx_cc;
}

void RDMADispatcher::handle_async_event()
{
  ldout(cct, 30) << __func__ << dendl;
  while (1) {
    ibv_async_event async_event;
    if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
      if (errno != EAGAIN)
        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
                   << " " << cpp_strerror(errno) << ")" << dendl;
      return;
    }
    perf_logger->inc(l_msgr_rdma_total_async_events);
    ldout(cct, 1) << __func__ << " event: " << ibv_event_type_str(async_event.event_type) << dendl;

    switch (async_event.event_type) {
      /***********************CQ events********************/
      case IBV_EVENT_CQ_ERR:
        lderr(cct) << __func__ << " Fatal error: CQ overflow, dev = " << ib->get_device()->ctxt
                   << "; this affects all QPs bound to the same CQ,"
                   << " the resources must be destroyed and recreated" << dendl;
        break;
      /***********************QP events********************/
      case IBV_EVENT_QP_FATAL:
        {
          /* An error occurred on a QP and it transitioned to the error state */
          ibv_qp* ib_qp = async_event.element.qp;
          uint32_t qpn = ib_qp->qp_num;
          QueuePair* qp = get_qp(qpn);
          lderr(cct) << __func__ << " Fatal error, event associated qp number: " << qpn
                     << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
                     << " Event: " << ibv_event_type_str(async_event.event_type) << dendl;
        }
        break;
      case IBV_EVENT_QP_LAST_WQE_REACHED:
        {
          /*
           * 1. The QP bound to an SRQ is in the IBV_QPS_ERR state and no
           *    more WQEs remain on the QP's RQ.
           *    Reason: the QP was forced into the error state before the
           *    beacon WR was posted. The QP's WRs will be flushed into the
           *    CQ with IBV_WC_WR_FLUSH_ERR status. With an SRQ, only WRs on
           *    the QP that entered the error state are flushed.
           *    Handling: just confirm that the QP has entered the dead
           *    queue pairs.
           * 2. A CQE with an error was generated for the last WQE.
           *    Handling: log the error.
           */
          perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
          ibv_qp* ib_qp = async_event.element.qp;
          uint32_t qpn = ib_qp->qp_num;
          std::lock_guard l{lock};
          RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
          QueuePair* qp = get_qp_lockless(qpn);

          if (qp && !qp->is_dead()) {
            lderr(cct) << __func__ << " QP not dead, event associated qp number: " << qpn
                       << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
                       << " Event: " << ibv_event_type_str(async_event.event_type) << dendl;
          }
          if (!conn) {
            ldout(cct, 20) << __func__ << " Connection's QP may have entered the dead state. "
                           << " qp number: " << qpn << dendl;
          } else {
            conn->fault();
            if (qp) {
              if (!cct->_conf->ms_async_rdma_cm) {
                enqueue_dead_qp_lockless(qpn);
              }
            }
          }
        }
        break;
      case IBV_EVENT_QP_REQ_ERR:
        /* Invalid Request Local Work Queue Error */
        [[fallthrough]];
      case IBV_EVENT_QP_ACCESS_ERR:
        /* Local access violation error */
        [[fallthrough]];
      case IBV_EVENT_COMM_EST:
        /* Communication was established on a QP */
        [[fallthrough]];
      case IBV_EVENT_SQ_DRAINED:
        /* Send Queue was drained of outstanding messages in progress */
        [[fallthrough]];
      case IBV_EVENT_PATH_MIG:
        /* A connection has migrated to the alternate path */
        [[fallthrough]];
      case IBV_EVENT_PATH_MIG_ERR:
        /* A connection failed to migrate to the alternate path */
        break;
      /***********************SRQ events*******************/
      case IBV_EVENT_SRQ_ERR:
        /* Error occurred on an SRQ */
        [[fallthrough]];
      case IBV_EVENT_SRQ_LIMIT_REACHED:
        /* SRQ limit was reached */
        break;
      /***********************Port events******************/
      case IBV_EVENT_PORT_ACTIVE:
        /* Link became active on a port */
        [[fallthrough]];
      case IBV_EVENT_PORT_ERR:
        /* Link became unavailable on a port */
        [[fallthrough]];
      case IBV_EVENT_LID_CHANGE:
        /* LID was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_PKEY_CHANGE:
        /* P_Key table was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_SM_CHANGE:
        /* SM was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_CLIENT_REREGISTER:
        /* SM sent a CLIENT_REREGISTER request to a port */
        [[fallthrough]];
      case IBV_EVENT_GID_CHANGE:
        /* GID table was changed on a port */
        break;

      /***********************CA events******************/
      case IBV_EVENT_DEVICE_FATAL:
        /* CA is in FATAL state */
        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
                   << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
        break;
      default:
        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
                   << " unknown event: " << async_event.event_type << dendl;
        break;
    }
    ibv_ack_async_event(&async_event);
  }
}
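
/*
 * ibv_get_async_event() blocks by default, so the EAGAIN check above
 * relies on the device's async fd having been switched to non-blocking
 * mode during setup. A sketch of that setup (how and where the fd is
 * actually configured is an assumption, not shown in this file):
 *
 *   int fd = ib->get_device()->ctxt->async_fd;
 *   int flags = fcntl(fd, F_GETFL);
 *   fcntl(fd, F_SETFL, flags | O_NONBLOCK);
 */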

void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
{
  std::lock_guard l{lock};
  ib->post_chunk_to_pool(chunk);
  perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}

int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp)
{
  std::lock_guard l{lock};
  return ib->post_chunks_to_rq(num, qp);
}

void RDMADispatcher::polling()
{
  static int MAX_COMPLETIONS = 32;
  ibv_wc wc[MAX_COMPLETIONS];

  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
  std::vector<ibv_wc> tx_cqe;
  ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
  uint64_t last_inactive = Cycles::rdtsc();
  bool rearmed = false;
  int r = 0;

  while (true) {
    int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
    if (tx_ret > 0) {
      ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
                     << " responses." << dendl;
      handle_tx_event(wc, tx_ret);
    }

    int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
    if (rx_ret > 0) {
      ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
                     << " responses." << dendl;
      handle_rx_event(wc, rx_ret);
    }

    if (!tx_ret && !rx_ret) {
      perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
      //
      // Clean up dead QPs when the rx/tx CQs are idle. We could destroy
      // QPs even earlier, as soon as their beacon has been received, but
      // we have two CQs (rx & tx), so the beacon WC can be popped from
      // the tx CQ before other WCs are fully consumed from the rx CQ.
      // For safety, wait for the beacon and then for "no events" from
      // both CQs.
      //
      // Calling empty() on the vector without the lock is fine, since we
      // use it only as a hint (accuracy is not important here).
      //
      if (!dead_queue_pairs.empty()) {
        decltype(dead_queue_pairs) dead_qps;
        {
          std::lock_guard l{lock};
          dead_queue_pairs.swap(dead_qps);
        }

        for (auto& qp: dead_qps) {
          perf_logger->dec(l_msgr_rdma_active_queue_pair);
          ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl;
          delete qp;
        }
      }

      if (!num_qp_conn && done && dead_queue_pairs.empty())
        break;

      uint64_t now = Cycles::rdtsc();
      if (Cycles::to_microseconds(now - last_inactive) > cct->_conf->ms_async_rdma_polling_us) {
        handle_async_event();
        if (!rearmed) {
          // Clean up cq events after the rearm notify to ensure no new
          // event arrives between polling and rearming.
          tx_cq->rearm_notify();
          rx_cq->rearm_notify();
          rearmed = true;
          continue;
        }

        struct pollfd channel_poll[2];
        channel_poll[0].fd = tx_cc->get_fd();
        channel_poll[0].events = POLLIN;
        channel_poll[0].revents = 0;
        channel_poll[1].fd = rx_cc->get_fd();
        channel_poll[1].events = POLLIN;
        channel_poll[1].revents = 0;
        r = 0;
        perf_logger->set(l_msgr_rdma_polling, 0);
        while (!done && r == 0) {
          r = TEMP_FAILURE_RETRY(poll(channel_poll, 2, 100));
          if (r < 0) {
            r = -errno;
            lderr(cct) << __func__ << " poll failed " << r << dendl;
            ceph_abort();
          }
        }
        if (r > 0 && tx_cc->get_cq_event())
          ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
        if (r > 0 && rx_cc->get_cq_event())
          ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
        last_inactive = Cycles::rdtsc();
        perf_logger->set(l_msgr_rdma_polling, 1);
        rearmed = false;
      }
    }
  }
}
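
/*
 * The idle path above follows the standard verbs event-driven pattern:
 * rearm both CQs, poll(2) on the completion-channel fds, ack the event,
 * then go back to busy-polling. In raw libibverbs one iteration of that
 * cycle looks roughly like this (a sketch, error handling omitted):
 *
 *   ibv_req_notify_cq(cq, 0);                   // rearm notification
 *   struct ibv_cq *ev_cq; void *ev_ctx;
 *   ibv_get_cq_event(channel, &ev_cq, &ev_ctx); // sleep until an event
 *   ibv_ack_cq_events(ev_cq, 1);                // every event must be acked
 *   struct ibv_wc wc[32];
 *   while (ibv_poll_cq(ev_cq, 32, wc) > 0) { ... } // drain completions
 */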

void RDMADispatcher::notify_pending_workers() {
  if (num_pending_workers) {
    RDMAWorker *w = nullptr;
    {
      std::lock_guard l{w_lock};
      if (!pending_workers.empty()) {
        w = pending_workers.front();
        pending_workers.pop_front();
        --num_pending_workers;
      }
    }
    if (w)
      w->notify_worker();
  }
}

void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
{
  std::lock_guard l{lock};
  ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
  qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
  ++num_qp_conn;
}

RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
{
  auto it = qp_conns.find(qp);
  if (it == qp_conns.end())
    return nullptr;
  if (it->second.first->is_dead())
    return nullptr;
  return it->second.second;
}

Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp)
{
  // Try to find the QP in qp_conns first.
  auto it = qp_conns.find(qp);
  if (it != qp_conns.end())
    return it->second.first;

  // Try again in dead_queue_pairs.
  for (auto &i: dead_queue_pairs)
    if (i->get_local_qp_number() == qp)
      return i;

  return nullptr;
}

Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
{
  std::lock_guard l{lock};
  return get_qp_lockless(qp);
}

void RDMADispatcher::enqueue_dead_qp_lockless(uint32_t qpn)
{
  auto it = qp_conns.find(qpn);
  if (it == qp_conns.end()) {
    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
    return;
  }
  QueuePair *qp = it->second.first;
  dead_queue_pairs.push_back(qp);
  qp_conns.erase(it);
  --num_qp_conn;
}

void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
{
  std::lock_guard l{lock};
  enqueue_dead_qp_lockless(qpn);
}

void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
{
  std::lock_guard l{lock};
  auto it = qp_conns.find(qpn);
  if (it == qp_conns.end()) {
    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
    return;
  }
  QueuePair *qp = it->second.first;
  if (qp->to_dead()) {
    //
    // Failed to switch to dead. This is abnormal, but we can't do
    // anything about it, so just destroy the QP.
    //
    dead_queue_pairs.push_back(qp);
    qp_conns.erase(it);
    --num_qp_conn;
  } else {
    //
    // Successfully switched to dead, so keep the entry in the map but
    // zero out the socket pointer, so that get_conn_lockless() returns
    // null for this QP.
    //
    it->second.second = nullptr;
  }
}
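
/*
 * QP teardown thus proceeds in stages: schedule_qp_destroy() switches the
 * QP to the dead state (with a beacon WR posted on it), the beacon's tx
 * completion (wr_id == BEACON_WRID, see handle_tx_event()) triggers
 * enqueue_dead_qp(), and polling() finally deletes the entries from
 * dead_queue_pairs once both CQs have gone idle.
 */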

void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
{
  std::vector<Chunk*> tx_chunks;

  for (int i = 0; i < n; ++i) {
    ibv_wc* response = &cqe[i];

    // If it's a beacon WR, enqueue the QP to be destroyed later
    if (response->wr_id == BEACON_WRID) {
      enqueue_dead_qp(response->qp_num);
      continue;
    }

    ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len
                   << " status: " << ib->wc_status_to_string(response->status) << dendl;

    if (response->status != IBV_WC_SUCCESS) {
      switch(response->status) {
        case IBV_WC_RETRY_EXC_ERR:
          {
            perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);

            ldout(cct, 1) << __func__ << " Responder ACK timeout: possible disconnect, or remote QP in a bad state."
                          << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                          << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                          << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

            std::lock_guard l{lock};
            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
            if (conn) {
              ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
                            << conn->get_peer_qpn() << dendl;
            }
          }
          break;
        case IBV_WC_WR_FLUSH_ERR:
          {
            perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);

            std::lock_guard l{lock};
            QueuePair *qp = get_qp_lockless(response->qp_num);
            if (qp) {
              ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
            }
            if (qp && qp->is_dead()) {
              ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl;
            } else {
              lderr(cct) << __func__ << " Invalid/Unsupported request to consume outstanding SQ WR,"
                         << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                         << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                         << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

              RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
              if (conn) {
                ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
                              << conn->get_peer_qpn() << dendl;
              }
            }
          }
          break;

        default:
          {
            lderr(cct) << __func__ << " SQ WR return error,"
                       << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                       << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                       << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

            std::lock_guard l{lock};
            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
            if (conn && conn->is_connected()) {
              ldout(cct, 20) << __func__ << " SQ WR return error, Queue Pair error state is: " << conn->get_qp_state()
                             << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl;
              conn->fault();
            } else {
              ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << ", discard event" << dendl;
            }
          }
          break;
      }
    }

    auto chunk = reinterpret_cast<Chunk *>(response->wr_id);
    // A TX completion may come either from:
    //  1) a regular send message, where the WCE wr_id points to the chunk
    //  2) a 'fin' message, where the wr_id points to the QP
    if (ib->get_memory_manager()->is_valid_chunk(chunk)) {
      tx_chunks.push_back(chunk);
    } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num) {
      ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
    } else {
      ldout(cct, 1) << __func__ << " not a tx buffer, chunk " << chunk << dendl;
      ceph_abort();
    }
  }

  perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
  post_tx_buffer(tx_chunks);
}

/**
 * Return the given Chunks to the tx free pool.
 *
 * \param[in] chunks
 *      The Chunks to enqueue.
 */
void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
{
  if (chunks.empty())
    return;

  inflight -= chunks.size();
  ib->get_memory_manager()->return_tx(chunks);
  ldout(cct, 30) << __func__ << " release " << chunks.size()
                 << " chunks, inflight " << inflight << dendl;
  notify_pending_workers();
}

void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
{
  perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
  perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);

  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
  std::lock_guard l{lock}; // make sure the connected socket stays alive while the wc is passed on

  for (int i = 0; i < rx_number; ++i) {
    ibv_wc* response = &cqe[i];
    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
    RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
    QueuePair *qp = get_qp_lockless(response->qp_num);

    switch (response->status) {
      case IBV_WC_SUCCESS:
        ceph_assert(response->opcode == IBV_WC_RECV);
        if (!conn) {
          ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
                        << std::hex << chunk << " will be returned." << std::dec << dendl;
          ib->post_chunk_to_pool(chunk);
          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        } else {
          conn->post_chunks_to_rq(1);
          polled[conn].push_back(*response);

          if (qp != nullptr && !qp->get_srq()) {
            qp->remove_rq_wr(chunk);
            chunk->clear_qp();
          }
        }
        break;

      case IBV_WC_WR_FLUSH_ERR:
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);

        if (qp) {
          ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
        }
        if (qp && qp->is_dead()) {
          ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
        } else {
          ldout(cct, 1) << __func__ << " RQ WR return error,"
                        << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                        << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                        << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
          if (conn) {
            ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: "
                          << conn->get_peer_qpn() << dendl;
          }
        }

        ib->post_chunk_to_pool(chunk);
        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        break;

      default:
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);

        ldout(cct, 1) << __func__ << " RQ WR return error,"
                      << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                      << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                      << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
        if (conn && conn->is_connected())
          conn->fault();

        ib->post_chunk_to_pool(chunk);
        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        break;
    }
  }

  for (auto &i : polled)
    i.first->pass_wc(std::move(i.second));
  polled.clear();
}

RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id)
  : Worker(c, worker_id),
    tx_handler(new C_handle_cq_tx(this))
{
  // initialize perf_logger
  char name[128];
  sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
  PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);

  plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of tx buffer exhaustions");
  plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of partial tx buffer allocations");
  plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of failed tx posts");

  plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
  plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES));
  plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks received");
  plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks received", NULL, 0, unit_t(UNIT_BYTES));
  plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns");

  perf_logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perf_logger);
}

RDMAWorker::~RDMAWorker()
{
  delete tx_handler;
}

void RDMAWorker::initialize()
{
  ceph_assert(dispatcher);
}

int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
                       const SocketOptions &opt, ServerSocket *sock)
{
  ib->init();
  dispatcher->polling_start();

  RDMAServerSocketImpl *p;
  if (cct->_conf->ms_async_rdma_type == "iwarp") {
    p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
  } else {
    p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
  }
  int r = p->listen(sa, opt);
  if (r < 0) {
    delete p;
    return r;
  }

  *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
  return 0;
}

int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
  ib->init();
  dispatcher->polling_start();

  RDMAConnectedSocketImpl* p;
  if (cct->_conf->ms_async_rdma_type == "iwarp") {
    p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this);
  } else {
    p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this);
  }
  int r = p->try_connect(addr, opts);

  if (r < 0) {
    ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
    delete p;
    return r;
  }
  std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
  *socket = ConnectedSocket(std::move(csi));
  return 0;
}
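
/*
 * Both listen() and connect() pick the socket flavor from the
 * ms_async_rdma_type option: "iwarp" selects the RDMAIWARP*
 * implementations (which rely on RDMA-CM), anything else the plain IB
 * verbs ones. Illustrative ceph.conf fragment (values are examples only):
 *
 *   ms_async_rdma_type = iwarp
 *   ms_async_rdma_cm = true
 */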

int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
  ceph_assert(center.in_thread());
  int r = ib->get_tx_buffers(c, bytes);
  size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
  ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got
                 << " registered bytes, inflight " << dispatcher->inflight << dendl;
  dispatcher->inflight += r;
  if (got >= bytes)
    return r;

  if (o) {
    if (!o->is_pending()) {
      pending_sent_conns.push_back(o);
      perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
      o->set_pending(1);
    }
    dispatcher->make_pending_worker(this);
  }
  return r;
}

void RDMAWorker::handle_pending_message()
{
  ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
  while (!pending_sent_conns.empty()) {
    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
    pending_sent_conns.pop_front();
    ssize_t r = o->submit(false);
    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
    if (r < 0) {
      if (r == -EAGAIN) {
        pending_sent_conns.push_back(o);
        dispatcher->make_pending_worker(this);
        return;
      }
      o->fault();
    }
    o->set_pending(0);
    perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
  }
  dispatcher->notify_pending_workers();
}

RDMAStack::RDMAStack(CephContext *cct)
  : NetworkStack(cct), ib(std::make_shared<Infiniband>(cct)),
    rdma_dispatcher(std::make_shared<RDMADispatcher>(cct, ib))
{
  ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;

  unsigned num = get_num_worker();
  for (unsigned i = 0; i < num; ++i) {
    RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
    w->set_dispatcher(rdma_dispatcher);
    w->set_ib(ib);
  }
  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
}
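
/*
 * This stack is instantiated when the async messenger is configured for
 * RDMA transport. Illustrative ceph.conf fragment (the device name is an
 * example only):
 *
 *   ms_type = async+rdma
 *   ms_async_rdma_device_name = mlx5_0
 */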

RDMAStack::~RDMAStack()
{
  if (cct->_conf->ms_async_rdma_enable_hugepage) {
    unsetenv("RDMAV_HUGEPAGES_SAFE"); // remove env variable on destruction
  }
}

void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
{
  threads.resize(i+1);
  threads[i] = std::thread(func);
}

void RDMAStack::join_worker(unsigned i)
{
  ceph_assert(threads.size() > i && threads[i].joinable());
  threads[i].join();
}