]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/rdma/Infiniband.cc
update sources to v12.1.1
[ceph.git] / ceph / src / msg / async / rdma / Infiniband.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include "Infiniband.h"
7c673cae
FG
18#include "common/errno.h"
19#include "common/debug.h"
31f18b77 20#include "RDMAStack.h"
7c673cae
FG
21
22#define dout_subsys ceph_subsys_ms
23#undef dout_prefix
24#define dout_prefix *_dout << "Infiniband "
25
31f18b77 26static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
7c673cae 27static const uint32_t MAX_INLINE_DATA = 0;
31f18b77
FG
28static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
29static const uint32_t CQ_DEPTH = 30000;
30
// Query and cache the attributes (LID, GID, gid index) of one physical HCA
// port. Any verbs query failure aborts the process: the messenger cannot
// operate without valid port information.
//
// With HAVE_IBV_EXP the configured ms_async_rdma_local_gid (and RoCE
// version) is searched for in the port's GID table; without it, GID
// index 0 is used unconditionally.
Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
{
#ifdef HAVE_IBV_EXP
  union ibv_gid cgid;                 // GID parsed from the config option
  struct ibv_exp_gid_attr gid_attr;
  bool malformed = false;

  ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl;
  int r = ibv_query_port(ctxt, port_num, port_attr);
  if (r == -1) {
    lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }

  lid = port_attr->lid;

  // search for requested GID in GIDs table
  ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
    << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
  // wire format: 8 colon-separated groups of 4 hex digits = 16 raw bytes
  r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
	     "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
	     ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
	     &cgid.raw[ 0], &cgid.raw[ 1],
	     &cgid.raw[ 2], &cgid.raw[ 3],
	     &cgid.raw[ 4], &cgid.raw[ 5],
	     &cgid.raw[ 6], &cgid.raw[ 7],
	     &cgid.raw[ 8], &cgid.raw[ 9],
	     &cgid.raw[10], &cgid.raw[11],
	     &cgid.raw[12], &cgid.raw[13],
	     &cgid.raw[14], &cgid.raw[15]);

  if (r != 16) {
    // fall back to whatever sits at index 0 rather than failing outright
    ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
    malformed = true;
  }

  gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;

  for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
    r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
    if (r) {
      lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
      ceph_abort();
    }
    r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
    if (r) {
      lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
      ceph_abort();
    }

    if (malformed) break; // stay with gid_idx=0
    // match on both the RoCE version and the raw 16-byte GID value
    if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
	 (memcmp(&gid, &cgid, 16) == 0) ) {
      ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
      break;
    }
  }

  if (gid_idx == port_attr->gid_tbl_len) {
    lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
    ceph_abort();
  }
