]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/rdma/RDMAStack.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / msg / async / rdma / RDMAStack.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
31f18b77 17#include <poll.h>
11fdf7f2 18#include <errno.h>
7c673cae
FG
19#include <sys/time.h>
20#include <sys/resource.h>
21
22#include "include/str_list.h"
11fdf7f2
TL
23#include "include/compat.h"
24#include "common/Cycles.h"
7c673cae 25#include "common/deleter.h"
7c673cae 26#include "RDMAStack.h"
7c673cae
FG
27
28#define dout_subsys ceph_subsys_ms
29#undef dout_prefix
30#define dout_prefix *_dout << "RDMAStack "
31
7c673cae
FG
RDMADispatcher::~RDMADispatcher()
{
  ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
  polling_stop();

  // polling() only exits once done is set, no QP connections remain and the
  // dead list is drained, so after polling_stop() these must all hold.
  ceph_assert(qp_conns.empty());
  ceph_assert(num_qp_conn == 0);
  ceph_assert(dead_queue_pairs.empty());
}
41
f67539c2 42RDMADispatcher::RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib)
9f95a23c 43 : cct(c), ib(ib)
7c673cae
FG
44{
45 PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
46
47 plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
48 plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
11fdf7f2
TL
49 plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
50 plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers");
7c673cae
FG
51
52 plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
53 plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
54 plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
55 plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
56
57 plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion");
58 plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion");
59 plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request");
60
61 plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
62 plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");
63
64 plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");
65
66
67 plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Active queue pair number");
68 plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Created queue pair number");
69
70 perf_logger = plb.create_perf_counters();
71 cct->get_perfcounters_collection()->add(perf_logger);
11fdf7f2 72 Cycles::init();
7c673cae
FG
73}
74
// Lazily start the dedicated polling thread and the verbs completion
// machinery it needs. Safe to call multiple times; only the first call
// creates resources.
void RDMADispatcher::polling_start()
{
  // take lock because listen/connect can happen from different worker threads
  std::lock_guard l{lock};

  if (t.joinable())
    return; // dispatcher thread already running

  ib->get_memory_manager()->set_rx_stat_logger(perf_logger);

  // Completion channels are created before their completion queues because
  // each CQ is bound to a channel; the polling thread later blocks in
  // poll(2) on the channel fds when both CQs go idle.
  tx_cc = ib->create_comp_channel(cct);
  ceph_assert(tx_cc);
  rx_cc = ib->create_comp_channel(cct);
  ceph_assert(rx_cc);
  tx_cq = ib->create_comp_queue(cct, tx_cc);
  ceph_assert(tx_cq);
  rx_cq = ib->create_comp_queue(cct, rx_cc);
  ceph_assert(rx_cq);

  t = std::thread(&RDMADispatcher::polling, this);
  ceph_pthread_setname(t.native_handle(), "rdma-polling");
}
97
// Signal the polling thread to exit, join it, then tear down the
// completion queues/channels it was using. No-op if polling never started.
void RDMADispatcher::polling_stop()
{
  {
    std::lock_guard l{lock};
    done = true;  // tells the polling loop (and its poll(2) wait) to exit
  }

  if (!t.joinable())
    return;

  t.join();

  // Ack outstanding CQ events before destruction, as the verbs API requires.
  tx_cc->ack_events();
  rx_cc->ack_events();
  delete tx_cq;
  delete rx_cq;
  delete tx_cc;
  delete rx_cc;
}
117
// Drain and handle all pending asynchronous events from the verbs device
// context. Called from the polling thread when the CQs have been idle for a
// while. IBV_EVENT_QP_LAST_WQE_REACHED is the main actionable event: it
// faults the owning connection and (when not using rdma-cm) moves the QP to
// the dead list. Returns once ibv_get_async_event() would block (EAGAIN).
void RDMADispatcher::handle_async_event()
{
  ldout(cct, 30) << __func__ << dendl;
  while (1) {
    ibv_async_event async_event;
    if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
      if (errno != EAGAIN)
        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
                   << " " << cpp_strerror(errno) << ")" << dendl;
      return;
    }
    perf_logger->inc(l_msgr_rdma_total_async_events);
    ldout(cct, 1) << __func__ << "Event : " << ibv_event_type_str(async_event.event_type) << dendl;

    switch (async_event.event_type) {
      /***********************CQ events********************/
      case IBV_EVENT_CQ_ERR:
        lderr(cct) << __func__ << " Fatal Error, effect all QP bound with same CQ, "
                   << " CQ Overflow, dev = " << ib->get_device()->ctxt
                   << " Need destroy and recreate resource " << dendl;
        break;
      /***********************QP events********************/
      case IBV_EVENT_QP_FATAL:
        {
          /* Error occurred on a QP and it transitioned to error state */
          ibv_qp* ib_qp = async_event.element.qp;
          uint32_t qpn = ib_qp->qp_num;
          QueuePair* qp = get_qp(qpn);
          lderr(cct) << __func__ << " Fatal Error, event associate qp number: " << qpn
                     << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
                     << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
        }
        break;
      case IBV_EVENT_QP_LAST_WQE_REACHED:
        {
          /*
           * 1. The QP bound with SRQ is in IBV_QPS_ERR state & no more WQE on the RQ of the QP
           *    Reason: QP is force switched into Error before posting Beacon WR.
           *            The QP's WRs will be flushed into CQ with IBV_WC_WR_FLUSH_ERR status
           *            For SRQ, only WRs on the QP which is switched into Error status will be flushed.
           *    Handle: Only confirm that qp enter into dead queue pairs
           * 2. The CQE with error was generated for the last WQE
           *    Handle: output error log
           */
          perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
          ibv_qp* ib_qp = async_event.element.qp;
          uint32_t qpn = ib_qp->qp_num;
          // Take the dispatcher lock so the conn/qp lookups and the dead-list
          // transition happen atomically with respect to the polling thread.
          std::lock_guard l{lock};
          RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
          QueuePair* qp = get_qp_lockless(qpn);

          if (qp && !qp->is_dead()) {
            lderr(cct) << __func__ << " QP not dead, event associate qp number: " << qpn
                       << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
                       << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
          }
          if (!conn) {
            ldout(cct, 20) << __func__ << " Connection's QP maybe entered into dead status. "
                           << " qp number: " << qpn << dendl;
          } else {
            conn->fault();
            if (qp) {
              if (!cct->_conf->ms_async_rdma_cm) {
                enqueue_dead_qp_lockless(qpn);
              }
            }
          }
        }
        break;
      case IBV_EVENT_QP_REQ_ERR:
        /* Invalid Request Local Work Queue Error */
        [[fallthrough]];
      case IBV_EVENT_QP_ACCESS_ERR:
        /* Local access violation error */
        [[fallthrough]];
      case IBV_EVENT_COMM_EST:
        /* Communication was established on a QP */
        [[fallthrough]];
      case IBV_EVENT_SQ_DRAINED:
        /* Send Queue was drained of outstanding messages in progress */
        [[fallthrough]];
      case IBV_EVENT_PATH_MIG:
        /* A connection has migrated to the alternate path */
        [[fallthrough]];
      case IBV_EVENT_PATH_MIG_ERR:
        /* A connection failed to migrate to the alternate path */
        break;
      /***********************SRQ events*******************/
      case IBV_EVENT_SRQ_ERR:
        /* Error occurred on an SRQ */
        [[fallthrough]];
      case IBV_EVENT_SRQ_LIMIT_REACHED:
        /* SRQ limit was reached */
        break;
      /***********************Port events******************/
      case IBV_EVENT_PORT_ACTIVE:
        /* Link became active on a port */
        [[fallthrough]];
      case IBV_EVENT_PORT_ERR:
        /* Link became unavailable on a port */
        [[fallthrough]];
      case IBV_EVENT_LID_CHANGE:
        /* LID was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_PKEY_CHANGE:
        /* P_Key table was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_SM_CHANGE:
        /* SM was changed on a port */
        [[fallthrough]];
      case IBV_EVENT_CLIENT_REREGISTER:
        /* SM sent a CLIENT_REREGISTER request to a port */
        [[fallthrough]];
      case IBV_EVENT_GID_CHANGE:
        /* GID table was changed on a port */
        break;

      /***********************CA events******************/
      //CA events:
      case IBV_EVENT_DEVICE_FATAL:
        /* CA is in FATAL state */
        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
                   << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
        break;
      default:
        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
                   << " unknown event: " << async_event.event_type << dendl;
        break;
    }
    ibv_ack_async_event(&async_event);
  }
}
250
11fdf7f2
TL
// Return one received chunk to the shared rx buffer pool and update the
// in-use perf counter. Takes the dispatcher lock since the pool is shared.
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
{
  std::lock_guard l{lock};
  ib->post_chunk_to_pool(chunk);
  perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
257
// Post up to `num` receive buffers for the given QP (delegated to
// Infiniband, which picks the actual receive queue).
// @return number of chunks actually posted.
int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp)
{
  std::lock_guard l{lock};
  return ib->post_chunks_to_rq(num, qp);
}
263
7c673cae
FG
264void RDMADispatcher::polling()
265{
266 static int MAX_COMPLETIONS = 32;
267 ibv_wc wc[MAX_COMPLETIONS];
268
269 std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
270 std::vector<ibv_wc> tx_cqe;
31f18b77 271 ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
11fdf7f2 272 uint64_t last_inactive = Cycles::rdtsc();
7c673cae
FG
273 bool rearmed = false;
274 int r = 0;
275
276 while (true) {
31f18b77 277 int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
7c673cae
FG
278 if (tx_ret > 0) {
279 ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
280 << " responses."<< dendl;
31f18b77 281 handle_tx_event(wc, tx_ret);
7c673cae
FG
282 }
283
31f18b77 284 int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
7c673cae 285 if (rx_ret > 0) {
11fdf7f2 286 ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
7c673cae 287 << " responses."<< dendl;
9f95a23c 288 handle_rx_event(wc, rx_ret);
7c673cae
FG
289 }
290
291 if (!tx_ret && !rx_ret) {
7c673cae 292 perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
9f95a23c
TL
293 //
294 // Clean up dead QPs when rx/tx CQs are in idle. The thing is that
295 // we can destroy QPs even earlier, just when beacon has been received,
296 // but we have two CQs (rx & tx), thus beacon WC can be poped from tx
297 // CQ before other WCs are fully consumed from rx CQ. For safety, we
298 // wait for beacon and then "no-events" from CQs.
299 //
300 // Calling size() on vector without locks is totally fine, since we
301 // use it as a hint (accuracy is not important here)
302 //
303 if (!dead_queue_pairs.empty()) {
304 decltype(dead_queue_pairs) dead_qps;
305 {
306 std::lock_guard l{lock};
307 dead_queue_pairs.swap(dead_qps);
308 }
309
310 for (auto& qp: dead_qps) {
311 perf_logger->dec(l_msgr_rdma_active_queue_pair);
312 ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl;
313 delete qp;
7c673cae
FG
314 }
315 }
9f95a23c 316
11fdf7f2 317 if (!num_qp_conn && done && dead_queue_pairs.empty())
7c673cae
FG
318 break;
319
11fdf7f2
TL
320 uint64_t now = Cycles::rdtsc();
321 if (Cycles::to_microseconds(now - last_inactive) > cct->_conf->ms_async_rdma_polling_us) {
31f18b77 322 handle_async_event();
7c673cae
FG
323 if (!rearmed) {
324 // Clean up cq events after rearm notify ensure no new incoming event
325 // arrived between polling and rearm
31f18b77
FG
326 tx_cq->rearm_notify();
327 rx_cq->rearm_notify();
7c673cae
FG
328 rearmed = true;
329 continue;
330 }
331
31f18b77
FG
332 struct pollfd channel_poll[2];
333 channel_poll[0].fd = tx_cc->get_fd();
9f95a23c 334 channel_poll[0].events = POLLIN;
31f18b77
FG
335 channel_poll[0].revents = 0;
336 channel_poll[1].fd = rx_cc->get_fd();
9f95a23c 337 channel_poll[1].events = POLLIN;
31f18b77
FG
338 channel_poll[1].revents = 0;
339 r = 0;
7c673cae 340 perf_logger->set(l_msgr_rdma_polling, 0);
31f18b77 341 while (!done && r == 0) {
11fdf7f2 342 r = TEMP_FAILURE_RETRY(poll(channel_poll, 2, 100));
31f18b77
FG
343 if (r < 0) {
344 r = -errno;
345 lderr(cct) << __func__ << " poll failed " << r << dendl;
346 ceph_abort();
347 }
348 }
349 if (r > 0 && tx_cc->get_cq_event())
350 ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
351 if (r > 0 && rx_cc->get_cq_event())
352 ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
11fdf7f2 353 last_inactive = Cycles::rdtsc();
7c673cae
FG
354 perf_logger->set(l_msgr_rdma_polling, 1);
355 rearmed = false;
356 }
357 }
358 }
359}
360
361void RDMADispatcher::notify_pending_workers() {
362 if (num_pending_workers) {
363 RDMAWorker *w = nullptr;
364 {
9f95a23c 365 std::lock_guard l{w_lock};
7c673cae
FG
366 if (!pending_workers.empty()) {
367 w = pending_workers.front();
368 pending_workers.pop_front();
369 --num_pending_workers;
370 }
371 }
372 if (w)
373 w->notify_worker();
374 }
375}
376
// Associate a newly created QP with its connected socket so completion and
// async events can be routed back to the owning connection.
// A QP number must not be registered twice.
void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
{
  std::lock_guard l{lock};
  ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
  qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
  ++num_qp_conn;
}
384
385RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
386{
387 auto it = qp_conns.find(qp);
388 if (it == qp_conns.end())
389 return nullptr;
390 if (it->second.first->is_dead())
391 return nullptr;
392 return it->second.second;
393}
394
9f95a23c 395Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp)
11fdf7f2 396{
11fdf7f2
TL
397 // Try to find the QP in qp_conns firstly.
398 auto it = qp_conns.find(qp);
399 if (it != qp_conns.end())
400 return it->second.first;
401
402 // Try again in dead_queue_pairs.
403 for (auto &i: dead_queue_pairs)
404 if (i->get_local_qp_number() == qp)
405 return i;
406
407 return nullptr;
408}
409
// Thread-safe wrapper around get_qp_lockless().
Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
{
  std::lock_guard l{lock};
  return get_qp_lockless(qp);
}
415
f67539c2 416void RDMADispatcher::enqueue_dead_qp_lockless(uint32_t qpn)
9f95a23c 417{
7c673cae 418 auto it = qp_conns.find(qpn);
9f95a23c
TL
419 if (it == qp_conns.end()) {
420 lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
7c673cae 421 return ;
9f95a23c
TL
422 }
423 QueuePair *qp = it->second.first;
424 dead_queue_pairs.push_back(qp);
7c673cae
FG
425 qp_conns.erase(it);
426 --num_qp_conn;
427}
428
f67539c2
TL
// Thread-safe wrapper around enqueue_dead_qp_lockless().
void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
{
  std::lock_guard l{lock};
  enqueue_dead_qp_lockless(qpn);
}
434
// Begin tearing down a QP: try to switch it into the "dead" state. On
// success the map entry is kept (with the socket pointer cleared) until the
// beacon completion arrives; on failure the QP goes straight to the dead
// list for deletion by the polling thread.
// NOTE(review): a nonzero return from to_dead() is treated as failure here —
// confirm against Infiniband::QueuePair::to_dead().
void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
{
  std::lock_guard l{lock};
  auto it = qp_conns.find(qpn);
  if (it == qp_conns.end()) {
    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
    return;
  }
  QueuePair *qp = it->second.first;
  if (qp->to_dead()) {
    //
    // Failed to switch to dead. This is abnormal, but we can't
    // do anything, so just destroy QP.
    //
    dead_queue_pairs.push_back(qp);
    qp_conns.erase(it);
    --num_qp_conn;
  } else {
    //
    // Successfully switched to dead, thus keep entry in the map.
    // But only zero out socked pointer in order to return null from
    // get_conn_lockless();
    it->second.second = nullptr;
  }
}
460
// Process `n` work completions polled from the tx CQ: reap beacon WRs into
// the dead-QP path, log/handle error statuses, and collect the chunks of
// successfully (or erroneously) completed sends so they can be returned to
// the tx buffer pool in one batch at the end.
// @param cqe  array of work completions from ibv_poll_cq
// @param n    number of valid entries in cqe
void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
{
  std::vector<Chunk*> tx_chunks;

  for (int i = 0; i < n; ++i) {
    ibv_wc* response = &cqe[i];

    // If it's beacon WR, enqueue the QP to be destroyed later
    if (response->wr_id == BEACON_WRID) {
      enqueue_dead_qp(response->qp_num);
      continue;
    }

    ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len
                   << " status: " << ib->wc_status_to_string(response->status) << dendl;

    if (response->status != IBV_WC_SUCCESS) {
      switch(response->status) {
        case IBV_WC_RETRY_EXC_ERR:
          {
            perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);

            ldout(cct, 1) << __func__ << " Responder ACK timeout, possible disconnect, or Remote QP in bad state "
                          << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                          << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                          << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

            std::lock_guard l{lock};
            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
            if (conn) {
              ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
                            << conn->get_peer_qpn() << dendl;
            }
          }
          break;
        case IBV_WC_WR_FLUSH_ERR:
          {
            perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);

            std::lock_guard l{lock};
            QueuePair *qp = get_qp_lockless(response->qp_num);
            if (qp) {
              ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
            }
            // Flush errors are expected while a QP is being torn down; only
            // complain when the QP is not on its way out.
            if (qp && qp->is_dead()) {
              ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl;
            } else {
              lderr(cct) << __func__ << " Invalid/Unsupported request to consume outstanding SQ WR,"
                         << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                         << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                         << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

              RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
              if (conn) {
                ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
                              << conn->get_peer_qpn() << dendl;
              }
            }
          }
          break;

        default:
          {
            lderr(cct) << __func__ << " SQ WR return error,"
                       << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                       << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                       << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;

            std::lock_guard l{lock};
            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
            if (conn && conn->is_connected()) {
              ldout(cct, 20) << __func__ << " SQ WR return error Queue Pair error state is : " << conn->get_qp_state()
                             << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl;
              conn->fault();
            } else {
              ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << " discard event" << dendl;
            }
          }
          break;
      }
    }

    auto chunk = reinterpret_cast<Chunk *>(response->wr_id);
    //TX completion may come either from
    // 1) regular send message, WCE wr_id points to chunk
    // 2) 'fin' message, wr_id points to the QP
    if (ib->get_memory_manager()->is_valid_chunk(chunk)) {
      tx_chunks.push_back(chunk);
    } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
      ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
    } else {
      // wr_id is neither a known chunk nor the QP itself: memory corruption.
      ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
      ceph_abort();
    }
  }

  perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
  post_tx_buffer(tx_chunks);
}
560
/**
 * Return the given tx Chunks to the memory manager's free pool, update the
 * inflight counter, and wake any workers waiting for tx buffers.
 *
 * (Previous doc claimed a 0/-1 return value; the function returns void.)
 *
 * \param[in] chunks
 *      The Chunks to release; may be empty, in which case this is a no-op.
 */
