// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <sys/time.h>
#include <sys/resource.h>

#include "include/str_list.h"
#include "common/deleter.h"
#include "common/Tub.h"
#include "RDMAStack.h"
#include "RDMAConnTCP.h"
#include "Device.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "RDMAStack "

static Tub<Infiniband> global_infiniband;

RDMADispatcher::~RDMADispatcher()
{
  polling_stop();

  ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;

  global_infiniband->set_dispatcher(nullptr);

  assert(qp_conns.empty());
  assert(num_qp_conn == 0);
  assert(dead_queue_pairs.empty());
  assert(num_dead_queue_pair == 0);
}

RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
  : cct(c), lock("RDMADispatcher::lock"),
    w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
  PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);

  plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether the dispatcher thread is polling");
  plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
  plb.add_u64_counter(l_msgr_rdma_inqueue_rx_chunks, "inqueue_rx_chunks", "The number of rx chunks in queue");

  plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work completions");
  plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
  plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
  plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");

  plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The total number of rx work completions");
  plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The total number of rx error work completions");
  plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work requests");

  plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
  plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");

  plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");

  plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Created queue pair number");
  plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Active queue pair number");

  perf_logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perf_logger);
}

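// Lifecycle of the dispatcher's polling thread: polling_start() spawns it,
// polling_stop() sets the done flag and joins it (and is a no-op if the
// thread was never started).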
void RDMADispatcher::polling_start()
{
  t = std::thread(&RDMADispatcher::polling, this);
}

void RDMADispatcher::polling_stop()
{
  if (!t.joinable())
    return;

  done = true;
  t.join();
}

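// Handle a device async event. Only IBV_EVENT_QP_LAST_WQE_REACHED is acted
// upon: the matching connection (if any) is faulted and its queue pair is
// moved onto the dead list; all other event types are just logged.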
void RDMADispatcher::process_async_event(Device *ibdev, ibv_async_event &async_event)
{
  perf_logger->inc(l_msgr_rdma_total_async_events);
  // FIXME: Currently we must ensure no other factor puts the QP into ERROR state,
  // otherwise the qp can't be deleted in the current cleanup flow.
  if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
    perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
    uint64_t qpn = async_event.element.qp->qp_num;
    ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
                   << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
    Mutex::Locker l(lock);
    RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
    if (!conn) {
      ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << ", discarding event" << dendl;
    } else {
      ldout(cct, 1) << __func__ << " qp was not stopped by us, faulting conn=" << conn << dendl;
      conn->fault();
      erase_qpn_lockless(qpn);
    }
  } else {
    ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << *ibdev
                  << " evt: " << ibv_event_type_str(async_event.event_type)
                  << dendl;
  }
}

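// Main loop of the dispatcher thread: busy-poll the tx and rx completion
// queues, hand completed work requests to the owning sockets, reclaim dead
// queue pairs while idle, and fall back to blocking for a cq event once the
// thread has been idle for longer than ms_async_rdma_polling_us.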
void RDMADispatcher::polling()
{
  static const int MAX_COMPLETIONS = 32;
  ibv_wc wc[MAX_COMPLETIONS];

  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
  std::vector<ibv_wc> tx_cqe;
  RDMAConnectedSocketImpl *conn = nullptr;
  utime_t last_inactive = ceph_clock_now();
  bool rearmed = false;
  int r = 0;

  while (true) {
    Device *ibdev;

    int tx_ret = global_infiniband->poll_tx(MAX_COMPLETIONS, &ibdev, wc);
    if (tx_ret > 0) {
      ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
                     << " responses." << dendl;
      handle_tx_event(ibdev, wc, tx_ret);
    }

    int rx_ret = global_infiniband->poll_rx(MAX_COMPLETIONS, &ibdev, wc);
    if (rx_ret > 0) {
      ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
                     << " responses." << dendl;
      perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);

      Mutex::Locker l(lock); // make sure the connected socket stays alive while passing wc
      for (int i = 0; i < rx_ret; ++i) {
        ibv_wc* response = &wc[i];
        Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
        ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;

        assert(wc[i].opcode == IBV_WC_RECV);

        if (response->status == IBV_WC_SUCCESS) {
          conn = get_conn_lockless(response->qp_num);
          if (!conn) {
            assert(ibdev->is_rx_buffer(chunk->buffer));
            r = ibdev->post_chunk(chunk);
            ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead, reposting chunk " << chunk << " r=" << r << dendl;
            assert(r == 0);
          } else {
            polled[conn].push_back(*response);
          }
        } else {
          perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
          ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
                        << ") status(" << response->status << ":"
                        << Infiniband::wc_status_to_string(response->status) << ")" << dendl;
          assert(ibdev->is_rx_buffer(chunk->buffer));
          r = ibdev->post_chunk(chunk);
          if (r) {
            ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
            assert(r == 0);
          }

          conn = get_conn_lockless(response->qp_num);
          if (conn && conn->is_connected())
            conn->fault();
        }
      }

      for (auto &&i : polled) {
        perf_logger->inc(l_msgr_rdma_inqueue_rx_chunks, i.second.size());
        i.first->pass_wc(std::move(i.second));
      }
      polled.clear();
    }

    if (!tx_ret && !rx_ret) {
      // Both tx and rx are idle, so it's now safe to delete queue pairs
      // (see comment by the declaration of dead_queue_pairs).
      // Additionally, don't delete a qp while outstanding_buffers isn't empty,
      // because we need to check the qp's state before sending.
      perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
      if (num_dead_queue_pair) {
        Mutex::Locker l(lock); // FIXME reuse dead qps, because creating one qp costs 1 ms
        while (!dead_queue_pairs.empty()) {
          ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
          delete dead_queue_pairs.back();
          perf_logger->dec(l_msgr_rdma_active_queue_pair);
          dead_queue_pairs.pop_back();
          --num_dead_queue_pair;
        }
      }
      if (!num_qp_conn && done)
        break;

      if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
        if (!rearmed) {
          // Clean up cq events after rearming the notification, to ensure no
          // event that arrived between polling and rearming is missed.
          global_infiniband->rearm_notify();
          rearmed = true;
          continue;
        }

        perf_logger->set(l_msgr_rdma_polling, 0);

        r = global_infiniband->poll_blocking(done);
        if (r > 0)
          ldout(cct, 20) << __func__ << " got a cq event." << dendl;

        last_inactive = ceph_clock_now();
        perf_logger->set(l_msgr_rdma_polling, 1);
        rearmed = false;
      }
    }
  }
}

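// Wake at most one worker that is waiting for free tx buffers. The worker
// is popped under w_lock but notified outside it, keeping the critical
// section short.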
void RDMADispatcher::notify_pending_workers() {
  if (num_pending_workers) {
    RDMAWorker *w = nullptr;
    {
      Mutex::Locker l(w_lock);
      if (!pending_workers.empty()) {
        w = pending_workers.front();
        pending_workers.pop_front();
        --num_pending_workers;
      }
    }
    if (w)
      w->notify_worker();
  }
}

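// Register a freshly created queue pair together with its connected socket,
// keyed by qp number (this mapping is how completions are routed back to
// sockets), and return a new non-blocking eventfd to the caller.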
int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
{
  int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
  assert(fd >= 0);
  Mutex::Locker l(lock);
  assert(!qp_conns.count(qp->get_local_qp_number()));
  qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
  ++num_qp_conn;
  return fd;
}

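// Look up the connected socket for a qp number; returns nullptr when the
// number is unknown or its queue pair is already dead. Caller must hold
// 'lock'.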
RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
{
  auto it = qp_conns.find(qp);
  if (it == qp_conns.end())
    return nullptr;
  if (it->second.first->is_dead())
    return nullptr;
  return it->second.second;
}

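// Move a queue pair from the live map onto the dead list; the polling thread
// deletes it later, once both completion queues have gone idle. Caller must
// hold 'lock'.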
void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
{
  auto it = qp_conns.find(qpn);
  if (it == qp_conns.end())
    return;
  ++num_dead_queue_pair;
  dead_queue_pairs.push_back(it->second.first);
  qp_conns.erase(it);
  --num_qp_conn;
}

void RDMADispatcher::erase_qpn(uint32_t qpn)
{
  Mutex::Locker l(lock);
  erase_qpn_lockless(qpn);
}

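// Process a batch of tx work completions: account errors, fault the owning
// connection on failure, and collect the finished tx chunks so they can be
// returned to the memory manager in one batch via post_tx_buffer().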
void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
{
  std::vector<Chunk*> tx_chunks;

  for (int i = 0; i < n; ++i) {
    ibv_wc* response = &cqe[i];
    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
    ldout(cct, 25) << __func__ << " QP: " << response->qp_num
                   << " len: " << response->byte_len << " , addr:" << chunk
                   << " " << global_infiniband->wc_status_to_string(response->status) << dendl;

    if (response->status != IBV_WC_SUCCESS) {
      perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
      if (response->status == IBV_WC_RETRY_EXC_ERR) {
        ldout(cct, 1) << __func__ << " connection between server and client is broken, disconnecting now" << dendl;
        perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
      } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
        ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
                      << response->qp_num << " should be down while this WR=" << response->wr_id
                      << " is still in flight." << dendl;
        perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
      } else {
        ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                      << response->wr_id << ") status(" << response->status << "): "
                      << global_infiniband->wc_status_to_string(response->status) << dendl;
      }

      Mutex::Locker l(lock); // make sure the connected socket stays alive while passing wc
      RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);

      if (conn && conn->is_connected()) {
        ldout(cct, 25) << __func__ << " qp state is: " << conn->get_qp_state() << dendl;
        conn->fault();
      } else {
        ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << ", discarding event" << dendl;
      }
    }

    // FIXME: why not tx?
    if (ibdev->get_memory_manager()->is_tx_buffer(chunk->buffer))
      tx_chunks.push_back(chunk);
    else
      ldout(cct, 1) << __func__ << " not a tx buffer, chunk " << chunk << dendl;
  }

  perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
  post_tx_buffer(ibdev, tx_chunks);
}

/**
 * Return the given tx Chunks to the device's memory manager and wake any
 * workers waiting for free tx buffers.
 *
 * \param[in] chunks
 *      The Chunks to return.
 */
void RDMADispatcher::post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks)
{
  if (chunks.empty())
    return;

  inflight -= chunks.size();
  ibdev->get_memory_manager()->return_tx(chunks);
  ldout(cct, 30) << __func__ << " release " << chunks.size()
                 << " chunks, inflight " << inflight << dendl;
  notify_pending_workers();
}


RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
  : Worker(c, i), stack(nullptr),
    tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
{
  // initialize perf_logger
  char name[128];
  snprintf(name, sizeof(name), "AsyncMessenger::RDMAWorker-%u", id);
  PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);

  plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
  plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of partial tx buffer");
  plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of failed tx posts");
  plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving");

  plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
  plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
  plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks received");
  plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks received");

  perf_logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perf_logger);
}

RDMAWorker::~RDMAWorker()
{
  delete tx_handler;
}

void RDMAWorker::initialize()
{
  if (!dispatcher) {
    dispatcher = stack->get_dispatcher();
  }
}

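// Create a listening server socket on the given address, backed by the
// TCP-based connection manager (RDMAServerConnTCP).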
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt, ServerSocket *sock)
{
  global_infiniband->init();

  auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
  int r = p->listen(sa, opt);
  if (r < 0) {
    delete p;
    return r;
  }

  *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
  return 0;
}

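// Open an outgoing connection: allocate an RDMAConnectedSocketImpl, start
// its connection attempt with try_connect(), and on success hand ownership
// to the returned ConnectedSocket.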
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
  global_infiniband->init();

  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
  int r = p->try_connect(addr, opts);

  if (r < 0) {
    ldout(cct, 1) << __func__ << " failed to connect." << dendl;
    delete p;
    return r;
  }
  std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
  *socket = ConnectedSocket(std::move(csi));
  return 0;
}

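// Reserve registered tx buffers for a socket. If fewer bytes than requested
// are available, the socket is queued on pending_sent_conns and this worker
// registers itself with the dispatcher so it gets woken once buffers are
// returned.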
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
  Device *ibdev = o->get_device();

  assert(center.in_thread());
  int r = ibdev->get_tx_buffers(c, bytes);
  assert(r >= 0);
  size_t got = ibdev->get_memory_manager()->get_tx_buffer_size() * r;
  ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
  stack->get_dispatcher()->inflight += r;
  if (got == bytes)
    return r;

  if (o) {
    if (pending_sent_conns.empty() || pending_sent_conns.back() != o)
      pending_sent_conns.push_back(o);
    dispatcher->make_pending_worker(this);
  }
  return r;
}


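// Retry the sends queued on this worker. Each socket is submitted at most
// once per pass; on EAGAIN it is requeued and the worker re-registers itself
// with the dispatcher, while any other error faults the socket.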
void RDMAWorker::handle_pending_message()
{
  ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
  std::set<RDMAConnectedSocketImpl*> done;
  while (!pending_sent_conns.empty()) {
    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
    pending_sent_conns.pop_front();
    if (!done.count(o)) {
      done.insert(o);
      ssize_t r = o->submit(false);
      ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
      if (r < 0) {
        if (r == -EAGAIN) {
          pending_sent_conns.push_front(o);
          dispatcher->make_pending_worker(this);
          return;
        }
        o->fault();
      }
    }
  }

  dispatcher->notify_pending_workers();
}

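// Stack construction: ibv_fork_init() must run before anything else, then we
// warn if the memlock ulimit looks too small for memory registration, and
// finally set up the shared Infiniband instance, the dispatcher, and the
// per-worker back pointers.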
RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
{
  //
  // On RDMA, this MUST be called before fork.
  //
  int rc = ibv_fork_init();
  if (rc) {
    lderr(cct) << __func__ << " failed to call ibv_fork_init(). On RDMA it must be called before fork. Application aborts." << dendl;
    ceph_abort();
  }

  // Check ulimit
  struct rlimit limit;
  getrlimit(RLIMIT_MEMLOCK, &limit);
  if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
    lderr(cct) << __func__ << " !!! WARNING !!! For RDMA to work properly, user memlock (ulimit -l) must be big enough to allow a large amount of registered memory."
      " We recommend setting this parameter to infinity" << dendl;
  }

  if (!global_infiniband)
    global_infiniband.construct(cct);
  ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
  dispatcher = new RDMADispatcher(cct, this);
  global_infiniband->set_dispatcher(dispatcher);

  unsigned num = get_num_worker();
  for (unsigned i = 0; i < num; ++i) {
    RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
    w->set_stack(this);
  }

  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
}

RDMAStack::~RDMAStack()
{
  delete dispatcher;
}

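// Worker-thread plumbing for NetworkStack: spawn_worker() runs the i-th
// worker's event loop in its own thread, join_worker() waits for it to exit.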
void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
{
  threads.resize(i+1);
  threads[i] = std::thread(func);
}

void RDMAStack::join_worker(unsigned i)
{
  assert(threads.size() > i && threads[i].joinable());
  threads[i].join();
}