// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <poll.h>
#include <sys/time.h>
#include <sys/resource.h>

#include "include/str_list.h"
#include "common/deleter.h"
#include "common/Tub.h"
#include "RDMAStack.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "RDMAStack "

static Tub<Infiniband> global_infiniband;

RDMADispatcher::~RDMADispatcher()
{
  done = true;
  polling_stop();
  ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;

  assert(qp_conns.empty());
  assert(num_qp_conn == 0);
  assert(dead_queue_pairs.empty());
  assert(num_dead_queue_pair == 0);

  tx_cc->ack_events();
  rx_cc->ack_events();
  delete tx_cq;
  delete rx_cq;
  delete tx_cc;
  delete rx_cc;
  delete async_handler;

  global_infiniband->set_dispatcher(nullptr);
}

RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
  : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
    w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
  PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);

  plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
  plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
  plb.add_u64_counter(l_msgr_rdma_inqueue_rx_chunks, "inqueue_rx_chunks", "The number of inqueue rx chunks");

  plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work completions");
  plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
  plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
  plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");

  plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completions");
  plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completions");
  plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work requests");

  plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
  plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");

  plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");

  plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "The number of created queue pairs");
  plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "The number of active queue pairs");

  perf_logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perf_logger);
}

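// Create the tx/rx completion channels and completion queues, then start
// the polling thread. Each CQ is bound to a completion channel so that
// polling() can block on the channel fds once the queues go idle.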
void RDMADispatcher::polling_start()
{
  tx_cc = global_infiniband->create_comp_channel(cct);
  assert(tx_cc);
  rx_cc = global_infiniband->create_comp_channel(cct);
  assert(rx_cc);
  tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
  assert(tx_cq);
  rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
  assert(rx_cq);

  t = std::thread(&RDMADispatcher::polling, this);
}

void RDMADispatcher::polling_stop()
{
  if (t.joinable())
    t.join();
}

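// Drain the device-wide async event queue. EAGAIN simply means the queue is
// empty (the async fd is presumably set non-blocking elsewhere, given the
// errno check below). An IBV_EVENT_QP_LAST_WQE_REACHED event means a QP has
// entered the ERROR state and drained: fault the owning connection and move
// the QP onto the dead list for deferred deletion.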
void RDMADispatcher::handle_async_event()
{
  ldout(cct, 30) << __func__ << dendl;
  while (1) {
    ibv_async_event async_event;
    if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
      if (errno != EAGAIN)
        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
                   << " " << cpp_strerror(errno) << ")" << dendl;
      return;
    }
    perf_logger->inc(l_msgr_rdma_total_async_events);
    // FIXME: Currently we must ensure that nothing else moves a QP into the
    // ERROR state, otherwise that qp can't be deleted in the current cleanup flow.
    if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
      perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
      uint64_t qpn = async_event.element.qp->qp_num;
      ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
                     << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
      Mutex::Locker l(lock);
      RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
      if (!conn) {
        ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
      } else {
        ldout(cct, 1) << __func__ << " the qp wasn't stopped by us, fault the connection=" << conn << dendl;
        conn->fault();
        erase_qpn_lockless(qpn);
      }
    } else {
      ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
                    << " evt: " << ibv_event_type_str(async_event.event_type)
                    << dendl;
    }
    ibv_ack_async_event(&async_event);
  }
}

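// Dispatcher main loop: busy-poll both CQs for completions; once they have
// been idle for longer than ms_async_rdma_polling_us microseconds, rearm
// the CQ notifications and fall back to blocking in poll(2) on the two
// completion channel fds until a new event (or shutdown) arrives.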
void RDMADispatcher::polling()
{
  static const int MAX_COMPLETIONS = 32;
  ibv_wc wc[MAX_COMPLETIONS];

  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
  std::vector<ibv_wc> tx_cqe;
  ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
  RDMAConnectedSocketImpl *conn = nullptr;
  utime_t last_inactive = ceph_clock_now();
  bool rearmed = false;
  int r = 0;

  while (true) {
    int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
    if (tx_ret > 0) {
      ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
                     << " responses." << dendl;
      handle_tx_event(wc, tx_ret);
    }

    int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
    if (rx_ret > 0) {
      ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
                     << " responses." << dendl;
      perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);

      Mutex::Locker l(lock); // make sure the connected socket stays alive while we pass wc
      for (int i = 0; i < rx_ret; ++i) {
        ibv_wc* response = &wc[i];
        Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
        ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;

        assert(wc[i].opcode == IBV_WC_RECV);

        if (response->status == IBV_WC_SUCCESS) {
          conn = get_conn_lockless(response->qp_num);
          if (!conn) {
            assert(global_infiniband->is_rx_buffer(chunk->buffer));
            r = global_infiniband->post_chunk(chunk);
            ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " reposted, r=" << r << dendl;
            assert(r == 0);
          } else {
            polled[conn].push_back(*response);
          }
        } else {
          perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
          ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
                        << ") status(" << response->status << ":"
                        << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
          assert(global_infiniband->is_rx_buffer(chunk->buffer));
          r = global_infiniband->post_chunk(chunk);
          if (r) {
            ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
            assert(r == 0);
          }

          conn = get_conn_lockless(response->qp_num);
          if (conn && conn->is_connected())
            conn->fault();
        }
      }

      for (auto &&i : polled) {
        perf_logger->inc(l_msgr_rdma_inqueue_rx_chunks, i.second.size());
        i.first->pass_wc(std::move(i.second));
      }
      polled.clear();
    }

    if (!tx_ret && !rx_ret) {
      // Both CQs have just gone idle, so it's now safe to delete queue
      // pairs (see the comment by the declaration of dead_queue_pairs).
      // Additionally, don't delete a qp while outstanding_buffers isn't
      // empty, because we need to check the qp's state before sending.
      perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
      if (num_dead_queue_pair) {
        Mutex::Locker l(lock); // FIXME: reuse dead qps, since creating one costs ~1 ms
        while (!dead_queue_pairs.empty()) {
          ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
          delete dead_queue_pairs.back();
          perf_logger->dec(l_msgr_rdma_active_queue_pair);
          dead_queue_pairs.pop_back();
          --num_dead_queue_pair;
        }
      }
      if (!num_qp_conn && done)
        break;

      if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
        handle_async_event();
        if (!rearmed) {
          // Clean up cq events after the rearm notify to ensure that no
          // event arriving between polling and rearming is lost.
          tx_cq->rearm_notify();
          rx_cq->rearm_notify();
          rearmed = true;
          continue;
        }

        struct pollfd channel_poll[2];
        channel_poll[0].fd = tx_cc->get_fd();
        channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
        channel_poll[0].revents = 0;
        channel_poll[1].fd = rx_cc->get_fd();
        channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
        channel_poll[1].revents = 0;
        r = 0;
        perf_logger->set(l_msgr_rdma_polling, 0);
        while (!done && r == 0) {
          r = poll(channel_poll, 2, 100);
          if (r < 0) {
            r = -errno;
            lderr(cct) << __func__ << " poll failed " << r << dendl;
            ceph_abort();
          }
        }
        if (r > 0 && tx_cc->get_cq_event())
          ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
        if (r > 0 && rx_cc->get_cq_event())
          ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
        last_inactive = ceph_clock_now();
        perf_logger->set(l_msgr_rdma_polling, 1);
        rearmed = false;
      }
    }
  }
}

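// Wake at most one worker that is parked waiting for tx buffers.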
void RDMADispatcher::notify_pending_workers() {
  if (num_pending_workers) {
    RDMAWorker *w = nullptr;
    {
      Mutex::Locker l(w_lock);
      if (!pending_workers.empty()) {
        w = pending_workers.front();
        pending_workers.pop_front();
        --num_pending_workers;
      }
    }
    if (w)
      w->notify_worker();
  }
}

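// Record the (qp, socket) pair so that work completions can be routed back
// to the owning socket by qp number. Returns a fresh eventfd for the socket
// to receive notifications on.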
int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
{
  int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
  assert(fd >= 0);
  Mutex::Locker l(lock);
  assert(!qp_conns.count(qp->get_local_qp_number()));
  qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
  ++num_qp_conn;
  return fd;
}

RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
{
  auto it = qp_conns.find(qp);
  if (it == qp_conns.end())
    return nullptr;
  if (it->second.first->is_dead())
    return nullptr;
  return it->second.second;
}

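// Move a qp from the live table onto dead_queue_pairs. The qp object itself
// is deleted later by the polling thread, once both CQs are idle and no
// completion can still reference it.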
void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
{
  auto it = qp_conns.find(qpn);
  if (it == qp_conns.end())
    return ;
  ++num_dead_queue_pair;
  dead_queue_pairs.push_back(it->second.first);
  qp_conns.erase(it);
  --num_qp_conn;
}

void RDMADispatcher::erase_qpn(uint32_t qpn)
{
  Mutex::Locker l(lock);
  erase_qpn_lockless(qpn);
}

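// Handle n tx work completions: count and log errors, fault the owning
// connection on failure, and batch regular tx chunks for return to the
// memory manager via post_tx_buffer().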
void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
{
  std::vector<Chunk*> tx_chunks;

  for (int i = 0; i < n; ++i) {
    ibv_wc* response = &cqe[i];
    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
    ldout(cct, 25) << __func__ << " QP: " << response->qp_num
                   << " len: " << response->byte_len << " , addr:" << chunk
                   << " " << global_infiniband->wc_status_to_string(response->status) << dendl;

    if (response->status != IBV_WC_SUCCESS) {
      perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
      if (response->status == IBV_WC_RETRY_EXC_ERR) {
        ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnecting it now" << dendl;
        perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
      } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
        ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
                      << response->qp_num << " should be down while this WR=" << response->wr_id
                      << " is still in flight." << dendl;
        perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
      } else {
        ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                      << response->wr_id << ") status(" << response->status << "): "
                      << global_infiniband->wc_status_to_string(response->status) << dendl;
      }

      Mutex::Locker l(lock); // make sure the connected socket stays alive while we pass wc
      RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);

      if (conn && conn->is_connected()) {
        ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;
        conn->fault();
      } else {
        ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
      }
    }

    // A TX completion may come either from a regular send message or from a
    // 'fin' message. In the case of 'fin', wr_id points to the QueuePair.
    if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
      tx_chunks.push_back(chunk);
    } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
      ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
    } else {
      ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
      ceph_abort();
    }
  }

  perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
  post_tx_buffer(tx_chunks);
}

/**
 * Return the given tx Chunks to the memory manager's free pool.
 *
 * \param[in] chunks
 *      The Chunks to return.
 */
void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
{
  if (chunks.empty())
    return ;

  inflight -= chunks.size();
  global_infiniband->get_memory_manager()->return_tx(chunks);
  ldout(cct, 30) << __func__ << " release " << chunks.size()
                 << " chunks, inflight " << inflight << dendl;
  notify_pending_workers();
}

RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
  : Worker(c, i), stack(nullptr),
    tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
{
  // initialize perf_logger
  char name[128];
  sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
  PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);

  plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
  plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of partial tx buffer");
  plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posts");
  plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving");

  plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
  plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
  plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks received");
  plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks received");
  plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns");

  perf_logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perf_logger);
}

RDMAWorker::~RDMAWorker()
{
  delete tx_handler;
}

void RDMAWorker::initialize()
{
  if (!dispatcher) {
    dispatcher = stack->get_dispatcher();
  }
}

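// Server-side and client-side socket factories. Both lazily initialize the
// shared Infiniband device before constructing the socket implementation.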
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt, ServerSocket *sock)
{
  global_infiniband->init();

  auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
  int r = p->listen(sa, opt);
  if (r < 0) {
    delete p;
    return r;
  }

  *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
  return 0;
}

int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
  global_infiniband->init();

  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
  int r = p->try_connect(addr, opts);

  if (r < 0) {
    ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
    delete p;
    return r;
  }
  std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
  *socket = ConnectedSocket(std::move(csi));
  return 0;
}

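// Reserve registered tx buffers for up to 'bytes' of data. On a partial (or
// empty) reservation the socket is queued on pending_sent_conns and this
// worker registers itself with the dispatcher, to be notified once tx
// chunks are returned to the pool.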
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
  assert(center.in_thread());
  int r = global_infiniband->get_tx_buffers(c, bytes);
  assert(r >= 0);
  size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
  ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
  stack->get_dispatcher()->inflight += r;
  if (got >= bytes)
    return r;

  if (o) {
    if (!o->is_pending()) {
      pending_sent_conns.push_back(o);
      perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
      o->set_pending(1);
    }
    dispatcher->make_pending_worker(this);
  }
  return r;
}

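// Retry the sockets that previously could not get enough tx buffers.
// -EAGAIN means the pool ran dry again: requeue the socket and wait for the
// next notification from the dispatcher.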
void RDMAWorker::handle_pending_message()
{
  ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
  while (!pending_sent_conns.empty()) {
    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
    pending_sent_conns.pop_front();
    ssize_t r = o->submit(false);
    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
    if (r < 0) {
      if (r == -EAGAIN) {
        pending_sent_conns.push_back(o);
        dispatcher->make_pending_worker(this);
        return ;
      }
      o->fault();
    }
    o->set_pending(0);
    perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
  }
  dispatcher->notify_pending_workers();
}

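// Note that the Infiniband device object (global_infiniband) is a
// process-wide singleton, constructed lazily on first use and shared by all
// RDMAStack instances, while each stack owns its own RDMADispatcher.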
RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
{
  // ibv_fork_init() MUST be called before fork() when using RDMA.
  int rc = ibv_fork_init();
  if (rc) {
    lderr(cct) << __func__ << " failed to call ibv_fork_init(). It must be called before fork when using RDMA. Application aborts." << dendl;
    ceph_abort();
  }

  ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
  if (cct->_conf->ms_async_rdma_enable_hugepage) {
    rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
    ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
    if (rc) {
      lderr(cct) << __func__ << " failed to export RDMAV_HUGEPAGES_SAFE. It must be exported before using huge pages. Application aborts." << dendl;
      ceph_abort();
    }
  }

  // Check ulimit
  struct rlimit limit;
  getrlimit(RLIMIT_MEMLOCK, &limit);
  if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
    lderr(cct) << __func__ << " !!! WARNING !!! For RDMA to work properly the user memlock limit (ulimit -l) must be big enough to allow a large amount of registered memory."
      " We recommend setting this parameter to infinity" << dendl;
  }

  if (!global_infiniband)
    global_infiniband.construct(
      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
  ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
  dispatcher = new RDMADispatcher(cct, this);
  global_infiniband->set_dispatcher(dispatcher);

  unsigned num = get_num_worker();
  for (unsigned i = 0; i < num; ++i) {
    RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
    w->set_stack(this);
  }

  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
}

RDMAStack::~RDMAStack()
{
  if (cct->_conf->ms_async_rdma_enable_hugepage) {
    unsetenv("RDMAV_HUGEPAGES_SAFE"); // remove env variable on destruction
  }

  delete dispatcher;
}

void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
{
  threads.resize(i+1);
  threads[i] = std::thread(func);
}

void RDMAStack::join_worker(unsigned i)
{
  assert(threads.size() > i && threads[i].joinable());
  threads[i].join();
}