]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/rdma/Device.cc
bump version to 12.0.3-pve3
[ceph.git] / ceph / src / msg / async / rdma / Device.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include "Infiniband.h"
18#include "RDMAStack.h"
19#include "Device.h"
20#include "common/errno.h"
21#include "common/debug.h"
22
23#include <poll.h>
24
25#define dout_subsys ceph_subsys_ms
26#undef dout_prefix
27#define dout_prefix *_dout << "IBDevice "
28
// File-local tuning constants (internal linkage).
namespace {
// Scatter/gather entries per work request on the shared receive queue.
const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
// Depth requested for every completion queue created by a Device.
const uint32_t CQ_DEPTH = 30000;
}
31
32Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
33{
34#ifdef HAVE_IBV_EXP
35 union ibv_gid cgid;
36 struct ibv_exp_gid_attr gid_attr;
37 bool malformed = false;
38
39 ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl;
40 int r = ibv_query_port(ctxt, port_num, port_attr);
41 if (r == -1) {
42 lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
43 ceph_abort();
44 }
45
46 lid = port_attr->lid;
47
48 // search for requested GID in GIDs table
49 ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
50 << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
51 r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
52 "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
53 ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
54 &cgid.raw[ 0], &cgid.raw[ 1],
55 &cgid.raw[ 2], &cgid.raw[ 3],
56 &cgid.raw[ 4], &cgid.raw[ 5],
57 &cgid.raw[ 6], &cgid.raw[ 7],
58 &cgid.raw[ 8], &cgid.raw[ 9],
59 &cgid.raw[10], &cgid.raw[11],
60 &cgid.raw[12], &cgid.raw[13],
61 &cgid.raw[14], &cgid.raw[15]);
62
63 if (r != 16) {
64 ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
65 malformed = true;
66 }
67
68 gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
69
70 for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
71 r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
72 if (r) {
73 lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
74 ceph_abort();
75 }
76 r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
77 if (r) {
78 lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
79 ceph_abort();
80 }
81
82 if (malformed) break; // stay with gid_idx=0
83 if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
84 (memcmp(&gid, &cgid, 16) == 0) ) {
85 ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
86 break;
87 }
88 }
89
90 if (gid_idx == port_attr->gid_tbl_len) {
91 lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
92 ceph_abort();
93 }
94#else
95 int r = ibv_query_port(ctxt, port_num, port_attr);
96 if (r == -1) {
97 lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
98 ceph_abort();
99 }
100
101 lid = port_attr->lid;
102 r = ibv_query_gid(ctxt, port_num, 0, &gid);
103 if (r) {
104 lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl;
105 ceph_abort();
106 }
107#endif
108}
109
110Port::~Port()
111{
112 delete port_attr;
113}
114
115
116Device::Device(CephContext *cct, Infiniband *ib, ibv_device* d)
117 : cct(cct), device(d), lock("ibdev_lock"),
118 async_handler(new C_handle_cq_async(this)), infiniband(ib),
119 device_attr(new ibv_device_attr)
120{
121 if (device == NULL) {
122 lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
123 ceph_abort();
124 }
125 name = ibv_get_device_name(device);
126 ctxt = ibv_open_device(device);
127 if (ctxt == NULL) {
128 lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl;
129 ceph_abort();
130 }
131 int r = ibv_query_device(ctxt, device_attr);
132 if (r == -1) {
133 lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
134 ceph_abort();
135 }
136
137 port_cnt = device_attr->phys_port_cnt;
138 ports = new Port *[port_cnt + 1];
139 assert(ports);
140
141 for (int i = 1; i <= port_cnt; i++) {
142 ports[i] = new Port(cct, ctxt, i);
143 assert(ports[i]);
144 }
145
146 tx_cc = create_comp_channel(cct);
147 assert(tx_cc);
148
149 rx_cc = create_comp_channel(cct);
150 assert(rx_cc);
151
152 assert(NetHandler(cct).set_nonblock(ctxt->async_fd) == 0);
153}
154
155void Device::init(int ibport)
156{
157 Mutex::Locker l(lock);
158
159 verify_port(ibport);
160
161 if (initialized)
162 return;
163
164 pd = new ProtectionDomain(cct, this);
165
166 max_recv_wr = std::min(device_attr->max_srq_wr, (int)cct->_conf->ms_async_rdma_receive_buffers);
167 ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
168
169 max_send_wr = std::min(device_attr->max_qp_wr, (int)cct->_conf->ms_async_rdma_send_buffers);
170 ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
171
172 ldout(cct, 1) << __func__ << " device allow " << device_attr->max_cqe
173 << " completion entries" << dendl;
174
175 memory_manager = new MemoryManager(this, pd,
176 cct->_conf->ms_async_rdma_enable_hugepage);
177 memory_manager->register_rx_tx(
178 cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
179
180 srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
181 post_channel_cluster();
182
183 tx_cq = create_comp_queue(cct, tx_cc);
184 assert(tx_cq);
185
186 rx_cq = create_comp_queue(cct, rx_cc);
187 assert(rx_cq);
188
189 initialized = true;
190
191 ldout(cct, 5) << __func__ << ":" << __LINE__ << " device " << *this << " is initialized" << dendl;
192}
193
194void Device::uninit()
195{
196 Mutex::Locker l(lock);
197
198 if (!initialized)
199 return;
200
201 tx_cc->ack_events();
202 rx_cc->ack_events();
203
204 initialized = false;
205
206 delete rx_cq;
207 delete tx_cq;
208 delete rx_cc;
209 delete tx_cc;
210
211 assert(ibv_destroy_srq(srq) == 0);
212 delete memory_manager;
213 delete pd;
214}
215
216Device::~Device()
217{
218 delete async_handler;
219
220 uninit();
221
222 for (int i = 1; i <= port_cnt; i++)
223 delete ports[i];
224 delete[] ports;
225
226 assert(ibv_close_device(ctxt) == 0);
227 delete device_attr;
228}
229
230void Device::verify_port(int port_num) {
231 if (port_num < 0 || port_num > port_cnt) {
232 lderr(cct) << __func__ << " port not found" << dendl;
233 ceph_abort();
234 }
235
236 Port *port = ports[port_num];
237
238 if (port->get_port_attr()->state == IBV_PORT_ACTIVE) {
239 ldout(cct, 1) << __func__ << " found active port " << port_num << dendl;
240 } else {
241 ldout(cct, 10) << __func__ << " port " << port_num <<
242 " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
243 ceph_abort();
244 }
245}
246
247Port *Device::get_port(int ibport)
248{
249 assert(ibport > 0 && ibport <= port_cnt);
250 return ports[ibport];
251}
252
253/**
254 * Create a new QueuePair. This factory should be used in preference to
255 * the QueuePair constructor directly, since this lets derivatives of
256 * Infiniband, e.g. MockInfiniband (if it existed),
257 * return mocked out QueuePair derivatives.
258 *
259 * \return
260 * QueuePair on success or NULL if init fails
261 * See QueuePair::QueuePair for parameter documentation.
262 */
263Infiniband::QueuePair* Device::create_queue_pair(int port,
264 ibv_qp_type type)
265{
266 Infiniband::QueuePair *qp = new QueuePair(
267 cct, *this, type, port, srq, tx_cq, rx_cq, max_send_wr, max_recv_wr);
268 if (qp->init()) {
269 delete qp;
270 return NULL;
271 }
272 return qp;
273}
274
275/**
276 * Create a shared receive queue. This basically wraps the verbs call.
277 *
278 * \param[in] max_wr
279 * The max number of outstanding work requests in the SRQ.
280 * \param[in] max_sge
281 * The max number of scatter elements per WR.
282 * \return
283 * A valid ibv_srq pointer, or NULL on error.
284 */
285ibv_srq* Device::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
286{
287 ibv_srq_init_attr sia;
288 memset(&sia, 0, sizeof(sia));
289 sia.srq_context = ctxt;
290 sia.attr.max_wr = max_wr;
291 sia.attr.max_sge = max_sge;
292 return ibv_create_srq(pd->pd, &sia);
293}
294
295Infiniband::CompletionChannel* Device::create_comp_channel(CephContext *c)
296{
297 Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
298 if (cc->init()) {
299 delete cc;
300 return NULL;
301 }
302 return cc;
303}
304
305Infiniband::CompletionQueue* Device::create_comp_queue(
306 CephContext *cct, CompletionChannel *cc)
307{
308 Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
309 cct, *this, CQ_DEPTH, cc);
310 if (cq->init()) {
311 delete cq;
312 return NULL;
313 }
314 return cq;
315}
316
317int Device::post_chunk(Chunk* chunk)
318{
319 ibv_sge isge;
320 isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
321 isge.length = chunk->bytes;
322 isge.lkey = chunk->mr->lkey;
323 ibv_recv_wr rx_work_request;
324
325 memset(&rx_work_request, 0, sizeof(rx_work_request));
326 rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
327 rx_work_request.next = NULL;
328 rx_work_request.sg_list = &isge;
329 rx_work_request.num_sge = 1;
330
331 ibv_recv_wr *badWorkRequest;
332 int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
333 if (ret)
334 return -errno;
335 return 0;
336}
337
338int Device::post_channel_cluster()
339{
340 vector<Chunk*> free_chunks;
341 int r = memory_manager->get_channel_buffers(free_chunks, 0);
342 assert(r > 0);
343 for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
344 r = post_chunk(*iter);
345 assert(r == 0);
346 }
347 return 0;
348}
349
350int Device::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
351{
352 return memory_manager->get_send_buffers(c, bytes);
353}
354
355int Device::poll_tx_cq(int n, ibv_wc *wc)
356{
357 if (!initialized)
358 return 0;
359
360 return tx_cq->poll_cq(n, wc);
361}
362
363int Device::poll_rx_cq(int n, ibv_wc *wc)
364{
365 if (!initialized)
366 return 0;
367
368 return rx_cq->poll_cq(n, wc);
369}
370
371void Device::rearm_cqs()
372{
373 int ret;
374
375 if (!initialized)
376 return;
377
378 ret = tx_cq->rearm_notify();
379 assert(!ret);
380
381 ret = rx_cq->rearm_notify();
382 assert(!ret);
383}
384
385void Device::handle_async_event()
386{
387 ibv_async_event async_event;
388
389 ldout(cct, 30) << __func__ << dendl;
390
391 while (!ibv_get_async_event(ctxt, &async_event)) {
392 RDMADispatcher *d = infiniband->get_dispatcher();
393 d->process_async_event(this, async_event);
394
395 ibv_ack_async_event(&async_event);
396 }
397
398 if (errno != EAGAIN) {
399 lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
400 << " " << cpp_strerror(errno) << ")" << dendl;
401 }
402}
403
404
405DeviceList::DeviceList(CephContext *cct, Infiniband *ib)
406 : cct(cct), device_list(ibv_get_device_list(&num))
407{
408 if (device_list == NULL || num == 0) {
409 lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl;
410 ceph_abort();
411 }
412 devices = new Device*[num];
413
414 poll_fds = new struct pollfd[3 * num];
415
416 for (int i = 0; i < num; ++i) {
417 struct pollfd *pfd = &poll_fds[i * 3];
418 struct Device *d;
419
420 d = new Device(cct, ib, device_list[i]);
421 devices[i] = d;
422
423 pfd[0].fd = d->tx_cc->get_fd();
424 pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
425 pfd[0].revents = 0;
426
427 pfd[1].fd = d->rx_cc->get_fd();
428 pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
429 pfd[1].revents = 0;
430
431 pfd[2].fd = d->ctxt->async_fd;
432 pfd[2].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
433 pfd[2].revents = 0;
434 }
435}
436
437DeviceList::~DeviceList()
438{
439 delete[] poll_fds;
440
441 for (int i=0; i < num; ++i) {
442 delete devices[i];
443 }
444 delete []devices;
445 ibv_free_device_list(device_list);
446}
447
448Device* DeviceList::get_device(const char* device_name)
449{
450 assert(devices);
451 for (int i = 0; i < num; ++i) {
452 if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {
453 return devices[i];
454 }
455 }
456 return NULL;
457}
458
459
460Device* DeviceList::get_device(const struct ibv_context *ctxt)
461{
462 ibv_device *device = ctxt->device;
463
464 assert(devices);
465 for (int i = 0; i < num; ++i) {
466 if (devices[i]->ctxt->device == device) {
467 return devices[i];
468 }
469 }
470
471 return NULL;
472}
473
474int DeviceList::poll_tx(int num_entries, Device **d, ibv_wc *wc)
475{
476 int n = 0;
477
478 for (int i = 0; i < num; i++) {
479 *d = devices[++last_poll_dev % num];
480
481 n = (*d)->poll_tx_cq(num_entries, wc);
482 if (n)
483 break;
484 }
485
486 return n;
487}
488
489int DeviceList::poll_rx(int num_entries, Device **d, ibv_wc *wc)
490{
491 int n = 0;
492
493 for (int i = 0; i < num; i++) {
494 *d = devices[++last_poll_dev % num];
495
496 n = (*d)->poll_rx_cq(num_entries, wc);
497 if (n)
498 break;
499 }
500
501 return n;
502}
503
504int DeviceList::poll_blocking(bool &done)
505{
506 int r = 0;
507 while (!done && r == 0) {
508 r = poll(poll_fds, num * 3, 100);
509 if (r < 0) {
510 r = -errno;
511 lderr(cct) << __func__ << " poll failed " << r << dendl;
512 ceph_abort();
513 }
514 }
515
516 if (r <= 0)
517 return r;
518
519 for (int i = 0; i < num ; i++) {
520 Device *d = devices[i];
521
522 if (d->tx_cc->get_cq_event())
523 ldout(cct, 20) << __func__ << " " << *d << ": got tx cq event" << dendl;
524
525 if (d->rx_cc->get_cq_event())
526 ldout(cct, 20) << __func__ << " " << *d << ": got rx cq event" << dendl;
527
528 d->handle_async_event();
529 }
530
531 return r;
532}
533
534void DeviceList::rearm_notify()
535{
536 for (int i = 0; i < num; i++)
537 devices[i]->rearm_cqs();
538}
539
540void DeviceList::handle_async_event()
541{
542 for (int i = 0; i < num; i++)
543 devices[i]->handle_async_event();
544}
545
546void DeviceList::uninit()
547{
548 for (int i = 0; i < num; i++)
549 devices[i]->uninit();
550}