#else
  int r = ibv_query_port(ctxt, port_num, port_attr);
  if (r == -1) {
    lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }

  lid = port_attr->lid;
  // without experimental verbs, always use the GID at table index 0
  r = ibv_query_gid(ctxt, port_num, 0, &gid);
  if (r) {
    lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
#endif
}
108
109
110Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
111{
112 if (device == NULL) {
113 lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
114 ceph_abort();
115 }
116 name = ibv_get_device_name(device);
117 ctxt = ibv_open_device(device);
118 if (ctxt == NULL) {
119 lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl;
120 ceph_abort();
121 }
122 int r = ibv_query_device(ctxt, device_attr);
123 if (r == -1) {
124 lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
125 ceph_abort();
126 }
127}
128
129void Device::binding_port(CephContext *cct, int port_num) {
130 port_cnt = device_attr->phys_port_cnt;
131 for (uint8_t i = 0; i < port_cnt; ++i) {
132 Port *port = new Port(cct, ctxt, i+1);
133 if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
134 active_port = port;
135 ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
136 break;
137 } else {
138 ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
139 }
140 delete port;
141 }
142 if (nullptr == active_port) {
143 lderr(cct) << __func__ << " port not found" << dendl;
144 assert(active_port);
145 }
146}
147
7c673cae
FG
148
149Infiniband::QueuePair::QueuePair(
31f18b77 150 CephContext *c, Infiniband& infiniband, ibv_qp_type type,
7c673cae
FG
151 int port, ibv_srq *srq,
152 Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
153 uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
31f18b77 154: cct(c), infiniband(infiniband),
7c673cae 155 type(type),
31f18b77 156 ctxt(infiniband.device->ctxt),
7c673cae 157 ib_physical_port(port),
31f18b77 158 pd(infiniband.pd->pd),
7c673cae
FG
159 srq(srq),
160 qp(NULL),
161 txcq(txcq),
162 rxcq(rxcq),
163 initial_psn(0),
164 max_send_wr(max_send_wr),
165 max_recv_wr(max_recv_wr),
166 q_key(q_key),
167 dead(false)
168{
169 initial_psn = lrand48() & 0xffffff;
170 if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
171 lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
172 ceph_abort();
173 }
31f18b77 174 pd = infiniband.pd->pd;
7c673cae
FG
175}
176
177int Infiniband::QueuePair::init()
178{
179 ldout(cct, 20) << __func__ << " started." << dendl;
180 ibv_qp_init_attr qpia;
181 memset(&qpia, 0, sizeof(qpia));
182 qpia.send_cq = txcq->get_cq();
183 qpia.recv_cq = rxcq->get_cq();
184 qpia.srq = srq; // use the same shared receive queue
185 qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
186 qpia.cap.max_send_sge = 1; // max send scatter-gather elements
187 qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
188 qpia.qp_type = type; // RC, UC, UD, or XRC
189 qpia.sq_sig_all = 0; // only generate CQEs on requested WQEs
190
191 qp = ibv_create_qp(pd, &qpia);
192 if (qp == NULL) {
193 lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
194 if (errno == ENOMEM) {
195 lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers, "
196 " ms_async_rdma_send_buffers or"
197 " ms_async_rdma_buffer_size" << dendl;
198 }
199 return -1;
200 }
201
202 ldout(cct, 20) << __func__ << " successfully create queue pair: "
203 << "qp=" << qp << dendl;
204
205 // move from RESET to INIT state
206 ibv_qp_attr qpa;
207 memset(&qpa, 0, sizeof(qpa));
208 qpa.qp_state = IBV_QPS_INIT;
209 qpa.pkey_index = 0;
210 qpa.port_num = (uint8_t)(ib_physical_port);
211 qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
212 qpa.qkey = q_key;
213
214 int mask = IBV_QP_STATE | IBV_QP_PORT;
215 switch (type) {
216 case IBV_QPT_RC:
217 mask |= IBV_QP_ACCESS_FLAGS;
218 mask |= IBV_QP_PKEY_INDEX;
219 break;
220 case IBV_QPT_UD:
221 mask |= IBV_QP_QKEY;
222 mask |= IBV_QP_PKEY_INDEX;
223 break;
224 case IBV_QPT_RAW_PACKET:
225 break;
226 default:
227 ceph_abort();
228 }
229
230 int ret = ibv_modify_qp(qp, &qpa, mask);
231 if (ret) {
232 ibv_destroy_qp(qp);
233 lderr(cct) << __func__ << " failed to transition to INIT state: "
234 << cpp_strerror(errno) << dendl;
235 return -1;
236 }
237 ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
238 << " qp=" << qp << dendl;
239 return 0;
240}
241
/**
 * Change an RC QueuePair into the ERROR state. Moving the Queue Pair into
 * the ERROR state and polling all of the relevant Work Completions must be
 * done prior to destroying the Queue Pair, since destroying a Queue Pair
 * does not guarantee that its Work Completions are removed from the CQ
 * upon destruction. Even if the Work Completions are already in the CQ,
 * it might not be possible to retrieve them. If the Queue Pair is
 * associated with an SRQ, it is recommended to wait for the affiliated
 * event IBV_EVENT_QP_LAST_WQE_REACHED.
 *
 * \return
 *      -errno if the QueuePair can't switch to ERROR
 *      0 for success.
 */
