]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/rdma/RDMAStack.cc
49bafd0b4c3888630600ffe45871e26db311ee46
[ceph.git] / ceph / src / msg / async / rdma / RDMAStack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <poll.h>
18 #include <errno.h>
19 #include <sys/time.h>
20 #include <sys/resource.h>
21
22 #include "include/str_list.h"
23 #include "include/compat.h"
24 #include "common/Cycles.h"
25 #include "common/deleter.h"
26 #include "common/Tub.h"
27 #include "RDMAStack.h"
28
29 #define dout_subsys ceph_subsys_ms
30 #undef dout_prefix
31 #define dout_prefix *_dout << "RDMAStack "
32
33 RDMADispatcher::~RDMADispatcher()
34 {
35 ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
36 polling_stop();
37
38 ceph_assert(qp_conns.empty());
39 ceph_assert(num_qp_conn == 0);
40 ceph_assert(dead_queue_pairs.empty());
41 }
42
43 RDMADispatcher::RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib)
44 : cct(c), ib(ib)
45 {
46 PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
47
48 plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
49 plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
50 plb.add_u64_counter(l_msgr_rdma_rx_bufs_in_use, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
51 plb.add_u64_counter(l_msgr_rdma_rx_bufs_total, "rx_bufs_total", "The total number of rx buffers");
52
53 plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
54 plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
55 plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
56 plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
57
58 plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion");
59 plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion");
60 plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request");
61
62 plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
63 plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");
64
65 plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");
66
67
68 plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Active queue pair number");
69 plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Created queue pair number");
70
71 perf_logger = plb.create_perf_counters();
72 cct->get_perfcounters_collection()->add(perf_logger);
73 Cycles::init();
74 }
75
76 void RDMADispatcher::polling_start()
77 {
78 // take lock because listen/connect can happen from different worker threads
79 std::lock_guard l{lock};
80
81 if (t.joinable())
82 return; // dispatcher thread already running
83
84 ib->get_memory_manager()->set_rx_stat_logger(perf_logger);
85
86 tx_cc = ib->create_comp_channel(cct);
87 ceph_assert(tx_cc);
88 rx_cc = ib->create_comp_channel(cct);
89 ceph_assert(rx_cc);
90 tx_cq = ib->create_comp_queue(cct, tx_cc);
91 ceph_assert(tx_cq);
92 rx_cq = ib->create_comp_queue(cct, rx_cc);
93 ceph_assert(rx_cq);
94
95 t = std::thread(&RDMADispatcher::polling, this);
96 ceph_pthread_setname(t.native_handle(), "rdma-polling");
97 }
98
99 void RDMADispatcher::polling_stop()
100 {
101 {
102 std::lock_guard l{lock};
103 done = true;
104 }
105
106 if (!t.joinable())
107 return;
108
109 t.join();
110
111 tx_cc->ack_events();
112 rx_cc->ack_events();
113 delete tx_cq;
114 delete rx_cq;
115 delete tx_cc;
116 delete rx_cc;
117 }
118
119 void RDMADispatcher::handle_async_event()
120 {
121 ldout(cct, 30) << __func__ << dendl;
122 while (1) {
123 ibv_async_event async_event;
124 if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
125 if (errno != EAGAIN)
126 lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
127 << " " << cpp_strerror(errno) << ")" << dendl;
128 return;
129 }
130 perf_logger->inc(l_msgr_rdma_total_async_events);
131 ldout(cct, 1) << __func__ << "Event : " << ibv_event_type_str(async_event.event_type) << dendl;
132
133 switch (async_event.event_type) {
134 /***********************CQ events********************/
135 case IBV_EVENT_CQ_ERR:
136 lderr(cct) << __func__ << " Fatal Error, effect all QP bound with same CQ, "
137 << " CQ Overflow, dev = " << ib->get_device()->ctxt
138 << " Need destroy and recreate resource " << dendl;
139 break;
140 /***********************QP events********************/
141 case IBV_EVENT_QP_FATAL:
142 {
143 /* Error occurred on a QP and it transitioned to error state */
144 ibv_qp* ib_qp = async_event.element.qp;
145 uint32_t qpn = ib_qp->qp_num;
146 QueuePair* qp = get_qp(qpn);
147 lderr(cct) << __func__ << " Fatal Error, event associate qp number: " << qpn
148 << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
149 << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
150 }
151 break;
152 case IBV_EVENT_QP_LAST_WQE_REACHED:
153 {
154 /*
155 * 1. The QP bound with SRQ is in IBV_QPS_ERR state & no more WQE on the RQ of the QP
156 * Reason: QP is force switched into Error before posting Beacon WR.
157 * The QP's WRs will be flushed into CQ with IBV_WC_WR_FLUSH_ERR status
158 * For SRQ, only WRs on the QP which is switched into Error status will be flushed.
159 * Handle: Only confirm that qp enter into dead queue pairs
160 * 2. The CQE with error was generated for the last WQE
161 * Handle: output error log
162 */
163 perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
164 ibv_qp* ib_qp = async_event.element.qp;
165 uint32_t qpn = ib_qp->qp_num;
166 std::lock_guard l{lock};
167 RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
168 QueuePair* qp = get_qp_lockless(qpn);
169
170 if (qp && !qp->is_dead()) {
171 lderr(cct) << __func__ << " QP not dead, event associate qp number: " << qpn
172 << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
173 << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
174 }
175 if (!conn) {
176 ldout(cct, 20) << __func__ << " Connection's QP maybe entered into dead status. "
177 << " qp number: " << qpn << dendl;
178 } else {
179 conn->fault();
180 if (qp) {
181 if (!cct->_conf->ms_async_rdma_cm) {
182 enqueue_dead_qp_lockless(qpn);
183 }
184 }
185 }
186 }
187 break;
188 case IBV_EVENT_QP_REQ_ERR:
189 /* Invalid Request Local Work Queue Error */
190 [[fallthrough]];
191 case IBV_EVENT_QP_ACCESS_ERR:
192 /* Local access violation error */
193 [[fallthrough]];
194 case IBV_EVENT_COMM_EST:
195 /* Communication was established on a QP */
196 [[fallthrough]];
197 case IBV_EVENT_SQ_DRAINED:
198 /* Send Queue was drained of outstanding messages in progress */
199 [[fallthrough]];
200 case IBV_EVENT_PATH_MIG:
201 /* A connection has migrated to the alternate path */
202 [[fallthrough]];
203 case IBV_EVENT_PATH_MIG_ERR:
204 /* A connection failed to migrate to the alternate path */
205 break;
206 /***********************SRQ events*******************/
207 case IBV_EVENT_SRQ_ERR:
208 /* Error occurred on an SRQ */
209 [[fallthrough]];
210 case IBV_EVENT_SRQ_LIMIT_REACHED:
211 /* SRQ limit was reached */
212 break;
213 /***********************Port events******************/
214 case IBV_EVENT_PORT_ACTIVE:
215 /* Link became active on a port */
216 [[fallthrough]];
217 case IBV_EVENT_PORT_ERR:
218 /* Link became unavailable on a port */
219 [[fallthrough]];
220 case IBV_EVENT_LID_CHANGE:
221 /* LID was changed on a port */
222 [[fallthrough]];
223 case IBV_EVENT_PKEY_CHANGE:
224 /* P_Key table was changed on a port */
225 [[fallthrough]];
226 case IBV_EVENT_SM_CHANGE:
227 /* SM was changed on a port */
228 [[fallthrough]];
229 case IBV_EVENT_CLIENT_REREGISTER:
230 /* SM sent a CLIENT_REREGISTER request to a port */
231 [[fallthrough]];
232 case IBV_EVENT_GID_CHANGE:
233 /* GID table was changed on a port */
234 break;
235
236 /***********************CA events******************/
237 //CA events:
238 case IBV_EVENT_DEVICE_FATAL:
239 /* CA is in FATAL state */
240 lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
241 << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
242 break;
243 default:
244 lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
245 << " unknown event: " << async_event.event_type << dendl;
246 break;
247 }
248 ibv_ack_async_event(&async_event);
249 }
250 }
251
252 void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
253 {
254 std::lock_guard l{lock};
255 ib->post_chunk_to_pool(chunk);
256 perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
257 }
258
259 int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp)
260 {
261 std::lock_guard l{lock};
262 return ib->post_chunks_to_rq(num, qp);
263 }
264
265 void RDMADispatcher::polling()
266 {
267 static int MAX_COMPLETIONS = 32;
268 ibv_wc wc[MAX_COMPLETIONS];
269
270 std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
271 std::vector<ibv_wc> tx_cqe;
272 ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
273 uint64_t last_inactive = Cycles::rdtsc();
274 bool rearmed = false;
275 int r = 0;
276
277 while (true) {
278 int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
279 if (tx_ret > 0) {
280 ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
281 << " responses."<< dendl;
282 handle_tx_event(wc, tx_ret);
283 }
284
285 int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
286 if (rx_ret > 0) {
287 ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
288 << " responses."<< dendl;
289 handle_rx_event(wc, rx_ret);
290 }
291
292 if (!tx_ret && !rx_ret) {
293 perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
294 //
295 // Clean up dead QPs when rx/tx CQs are in idle. The thing is that
296 // we can destroy QPs even earlier, just when beacon has been received,
297 // but we have two CQs (rx & tx), thus beacon WC can be poped from tx
298 // CQ before other WCs are fully consumed from rx CQ. For safety, we
299 // wait for beacon and then "no-events" from CQs.
300 //
301 // Calling size() on vector without locks is totally fine, since we
302 // use it as a hint (accuracy is not important here)
303 //
304 if (!dead_queue_pairs.empty()) {
305 decltype(dead_queue_pairs) dead_qps;
306 {
307 std::lock_guard l{lock};
308 dead_queue_pairs.swap(dead_qps);
309 }
310
311 for (auto& qp: dead_qps) {
312 perf_logger->dec(l_msgr_rdma_active_queue_pair);
313 ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl;
314 delete qp;
315 }
316 }
317
318 if (!num_qp_conn && done && dead_queue_pairs.empty())
319 break;
320
321 uint64_t now = Cycles::rdtsc();
322 if (Cycles::to_microseconds(now - last_inactive) > cct->_conf->ms_async_rdma_polling_us) {
323 handle_async_event();
324 if (!rearmed) {
325 // Clean up cq events after rearm notify ensure no new incoming event
326 // arrived between polling and rearm
327 tx_cq->rearm_notify();
328 rx_cq->rearm_notify();
329 rearmed = true;
330 continue;
331 }
332
333 struct pollfd channel_poll[2];
334 channel_poll[0].fd = tx_cc->get_fd();
335 channel_poll[0].events = POLLIN;
336 channel_poll[0].revents = 0;
337 channel_poll[1].fd = rx_cc->get_fd();
338 channel_poll[1].events = POLLIN;
339 channel_poll[1].revents = 0;
340 r = 0;
341 perf_logger->set(l_msgr_rdma_polling, 0);
342 while (!done && r == 0) {
343 r = TEMP_FAILURE_RETRY(poll(channel_poll, 2, 100));
344 if (r < 0) {
345 r = -errno;
346 lderr(cct) << __func__ << " poll failed " << r << dendl;
347 ceph_abort();
348 }
349 }
350 if (r > 0 && tx_cc->get_cq_event())
351 ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
352 if (r > 0 && rx_cc->get_cq_event())
353 ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
354 last_inactive = Cycles::rdtsc();
355 perf_logger->set(l_msgr_rdma_polling, 1);
356 rearmed = false;
357 }
358 }
359 }
360 }
361
362 void RDMADispatcher::notify_pending_workers() {
363 if (num_pending_workers) {
364 RDMAWorker *w = nullptr;
365 {
366 std::lock_guard l{w_lock};
367 if (!pending_workers.empty()) {
368 w = pending_workers.front();
369 pending_workers.pop_front();
370 --num_pending_workers;
371 }
372 }
373 if (w)
374 w->notify_worker();
375 }
376 }
377
378 void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
379 {
380 std::lock_guard l{lock};
381 ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
382 qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
383 ++num_qp_conn;
384 }
385
386 RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
387 {
388 auto it = qp_conns.find(qp);
389 if (it == qp_conns.end())
390 return nullptr;
391 if (it->second.first->is_dead())
392 return nullptr;
393 return it->second.second;
394 }
395
396 Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp)
397 {
398 // Try to find the QP in qp_conns firstly.
399 auto it = qp_conns.find(qp);
400 if (it != qp_conns.end())
401 return it->second.first;
402
403 // Try again in dead_queue_pairs.
404 for (auto &i: dead_queue_pairs)
405 if (i->get_local_qp_number() == qp)
406 return i;
407
408 return nullptr;
409 }
410
411 Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
412 {
413 std::lock_guard l{lock};
414 return get_qp_lockless(qp);
415 }
416
417 void RDMADispatcher::enqueue_dead_qp_lockless(uint32_t qpn)
418 {
419 auto it = qp_conns.find(qpn);
420 if (it == qp_conns.end()) {
421 lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
422 return ;
423 }
424 QueuePair *qp = it->second.first;
425 dead_queue_pairs.push_back(qp);
426 qp_conns.erase(it);
427 --num_qp_conn;
428 }
429
430 void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
431 {
432 std::lock_guard l{lock};
433 enqueue_dead_qp_lockless(qpn);
434 }
435
436 void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
437 {
438 std::lock_guard l{lock};
439 auto it = qp_conns.find(qpn);
440 if (it == qp_conns.end()) {
441 lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
442 return;
443 }
444 QueuePair *qp = it->second.first;
445 if (qp->to_dead()) {
446 //
447 // Failed to switch to dead. This is abnormal, but we can't
448 // do anything, so just destroy QP.
449 //
450 dead_queue_pairs.push_back(qp);
451 qp_conns.erase(it);
452 --num_qp_conn;
453 } else {
454 //
455 // Successfully switched to dead, thus keep entry in the map.
456 // But only zero out socked pointer in order to return null from
457 // get_conn_lockless();
458 it->second.second = nullptr;
459 }
460 }
461
462 void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
463 {
464 std::vector<Chunk*> tx_chunks;
465
466 for (int i = 0; i < n; ++i) {
467 ibv_wc* response = &cqe[i];
468
469 // If it's beacon WR, enqueue the QP to be destroyed later
470 if (response->wr_id == BEACON_WRID) {
471 enqueue_dead_qp(response->qp_num);
472 continue;
473 }
474
475 ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len
476 << " status: " << ib->wc_status_to_string(response->status) << dendl;
477
478 if (response->status != IBV_WC_SUCCESS) {
479 switch(response->status) {
480 case IBV_WC_RETRY_EXC_ERR:
481 {
482 perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
483
484 ldout(cct, 1) << __func__ << " Responder ACK timeout, possible disconnect, or Remote QP in bad state "
485 << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
486 << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
487 << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
488
489 std::lock_guard l{lock};
490 RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
491 if (conn) {
492 ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
493 << conn->get_peer_qpn() << dendl;
494 }
495 }
496 break;
497 case IBV_WC_WR_FLUSH_ERR:
498 {
499 perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
500
501 std::lock_guard l{lock};
502 QueuePair *qp = get_qp_lockless(response->qp_num);
503 if (qp) {
504 ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
505 }
506 if (qp && qp->is_dead()) {
507 ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl;
508 } else {
509 lderr(cct) << __func__ << " Invalid/Unsupported request to consume outstanding SQ WR,"
510 << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
511 << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
512 << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
513
514 RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
515 if (conn) {
516 ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
517 << conn->get_peer_qpn() << dendl;
518 }
519 }
520 }
521 break;
522
523 default:
524 {
525 lderr(cct) << __func__ << " SQ WR return error,"
526 << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
527 << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
528 << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
529
530 std::lock_guard l{lock};
531 RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
532 if (conn && conn->is_connected()) {
533 ldout(cct, 20) << __func__ << " SQ WR return error Queue Pair error state is : " << conn->get_qp_state()
534 << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl;
535 conn->fault();
536 } else {
537 ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << " discard event" << dendl;
538 }
539 }
540 break;
541 }
542 }
543
544 auto chunk = reinterpret_cast<Chunk *>(response->wr_id);
545 //TX completion may come either from
546 // 1) regular send message, WCE wr_id points to chunk
547 // 2) 'fin' message, wr_id points to the QP
548 if (ib->get_memory_manager()->is_valid_chunk(chunk)) {
549 tx_chunks.push_back(chunk);
550 } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
551 ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
552 } else {
553 ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
554 ceph_abort();
555 }
556 }
557
558 perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
559 post_tx_buffer(tx_chunks);
560 }
561
562 /**
563 * Add the given Chunks to the given free queue.
564 *
565 * \param[in] chunks
566 * The Chunks to enqueue.
567 * \return
568 * 0 if success or -1 for failure
569 */
570 void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
571 {
572 if (chunks.empty())
573 return ;
574
575 inflight -= chunks.size();
576 ib->get_memory_manager()->return_tx(chunks);
577 ldout(cct, 30) << __func__ << " release " << chunks.size()
578 << " chunks, inflight " << inflight << dendl;
579 notify_pending_workers();
580 }
581
582 void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
583 {
584 perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
585 perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);
586
587 std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
588 std::lock_guard l{lock};//make sure connected socket alive when pass wc
589
590 for (int i = 0; i < rx_number; ++i) {
591 ibv_wc* response = &cqe[i];
592 Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
593 RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
594 QueuePair *qp = get_qp_lockless(response->qp_num);
595
596 switch (response->status) {
597 case IBV_WC_SUCCESS:
598 ceph_assert(response->opcode == IBV_WC_RECV);
599 if (!conn) {
600 ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
601 << std::hex << chunk << " will be back." << std::dec << dendl;
602 ib->post_chunk_to_pool(chunk);
603 perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
604 } else {
605 conn->post_chunks_to_rq(1);
606 polled[conn].push_back(*response);
607
608 if (qp != nullptr && !qp->get_srq()) {
609 qp->remove_rq_wr(chunk);
610 chunk->clear_qp();
611 }
612 }
613 break;
614
615 case IBV_WC_WR_FLUSH_ERR:
616 perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
617
618 if (qp) {
619 ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
620 }
621 if (qp && qp->is_dead()) {
622 ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
623 } else {
624 ldout(cct, 1) << __func__ << " RQ WR return error,"
625 << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
626 << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
627 << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
628 if (conn) {
629 ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: "
630 << conn->get_peer_qpn() << dendl;
631 }
632 }
633
634 ib->post_chunk_to_pool(chunk);
635 perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
636 break;
637
638 default:
639 perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
640
641 ldout(cct, 1) << __func__ << " RQ WR return error,"
642 << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
643 << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
644 << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
645 if (conn && conn->is_connected())
646 conn->fault();
647
648 ib->post_chunk_to_pool(chunk);
649 perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
650 break;
651 }
652 }
653
654 for (auto &i : polled)
655 i.first->pass_wc(std::move(i.second));
656 polled.clear();
657 }
658
659 RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id)
660 : Worker(c, worker_id),
661 tx_handler(new C_handle_cq_tx(this))
662 {
663 // initialize perf_logger
664 char name[128];
665 sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
666 PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
667
668 plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
669 plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
670 plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
671
672 plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
673 plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES));
674 plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted");
675 plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted", NULL, 0, unit_t(UNIT_BYTES));
676 plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns");
677
678 perf_logger = plb.create_perf_counters();
679 cct->get_perfcounters_collection()->add(perf_logger);
680 }
681
682 RDMAWorker::~RDMAWorker()
683 {
684 delete tx_handler;
685 }
686
687 void RDMAWorker::initialize()
688 {
689 ceph_assert(dispatcher);
690 }
691
692 int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
693 const SocketOptions &opt,ServerSocket *sock)
694 {
695 ib->init();
696 dispatcher->polling_start();
697
698 RDMAServerSocketImpl *p;
699 if (cct->_conf->ms_async_rdma_type == "iwarp") {
700 p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
701 } else {
702 p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
703 }
704 int r = p->listen(sa, opt);
705 if (r < 0) {
706 delete p;
707 return r;
708 }
709
710 *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
711 return 0;
712 }
713
714 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
715 {
716 ib->init();
717 dispatcher->polling_start();
718
719 RDMAConnectedSocketImpl* p;
720 if (cct->_conf->ms_async_rdma_type == "iwarp") {
721 p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this);
722 } else {
723 p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this);
724 }
725 int r = p->try_connect(addr, opts);
726
727 if (r < 0) {
728 ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
729 delete p;
730 return r;
731 }
732 std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
733 *socket = ConnectedSocket(std::move(csi));
734 return 0;
735 }
736
737 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
738 {
739 ceph_assert(center.in_thread());
740 int r = ib->get_tx_buffers(c, bytes);
741 size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
742 ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
743 dispatcher->inflight += r;
744 if (got >= bytes)
745 return r;
746
747 if (o) {
748 if (!o->is_pending()) {
749 pending_sent_conns.push_back(o);
750 perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
751 o->set_pending(1);
752 }
753 dispatcher->make_pending_worker(this);
754 }
755 return r;
756 }
757
758
759 void RDMAWorker::handle_pending_message()
760 {
761 ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
762 while (!pending_sent_conns.empty()) {
763 RDMAConnectedSocketImpl *o = pending_sent_conns.front();
764 pending_sent_conns.pop_front();
765 ssize_t r = o->submit(false);
766 ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
767 if (r < 0) {
768 if (r == -EAGAIN) {
769 pending_sent_conns.push_back(o);
770 dispatcher->make_pending_worker(this);
771 return ;
772 }
773 o->fault();
774 }
775 o->set_pending(0);
776 perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
777 }
778 dispatcher->notify_pending_workers();
779 }
780
781 RDMAStack::RDMAStack(CephContext *cct)
782 : NetworkStack(cct), ib(std::make_shared<Infiniband>(cct)),
783 rdma_dispatcher(std::make_shared<RDMADispatcher>(cct, ib))
784 {
785 ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
786
787 unsigned num = get_num_worker();
788 for (unsigned i = 0; i < num; ++i) {
789 RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
790 w->set_dispatcher(rdma_dispatcher);
791 w->set_ib(ib);
792 }
793 ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
794 }
795
796 RDMAStack::~RDMAStack()
797 {
798 if (cct->_conf->ms_async_rdma_enable_hugepage) {
799 unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction
800 }
801 }
802
803 void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
804 {
805 threads.resize(i+1);
806 threads[i] = std::thread(func);
807 }
808
809 void RDMAStack::join_worker(unsigned i)
810 {
811 ceph_assert(threads.size() > i && threads[i].joinable());
812 threads[i].join();
813 }