void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
{
  if (chunks.empty())
    return ;

  inflight -= chunks.size();
  ib->get_memory_manager()->return_tx(chunks);
  ldout(cct, 30) << __func__ << " release " << chunks.size()
                 << " chunks, inflight " << inflight << dendl;
  notify_pending_workers();
}
580
9f95a23c
TL
// Process `rx_number` work completions polled from the rx CQ. Successful
// receives are batched per connection and handed over via pass_wc(); chunks
// belonging to dead/unknown QPs or failed WRs go straight back to the rx
// buffer pool. The dispatcher lock is held across the whole batch so the
// connected sockets stay alive while their completions are delivered.
void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
{
  perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
  perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);

  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
  std::lock_guard l{lock};//make sure connected socket alive when pass wc

  for (int i = 0; i < rx_number; ++i) {
    ibv_wc* response = &cqe[i];
    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
    RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
    QueuePair *qp = get_qp_lockless(response->qp_num);

    switch (response->status) {
      case IBV_WC_SUCCESS:
        ceph_assert(response->opcode == IBV_WC_RECV);
        if (!conn) {
          // No live connection for this QP: recycle the buffer immediately.
          ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
                        << std::hex << chunk << " will be back." << std::dec << dendl;
          ib->post_chunk_to_pool(chunk);
          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        } else {
          // Replenish the receive queue and queue the WC for delivery.
          conn->post_chunks_to_rq(1);
          polled[conn].push_back(*response);

          if (qp != nullptr && !qp->get_srq()) {
            qp->remove_rq_wr(chunk);
            chunk->clear_qp();
          }
        }
        break;

      case IBV_WC_WR_FLUSH_ERR:
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);

        if (qp) {
          ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
        }
        // Flushes are expected while a QP dies; only log loudly otherwise.
        if (qp && qp->is_dead()) {
          ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
        } else {
          ldout(cct, 1) << __func__ << " RQ WR return error,"
                        << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                        << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                        << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
          if (conn) {
            ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: "
                          << conn->get_peer_qpn() << dendl;
          }
        }

        ib->post_chunk_to_pool(chunk);
        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        break;

      default:
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);

        ldout(cct, 1) << __func__ << " RQ WR return error,"
                      << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
                      << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
                      << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
        if (conn && conn->is_connected())
          conn->fault();

        ib->post_chunk_to_pool(chunk);
        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
        break;
    }
  }

  for (auto &i : polled)
    i.first->pass_wc(std::move(i.second));
  polled.clear();
}
657
658RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id)
659 : Worker(c, worker_id),
660 tx_handler(new C_handle_cq_tx(this))
7c673cae
FG
661{
662 // initialize perf_logger
663 char name[128];
664 sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
665 PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
666
667 plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
668 plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
669 plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
7c673cae
FG
670
671 plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
11fdf7f2 672 plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES));
7c673cae 673 plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted");
11fdf7f2 674 plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES));
224ce89b 675 plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns");
7c673cae
FG
676
677 perf_logger = plb.create_perf_counters();
678 cct->get_perfcounters_collection()->add(perf_logger);
679}
680
RDMAWorker::~RDMAWorker()
{
  // tx_handler was allocated in the constructor and is owned by this worker.
  delete tx_handler;
}
685
void RDMAWorker::initialize()
{
  // The dispatcher must have been injected (see RDMAStack::create_worker)
  // before this worker starts running.
  ceph_assert(dispatcher);
}
690
11fdf7f2
TL
691int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
692 const SocketOptions &opt,ServerSocket *sock)
7c673cae 693{
9f95a23c 694 ib->init();
11fdf7f2 695 dispatcher->polling_start();
9f95a23c 696
11fdf7f2
TL
697 RDMAServerSocketImpl *p;
698 if (cct->_conf->ms_async_rdma_type == "iwarp") {
9f95a23c 699 p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
11fdf7f2 700 } else {
9f95a23c 701 p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
11fdf7f2 702 }
7c673cae
FG
703 int r = p->listen(sa, opt);
704 if (r < 0) {
705 delete p;
706 return r;
707 }
708
709 *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
710 return 0;
711}
712
713int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
714{
9f95a23c 715 ib->init();
11fdf7f2
TL
716 dispatcher->polling_start();
717
718 RDMAConnectedSocketImpl* p;
719 if (cct->_conf->ms_async_rdma_type == "iwarp") {
9f95a23c 720 p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this);
11fdf7f2 721 } else {
9f95a23c 722 p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this);
11fdf7f2 723 }
7c673cae
FG
724 int r = p->try_connect(addr, opts);
725
726 if (r < 0) {
727 ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
728 delete p;
729 return r;
730 }
731 std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
732 *socket = ConnectedSocket(std::move(csi));
733 return 0;
734}
735
// Reserve registered tx buffers for a pending send.
// @param o      the connection asking for buffers; if the reservation falls
//               short and o is non-null, it is parked on pending_sent_conns
//               and this worker is queued to be notified when buffers free up
// @param c      output vector the reserved chunks are appended to
// @param bytes  number of bytes the caller wants to send
// @return number of chunks reserved (may satisfy fewer than `bytes`)
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
  ceph_assert(center.in_thread());
  int r = ib->get_tx_buffers(c, bytes);
  size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
  ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
  dispatcher->inflight += r;
  if (got >= bytes)
    return r;

  // Not enough buffers: remember the connection (once) and this worker so
  // post_tx_buffer()/notify_pending_workers() can resume it later.
  if (o) {
    if (!o->is_pending()) {
      pending_sent_conns.push_back(o);
      perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
      o->set_pending(1);
    }
    dispatcher->make_pending_worker(this);
  }
  return r;
}
756
757
// Retry sends for connections that previously ran out of tx buffers.
// Runs until the pending list is drained, or re-parks on -EAGAIN (buffers
// ran out again) and returns early; other errors fault the connection.
void RDMAWorker::handle_pending_message()
{
  ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
  while (!pending_sent_conns.empty()) {
    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
    pending_sent_conns.pop_front();
    ssize_t r = o->submit(false);
    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
    if (r < 0) {
      if (r == -EAGAIN) {
        // Still no buffers: put it back and wait for the next notification.
        pending_sent_conns.push_back(o);
        dispatcher->make_pending_worker(this);
        return ;
      }
      o->fault();
    }
    o->set_pending(0);
    perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
  }
  // Pass the wakeup along in case other workers are waiting for buffers.
  dispatcher->notify_pending_workers();
}
779
f67539c2
TL
// Construct the stack: the Infiniband device state is created first and
// shared with the dispatcher (initializer order matters: `ib` is passed to
// the RDMADispatcher constructor).
RDMAStack::RDMAStack(CephContext *cct)
  : NetworkStack(cct), ib(std::make_shared<Infiniband>(cct)),
    rdma_dispatcher(std::make_shared<RDMADispatcher>(cct, ib))
{
  ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
}
787
RDMAStack::~RDMAStack()
{
  if (cct->_conf->ms_async_rdma_enable_hugepage) {
    unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction
  }
}
794
20effc67
TL
// Factory for per-thread workers; wires each new worker to the shared
// dispatcher and Infiniband state. Ownership of the returned Worker passes
// to the caller (NetworkStack).
Worker* RDMAStack::create_worker(CephContext *c, unsigned worker_id)
{
  auto w = new RDMAWorker(c, worker_id);
  w->set_dispatcher(rdma_dispatcher);
  w->set_ib(ib);
  return w;
}
802
// Launch a worker thread running `func`; the thread is stored so
// join_worker() can later join it by index.
void RDMAStack::spawn_worker(std::function<void ()> &&func)
{
  threads.emplace_back(std::move(func));
}
807
// Join the i-th worker thread spawned via spawn_worker(); asserts that the
// index is valid and the thread is joinable.
void RDMAStack::join_worker(unsigned i)
{
  ceph_assert(threads.size() > i && threads[i].joinable());
  threads[i].join();
}