256int Infiniband::QueuePair::to_dead()
257{
258 if (dead)
259 return 0;
260 ibv_qp_attr qpa;
261 memset(&qpa, 0, sizeof(qpa));
262 qpa.qp_state = IBV_QPS_ERR;
263
264 int mask = IBV_QP_STATE;
265 int ret = ibv_modify_qp(qp, &qpa, mask);
266 if (ret) {
267 lderr(cct) << __func__ << " failed to transition to ERROR state: "
268 << cpp_strerror(errno) << dendl;
269 return -errno;
270 }
271 dead = true;
272 return ret;
273}
274
275int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
276{
277 ibv_qp_attr qpa;
278 ibv_qp_init_attr qpia;
279
280 int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
281 if (r) {
282 lderr(cct) << __func__ << " failed to query qp: "
283 << cpp_strerror(errno) << dendl;
284 return -1;
285 }
286
287 if (rqp)
288 *rqp = qpa.dest_qp_num;
289 return 0;
290}
291
292/**
293 * Get the remote infiniband address for this QueuePair, as set in #plumb().
294 * LIDs are "local IDs" in infiniband terminology. They are short, locally
295 * routable addresses.
296 */
297int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
298{
299 ibv_qp_attr qpa;
300 ibv_qp_init_attr qpia;
301
302 int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
303 if (r) {
304 lderr(cct) << __func__ << " failed to query qp: "
305 << cpp_strerror(errno) << dendl;
306 return -1;
307 }
308
309 if (lid)
310 *lid = qpa.ah_attr.dlid;
311 return 0;
312}
313
314/**
315 * Get the state of a QueuePair.
316 */
317int Infiniband::QueuePair::get_state() const
318{
319 ibv_qp_attr qpa;
320 ibv_qp_init_attr qpia;
321
322 int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
323 if (r) {
324 lderr(cct) << __func__ << " failed to get state: "
325 << cpp_strerror(errno) << dendl;
326 return -1;
327 }
328 return qpa.qp_state;
329}
330
331/**
332 * Return true if the queue pair is in an error state, false otherwise.
333 */
bool Infiniband::QueuePair::is_error() const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  // attr_mask of -1 (all bits set) requests every attribute; only
  // cur_qp_state is inspected below
  int r = ibv_query_qp(qp, &qpa, -1, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to get state: "
      << cpp_strerror(errno) << dendl;
    return true;   // fail safe: treat an unqueryable QP as broken
  }
  return qpa.cur_qp_state == IBV_QPS_ERR;
}
347
348
31f18b77
FG
349Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
350 : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
7c673cae
FG
351{
352}
353
354Infiniband::CompletionChannel::~CompletionChannel()
355{
356 if (channel) {
357 int r = ibv_destroy_comp_channel(channel);
358 if (r < 0)
359 lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
360 assert(r == 0);
361 }
362}
363
364int Infiniband::CompletionChannel::init()
365{
366 ldout(cct, 20) << __func__ << " started." << dendl;
31f18b77 367 channel = ibv_create_comp_channel(infiniband.device->ctxt);
7c673cae
FG
368 if (!channel) {
369 lderr(cct) << __func__ << " failed to create receive completion channel: "
370 << cpp_strerror(errno) << dendl;
371 return -1;
372 }
373 int rc = NetHandler(cct).set_nonblock(channel->fd);
374 if (rc < 0) {
375 ibv_destroy_comp_channel(channel);
376 return -1;
377 }
378 return 0;
379}
380
// Acknowledge every CQ event accumulated so far in one batched verbs call
// and reset the pending counter (all events must be acked before the CQ
// can be destroyed).
void Infiniband::CompletionChannel::ack_events()
{
  ibv_ack_cq_events(cq, cq_events_that_need_ack);
  cq_events_that_need_ack = 0;
}
386
387bool Infiniband::CompletionChannel::get_cq_event()
388{
389 ibv_cq *cq = NULL;
390 void *ev_ctx;
391 if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
392 if (errno != EAGAIN && errno != EINTR)
393 lderr(cct) << __func__ << " failed to retrieve CQ event: "
394 << cpp_strerror(errno) << dendl;
395 return false;
396 }
397
398 /* accumulate number of cq events that need to
399 * * be acked, and periodically ack them
400 * */
401 if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
402 ldout(cct, 20) << __func__ << " ack aq events." << dendl;
403 ibv_ack_cq_events(cq, MAX_ACK_EVENT);
404 cq_events_that_need_ack = 0;
405 }
406
407 return true;
408}
409
410
411Infiniband::CompletionQueue::~CompletionQueue()
412{
413 if (cq) {
414 int r = ibv_destroy_cq(cq);
415 if (r < 0)
416 lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
417 assert(r == 0);
418 }
419}
420
421int Infiniband::CompletionQueue::init()
422{
31f18b77 423 cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
7c673cae
FG
424 if (!cq) {
425 lderr(cct) << __func__ << " failed to create receive completion queue: "
426 << cpp_strerror(errno) << dendl;
427 return -1;
428 }
429
430 if (ibv_req_notify_cq(cq, 0)) {
431 lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
432 ibv_destroy_cq(cq);
433 cq = nullptr;
434 return -1;
435 }
436
437 channel->bind_cq(cq);
438 ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
439 return 0;
440}
441
442int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
443{
444 ldout(cct, 20) << __func__ << " started." << dendl;
445 int r = ibv_req_notify_cq(cq, 0);
446 if (r < 0)
447 lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
448 return r;
449}
450
451int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
452 int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
453 if (r < 0) {
454 lderr(cct) << __func__ << " poll_completion_queue occur met error: "
455 << cpp_strerror(errno) << dendl;
456 return -1;
457 }
458 return r;
459}
460
461
462Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
463 : pd(ibv_alloc_pd(device->ctxt))
464{
465 if (pd == NULL) {
466 lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
467 ceph_abort();
468 }
469}
470
471Infiniband::ProtectionDomain::~ProtectionDomain()
472{
224ce89b 473 ibv_dealloc_pd(pd);
7c673cae
FG
474}
475
476
// A Chunk is one fixed-size slice of a Cluster's registered buffer:
// `m` is the memory region covering it, `len` its capacity in bytes and
// `b` the start of its data.
Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t len, char* b)
  : mr(m), bytes(len), offset(0), buffer(b)
{
}
481
482Infiniband::MemoryManager::Chunk::~Chunk()
483{
7c673cae
FG
484}
485
// Set the read/write cursor within the chunk.
void Infiniband::MemoryManager::Chunk::set_offset(uint32_t o)
{
  offset = o;
}
490
// Current read/write cursor within the chunk.
uint32_t Infiniband::MemoryManager::Chunk::get_offset()
{
  return offset;
}
495
// Set the amount of valid (received) data in the chunk.
void Infiniband::MemoryManager::Chunk::set_bound(uint32_t b)
{
  bound = b;
}
500
// Reset the cursor and mark `b` bytes as available for reading.
void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
{
  offset = 0;
  bound = b;
}
506
// Amount of valid (received) data in the chunk.
uint32_t Infiniband::MemoryManager::Chunk::get_bound()
{
  return bound;
}
511
512uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
513{
514 uint32_t left = bound - offset;
515 if (left >= len) {
516 memcpy(buf, buffer+offset, len);
517 offset += len;
518 return len;
519 } else {
520 memcpy(buf, buffer+offset, left);
521 offset = 0;
522 bound = 0;
523 return left;
524 }
525}
526
527uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
528{
529 uint32_t left = bytes - offset;
530 if (left >= len) {
531 memcpy(buffer+offset, buf, len);
532 offset += len;
533 return len;
534 } else {
535 memcpy(buffer+offset, buf, left);
536 offset = bytes;
537 return left;
538 }
539}
540
// True when the chunk's capacity has been completely written.
bool Infiniband::MemoryManager::Chunk::full()
{
  return offset == bytes;
}
545
546bool Infiniband::MemoryManager::Chunk::over()
547{
548 return Infiniband::MemoryManager::Chunk::offset == bound;
549}
550
// Reset the chunk to its empty state before recycling.
void Infiniband::MemoryManager::Chunk::clear()
{
  offset = 0;
  bound = 0;
}
556
31f18b77
FG
// Repost this chunk to the shared receive queue so the HCA can fill it
// with the next incoming message.
void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
{
  ib->post_chunk(this);
}
561
7c673cae
FG
// A Cluster is one contiguous registered region carved into fixed
// `buffer_size`-byte chunks; the memory itself is allocated later by
// fill().
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
  : manager(m), buffer_size(s), lock("cluster_lock")
{
}
566
567Infiniband::MemoryManager::Cluster::~Cluster()
568{
224ce89b
WB
569 int r = ibv_dereg_mr(chunk_base->mr);
570 assert(r == 0);
7c673cae
FG
571 const auto chunk_end = chunk_base + num_chunk;
572 for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
573 chunk->~Chunk();
574 }
575
576 ::free(chunk_base);
577 if (manager.enabled_huge_page)
578 manager.free_huge_pages(base);
579 else
580 ::free(base);
581}
582
583int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
584{
585 assert(!base);
586 num_chunk = num;
587 uint32_t bytes = buffer_size * num;
588 if (manager.enabled_huge_page) {
589 base = (char*)manager.malloc_huge_pages(bytes);
590 } else {
591 base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
592 }
593 end = base + bytes;
594 assert(base);
595 chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
596 memset(chunk_base, 0, sizeof(Chunk) * num);
597 free_chunks.reserve(num);
224ce89b
WB
598 ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
599 assert(m);
7c673cae
FG
600 Chunk* chunk = chunk_base;
601 for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
7c673cae
FG
602 new(chunk) Chunk(m, buffer_size, base+offset);
603 free_chunks.push_back(chunk);
604 chunk++;
605 }
606 return 0;
607}
608
609void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
610{
611 Mutex::Locker l(lock);
612 for (auto c : ck) {
613 c->clear();
614 free_chunks.push_back(c);
615 }
616}
617
618int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t bytes)
619{
620 uint32_t num = bytes / buffer_size + 1;
621 if (bytes % buffer_size == 0)
622 --num;
623 int r = num;
624 Mutex::Locker l(lock);
625 if (free_chunks.empty())
626 return 0;
627 if (!bytes) {
628 r = free_chunks.size();
629 for (auto c : free_chunks)
630 chunks.push_back(c);
631 free_chunks.clear();
632 return r;
633 }
634 if (free_chunks.size() < num) {
635 num = free_chunks.size();
636 r = num;
637 }
638 for (uint32_t i = 0; i < num; ++i) {
639 chunks.push_back(free_chunks.back());
640 free_chunks.pop_back();
641 }
642 return r;
643}
644
645
646Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
647 : device(d), pd(p)
648{
649 enabled_huge_page = hugepage;
650}
651
652Infiniband::MemoryManager::~MemoryManager()
653{
654 if (channel)
655 delete channel;
656 if (send)
657 delete send;
658}
659
660void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
661{
662 size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
663 char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
664 if (ptr == MAP_FAILED) {
665 ptr = (char *)malloc(real_size);
666 if (ptr == NULL) return NULL;
667 real_size = 0;
668 }
669 *((size_t *)ptr) = real_size;
670 return ptr + HUGE_PAGE_SIZE;
671}
672
673void Infiniband::MemoryManager::free_huge_pages(void *ptr)
674{
675 if (ptr == NULL) return;
676 void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
677 size_t real_size = *((size_t *)real_ptr);
678 assert(real_size % HUGE_PAGE_SIZE == 0);
679 if (real_size != 0)
680 munmap(real_ptr, real_size);
681 else
682 free(real_ptr);
683}
684
// Allocate and register the receive ("channel") and send clusters: each
// holds `size`-byte chunks, rx_num and tx_num of them respectively.
void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)
{
  assert(device);
  assert(pd);
  channel = new Cluster(*this, size);
  channel->fill(rx_num);

  send = new Cluster(*this, size);
  send->fill(tx_num);
}
695
// Give completed send chunks back to the TX cluster's free list.
void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
{
  send->take_back(chunks);
}
700
// Reserve send chunks covering `bytes`; see Cluster::get_buffers for the
// exact contract and return value.
int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return send->get_buffers(c, bytes);
}
705
// Reserve receive chunks covering `bytes`; see Cluster::get_buffers for
// the exact contract and return value.
int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
{
  return channel->get_buffers(chunks, bytes);
}
710
711
31f18b77
FG
712Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
713 : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
7c673cae
FG
714{
715}
716
7c673cae
FG
717void Infiniband::init()
718{
719 Mutex::Locker l(lock);
720
721 if (initialized)
722 return;
723
31f18b77 724 device_list = new DeviceList(cct);
7c673cae
FG
725 initialized = true;
726
31f18b77
FG
727 device = device_list->get_device(device_name.c_str());
728 device->binding_port(cct, port_num);
729 assert(device);
730 ib_physical_port = device->active_port->get_port_num();
731 pd = new ProtectionDomain(cct, device);
732 assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
733
734 max_recv_wr = device->device_attr->max_srq_wr;
735 if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
736 max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
737 ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
738 } else {
739 ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
740 }
741
742 max_send_wr = device->device_attr->max_qp_wr;
743 if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
744 max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
745 ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
746 } else {
747 ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
748 }
749
750 ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
751 << " completion entries" << dendl;
752
753 memory_manager = new MemoryManager(device, pd,
754 cct->_conf->ms_async_rdma_enable_hugepage);
755 memory_manager->register_rx_tx(
756 cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
757
758 srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
759 post_channel_cluster();
760
7c673cae
FG
761 dispatcher->polling_start();
762}
763
31f18b77
FG
764Infiniband::~Infiniband()
765{
766 if (!initialized)
767 return;
768
769 if (dispatcher)
770 dispatcher->polling_stop();
771
772 ibv_destroy_srq(srq);
773 delete memory_manager;
774 delete pd;
775}
776
7c673cae
FG
// Install (or clear) the RDMA dispatcher. The xor-assert allows only the
// two legal transitions: NULL -> dispatcher and dispatcher -> NULL.
void Infiniband::set_dispatcher(RDMADispatcher *d)
{
  assert(!d ^ !dispatcher);

  dispatcher = d;
}
783
31f18b77
FG
784/**
785 * Create a shared receive queue. This basically wraps the verbs call.
786 *
787 * \param[in] max_wr
788 * The max number of outstanding work requests in the SRQ.
789 * \param[in] max_sge
790 * The max number of scatter elements per WR.
791 * \return
792 * A valid ibv_srq pointer, or NULL on error.
793 */
794ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
795{
796 ibv_srq_init_attr sia;
797 memset(&sia, 0, sizeof(sia));
798 sia.srq_context = device->ctxt;
799 sia.attr.max_wr = max_wr;
800 sia.attr.max_sge = max_sge;
801 return ibv_create_srq(pd->pd, &sia);
802}
803
// Reserve send chunks covering `bytes` into c; returns the number of
// chunks obtained (see MemoryManager/Cluster::get_buffers for details).
int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return memory_manager->get_send_buffers(c, bytes);
}
808
809/**
810 * Create a new QueuePair. This factory should be used in preference to
811 * the QueuePair constructor directly, since this lets derivatives of
812 * Infiniband, e.g. MockInfiniband (if it existed),
813 * return mocked out QueuePair derivatives.
814 *
815 * \return
816 * QueuePair on success or NULL if init fails
817 * See QueuePair::QueuePair for parameter documentation.
818 */
819Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
820{
821 Infiniband::QueuePair *qp = new QueuePair(
822 cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
823 if (qp->init()) {
824 delete qp;
825 return NULL;
826 }
827 return qp;
828}
829
830int Infiniband::post_chunk(Chunk* chunk)
831{
832 ibv_sge isge;
833 isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
834 isge.length = chunk->bytes;
835 isge.lkey = chunk->mr->lkey;
836 ibv_recv_wr rx_work_request;
837
838 memset(&rx_work_request, 0, sizeof(rx_work_request));
839 rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
840 rx_work_request.next = NULL;
841 rx_work_request.sg_list = &isge;
842 rx_work_request.num_sge = 1;
843
844 ibv_recv_wr *badWorkRequest;
845 int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
846 if (ret)
847 return -errno;
848 return 0;
849}
850
851int Infiniband::post_channel_cluster()
852{
853 vector<Chunk*> free_chunks;
854 int r = memory_manager->get_channel_buffers(free_chunks, 0);
855 assert(r > 0);
856 for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
857 r = post_chunk(*iter);
858 assert(r == 0);
859 }
860 return 0;
861}
862
863Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
7c673cae 864{
31f18b77
FG
865 Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
866 if (cc->init()) {
867 delete cc;
868 return NULL;
869 }
870 return cc;
7c673cae
FG
871}
872
31f18b77
FG
873Infiniband::CompletionQueue* Infiniband::create_comp_queue(
874 CephContext *cct, CompletionChannel *cc)
7c673cae 875{
31f18b77
FG
876 Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
877 cct, *this, CQ_DEPTH, cc);
878 if (cq->init()) {
879 delete cq;
880 return NULL;
881 }
882 return cq;
883}
884
885// 1 means no valid buffer read, 0 means got enough buffer
886// else return < 0 means error
887int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
888{
889 char msg[TCP_MSG_LEN];
890 char gid[33];
891 ssize_t r = ::read(sd, &msg, sizeof(msg));
892 // Drop incoming qpt
893 if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
894 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
895 ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
896 return -EINVAL;
897 }
898 }
899 if (r < 0) {
900 r = -errno;
901 lderr(cct) << __func__ << " got error " << r << ": "
902 << cpp_strerror(r) << dendl;
903 } else if (r == 0) { // valid disconnect message of length 0
904 ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
905 } else if ((size_t)r != sizeof(msg)) { // invalid message
906 ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
907 r = -EINVAL;
908 } else { // valid message
909 sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
910 wire_gid_to_gid(gid, &(im.gid));
911 ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
912 }
913 return r;
914}
915
916int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
917{
918 int retry = 0;
919 ssize_t r;
920
921 char msg[TCP_MSG_LEN];
922 char gid[33];
923retry:
924 gid_to_wire_gid(&(im.gid), gid);
925 sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
926 ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
927 << ", " << im.peer_qpn << ", " << gid << dendl;
928 r = ::write(sd, msg, sizeof(msg));
929 // Drop incoming qpt
930 if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
931 if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
932 ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
933 return -EINVAL;
934 }
935 }
936
937 if ((size_t)r != sizeof(msg)) {
938 // FIXME need to handle EAGAIN instead of retry
939 if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
940 retry++;
941 goto retry;
942 }
943 if (r < 0)
944 lderr(cct) << __func__ << " send returned error " << errno << ": "
945 << cpp_strerror(errno) << dendl;
946 else
947 lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
948 return -errno;
949 }
950 return 0;
951}
952
953void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
954{
955 char tmp[9];
956 uint32_t v32;
957 int i;
958
959 for (tmp[8] = 0, i = 0; i < 4; ++i) {
960 memcpy(tmp, wgid + i * 8, 8);
961 sscanf(tmp, "%x", &v32);
962 *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
963 }
964}
965
966void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
967{
968 for (int i = 0; i < 4; ++i)
969 sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
7c673cae
FG
970}
971
972Infiniband::QueuePair::~QueuePair()
973{
974 if (qp) {
975 ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
976 assert(!ibv_destroy_qp(qp));
977 }
978}
979
/**
 * Return a string representation of the `status' field from a Verbs
 * struct `ibv_wc'.
 *
 * \param[in] status
 *      The integer status obtained in ibv_wc.status.
 * \return
 *      A string corresponding to the given status.
 */
const char* Infiniband::wc_status_to_string(int status)
{
  // table indexed directly by the ibv_wc_status enum value; keep the
  // order in sync with <infiniband/verbs.h>
  static const char *lookup[] = {
    "SUCCESS",
    "LOC_LEN_ERR",
    "LOC_QP_OP_ERR",
    "LOC_EEC_OP_ERR",
    "LOC_PROT_ERR",
    "WR_FLUSH_ERR",
    "MW_BIND_ERR",
    "BAD_RESP_ERR",
    "LOC_ACCESS_ERR",
    "REM_INV_REQ_ERR",
    "REM_ACCESS_ERR",
    "REM_OP_ERR",
    "RETRY_EXC_ERR",
    "RNR_RETRY_EXC_ERR",
    "LOC_RDD_VIOL_ERR",
    "REM_INV_RD_REQ_ERR",
    "REM_ABORT_ERR",
    "INV_EECN_ERR",
    "INV_EEC_STATE_ERR",
    "FATAL_ERR",
    "RESP_TIMEOUT_ERR",
    "GENERAL_ERR"
  };

  // bounds-check before indexing so a bogus status can't read past the table
  if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
    return "<status out of range!>";
  return lookup[status];
}
1020
// Map an ibv_qp_state enum value to its symbolic name for log output;
// unknown values yield " out of range.".
const char* Infiniband::qp_state_string(int status) {
  switch(status) {
    case IBV_QPS_RESET : return "IBV_QPS_RESET";
    case IBV_QPS_INIT : return "IBV_QPS_INIT";
    case IBV_QPS_RTR : return "IBV_QPS_RTR";
    case IBV_QPS_RTS : return "IBV_QPS_RTS";
    case IBV_QPS_SQD : return "IBV_QPS_SQD";
    case IBV_QPS_SQE : return "IBV_QPS_SQE";
    case IBV_QPS_ERR : return "IBV_QPS_ERR";
    default: return " out of range.";
  }
}